标签: kafka-python

Apache Kafka 是否将消息内部存储在 HDFS 或其他一些文件系统中

我们有一个在Kafka层测试数据的项目需求。所以 JSON 文件正在进入 hadoop 区域,而 kafka 正在读取 hadoop(原始 Json 文件)中的实时数据。现在我要测试从其他系统发送的数据和kafka读取的数据是否应该相同。

我可以在 kafka 验证数据吗?kafka 是否将消息内部存储在 HDFS 上?如果是,那么它是否存储在类似于 hive 内部保存的文件结构中,就像单个表的单个文件夹一样。

hadoop hdfs apache-kafka kafka-python kafka-producer-api

2
推荐指数
1
解决办法
8114
查看次数

Kafka 传递重复消息

我们正在使用 kafka(0.9.0.0) 来编排不同微服务之间的命令消息。我们发现了一个间歇性问题,即重复的消息被传递到特定主题。下面给出了发生此问题时发生的日志。有人可以帮助理解这个问题吗

Wed, 21-Sep-2016 09:19:07 - WARNING Coordinator unknown during heartbeat -- will retry
Wed, 21-Sep-2016 09:19:07 - WARNING Heartbeat failed; retrying
Wed, 21-Sep-2016 09:19:07 - WARNING <BrokerConnection host=AZSG-D-BOT-DEV4 port=9092> timed out after 40000 ms. Closing connection.
Wed, 21-Sep-2016 09:19:07 - ERROR Fetch to node 1 failed: RequestTimedOutError - 7 - This error is thrown if the request exceeds the user-specified time limit in the request.
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None.
Wed, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-python

2
推荐指数
1
解决办法
3199
查看次数

消费者重新启动后,kafka-python 从最后生成的消息中读取

我正在使用kafka-python来使用来自 kafka 队列(kafka 版本 0.10.2.0)的消息。特别是我使用的是KafkaConsumer类型。如果消费者停止并在一段时间后重新启动,我想从最新生成的消息重新启动,即删除消费者关闭期间生成的所有消息。我怎样才能做到这一点?

谢谢

python apache-kafka kafka-python

2
推荐指数
2
解决办法
9736
查看次数

Kafka分区滞后增加

我有一个使用 Kafka 1.0 作为队列的应用程序。Kafka 主题有 80 个分区和 80 个正在运行的消费者。(Kafka-python 消费者)。

通过运行命令:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup  --describe 
Run Code Online (Sandbox Code Playgroud)

我看到其中一个分区卡在一个偏移处,并且随着新记录的添加,延迟不断增加。

上述命令的输出如下所示:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST

118 mytopic                       37         1924            2782            858        kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx/localhost
119 mytopic                       38         2741            2742            1          kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx/localhost
120 mytopic                       39         2713            2713            0          kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx/localhost
121 mytopic                       40         2687            2688            1          kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx/localhost
Run Code Online (Sandbox Code Playgroud)

这是什么原因造成的?此外,使用 reset-offsets 命令重置偏移也是不可取的,因为可能不会定期手动监控此服务器。

客户端在 Linux m/c 中作为并行进程在后台运行:

consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092',
                     session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1,
                     auto_commit_interval_ms=100000, request_timeout_ms=350000, max_partition_fetch_bytes=3*1024*1024,
                     value_deserializer=lambda m: json.loads(m.decode('ascii')))

for message in consumer:
    msg …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-consumer-api kafka-python

2
推荐指数
1
解决办法
1万
查看次数

如何使用 ssl 配置创建 Kafka-python 生产者

我正在尝试使用 ssl 创建 kafka 生产者。我需要有关如何在构造函数中设置 SSL 参数的信息,kafka-python 客户端中提供的信息不够描述。

ssl_certfile, ssl_cafile,ssl_keyfile参数是什么。我不确定在哪里可以找到这些文件。

producer = KafkaProducer(bootstrap_servers=kafka_broker,
  value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  security_protocol='SSL',
  api_version=(0,10),
  ssl_cafile='ca-certs.pem',ssl_certfile='server.pem',
  ssl_keyfile='server.pem',ssl_password='xxx')
producer.send('rk976772_topic',{"test":0})
Run Code Online (Sandbox Code Playgroud)

回溯(最近一次调用最后一次):文件“”,第 1 行,在文件“/usr/lib/python2.7/site-packages/kafka/producer/kafka.py”中,第 543 行,在发送 self._wait_on_metadata(topic , self.config['max_block_ms'] / 1000.0) 文件 "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata "在 %.1f 后无法更新元数据秒。” % max_wait) kafka.errors.KafkaTimeoutError: KafkaTimeoutError: 60.0 秒后无法更新元数据。

kafka-python

2
推荐指数
2
解决办法
1万
查看次数

使用pykafka在主题的特定分区上发布

如何在pykafka主题的特定分区上发布消息.在下面的代码片段中,测试主题有四个分区,我打算在其中一个分区中编写每个消息,但显然它不是那样工作的.

from pykafka import KafkaClient

import logging
logging.basicConfig()

client = KafkaClient(hosts='localhost:9092')
print client.topics
topic = client.topics['test']
with topic.get_producer() as producer:
        for i in range(4):
                producer.produce('another test message ' + str(i ** 2), partition_key='{}'.format(0))
Run Code Online (Sandbox Code Playgroud)

python producer message-bus apache-kafka kafka-python

1
推荐指数
1
解决办法
851
查看次数

Python Producer 可以通过 shell 发送,但不能通过 .py

我有一个正在运行并经过测试的 Kafka 集群,我正在尝试使用 Python 脚本向代理发送消息。这在我使用 Python3 shell 并调用生产者方法时有效,但是当我将这些相同的命令放入 python 文件并执行它时 - 脚本似乎挂起。

我正在为消费者和生产者使用 kafka-python 库。当我使用 Python3 shell 时,我可以看到使用 Kafka GUI 工具 2.0.4 的主题中出现的消息我在 python 代码中尝试了各种循环和语句,但似乎没有任何东西让它“运行”完成。

>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>
Run Code Online (Sandbox Code Playgroud)

这有效并且字节出现在代理主题数据中。

当我将与上面相同的代码放在 python .py 文件中并使用 Python3 执行时,它会完成,但没有数据发送到 Kafka 代理。也没有显示错误。

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')
Run Code Online (Sandbox Code Playgroud)

python python-3.x apache-kafka kafka-python

1
推荐指数
1
解决办法
1184
查看次数

使用 kafka-python 检索主题中的消息

我已经使用kafka-python库编写了一个 python 脚本,它将消息写入和读取到kafka. 我写消息没有任何问题;kafka我可以使用控制台工具检索它们。但我无法使用我的 python 脚本读取它们。我的消费者有一个 for ,它冻结在迭代的第一行并且永远不会返回。这是我的代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)

for msg in consumer:
    print(type(msg))
Run Code Online (Sandbox Code Playgroud)

消费者被创建并完全订阅;我可以看到它my-topic列在其属性的主题列表中_client

任何想法?

python apache-kafka kafka-consumer-api kafka-python

1
推荐指数
1
解决办法
9556
查看次数

无法将消息发送到 Kafka Python 中的主题

我有一个生产者代码,正在向 Kafka 发送消息。直到昨天我才能发送消息。从今天开始,我无法发送消息。不确定是否是版本兼容问题。没有失败或错误消息,代码被执行,但没有发送消息。

以下是 Python 模块版本:

kafka-python==2.0.1
Python 3.8.2

下面是我的代码:

from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
Run Code Online (Sandbox Code Playgroud)

我也尝试记录行为,但不知道为什么生产者被关闭:

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to 127.0.0.1:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.producer.kafka:Kafka producer closed

Process …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-python

1
推荐指数
1
解决办法
1万
查看次数

如果将group_id设置为None,Kafka消费者会收到消息,但如果不是None,它不会收到任何消息?

我有以下 Kafka 消费者,如果将 分配group_id给 None,它会很好地工作 - 它收到了所有历史消息和我新测试的消息。

consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset=auto_offset_reset,
        enable_auto_commit=enable_auto_commit,
        group_id=group_id,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

for m in consumer:
Run Code Online (Sandbox Code Playgroud)

group_id但是,如果我将其设置为某个值,它不会收到任何内容。我尝试运行测试生产者来发送新消息,但没有收到任何消息。

消费者控制台确实显示以下消息:

2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py(重新)加入组 my_group
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py 成功加入第 497 代组 my_group
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py 更新的分区分配:[]
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 Consumer.py 为组 my_group 设置新分配的分区 set()

python apache-kafka kafka-consumer-api kafka-python

1
推荐指数
1
解决办法
4504
查看次数

PySpark 处理流数据并将处理后的数据保存到文件

我正在尝试复制正在传输其位置坐标的设备,然后处理数据并将其保存到文本文件中。我正在使用 Kafka 和 Spark 流(在 pyspark 上),这是我的架构:

1-Kafka生产者以以下字符串格式将数据发送到名为test的主题:

"LG float LT float" example : LG 8100.25191107 LT 8406.43141483
Run Code Online (Sandbox Code Playgroud)

生产者代码:

from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='localhost:9092')

for i in range(0,10000):
    lg_value = str(random.uniform(5000, 10000))
    lt_value = str(random.uniform(5000, 10000))
producer.send('test', 'LG '+lg_value+' LT '+lt_value)

producer.flush()
Run Code Online (Sandbox Code Playgroud)

生产者工作正常,我在消费者中获取流数据(甚至在 Spark 中)

2- Spark Streaming正在接收这个流,我可以甚至pprint()

Spark流处理代码

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"bootstrap.servers": "localhost:9092"})

lines = kvs.map(lambda …
Run Code Online (Sandbox Code Playgroud)

python-2.7 apache-spark spark-streaming pyspark kafka-python

-1
推荐指数
1
解决办法
3129
查看次数