I am trying to use the wurstmeister/kafka-docker image with docker-compose, but I am having real problems connecting everything together.
All the posts and questions I have checked suggest there should be no problem, but I am frankly lost. (And there are at least two questions on SO trying to address this issue.)
I believe the problem is my poor understanding of docker networking. So the question is:
I can consume and produce from inside the kafka container itself, but when I try to create another container (or use my laptop with a python client), I get several errors related to the advertised.host.name parameter (in this image that parameter is KAFKA_ADVERTISED_HOST_NAME).
I have already tried setting this variable in many ways, but it simply does not work.
So I am looking for a definitive answer (i.e. how to set these parameters automatically and what they mean) on how to set up docker-compose.yml.
This is mine:
zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"

kafka:
  image: wurstmeister/kafka
  # hostname: kafka
  ports:
    - "9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: "kafka"
    KAFKA_ADVERTISED_PORT: "9092"
    KAFKA_ZOOKEEPER_CONNECT: "zk:2181"
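One thing worth noting before the update below: whatever value ends up in KAFKA_ADVERTISED_HOST_NAME must be resolvable and reachable from the client's machine, not just from inside the compose network. A minimal connectivity check from the laptop might look like this sketch, assuming kafka-python is installed and that the docker host's IP is 192.168.99.100 (that address is an assumption):

    # Hypothetical check from outside the containers: the constructor raises
    # NoBrokersAvailable if the bootstrap address is unreachable, and topics()
    # only succeeds if the advertised metadata points somewhere reachable too.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers='192.168.99.100:9092')
    print(consumer.topics())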
UPDATE
Following @dnephin's suggestion, I modified start-kafka.sh with the following lines:
...
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" ]]; then
    export KAFKA_ADVERTISED_HOST_NAME=$(hostname -i)
fi
...
and removed KAFKA_ADVERTISED_HOST_NAME: "kafka" from docker-compose.yml.
I started the containers in the canonical way:
docker-compose up -d
Both containers are running:
$ docker-compose ps
          Name                 Command         State   Ports
-----------------------------------------------------------------------------------------------------------------
infraestructura_kafka_1   start-kafka.sh       Up      …

So far, I have not seen a Python client that explicitly implements the creation of a topic, as opposed to relying on the configuration option that auto-creates topics.
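For what it is worth, newer kafka-python releases do ship an admin client that can create topics explicitly; whether it applies here depends on the client version. A minimal sketch, assuming kafka-python >= 1.4.4 and a broker reachable at localhost:9092 (both assumptions):

    # Sketch only: explicit topic creation via kafka-python's admin client.
    # Requires a newer kafka-python; topic name and broker address are made up.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
    admin.create_topics([NewTopic(name='example-topic', num_partitions=3, replication_factor=1)])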
I am using the Python high-level consumer for Kafka and want to find out the latest offset for each partition of a topic. But I cannot get it to work.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

# brokers is a list of "host:port" strings, topic is the topic name
con = KafkaConsumer(bootstrap_servers=brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s" % (p.partition, con.highwater(p))
print "Subscription = %s" % con.subscription()
print "con.seek_to_beginning() = %s" % con.seek_to_beginning()
But the output I get is:
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
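A pattern that seems to work here, under the assumption that highwater marks are only populated once the consumer has actually fetched from the broker: assign the partitions, seek to the end, and read each position back. A sketch, not verified against every kafka-python version:

    # Sketch: after seek_to_end(), position() returns the log-end (latest) offset.
    # brokers and topic are the same placeholders as in the snippet above.
    from kafka import KafkaConsumer, TopicPartition

    con = KafkaConsumer(bootstrap_servers=brokers)
    ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
    con.assign(ps)
    con.seek_to_end()  # no arguments: applies to all assigned partitions
    for p in ps:
        print "For partition %s latest offset is %s" % (p.partition, con.position(p))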
kafka-python (1.0.0) throws an error when connecting to the broker, while /usr/bin/kafka-console-producer and /usr/bin/kafka-console-consumer work fine. The Python application also used to run well, but after a zookeeper restart it can no longer connect.
I am using the bare-bones example from the documentation:
from kafka import KafkaProducer
from kafka.common import KafkaError
producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')
I get this error:
Traceback (most recent call last):
  File "pp.py", line 4, in <module>
    producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
  File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
    self.config['api_version'] = client.check_version()
  File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
    connect(node_id)
  File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
    raise Errors.NodeNotReadyError(node_id)
kafka.common.NodeNotReadyError: 0
Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object …
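One workaround that is sometimes suggested when check_version() is what fails: pin the broker API version in the constructor so the probe is skipped entirely. A sketch; the version value is an assumption and must match the actual broker (kafka-python 1.0.0 took it as a string, newer releases take a tuple):

    # Sketch: with api_version pinned, kafka-python skips the version probe
    # that raises NodeNotReadyError above. '0.8.2' is an assumed broker version.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=['hostname:9092'],
                             api_version='0.8.2')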
I have started learning Kafka and am trying basic operations on it. I am stuck on the point about "brokers". My kafka is running, but when I try to create a consumer for a partition:
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:1234')  # <-- error here
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 284, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 791, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
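NoBrokersAvailable generally just means that nothing is listening at the bootstrap address. A sketch of the same snippet pointed at Kafka's default listener port instead (9092 is Kafka's default; that it is the right port for this setup is an assumption):

    # Sketch: same consumer, but against the broker's default port 9092.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    consumer.assign([TopicPartition('foobar', 2)])
    msg = next(consumer)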
I am subscribing to Kafka with a wildcard pattern, as shown below. The wildcard represents a dynamic customer id.
consumer.subscribe(pattern='customer.*.validations')
This works well, since I can extract the customer id from the topic string. But now I need to extend the functionality to listen to a similar topic with a slightly different purpose. Call it customer.*.additional-validations. The code needs to live in the same project, since a lot of the functionality is shared, but I need to be able to take a different path depending on the queue type.
In the Kafka documentation I can see that it is possible to subscribe to a list of topics. However, those are hard-coded strings, not patterns that allow flexibility:
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
So I am wondering whether it is possible to somehow do a combination of the two? Something like this (which does not work):
consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])
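Since pattern takes a single regular expression rather than a list, one option is to fold both topic families into one pattern using alternation and then branch on each message's topic name. A sketch; the exact escaping is mine, not from the docs:

    # Sketch: one regex with alternation matches both topic families.
    consumer.subscribe(pattern=r'customer\..*\.(validations|additional-validations)')

    for msg in consumer:
        if msg.topic.endswith('.additional-validations'):
            pass  # handle the additional-validations path
        else:
            pass  # handle the plain validations path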
I have a use case for a high-throughput kafka producer: I want to push thousands of JSON messages per second.
I have a 3-node kafka cluster, I am using the latest kafka-python library, and I have the following method for producing messages:
from json import dumps
from kafka import KafkaProducer

def publish_to_kafka(topic):
    data = get_data(topic)
    producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'),
                             compression_type='gzip')
    try:
        for obj in data:
            producer.send(topic, value=obj)
    except Exception as e:
        logger.error(e)
    finally:
        producer.close()
My topic has 3 partitions.
The method sometimes works fine, but sometimes it fails with the error "KafkaTimeoutError: Failed to update metadata after 60.0 secs."
Which settings do I need to change for this to work smoothly?
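Not a verified fix, but two knobs are worth trying as a sketch: max_block_ms bounds how long send() may block waiting for metadata (60000 ms by default, which matches the error message), and flushing before close() avoids discarding buffered records. The 120000 value below is an assumption:

    # Sketch: same method, with more metadata headroom and an explicit flush.
    from json import dumps
    from kafka import KafkaProducer

    def publish_to_kafka(topic):
        data = get_data(topic)  # get_data is the question's own helper
        producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
                                 value_serializer=lambda x: dumps(x).encode('utf-8'),
                                 compression_type='gzip',
                                 max_block_ms=120000)  # default is 60000; 120s is an assumption
        try:
            for obj in data:
                producer.send(topic, value=obj)
            producer.flush()  # block until buffered records are actually sent
        finally:
            producer.close()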
I have a simple JSON object like the following:
d = {'tag ': 'blah',
     'name': 'sam',
     'score': {'row1': 100,
               'row2': 200}}
Below is my python code, which sends the message to Kafka:
from kafka import SimpleProducer, KafkaClient
import json
# To send messages synchronously
kafka = KafkaClient('10.20.30.12:9092')
producer = SimpleProducer(kafka)
jd = json.dumps(d)
producer.send_messages(b'message1', jd)
I can see in the Storm logs that the message is being received, but it throws "Transformation null for tuple { json structure in here }". Not sure what needs to be done to resolve this?
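To rule out the producer side, it can help to read the raw bytes back and see exactly what lands on the topic. A sketch, assuming the newer KafkaConsumer API from kafka-python against the same broker (the consumer settings here are mine, not from the question):

    # Sketch: consume the raw bytes back from the topic to inspect the payload.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('message1',
                             bootstrap_servers='10.20.30.12:9092',
                             auto_offset_reset='earliest')
    for msg in consumer:
        print(msg.value)  # should be the JSON string produced above
        break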
I am trying to pass data from kafka to spark streaming.
This is what I have done so far:
- downloaded kafka and spark
- started zookeeper with the default properties config
- started the kafka server with the default properties config
- started a kafka producer
- started a kafka consumer
- ran ./bin/spark-submit examples/src/main/python/kafka-spark.py

kafka-spark.py:
from __future__ import print_function
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
    #conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
    conf = SparkConf().setAppName("Kafka-Spark")
    #sc = SparkContext(appName="KafkaSpark")
    sc = SparkContext(conf=conf)
    stream = StreamingContext(sc, 1)
    map1 = {'spark-kafka': 1}
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1)  # tried with localhost:2181 too
    print("kafkastream=", kafkaStream)
    sc.stop()
The full log, including the error when running spark-kafka.py:
Using …
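For reference, a sketch of the shape I would expect this job to take, under the assumption that the failure comes from pointing createStream at the broker port: createStream expects the zookeeper quorum (port 2181), the received stream needs an output operation such as pprint(), and the context must be started rather than stopped:

    # Sketch: createStream takes the zookeeper quorum, and the streaming
    # context has to be started and awaited for anything to happen.
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    conf = SparkConf().setAppName("Kafka-Spark")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', "name", {'spark-kafka': 1})
    kafkaStream.pprint()  # an output operation on the DStream, not a print of the object
    ssc.start()
    ssc.awaitTermination()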
I am trying to write a simple pyspark job that receives data from a kafka broker topic, does some transformations on that data, and puts the transformed data on a different kafka broker topic. I have the following code, which reads data from a kafka topic, but running the sendkafka function has no effect:
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('spark.out', message)

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 5)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    parsed = kvs.map(lambda (key, value): json.loads(value))
    parsed.pprint()
    sentRDD = kvs.mapPartitions(sendkafka)
    sentRDD.count()
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
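For context, a sketch of how the send step is usually wired up instead, assuming the same imports as the script above: DStream transformations are lazy, and a generator that only yields send results never gets driven, so pushing to Kafka normally happens inside an output action such as foreachRDD, with the producer created per partition:

    # Sketch: send inside an output action; the producer is built per partition.
    def send_partition(messages):
        kafka = KafkaClient("localhost:9092")
        producer = SimpleProducer(kafka)
        for message in messages:
            producer.send_messages('spark.out', json.dumps(message))

    parsed.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))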
apache-kafka ×9
python ×6
apache-spark ×1
docker ×1
json ×1
pyspark ×1
python-2.7 ×1