我有一个连续运行的Spark Streaming作业.我如何优雅地停止工作?我已经阅读了在作业监视中附加关闭钩子并将SIGTERM发送到作业的通常建议.
sys.ShutdownHookThread {
logger.info("Gracefully stopping Application...")
ssc.stop(stopSparkContext = true, stopGracefully = true)
logger.info("Application stopped gracefully")
}
Run Code Online (Sandbox Code Playgroud)
它似乎工作,但看起来不是最简单的方法来阻止这项工作.我在这里错过了什么吗?
从代码的角度来看,它可能有意义,但您如何在群集环境中使用它?如果我们启动一个火花流工作(我们在集群中的所有节点上分配作业),我们将不得不跟踪作业的PID和运行它的节点.最后,当我们必须停止进程时,我们需要跟踪作业运行的节点以及该进程的PID.我只是希望流媒体作业有一种更简单的工作控制方式.
我正在为Kafka使用Python高级消费者,并希望了解主题的每个分区的最新偏移量.但是我无法让它发挥作用.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
Run Code Online (Sandbox Code Playgroud)
但我得到的输出是
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
.... …Run Code Online (Sandbox Code Playgroud) 如何将元组列表转换为列表元List[(A,B)]组(List[A], List[B])?
我试过跟随,但看起来粗糙,我希望有更好的方法来做到这一点
val flat: List[AnyRef] = aAndB.map{ x =>
x.map(y => List(y._1, y._2))
}.flatMap(x => x)
val typeA: List[A] = flat.filter {
case x: A => true
case _ => false
}.map(_.asInstanceOf[A])
val typeB: List[B] = flat.filter {
case x: B => true
case _ => false
}.map(_.asInstanceOf[B])
Run Code Online (Sandbox Code Playgroud) Spring Data Cassandra是否支持同一应用程序上下文中的多个键空间存储库?我正在使用以下JavaConfig类设置cassandra spring数据配置
@Configuration
@EnableCassandraRepositories(basePackages = "com.blah.repository")
public class CassandraConfig extends AbstractCassandraConfiguration {
@Override
public String getKeyspaceName() {
return "keyspace1";
}
Run Code Online (Sandbox Code Playgroud)
在将存储库类移动到另一个包之后,我尝试创建第二个配置类.
@Configuration
@EnableCassandraRepositories(basePackages = "com.blah.secondrepository")
public class SecondCassandraConfig extends AbstractCassandraConfiguration {
@Override
public String getKeyspaceName() {
return "keyspace2";
}
Run Code Online (Sandbox Code Playgroud)
但是,在这种情况下,第一个集合,如果存储库失败,因为在键空间中找不到实体的已配置列族.我认为它可能正在寻找第二个键空间中的列族.
spring-data-cassandra是否支持多个密钥空间存储库?我找到多个键空间的引用的唯一地方就在这里.但它没有解释是否可以使用存储库完成此操作?
我正在为 Spring Web 应用程序编写集成测试。为了设置配置,我在集成测试类上使用以下注释。
// Super class
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = SpringBootRestApplication.class)
@IntegrationTest
@WebAppConfiguration
@ActiveProfiles("test")
public abstract class AbstractIT {
// Integration test
public class SampleResourceIT extends AbstractIT {
Run Code Online (Sandbox Code Playgroud)
为了减少重复,它们被定义在每个集成测试都继承的抽象基测试类中。
我想使用组合来加载集成测试而不是继承,因为它使我的测试的继承层次结构变得复杂。我想将所有测试配置存储在一个配置类中,而不使用继承。有办法做到吗?
我已经尝试过以下方法,但它不起作用。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TestConfig.class)
public abstract class AbstractIT {
@SpringApplicationConfiguration(classes = SpringBootRestApplication.class)
@IntegrationTest
@WebAppConfiguration
@ActiveProfiles("test")
@Configuration
public class TestConfig {
}
Run Code Online (Sandbox Code Playgroud) 有没有办法绕过 a java.nio.file.DirectoryNotEmptyException?我希望能够删除其中包含内容的文件夹。
以下代码-1为index 生成一个值.这是为什么?
val values = Array(1.0, 2.0, 3.0, Double.NaN, 4.0)
val index = values.indexOf(Double.NaN)
println(s"index = $index")
Run Code Online (Sandbox Code Playgroud)
在这种情况下找到NaN指数的最佳方法是什么?我有以下解决方案,但不认为这是最优雅的解决方案.
val index2 = values.zipWithIndex.find(_._1.isNaN).get._2
println(s"index2 = $index2")
Run Code Online (Sandbox Code Playgroud) 我正在尝试对来自多个Cassandra表的数据运行Spark作业,这些表被分组为作业的一部分.我试图用一个巨大的数据集13m数据点结束运行并且它已经失败了多个点.当我修复这些故障并继续前进时,我遇到了我修复的下一个问题并再次重新启动作业.有没有办法加快实际数据的测试周期,以便我可以从特定检查点重新启动/恢复以前失败的作业?
java ×3
scala ×3
apache-spark ×2
spring ×2
apache-kafka ×1
file ×1
kafka-python ×1
nio ×1
python ×1
spring-data ×1
spring-test ×1
testing ×1