小编Sak*_*ket的帖子

如何停止火花流媒体工作?

我有一个连续运行的Spark Streaming作业.我如何优雅地停止工作?我已经阅读了在作业监视中附加关闭钩子并将SIGTERM发送到作业的通常建议.

sys.ShutdownHookThread {
  logger.info("Gracefully stopping Application...")
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  logger.info("Application stopped gracefully")
}
Run Code Online (Sandbox Code Playgroud)

它似乎工作,但看起来不是最简单的方法来阻止这项工作.我在这里错过了什么吗?

从代码的角度来看,它可能有意义,但您如何在群集环境中使用它?如果我们启动一个火花流工作(我们在集群中的所有节点上分配作业),我们将不得不跟踪作业的PID和运行它的节点.最后,当我们必须停止进程时,我们需要跟踪作业运行的节点以及该进程的PID.我只是希望流媒体作业有一种更简单的工作控制方式.

apache-spark spark-streaming

28
推荐指数
1
解决办法
1万
查看次数

如何获取kafka主题的分区的最新偏移量?

我正在为Kafka使用Python高级消费者,并希望了解主题的每个分区的最新偏移量.但是我无法让它发挥作用.

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
Run Code Online (Sandbox Code Playgroud)

但我得到的输出是

For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
.... …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-consumer-api kafka-python

21
推荐指数
4
解决办法
3万
查看次数

Scala将元组列表转换为列表元组

如何将元组列表转换为列表元List[(A,B)](List[A], List[B])

我试过跟随,但看起来粗糙,我希望有更好的方法来做到这一点

  val flat: List[AnyRef] = aAndB.map{ x =>
    x.map(y => List(y._1, y._2))
  }.flatMap(x => x)

  val typeA: List[A] = flat.filter {
    case x: A => true
    case _ => false
  }.map(_.asInstanceOf[A])     

  val typeB: List[B] = flat.filter {
    case x: B => true
    case _ => false
  }.map(_.asInstanceOf[B])
Run Code Online (Sandbox Code Playgroud)

scala

20
推荐指数
1
解决办法
1万
查看次数

spring-data-cassandra存储库支持多个密钥空间?

Spring Data Cassandra是否支持同一应用程序上下文中的多个键空间存储库?我正在使用以下JavaConfig类设置cassandra spring数据配置

@Configuration
@EnableCassandraRepositories(basePackages = "com.blah.repository")
public class CassandraConfig extends AbstractCassandraConfiguration {

@Override
public String getKeyspaceName() {
    return "keyspace1";
}
Run Code Online (Sandbox Code Playgroud)

在将存储库类移动到另一个包之后,我尝试创建第二个配置类.

@Configuration
@EnableCassandraRepositories(basePackages = "com.blah.secondrepository")
public class SecondCassandraConfig extends AbstractCassandraConfiguration {

@Override
public String getKeyspaceName() {
    return "keyspace2";
}
Run Code Online (Sandbox Code Playgroud)

但是,在这种情况下,第一个集合,如果存储库失败,因为在键空间中找不到实体的已配置列族.我认为它可能正在寻找第二个键空间中的列族.

spring-data-cassandra是否支持多个密钥空间存储库?我找到多个键空间的引用的唯一地方就在这里.但它没有解释是否可以使用存储库完成此操作?

java spring spring-data spring-data-cassandra

8
推荐指数
2
解决办法
5176
查看次数

编写 Spring 测试配置而不是继承它?

我正在为 Spring Web 应用程序编写集成测试。为了设置配置,我在集成测试类上使用以下注释。

// Super class
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = SpringBootRestApplication.class)
@IntegrationTest
@WebAppConfiguration
@ActiveProfiles("test")
public abstract class AbstractIT {

// Integration test
public class SampleResourceIT extends AbstractIT {
Run Code Online (Sandbox Code Playgroud)

为了减少重复,它们被定义在每个集成测试都继承的抽象基测试类中。

我想使用组合来加载集成测试而不是继承,因为它使我的测试的继承层次结构变得复杂。我想将所有测试配置存储在一个配置类中,而不使用继承。有办法做到吗?

我已经尝试过以下方法,但它不起作用。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TestConfig.class)
public abstract class AbstractIT {

@SpringApplicationConfiguration(classes = SpringBootRestApplication.class)
@IntegrationTest
@WebAppConfiguration
@ActiveProfiles("test")
@Configuration
public class TestConfig {
}
Run Code Online (Sandbox Code Playgroud)

java testing spring spring-test

5
推荐指数
0
解决办法
1163
查看次数

如何绕过java.nio.file.DirectoryNotEmptyException?

有没有办法绕过 a java.nio.file.DirectoryNotEmptyException?我希望能够删除其中包含内容的文件夹。

java nio file

5
推荐指数
1
解决办法
3万
查看次数

Array.indexOf的奇怪结果

以下代码-1为index 生成一个值.这是为什么?

val values = Array(1.0, 2.0, 3.0, Double.NaN, 4.0)

val index = values.indexOf(Double.NaN)
println(s"index = $index")
Run Code Online (Sandbox Code Playgroud)

在这种情况下找到NaN指数的最佳方法是什么?我有以下解决方案,但不认为这是最优雅的解决方案.

val index2 = values.zipWithIndex.find(_._1.isNaN).get._2
println(s"index2 = $index2")
Run Code Online (Sandbox Code Playgroud)

scala

4
推荐指数
2
解决办法
104
查看次数

是否可以恢复失败的Apache Spark作业?

我正在尝试对来自多个Cassandra表的数据运行Spark作业,这些表被分组为作业的一部分.我试图用一个巨大的数据集13m数据点结束运行并且它已经失败了多个点.当我修复这些故障并继续前进时,我遇到了我修复的下一个问题并再次重新启动作业.有没有办法加快实际数据的测试周期,以便我可以从特定检查点重新启动/恢复以前失败的作业?

scala apache-spark

2
推荐指数
1
解决办法
1205
查看次数