Spark (v2.4) 程序功能:
Kafka
在spark中以结构化流模式从队列中读取JSON数据问题得到:
- 得到Resetting offset for partition nifi-log-batch-0 to offset 2826180.
源代码:
package io.xyz.streaming
import org.apache.spark.sql.avro._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions._
object readKafkaJson {
private val topic = "nifi-log-batch"
private val kafkaUrl = "http://<hostname>:9092"
private val chk = "/home/xyz/tmp/checkpoint"
private val outputFileLocation = "/home/xyz/abc/data"
private val sparkSchema = StructType(Array(
StructField("timestamp", StringType),
StructField("level", StringType),
StructField("thread", StringType),
StructField("class", StringType),
StructField("message", StringType),
StructField("updatedOn", StringType),
StructField("stackTrace", StringType)))
def main(args: Array[String]): Unit = { …
Run Code Online (Sandbox Code Playgroud) 实现使用参考数据丰富存储在Kafka中的传入事件流的用例的简单方法是通过在map()
运算符中调用为每个传入事件提供此参考数据的外部服务REST API.
eventStream.map((key, event) -> /* query the external service here, then return the enriched event */)
Run Code Online (Sandbox Code Playgroud)
另一种方法是将第二个事件流与参考数据一起存储,KTable
然后将其存储在一个轻量级嵌入式"数据库"中,然后将主事件流与其连接.
KStream<String, Object> eventStream = builder.stream(..., "event-topic");
KTable<String, Object> referenceDataTable = builder.table(..., "reference-data-topic");
KTable<String, Object> enrichedEventStream = eventStream
.leftJoin(referenceDataTable , (event, referenceData) -> /* return the enriched event */)
.map((key, enrichedEvent) -> new KeyValue<>(/* new key */, enrichedEvent)
.to("enriched-event-topic", ...);
Run Code Online (Sandbox Code Playgroud)
"天真"的方法可以被视为反模式吗?可以KTable
推荐" "方法作为首选方法吗?
Kafka每分钟可以轻松管理数百万条消息.从map()
操作员调用的服务也应该能够处理高负载并且也具有高可用性.这些是服务实现的额外要求.但是,如果服务满足这些标准,可以采用"天真"的方法吗?
consumer.subscribe(Pattern.compile(".*"),new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> clctn) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> clctn) {
}
});
Run Code Online (Sandbox Code Playgroud)
如何在 apache/kafka 中使用正则表达式使用所有主题?我尝试了上面的代码,但它不起作用。
我正在一个4小时的窗口上进行跳跃窗口聚合,每5分钟推进一次.由于跳跃窗口重叠,我得到具有不同聚合值的重复键.
TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60* 1000L)
Run Code Online (Sandbox Code Playgroud)
如何使用重复数据消除重复键或仅选择包含最新值的键.
我正在构建一个简单的应用程序,按顺序执行 -
1)从远程IBM MQ读取消息(遗留系统仅适用于IBM MQ)
2)将这些消息写入Kafka主题
3)从同一个Kafka主题中读取这些消息并调用REST API.
4)未来可能会有其他消费者阅读这个主题.
我开始知道Kafka有新的流API,在速度/简单性等方面应该比Kafka消费者更好.有人可以告诉我,如果流API非常适合我的用例以及在什么时候在我的过程中我可以插上吗?
我正在创建从KStream处理数据的KTable。但是,当我使用键和有效负载为空的逻辑删除消息时,它并没有从KTable中删除消息。
样本-
public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
.map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
.groupByKey()
reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));
GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);
Run Code Online (Sandbox Code Playgroud)
触发带有空值的消息后,我可以使用有效负载为空的testStream映射函数进行调试,但是它不会删除KTable更改日志“测试存储”上的记录。看起来它甚至没有达到reduce方法,不确定我在这里缺少什么。
感谢任何帮助!
谢谢。
这是问题所在:
假设有一个数字流,我想从这些数字中收集 1 小时时段的 MAX,其中我允许给定时段最多 3 小时的延迟。
这听起来像是翻滚窗户的实验室案例。
这是我到目前为止所拥有的:
stream.aggregate(
() -> 0L,
(aggKey, value, aggregate) -> Math.max(value, aggregate),
TimeWindows.of(TimeUnit.HOURS.toMillis(1L)).until(TimeUnit.HOURS.toMillis(3L)),
Serdes.Long(),
"my_store"
)
Run Code Online (Sandbox Code Playgroud)
首先,我无法验证这是否确实发生在测试中。时间戳是通过 TimestampExtractor 提取的,我模拟延迟Thread.sleep
(我将窗口设置为较小的测试值),但“延迟记录”仍然被处理而不是被丢弃。
常规窗口上似乎很少(没有?)示例。有一个关于 SessionWindows 的集成测试,但就是这样。我是否正确理解了这些概念?
编辑 2
示例 JUnit 测试。由于它相当大,我通过 Gist 分享它。
https://gist.github.com/Hartimer/6018a731753846c1930429716703e5a6
编辑(添加更多代码)
数据点具有时间戳(收集数据的时间)、收集数据的机器的主机名和值。
{
"collectedAt": 12314124134, // timestamp
"hostname": "machine-1",
"reading": 3
}
Run Code Online (Sandbox Code Playgroud)
自定义时间戳提取器用于获取collectedAt
. 这是我的管道的更完整表示:
{
"collectedAt": 12314124134, // timestamp
"hostname": "machine-1",
"reading": 3
}
Run Code Online (Sandbox Code Playgroud)
测试的一个片段是
source.map(this::fixKey) // Associates record with a key like "<timestamp>:<hostname>"
.groupByKey(Serdes.String(), roundDataSerde) …
Run Code Online (Sandbox Code Playgroud)