We have a 10-node cluster running Cassandra 2.1.8, which we recently upgraded to. Previously we ran only 3 nodes on Cassandra 2.1.2. We first upgraded those original 3 nodes from 2.1.2 to 2.1.8 (following the procedure described in Upgrading Cassandra), then added 7 new nodes running Cassandra 2.1.8 to the cluster and started our client program. Everything worked fine for the first few hours, but after that we started seeing errors like the following in the client program's logs:
Thread-0 [29/07/15 17:41:23.356] ERROR com.cleartrail.entityprofiling.engine.InterpretationWriter - Error:com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: [/172.50.33.161:9041, /172.50.33.162:9041, /172.50.33.95:9041, /172.50.33.96:9041, /172.50.33.165:9041, /172.50.33.166:9041, /172.50.33.163:9041, /172.50.33.164:9041, /172.50.33.42:9041, /172.50.33.167:9041] - use getErrors() for details)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:65)
at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:259)
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:175)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
at com.cleartrail.entityprofiling.engine.InterpretationWriter.WriteInterpretation(InterpretationWriter.java:430)
at com.cleartrail.entityprofiling.engine.Profiler.buildProfile(Profiler.java:1042)
at com.cleartrail.messageconsumer.consumer.KafkaConsumer.run(KafkaConsumer.java:336)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: [/172.50.33.161:9041, /172.50.33.162:9041, /172.50.33.95:9041, /172.50.33.96:9041, /172.50.33.165:9041, /172.50.33.166:9041, /172.50.33.163:9041, /172.50.33.164:9041, /172.50.33.42:9041, /172.50.33.167:9041] - use getErrors() for details)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:102)
at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:176)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at …

I am trying to use Spark's direct (receiver-less) approach for Kafka, and I have the following Kafka configuration map:
configMap.put("zookeeper.connect","192.168.51.98:2181");
configMap.put("group.id", UUID.randomUUID().toString());
configMap.put("auto.offset.reset","smallest");
configMap.put("auto.commit.enable","true");
configMap.put("topics","IPDR31");
configMap.put("kafka.consumer.id","kafkasparkuser");
configMap.put("bootstrap.servers","192.168.50.124:9092");
Now my goal is that if my Spark pipeline crashes and is restarted, the stream should resume from the latest offsets committed by the consumer group. To that end, I want to specify the starting offsets for the consumer, and I already know the committed offset for each partition. How can I pass this information to the streaming function? Currently I am using:
JavaPairInputDStream<byte[], byte[]> kafkaData =
    KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, configMap, topic);
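For reference, the direct-stream API also has a `KafkaUtils.createDirectStream` overload that accepts a `fromOffsets` map instead of a topic set, which is the usual way to resume from explicitly supplied per-partition offsets. A minimal sketch, assuming the committed offsets have already been loaded from wherever they were persisted (the topic name `IPDR31` comes from the config above; the method and parameter names are placeholders):

```java
import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class OffsetResume {

    // Builds a direct stream that starts from explicitly supplied
    // per-partition offsets rather than from auto.offset.reset.
    static JavaInputDStream<Tuple2<byte[], byte[]>> fromSavedOffsets(
            JavaStreamingContext js,
            Map<String, String> configMap,
            Map<Integer, Long> savedOffsets) { // partition -> committed offset

        // Translate the saved offsets into the form the direct stream expects.
        Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
        for (Map.Entry<Integer, Long> e : savedOffsets.entrySet()) {
            fromOffsets.put(new TopicAndPartition("IPDR31", e.getKey()), e.getValue());
        }

        // The messageHandler decides what each record is mapped to;
        // here each record becomes a key/value pair.
        Function<MessageAndMetadata<byte[], byte[]>, Tuple2<byte[], byte[]>> handler =
            m -> new Tuple2<>(m.key(), m.message());

        return KafkaUtils.createDirectStream(
            js, byte[].class, byte[].class,
            DefaultDecoder.class, DefaultDecoder.class,
            (Class<Tuple2<byte[], byte[]>>) (Class<?>) Tuple2.class,
            configMap, fromOffsets, handler);
    }
}
```

Note that with this overload the `auto.offset.reset` setting no longer decides the starting point; the stream begins at exactly the offsets in `fromOffsets`.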
I have a keyspace in a multi-node cluster in our QA environment, and I want to copy that keyspace to my local single-node cluster. Is there a direct way to do this? I can't write code along the lines of an SSTableLoader implementation right now, so please suggest the quickest approach.
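One code-free route is to export the schema and data with cqlsh and re-import them locally. This is a sketch, not a definitive procedure: `cqlsh`'s `COPY` is practical only for modest data volumes, and the host, keyspace, and table names below are placeholders.

```shell
# On a QA node: dump the keyspace schema and export each table to CSV.
cqlsh qa-node-1 -e "DESCRIBE KEYSPACE my_keyspace" > schema.cql
cqlsh qa-node-1 -e "COPY my_keyspace.my_table TO 'my_table.csv'"

# On the local single-node cluster: recreate the schema (edit the replication
# settings in schema.cql for a single node first), then load the data.
cqlsh localhost -f schema.cql
cqlsh localhost -e "COPY my_keyspace.my_table FROM 'my_table.csv'"
```

For larger tables, taking a snapshot with `nodetool snapshot` on each QA node and streaming the resulting SSTables in with the `sstableloader` tool avoids the CSV round trip without requiring any custom code.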
I want to trim the time portion from a timestamp (which is a column in a DataFrame), keep only the hour value, and store it in a new column of the DataFrame. Please help.
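If this is a Spark DataFrame, `org.apache.spark.sql.functions.hour` applied via `withColumn` is the usual route. The underlying extraction can be sketched with plain `java.time`; the timestamp format and class name below are assumptions for illustration:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class HourOfTimestamp {
    // Parse a timestamp string and keep only the hour-of-day component.
    static int hourOf(String timestamp) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return LocalDateTime.parse(timestamp, fmt).getHour();
    }

    public static void main(String[] args) {
        System.out.println(hourOf("2015-07-29 17:41:23")); // prints 17
    }
}
```

In Spark itself the equivalent would be along the lines of `df.withColumn("hour", hour(col("ts")))`, where `ts` stands in for whatever the timestamp column is called.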