mdo*_*doe 2 python apache-kafka kafka-python
我是 python 和 kafka 的新手。我有一个脚本应该启动三个 kafka 消费者,等待来自这些消费者的消息并执行一些其他操作。此时我什至不知道我是否朝着正确的方向前进,所以任何帮助将不胜感激。
class MainClass():
def do_something_before(self):
# something is done here
def start_consumer(self):
consumer1_thread = threading.Thread(target=self.cons1, args=())
consumer2_thread = threading.Thread(target=self.cons2, args=())
consumer1_thread.daemon = True
consumer2_thread.daemon = True
consumer1_thread.start()
consumer2_thread.start()
def cons1(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest')
consumer.subscribe(['my-topic'])
for message in consumer:
print(message.value)
def cons2(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest')
consumer.subscribe(['my2-topic'])
for message in consumer:
print(message.value)
def keep_working(self):
# something is done here
if __name__ == 'main':
g = MainClass()
g.do_something_before()
g.keep_working()
Run Code Online (Sandbox Code Playgroud)
小智 7
我添加了带有 2 个消费者的 python-kafka 示例(基本上是两个 python 进程),您可以在 github 链接上找到它https://github.com/Shubhamgorde/kafka-python-app。
无法发布整个 python 文件,它有点大。
from multiprocessing import Process
def consumeData(topic):
try:
consumer = KafkaConsumer(topic, value_deserializer=lambda v:
binascii.unhexlify(v).decode('utf-8'))
except:
print("Error!!")
for msg in consumer:
msg=ast.literal_eval(msg.value)
if(msg[2] == 'C'):
performCreditOperation(msg)
elif (msg[2] == 'D'):
performDebitOperation(msg)
t1 = Process(target=consumeData, args=('Credit_transac',))
t2 = Process(target=consumeData, args=('Debit_transac',))
t1.start()
t2.start()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
12491 次 |
最近记录: |