我正在使用这个 Python (Python 3) 代码,以便获取所有主题的列表
\n作为:
\nmore test.py\n\nlist = os.popen(" kafka-topics.sh \xe2\x80\x93zookeeper zoo_server:2181 --list | sed '/^[[:space:]]*$/d' ").read().split('\\n')\nprint (list)\nRun Code Online (Sandbox Code Playgroud)\n当我运行 python 脚本时,我注意到最后一行如下所示
\n\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\n\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6.'topic32', 'topic33', 'topic34 , \xe2\x80\x98 \xe2\x80\x98]\nRun Code Online (Sandbox Code Playgroud)\n所以最后一个单词实际上是空的,因为在单引号之间,因为\xe2\x80\x98 \xe2\x80\x98我们没有主题名称
这很奇怪,因为在行中我使用 sed -sed '/^[[:space:]]*$/d'来删除空行,并且当我运行以下命令时实际上不是空行
kafka-topics.sh \xe2\x80\x93zookeeperzoo_server:2181 --list | sed '/^[[:space:]]*$/d' \nRun Code Online (Sandbox Code Playgroud)\n我的 Python 行有什么问题吗?
\n例如,当我跑步时
\nkafka-topics.sh \xe2\x80\x93zookeeper zoo_server:2181 --list | sed '/^[[:space:]]*$/d'\n\ntopic1\ntopic2\ntopic3\n.\n.\n.\nRun Code Online (Sandbox Code Playgroud)\n 以下字符串从 Kafka 返回到 Lambda 连接器。
'{device_id=D_2021_A07, key=tele_metrics, sensor_1=62, sensor_2=23}'
Run Code Online (Sandbox Code Playgroud)
我想将其转换为有效的 JSON,如下所示
{
"device_id": "D_2021_A07", //String
"key": "tele_metrics", //String
"sensor_1": 62, //Integer
"sensor_2": 23 //Integer
}
Run Code Online (Sandbox Code Playgroud)
我怎样才能在 JavaScript 中做到这一点。
Spring Cloud Kafka Streams、Spring Cloud Stream、Spring Cloud Function、Spring AMQP 和 Spring for Apache Kafka 有什么区别?
我正在尝试将流数据帧与配置单元表连接起来,并将生成的数据帧插入到另一个 Kafka 主题中。下面是我实现的代码,它按照要求工作。
def write_stream_batches(kafka_df: DataFrame,table_config):
table_config = state_config
kafka_df.writeStream \
.format('kafka') \
.foreachBatch(join_kafka_streams_denorm) \
.option('checkpointLocation', table_config['checkpoint_location']) \
.start() \
.awaitTermination()
def join_kafka_streams_denorm(kafka_df, batch_id):
try:
table_config = state_config
kafka_config = kafkaconfig
filters = ata_filter(kafka_df=kafka_df)
main_df = spark.sql(f'select * from db.table where {filters}')
joined_df = join_remove_duplicate_col(kafka_df=kafka_df, denorm=main_df, table_config=table_config)
push_to_kafka(joined_df, kafka_config, table_config, 'state')
except Exception as error:
print(f'Join failed with the exception: {error}')
traceback.print_exc()
print('Stopping the application')
sys.exit(1)
Run Code Online (Sandbox Code Playgroud)
该方法write_stream_batches正在从 kafka 接收流数据帧。我正在将此主题数据合并到配置单元表中,并且我的表配置在从 config.py 文件导入的字典中,下面是该行。
table_config = state_config
Run Code Online (Sandbox Code Playgroud)
这里的问题是给出检查点配置,我在 write_stream_batches 中导入 state_config …
我尝试创建 Kafka 主题,但收到错误。
$ docker exec -ti kafka-server1 \
> /opt/bitnami/kafka/bin/kafka-topics.sh --create \
> --zookeeper zookeeper-server:2181 \
> --replication-factor 1 \
> --partitions 1 \
> --topic topic_model_local
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
at joptsimple.OptionParser.parse(OptionParser.java:396)
at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:517)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Run Code Online (Sandbox Code Playgroud) 场景如下:我有 2 个子网。1 符合 PCI DSS 标准,而另一个则不符合。我可以将数据从 PCI 兼容子网提取到不兼容子网中,以便在 Kafka 上处理吗?
tl;dr 必须分析的数据位于兼容子网上。Kafka 位于不合规子网中。
我有一个 Kafka 主题,有 50 个分区。
我的 Spring Boot 应用程序使用 Spring Kafka 来读取这些消息@KafkaListener
我的应用程序在 Kubernetes 中自动缩放的实例数量。
默认情况下,Spring Kafka 似乎为每个主题启动 1 个消费者线程。
org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
Run Code Online (Sandbox Code Playgroud)
因此,对于应用程序的唯一实例,一个线程正在读取 50 个分区。
有 2 个实例,有负载平衡,每个实例监听 25 个分区。每个实例仍然有 1 个线程。
concurrency我知道我可以使用上的参数设置线程数@KafkaListener。
但这是一个固定值。
有什么方法可以告诉 Spring 动态调整消费者线程的数量以适应客户端当前正在侦听的分区数量吗?
我有开始使用 Kafka 的用例,并且正在寻找开源免费(生产)kafka。
当检查Confluence 7.1平台看起来很合适时,因为它捆绑了zookeeper/kafka/schema注册表/kafka UI。
在决定继续之前,只想检查一下 Confluence Platform 7.1 是否免费且开源?我需要购买许可或付费支持吗?
我对 AWS 很陌生,对 Kafka 也很陌生(使用 Confluence 平台和 .NET)。
我们将接收大文件 (~1-40+Mb) 到我们的 S3 存储桶,并且其消费端应该处理这些文件。我们将通过 Kafka 发送所有消息。
我读过你不应该通过 Kafka 发送大文件,但也许我在这里被误导了?
如果我们只想获取一个新文件已到达我们的 S3 存储桶的事件(当然还有对其的某种引用),我们该怎么办?
我正在尝试使用 Kafka 设计一个事件源系统。假设我们有一个非常简单的事件源应用程序,由四个聚合根组成:用户、类别、产品和购买。
遵循confluence的一些指导,我创建了 4 个主题(为简单起见,我们假设每个主题 1 个分区),每个聚合一个:
我的投影是 PostgresSQL 投影,它从 Kafka 读取事件,然后将每个事件转换为 SQL 查询(根据事件类型插入、更新或删除)。
问题是数据库模型大约是这样的:
鉴于主题之间的事件顺序无法保证,如果我尝试将产品添加到没有其类别的数据库中,我很可能会收到来自数据库的foreignkey错误。同样,如果我尝试在用户或产品尚未处理的情况下添加购买,我很可能会收到另一个外键错误。
我必须使用哪些分区选项来解决读取模型中的外键问题,并允许我的系统随着更多实体和更多事件而增长,而不必依赖将所有内容放入单个主题来避免外键问题?
我想到的一件事是,虽然我可以独立处理用户和产品而不会发生冲突,但我仍然需要对购买做一些事情(以某种方式延迟该主题的事件处理,直到确保用户和产品存在于数据库)。
apache-kafka ×10
amazon-s3 ×1
apache-spark ×1
jsonparser ×1
lambda ×1
linux ×1
pci-dss ×1
python ×1
python-3.x ×1
spring ×1
spring-kafka ×1