我想知道是否有办法限制cassandra java驱动程序同时执行的查询数量?
目前,我执行了很多查询,如下所示:
...
PreparedStatement stmt = session.prepare("SELECT * FROM users WHERE id = ?");
BoundStatement boundStatement = new BoundStatement(stmt);
List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(list.length);
for(String id : list ) {
futures.add(session.executeAsync(boundStatement.bind(id)));
}
for (ListenableFuture<ResultSet> future : futures) {
ResultSet rs = future.get();
... // do some stuff
}
Run Code Online (Sandbox Code Playgroud)
不幸的是,这可能导致NoHostAvailableException.
谢谢.
我试图以spark本地模式运行我的应用程序.为了全部设置,我按照本教程:http://blog.d2-si.fr/2015/11/05/apache-kafka-3/,(法语)显示构建本地kafka/ zookeeper环境的每个步骤.
而且,我使用IntelliJ以下配置:
val sparkConf = new SparkConf().setAppName("zumbaApp").setMaster("local[2]")
Run Code Online (Sandbox Code Playgroud)
我的运行配置,为消费者:
"127.0.0.1:2181" "zumbaApp-gpId" "D2SI" "1"
Run Code Online (Sandbox Code Playgroud)
而对于制片人:
"127.0.0.1:9092" "D2SI" "my\Input\File.csv" 300
Run Code Online (Sandbox Code Playgroud)
在此之前,我检查如果消费者从默认的生产者收到的意见console-producer和console-consumer的kafka_2.10-0.9.0.1; 它确实.
但是,我面临以下错误:
java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.createEphemeral(Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V
at kafka.utils.ZkPath$.createEphemeral(ZkUtils.scala:921)
at kafka.utils.ZkUtils.createEphemeralPath(ZkUtils.scala:348)
at kafka.utils.ZkUtils.createEphemeralPathExpectConflict(ZkUtils.scala:363)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:839)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:833)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.reflectPartitionOwnershipDecision(ZookeeperConsumerConnector.scala:833)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:721)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:636)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:627)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) …Run Code Online (Sandbox Code Playgroud) producer-consumer apache-kafka apache-spark apache-zookeeper
我试图找出从 Pulsar 主题中删除所有消息(无论是逻辑上还是物理上)的最佳方法是什么,以便它们不再可以通过订阅使用?
我知道我们可以简单地做到$ pulsar-admin persistent delete persistent://tenant/namespace/topic。
但是,这个解决方案有一些缺点:它完全删除了主题(因此我们必须稍后重新创建它),然后不应该有活动的客户端连接到它(即:订阅或生产者)。
或者,是否有一种方法可以以编程方式使两个 MessageId 之间的所有消息对订阅不可用?
谢谢