小编Mic*_*oll的帖子

在 Spark 中以结构化流模式获取 Offset 的消息正在重置

Spark (v2.4) 程序功能:

  • Kafka在spark中以结构化流模式从队列中读取JSON数据
  • 在控制台上按原样打印读取的数据

问题得到:
- 得到Resetting offset for partition nifi-log-batch-0 to offset 2826180.

源代码:

package io.xyz.streaming

import org.apache.spark.sql.avro._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions._

object readKafkaJson {
    private val topic = "nifi-log-batch"
    private val kafkaUrl = "http://<hostname>:9092"
    private val chk = "/home/xyz/tmp/checkpoint"
    private val outputFileLocation = "/home/xyz/abc/data"
    private val sparkSchema = StructType(Array(
                StructField("timestamp", StringType),
                StructField("level", StringType),
                StructField("thread", StringType),
                StructField("class", StringType),
                StructField("message", StringType),
                StructField("updatedOn", StringType),
                StructField("stackTrace", StringType)))


    def main(args: Array[String]): Unit = { …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-streaming

7
推荐指数
1
解决办法
2523
查看次数

Kafka Streams和RPC:在map()运算符中调用REST服务被认为是反模式?

实现使用参考数据丰富存储在Kafka中的传入事件流的用例的简单方法是通过在map()运算符中调用为每个传入事件提供此参考数据的外部服务REST API.

eventStream.map((key, event) -> /* query the external service here, then return the enriched event */)
Run Code Online (Sandbox Code Playgroud)

另一种方法是将第二个事件流与参考数据一起存储,KTable然后将其存储在一个轻量级嵌入式"数据库"中,然后将主事件流与其连接.

KStream<String, Object> eventStream = builder.stream(..., "event-topic");
KTable<String, Object> referenceDataTable = builder.table(..., "reference-data-topic");
KTable<String, Object> enrichedEventStream = eventStream 
    .leftJoin(referenceDataTable , (event, referenceData) -> /* return the enriched event */)
    .map((key, enrichedEvent) -> new KeyValue<>(/* new key */, enrichedEvent)
    .to("enriched-event-topic", ...);
Run Code Online (Sandbox Code Playgroud)

"天真"的方法可以被视为反模式吗?可以KTable推荐" "方法作为首选方法吗?

Kafka每分钟可以轻松管理数百万条消息.从map()操作员调用的服务也应该能够处理高负载并且也具有高可用性.这些是服务实现的额外要求.但是,如果服务满足这些标准,可以采用"天真"的方法吗?

apache-kafka apache-kafka-streams

6
推荐指数
2
解决办法
1933
查看次数

Kafka多主题消费

   consumer.subscribe(Pattern.compile(".*"),new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> clctn) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> clctn) {
            }            
        });
Run Code Online (Sandbox Code Playgroud)

如何在 apache/kafka 中使用正则表达式使用所有主题?我尝试了上面的代码,但它不起作用。

regex wildcard apache-kafka kafka-consumer-api

5
推荐指数
1
解决办法
1万
查看次数

Kafka Streams - 跳跃窗口 - 重复数据删除键

我正在一个4小时的窗口上进行跳跃窗口聚合,每5分钟推进一次.由于跳跃窗口重叠,我得到具有不同聚合值的重复键.

TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60* 1000L)
Run Code Online (Sandbox Code Playgroud)

如何使用重复数据消除重复键或仅选择包含最新值的键.

apache-kafka apache-kafka-streams

4
推荐指数
2
解决办法
2868
查看次数

Kafka Streams用例

我正在构建一个简单的应用程序,按顺序执行 -

1)从远程IBM MQ读取消息(遗留系统仅适用于IBM MQ)

2)将这些消息写入Kafka主题

3)从同一个Kafka主题中读取这些消息并调用REST API.

4)未来可能会有其他消费者阅读这个主题.

我开始知道Kafka有新的流API,在速度/简单性等方面应该比Kafka消费者更好.有人可以告诉我,如果流API非常适合我的用例以及在什么时候在我的过程中我可以插上吗?

apache-kafka apache-kafka-streams

3
推荐指数
2
解决办法
3076
查看次数

逻辑删除消息是否无法从KTable状态存储中删除记录?

我正在创建从KStream处理数据的KTable。但是,当我使用键和有效负载为空的逻辑删除消息时,它并没有从KTable中删除消息。

样本-

public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
                .map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
                .groupByKey()
                reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));


GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);
Run Code Online (Sandbox Code Playgroud)

触发带有空值的消息后,我可以使用有效负载为空的testStream映射函数进行调试,但是它不会删除KTable更改日志“测试存储”上的记录。看起来它甚至没有达到reduce方法,不确定我在这里缺少什么。

感谢任何帮助!

谢谢。

spring-cloud-stream apache-kafka-streams spring-kafka

3
推荐指数
1
解决办法
1510
查看次数

如何真正丢弃迟到的记录?

这是问题所在:

假设有一个数字流,我想从这些数字中收集 1 小时时段的 MAX,其中我允许给定时段最多 3 小时的延迟。

这听起来像是翻滚窗户的实验室案例。

这是我到目前为止所拥有的:

stream.aggregate(
      () -> 0L,
      (aggKey, value, aggregate) -> Math.max(value, aggregate),
      TimeWindows.of(TimeUnit.HOURS.toMillis(1L)).until(TimeUnit.HOURS.toMillis(3L)),
      Serdes.Long(),
      "my_store"
)
Run Code Online (Sandbox Code Playgroud)

首先,我无法验证这是否确实发生在测试中。时间戳是通过 TimestampExtractor 提取的,我模拟延迟Thread.sleep(我将窗口设置为较小的测试值),但“延迟记录”仍然被处理而不是被丢弃。

常规窗口上似乎很少(没有?)示例。有一个关于 SessionWindows 的集成测试,但就是这样。我是否正确理解了这些概念?

编辑 2

示例 JUnit 测试。由于它相当大,我通过 Gist 分享它。

https://gist.github.com/Hartimer/6018a731753846c1930429716703e5a6

编辑(添加更多代码)

数据点具有时间戳(收集数据的时间)、收集数据的机器的主机名和值。

{
    "collectedAt": 12314124134, // timestamp
    "hostname": "machine-1",
    "reading": 3
}
Run Code Online (Sandbox Code Playgroud)

自定义时间戳提取器用于获取collectedAt. 这是我的管道的更完整表示:

{
    "collectedAt": 12314124134, // timestamp
    "hostname": "machine-1",
    "reading": 3
}
Run Code Online (Sandbox Code Playgroud)

测试的一个片段是

source.map(this::fixKey) // Associates record with a key like "<timestamp>:<hostname>"
  .groupByKey(Serdes.String(), roundDataSerde) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
699
查看次数