标签: apache-kafka

为什么使用 os.popen 时最后一行是空的

我正在使用这个 Python (Python 3) 代码,以便获取所有主题的列表

\n

作为:

\n
more test.py\n\nlist = os.popen(" kafka-topics.sh \xe2\x80\x93zookeeper zoo_server:2181 --list | sed '/^[[:space:]]*$/d'  ").read().split('\\n')\nprint (list)\n
Run Code Online (Sandbox Code Playgroud)\n

当我运行 python 脚本时,我注意到最后一行如下所示

\n
\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6\n\xe2\x80\xa6\xe2\x80\xa6\xe2\x80\xa6.'topic32', 'topic33', 'topic34 , \xe2\x80\x98 \xe2\x80\x98]\n
Run Code Online (Sandbox Code Playgroud)\n

所以最后一个单词实际上是空的,因为在单引号之间,因为\xe2\x80\x98 \xe2\x80\x98我们没有主题名称

\n

这很奇怪,因为在行中我使用 sed -sed '/^[[:space:]]*$/d'来删除空行,并且当我运行以下命令时实际上不是空行

\n
kafka-topics.sh \xe2\x80\x93zookeeperzoo_server:2181 --list | sed '/^[[:space:]]*$/d'  \n
Run Code Online (Sandbox Code Playgroud)\n

我的 Python 行有什么问题吗?

\n

例如,当我跑步时

\n
kafka-topics.sh \xe2\x80\x93zookeeper zoo_server:2181 --list | sed '/^[[:space:]]*$/d'\n\ntopic1\ntopic2\ntopic3\n.\n.\n.\n
Run Code Online (Sandbox Code Playgroud)\n

python linux python-3.x apache-kafka

1
推荐指数
1
解决办法
162
查看次数

如何在 JavaScript 中将带有等号“=”的字符串转换为 json

以下字符串从 Kafka 返回到 Lambda 连接器。

'{device_id=D_2021_A07, key=tele_metrics, sensor_1=62, sensor_2=23}'
Run Code Online (Sandbox Code Playgroud)

我想将其转换为有效的 JSON,如下所示

{
    "device_id": "D_2021_A07",   //String
    "key": "tele_metrics",       //String
    "sensor_1": 62,              //Integer
    "sensor_2": 23               //Integer
}
Run Code Online (Sandbox Code Playgroud)

我怎样才能在 JavaScript 中做到这一点。

lambda apache-kafka jsonparser

1
推荐指数
1
解决办法
2617
查看次数

Spring Cloud Kafka Streams 与 Spring Cloud Stream 之间的区别?

Spring Cloud Kafka Streams、Spring Cloud Stream、Spring Cloud Function、Spring AMQP 和 Spring for Apache Kafka 有什么区别?

apache-kafka spring-cloud-stream

1
推荐指数
1
解决办法
3728
查看次数

有没有办法在 Kafka Streaming 的“foreachBatch”函数中传递附加/额外参数?

我正在尝试将流数据帧与配置单元表连接起来,并将生成的数据帧插入到另一个 Kafka 主题中。下面是我实现的代码,它按照要求工作。

def write_stream_batches(kafka_df: DataFrame,table_config):
    table_config = state_config
    kafka_df.writeStream \
    .format('kafka') \
    .foreachBatch(join_kafka_streams_denorm) \
    .option('checkpointLocation', table_config['checkpoint_location']) \
    .start() \
    .awaitTermination()

def join_kafka_streams_denorm(kafka_df, batch_id):
    try:
        table_config = state_config
        kafka_config = kafkaconfig

        filters = ata_filter(kafka_df=kafka_df)
        main_df = spark.sql(f'select * from db.table where {filters}')

        joined_df = join_remove_duplicate_col(kafka_df=kafka_df, denorm=main_df, table_config=table_config)
        push_to_kafka(joined_df, kafka_config, table_config, 'state')
    except Exception as error:
        print(f'Join failed with the exception: {error}')
        traceback.print_exc()
        print('Stopping the application')
        sys.exit(1)
Run Code Online (Sandbox Code Playgroud)

该方法write_stream_batches正在从 kafka 接收流数据帧。我正在将此主题数据合并到配置单元表中,并且我的表配置在从 config.py 文件导入的字典中,下面是该行。

table_config = state_config
Run Code Online (Sandbox Code Playgroud)

这里的问题是给出检查点配置,我在 write_stream_batches 中导入 state_config …

apache-kafka apache-spark spark-streaming

1
推荐指数
1
解决办法
1229
查看次数

如何修复此错误“线程“main”中的异常 joptsimple.UnrecognizedOptionException:zookeeper 不是可识别的选项”

我尝试创建 Kafka 主题,但收到错误。

$ docker exec -ti kafka-server1 \
>   /opt/bitnami/kafka/bin/kafka-topics.sh --create \
>   --zookeeper zookeeper-server:2181 \
>   --replication-factor 1 \
>   --partitions 1 \
>   --topic topic_model_local

Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
    at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
    at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
    at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
    at joptsimple.OptionParser.parse(OptionParser.java:396)
    at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:517)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

Run Code Online (Sandbox Code Playgroud)

apache-kafka

1
推荐指数
1
解决办法
3626
查看次数

提取数据进行分析时的 PCI DSS 合规性

场景如下:我有 2 个子网。1 符合 PCI DSS 标准,而另一个则不符合。我可以将数据从 PCI 兼容子网提取到不兼容子网中,以便在 Kafka 上处理吗?

tl;dr 必须分析的数据位于兼容子网上。Kafka 位于不合规子网中。

pci-dss amazon-web-services pci-compliance apache-kafka

1
推荐指数
1
解决办法
156
查看次数

动态调整消费者线程数量以适应 Kafka 分区数量

我有一个 Kafka 主题,有 50 个分区。
我的 Spring Boot 应用程序使用 Spring Kafka 来读取这些消息@KafkaListener

我的应用程序在 Kubernetes 中自动缩放的实例数量。

默认情况下,Spring Kafka 似乎为每个主题启动 1 个消费者线程。

org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1
Run Code Online (Sandbox Code Playgroud)

因此,对于应用程序的唯一实例,一个线程正在读取 50 个分区。
有 2 个实例,有负载平衡,每个实例监听 25 个分区。每个实例仍然有 1 个线程。

concurrency我知道我可以使用上的参数设置线程数@KafkaListener
但这是一个固定值。

有什么方法可以告诉 Spring 动态调整消费者线程的数量以适应客户端当前正在侦听的分区数量吗?

spring apache-kafka spring-kafka

1
推荐指数
1
解决办法
2018
查看次数

基于Kafka的Confluence Platform 7.1是免费的吗?开源?用于生产用途

我有开始使用 Kafka 的用例,并且正在寻找开源免费(生产)kafka。

当检查Confluence 7.1平台看起来很合适时,因为它捆绑了zookeeper/kafka/schema注册表/kafka UI。

在决定继续之前,只想检查一下 Confluence Platform 7.1 是否免费且开源?我需要购买许可或付费支持吗?

apache-kafka confluent-platform

1
推荐指数
1
解决办法
810
查看次数

当 S3 上有新文件时,在 Kafka 上获取事件/消息

我对 AWS 很陌生,对 Kafka 也很陌生(使用 Confluence 平台和 .NET)。

我们将接收大文件 (~1-40+Mb) 到我们的 S3 存储桶,并且其消费端应该处理这些文件。我们将通过 Kafka 发送所有消息。

我读过你不应该通过 Kafka 发送大文件,但也许我在这里被误导了?

如果我们只想获取一个新文件已到达我们的 S3 存储桶的事件(当然还有对其的某种引用),我们该怎么办?

amazon-s3 apache-kafka confluent-kafka-dotnet

1
推荐指数
1
解决办法
3511
查看次数

使用 Kafka 和每个实体类型的主题进行事件溯源,外键会发生什么情况?

我正在尝试使用 Kafka 设计一个事件源系统。假设我们有一个非常简单的事件源应用程序,由四个聚合根组成:用户、类别、产品和购买。

遵循confluence的一些指导,我创建了 4 个主题(为简单起见,我们假设每个主题 1 个分区),每个聚合一个:

  • 为用户提供一个主题
  • 类别的一个主题
  • 产品的一个主题
  • 购买的一个主题

我的投影是 PostgresSQL 投影,它从 Kafka 读取事件,然后将每个事件转换为 SQL 查询(根据事件类型插入、更新或删除)。

问题是数据库模型大约是这样的:

在此输入图像描述

鉴于主题之间的事件顺序无法保证,如果我尝试将产品添加到没有其类别的数据库中,我很可能会收到来自数据库的foreignkey错误。同样,如果我尝试在用户或产品尚未处理的情况下添加购买,我很可能会收到另一个外键错误。

我必须使用哪些分区选项来解决读取模型中的外键问题,并允许我的系统随着更多实体和更多事件而增长,而不必依赖将所有内容放入单个主题来避免外键问题?

我想到的一件事是,虽然我可以独立处理用户和产品而不会发生冲突,但我仍然需要对购买做一些事情(以某种方式延迟该主题的事件处理,直到确保用户和产品存在于数据库)。

domain-driven-design event-sourcing apache-kafka

1
推荐指数
1
解决办法
566
查看次数