我正在尝试向kafka发送一个大的CSV.基本结构是读取CSV的一行并用标题压缩它.
a = dict(zip(header, line.split(",")
Run Code Online (Sandbox Code Playgroud)
然后将其转换为json:
message = json.dumps(a)
Run Code Online (Sandbox Code Playgroud)
然后我使用kafka-python库发送消息
from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)
Run Code Online (Sandbox Code Playgroud)
使用PYSPARK我很容易从CSV文件中创建RDD消息
sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)
messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(","))
Run Code Online (Sandbox Code Playgroud)
现在我想发送这些消息:我定义了一个函数
def sendkafka(message):
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
return producer.send_messages('topic',message)
Run Code Online (Sandbox Code Playgroud)
然后我创建一个新的RDD来发送消息
sentRDD = messageRDD.map(lambda x: kafkasend(x))
Run Code Online (Sandbox Code Playgroud)
然后我调用sentRDD.count()
哪个开始搅拌和发送消息
不幸的是,这很慢.它每秒发送1000条消息.这是一个10节点的集群,每个集群有4个cpus和8GB的内存.
相比之下,在1000万行csv上创建消息大约需要7秒.〜约2gb
我认为问题是我在函数内实例化一个kafka生成器.但是,如果我不这样做就会引发抱怨,即使我尝试在全球范围内定义它,生产者也不存在.
也许有人可以阐明如何解决这个问题.
谢谢,
如果我在Producer中将Kafka配置参数设置为:
1. retries = 3
2. max.in.flight.requests.per.connection = 5
Run Code Online (Sandbox Code Playgroud)
那么一个分区内的消息可能不在send_order中.
Kafka是否采取任何额外步骤以确保分区中的消息仅保留在已发送的顺序中或者使用上述配置,可能在分区内出现乱序消息?
尝试使用CentOS上的Python客户端本地连接到Kafka 0.10.0.0时,我遇到了一个非常奇怪的问题.
我的连接选项非常简单且默认:
kafka_consumer = kafka.KafkaConsumer(
bootstrap_servers=['localhost:9092'],
client_id="python-test-consumer"
)
Run Code Online (Sandbox Code Playgroud)
当我在Kafka的server.properties文件中手动设置监听器选项时,如:
listeners=PLAINTEXT://localhost:9092
Run Code Online (Sandbox Code Playgroud)
我得到了kafka.errors.NoBrokersAvailable,尽管我仍然可以使用curl或其他linux东西轻松连接到Kafka代理服务器.
没有advertised.listeners或其他已弃用的广告选项有助于解决问题.因此,唯一正在工作的配置状态是没有监听器的状态.这当然是不可接受的,因为我们需要以某种方式设置本地集群.
似乎这个愚蠢问题的解决方案很简单并且在想,但我们自己无法想象.
我在消费者组中轮询来自 Kafka 的消息时遇到问题。\n我的消费者对象分配给给定的分区
\n\nself.ps = TopicPartition(topic, partition )\nRun Code Online (Sandbox Code Playgroud)\n\n之后消费者分配给该分区:
\n\nself.consumer.assign([self.ps])\nRun Code Online (Sandbox Code Playgroud)\n\n之后我就可以计算分区内的消息了
\n\nself.consumer.seek_to_beginning(self.ps)\npos = self.consumer.position(self.ps)\nRun Code Online (Sandbox Code Playgroud)\n\n和self.consumer.seek_to_end(self.ps)\n......
我的主题中有超过 30000 条消息。\n问题是我只收到一条消息。
\n\n消费者配置为:\n max_poll_records= 200\nAUTO_OFFSET_RESET是最早的
这是我的功能,我试图获取消息:
\n\n def poll_messages(self):\n\n\n data = []\n\n messages = self.consumer.poll(timeout_ms=6000)\n\n\n for partition, msgs in six.iteritems(messages):\n\n for msg in msgs:\n\n data.append(msg)\n\n return data\nRun Code Online (Sandbox Code Playgroud)\n\n即使我在开始轮询消息之前转到第一个可用偏移量\n我也只收到一条消息。
\n\nself.consumer.seek(self.ps, self.get_first_offset())\nRun Code Online (Sandbox Code Playgroud)\n\n我希望有人能解释我做错了什么。\n提前致谢。
\n\n最美好的祝愿\nJ\xc3\xb6rn
\n我想为Kafka配置一些主题及其独特的保留设置。因此,当我启动Kafka时,它将使用这些设置加载server.properties文件。
我发现的唯一方法是启动Kafka,然后使用kafka-topics.sh脚本启动和配置主题。
例:
bin/kafka-topics.sh --zookeeper zk.yoursite.com --alter --topic as-access --config retention.ms=86400000
Run Code Online (Sandbox Code Playgroud)
我在docker上使用Kafka,因此,除了将入口点设置为启动Kafka的实际脚本之外,我需要创建自己的脚本来启动Kafka并运行一些shell命令来配置这些主题。另外,如果我需要自己创建这些主题,则需要启动并创建一些与已经存在的主题有关的逻辑。而且我不想走那条路。
我正在使用Python 3.6编写一个Kafka生产者,Python-kafka客户端版本是1.4.4\xe3\x80\x82Kafka版本是:2.1.0和1.1.1(尝试了两个版本),但是当我写一条消息时向生产者抛出此错误:
\n\nKafkaTimeoutError(\'Failed to update metadata after 60.0 secs.\')\nRun Code Online (Sandbox Code Playgroud)\n\n这是我的客户端代码:
\n\nproducer = KafkaProducer(\n bootstrap_servers=[\'mq-server:9092\'],\n api_version = (0,10,2,0) # solve no broker error\n)\n\nproducer.send("dolphin-test".encode(\'utf-8\'),b"test")\nRun Code Online (Sandbox Code Playgroud)\n\n这是我修改的服务器配置:
\n\nlisteners=PLAINTEXT://10.142.0.2:9092\nadvertised.listeners=PLAINTEXT://10.142.0.2:9092\nRun Code Online (Sandbox Code Playgroud)\n\n当使用脚本生成和使用消息时,它工作正常!这是客户端跟踪输出:
\n\nD:\\project\\souce\\pydolphin-service>D:/Programs/Python/Python37/python.exe d:/project/souce/pydolphin-service/dolphin/producer.py\nTraceback (most recent call last):\n File "d:/project/souce/pydolphin-service/dolphin/producer.py", line 14, in <module>\n future = producer.send(\'my-topic\', b\'raw_bytes\')\n File "D:\\Programs\\Python\\Python37\\lib\\site-packages\\kafka\\producer\\kafka.py", line 555, in send\n self._wait_on_metadata(topic, self.config[\'max_block_ms\'] / 1000.0)\n File "D:\\Programs\\Python\\Python37\\lib\\site-packages\\kafka\\producer\\kafka.py", line 682, in _wait_on_metadata\n "Failed to update metadata after %.1f secs." % (max_wait,))\nkafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 …Run Code Online (Sandbox Code Playgroud) 我有一个基于 Django 的 Web 应用程序,我试图在这个名为kafka-python的库的帮助下集成 Kafka 。但是,当我尝试向特定主题发送消息时,我收到超时错误,指出:
Traceback (most recent call last):
File "/home/paras/vertex/vertex-1.6/vertex-portal-backend/vertex_app/kafka_service.py", line 67, in send_message
x = producer.send(topic, json_data)
File "/home/paras/.local/lib/python3.6/site-packages/kafka/producer/kafka.py", line 555, in send
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
File "/home/paras/.local/lib/python3.6/site-packages/kafka/producer/kafka.py", line 682, in _wait_on_metadata
"Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Run Code Online (Sandbox Code Playgroud)
生产消息:
def put_order_into_kafka(order,obj) :
try :
if order is None or offering is None :
raise Exception("Unable to put order into queue …Run Code Online (Sandbox Code Playgroud) 我有一个无服务器功能,正在尝试向卡夫卡发送一些数据。有时它可以工作,有时连接会断开并且数据丢失。
原因是 kafka 库没有引发异常,而是添加错误日志。所以我无法将我的代码添加到try:except.
这是我的日志中经常遇到的错误:
<BrokerConnection node_id=11 host=... port=9092>: Error receiving network data closing socket
Traceback (most recent call last):
File "/var/task/kafka/conn.py", line 745, in _recv
data = self._sock.recv(SOCK_CHUNK_BYTES)
ConnectionResetError: [Errno 104] Connection reset by peer
Run Code Online (Sandbox Code Playgroud)
上述函数_recv定义如下:
我仍在寻找解决方案,但在 try: except 中添加代码不起作用。
def _recv(self):
responses = []
SOCK_CHUNK_BYTES = 4096
while True:
try:
data = self._sock.recv(SOCK_CHUNK_BYTES)
# We expect socket.recv to raise an exception if there is not
# enough data to read the full bytes_to_read
# …Run Code Online (Sandbox Code Playgroud) 如何删除引号并像原始格式一样发送数据原始 JSON 格式是:
{
"@timestamp": "2020-06-02T09:38:03.183186Z"
}
Run Code Online (Sandbox Code Playgroud)
此数据在另一个主题中
"{\"@timestamp\": \"2020-05-25T17:40:47.582778Z\"}"
Run Code Online (Sandbox Code Playgroud)
这是服务器之间发送数据的代码
def parse(d):
if str(type(d)) == "<class 'dict'>":
return (r)
return -1
producer = KafkaProducer(bootstrap_servers=param["BOOTSTRAP_SERVERS"],
value_serializer=lambda x: dumps(x).encode('utf-8')) # utf-8
consumer = KafkaConsumer(bootstrap_servers=param["BOOTSTRAP_SERVERS"]+'1',
auto_offset_reset=param["AUTO_OFFSET_RESET"],
consumer_timeout_ms=param["CONSUMER_TIMEOUT_MS"],
enable_auto_commit=False,
auto_commit_interval_ms=60000,
group_id=param["GROUP_ID"],
client_id=param["CLIENT_ID"]
)
consumer.subscribe([param["TOPIC_IN"]])
while True:
num_rows = 0
for msg in consumer:
num_rows = num_rows + 1
m = json.loads(msg.value)
j = parse(m)
if j != -1:
data = json.dumps(j)
producer.send(param["TOPIC_OUT"], value=data)
Run Code Online (Sandbox Code Playgroud) 我正在使用 python-kafka 来收听 kafka 主题并使用该记录。我想让它无限轮询而不退出。这是我的代码如下:
def test():
consumer = KafkaConsumer('abc', 'localhost:9092', auto_offset_reset='earliest')
for msg in consumer:
print(msg.value)
Run Code Online (Sandbox Code Playgroud)
这段代码只是读取数据,直接退出。有没有办法即使没有推送消息也可以继续收听主题?
任何持续监控该主题的相关示例对我来说也很棒。
python apache-kafka kafka-consumer-api kafka-python kafka-topic
apache-kafka ×10
kafka-python ×10
python ×6
apache-spark ×1
consumer ×1
django ×1
docker ×1
kafka-topic ×1
pyspark ×1
python-3.x ×1