我有一个用例,我从队列中消耗某些日志并使用该日志中的一些信息命中某些第三方API,以防第三方系统没有正确响应我希望为该特定日志实现重试逻辑.
我可以添加一个时间字段并将消息重新发送到同一队列,如果其时间字段有效(即小于当前时间),则此消息将再次消耗,如果不是,则再次将其推送到队列中.
但是这个逻辑会一次又一次地添加相同的日志,直到重试时间正确并且队列将不必要地增长.
是否有更好的方法在Kafka中实现重试逻辑?
我在我的Windows PC上构建了一个小测试环境,并记下以下代码来测试kafka(使用org.apache.kafka中的kafka_2.10:0.9.0.1).
package iii.functiontesting;
import java.text.ParseException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* Hello world!
*
*/
public class test4
{
public static void main( String[] args ) throws ParseException
{
Properties producerProps=new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("serializer.class",org.apache.kafka.common.serialization.StringSerializer.class.getName());
producerProps.put("key.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
producerProps.put("value.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
producerProps.put("request.required.acks","1");
KafkaProducer<String,String> kafkawriter= new KafkaProducer<String,String>(producerProps);
ProducerRecord<String,String> msg=new ProducerRecord<>("TEST3","ImKey","teststring1");
kafkawriter.send(msg);
}
}
Run Code Online (Sandbox Code Playgroud)
我使用以下命令检查消息是否正确写入队列
D:\ Work\kafkaenv\kafka_2.10-0.9.0.1\bin\windows>.\ kafka-console-consumer.bat --zookeeper localhost:2181 --topic TEST3 - from-beginning
但是,我发现kafka-console-consumer没有显示任何内容.
我怀疑我的kafka服务器运行不正常,所以我使用console-producer进行测试.
D:\ Work\kafkaenv\kafka_2.10-0.9.0.1\bin\windows>.\ kafka-console-producer.bat --broker-list localhost:9092 --topic TEST3
AAAAA
这次我可以看到aaaaa清楚地显示在控制台消费者之下.我无法弄清楚会发生什么.谁能帮我?
我安装了新鲜的zookeeper和kafka.我开始他们两个.然后,当我想使用此命令查看主题列表时:
bin/kafka-topics.sh --list --zookeeper localhost 2181
Run Code Online (Sandbox Code Playgroud)
它让我的套接字连接关闭.这是屏幕截图:
darpanshah@darpan-ubuntu:/opt/Kafka$ bin/kafka-topics.sh --list --zookeeper localhost 2181
2016-08-17 10:44:44,053] INFO Accepted socket connection from /127.0.0.1:48452 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-08-17 10:44:44,059] INFO Client attempting to establish new session at /127.0.0.1:48452 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-08-17 10:44:44,069] INFO Established session 0x15698f5ac360001 with negotiated timeout 30000 for client /127.0.0.1:48452 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-08-17 10:44:44,095] INFO Processed session termination for sessionid: 0x15698f5ac360001 (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-08-17 10:44:44,105] INFO Closed socket connection for client /127.0.0.1:48452 which had sessionid 0x15698f5ac360001 (org.apache.zookeeper.server.NIOServerCnxn)
darpanshah@darpan-ubuntu:/opt/Kafka$
Run Code Online (Sandbox Code Playgroud)
提前致谢.被困.
apache-kafka kafka-consumer-api kafka-producer-api apache-zookeeper
我知道无法在Kafka中订购多个分区,并且只能为组内的单个使用者(对于单个分区)保证分区排序.然而,使用Kafka Streams 0.10现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都维护订单,那么在消费者方面,让我们说Kafka Streams 0.10现在可以吗?假设我们收到所有消息,我们可能不会根据消耗的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供消费?
目前我需要维护订购,但这意味着拥有一个带有单个消费者线程的分区.我想将其更改为多个分区以增加并行性,但不知何故"让它们按顺序排列".
有什么想法吗?谢谢.
apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams
从confluent-kafka-python repo 中的AvroProducer示例来看,键/值模式似乎是从文件中加载的。也就是说,从这段代码:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}
avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)
Run Code Online (Sandbox Code Playgroud)
看来这些文件ValueSchema.avsc和KeySchema.avsc是独立于 Avro Schema Registry 加载的。
这是正确的吗?引用 Avro 架构注册表的 URL,然后从磁盘加载键/值的架构有什么意义?
请说清楚。
如何在kafka中发送同步消息?
实现它的一种方法是设置属性参数
max.in.flight.requests.per.connection = 1。
但我想知道在 kafka 中是否有一种甚至直接或替代的方式发送同步消息。(类似于 producer.syncSend(...) 等)。
我们在 2 个服务器上以分布式模式运行 kafka。我正在通过 Java sdk 向 Kafka 发送消息到具有复制因子 2 和 1 分区的队列。
我们以异步模式运行。我在 Kafka 日志中没有发现任何异常。任何人都可以帮助找出可能的原因吗?
Properties props = new Properties();
props.put("bootstrap.servers", serverAdress);
props.put("acks", "all");
props.put("retries", "1");
props.put("linger.ms",0);
props.put("buffer.memory",10240000);
props.put("max.request.size", 1024000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, Object> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
Run Code Online (Sandbox Code Playgroud)
异常跟踪:
-2017-08-15T02:36:29,148 [kafka-producer-network-thread | producer-1] WARN producer.internals.Sender - 在主题分区 BPA_BinLogQ-0 上得到相关 ID 为 353736 的错误生成响应,正在重试(剩余 0 次尝试)。错误:NETWORK_EXCEPTION
因此,我正在尝试使用Kafka流进行交互式查询.我有Zookeeper和Kafka在本地运行(在Windows上).我使用C:\ temp作为存储文件夹,对于Zookeeper和Kafka.
我已经设置了这样的主题
kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-submit-topic
kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-output-topic
Run Code Online (Sandbox Code Playgroud)
阅读我已经完成了这个问题
我已阅读此文档页面:http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
我还在这里阅读了Java示例:https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic /KafkaMusicExample.java
并且还阅读了这篇类似的帖子,它最初听起来像我一样的问题:无法从StateStore的其他应用程序访问KTable
这就是我的设置.那么问题是什么?
所以我说我正在尝试创建自己的应用程序,它允许使用自定义Akka Http REST Api(推荐的RPC调用)进行交互式查询,以允许我查询我的KTable.实际的流处理似乎正在按预期发生,我能够打印出结果,KTable并且它们与主题产生的内容相匹配.
所以存储方面的东西似乎正在发挥作用
尝试使用该Streams.allMetadata()方法时,似乎会出现问题,它返回一个空列表.
我在用
制片人代码
这是我的制作人的代码
package Processing.Ratings {
import java.util.concurrent.TimeUnit
import Entities.Ranking
import Serialization.JSONSerde
import Topics.RatingsTopics
import scala.util.Random
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.Serdes …Run Code Online (Sandbox Code Playgroud) apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams
我有一个有 10 个分区的主题,1 个消费者组有 4 个消费者,工作人员大小为 3。
我可以看到分区中的消息分布不均匀,一个分区有很多数据,另一个是免费的。
如何让我的生产者将负载平均分配到所有分区,以便所有分区都得到正确利用?
这就是我产生消息的方式:
String json = gson.toJson(msg);
ProducerRecord<String, String> record = new ProducerRecord<>(kafkaProducerConfig.getTopic(), json);
long startTime = System.currentTimeMillis();
try {
RecordMetadata meta = producer.send(record).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)
我有15这个主题的分区,我在制作时没有提到分区键,默认分配的分区是什么?