标签: kafka-producer-api

在获取主题元数据时,Kafka消费者"未能找到领导者"

当我尝试使用Kafka生产者和消费者(0.9.0)脚本来推送/拉取主题中的消息时,我得到以下错误.

制片人错误

[2016-01-13 02:49:40,078] ERROR Error when sending message to topic test with key: null, value: 11 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
Run Code Online (Sandbox Code Playgroud)

消费者错误

> [2016-01-13 02:47:18,620] WARN
> [console-consumer-90116_f89a0b380f19-1452653212738-9f857257-leader-finder-thread],
> Failed to find leader for Set([test,0])
> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker
> [ArrayBuffer(BrokerEndPoint(0,192.168.99.100,9092))] failed   at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)    at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)    at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: java.io.EOFException   at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>   at
> …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api apache-zookeeper

10
推荐指数
1
解决办法
1万
查看次数

通过不同的线程使用Kafka Producer

我有基于java的Web应用程序的kafka生产者将消息推送到Kafka.根据文档,我可以看到kafka生产者是线程安全的.这是否意味着我可以拥有Kafka生产者的单个实例,并通过不同的线程(Web请求)使用它们,每个都将在我的情况下打开和关闭生产者.这会产生任何问题吗?或者最好根据请求启动生产者?

apache-kafka kafka-producer-api

10
推荐指数
1
解决办法
7715
查看次数

Kafka抛出java.nio.channels.ClosedChannelException

当我尝试使用kafka控制台工具(V 0.9.0.1,我认为这使用旧的消费者API)来消费来自ec2中托管的kafka服务器的消息时,我得到以下异常.我怎么能克服这个?

#./ kafka-console-consumer.sh --zookeeper zookeeper1.xx.com:2181 - 主题MY_TOPIC - 从头开始

[2016-04-06 14:34:58,219] WARN Fetching topic metadata with correlation id 0 for topics [Set(MY_TOPIC)] from broker [BrokerEndPoint(1014,kafka3.xx.com,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-04-06 14:34:58,222] WARN Fetching topic metadata with correlation id 0 for topics [Set(MY_TOPIC)] from broker [BrokerEndPoint(1013,kafka22.xx.com,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api

10
推荐指数
1
解决办法
1万
查看次数

NoBrokersAvailable:NoBrokersAvailable-Kafka 错误

我已经开始学习卡夫卡了。尝试对其进行基本操作。我一直坚持关于“经纪人”的观点。

我的 kafka 正在运行,但是当我想创建一个分区时。

 from kafka import TopicPartition
(ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
 consumer.assign([TopicPartition('foobar', 2)])
 msg = next(consumer)
Run Code Online (Sandbox Code Playgroud)

回溯(最近一次调用):文件“”,第 1 行,在文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”中,第 284 行,在init self._client = KafkaClient(metrics=self._metrics, **self.config) 文件 "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in init self.config['api_version '] = self.check_version(timeout=check_timeout) 文件“/usr/local/lib/python2.7/dist-packages/kafka/client_async.py”,第 791 行,在 check_version 中引发 Errors.NoBrokersAvailable() kafka.errors。 NoBrokersAvailable:NoBrokersAvailable

python apache-kafka kafka-consumer-api kafka-python kafka-producer-api

10
推荐指数
4
解决办法
3万
查看次数

Kafka获取错误在引导服务器中给出了无可解析的引导程序URL

我很确定bootstrap.servers是正确的.Maven中有什么冲突或Kafka有什么问题吗?

在此之前它成功地运作了.我添加了一些Maven或Spark然后出了点问题..

谁能知道如何解决它?

这是java中的kafka代码

Properties props = new Properties();
        props.put("bootstrap.servers", "x.xx.xxx.xxx:9092");
        props.put("metadata.broker.list", "x.xx.xxx.xxx:9091, x.xx.xxx.xxx:9092, x.xx.xxx.xxx:9093");

        props.put("producer.type", "async");
        props.put("batch.size", "500");
        props.put("compression.codec", "1");
        props.put("compression.topic", topicName);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(
                    props);
Run Code Online (Sandbox Code Playgroud)

获取错误在引导服务器中没有给出可解析的引导程序URL,

[err]   at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
    [err]   at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
    [err]   at com.wra.controller.ParserController.GetResumeUpload(ParserController.java:98)
    [err]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    [err]   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
    [err]   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
    [err]   at java.lang.reflect.Method.invoke(Method.java:508)
    [err]   at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    [err]   at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    [err]   at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114)
    [err]   at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
    [err]   at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
    [err]   at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    [err]   at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)
    [err] …
Run Code Online (Sandbox Code Playgroud)

java maven apache-kafka kafka-producer-api

10
推荐指数
3
解决办法
2万
查看次数

发送到 kafka 主题时序列化消息时出错

我需要测试包含标题的消息,所以我需要使用 MessageBuilder,但我无法序列化。

我尝试在生产者道具上添加序列化设置,但没有奏效。

有人能帮我吗?

这个错误:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Run Code Online (Sandbox Code Playgroud)

我的测试班:

public class TransactionMastercardAdapterTest extends AbstractTest{

@Autowired
private KafkaTemplate<String, Message<String>> template;

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

@BeforeClass
public static void setUp() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

@Test
public void sendTransactionCommandTest(){

    String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
            + "\"cardId\" : \"11\","
            + "\"transactionId\" : \"20110405123456\","
            + "\"amount\" : 200.59,"
            + "\"partnerId\" : \"11\"}";

    Map<String, Object> …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api spring-cloud-stream spring-kafka

10
推荐指数
1
解决办法
3万
查看次数

有没有办法为 kafka 生产者发送的消息设置延迟?

或者甚至是一种延迟消费者收到消息的方法。我需要每 90 秒后在 nodejs 中进行一次函数调用,所以我想为每个 kafka 消息添加 90 秒的延迟

apache-kafka kafka-consumer-api kafka-producer-api

10
推荐指数
1
解决办法
2777
查看次数

KafkaTimeoutError:60.0 秒后无法更新元数据

我有一个高吞吐量 kafka 生产者的用例,我想每秒推送数千条 json 消息。

我有一个 3 节点 kafka 集群,我正在使用最新的 kafka-python 库,并有以下方法来生成消息

def publish_to_kafka(topic):
    data = get_data(topic)
    producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'), compression_type='gzip')
    try:
        for obj in data:
           producer.send(topic, value=obj)
    except Exception as e:
            logger.error(e)
    finally:
        producer.close()
Run Code Online (Sandbox Code Playgroud)

我的主题有 3 个分区。

方法有时可以正常工作,但会失败并出现错误“KafkaTimeoutError:无法在 60.0 秒后更新元数据。”

我需要更改哪些设置才能使其顺利工作?

python apache-kafka kafka-python kafka-producer-api

10
推荐指数
1
解决办法
3万
查看次数

Kafka Consumer的poll()方法被阻止

我是Kafka 0.9的新手并测试了一些功能,我在Java实现的Consumer(KafkaConsumer)中发现了一个奇怪的行为.

Kafka经纪人位于Ambari外部机器中.

即使你我可以实现一个Producer并开始向外部代理发送消息,我也不知道为什么当消费者试图读取事件(民意调查)时,它会被卡住.

我知道生产者工作得很好,因为我可以通过控制台消费者(在ambari本地工作)消费消息.但是当我执行Java Consumer时,什么都没发生,只是卡住了.调试代码我可以看到它在该poll()行被阻止:

    ConsumerRecords<String, String> records = consumer.poll(100);
Run Code Online (Sandbox Code Playgroud)

顺便说一句,超时没有任何作用.如果你输入0,100或1000毫秒无关紧要,消费者在这一行被阻止并且不会超时也不会抛出异常.

我尝试了所有类型的替代属性,例如advertised.host.name,advertised.listener,...等等,运气不好.

任何帮助将受到高度赞赏.提前致谢!

polling apache-kafka kafka-consumer-api kafka-producer-api

9
推荐指数
1
解决办法
5069
查看次数

在Windows中启动Confluent Schema Registry

我有windows环境和我自己的一套kafka和zookeeper运行.为了使用自定义对象,我开始使用Avro.但我需要启动注册表.下载了Confluent平台并运行了这个:

$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
/c/Confluent/confluent-3.0.0-2.11/confluent-3.0.0/bin/schema-registry-run-class: line 103: C:\Program: No such file or directory
Run Code Online (Sandbox Code Playgroud)

然后我在安装页面上看到了这个:

"Confluent目前不支持Windows.Windows用户可以下载并使用zip和tar档案,但必须直接运行jar文件,而不是使用bin /目录中的包装脚本."

我想知道如何在Windows环境中启动融合模式注册表?

查看脚本的内容,很难解读.

谢谢

avro apache-kafka confluence-rest-api kafka-producer-api confluent

9
推荐指数
2
解决办法
5951
查看次数