标签: stream-processing

Flink 中的 windowAll 算子是否会将并行度缩小到 1?

我在 Flink 中有一个流,它从源发送多维数据集,对多维数据集进行转换(为多维数据集中的每个元素添加 1),然后最后将其发送到下游以打印每秒的吞吐量。

该流通过 4 个线程并行化。

如果我理解正确的话,该windowAll运算符是一个非并行转换,因此应该将并行度缩小到 1,并通过将其与 一起使用TumblingProcessingTimeWindows.of(Time.seconds(1)),对最近一秒内所有并行子任务的吞吐量求和并打印它。我不确定是否得到正确的输出,因为每秒的吞吐量打印如下:

1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...
Run Code Online (Sandbox Code Playgroud)

问题:流打印机是否打印每个线程(1、2、3 和 4)的吞吐量,还是仅选择线程 3 来打印所有子任务的吞吐量总和?

当我一开始将环境的并行度设置为 1 时env.setParallelism(1),我在吞吐量之前没有得到“x>”,但我似乎获得了与设置为 4 时相同(甚至更好)的吞吐量。这:

45
429
499
505
1
503
524
530
...
Run Code Online (Sandbox Code Playgroud)

这是该程序的代码片段:

imports...

public class StreamingCase {
    public static void main(String[] args) throws Exception {
        int parallelism = 4;

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(parallelism);

        DataStream<Cube> start = env
                .addSource(new …
Run Code Online (Sandbox Code Playgroud)

java stream-processing apache-flink flink-streaming

2
推荐指数
1
解决办法
1669
查看次数

Apache Beam over Apache Kafka流处理

在流处理方面,Apache Beam和Apache Kafka有什么区别?我也试图掌握技术和程序上的差异。

请通过您的经验报告帮助我理解。

stream-processing apache-kafka apache-beam apache-kafka-streams

2
推荐指数
2
解决办法
5244
查看次数

了解Kafka流组by和window

我无法理解kafka流中的groupBy / groupById和窗口化的概念。我的目标是汇总一段时间(例如5秒)内的流数据。我的流数据看起来像:

{"value":0,"time":1533875665509}
{"value":10,"time":1533875667511}
{"value":8,"time":1533875669512}
Run Code Online (Sandbox Code Playgroud)

时间以毫秒(纪元)为单位。我的时间戳记在我的消息中,而不在密钥中。我想平均5秒窗口的值。

这是我正在尝试的代码,但似乎无法使它正常工作

builder.<String, String>stream("my_topic")
   .map((key, val) -> { TimeVal tv = TimeVal.fromJson(val); return new KeyValue<Long, Double>(tv.time, tv.value);})
   .groupByKey(Serialized.with(Serdes.Long(), Serdes.Double()))
   .windowedBy(TimeWindows.of(5000))
   .count()
   .toStream()
   .foreach((key, val) -> System.out.println(key + " " + val));
Run Code Online (Sandbox Code Playgroud)

即使主题每两秒钟生成一次消息,此代码也不会打印任何内容。当我按Ctrl + C时,它会打印出类似

[1533877059029@1533877055000/1533877060000] 1
[1533877061031@1533877060000/1533877065000] 1
[1533877063034@1533877060000/1533877065000] 1
[1533877065035@1533877065000/1533877070000] 1
[1533877067039@1533877065000/1533877070000] 1
Run Code Online (Sandbox Code Playgroud)

此输出对我来说没有意义。

相关代码:

public class MessageTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record,  long previousTimestamp) {
        String str = (String)record.value();
        TimeVal tv = TimeVal.fromJson(str);
        return tv.time;
    }
}

public …
Run Code Online (Sandbox Code Playgroud)

java stream-processing apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
4357
查看次数

Flink - RocksDB 中的 localdir 配置是什么?

我是 flink 的新手,对状态后端配置有一些困惑。

据我所知,RocksDB 将应用程序的所有状态保存在文件系统上。我使用 s3 来存储状态,因此我将state.checkpoints.dirstate.savepoints.dir都配置为指向我的 s3 存储桶。现在我看到还有另一个与 RocksDB 存储相关的选项,名为state.backend.rocksdb.localdir。这是什么目的?(我看到我不能使用 s3)另外,如果 RocksDB 使用本地机器存储来做某事,当我使用 Kubernetes 并且我的 pod 突然失败时会怎么样?我应该使用持久存储吗?

另一件事,我不确定我是否正确理解了所有国家的事情。检查点是否保存了我的所有状态?例如,当我使用 AggregationFunction 并且应用程序失败时,当应用程序恢复时,每个键的聚合值是否会恢复?

stream-processing rocksdb apache-flink

2
推荐指数
1
解决办法
1468
查看次数

外行来说什么是流处理和 Kafka 流?

要了解什么是kafka-streams我应该知道什么是stream-processing。当我开始在网上阅读它们时,我无法掌握整体情况,因为它是一个永无止境的新概念链接树。
谁能stream-processing用一个简单的现实世界的例子来解释什么?
以及如何将其kafka-streams与生产者消费者架构联系起来?

谢谢你。

stream-processing apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
146
查看次数

TumblingWindow 中的 Flink AggregateFunction 自动分割为两个窗口以实现大窗口大小

我正在使用不同的窗口大小计算一些记录的简单平均值。使用1 小时1 周的窗口没有问题,并且结果计算正确。

var keyed = src
        .filter(event -> event.getSensor_id() < 10000)
        .keyBy(Event::getSensor_id);

var hourResult = keyed
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .aggregate(new AvgQ1(Config.HOUR))
        .setParallelism(5);

var weekResult = keyed
        .window(TumblingEventTimeWindows.of(Time.days(7)))
        .aggregate(new AvgQ1(Config.WEEK))
        .setParallelism(5);
Run Code Online (Sandbox Code Playgroud)

相反,使用1 个月(31 天)的窗口,窗口被分成两半,flink 给出两个结果作为输出,一个用于 05-1 到 05-14 的记录,另一个用于从 05-15 到 05-14 的记录。 31.

SingleOutputOperator<OutputQuery> monthResult = keyed
    .window(TumblingEventTimeWindows.of(Time.days(31)))
    .aggregate(new AvgQ1(Config.MONTH))
    .setParallelism(5);
Run Code Online (Sandbox Code Playgroud)

使用大小为 30 天的窗口,结果会分为 (05-1;05-27) 和 (05-28;05-31)。

SingleOutputOperator<OutputQuery> monthResult = keyed
    .window(TumblingEventTimeWindows.of(Time.days(30)))
    .aggregate(new AvgQ1(Config.MONTH))
    .setParallelism(5);
Run Code Online (Sandbox Code Playgroud)

这是AggregateFunction.

public class AvgQ1 implements AggregateFunction<Event, AccumulatorQ1, OutputQuery> { …
Run Code Online (Sandbox Code Playgroud)

java stream-processing apache-kafka docker apache-flink

2
推荐指数
1
解决办法
618
查看次数

如何在 Flink 流处理窗口中收集延迟数据

假设我有一个数据流,其中包含事件时间数据。我想在 8 毫秒的窗口时间内收集输入数据流并减少每个窗口数据。我使用以下代码来做到这一点:

aggregatedTuple
          .keyBy( 0).timeWindow(Time.milliseconds(8))
          .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()
Run Code Online (Sandbox Code Playgroud)

Point数据流的关键是处理时间的时间戳映射到处理毫秒的时间戳的后8个约数,例如1531569851297将映射到1531569851296

但数据流可能延迟到达并进入错误的窗口时间。例如,假设我将窗口时间设置为 8 毫秒。如果数据按顺序进入 Flink 引擎或至少延迟小于窗口时间(8 毫秒),这将是最好的情况。但假设数据流事件时间(也是数据流中的一个字段)已到达,延迟时间为 30 毫秒。所以它会进入错误的窗口,我想如果我检查每个数据流的事件时间,因为它想进入窗口,我可以过滤这么晚的数据。所以我有两个问题:

  • 如何在数据流想要进入窗口时过滤数据流并检查数据是否在窗口的正确时间戳创建?
  • 如何将如此晚的数据收集到变量中以对它们进行一些处理?

stream-processing windowing apache-flink

1
推荐指数
1
解决办法
2452
查看次数

在字符串数据中只保留字母数字字符

我在流中获取字符串数据,我只想保留字母数字字符。我注意到 Siddhi 提供了一个 regexp 函数,正如这里提到的。但问题是它返回一个布尔值而不是修改后的字符串。有没有办法直接获取修改后的字符串?这是我的代码。

@App:name("strtest")
@App:description("Description of the plan")

-- Please refer to https://docs.wso2.com/display/SP400/Quick+Start+Guide on getting started with SP editor. 

define stream InboundStream(ipstring string);

@sink(type='log', prefix='Modified string')
define stream Opstream(ropstring bool);

from InboundStream 
select str:regexp(ipstring, "^A-Za-z0-9") as ropstring insert into Opstream;
Run Code Online (Sandbox Code Playgroud)

是否有返回修改后的正则表达式字符串的函数?

stream-processing siddhi event-stream-processing wso2sp

0
推荐指数
1
解决办法
123
查看次数