标签: apache-flink

从 IDE 运行 flink 时如何设置 presto.s3.xxx 属性?

我能够成功运行我的 flink 作业,该作业使用./bin/flink run ....

为此,我必须将 flink-s3-fs-presto jar 复制到我的$FLINK_HOME/lib文件夹中,并且我还必须在我的以下文件中配置我的 S3 连接详细信息flink-conf.yaml

你需要在 Flink 的 flink-conf.yaml 中同时配置 s3.access-key 和 s3.secret-key :

s3.access-key: your-access-key
s3.secret-key: your-secret-key
Run Code Online (Sandbox Code Playgroud)

来源:flink aws 文档

我还必须设置一个属性,s3.endpoint因为我使用的是来自 IBM Cloud 的 S3。

当我使用./bin/flink run.

但是,当我尝试从 IDE (IntelliJ) 运行我的作业时,出现以下错误:

org.apache.flink.runtime.client.JobExecutionException:无法初始化任务“DataSink(TextOutputFormat(s3://xxxx/folder)-UTF-8)”:无法从服务端点加载凭据

我在 IDE 运行作业中设置了一个环境变量,FLINK_CONF_DIR指向我的 flink-conf.yaml,我可以看到我的配置属性被选中:

11:04:39,487 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.access-key, ****
11:04:39,487 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.secret-key, ****
11:04:39,487 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration …
Run Code Online (Sandbox Code Playgroud)

apache-flink

1
推荐指数
1
解决办法
1346
查看次数

Apache Flink:如何在摄取时间模式下获取事件的时间戳?

我想知道是否可以通过使用Flink的摄取时间模式来获取记录的时间戳。考虑以下 flink 代码示例(https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples /join/WindowJoinSampleData.scala ),

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val grades = WindowJoinSampleData.getGradeSource(env, rate)
val salaries = WindowJoinSampleData.getSalarySource(env, rate)

val joined = joinStreams(grades, salaries, windowSize)

...
case class Grade(name: String, level: Int) 
case class Salary(name: String, salary: Int)
Run Code Online (Sandbox Code Playgroud)

默认情况下,等级和工资都不包含时间戳字段。但是,由于 Flink 允许使用“ingestionTime”为数据流中的记录分配挂钟时间戳,是否有可能在运行时获得这样的时间戳?例如,这是我想要做的:

val oldDatastream = env.addSource...  // Using ingestion time
val newDatastream = oldDatastream.map{record =>   
    val ts = getRecordTimestamp(record)
    // do some thing with ts
    }
Run Code Online (Sandbox Code Playgroud)

谢谢你的帮助。

stream apache-flink flink-streaming

1
推荐指数
1
解决办法
1783
查看次数

Apache Flink:状态后端在哪里保存状态?

我在下面得到了一个声明:

“根据您的状态后端,Flink 还可以管理应用程序的状态,这意味着 Flink 处理内存管理(如有必要,可能会溢出到磁盘)以允许应用程序保持非常大的状态。”

https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state_backends.html

这是否意味着只有当状态后端配置为 时RocksDBStateBackend,状态才会保留在内存中并可能在必要时溢出到磁盘?

但是,如果配置为MemoryStateBackendFsStateBackend,则状态仅保留在内存中,永远不会溢出到磁盘。

apache-flink flink-streaming

1
推荐指数
1
解决办法
402
查看次数

非法的反射访问操作会阻止程序运行吗?

我正在开发一个使用 java 和 flink 构建推文数据流的项目。当我运行程序时,我收到此错误:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.core.memory.HybridMemorySegment (file:/home/haydn/.m2/repository/org/apache/flink/flink-core/1.4.2/flink-core-1.4.2.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.flink.core.memory.HybridMemorySegment
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Run Code Online (Sandbox Code Playgroud)

该程序似乎在之后运行,所以我只是想知道这个错误是否意味着任何事情都不会按预期工作,或者一切都会好起来。

我注意到其他帖子中人们就如何隐藏消息提出了建议,这就是为什么我认为这并不重要。

谢谢 :)

java apache-flink

1
推荐指数
1
解决办法
928
查看次数

Flink 中的事件时间窗口不触发

当我使用 flink 事件时间窗口时,该窗口不会触发。请问如何解决问题,有什么方法可以调试吗?

apache-flink flink-streaming flink-sql

1
推荐指数
1
解决办法
1635
查看次数

Flink 中的多个作业或一个作业中的多个管道

我有一个用例,我想在 Flink 上运行 2 个独立的处理流程。所以 2 个流看起来像

Source1 -> operator1 -> Sink1

Source2 -> operator2 -> Sink2

我想为两个流重新使用相同的 Flink 集群。我可以想到通过两种方式来做到这一点:

1) 在同一个 Flink 应用程序上提交 2 个不同的作业

2) 在同一个作业中设置 2 个管道

我能够设置第一个选项,但不确定如何执行第二个选项。有没有人试过这样的设置?一个比另一个有什么优势?

apache-flink flink-streaming

1
推荐指数
1
解决办法
2383
查看次数

Flink计时器未按时执行

这是一个后续问题:状态到期时触发

我在流中存储每个传入元素的状态,在计时器关闭后,我删除状态。这样我就可以防止处理重复项,直到元素超时,然后我可以再次处理相同的元素。一世

我编写了以下代码来测试计时器,但似乎在所有 3 个元素都通过第一个ProcessFunction.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    streamEnv.setParallelism(12);

    List<Tuple2<String, String>> inputList = new ArrayList<>();
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));

    streamEnv.fromCollection(inputList).keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Integer> occur;

                @Override
                public void open(Configuration parameters) throws Exception {
                    occur = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("occurs", Integer.class, 0));
                }

                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    if (occur.value() < 2) { …
Run Code Online (Sandbox Code Playgroud)

java apache-flink flink-streaming

1
推荐指数
1
解决办法
1332
查看次数

在 Flink 中,我可以在同一个槽中拥有一个操作符的多个子任务吗?

这几天在摸索Apache Flink,对Task Slot的概念有些疑惑。虽然已经问了几个问题,但有一点我不明白。

我正在使用玩具应用程序进行测试,运行本地集群。我已禁用操作员链接

我从文档中知道插槽允许内存隔离而不是 CPU 隔离。阅读文档,似乎 Task Slot 是一个 Java 线程。

1) 当我使用 parallelism=1 部署我的应用程序时,所有操作员的子任务都部署在同一个插槽中。但是,如果我从 的open()方法打印当前线程 ID AbstractStreamOperator,我会看到不同子任务的不同 ID。那么,它们不是共享同一个线程(即插槽?)。

2) 如果我将并行度从 1 更改为 3,我需要 3 个插槽才能正确重新部署应用程序。文档确认插槽数限制了我可以拥有的并行度。但是为什么我可以在同一个槽里有不同算子的子任务,而在同一个槽里不能有同一个算子的子任务呢?

感谢您的任何解释!

apache-flink

1
推荐指数
1
解决办法
513
查看次数

Apache Flink:IDE 执行中的作业恢复未按预期工作

我有一个WordCount用 Flink (Scala) 编写的示例流示例。在其中,我想使用外部化检查点来在发生故障时进行恢复。但它没有按预期工作。

我的代码如下:

object WordCount {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment
      .getExecutionEnvironment
      .setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))

    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000)

    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    // prevent the tasks from failing if an error happens in …
Run Code Online (Sandbox Code Playgroud)

apache-flink checkpointing flink-streaming

1
推荐指数
1
解决办法
649
查看次数

Flink 什么时候使用 CoProcess 函数?

我只是想了解何时在 Flink 中使用 CoProcessFunction 的用例。用一个例子来解释将帮助我更好地理解这个概念。

apache-flink flink-streaming

1
推荐指数
1
解决办法
1734
查看次数