标签: flink-streaming

使用 Flink 获取 DataStream 的文件名

我有一个带有 flink 的流处理过程,可以在单个路径中处理 csv 文件。我想知道每个处理文件的文件名。

我目前正在使用此函数将 csv 文件读入路径(dataPath)。

val recs:DataStream[CallCenterEvent] = env
          .readFile[CallCenterEvent](
          CsvReader.getReaderFormat[CallCenterEvent](dataPath, c._2),
          dataPath,
          FileProcessingMode.PROCESS_CONTINUOUSLY,
          c._2.fileInterval)
          .uid("source-%s-%s".format(systemConfig.name, c._1))
          .name("%s records reading".format(c._1))
Run Code Online (Sandbox Code Playgroud)

并使用此函数获取 TupleCsvInputFormat。

def getReaderFormat[T <: Product : ClassTag : TypeInformation](dataPath:String, conf:URMConfiguration): TupleCsvInputFormat[T] = {
  val typeInfo = implicitly[TypeInformation[T]]
  val format: TupleCsvInputFormat[T] = new TupleCsvInputFormat[T](new Path(dataPath), typeInfo.asInstanceOf[CaseClassTypeInfo[T]])
  if (conf.quoteCharacter != null && !conf.quoteCharacter.equals(""))
    format.enableQuotedStringParsing(conf.quoteCharacter.charAt(0))
  format.setFieldDelimiter(conf.fieldDelimiter)
  format.setSkipFirstLineAsHeader(conf.ignoreFirstLine)
  format.setLenient(true)

  return format
}       
Run Code Online (Sandbox Code Playgroud)

该过程运行正常,但我找不到获取处理的每个 csv 文件的文件名的方法。

提前致谢

streaming scala data-stream apache-flink flink-streaming

6
推荐指数
1
解决办法
714
查看次数

如何调试Flink中的可序列化异常?

我遇到了几个可序列化的异常,并且我在Flink的互联网和文档上进行了一些搜索;有一些著名的解决方案,如瞬态、扩展序列化等。每次异常的起源都非常清楚,但就我而言,我无法找到它到底在哪里没有序列化。

问:遇到这种异常应该如何调试?

A.斯卡拉:

class executor ( val sink: SinkFunction[List[String]] {
    def exe(): Unit = {
        xxx.....addSink(sinks)
    }
}
Run Code Online (Sandbox Code Playgroud)

B.scala:

class Main extends App {
  def createSink: SinkFunction[List[String]] = new StringSink()

  object StringSink {
    // static
    val stringList: List[String] = List()
  }

  // create a testing sink
  class StringSink extends SinkFunction[List[String]] {
    override def invoke(strs: List[String]): Unit = {
        // add strs into the variable "stringList" of the compagin object StringSink
    }
  }

  new executor(createSink()).exe()

  // then do …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming flink-cep

6
推荐指数
1
解决办法
8974
查看次数

Apache Flink:为什么选择 MemoryStateBackend 而不是 FsStateBackend?

Flink 有一个MemoryStateBackend和一个FsStateBackend(和一个RocksDBStateBackend)。两者似乎都扩展了HeapKeyedStateBackend,即存储当前工作状态的机制完全相同。

这个SO answer说主要区别在于在MemoryStateBackendJobManagers 内存中保留检查点的副本。(我无法从源代码中收集到任何证据。)这MemoryStateBackend也限制了每个子任务的最大状态大小。

现在我想知道:你为什么要使用MemoryStateBackend?

apache-flink flink-streaming

6
推荐指数
1
解决办法
713
查看次数

Flink Windows 边界、水印、事件时间戳和处理时间

问题定义和建立概念

比方说,我们有一个TumblingEventTimeWindow大小5分钟。我们有包含2 条基本信息的事件:

  • 数字
  • 事件时间戳

在这个例子中,我们在工作人员机器的挂钟时间下午 12:00启动我们的Flink拓扑(当然工作人员可能有不同步的时钟,但这超出了本问题的范围)。该拓扑包含一个处理运算符,其职责是汇总属于每个窗口的事件值和一个与此问题无关的 KAFKA Sink。

  • 这个窗口有一个BoundedOutOfOrdernessTimestampExtractor,允许延迟一分钟
  • 水印:据我所知,Flink 和 Spark Structured Stream 中的水印定义为(max-event-timestamp-seen-so-far - allowed-lateness)。任何事件时间戳小于或等于此水印的事件都将在结果计算中被丢弃和忽略。

第 1 部分(确定窗口的边界)

快乐(实时)路径

在这种情况下,几个事件到达Flink Operator,具有不同的事件时间戳12:01 - 12:09。此外,事件时间戳与我们的处理时间相对一致(如下面的 X 轴所示)。由于我们正在处理EVENT_TIME特性,因此应通过其事件时间戳来确定偶数是否属于特定事件。

在此处输入图片说明

旧数据涌入

在那个流程中,我假设两个翻滚窗口边界是并且仅仅因为我们在12:00开始执行拓扑。如果这个假设是正确的(我希望不是),那么在回填情况下会发生什么,其中几个事件带有更旧的事件时间戳,并且我们在12:00再次启动了拓扑?(足够老,我们的迟到津贴不包括他们)。类似于以下内容:12:00 -- …

watermark stream-processing apache-flink flink-streaming

6
推荐指数
1
解决办法
1060
查看次数

使用 CassandrSink 的 Flink 作业因写入错误而失败

我有两个从 Kafka 读取的简单 Flink 流作业执行一些转换并将结果放入 Cassandra Sink。他们从不同的 Kafka 主题中读取数据并保存到不同的 Cassandra 表中。

当我单独运行这两项工作中的任何一项时,一切正常。检查点被触发并完成,数据被保存到 Cassandra。

但是,当我运行两个作业(或其中一个运行两次)时,第二个作业在启动时失败,出现以下异常: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing)).

我找不到有关此错误的太多信息,它可能是由以下任一原因引起的:

  • Flink (v 1.10.0-scala_2.12),
  • Flink Cassandra 连接器(flink-connector-cassandra_2.11:jar:1.10.2,也试过 flink-connector-cassandra_2.12:jar:1.10.0),
  • Datastax 底层驱动程序 (v 3.10.2),
  • Cassandra v4.0(与v3.0相同),
  • Netty 传输 (v 4.1.51.Final)。

我还使用可能与第一个冲突的包:

  • mysql-connector-java (v 8.0.19),
  • cassandra-driver-extras (v 3.10.2)

最后,这是我的集群构建器代码:

ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster cluster = null;
        try {
            cluster = builder
                    .addContactPoint("localhost")
                    .withPort(9042)
                    .withClusterName("Test Cluster")
                    .withoutJMXReporting() …
Run Code Online (Sandbox Code Playgroud)

cassandra datastax-java-driver apache-flink flink-streaming

6
推荐指数
1
解决办法
509
查看次数

Flink中如何连接2个以上的流?

我有 3 个不同类型的键控数据流。

DataStream<A> first;
DataStream<B> second;
DataStream<C> third;
Run Code Online (Sandbox Code Playgroud)

每个流都定义了自己的处理逻辑,并在它们之间共享状态。我想连接这 3 个流,只要任何流中有数据可用,就会触发相应的处理函数。可以连接两个流。

first.connect(second).process(<CoProcessFunction>)
Run Code Online (Sandbox Code Playgroud)

我无法使用联合(允许多个数据流),因为类型不同。我想避免创建包装器并将所有流转换为相同类型。

apache-flink flink-streaming

6
推荐指数
2
解决办法
3592
查看次数

pyflink(flink) 1.12.0 当表通过 to_append_stream 转换为数据流时出现错误(java api 是:toAppendStream)

非常感谢您的帮助!

\n

代码:

\n
from pyflink.common.typeinfo import RowTypeInfo, Types, BasicTypeInfo, TupleTypeInfo\nfrom pyflink.table import EnvironmentSettings, StreamTableEnvironment\n\n# stream \xe6\xa8\xa1\xe5\xbc\x8f\xe7\x9a\x84env\xe5\x88\x9b\xe5\xbb\xba\nenv_settings_stream = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()\nenv_stream = StreamTableEnvironment.create(environment_settings=env_settings_stream)\n\ntable1 = env_stream.from_elements([(1, 23.4, \'lili\'), (2, 33.4, \'er\'), (3, 45.6, \'yu\')], [\'id\', \'order_amt\', \'name\'])\ntable2 = env_stream.from_elements([(1, 43.4, \'xixi\'), (2, 53.4, \'rr\'), (3, 65.6, \'ww\')], [\'id2\', \'order_amt2\', \'name\'])\n\n# types: List[TypeInformation], field_names: List[str]\n# row_type_info = RowTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()], [\'id\', \'order_amt\', \'name\'])\nrow_type_info = TupleTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()])\n\n\nstream = env_stream.to_append_stream(table1, row_type_info)\n
Run Code Online (Sandbox Code Playgroud)\n

错误信息:

\n
Traceback (most recent call last):\n  File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco\n …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming pyflink

6
推荐指数
0
解决办法
458
查看次数

没有启用检查点的数据接收器的 Flink 管道

我正在研究构建一个没有数据接收器的 flink 管道。即,当我的管道成功对数据存储进行 api 调用时,它就会结束。

在这种情况下,如果我们不使用接收器运算符,检查点将如何工作?

因为检查点基于检查点前纪元(所有保存在状态中或发送到接收器中的事件)和检查点后纪元的概念。Flink 管道是否需要接收器?

apache-flink flink-streaming

6
推荐指数
1
解决办法
1232
查看次数

在嵌入式 Flinkrunner (apache_beam [GCP]) 中使用 pub/sub io 运行光束流管道 (Python) 时出错

我在 Flinkrunner 上的 Apache Beam 中运行流管道 (python) 时遇到以下错误。该管道包含 GCP pub/sub io 源和 pub/sub 目标。

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.

ERROR:root:java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "23 Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
Traceback (most recent call last):
  File "<stdin>", line 5, in <module>
  File "/usr/local/lib64/python3.6/site-packages/apache_beam/pipeline.py", line 586, in __exit__
    self.result.wait_until_finish()
  File "/usr/local/lib64/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 599, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-swarna0kpaul-0712135603-763999c_45da372e-757d-4690-8e25-1a5ed0a5cc84 failed in state FAILED: …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-pubsub flink-streaming apache-beam

6
推荐指数
1
解决办法
893
查看次数

尝试从检查点重新启动时作业卡住

语境

我们使用 Flink 运行许多从 Kafka 读取数据的流作业,执行一些 SQL 转换并将输出写入 Kafka。它在 Kubernetes 上运行,有两个作业管理器和许多任务管理器。我们的作业使用 RocksDB 的检查点,并且我们的检查点写入 AWS S3 中的存储桶上。

最近,我们从 Flink 1.13.1 升级到 Flink 1.15.2。我们使用保存点机制来停止我们的作业并在新版本上重新启动它们。我们有两个 Kubernetes 集群。搬家后,他们俩似乎一切都很好。但几天后(第一个集群几乎一个月,第二个集群需要 2 或 3 天),我们现在遇到了其他问题(这可能与迁移到 Flink 1.15 有关,也可能与后来发生的迁移无关)。

问题描述

最近,我们注意到有一些作业无法启动。我们看到执行图中的“源”任务保持已创建状态,而图中的所有其他任务(ChangelogNormalize、Writer)都在运行。作业定期重新启动并出现错误(为了便于阅读而简化了堆栈跟踪):

java.lang.Exception: Cannot deploy task Source: source_consumer -> *anonymous_datastream_source$81*[211] (1/8) (de8f109e944dfa92d35cdc3f79f41e6f) - TaskManager (<address>) not responding after a rpcTimeout of 10000 ms
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:602)
    ...
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [akka.tcp://flink@<address>/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

6
推荐指数
1
解决办法
1160
查看次数