标签: flink-streaming

Flink错误:java.lang.NoSuchMethodError:org.apache.flink.api.table.Table

我尝试使用 flink 的表和 sql api 作为一个简单的示例,我从文件中读取字符串,将其转换为 Tuple2 并尝试将其插入表中。这是我的代码。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class table_streaming_test
{
    public static void main (String[] args) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //create execution environment
        StreamTableEnvironment tEnv= StreamTableEnvironment.getTableEnvironment(env); 
        env.setParallelism(1);
        DataStream<String> datastream_in= env.readTextFile("file:/home/rishikesh/new_workspace1/table_streaming/stocks.txt");
         DataStream<Tuple2<String,Integer>> ds=  datastream_in
             .flatMap(new Splitter());  // transformation flatmap
         Table msg=tEnv.fromDataStream(ds).as("symbol,price");
         Table result = msg.select("symbol ='A'");
         DataStream<String> ds2 =tEnv.toDataStream(result, String.class);
         ds2.print();
         env.execute();
    }
public static class Splitter implements FlatMapFunction<String,     Tuple2<String, …
Run Code Online (Sandbox Code Playgroud)

java apache-flink flink-streaming

2
推荐指数
1
解决办法
9185
查看次数

Flink Stream 窗口内存使用情况

我正在评估 Flink,专门针对可能生成警报的流窗口支持。我担心的是内存使用情况,因此如果有人可以提供帮助,我们将不胜感激。

例如,该应用程序可能会在给定的滚动窗口(例如 5 分钟)内消耗来自流的大量数据。在评估时,如果有一百万个文档符合标准,它们是否都会被加载到内存中?

一般流程是:

producer -> kafka -> flinkkafkaconsumer -> table.window(Tumble.over("5.minutes").select("...").where("...").writeToSink(someKafkaSink)

此外,如果有一些明确的文档描述了在这些情况下如何处理内存,我可能会忽略有人可能会有所帮助。

谢谢

apache-flink flink-streaming

2
推荐指数
1
解决办法
1303
查看次数

flink 解析地图中的 JSON:InvalidProgramException:任务不可序列化

我正在 Flink 项目上工作,想将源 JSON 字符串数据解析为 Json 对象。我正在使用jackson-module-scala进行 JSON 解析。但是,我在 Flink API 中使用 JSON 解析器时遇到了一些问题(map例如)。

以下是代码的一些示例,我无法理解其行为背后的原因。

情况一:

在这种情况下,我正在做jackson-module-scala 的官方 exmaple 代码告诉我要做的事情

  1. 创建一个新的ObjectMapper
  2. 注册DefaultScalaModule

    DefaultScalaModule是一个 Scala 对象,包含对所有当前支持的 Scala 数据类型的支持。

  3. 调用readValue以将 JSON 解析为Map

我得到的错误是:org.apache.flink.api.common.InvalidProgramException:Task not serializable

object JsonProcessing {
  def main(args: Array[String]) {

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.readTextFile("xxx")

    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)
    val …
Run Code Online (Sandbox Code Playgroud)

serialization scala jackson apache-flink flink-streaming

2
推荐指数
1
解决办法
2024
查看次数

Flink 中的 windowAll 算子是否会将并行度缩小到 1?

我在 Flink 中有一个流,它从源发送多维数据集,对多维数据集进行转换(为多维数据集中的每个元素添加 1),然后最后将其发送到下游以打印每秒的吞吐量。

该流通过 4 个线程并行化。

如果我理解正确的话,该windowAll运算符是一个非并行转换,因此应该将并行度缩小到 1,并通过将其与 一起使用TumblingProcessingTimeWindows.of(Time.seconds(1)),对最近一秒内所有并行子任务的吞吐量求和并打印它。我不确定是否得到正确的输出,因为每秒的吞吐量打印如下:

1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...
Run Code Online (Sandbox Code Playgroud)

问题:流打印机是否打印每个线程(1、2、3 和 4)的吞吐量,还是仅选择线程 3 来打印所有子任务的吞吐量总和?

当我一开始将环境的并行度设置为 1 时env.setParallelism(1),我在吞吐量之前没有得到“x>”,但我似乎获得了与设置为 4 时相同(甚至更好)的吞吐量。这:

45
429
499
505
1
503
524
530
...
Run Code Online (Sandbox Code Playgroud)

这是该程序的代码片段:

imports...

public class StreamingCase {
    public static void main(String[] args) throws Exception {
        int parallelism = 4;

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(parallelism);

        DataStream<Cube> start = env
                .addSource(new …
Run Code Online (Sandbox Code Playgroud)

java stream-processing apache-flink flink-streaming

2
推荐指数
1
解决办法
1669
查看次数

如何使用 Prometheus 指标监控 Grafana 中的 Flink 背压

Flink Web UI 有一个出色的背压部分。但我看不到 Prometheus 记者给出的任何指标,这些指标可用于以与 Grafana 仪表板相同的方式检测背压。

在此输入图像描述 有没有办法在 Flink Web UI 之外获取相同的指标?使用此处描述的指标https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html。或者甚至有一个 prometheus scraper 来抓取 web api?

grafana apache-flink prometheus flink-streaming

2
推荐指数
1
解决办法
1340
查看次数

Flink 应用程序中的延迟监控

我正在寻找有关延迟监控的帮助(flink 1.8.0)。

假设我有一个简单的流数据流,具有以下运算符:FlinkKafkaConsumer -> Map -> print。

如果我想测量数据流中记录处理的延迟,最好的机会是什么?我想获取处理源中接收到的输入的持续时间,直到接收器/完成接收器操作接收到输入为止。

我添加了我的代码: env.getConfig().setLatencyTrackingInterval(100);

然后,可以使用以下延迟指标:

在此输入图像描述

但我不明白他们到底在测量什么?此外,据我所知,延迟平均值似乎与延迟无关。

我还尝试使用 codahale 指标来获取某些方法的持续时间,但这并不能帮助我获取在整个管道中处理的记录的延迟。

该解决方案与 LatencyMarker 相关吗?如果是,我如何在接收器操作中到达它以检索它?

谢谢,罗伊。

monitoring metrics latency apache-flink flink-streaming

2
推荐指数
1
解决办法
2655
查看次数

MapFunction 的实现不可序列化 Flink

我正在尝试实现一个类,该类使用户能够操作 N 个输入流,而不受输入流类型的限制。

首先,我想将所有输入数据流转换为 keyedStreams。因此,我将输入数据流映射到元组中,然后应用 KeyBy 将其转换为密钥流。

我总是遇到序列化问题,我尝试遵循本指南https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html但它不起作用。

我想知道的是:

  1. Java 中的序列化/反序列化是什么?以及用途。
  2. 在 Flink 中通过序列化可以解决哪些问题
  3. 我的代码有什么问题(您可以在代码和错误消息下面找到)

非常感谢。

主要类别:

public class CEP {

private  Integer streamsIdComp = 0;
final  private Map<Integer, DataStream<?> > dataStreams = new HashMap<>();
final  private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();

public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){

    Preconditions.checkNotNull(inputStream, "dataStream");
    TypeInformation<T> streamType = inputStream.getType();

    KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
            map(new MapFunction<T, Tuple2<Integer,T>>() {
                @Override
                public Tuple2<Integer, T> map(T value) throws Exception {
                    return Tuple2.of(streamsIdComp, value);
                }
            }). …
Run Code Online (Sandbox Code Playgroud)

java serialization apache-flink flink-streaming

2
推荐指数
1
解决办法
6077
查看次数

在无密钥的 Flink 流中实施良好平衡的并行性

根据我对Flink的理解,它引入了基于键(键组)的并行性。然而,假设一个人有大量未加密的流并且希望并行完成工作,那么实现这一目标的最佳方法是什么?

如果流有一些字段,人们可能会考虑任意地使用其中一个字段进行键控,但这并不能保证工作负载能够正确平衡。例如,因为该字段中的一个值可能出现在 90% 的消息中。因此我的问题是:

如何在不事先了解流中内容的情况下在 Flink 中实施良好平衡的并行性


我能想到的一个可能的解决方案是为每条消息分配一个随机数(如果您希望并行度为 3,则为 1-3;如果您希望并行度更灵活,则为 1-1000)。然而,我想知道这是否是推荐的方法,因为它感觉不太优雅。

parallel-processing apache-flink flink-streaming

2
推荐指数
1
解决办法
922
查看次数

AggregateFunction 中 merge 方法的含义

我试图理解 Flink 中的 AggregateFunction ,这里描述了这一点。它总共有四种方法,即

  1. 创建累加器
  2. 添加
  3. 获取结果
  4. 合并

据我了解,

createAccumulator当第一个元素进入新窗口时调用该方法,并且新创建的实例将被进一步使用

addcreateAccumulator调用方法以根据定义减少结果,这使用在方法中创建的实例

getResult当窗口关闭时调用方法并返回可用结果

我对上述方法的理解是否正确?最后,方法的用例是什么merge以及何时使用/调用它?这里的定义对我来说并不清楚。

apache-flink flink-streaming

2
推荐指数
1
解决办法
772
查看次数

在处理函数之后,带键的流是否变为不带键的?

假设我在键控过程后得到一个流。

DataStream<T> stream= sourceStream.keyBy(key).window(window).apply(function);
Run Code Online (Sandbox Code Playgroud)

生成的流是否仍被键入?我可以在该流中使用某些 Keyed 状态吗?

apache-flink flink-streaming

2
推荐指数
1
解决办法
1138
查看次数