我有一个带有 flink 的流处理过程,可以在单个路径中处理 csv 文件。我想知道每个处理文件的文件名。
我目前正在使用此函数将 csv 文件读入路径(dataPath)。
val recs:DataStream[CallCenterEvent] = env
.readFile[CallCenterEvent](
CsvReader.getReaderFormat[CallCenterEvent](dataPath, c._2),
dataPath,
FileProcessingMode.PROCESS_CONTINUOUSLY,
c._2.fileInterval)
.uid("source-%s-%s".format(systemConfig.name, c._1))
.name("%s records reading".format(c._1))
Run Code Online (Sandbox Code Playgroud)
并使用此函数获取 TupleCsvInputFormat。
def getReaderFormat[T <: Product : ClassTag : TypeInformation](dataPath:String, conf:URMConfiguration): TupleCsvInputFormat[T] = {
val typeInfo = implicitly[TypeInformation[T]]
val format: TupleCsvInputFormat[T] = new TupleCsvInputFormat[T](new Path(dataPath), typeInfo.asInstanceOf[CaseClassTypeInfo[T]])
if (conf.quoteCharacter != null && !conf.quoteCharacter.equals(""))
format.enableQuotedStringParsing(conf.quoteCharacter.charAt(0))
format.setFieldDelimiter(conf.fieldDelimiter)
format.setSkipFirstLineAsHeader(conf.ignoreFirstLine)
format.setLenient(true)
return format
}
Run Code Online (Sandbox Code Playgroud)
该过程运行正常,但我找不到获取处理的每个 csv 文件的文件名的方法。
提前致谢
我遇到了几个可序列化的异常,并且我在Flink的互联网和文档上进行了一些搜索;有一些著名的解决方案,如瞬态、扩展序列化等。每次异常的起源都非常清楚,但就我而言,我无法找到它到底在哪里没有序列化。
问:遇到这种异常应该如何调试?
A.斯卡拉:
class executor ( val sink: SinkFunction[List[String]] {
def exe(): Unit = {
xxx.....addSink(sinks)
}
}
Run Code Online (Sandbox Code Playgroud)
B.scala:
class Main extends App {
def createSink: SinkFunction[List[String]] = new StringSink()
object StringSink {
// static
val stringList: List[String] = List()
}
// create a testing sink
class StringSink extends SinkFunction[List[String]] {
override def invoke(strs: List[String]): Unit = {
// add strs into the variable "stringList" of the compagin object StringSink
}
}
new executor(createSink()).exe()
// then do …Run Code Online (Sandbox Code Playgroud) Flink 有一个MemoryStateBackend和一个FsStateBackend(和一个RocksDBStateBackend)。两者似乎都扩展了HeapKeyedStateBackend,即存储当前工作状态的机制完全相同。
这个SO answer说主要区别在于在MemoryStateBackendJobManagers 内存中保留检查点的副本。(我无法从源代码中收集到任何证据。)这MemoryStateBackend也限制了每个子任务的最大状态大小。
现在我想知道:你为什么要使用MemoryStateBackend?
比方说,我们有一个TumblingEventTimeWindow与大小5分钟。我们有包含2 条基本信息的事件:
在这个例子中,我们在工作人员机器的挂钟时间下午 12:00启动我们的Flink拓扑(当然工作人员可能有不同步的时钟,但这超出了本问题的范围)。该拓扑包含一个处理运算符,其职责是汇总属于每个窗口的事件值和一个与此问题无关的 KAFKA Sink。
在这种情况下,几个事件到达Flink Operator,具有不同的事件时间戳12:01 - 12:09。此外,事件时间戳与我们的处理时间相对一致(如下面的 X 轴所示)。由于我们正在处理EVENT_TIME特性,因此应通过其事件时间戳来确定偶数是否属于特定事件。
在那个流程中,我假设两个翻滚窗口的边界是并且仅仅因为我们在12:00开始执行拓扑。如果这个假设是正确的(我希望不是),那么在回填情况下会发生什么,其中几个旧事件带有更旧的事件时间戳,并且我们在12:00再次启动了拓扑?(足够老,我们的迟到津贴不包括他们)。类似于以下内容:12:00 -- …
我有两个从 Kafka 读取的简单 Flink 流作业执行一些转换并将结果放入 Cassandra Sink。他们从不同的 Kafka 主题中读取数据并保存到不同的 Cassandra 表中。
当我单独运行这两项工作中的任何一项时,一切正常。检查点被触发并完成,数据被保存到 Cassandra。
但是,当我运行两个作业(或其中一个运行两次)时,第二个作业在启动时失败,出现以下异常:
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing)).
我找不到有关此错误的太多信息,它可能是由以下任一原因引起的:
我还使用可能与第一个冲突的包:
最后,这是我的集群构建器代码:
ClusterBuilder builder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
Cluster cluster = null;
try {
cluster = builder
.addContactPoint("localhost")
.withPort(9042)
.withClusterName("Test Cluster")
.withoutJMXReporting() …Run Code Online (Sandbox Code Playgroud) 我有 3 个不同类型的键控数据流。
DataStream<A> first;
DataStream<B> second;
DataStream<C> third;
Run Code Online (Sandbox Code Playgroud)
每个流都定义了自己的处理逻辑,并在它们之间共享状态。我想连接这 3 个流,只要任何流中有数据可用,就会触发相应的处理函数。可以连接两个流。
first.connect(second).process(<CoProcessFunction>)
Run Code Online (Sandbox Code Playgroud)
我无法使用联合(允许多个数据流),因为类型不同。我想避免创建包装器并将所有流转换为相同类型。
非常感谢您的帮助!
\n代码:
\nfrom pyflink.common.typeinfo import RowTypeInfo, Types, BasicTypeInfo, TupleTypeInfo\nfrom pyflink.table import EnvironmentSettings, StreamTableEnvironment\n\n# stream \xe6\xa8\xa1\xe5\xbc\x8f\xe7\x9a\x84env\xe5\x88\x9b\xe5\xbb\xba\nenv_settings_stream = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()\nenv_stream = StreamTableEnvironment.create(environment_settings=env_settings_stream)\n\ntable1 = env_stream.from_elements([(1, 23.4, \'lili\'), (2, 33.4, \'er\'), (3, 45.6, \'yu\')], [\'id\', \'order_amt\', \'name\'])\ntable2 = env_stream.from_elements([(1, 43.4, \'xixi\'), (2, 53.4, \'rr\'), (3, 65.6, \'ww\')], [\'id2\', \'order_amt2\', \'name\'])\n\n# types: List[TypeInformation], field_names: List[str]\n# row_type_info = RowTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()], [\'id\', \'order_amt\', \'name\'])\nrow_type_info = TupleTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()])\n\n\nstream = env_stream.to_append_stream(table1, row_type_info)\nRun Code Online (Sandbox Code Playgroud)\n错误信息:
\nTraceback (most recent call last):\n File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco\n …Run Code Online (Sandbox Code Playgroud) 我正在研究构建一个没有数据接收器的 flink 管道。即,当我的管道成功对数据存储进行 api 调用时,它就会结束。
在这种情况下,如果我们不使用接收器运算符,检查点将如何工作?
因为检查点基于检查点前纪元(所有保存在状态中或发送到接收器中的事件)和检查点后纪元的概念。Flink 管道是否需要接收器?
我在 Flinkrunner 上的 Apache Beam 中运行流管道 (python) 时遇到以下错误。该管道包含 GCP pub/sub io 源和 pub/sub 目标。
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
ERROR:root:java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=ref_PCollection_PCollection_1, PCollection=unique_name: "23 Read from Pub/Sub/Read.None"
coder_id: "ref_Coder_BytesCoder_1"
is_bounded: UNBOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}] were consumed but never produced
Traceback (most recent call last):
File "<stdin>", line 5, in <module>
File "/usr/local/lib64/python3.6/site-packages/apache_beam/pipeline.py", line 586, in __exit__
self.result.wait_until_finish()
File "/usr/local/lib64/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 599, in wait_until_finish
raise self._runtime_exception
RuntimeError: Pipeline BeamApp-swarna0kpaul-0712135603-763999c_45da372e-757d-4690-8e25-1a5ed0a5cc84 failed in state FAILED: …Run Code Online (Sandbox Code Playgroud) 我们使用 Flink 运行许多从 Kafka 读取数据的流作业,执行一些 SQL 转换并将输出写入 Kafka。它在 Kubernetes 上运行,有两个作业管理器和许多任务管理器。我们的作业使用 RocksDB 的检查点,并且我们的检查点写入 AWS S3 中的存储桶上。
最近,我们从 Flink 1.13.1 升级到 Flink 1.15.2。我们使用保存点机制来停止我们的作业并在新版本上重新启动它们。我们有两个 Kubernetes 集群。搬家后,他们俩似乎一切都很好。但几天后(第一个集群几乎一个月,第二个集群需要 2 或 3 天),我们现在遇到了其他问题(这可能与迁移到 Flink 1.15 有关,也可能与后来发生的迁移无关)。
最近,我们注意到有一些作业无法启动。我们看到执行图中的“源”任务保持已创建状态,而图中的所有其他任务(ChangelogNormalize、Writer)都在运行。作业定期重新启动并出现错误(为了便于阅读而简化了堆栈跟踪):
java.lang.Exception: Cannot deploy task Source: source_consumer -> *anonymous_datastream_source$81*[211] (1/8) (de8f109e944dfa92d35cdc3f79f41e6f) - TaskManager (<address>) not responding after a rpcTimeout of 10000 ms
at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:602)
...
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [akka.tcp://flink@<address>/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the …Run Code Online (Sandbox Code Playgroud) flink-streaming ×10
apache-flink ×9
apache-beam ×1
cassandra ×1
data-stream ×1
flink-cep ×1
pyflink ×1
python ×1
scala ×1
streaming ×1
watermark ×1