标签: apache-flink

Apache Flink JobManager 无法找到自定义指标报告器

我使用的是 Apache Flink 版本1.13.1

我编写了一个自定义指标报告器,但 JobManager 似乎无法识别它。启动时,JobManager 显示以下警告日志:

2021-08-25 14:54:06,243 WARN  org.apache.flink.runtime.metrics.ReporterSetup               [] - The reporter factory (org.apache.flink.metrics.kafka.KafkaReporterFactory) could not be found for reporter kafka. Available factories: [org.apache.flink.metrics.slf4j.Slf4jReporterFactory, org.apache.flink.metrics.datadog.DatadogHttpReporterFactory, org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory, org.apache.flink.metrics.graphite.GraphiteReporterFactory, org.apache.flink.metrics.statsd.StatsDReporterFactory, org.apache.flink.metrics.prometheus.PrometheusReporterFactory, org.apache.flink.metrics.jmx.JMXReporterFactory, org.apache.flink.metrics.influxdb.InfluxdbReporterFactory].
2021-08-25 14:54:06,245 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl          [] - No metrics reporter configured, no metrics will be exposed/reported.
Run Code Online (Sandbox Code Playgroud)

我在 Flink 插件文件夹中有一个名为的文件夹metrics-kafka,其中包含指标报告器的打包 jar。我也把这个jar复制到lib文件夹中,这两个都不起作用。请参阅下面使用的配置和代码。

Flink配置文件

2021-08-25 14:54:06,243 WARN  org.apache.flink.runtime.metrics.ReporterSetup               [] - The reporter factory (org.apache.flink.metrics.kafka.KafkaReporterFactory) could not be found for reporter kafka. Available factories: [org.apache.flink.metrics.slf4j.Slf4jReporterFactory, org.apache.flink.metrics.datadog.DatadogHttpReporterFactory, …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

3
推荐指数
1
解决办法
1281
查看次数

2022年Flink可以支持什么Java版本?

假设我开始一个新的 Flink Java 项目,如果我寻找“稳定的 Flink Java 生产体验”,我应该使用哪个版本?官方文档说它从 Flink 1.10 开始可以支持 Java-11,但很多用户仍在使用 Java-8,因此尝试了解我是否需要使用AdoptOpenJDK-8AdoptOpenJDK-11

java apache-flink

3
推荐指数
1
解决办法
6222
查看次数

Flink:内存使用情况

我正在测量Flink中的应用程序(WordCount)的内存使用情况ps -p TaskManagerPID -o rss.然而,结果没有任何意义.因为对于每个数据量(1MB,10MB,100MB,1GB,10GB),所使用的内存量相同.对于10GB数据,测量结果甚至小于10GB.TaskManager是否是测量内存使用量的错误过程?Flink Process Model的哪个进程负责内存分配?

apache-flink

2
推荐指数
1
解决办法
2064
查看次数

类'SessionTrigger'必须声明为abstract或实现抽象成员

我正在构建一个关于Flink 1.2的教程,我想运行一些简单的窗口示例.其中之一是Session Windows.

我想要运行的代码如下:

import <package>.Session
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

import scala.util.Try

object SessionWindowExample {

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source = env.socketTextStream("localhost", 9000)

    //session map
    val values = source.map(value => {
      val columns = value.split(",")
      val endSignal = Try(Some(columns(2))).getOrElse(None)
      Session(columns(0), columns(1).toDouble, endSignal)
    })

    val keyValue = values.keyBy(_.sessionId)

    // create global window

    val sessionWindowStream = keyValue.
      window(GlobalWindows.create()).
      trigger(PurgingTrigger.of(new SessionTrigger[GlobalWindow]()))

    sessionWindowStream.sum("value").print()

    env.execute()
  }
}
Run Code Online (Sandbox Code Playgroud)

正如您将注意到的,我需要new SessionTrigger基于此类实例化我所做的对象:

import <package>.Session
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import …
Run Code Online (Sandbox Code Playgroud)

abstract-class scala apache-flink

2
推荐指数
1
解决办法
882
查看次数

Flink使用Gradle代替Maven的利弊

我正在做一个项目,在该项目中我制作了一个flink数据流程序,并将代码的第一次运行交给另一个团队,以便他们查看。他们都是gradle的大用户,问我在将来的flink项目中使用gradle代替maven是否有任何不利之处或好处。

现在我知道有很多关于Maven和Gradle比较的文章,但是我的意思是专门针对Apache Flink。我发现有两个git hub仓库,乍一看就像他们在flink项目上实现了gradle一样,仅此而已。

而且,如果您能说出您使用flink的经验,那也将是有益的。

所以我想知道的是:

1.)是否有类似“如果flink项目试图切换到gradle,x,y和z根本无法工作”的内容?

2)将flink项目切换到gradle会比在maven和gradle之间切换任何其他Java项目(应该在Java中)更麻烦吗?

3.)gradle的repo是否具有与maven的repo相同的flink依赖项?

4.)您建议进行开关还是不进行开关?

感谢您提前提供的所有帮助!

java gradle maven apache-flink

2
推荐指数
1
解决办法
598
查看次数

30秒似乎是1小时30秒?

我想验证我的时间戳.我附上了代码片段:

import org.apache.flink.streaming.api.windowing.time.Time;   

Date date = new Date(Time.seconds(30).toMilliseconds());
DateFormat formatter = new SimpleDateFormat("HH:mm:ss:SSS");
String dateFormatted = formatter.format(date);
logger.debug(Time.seconds(30).toMilliseconds() + " " + dateFormatted);
Run Code Online (Sandbox Code Playgroud)

事实证明,结果是:

30000 01:00:30:000
Run Code Online (Sandbox Code Playgroud)

编辑:

   [user@hdp ~]$ date
   Thu Jun 29 11:39:27 CEST 2017
Run Code Online (Sandbox Code Playgroud)

我错过了什么?谢谢!

java time apache-flink

2
推荐指数
1
解决办法
81
查看次数

从ListState删除选择项

我正在实现具有大状态(可能不适合内存)的自定义运算符。我试图为此目的使用ListState。我在用

checkpointedState = context.getOperatorStateStore().getListState(descriptor);
Run Code Online (Sandbox Code Playgroud)

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state中所述

在上面的链接中实现snapshotState()会清除checkpointedState,然后将内存数据结构中的元素添加到checkpointedState。

相反,我在snapshotState()中需要以下内容:

  1. 删除checkpointedState的特定条目,而不是clear()。
  2. 将内存数据结构中的新元素添加到checkpointedState。

有什么方法可以有选择地从ListState中删除项目吗?

apache-flink

2
推荐指数
1
解决办法
365
查看次数

未找到org.apache.flink.streaming.api.scala.DataStream的Apache Flink-类文件

使用Apache Flink版本1.3.2和Cassandra 3.11,我编写了一个简单的代码,使用Apache Flink Cassandra连接器将数据写入Cassandra.以下是代码:

final Collection<String> collection = new ArrayList<>(50);
        for (int i = 1; i <= 50; ++i) {
            collection.add("element " + i);
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<UUID, String>> dataStream = env
                .fromCollection(collection)
                .map(new MapFunction<String, Tuple2<UUID, String>>() {

                    final String mapped = " mapped ";
                    String[] splitted;

                    @Override
                    public Tuple2<UUID, String> map(String s) throws Exception {
                        splitted = s.split("\\s+");
                        return new Tuple2(
                                UUID.randomUUID(),
                                splitted[0] + mapped + splitted[1]
                        );
                    }
                });
        dataStream.print();
        CassandraSink.addSink(dataStream)
                .setQuery("INSERT INTO test.phases (id, …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

2
推荐指数
1
解决办法
1447
查看次数

Apache Flink中保存的默认检查点在哪里?

我是Apache Flink的新手,并且正在研究Apache Flink的示例。我发现,如果发生故障,Flink可以从检查点恢复流处理。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000L);
Run Code Online (Sandbox Code Playgroud)

现在,我的问题是Flink在默认情况下将检查点保留在哪里?

任何帮助表示赞赏!

java apache-flink checkpointing flink-streaming

2
推荐指数
1
解决办法
699
查看次数

将检查点链接到Google Cloud Storage

我正在尝试为GCS中的flink作业配置检查点。如果我在本地运行测试作业(没有docker和任何群集设置),则一切正常,但是如果我使用docker-compose或群集设置运行它,并在flink仪表板中部署带有工作的胖子,则它会失败并显示错误。

有什么想法吗?谢谢!

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
... 33 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
Run Code Online (Sandbox Code Playgroud)

环境配置是这样的:

StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setFailOnCheckpointingErrors(false);
    checkpointConfig.setCheckpointInterval(10000);
    checkpointConfig.setMinPauseBetweenCheckpoints(5000);
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    RocksDBStateBackend …
Run Code Online (Sandbox Code Playgroud)

google-cloud-storage apache-flink google-cloud-dataproc

2
推荐指数
1
解决办法
882
查看次数