java.lang.NoSuchMethodError:
org.apache.flink.api.common.ExecutionConfig.setRestartStrategy(Lorg/apache/flink/api/common/restartstrategy/RestartStrategies$RestartStrategyConfiguration;)
在 com.WriteIntoKafka.main(WriteIntoKafka.java:53)
在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
在 java.lang.reflect.Method .invoke(Method.java:606)
在 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
在 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
在 org.apache.flink.client.program.Client.runBlocking(Client.java:252)
在 org.apache.flink.client .CliFrontend.executeProgramBlocking(CliFrontend.java:676)
在org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
在org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
在org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
这是我的一些查询:
我有两个不同的流stream1,stream2其中的元素是按顺序排列的。
1)现在,当我keyBy在这些流中的每一个上执行操作时,顺序会保持不变吗?(由于这里的每个小组将仅发送给一个任务管理器)我的理解是,记录将按小组的顺序排列,请在此处更正。
2)在keyBy两个流上之后,我正在共同分组以获取匹配和不匹配的记录。会将订单也保持在这里?因为这也适用于KeyedStream。我正在使用EventTime,并AscendingTimestampExtractor用于生成timestamp和watermark。
3)现在,我想matching_nonMatchingStream使用map / flatmap对2)得到的序列执行序列检查。我是否需要再次执行keyBy此处操作,或者如果保持连锁状态,则matching_nonMatchingStream运行是否相同TaskManager?我的理解是,链条将在这里起作用,纠正我,变得困惑。
4)slotSharingGroup-您能否根据文档详细说明:设置此操作的插槽共享组。TaskManager如果可能,在同一插槽共享组中的并行操作实例将位于同一插槽中。
我正在寻找一种可以:
我看了一下apache spark:它可以读取新添加的文件,并且可以处理重启以从它离开的地方继续.我找不到办法使它也处理同一作业范围内的旧文件(所以只有1和3).
我看了一下apache flink:它确实处理了新旧文件.但是,一旦重新启动作业,它将再次开始处理所有作业(1和2).
这是一个非常常见的用例.我错过了火花/叮当的东西,这使得它成为可能吗?还有其他工具可以在这里使用吗?
在创建flink检查点时,我能否以某种方式获取更多详细信息?创建flink检查点的时间因子10而异.这至少是我的jobmanager logfile告诉我的:
2017-02-14 09:03:43,234 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 161 @ 1487059423232
2017-02-14 09:08:17,447 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 161 (in 271425 ms).
2017-02-14 09:08:43,233 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 162 @ 1487059723233
2017-02-14 09:09:46,684 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 162 (in 61951 ms).
Run Code Online (Sandbox Code Playgroud)
检查点161只需要一分钟即可创建,而下一个检查点162需要4.5分钟(它持续8分钟而不是回到1.5分钟).
如何在进行 bazel 构建时传递参数。实际上我的程序将接受两个参数。一个是目录,另一个是 target.csv 我需要获取目录下的文件并将其写入 csv。
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.utils.ParameterTool;
public class ReadFiles {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// set up the execution environment
try {
final ParameterTool params = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // without this multiple files creating
env.getConfig().setGlobalJobParameters(params);
List<String> paths = new ArrayList<String>();
File dir = new File(params.getRequired("input"));
for (File f …Run Code Online (Sandbox Code Playgroud) 我的目标是通过群集GUI中的“程序参数”字段将args传递给Flink作业的Main()函数。
并以某种方式在Main()函数中访问它们(最好是通过键名):
public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
CustomProps props = new CustomProps (DEFAULT_PROPERTIES_FILE);
String kafkaAutoOffsetReset = props.getKafkaAutoOffsetReset();
String cassandraClusterUrl = props.getCassandraClusterUrl();
if (args.length == 1 && args[0] != null) {
cassandraClusterUrl = parameter.get("cassandraClusterUrl");
kafkaAutoOffsetReset = parameter.get("kafkaOffset");
}
//Other code...
}
Run Code Online (Sandbox Code Playgroud)
我已经尝试过“ ParameterTool”,但没有任何帮助,如果尝试以下操作:
kafkaAutoOffsetReset = args[0];
Run Code Online (Sandbox Code Playgroud)
仅当我在“程序参数”字段中仅输入一个单词时,它才有效。所以,如果我把:
blah
Run Code Online (Sandbox Code Playgroud)
它说它设置为“等等”,但是如果我尝试以下任何一种方法:
-kafkaOffset blah
--kafkaOffset blah
-kafkaOffset:blah
-kafkaOffset=blah
Run Code Online (Sandbox Code Playgroud)
我什么都没有。我知道在CLI中,如何将args传递给jar的示例是:
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
Run Code Online (Sandbox Code Playgroud)
但是,似乎缺少使用GUI的另一种方式,但是我未能找到与此相关的文档。
TL; DR
通过Flink群集GUI中的“程序参数”字段提交多个arg的正确方法是什么,以及在Main()函数中访问它们的正确方法是什么?
感谢您提前提供的所有帮助!
我正在测试 Flink 1.3.0 中的弹性特性。我有一份启用检查点和固定延迟重启策略的工作。当我杀死其中一个 TaskManager JVM 时,一段时间后作业在剩余节点上正确重新启动。但是,当我添加新节点时,该作业不会自动重新启动以使用它。
我尝试使用,bin/flink stop <jobId>但它总是给我java.lang.IllegalStateException: Job with ID <jobId> is not stoppable.
如何重新启动作业以使用附加节点?
是否可以在运行时配置flink应用程序?例如,我有一个流应用程序,该应用程序读取输入,进行一些转换,然后过滤掉低于某个阈值的所有元素。但是,我希望此阈值在运行时可配置,这意味着我可以更改此阈值而不必重新启动flink作业。示例代码:
DataStream<MyModel> myModelDataStream = // get input ...
// do some stuff ...
.filter(new RichFilterFunction<MyModel>() {
@Override
public boolean filter(MyModel value) throws Exception {
return value.someValue() > someGlobalState.getThreshold();
}
})
// write to some sink ...
DataStream<MyConfig> myConfigDataStream = // get input ...
// ...
.process(new RichProcessFunction<MyConfig>() {
someGlobalState.setThreshold(MyConfig.getThreshold());
})
// ...
Run Code Online (Sandbox Code Playgroud)
是否有可能实现这一目标?例如,可以通过配置流更改的全局状态。
我正在尝试在Flink中的KeyedStream上执行地图操作:
stream.map(new JsonToMessageObjectMapper())
.keyBy("keyfield")
.map(new MessageProcessorStateful())
Run Code Online (Sandbox Code Playgroud)
JsonToObjectMapper运算符的输出是MessageObject类的POJO,它具有String字段' keyfield '.然后将该流键入此字段.
MessageProcessorStateful是一个RichMapFunction,如下所示:
public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {
private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
...
@Override
public void open(Configuration config) throws Exception {
MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
new MapStateDescriptor<>(
"state", // the state name
TypeInformation.of(new TypeHint<String>() {}),
TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {}) ); // type information
state = getRuntimeContext().getMapState(descriptor);
state.put(...); // Insert a key, value here. Exception here!
} …Run Code Online (Sandbox Code Playgroud) 我想知道是否可以使用Flink Kafka接收器根据事件的类型编写不同主题的事件?假设我们有不同类型的事件:通知,消息和好友请求。我们希望将这些事件流式传输到不同的主题,这些主题分别是:notification-topic,messages-topic,friendsRequest-topic。
我尝试了多种方法来解决此问题,但仍然找不到正确的解决方案。我听说可以使用,ProcessFunction但如何将其与我的问题联系起来?
apache-flink ×10
java ×3
apache-spark ×1
bazel ×1
bigdata ×1
build ×1
flink-cep ×1
hadoop ×1
hdfs ×1
streaming ×1