有谁知道,在远程 Flink 集群上运行程序时出现以下错误的根源是什么?
我该如何解决?
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
Caused by: org.apache.flink.api.common.InvalidProgramException: The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell, or TestEnvironment)
at org.apache.flink.api.java.RemoteEnvironment.<init>(RemoteEnvironment.java:118)
at org.apache.flink.api.java.RemoteEnvironment.<init>(RemoteEnvironment.java:78)
at org.apache.flink.api.java.ExecutionEnvironment.createRemoteEnvironment(ExecutionEnvironment.java:1155)
at org.apache.flink.test.myProj.main(myProj.java:133)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
... 6 more
Run Code Online (Sandbox Code Playgroud) 我使用 dop > 1 执行程序,但我不需要多个输出文件。在 Java 中myDataSet.writeAsText(outputFilePath, WriteMode.OVERWRITE).setParallelism(1);正在按预期工作。
但当我在 Python 中尝试同样的方法时,它不起作用。这是我的代码: myDataSet.write_text(output_file, write_mode=WriteMode.OVERWRITE).set_degree_of_parallelism(1)
有可能在Python中实现这种行为吗?
在阅读了flink文档之后(下面提到的相关部分)我仍然没有完全理解原子性和密钥分布。
\n\n即考虑一个由 keyby->flatmap(包含映射状态)组成的图,并行度设置为 1,有 4 个任务槽,flink 是否确保每个键在分布式环境中只存在一次(在一个任务槽中),是吗?原子单位?\n提前感谢所有帮助者。
\n\n\n\n您可以将键控状态视为已分区或分片的操作员状态,每个键只有一个状态分区。每个键控状态在逻辑上都绑定到 的唯一组合
\n\n<parallel-operator-instance, key>,并且由于每个键 \xe2\x80\x9c 都属于\xe2\x80\x9d 恰好是键控运算符的一个并行实例,因此我们可以将其简单地视为<operator, key>。键控状态进一步组织成所谓的键组。Key Groups 是 Flink 可以重新分配 Keyed State 的原子单元;关键组的数量与定义的最大并行度完全相同。在执行期间,键控运算符的每个并行实例都使用一个或多个键组的键。
\n
我有一个 Apache Flink 设置,其中包含一个 TaskManager 和两个处理槽。当我执行并行度设置为 1 的应用程序时,该作业的执行时间约为 33 秒。当我将并行度增加到 2 时,该作业需要 45 秒才能完成。
我在 Windows 机器上使用 Flink,配置为 10 个计算核心(4C + 6G)。我想用 2 个插槽取得更好的结果。我能做些什么?
我想用Flink 和 Kafka 运行集成测试。该过程是从 Kafka 读取数据,使用 Flink 进行一些操作,然后将数据流放入 Kafka 中。
我想从头到尾测试这个过程。现在我使用scalatest-embedded-kafka。
我在这里举了一个例子,我试图尽可能简单:
import java.util.Properties
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.scalatest.{Matchers, WordSpec}
import scala.collection.mutable.ListBuffer
object SimpleFlinkKafkaTest {
class CollectSink extends SinkFunction[String] {
override def invoke(string: String): Unit = {
synchronized {
CollectSink.values += string
}
}
}
object CollectSink {
val values: ListBuffer[String] = ListBuffer.empty[String]
}
val kafkaPort = 9092
val zooKeeperPort = 2181
val props = …Run Code Online (Sandbox Code Playgroud) integration-testing scala apache-kafka apache-flink embedded-kafka
假设我有一个数据流,其中包含事件时间数据。我想在 8 毫秒的窗口时间内收集输入数据流并减少每个窗口数据。我使用以下代码来做到这一点:
aggregatedTuple
.keyBy( 0).timeWindow(Time.milliseconds(8))
.reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()
Run Code Online (Sandbox Code Playgroud)
Point:数据流的关键是处理时间的时间戳映射到处理毫秒的时间戳的后8个约数,例如1531569851297将映射到1531569851296。
但数据流可能延迟到达并进入错误的窗口时间。例如,假设我将窗口时间设置为 8 毫秒。如果数据按顺序进入 Flink 引擎或至少延迟小于窗口时间(8 毫秒),这将是最好的情况。但假设数据流事件时间(也是数据流中的一个字段)已到达,延迟时间为 30 毫秒。所以它会进入错误的窗口,我想如果我检查每个数据流的事件时间,因为它想进入窗口,我可以过滤这么晚的数据。所以我有两个问题:
我已经使用 Flink CEP 实现了一个匹配三个事件的模式,例如A->B->C. 在我定义了我的模式后,我生成了一个
PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);
与PatternSelectFunction这样的
patternStream.select(new MyPatternSelectFunction()).print();
这就像一个魅力,但我对所有匹配事件的事件时间感兴趣。我知道传统的 Flink 流 API 提供了丰富的功能,允许您注册 Flink 的内部延迟跟踪器,如本问题所述。我还看到 Flink 1.8RichPatternSelectFunction添加了一个新功能。但不幸的是我无法使用 Flink CEP 设置 Flink 1.8。
最后,有没有办法获取所有匹配事件的事件时间?
我试图了解 Apache Flink 仪表板显示的“接收/发送的字节数”的含义。对于某些上下文,CSV 文件托管在 HDFS 服务器上,我将结果写入本地计算机上的 TXT 文件。Flink 也在我的机器上本地运行。考虑到这一点,“发送的字节数”似乎意味着“从 HDFS 服务器发送到我的机器的字节数”,“接收的字节数”似乎意味着“从我的机器发送到 HDFS 服务器的字节数”。这是正确的解释吗?
我对时间线显示的重叠任务也有点困惑。奇怪的是,连接在前两个数据集的过滤完成之前就开始了。这是预期的行为吗?如果是,为什么?
以下是我针对正在发生的事情的一些背景的执行计划。
Flink 中 KeyBy 和 GroupBy 的异同点是什么?如果在 Table only 程序中使用 Table/SQL API,GroupBy 是否等同于 KeyBy?
FsStateBackend. But somehow I was getting the following error.Error
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
Run Code Online (Sandbox Code Playgroud)
Flink version: I am using Flink 1.10.0 version.
amazon-s3 checkpoint apache-flink checkpointing flink-streaming
apache-flink ×10
amazon-s3 ×1
apache-kafka ×1
checkpoint ×1
flink-cep ×1
flink-sql ×1
python ×1
scala ×1
windowing ×1