我能够成功运行我的 flink 作业,该作业使用./bin/flink run ....
为此,我必须将 flink-s3-fs-presto jar 复制到我的$FLINK_HOME/lib文件夹中,并且我还必须在我的以下文件中配置我的 S3 连接详细信息flink-conf.yaml:
你需要在 Flink 的 flink-conf.yaml 中同时配置 s3.access-key 和 s3.secret-key :
Run Code Online (Sandbox Code Playgroud)s3.access-key: your-access-key s3.secret-key: your-secret-key来源:flink aws 文档
我还必须设置一个属性,s3.endpoint因为我使用的是来自 IBM Cloud 的 S3。
当我使用./bin/flink run.
但是,当我尝试从 IDE (IntelliJ) 运行我的作业时,出现以下错误:
org.apache.flink.runtime.client.JobExecutionException:无法初始化任务“DataSink(TextOutputFormat(s3://xxxx/folder)-UTF-8)”:无法从服务端点加载凭据
我在 IDE 运行作业中设置了一个环境变量,FLINK_CONF_DIR指向我的 flink-conf.yaml,我可以看到我的配置属性被选中:
11:04:39,487 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.access-key, ****
11:04:39,487 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.secret-key, ****
11:04:39,487 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration …Run Code Online (Sandbox Code Playgroud) 我想知道是否可以通过使用Flink的摄取时间模式来获取记录的时间戳。考虑以下 flink 代码示例(https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples /join/WindowJoinSampleData.scala ),
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val grades = WindowJoinSampleData.getGradeSource(env, rate)
val salaries = WindowJoinSampleData.getSalarySource(env, rate)
val joined = joinStreams(grades, salaries, windowSize)
...
case class Grade(name: String, level: Int)
case class Salary(name: String, salary: Int)
Run Code Online (Sandbox Code Playgroud)
默认情况下,等级和工资都不包含时间戳字段。但是,由于 Flink 允许使用“ingestionTime”为数据流中的记录分配挂钟时间戳,是否有可能在运行时获得这样的时间戳?例如,这是我想要做的:
val oldDatastream = env.addSource... // Using ingestion time
val newDatastream = oldDatastream.map{record =>
val ts = getRecordTimestamp(record)
// do some thing with ts
}
Run Code Online (Sandbox Code Playgroud)
谢谢你的帮助。
我在下面得到了一个声明:
“根据您的状态后端,Flink 还可以管理应用程序的状态,这意味着 Flink 处理内存管理(如有必要,可能会溢出到磁盘)以允许应用程序保持非常大的状态。”
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state_backends.html
这是否意味着只有当状态后端配置为 时RocksDBStateBackend,状态才会保留在内存中并可能在必要时溢出到磁盘?
但是,如果配置为MemoryStateBackend或FsStateBackend,则状态仅保留在内存中,永远不会溢出到磁盘。
我正在开发一个使用 java 和 flink 构建推文数据流的项目。当我运行程序时,我收到此错误:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.core.memory.HybridMemorySegment (file:/home/haydn/.m2/repository/org/apache/flink/flink-core/1.4.2/flink-core-1.4.2.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.flink.core.memory.HybridMemorySegment
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Run Code Online (Sandbox Code Playgroud)
该程序似乎在之后运行,所以我只是想知道这个错误是否意味着任何事情都不会按预期工作,或者一切都会好起来。
我注意到其他帖子中人们就如何隐藏消息提出了建议,这就是为什么我认为这并不重要。
谢谢 :)
当我使用 flink 事件时间窗口时,该窗口不会触发。请问如何解决问题,有什么方法可以调试吗?
我有一个用例,我想在 Flink 上运行 2 个独立的处理流程。所以 2 个流看起来像
Source1 -> operator1 -> Sink1
Source2 -> operator2 -> Sink2
我想为两个流重新使用相同的 Flink 集群。我可以想到通过两种方式来做到这一点:
1) 在同一个 Flink 应用程序上提交 2 个不同的作业
2) 在同一个作业中设置 2 个管道
我能够设置第一个选项,但不确定如何执行第二个选项。有没有人试过这样的设置?一个比另一个有什么优势?
这是一个后续问题:状态到期时触发
我在流中存储每个传入元素的状态,在计时器关闭后,我删除状态。这样我就可以防止处理重复项,直到元素超时,然后我可以再次处理相同的元素。一世
我编写了以下代码来测试计时器,但似乎在所有 3 个元素都通过第一个ProcessFunction.
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
streamEnv.setParallelism(12);
List<Tuple2<String, String>> inputList = new ArrayList<>();
inputList.add(new Tuple2<>("Test", "test"));
inputList.add(new Tuple2<>("Test", "test"));
inputList.add(new Tuple2<>("Test", "test"));
streamEnv.fromCollection(inputList).keyBy(0)
.process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
ValueState<Integer> occur;
@Override
public void open(Configuration parameters) throws Exception {
occur = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("occurs", Integer.class, 0));
}
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
if (occur.value() < 2) { …Run Code Online (Sandbox Code Playgroud) 这几天在摸索Apache Flink,对Task Slot的概念有些疑惑。虽然已经问了几个问题,但有一点我不明白。
我正在使用玩具应用程序进行测试,运行本地集群。我已禁用操作员链接
我从文档中知道插槽允许内存隔离而不是 CPU 隔离。阅读文档,似乎 Task Slot 是一个 Java 线程。
1) 当我使用 parallelism=1 部署我的应用程序时,所有操作员的子任务都部署在同一个插槽中。但是,如果我从 的open()方法打印当前线程 ID AbstractStreamOperator,我会看到不同子任务的不同 ID。那么,它们不是共享同一个线程(即插槽?)。
2) 如果我将并行度从 1 更改为 3,我需要 3 个插槽才能正确重新部署应用程序。文档确认插槽数限制了我可以拥有的并行度。但是为什么我可以在同一个槽里有不同算子的子任务,而在同一个槽里不能有同一个算子的子任务呢?
感谢您的任何解释!
我有一个WordCount用 Flink (Scala) 编写的示例流示例。在其中,我想使用外部化检查点来在发生故障时进行恢复。但它没有按预期工作。
我的代码如下:
object WordCount {
def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment
.getExecutionEnvironment
.setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)
// prevent the tasks from failing if an error happens in …Run Code Online (Sandbox Code Playgroud) 我只是想了解何时在 Flink 中使用 CoProcessFunction 的用例。用一个例子来解释将帮助我更好地理解这个概念。