标签: apache-flink

Flink Stream 窗口内存使用情况

我正在评估 Flink,专门针对可能生成警报的流窗口支持。我担心的是内存使用情况,因此如果有人可以提供帮助,我们将不胜感激。

例如,该应用程序可能会在给定的滚动窗口(例如 5 分钟)内消耗来自流的大量数据。在评估时,如果有一百万个文档符合标准,它们是否都会被加载到内存中?

一般流程是:

producer -> kafka -> flinkkafkaconsumer -> table.window(Tumble.over("5.minutes").select("...").where("...").writeToSink(someKafkaSink)

此外,如果有一些明确的文档描述了在这些情况下如何处理内存,我可能会忽略有人可能会有所帮助。

谢谢

apache-flink flink-streaming

2
推荐指数
1
解决办法
1303
查看次数

flink 解析地图中的 JSON:InvalidProgramException:任务不可序列化

我正在 Flink 项目上工作,想将源 JSON 字符串数据解析为 Json 对象。我正在使用jackson-module-scala进行 JSON 解析。但是,我在 Flink API 中使用 JSON 解析器时遇到了一些问题(map例如)。

以下是代码的一些示例,我无法理解其行为背后的原因。

情况一:

在这种情况下,我正在做jackson-module-scala 的官方 exmaple 代码告诉我要做的事情

  1. 创建一个新的ObjectMapper
  2. 注册DefaultScalaModule

    DefaultScalaModule是一个 Scala 对象,包含对所有当前支持的 Scala 数据类型的支持。

  3. 调用readValue以将 JSON 解析为Map

我得到的错误是:org.apache.flink.api.common.InvalidProgramException:Task not serializable

object JsonProcessing {
  def main(args: Array[String]) {

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.readTextFile("xxx")

    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)
    val …
Run Code Online (Sandbox Code Playgroud)

serialization scala jackson apache-flink flink-streaming

2
推荐指数
1
解决办法
2024
查看次数

Flink 中的 windowAll 算子是否会将并行度缩小到 1?

我在 Flink 中有一个流,它从源发送多维数据集,对多维数据集进行转换(为多维数据集中的每个元素添加 1),然后最后将其发送到下游以打印每秒的吞吐量。

该流通过 4 个线程并行化。

如果我理解正确的话,该windowAll运算符是一个非并行转换,因此应该将并行度缩小到 1,并通过将其与 一起使用TumblingProcessingTimeWindows.of(Time.seconds(1)),对最近一秒内所有并行子任务的吞吐量求和并打印它。我不确定是否得到正确的输出,因为每秒的吞吐量打印如下:

1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...
Run Code Online (Sandbox Code Playgroud)

问题:流打印机是否打印每个线程(1、2、3 和 4)的吞吐量,还是仅选择线程 3 来打印所有子任务的吞吐量总和?

当我一开始将环境的并行度设置为 1 时env.setParallelism(1),我在吞吐量之前没有得到“x>”,但我似乎获得了与设置为 4 时相同(甚至更好)的吞吐量。这:

45
429
499
505
1
503
524
530
...
Run Code Online (Sandbox Code Playgroud)

这是该程序的代码片段:

imports...

public class StreamingCase {
    public static void main(String[] args) throws Exception {
        int parallelism = 4;

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(parallelism);

        DataStream<Cube> start = env
                .addSource(new …
Run Code Online (Sandbox Code Playgroud)

java stream-processing apache-flink flink-streaming

2
推荐指数
1
解决办法
1669
查看次数

Apache Flink - DataSet API - 如何将 n 个结果分组在一起

我们正在使用 Apache Flink(1.4.2) 进行批处理,出于性能原因,我们希望在输出之前对 100 个项目进行分组,而不是直接输出每个项目。

如果我们要使用 DataStream API,我们将能够使用诸如翻滚窗口之类的东西(https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html #翻滚窗口

但这在进行批处理时不可用。

是否可以使用 DataSet Api 或许通过某些 group/reduce 函数来做到这一点?

batch-processing apache-flink

2
推荐指数
1
解决办法
517
查看次数

Apache Flink:执行环境和多个接收器

我的问题可能会引起一些混乱,因此请先查看说明。确定我的问题可能会有所帮助。我稍后将在问题末尾添加我的代码(也欢迎有关我的代码结构/实现的任何建议)。\n感谢您提前提供的任何帮助!

\n\n

我的问题:

\n\n
    \n
  1. 如何在 Flink Batch 处理中定义多个接收器而不让它重复从一个源获取数据?

  2. \n
  3. createCollectionEnvironment()和 和有什么区别getExecutionEnvironment()?我应该在本地环境中使用哪一个?

  4. \n
  5. 有什么用env.execute()?我的代码将输出没有这句话的结果。如果我添加这句话,它会弹出一个异常:

  6. \n
\n\n

-

\n\n
Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to \'execute()\', \'count()\', \'collect()\', or \'print()\'. \n    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940) \n    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922) \n    at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:34) \n    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) \n    at MainClass.main(MainClass.java:114)\n
Run Code Online (Sandbox Code Playgroud)\n\n

描述: \n编程新手。最近我需要使用 Flink Batch 处理一些数据(对数据进行分组、计算标准差等)。\n但是我遇到了需要输出两个 DataSet 的情况。\n结构是这样的

\n\n
\n

从 …

java apache-flink

2
推荐指数
1
解决办法
1万
查看次数

在 Apache Flink 服务器上哪里可以找到我使用 Apache Flink 仪表板提交的 jar

我开发了一个 Flink 作业并使用 Apache Flink 仪表板提交了我的作业。根据我的理解,当我提交作业时,我的 jar 应该在 Flink 服务器上可用。我试图找出我的罐子的路径,但无法。Flink 是否将这些 jar 文件保留在服务器上?如果有的话,我在哪里可以找到?有什么文档吗?请帮忙。谢谢!

apache-flink

2
推荐指数
1
解决办法
2725
查看次数

Apache Flink:文件 STDOUT 在 TaskExecutor 上不可用

我使用官方 flink 存储库中的以下 docker-compose.yml 启动了 flink。我只添加了到外部hadoop网络的连接。

version: "2.1"

networks:
  hadoop:
    external:
      name: flink_hadoop

services:
  jobmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-jobmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-taskmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
Run Code Online (Sandbox Code Playgroud)

此后一切运行,我可以访问 WebUI。

然后我打包了以下工作。

import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory
import stoff.schnaps.pojo.ActorMovie

object HdfsJob {
  private lazy val logger = LoggerFactory.getLogger(getClass)

  def …
Run Code Online (Sandbox Code Playgroud)

logging scala docker apache-flink

2
推荐指数
1
解决办法
1万
查看次数

如何使用 Prometheus 指标监控 Grafana 中的 Flink 背压

Flink Web UI 有一个出色的背压部分。但我看不到 Prometheus 记者给出的任何指标,这些指标可用于以与 Grafana 仪表板相同的方式检测背压。

在此输入图像描述 有没有办法在 Flink Web UI 之外获取相同的指标?使用此处描述的指标https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html。或者甚至有一个 prometheus scraper 来抓取 web api?

grafana apache-flink prometheus flink-streaming

2
推荐指数
1
解决办法
1340
查看次数

Flink 应用程序中的延迟监控

我正在寻找有关延迟监控的帮助(flink 1.8.0)。

假设我有一个简单的流数据流,具有以下运算符:FlinkKafkaConsumer -> Map -> print。

如果我想测量数据流中记录处理的延迟,最好的机会是什么?我想获取处理源中接收到的输入的持续时间,直到接收器/完成接收器操作接收到输入为止。

我添加了我的代码: env.getConfig().setLatencyTrackingInterval(100);

然后,可以使用以下延迟指标:

在此输入图像描述

但我不明白他们到底在测量什么?此外,据我所知,延迟平均值似乎与延迟无关。

我还尝试使用 codahale 指标来获取某些方法的持续时间,但这并不能帮助我获取在整个管道中处理的记录的延迟。

该解决方案与 LatencyMarker 相关吗?如果是,我如何在接收器操作中到达它以检索它?

谢谢,罗伊。

monitoring metrics latency apache-flink flink-streaming

2
推荐指数
1
解决办法
2655
查看次数

Flink 中的算子是什么?操作员状态和键控状态有何不同?

根据我的理解,Flink 中运算符的示例有 Source 运算符、Transformation 运算符等。我对 Flink 中运算符的理解是否正确?

在算子状态中,Flink 是维护每个算子的状态(例如每个作业/任务的 map()、reduce() 等)还是维护一个完整作业/任务的状态?另外,如果我的作业以多个并行方式提交,每个槽是否都有自己的状态?

假设,我提交了两个带键流的作业,并且两个作业具有相同的键“颜色”,但两个作业完全不同。Flink 是否会维护两种不同的状态,或者为这两项工作维护一种状态。

apache-flink

2
推荐指数
1
解决办法
3313
查看次数