我们有一个在Kafka层测试数据的项目需求。所以 JSON 文件正在进入 hadoop 区域,而 kafka 正在读取 hadoop(原始 Json 文件)中的实时数据。现在我要测试从其他系统发送的数据和kafka读取的数据是否应该相同。
我可以在 kafka 验证数据吗?kafka 是否将消息内部存储在 HDFS 上?如果是,那么它是否存储在类似于 hive 内部保存的文件结构中,就像单个表的单个文件夹一样。
我们正在使用 kafka(0.9.0.0) 来编排不同微服务之间的命令消息。我们发现了一个间歇性问题,即重复的消息被传递到特定主题。下面给出了发生此问题时发生的日志。有人可以帮助理解这个问题吗
Wed, 21-Sep-2016 09:19:07 - WARNING Coordinator unknown during heartbeat -- will retry
Wed, 21-Sep-2016 09:19:07 - WARNING Heartbeat failed; retrying
Wed, 21-Sep-2016 09:19:07 - WARNING <BrokerConnection host=AZSG-D-BOT-DEV4 port=9092> timed out after 40000 ms. Closing connection.
Wed, 21-Sep-2016 09:19:07 - ERROR Fetch to node 1 failed: RequestTimedOutError - 7 - This error is thrown if the request exceeds the user-specified time limit in the request.
Wed, 21-Sep-2016 09:19:07 - INFO Marking the coordinator dead (node 1): None.
Wed, …Run Code Online (Sandbox Code Playgroud) 我正在使用kafka-python来使用来自 kafka 队列(kafka 版本 0.10.2.0)的消息。特别是我使用的是KafkaConsumer类型。如果消费者停止并在一段时间后重新启动,我想从最新生成的消息重新启动,即删除消费者关闭期间生成的所有消息。我怎样才能做到这一点?
谢谢
我有一个使用 Kafka 1.0 作为队列的应用程序。Kafka 主题有 80 个分区和 80 个正在运行的消费者。(Kafka-python 消费者)。
通过运行命令:
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Run Code Online (Sandbox Code Playgroud)
我看到其中一个分区卡在一个偏移处,并且随着新记录的添加,延迟不断增加。
上述命令的输出如下所示:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST
118 mytopic 37 1924 2782 858 kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx/localhost
119 mytopic 38 2741 2742 1 kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx/localhost
120 mytopic 39 2713 2713 0 kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx/localhost
121 mytopic 40 2687 2688 1 kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx/localhost
Run Code Online (Sandbox Code Playgroud)
这是什么原因造成的?此外,使用 reset-offsets 命令重置偏移也是不可取的,因为可能不会定期手动监控此服务器。
客户端在 Linux m/c 中作为并行进程在后台运行:
consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092',
session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1,
auto_commit_interval_ms=100000, request_timeout_ms=350000, max_partition_fetch_bytes=3*1024*1024,
value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
msg …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 ssl 创建 kafka 生产者。我需要有关如何在构造函数中设置 SSL 参数的信息,kafka-python 客户端中提供的信息不够描述。
ssl_certfile, ssl_cafile,ssl_keyfile参数是什么。我不确定在哪里可以找到这些文件。
producer = KafkaProducer(bootstrap_servers=kafka_broker,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
security_protocol='SSL',
api_version=(0,10),
ssl_cafile='ca-certs.pem',ssl_certfile='server.pem',
ssl_keyfile='server.pem',ssl_password='xxx')
producer.send('rk976772_topic',{"test":0})
Run Code Online (Sandbox Code Playgroud)
回溯(最近一次调用最后一次):文件“”,第 1 行,在文件“/usr/lib/python2.7/site-packages/kafka/producer/kafka.py”中,第 543 行,在发送 self._wait_on_metadata(topic , self.config['max_block_ms'] / 1000.0) 文件 "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata "在 %.1f 后无法更新元数据秒。” % max_wait) kafka.errors.KafkaTimeoutError: KafkaTimeoutError: 60.0 秒后无法更新元数据。
如何在pykafka主题的特定分区上发布消息.在下面的代码片段中,测试主题有四个分区,我打算在其中一个分区中编写每个消息,但显然它不是那样工作的.
from pykafka import KafkaClient
import logging
logging.basicConfig()
client = KafkaClient(hosts='localhost:9092')
print client.topics
topic = client.topics['test']
with topic.get_producer() as producer:
for i in range(4):
producer.produce('another test message ' + str(i ** 2), partition_key='{}'.format(0))
Run Code Online (Sandbox Code Playgroud) 我有一个正在运行并经过测试的 Kafka 集群,我正在尝试使用 Python 脚本向代理发送消息。这在我使用 Python3 shell 并调用生产者方法时有效,但是当我将这些相同的命令放入 python 文件并执行它时 - 脚本似乎挂起。
我正在为消费者和生产者使用 kafka-python 库。当我使用 Python3 shell 时,我可以看到使用 Kafka GUI 工具 2.0.4 的主题中出现的消息我在 python 代码中尝试了各种循环和语句,但似乎没有任何东西让它“运行”完成。
>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>
Run Code Online (Sandbox Code Playgroud)
这有效并且字节出现在代理主题数据中。
当我将与上面相同的代码放在 python .py 文件中并使用 Python3 执行时,它会完成,但没有数据发送到 Kafka 代理。也没有显示错误。
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')
Run Code Online (Sandbox Code Playgroud) 我已经使用kafka-python库编写了一个 python 脚本,它将消息写入和读取到kafka. 我写消息没有任何问题;kafka我可以使用控制台工具检索它们。但我无法使用我的 python 脚本读取它们。我的消费者有一个 for ,它冻结在迭代的第一行并且永远不会返回。这是我的代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)
for msg in consumer:
print(type(msg))
Run Code Online (Sandbox Code Playgroud)
消费者被创建并完全订阅;我可以看到它my-topic列在其属性的主题列表中_client。
任何想法?
我有一个生产者代码,正在向 Kafka 发送消息。直到昨天我才能发送消息。从今天开始,我无法发送消息。不确定是否是版本兼容问题。没有失败或错误消息,代码被执行,但没有发送消息。
以下是 Python 模块版本:
kafka-python==2.0.1
Python 3.8.2
下面是我的代码:
from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
Run Code Online (Sandbox Code Playgroud)
我也尝试记录行为,但不知道为什么生产者被关闭:
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to 127.0.0.1:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.producer.kafka:Kafka producer closed
Process …Run Code Online (Sandbox Code Playgroud) 我有以下 Kafka 消费者,如果将 分配group_id给 None,它会很好地工作 - 它收到了所有历史消息和我新测试的消息。
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
group_id=group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:
Run Code Online (Sandbox Code Playgroud)
group_id但是,如果我将其设置为某个值,它不会收到任何内容。我尝试运行测试生产者来发送新消息,但没有收到任何消息。
消费者控制台确实显示以下消息:
2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py(重新)加入组 my_group 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py 成功加入第 497 代组 my_group 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py 更新的分区分配:[] 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 Consumer.py 为组 my_group 设置新分配的分区 set()
我正在尝试复制正在传输其位置坐标的设备,然后处理数据并将其保存到文本文件中。我正在使用 Kafka 和 Spark 流(在 pyspark 上),这是我的架构:
1-Kafka生产者以以下字符串格式将数据发送到名为test的主题:
"LG float LT float" example : LG 8100.25191107 LT 8406.43141483
Run Code Online (Sandbox Code Playgroud)
生产者代码:
from kafka import KafkaProducer
import random
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for i in range(0,10000):
lg_value = str(random.uniform(5000, 10000))
lt_value = str(random.uniform(5000, 10000))
producer.send('test', 'LG '+lg_value+' LT '+lt_value)
producer.flush()
Run Code Online (Sandbox Code Playgroud)
生产者工作正常,我在消费者中获取流数据(甚至在 Spark 中)
2- Spark Streaming正在接收这个流,我可以甚至pprint()它
Spark流处理代码
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"bootstrap.servers": "localhost:9092"})
lines = kvs.map(lambda …Run Code Online (Sandbox Code Playgroud) python-2.7 apache-spark spark-streaming pyspark kafka-python
kafka-python ×11
apache-kafka ×9
python ×7
apache-spark ×1
hadoop ×1
hdfs ×1
message-bus ×1
producer ×1
pyspark ×1
python-2.7 ×1
python-3.x ×1