是否可以合并kafka中的记录并将输出发布到不同的流?
例如,有一个针对 kafka 主题的事件流,如下所示
{txnId:1,startTime:0900},{txnId:1,endTime:0905},{txnId:2,endTime:0912},{txnId:3,endTime:0930},{txnId:2,startTime:0912}, {txnId:3,开始时间:0925}......
我想通过 txnId 合并这些事件并创建合并的输出,如下所示
{txnId:1,startTime:0900,endTime:0905},{txnId:2,startTime:0910,endTime:0912},{txnId:3,startTime:0925,endTime:0930}
请注意,传入事件中不会维护顺序。因此,如果在开始时间事件之前收到 txn Id 的 endTime,那么我们需要等到收到该 txnId 的开始时间事件后再启动合并
我浏览了 Kafka Streams 示例附带的字数示例,但不清楚如何等待事件,然后在进行转换时合并。
任何想法都受到高度赞赏。
apache-kafka kafka-consumer-api apache-kafka-streams confluent-platform
我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储.
我期待.punctuate()大约每30秒看一次.我得到的是保存在这里.
(原始文件长达数千行)
总结 - .punctuate()看似随机然后反复调用.它似乎不符合通过ProcessorContext.schedule()设置的值.
另一个相同代码的运行.punctuate()大约每四分钟产生一次调用.这次我没有看到疯狂的重复值.来源没有变化 - 只是结果不同.
使用以下代码:
StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
lines.process(new BPS2());
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
Run Code Online (Sandbox Code Playgroud)
public class BP2 extends AbstractProcessor<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);
private ProcessorContext context;
private final long delay;
private final ArrayList<String> values;
public BP2(long delay) {
LOGGER.debug("BatchProcessor() constructor");
this.delay = delay; …Run Code Online (Sandbox Code Playgroud) 接收我只是想在CarClass上映射并想要创建新流的Json数据,但是map方法不允许我在自定义数据类型上进行映射。KStream类型的map(KeyValueMapper>)方法不适用于参数(新的KeyValueMapper>(){})?
我正在开发一个项目,使用kafka connect从多个数据库源中提取数据.我希望能够将数据转换为指定的json格式,然后最终将最终的json推送到S3存储桶,最好使用kafka connect来保持我的开销.
以下是数据目前进入kafka(以avro格式)的示例:
{"tableName":"TABLE1","SchemaName{"string":"dbo"},"tableID":1639117030,"columnName":{"string":"DATASET"},"ordinalPosition":{"int":1},"isNullable":{"int":1},"dataType":{"string":"varchar"},"maxLength":{"int":510},"precision":{"int":0},"scale":{"int":0},"isPrimaryKey":{"int":0},"tableSizeKB":{"long":72}}
{"tableName":"dtproperties","SchemaName":{"string":"dbo"},"tableID":1745441292,"columnName":{"string":"id"},"ordinalPosition":{"int":1},"isNullable":{"int":0},"dataType":{"string":"int"},"maxLength":{"int":4},"precision":{"int":10},"scale":{"int":0},"isPrimaryKey":{"int":1},"tableSizeKB":{"long":24}}
Run Code Online (Sandbox Code Playgroud)
转换为JSON时看起来如此:
{
"tablename" : "AS_LOOKUPS",
"tableID": 5835333,
"columnName": "SVALUE",
"ordinalPosition": 6,
"isNullable": 1,
"dataType": "varchar",
"maxLength": 4000,
"precision": 0,
"scale": 0,
"isPrimaryKey": 0,
"tableSize": 0,
"sizeUnit": "GB"
},
{
"tablename" : "AS_LOOKUPS",
"tableID": 5835333,
"columnName": "SORT_ORDER",
"ordinalPosition": 7,
"isNullable": 1,
"dataType": "int",
"maxLength": 4,
"precision": 10,
"scale": 0,
"isPrimaryKey": 0,
"tableSize": 0,
"sizeUnit": "GB"
}
Run Code Online (Sandbox Code Playgroud)
我的目标是让数据看起来像这样:
{
"header": "Database Inventory",
"DBName": "DB",
"ServerName": "server@server.com",
"SchemaName": "DBE",
"DB Owner": "Name",
"DB Guardian" : "Name/Group",
"ASV" …Run Code Online (Sandbox Code Playgroud) bigdata apache-kafka apache-kafka-streams apache-kafka-connect
我试图使用下面scala的kafka流是我的Java代码,它完全正常:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
textLines.foreach((key,values) -> {
System.out.println(values);
});
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Run Code Online (Sandbox Code Playgroud)
我的scala代码如下:
val builder = new KStreamBuilder
val textLines:KStream[String, String] = builder.stream("TextLinesTopic")
textLines.foreach((key,value)-> {
println(key)
})
val streams = new KafkaStreams(builder, config)
streams.start()
Run Code Online (Sandbox Code Playgroud)
scala代码抛出编译错误.期望类型不匹配:找不到ForEachAction [ > String, > String],Actual((any,any),Unit):找不到值键:值value
有没有人知道如何在scala中使用流API
假设我有两种类型的日志,它们有一个公共字段'uid',如果包含uid的这两个日志的日志到达,我想输出日志,就像连接一样,Kafka可能吗?
我是Kafka Streams的新手,正在使用1.0.0版。我想从一个值中为KTable设置一个新键。
使用KStream时,可以通过使用像这样的selectKey()方法来完成。
kstream.selectKey ((k,v) -> v.newKey)
Run Code Online (Sandbox Code Playgroud)
但是,KTable中缺少这种方法。唯一的方法是将给定的KTable转换为KStream。对这个问题有什么想法吗?它改变了反对KTable设计的关键吗?
我的主题之一是将string-json作为键-{“ city”:“ X”,“ id”:22}。在我的ksql语句中,我想将其提取到2个不同的字段而不是一个字段中,以便稍后进行过滤和加入。在文档中,它似乎允许我仅将整个字符串粘贴到键中,而不是允许将其格式化为JSON(就像FORMAT_VALUE一样),请参见下文。
VALUE_FORMAT(必需)指定主题中消息值的序列化格式。支持的格式:JSON,DELIMITED和AVRO
KEY将Kafka主题中的消息密钥与KSQL流中的一列相关联。
有了至少一次的保证,我知道发生故障的情况下可能会重复。但是,
1)Kafka Stream库执行提交的频率如何?
2)除了上述内容,用户是否还需要考虑提交?
3)是否有关于执行提交频率的最佳实践?
我希望能够根据消息密钥的密钥将Kafkastream中的所有记录发送到另一个主题。例如 Kafka中的流包含名称作为键和记录作为值。我想根据记录的关键将这些记录分散到不同的主题
数据:(jhon-> {jhonsRecord}),(sean-> {seansRecord}),(mary-> {marysRecord}),(jhon-> {jhonsRecord2}),预期
下面是我现在执行此操作的方式,但是由于名称列表比较笨拙,因此速度很慢。另外,即使有一些名字的记录,我也需要遍历整个列表。请提出修复建议
for( String name : names )
{
recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
}
Run Code Online (Sandbox Code Playgroud)