我在 Flink 中有一个流,它从源发送多维数据集,对多维数据集进行转换(为多维数据集中的每个元素添加 1),然后最后将其发送到下游以打印每秒的吞吐量。
该流通过 4 个线程并行化。
如果我理解正确的话,该windowAll运算符是一个非并行转换,因此应该将并行度缩小到 1,并通过将其与 一起使用TumblingProcessingTimeWindows.of(Time.seconds(1)),对最近一秒内所有并行子任务的吞吐量求和并打印它。我不确定是否得到正确的输出,因为每秒的吞吐量打印如下:
1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...
Run Code Online (Sandbox Code Playgroud)
问题:流打印机是否打印每个线程(1、2、3 和 4)的吞吐量,还是仅选择线程 3 来打印所有子任务的吞吐量总和?
当我一开始将环境的并行度设置为 1 时env.setParallelism(1),我在吞吐量之前没有得到“x>”,但我似乎获得了与设置为 4 时相同(甚至更好)的吞吐量。这:
45
429
499
505
1
503
524
530
...
Run Code Online (Sandbox Code Playgroud)
这是该程序的代码片段:
imports...
public class StreamingCase {
public static void main(String[] args) throws Exception {
int parallelism = 4;
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(parallelism);
DataStream<Cube> start = env
.addSource(new …Run Code Online (Sandbox Code Playgroud) 在流处理方面,Apache Beam和Apache Kafka有什么区别?我也试图掌握技术和程序上的差异。
请通过您的经验报告帮助我理解。
stream-processing apache-kafka apache-beam apache-kafka-streams
我无法理解kafka流中的groupBy / groupById和窗口化的概念。我的目标是汇总一段时间(例如5秒)内的流数据。我的流数据看起来像:
{"value":0,"time":1533875665509}
{"value":10,"time":1533875667511}
{"value":8,"time":1533875669512}
Run Code Online (Sandbox Code Playgroud)
时间以毫秒(纪元)为单位。我的时间戳记在我的消息中,而不在密钥中。我想平均5秒窗口的值。
这是我正在尝试的代码,但似乎无法使它正常工作
builder.<String, String>stream("my_topic")
.map((key, val) -> { TimeVal tv = TimeVal.fromJson(val); return new KeyValue<Long, Double>(tv.time, tv.value);})
.groupByKey(Serialized.with(Serdes.Long(), Serdes.Double()))
.windowedBy(TimeWindows.of(5000))
.count()
.toStream()
.foreach((key, val) -> System.out.println(key + " " + val));
Run Code Online (Sandbox Code Playgroud)
即使主题每两秒钟生成一次消息,此代码也不会打印任何内容。当我按Ctrl + C时,它会打印出类似
[1533877059029@1533877055000/1533877060000] 1
[1533877061031@1533877060000/1533877065000] 1
[1533877063034@1533877060000/1533877065000] 1
[1533877065035@1533877065000/1533877070000] 1
[1533877067039@1533877065000/1533877070000] 1
Run Code Online (Sandbox Code Playgroud)
此输出对我来说没有意义。
相关代码:
public class MessageTimeExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
String str = (String)record.value();
TimeVal tv = TimeVal.fromJson(str);
return tv.time;
}
}
public …Run Code Online (Sandbox Code Playgroud) 我是 flink 的新手,对状态后端配置有一些困惑。
据我所知,RocksDB 将应用程序的所有状态保存在文件系统上。我使用 s3 来存储状态,因此我将state.checkpoints.dir和state.savepoints.dir都配置为指向我的 s3 存储桶。现在我看到还有另一个与 RocksDB 存储相关的选项,名为state.backend.rocksdb.localdir。这是什么目的?(我看到我不能使用 s3)另外,如果 RocksDB 使用本地机器存储来做某事,当我使用 Kubernetes 并且我的 pod 突然失败时会怎么样?我应该使用持久存储吗?
另一件事,我不确定我是否正确理解了所有国家的事情。检查点是否保存了我的所有状态?例如,当我使用 AggregationFunction 并且应用程序失败时,当应用程序恢复时,每个键的聚合值是否会恢复?
要了解什么是kafka-streams我应该知道什么是stream-processing。当我开始在网上阅读它们时,我无法掌握整体情况,因为它是一个永无止境的新概念链接树。
谁能stream-processing用一个简单的现实世界的例子来解释什么?
以及如何将其kafka-streams与生产者消费者架构联系起来?
谢谢你。
我正在使用不同的窗口大小计算一些记录的简单平均值。使用1 小时和1 周的窗口没有问题,并且结果计算正确。
var keyed = src
.filter(event -> event.getSensor_id() < 10000)
.keyBy(Event::getSensor_id);
var hourResult = keyed
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new AvgQ1(Config.HOUR))
.setParallelism(5);
var weekResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(7)))
.aggregate(new AvgQ1(Config.WEEK))
.setParallelism(5);
Run Code Online (Sandbox Code Playgroud)
相反,使用1 个月(31 天)的窗口,窗口被分成两半,flink 给出两个结果作为输出,一个用于 05-1 到 05-14 的记录,另一个用于从 05-15 到 05-14 的记录。 31.
SingleOutputOperator<OutputQuery> monthResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(31)))
.aggregate(new AvgQ1(Config.MONTH))
.setParallelism(5);
Run Code Online (Sandbox Code Playgroud)
使用大小为 30 天的窗口,结果会分为 (05-1;05-27) 和 (05-28;05-31)。
SingleOutputOperator<OutputQuery> monthResult = keyed
.window(TumblingEventTimeWindows.of(Time.days(30)))
.aggregate(new AvgQ1(Config.MONTH))
.setParallelism(5);
Run Code Online (Sandbox Code Playgroud)
这是AggregateFunction.
public class AvgQ1 implements AggregateFunction<Event, AccumulatorQ1, OutputQuery> { …Run Code Online (Sandbox Code Playgroud) 假设我有一个数据流,其中包含事件时间数据。我想在 8 毫秒的窗口时间内收集输入数据流并减少每个窗口数据。我使用以下代码来做到这一点:
aggregatedTuple
.keyBy( 0).timeWindow(Time.milliseconds(8))
.reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()
Run Code Online (Sandbox Code Playgroud)
Point:数据流的关键是处理时间的时间戳映射到处理毫秒的时间戳的后8个约数,例如1531569851297将映射到1531569851296。
但数据流可能延迟到达并进入错误的窗口时间。例如,假设我将窗口时间设置为 8 毫秒。如果数据按顺序进入 Flink 引擎或至少延迟小于窗口时间(8 毫秒),这将是最好的情况。但假设数据流事件时间(也是数据流中的一个字段)已到达,延迟时间为 30 毫秒。所以它会进入错误的窗口,我想如果我检查每个数据流的事件时间,因为它想进入窗口,我可以过滤这么晚的数据。所以我有两个问题:
我在流中获取字符串数据,我只想保留字母数字字符。我注意到 Siddhi 提供了一个 regexp 函数,正如这里提到的。但问题是它返回一个布尔值而不是修改后的字符串。有没有办法直接获取修改后的字符串?这是我的代码。
@App:name("strtest")
@App:description("Description of the plan")
-- Please refer to https://docs.wso2.com/display/SP400/Quick+Start+Guide on getting started with SP editor.
define stream InboundStream(ipstring string);
@sink(type='log', prefix='Modified string')
define stream Opstream(ropstring bool);
from InboundStream
select str:regexp(ipstring, "^A-Za-z0-9") as ropstring insert into Opstream;
Run Code Online (Sandbox Code Playgroud)
是否有返回修改后的正则表达式字符串的函数?
apache-flink ×4
apache-kafka ×4
java ×3
apache-beam ×1
docker ×1
rocksdb ×1
siddhi ×1
windowing ×1
wso2sp ×1