标签: kafka-python

Docker中的Kafka无法正常工作

我正在尝试使用wurstmeister\kafka-docker图像 docker-compose,但我在连接所有内容方面遇到了实际问题.

我检查的所有帖子或问题,似乎没有任何问题,但我坦率地失去了.(并且在SO中至少有两个问题试图解决这个问题)

我认为问题在于我对网络的了解不足docker.所以问题是:

我可以从同一个kafka容器中使用和生产,但是,当我尝试创建另一个容器(或者使用我的笔记本电脑和python客户端)时,我得到了几个与advertised.host.name参数相关的错误(在图像中这个参数是KAFKA_ADVERTISED_HOST_NAME)

我已经尝试过多种方式设置这个变量,但它根本不起作用.

所以我正在寻找一个明确的答案(即如何自动设置这些参数及其含义)如何设置 docker-compose.yml

这是我的:

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"

kafka:
  image: wurstmeister/kafka
 # hostname: kafka
  ports:
    - "9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: "kafka"
    KAFKA_ADVERTISED_PORT: "9092"
    KAFKA_ZOOKEEPER_CONNECT: "zk:2181"
Run Code Online (Sandbox Code Playgroud)

UPDATE

按照@dnephin的建议,我修改start-kafka.sh了以下几行:

...
if [[ -z "$KAFKA_ADVERTISED_PORT" ]]; then
    export KAFKA_ADVERTISED_PORT=$(hostname -i)
fi
...
Run Code Online (Sandbox Code Playgroud)

KAFKA_ADVERTISED_HOST_NAME: "kafka"从中删除docker-compose.yml

我以规范的方式启动了容器:

docker-compose up -d
Run Code Online (Sandbox Code Playgroud)

两个容器都在运行:

$ docker-compose ps
           Name                          Command               State                     Ports                    
-----------------------------------------------------------------------------------------------------------------
infraestructura_kafka_1       start-kafka.sh                   Up …
Run Code Online (Sandbox Code Playgroud)

apache-kafka docker docker-compose kafka-python

47
推荐指数
3
解决办法
4万
查看次数

如何使用python在apache kafka中创建主题

到目前为止,我没有看到一个python客户端显式实现主题的创建,而不使用配置选项自动创建主题.

python apache-kafka kafka-python

22
推荐指数
3
解决办法
1万
查看次数

如何获取kafka主题的分区的最新偏移量?

我正在为Kafka使用Python高级消费者,并希望了解主题的每个分区的最新偏移量.但是我无法让它发挥作用.

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
Run Code Online (Sandbox Code Playgroud)

但我得到的输出是

For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
.... …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-consumer-api kafka-python

21
推荐指数
4
解决办法
3万
查看次数

kafka-python:生产者无法连接

连接到代理时kafka-python(1.0.0)抛出错误.同时/ usr/bin/kafka-console-producer和/ usr/bin/kafka-console-consumer工作正常.

Python应用程序过去也运行良好,但在zookeeper重新启动后,它不再能够连接.

我正在使用文档中的裸骨示例:

from kafka import KafkaProducer
from kafka.common import KafkaError

producer = KafkaProducer(bootstrap_servers=['hostname:9092'])

# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

Traceback (most recent call last):   File "pp.py", line 4, in <module>
    producer = KafkaProducer(bootstrap_servers=['hostname:9092'])   File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
    self.config['api_version'] = client.check_version()   File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
    connect(node_id)   File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
    raise Errors.NodeNotReadyError(node_id) kafka.common.NodeNotReadyError: 0 Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-python

12
推荐指数
2
解决办法
2万
查看次数

NoBrokersAvailable:NoBrokersAvailable-Kafka 错误

我已经开始学习卡夫卡了。尝试对其进行基本操作。我一直坚持关于“经纪人”的观点。

我的 kafka 正在运行,但是当我想创建一个分区时。

 from kafka import TopicPartition
(ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
 consumer.assign([TopicPartition('foobar', 2)])
 msg = next(consumer)
Run Code Online (Sandbox Code Playgroud)

回溯(最近一次调用):文件“”,第 1 行,在文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”中,第 284 行,在init self._client = KafkaClient(metrics=self._metrics, **self.config) 文件 "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in init self.config['api_version '] = self.check_version(timeout=check_timeout) 文件“/usr/local/lib/python2.7/dist-packages/kafka/client_async.py”,第 791 行,在 check_version 中引发 Errors.NoBrokersAvailable() kafka.errors。 NoBrokersAvailable:NoBrokersAvailable

python apache-kafka kafka-consumer-api kafka-python kafka-producer-api

10
推荐指数
4
解决办法
3万
查看次数

如何使用kafka-python订阅多个kafka通配符模式列表?

我正在使用带通配符的模式订阅Kafka,如下所示.通配符表示动态客户ID.

consumer.subscribe(pattern='customer.*.validations')
Run Code Online (Sandbox Code Playgroud)

这很有效,因为我可以从主题字符串中提取客户ID.但是现在我需要扩展功能以听取类似主题的目的略有不同.我们称之为customer.*.additional-validations.代码需要存在于同一个项目中,因为共享了很多功能,但我需要能够根据队列类型采用不同的路径.

Kafka文档中,我可以看到可以订阅一系列主题.然而,这些是硬编码的字符串.不是允许灵活性的模式.

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)
Run Code Online (Sandbox Code Playgroud)

所以我想知道是否有可能以某种方式做两者的组合?有点像这样(不工作):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])
Run Code Online (Sandbox Code Playgroud)

python apache-kafka kafka-python

10
推荐指数
1
解决办法
9317
查看次数

KafkaTimeoutError:60.0 秒后无法更新元数据

我有一个高吞吐量 kafka 生产者的用例,我想每秒推送数千条 json 消息。

我有一个 3 节点 kafka 集群,我正在使用最新的 kafka-python 库,并有以下方法来生成消息

def publish_to_kafka(topic):
    data = get_data(topic)
    producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'), compression_type='gzip')
    try:
        for obj in data:
           producer.send(topic, value=obj)
    except Exception as e:
            logger.error(e)
    finally:
        producer.close()
Run Code Online (Sandbox Code Playgroud)

我的主题有 3 个分区。

方法有时可以正常工作,但会失败并出现错误“KafkaTimeoutError:无法在 60.0 秒后更新元数据。”

我需要更改哪些设置才能使其顺利工作?

python apache-kafka kafka-python kafka-producer-api

10
推荐指数
1
解决办法
3万
查看次数

如何从python客户端发送JSON对象到kafka

我有一个简单的 JSON 对象,如下所示

d = { 'tag ': 'blah',
  'name' : 'sam',
  'score': 
    {'row1': 100,
      'row2': 200
     }
}
Run Code Online (Sandbox Code Playgroud)

以下是我的 python 代码,它将消息发送到 Kafka

from kafka import SimpleProducer, KafkaClient
import json 

# To send messages synchronously
kafka = KafkaClient('10.20.30.12:9092')
producer = SimpleProducer(kafka)
jd = json.dumps(d)
producer.send_messages(b'message1',jd)
Run Code Online (Sandbox Code Playgroud)

我在风暴日志中看到消息正在被接收,但它抛出 Transformation null for tuple { json structure in here } 不确定需要做什么才能解决这个问题?..

python json apache-kafka kafka-python

9
推荐指数
2
解决办法
4万
查看次数

如何将数据从Kafka传递到Spark Streaming?

我正在尝试将数据从kafka传递到spark streaming.

这就是我到目前为止所做的事情:

  1. 安装了kafkaspark
  2. zookeeper默认属性config开始
  3. kafka server默认属性config开始
  4. 入门 kafka producer
  5. 入门 kafka consumer
  6. 从生产者发送消息到消费者.工作良好.
  7. 写了kafka-spark.py来接收从kafka到spark的消息.
  8. 我试着跑步 ./bin/spark-submit examples/src/main/python/kafka-spark.py
  9. 我收到一个错误.

kafka-spark.py -

from __future__ import print_function
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    #conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
    conf = SparkConf().setAppName("Kafka-Spark")
    #sc = SparkContext(appName="KafkaSpark")
    sc = SparkContext(conf=conf)
    stream=StreamingContext(sc,1)
    map1={'spark-kafka':1}
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1) #tried with localhost:2181 too

    print("kafkastream=",kafkaStream)
    sc.stop()
Run Code Online (Sandbox Code Playgroud)

完整日志包括运行spark-kafka.py时出错:

Using …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-streaming kafka-python

9
推荐指数
2
解决办法
7193
查看次数

如何正确使用pyspark向kafka经纪人发送数据?

我正在尝试编写一个简单的pyspark作业,它将从kafka代理主题接收数据,对该数据进行一些转换,并将转换后的数据放在不同的kafka代理主题上.

我有以下代码,它从kafka主题读取数据,但没有影响运行sendkafka函数:

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('spark.out', message)

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 5)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    parsed = kvs.map(lambda (key, value): json.loads(value))
    parsed.pprint()

    sentRDD = kvs.mapPartitions(sendkafka)
    sentRDD.count()

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main() …
Run Code Online (Sandbox Code Playgroud)

python-2.7 spark-streaming pyspark kafka-python

7
推荐指数
1
解决办法
7904
查看次数