标签: apache-flink

apache flink:如何解释 DataStream.print 输出?

我是 Flink 的新手,试图了解如何最有效地使用它。

我正在尝试使用 Window API,从 CSV 文件中读取。读取的行被转换为案例类,因此

case class IncomingDataUnit (
sensorUUID: String, radiationLevel: Int,photoSensor: Float,
humidity: Float,timeStamp: Long, ambientTemperature: Float)
  extends Serializable {

}
Run Code Online (Sandbox Code Playgroud)

而且,这就是我阅读行的方式:

env.readTextFile(inputPath).map(datum => {
      val fields = datum.split(",")
      IncomingDataUnit(
        fields(0),              // sensorUUID
        fields(1).toInt,        // radiationLevel
        fields(2).toFloat,      // photoSensor
        fields(3).toFloat,      // humidity
        fields(4).toLong,       // timeStamp
        fields(5).toFloat       // ambientTemperature
      )
    })
Run Code Online (Sandbox Code Playgroud)

后来,使用一个简单的窗口,我尝试打印最大环境温度,因此:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val readings =
      readIncomingReadings(env,"./sampleIOTTiny.csv")
      .map(e => (e.sensorUUID,e.ambientTemperature))
      .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
      .trigger(CountTrigger.of(5))
      .evictor(CountEvictor.of(4))
      .max(1)

readings.print
Run Code Online (Sandbox Code Playgroud)

输出包含这些(来自一堆 DEBUG 日志语句):

1> (probe-987f2cb6,29.43)
1> (probe-987f2cb6,29.43)
3> (probe-dccefede,30.02) …
Run Code Online (Sandbox Code Playgroud)

scala apache-flink flink-streaming

2
推荐指数
1
解决办法
4115
查看次数

Flink中如何对WindowedStream进行自定义操作?

我想在 Flink 中的 WindowedStream 上执行一些操作,比如平均操作。但可用的预定义操作非常有限,例如求和、最小值、最大值等。

val windowedStream = valueStream
                          .keyBy(0)
                          .timeWindow(Time.minutes(5))
                          .sum(2) //Change this to average?
Run Code Online (Sandbox Code Playgroud)

假设我想求平均值,我该怎么做呢?

apache-flink flink-streaming

2
推荐指数
1
解决办法
399
查看次数

如何迭代 Flink DataStream 中的每条消息?

我有来自 Kafka 的消息流,如下所示

DataStream<String> messageStream = env
  .addSource(new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props));
Run Code Online (Sandbox Code Playgroud)

如何迭代流中的每条消息并对其执行某些操作?我看到一个iterate()方法,DataStream但它不返回Iterator<String>.

apache-flink flink-streaming

2
推荐指数
1
解决办法
2041
查看次数

Flink错误:java.lang.NoSuchMethodError:org.apache.flink.api.table.Table

我尝试使用 flink 的表和 sql api 作为一个简单的示例,我从文件中读取字符串,将其转换为 Tuple2 并尝试将其插入表中。这是我的代码。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class table_streaming_test
{
    public static void main (String[] args) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //create execution environment
        StreamTableEnvironment tEnv= StreamTableEnvironment.getTableEnvironment(env); 
        env.setParallelism(1);
        DataStream<String> datastream_in= env.readTextFile("file:/home/rishikesh/new_workspace1/table_streaming/stocks.txt");
         DataStream<Tuple2<String,Integer>> ds=  datastream_in
             .flatMap(new Splitter());  // transformation flatmap
         Table msg=tEnv.fromDataStream(ds).as("symbol,price");
         Table result = msg.select("symbol ='A'");
         DataStream<String> ds2 =tEnv.toDataStream(result, String.class);
         ds2.print();
         env.execute();
    }
public static class Splitter implements FlatMapFunction<String,     Tuple2<String, …
Run Code Online (Sandbox Code Playgroud)

java apache-flink flink-streaming

2
推荐指数
1
解决办法
9185
查看次数

Flink(Kafka 来源)如何管理偏移量?

我正在使用 Flink FlinkKafkaConsumer09,我想知道 kafka 消费者的偏移量存储在哪里?

我在 Zookeeper 和 Kafka 的偏移主题中都找不到它们。kafka-consumer-offset工具也找不到。

这是 Flink 内部处理的吗?

apache-kafka apache-flink

2
推荐指数
1
解决办法
1965
查看次数

Flink 插槽/并行度与最大 CPU 能力

我试图理解 .yaml 文档中 flink 插槽和并行配置背后的逻辑。

Flink 官方文档指出,对于 cpu 中的每个核心,您必须分配 1 个插槽,同时将并行度提高 1 个。

但我认为这只是一个建议。例如,如果我有一个强大的核心(例如具有最大 GHz 的最新 i7),那么它与具有有限 GHz 的旧 CPU 不同。因此,运行比我的系统 CPU 最大核心数更多的插槽和并行度并不是不合理的。

但是,除了测试不同的配置之外,还有其他方法来检查我的系统使用 flink 的最大功能吗?

仅供记录,我使用 Flink 的 Batch Python API。

python parallel-processing apache-flink

2
推荐指数
1
解决办法
2230
查看次数

是否可以在 apache flink CEP 中处理多个流?

我的问题是,如果我们有两个原始事件流,即烟雾温度,并且我们想通过将运算符应用于原始流来查明复杂事件(即火灾)是否发生,我们可以在 Flink 中执行此操作吗?

我问这个问题是因为到目前为止我所看到的 Flink CEP 的所有示例都只包含一个输入流。如果我错了,请纠正我。

apache-flink flink-cep

2
推荐指数
1
解决办法
1989
查看次数

Apache Flink 状态函数 python vs java 性能

开发apache flink有状态函数时使用python或java有什么优缺点。

  • 有什么性能差异吗?对于相同的操作,哪一种更有效?
  • 我们可以完全在python上开发应用程序吗?
  • 一个支持哪些功能,另一个不支持哪些功能。

apache-flink flink-statefun

2
推荐指数
1
解决办法
467
查看次数

Flink 在 Kubernetes 和 Native Kubernetes 上的部署有何不同

b/w Native KubernetesKubernetes部署的主要区别是什么?

我是 Kubernetes 的新手,并试图了解它们上的 Flink 部署有何不同。如果对内部结构有任何见解,那将有很大帮助。

kubernetes apache-flink flink-streaming

2
推荐指数
1
解决办法
456
查看次数

失败消息:使用 apache flink 1.11 时检查点在完成之前已过期

我最近在 kubernetes 集群中将我的 Apache Flink 升级到 1.11 版,但今天我发现一个任务检查点总是失败。该任务从 RabbitMQ 读取数据并计算结果并调用 HTTP 请求将数据保存到 MySQL。这是任务管理器错误日志输出:

2020-08-21 15:53:28
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)

这是 Apache Flink UI 错误消息:

Failure Message: Checkpoint expired before completing.
Run Code Online (Sandbox Code Playgroud)

作业总是在一段时间内重新启动。我有 2 个任务,另一个任务检查点保持成功。那么问题出在哪里,我应该怎么做才能解决这个问题?

在此处输入图片说明

这是我的 flink 配置:

 public static void initEnv(StreamExecutionEnvironment env) {
        env.setParallelism(1);
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        StateBackend stateBackend = new …
Run Code Online (Sandbox Code Playgroud)

apache-flink

2
推荐指数
1
解决办法
3349
查看次数