小编fhu*_*ois的帖子

Cassandra - 有没有办法限制异步查询的数量?

我想知道是否有办法限制cassandra java驱动程序同时执行的查询数量?

目前,我执行了很多查询,如下所示:

... 
PreparedStatement stmt = session.prepare("SELECT * FROM users WHERE id = ?");
BoundStatement boundStatement = new BoundStatement(stmt);
List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(list.length);

for(String id : list ) {
     futures.add(session.executeAsync(boundStatement.bind(id)));
}

for (ListenableFuture<ResultSet> future : futures) {
ResultSet rs = future.get();
... // do some stuff
}
Run Code Online (Sandbox Code Playgroud)

不幸的是,这可能导致NoHostAvailableException.

谢谢.

java asynchronous datastax cassandra-2.0

4
推荐指数
1
解决办法
1875
查看次数

本地Kafka应用程序失败:NoSuchMethodError:createEphemeral

我试图以spark本地模式运行我的应用程序.为了全部设置,我按照本教程:http://blog.d2-si.fr/2015/11/05/apache-kafka-3/,(法语)显示构建本地kafka/ zookeeper环境的每个步骤.

而且,我使用IntelliJ以下配置:

val sparkConf = new SparkConf().setAppName("zumbaApp").setMaster("local[2]")
Run Code Online (Sandbox Code Playgroud)

我的运行配置,为消费者:

"127.0.0.1:2181" "zumbaApp-gpId" "D2SI" "1"
Run Code Online (Sandbox Code Playgroud)

而对于制片人:

"127.0.0.1:9092" "D2SI" "my\Input\File.csv" 300
Run Code Online (Sandbox Code Playgroud)

在此之前,我检查如果消费者从默认的生产者收到的意见console-producerconsole-consumerkafka_2.10-0.9.0.1; 它确实.

但是,我面临以下错误:

java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.createEphemeral(Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V
at kafka.utils.ZkPath$.createEphemeral(ZkUtils.scala:921)
at kafka.utils.ZkUtils.createEphemeralPath(ZkUtils.scala:348)
at kafka.utils.ZkUtils.createEphemeralPathExpectConflict(ZkUtils.scala:363)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:839)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:833)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.reflectPartitionOwnershipDecision(ZookeeperConsumerConnector.scala:833)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:721)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:636)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:627)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) …
Run Code Online (Sandbox Code Playgroud)

producer-consumer apache-kafka apache-spark apache-zookeeper

3
推荐指数
1
解决办法
1977
查看次数

删除/使 Apache Pulsar 主题中的所有消息过期的最有效方法是什么?

我试图找出从 Pulsar 主题中删除所有消息(无论是逻辑上还是物理上)的最佳方法是什么,以便它们不再可以通过订阅使用?

我知道我们可以简单地做到$ pulsar-admin persistent delete persistent://tenant/namespace/topic

但是,这个解决方案有一些缺点:它完全删除了主题(因此我们必须稍后重新创建它),然后不应该有活动的客户端连接到它(即:订阅或生产者)。

或者,是否有一种方法可以以编程方式使两个 MessageId 之间的所有消息对订阅不可用?

谢谢

apache-pulsar

3
推荐指数
1
解决办法
4188
查看次数