标签: apache-flink

flink 流 NoSuchMethodError: org.apache.flink.api.common.ExecutionConfig.setRestartStrategy

java.lang.NoSuchMethodError:

org.apache.flink.api.common.ExecutionConfig.setRestartStrategy(Lorg/apache/flink/api/common/restartstrategy/RestartStrategies$RestartStrategyConfiguration;)
在 com.WriteIntoKafka.main(WriteIntoKafka.java:53)
在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
在 java.lang.reflect.Method .invoke(Method.java:606)
在 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
在 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
在 org.apache.flink.client.program.Client.runBlocking(Client.java:252)
在 org.apache.flink.client .CliFrontend.executeProgramBlocking(CliFrontend.java:676)
在org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
在org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
在org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)

apache-flink flink-streaming

1
推荐指数
1
解决办法
1279
查看次数

流中记录的顺序

这是我的一些查询:

我有两个不同的流stream1stream2其中的元素是按顺序排列的。

1)现在,当我keyBy在这些流中的每一个上执行操作时,顺序会保持不变吗?(由于这里的每个小组将仅发送给一个任务管理器)我的理解是,记录将按小组的顺序排列,请在此处更正。

2)在keyBy两个流上之后,我正在共同分组以获取匹配和不匹配的记录。会将订单也保持在这里?因为这也适用于KeyedStream。我正在使用EventTime,并AscendingTimestampExtractor用于生成timestampwatermark

3)现在,我想matching_nonMatchingStream使用map / flatmap对2)得到的序列执行序列检查。我是否需要再次执行keyBy此处操作,或者如果保持连锁状态,则matching_nonMatchingStream运行是否相同TaskManager?我的理解是,链条将在这里起作用,纠正我,变得困惑。

4)slotSharingGroup-您能否根据文档详细说明:设置此操作的插槽共享组。TaskManager如果可能,在同一插槽共享组中的并行操作实例将位于同一插槽中。

apache-flink flink-streaming

1
推荐指数
1
解决办法
2866
查看次数

寻找一种方法来连续处理写入hdfs的文件

我正在寻找一种可以:

  1. 监视新文件的hdfs dir并在它们出现时处理它们.
  2. 它还应该在作业/应用程序开始工作之前处理目录中的文件.
  3. 它应该有检查点,以便在重新启动时从它离开的地方继续.

我看了一下apache spark:它可以读取新添加的文件,并且可以处理重启以从它离开的地方继续.我找不到办法使它也处理同一作业范围内的旧文件(所以只有1和3).

我看了一下apache flink:它确实处理了新旧文件.但是,一旦重新启动作业,它将再次开始处理所有作业(1和2).

这是一个非常常见的用例.我错过了火花/叮当的东西,这使得它成为可能吗?还有其他工具可以在这里使用吗?

hadoop bigdata hdfs apache-spark apache-flink

1
推荐指数
1
解决办法
582
查看次数

为什么flink检查点的创建差异如此之大

在创建flink检查点时,我能否以某种方式获取更多详细信息?创建flink检查点的时间因子10而异.这至少是我的jobmanager logfile告诉我的:

2017-02-14 09:03:43,234 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 161 @ 1487059423232
2017-02-14 09:08:17,447 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 161 (in 271425 ms).
2017-02-14 09:08:43,233 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 162 @ 1487059723233
2017-02-14 09:09:46,684 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 162 (in 61951 ms).
Run Code Online (Sandbox Code Playgroud)

检查点161只需要一分钟即可创建,而下一个检查点162需要4.5分钟(它持续8分钟而不是回到1.5分钟).

  • 这可能与我使用的一些TumblingWindows有关(窗口持续时间为3分钟)?
  • 有没有办法影响频率(和位置)旁边的检查点创建?

apache-flink

1
推荐指数
1
解决办法
324
查看次数

如何在使用 bazel 构建时传递参数

如何在进行 bazel 构建时传递参数。实际上我的程序将接受两个参数。一个是目录,另一个是 target.csv 我需要获取目录下的文件并将其写入 csv。

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.utils.ParameterTool;


public class ReadFiles {

  /**
   * @param args
   */
  public static void main(String[] args) throws Exception {

    // set up the execution environment
    try {
      final ParameterTool params = ParameterTool.fromArgs(args);
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      env.setParallelism(1); // without this multiple files creating
      env.getConfig().setGlobalJobParameters(params);

      List<String> paths = new ArrayList<String>();
      File dir = new File(params.getRequired("input"));
      for (File f …
Run Code Online (Sandbox Code Playgroud)

build bazel apache-flink

1
推荐指数
1
解决办法
6253
查看次数

Flink是什么通过群集GUI将args提交到作业的正确方法是什么?

我的目标是通过群集GUI中的“程序参数”字段将args传递给Flink作业的Main()函数。在此处输入图片说明

并以某种方式在Main()函数中访问它们(最好是通过键名):

public static void main(String[] args) throws Exception {

    ParameterTool parameter = ParameterTool.fromArgs(args);

    CustomProps props = new CustomProps (DEFAULT_PROPERTIES_FILE);

    String kafkaAutoOffsetReset = props.getKafkaAutoOffsetReset();
    String cassandraClusterUrl = props.getCassandraClusterUrl();

    if (args.length == 1 && args[0] != null) {

        cassandraClusterUrl = parameter.get("cassandraClusterUrl");
        kafkaAutoOffsetReset = parameter.get("kafkaOffset");
    }

    //Other code...

}
Run Code Online (Sandbox Code Playgroud)

我已经尝试过“ ParameterTool”,但没有任何帮助,如果尝试以下操作:

kafkaAutoOffsetReset = args[0];
Run Code Online (Sandbox Code Playgroud)

仅当我在“程序参数”字段中仅输入一个单词时,它才有效。所以,如果我把:

blah
Run Code Online (Sandbox Code Playgroud)

它说它设置为“等等”,但是如果我尝试以下任何一种方法:

-kafkaOffset blah
--kafkaOffset blah
-kafkaOffset:blah
-kafkaOffset=blah
Run Code Online (Sandbox Code Playgroud)

我什么都没有。我知道在CLI中,如何将args传递给jar的示例是:

--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
Run Code Online (Sandbox Code Playgroud)

但是,似乎缺少使用GUI的另一种方式,但是我未能找到与此相关的文档。

TL; DR

通过Flink群集GUI中的“程序参数”字段提交多个arg的正确方法是什么,以及在Main()函数中访问它们的正确方法是什么?

感谢您提前提供的所有帮助!

java user-interface apache-flink

1
推荐指数
2
解决办法
2154
查看次数

如何重新启动 flink 作业以使用添加的 TaskManager

我正在测试 Flink 1.3.0 中的弹性特性。我有一份启用检查点和固定延迟重启策略的工作。当我杀死其中一个 TaskManager JVM 时,一段时间后作业在剩余节点上正确重新启动。但是,当我添加新节点时,该作业不会自动重新启动以使用它。

我尝试使用,bin/flink stop <jobId>但它总是给我java.lang.IllegalStateException: Job with ID <jobId> is not stoppable.

如何重新启动作业以使用附加节点?

java apache-flink flink-streaming

1
推荐指数
1
解决办法
1886
查看次数

如何在运行时配置flink作业?

是否可以在运行时配置flink应用程序?例如,我有一个流应用程序,该应用程序读取输入,进行一些转换,然后过滤掉低于某个阈值的所有元素。但是,我希望此阈值在运行时可配置,这意味着我可以更改此阈值而不必重新启动flink作业。示例代码:

DataStream<MyModel> myModelDataStream = // get input ...
                // do some stuff ...
                .filter(new RichFilterFunction<MyModel>() {
                    @Override
                    public boolean filter(MyModel value) throws Exception {
                        return value.someValue() > someGlobalState.getThreshold();
                    }
                })
                // write to some sink ...

DataStream<MyConfig> myConfigDataStream = // get input ...
                // ...
                .process(new RichProcessFunction<MyConfig>() {
                      someGlobalState.setThreshold(MyConfig.getThreshold());
                })
                // ...
Run Code Online (Sandbox Code Playgroud)

是否有可能实现这一目标?例如,可以通过配置流更改的全局状态。

apache-flink flink-streaming

1
推荐指数
1
解决办法
564
查看次数

Flink键控流密钥为空

我正在尝试在Flink中的KeyedStream上执行地图操作:

stream.map(new JsonToMessageObjectMapper())
                    .keyBy("keyfield")
                    .map(new MessageProcessorStateful())
Run Code Online (Sandbox Code Playgroud)

JsonToObjectMapper运算符的输出是MessageObject类的POJO,它具有String字段' keyfield '.然后将该流键入此字段.

MessageProcessorStateful是一个RichMapFunction,如下所示:

public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {

    private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
    ...
    @Override
    public void open(Configuration config) throws Exception {
        MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
                    new MapStateDescriptor<>(
                        "state",                                                                                     // the state name
                            TypeInformation.of(new TypeHint<String>() {}),
                            TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {}) ); // type information
                    state = getRuntimeContext().getMapState(descriptor);

        state.put(...); // Insert a key, value here. Exception here!

    } …
Run Code Online (Sandbox Code Playgroud)

java apache-flink flink-streaming

1
推荐指数
1
解决办法
956
查看次数

Apache Flink:如何根据事件类型将事件接收到不同的Kafka主题?

我想知道是否可以使用Flink Kafka接收器根据事件的类型编写不同主题的事件?假设我们有不同类型的事件:通知,消息和好友请求。我们希望将这些事件流式传输到不同的主题,这些主题分别是:notification-topic,messages-topic,friendsRequest-topic。

我尝试了多种方法来解决此问题,但仍然找不到正确的解决方案。我听说可以使用,ProcessFunction但如何将其与我的问题联系起来?

streaming apache-flink flink-streaming flink-cep

1
推荐指数
1
解决办法
484
查看次数