Ben*_*son 10 python apache-kafka kafka-python
我正在使用带通配符的模式订阅Kafka,如下所示.通配符表示动态客户ID.
consumer.subscribe(pattern='customer.*.validations')
Run Code Online (Sandbox Code Playgroud)
这很有效,因为我可以从主题字符串中提取客户ID.但是现在我需要扩展功能以听取类似主题的目的略有不同.我们称之为customer.*.additional-validations.代码需要存在于同一个项目中,因为共享了很多功能,但我需要能够根据队列类型采用不同的路径.
在Kafka文档中,我可以看到可以订阅一系列主题.然而,这些是硬编码的字符串.不是允许灵活性的模式.
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
Run Code Online (Sandbox Code Playgroud)
所以我想知道是否有可能以某种方式做两者的组合?有点像这样(不工作):
consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])
Run Code Online (Sandbox Code Playgroud)
Dhr*_*hak 17
在KafkaConsumer代码中,它支持主题列表或模式,
def subscribe(self, topics=(), pattern=None, listener=None):
"""Subscribe to a list of topics, or a topic regex pattern
Partitions will be dynamically assigned via a group coordinator.
Topic subscriptions are not incremental: this list will replace the
current assignment (if there is one).
Run Code Online (Sandbox Code Playgroud)
所以你可以创建一个正则表达式,使用OR条件|,它应该作为订阅多个动态主题正则表达式,因为它在内部使用re模块进行匹配.
(customer.*.validations)|(customer.*.additional-validations)
| 归档时间: |
|
| 查看次数: |
9317 次 |
| 最近记录: |