标签: apache-kafka-streams

合并kafka流中的记录

是否可以合并kafka中的记录并将输出发布到不同的流?

例如,有一个针对 kafka 主题的事件流,如下所示

{txnId:1,startTime:0900},{txnId:1,endTime:0905},{txnId:2,endTime:0912},{txnId:3,endTime:0930},{txnId:2,startTime:0912}, {txnId:3,开始时间:0925}......

我想通过 txnId 合并这些事件并创建合并的输出,如下所示

{txnId:1,startTime:0900,endTime:0905},{txnId:2,startTime:0910,endTime:0912},{txnId:3,startTime:0925,endTime:0930}

请注意,传入事件中不会维护顺序。因此,如果在开始时间事件之前收到 txn Id 的 endTime,那么我们需要等到收到该 txnId 的开始时间事件后再启动合并

我浏览了 Kafka Streams 示例附带的字数示例,但不清楚如何等待事件,然后在进行转换时合并。

任何想法都受到高度赞赏。

apache-kafka kafka-consumer-api apache-kafka-streams confluent-platform

2
推荐指数
1
解决办法
2852
查看次数

Kafka KStream - 使用带窗口的AbstractProcessor

我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储.

我期待.punctuate()大约每30秒看一次.我得到的是保存在这里.

(原始文件长达数千行)

总结 - .punctuate()看似随机然后反复调用.它似乎不符合通过ProcessorContext.schedule()设置的值.


编辑:

另一个相同代码的运行.punctuate()大约每四分钟产生一次调用.这次我没有看到疯狂的重复值.来源没有变化 - 只是结果不同.

使用以下代码:

主要

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();
Run Code Online (Sandbox Code Playgroud)

处理器

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay; …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1449
查看次数

如何使用KeyValueMapper将输入的KStream &lt;String,String&gt;映射到&lt;String,CarClass&gt;?

接收我只是想在CarClass上映射并想要创建新流的Json数据,但是map方法不允许我在自定义数据类型上进行映射。KStream类型的map(KeyValueMapper>)方法不适用于参数(新的KeyValueMapper>(){})?

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
2244
查看次数

从kafka转换数据的最简单方法

我正在开发一个项目,使用kafka connect从多个数据库源中提取数据.我希望能够将数据转换为指定的json格式,然后最终将最终的json推送到S3存储桶,最好使用kafka connect来保持我的开销.

以下是数据目前进入kafka(以avro格式)的示例:

{"tableName":"TABLE1","SchemaName{"string":"dbo"},"tableID":1639117030,"columnName":{"string":"DATASET"},"ordinalPosition":{"int":1},"isNullable":{"int":1},"dataType":{"string":"varchar"},"maxLength":{"int":510},"precision":{"int":0},"scale":{"int":0},"isPrimaryKey":{"int":0},"tableSizeKB":{"long":72}}
{"tableName":"dtproperties","SchemaName":{"string":"dbo"},"tableID":1745441292,"columnName":{"string":"id"},"ordinalPosition":{"int":1},"isNullable":{"int":0},"dataType":{"string":"int"},"maxLength":{"int":4},"precision":{"int":10},"scale":{"int":0},"isPrimaryKey":{"int":1},"tableSizeKB":{"long":24}}
Run Code Online (Sandbox Code Playgroud)

转换为JSON时看起来如此:

{
      "tablename" : "AS_LOOKUPS",
      "tableID": 5835333,
      "columnName": "SVALUE",
      "ordinalPosition": 6,
      "isNullable": 1,
      "dataType": "varchar",
      "maxLength": 4000,
      "precision": 0,
      "scale": 0,
      "isPrimaryKey": 0,
      "tableSize": 0,
      "sizeUnit": "GB"
},
{
      "tablename" : "AS_LOOKUPS",
      "tableID": 5835333,
      "columnName": "SORT_ORDER",
      "ordinalPosition": 7,
      "isNullable": 1,
      "dataType": "int",
      "maxLength": 4,
      "precision": 10,
      "scale": 0,
      "isPrimaryKey": 0,
      "tableSize": 0,
      "sizeUnit": "GB"
}
Run Code Online (Sandbox Code Playgroud)

我的目标是让数据看起来像这样:

{
  "header": "Database Inventory",
  "DBName": "DB",
  "ServerName": "server@server.com",
  "SchemaName": "DBE",
  "DB Owner": "Name",
  "DB Guardian" : "Name/Group",
  "ASV" …
Run Code Online (Sandbox Code Playgroud)

bigdata apache-kafka apache-kafka-streams apache-kafka-connect

1
推荐指数
1
解决办法
3814
查看次数

Kafka与Scala一起流

我试图使用下面scala的kafka流是我的Java代码,它完全正常:

KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    textLines.foreach((key,values) -> {
        System.out.println(values);
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
Run Code Online (Sandbox Code Playgroud)

我的scala代码如下:

  val builder = new KStreamBuilder
  val textLines:KStream[String, String]  = builder.stream("TextLinesTopic")
  textLines.foreach((key,value)-> {
   println(key)
  })

  val streams = new KafkaStreams(builder, config)
  streams.start()
Run Code Online (Sandbox Code Playgroud)

scala代码抛出编译错误.期望类型不匹配:找不到ForEachAction [ > String, > String],Actual((any,any),Unit):找不到值键:值value

有没有人知道如何在scala中使用流API

scala apache-kafka kafka-consumer-api apache-kafka-streams

1
推荐指数
1
解决办法
1197
查看次数

如何在kafka中同步多个日志?

假设我有两种类型的日志,它们有一个公共字段'uid',如果包含uid的这两个日志的日志到达,我想输出日志,就像连接一样,Kafka可能吗?

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
383
查看次数

kafka stream-如何为KTable设置新密钥

我是Kafka Streams的新手,正在使用1.0.0版。我想从一个值中为KTable设置一个新键。

使用KStream时,可以通过使用像这样的selectKey()方法来完成。

kstream.selectKey ((k,v) -> v.newKey)
Run Code Online (Sandbox Code Playgroud)

但是,KTable中缺少这种方法。唯一的方法是将给定的KTable转换为KStream。对这个问题有什么想法吗?它改变了反对KTable设计的关键吗?

java apache-kafka apache-kafka-streams

1
推荐指数
2
解决办法
2365
查看次数

KSQL Kafka密钥格式

我的主题之一是将string-json作为键-{“ city”:“ X”,“ id”:22}。在我的ksql语句中,我想将其提取到2个不同的字段而不是一个字段中,以便稍后进行过滤和加入。在文档中,它似乎允许我仅将整个字符串粘贴到键中,而不是允许将其格式化为JSON(就像FORMAT_VALUE一样),请参见下文。

VALUE_FORMAT(必需)指定主题中消息值的序列化格式。支持的格式:JSON,DELIMITED和AVRO

KEY将Kafka主题中的消息密钥与KSQL流中的一列相关联。

apache-kafka apache-kafka-streams ksql

1
推荐指数
1
解决办法
410
查看次数

Kafka Stream:消费者提交频率

有了至少一次的保证,我知道发生故障的情况下可能会重复。但是,
1)Kafka Stream库执行提交的频率如何?
2)除了上述内容,用户是否还需要考虑提交?
3)是否有关于执行提交频率的最佳实践?

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
659
查看次数

使用kafka流基于消息密钥将消息发送到主题

我希望能够根据消息密钥的密钥将Kafkastream中的所有记录发送到另一个主题。例如 Kafka中的流包含名称作为键和记录作为值。我想根据记录的关键将这些记录分散到不同的主题

数据:(jhon-> {jhonsRecord}),(sean-> {seansRecord}),(mary-> {marysRecord}),(jhon-> {jhonsRecord2}),预期

  • topic1:名称:jhon->(jhon-> {jhonsRecord}),(jhon-> {jhonsRecord2})
  • topic2:sean->(sean-> {seansRecord})
  • topic3:mary->(mary-> {marysRecord})

下面是我现在执行此操作的方式,但是由于名称列表比较笨拙,因此速度很慢。另外,即使有一些名字的记录,我也需要遍历整个列表。请提出修复建议

    for( String name : names )
    {
        recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
    } 
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
162
查看次数