我是apache flink的新手.我的输入中有一个未绑定的数据流(通过kakfa送入flink 0.10).
我想获得每个主键的第一次出现(主键是contract_num和event_dt).
这些"重复"几乎在彼此之后立即发生.源系统不能为我过滤这个,所以flink必须这样做.
这是我的输入数据:
contract_num,event_dt,attr
A1,2016-02-24
10:25: 08,X
A1,2016-02-24
10:25: 08,Y A1,2016-02-24 10:25: 09,Z
A2,2016-02-24 10:25:10,C
这是我想要的输出数据:
A1,2016-02-24 10 :25: 08,X A1,2016-02-24 10 :25:
09,Z A2,2016-02-24 10 :25:10
,C
请注意第2行已被删除,因为A001和'2016-02-24 10:25:08'的组合键已在第1行中出现.
我怎么能用flink 0.10做到这一点?
我正在考虑使用keyBy(0,1),
但之后我不知道该怎么做!
(我使用joda-time和org.flinkspector来设置这些测试).
contract_num, event_dt, attr
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:08, Y
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
Run Code Online (Sandbox Code Playgroud) 我使用的是最新的Flink-1.1.2-Hadoop-27和flink-connector-kafka-0.10.2-hadoop1罐子.
Flink消费者如下:
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
if (properties == null) {
properties = new Properties();
InputStream props = Resources.getResource(KAFKA_CONFIGURATION_FILE).openStream();
properties.load(props);
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer082<>(KAFKA_SIP_TOPIC, new SimpleStringSchema() , properties));
Run Code Online (Sandbox Code Playgroud)
以下是执行后得到的异常:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointNotifier
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(Unknown Source)
at java.security.SecureClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.access$100(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(Unknown Source)
at java.security.SecureClassLoader.defineClass(Unknown Source)
at …
Run Code Online (Sandbox Code Playgroud) 我正在使用协议缓冲区将数据流发送到 Apache Flink。我有两节课。一种是生产者,一种是消费者。Producer 是一个 java 线程类,它从 socket 中读取数据,Protobuf 将其反序列化,然后将其存储在我的 BlockingQueue 中。 Consumer 是一个在 Flink 中实现 SourceFunction 的类。我使用以下方法测试了这个程序:
DataStream<Event.MyEvent> stream = env.fromCollection(queue);
Run Code Online (Sandbox Code Playgroud)
而不是自定义源,它工作正常。但是当我尝试使用 SourceFunction 类时,它会引发此异常:
Caused by: java.lang.RuntimeException: Unable to find proto buffer class
at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
...
Caused by: java.lang.ClassNotFoundException: event.Event$MyEvent
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
...
Run Code Online (Sandbox Code Playgroud)
在另一次尝试中,我将两个分类为一个(实现 SourceFunction 的类)。我从套接字获取数据并使用 protobuf 将其反序列化并将其存储在 BlockingQueue 中,然后我立即从 BlockingQueue 中读取。我的代码也适用于这种方法。
但我想使用两个单独的类(多线程),但它会抛出该异常。我试图在过去 2 天内解决它,也做了很多搜索,但没有运气。任何帮助都会受到重视。
制作人:
public class Producer implements Runnable {
Boolean running = true;
Socket socket = null, bufferSocket = null;
PrintStream ps = …
Run Code Online (Sandbox Code Playgroud) 当我执行Flink应用程序时,它会给我以下信息NullPointerException
:
2017-08-08 13:21:57,690 INFO com.datastax.driver.core.Cluster - New Cassandra host /127.0.0.1:9042 added
2017-08-08 13:22:02,427 INFO org.apache.flink.runtime.taskmanager.Task - TriggerWindow(TumblingEventTimeWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@15d1c80b}, EventTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:302)) -> Filter -> Flat Map -> Sink: Cassandra Sink (1/1) (092a7ef50209f7a050d9d82be1e03d80) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) …
Run Code Online (Sandbox Code Playgroud) 我有一个带有 flink 的流处理过程,可以在单个路径中处理 csv 文件。我想知道每个处理文件的文件名。
我目前正在使用此函数将 csv 文件读入路径(dataPath)。
val recs:DataStream[CallCenterEvent] = env
.readFile[CallCenterEvent](
CsvReader.getReaderFormat[CallCenterEvent](dataPath, c._2),
dataPath,
FileProcessingMode.PROCESS_CONTINUOUSLY,
c._2.fileInterval)
.uid("source-%s-%s".format(systemConfig.name, c._1))
.name("%s records reading".format(c._1))
Run Code Online (Sandbox Code Playgroud)
并使用此函数获取 TupleCsvInputFormat。
def getReaderFormat[T <: Product : ClassTag : TypeInformation](dataPath:String, conf:URMConfiguration): TupleCsvInputFormat[T] = {
val typeInfo = implicitly[TypeInformation[T]]
val format: TupleCsvInputFormat[T] = new TupleCsvInputFormat[T](new Path(dataPath), typeInfo.asInstanceOf[CaseClassTypeInfo[T]])
if (conf.quoteCharacter != null && !conf.quoteCharacter.equals(""))
format.enableQuotedStringParsing(conf.quoteCharacter.charAt(0))
format.setFieldDelimiter(conf.fieldDelimiter)
format.setSkipFirstLineAsHeader(conf.ignoreFirstLine)
format.setLenient(true)
return format
}
Run Code Online (Sandbox Code Playgroud)
该过程运行正常,但我找不到获取处理的每个 csv 文件的文件名的方法。
提前致谢
我正在遵循Flink的快速入门示例:监视Wikipedia编辑流。
该示例使用Java,并且正在Scala中实现,如下所示:
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits.keyBy( _.getUser )
.timeWindow(Time.seconds(5))
.fold(("", 0L)) {
(acc: (String, Long), event: WikipediaEditEvent) => {
(event.getUser, acc._2 + event.getByteDiff)
}
}
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
}
Run Code Online (Sandbox Code Playgroud)
但是,fold
Flink中的功能已被弃用,aggregate
建议使用该功能。
但我没有找到有关如何转换的过时的例子或教程fold
来aggregrate
。 …
Flink有两种消息
对于这两个消息,我们有单独的源流。我们对两个流都附加了相同的接收器。我们想要做的是广播控制消息,以便所有并行运行的接收器都应该接收它。
下面是相同的代码:
package com.ranjit.com.flinkdemo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;;
public class FlinkBroadcast {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);
ctrl_message_stream.broadcast();
DataStream<String> message_stream = env.socketTextStream("localhost", 8087);
RollingSink sink = new RollingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
sink.setWriter(new StringWriter<String>() );
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
ctrl_message_stream.broadcast().addSink(sink);
message_stream.addSink(sink);
env.execute("stream");
}
}
Run Code Online (Sandbox Code Playgroud)
但是我观察到的是,它正在创建4个接收器实例,并且控制消息仅广播到2个接收器(由控制消息流创建)。因此,我了解的是,两个流都应通过相同的运算符链来执行此操作,而这是我们不希望的,因为数据消息将进行多次转换。我们已经编写了自己的接收器,如果它是控制消息,它将接收消息,然后它只会滚动文件。
示例代码:
package com.gslab.com.dataSets;
import java.io.File;
import java.util.ArrayList;
import …
Run Code Online (Sandbox Code Playgroud) 在我的应用程序中,我想丰富无限的事件流。流本身通过对 Id 进行散列来并行化。对于每个事件,可能会调用外部源(例如 REST、DB)。这个调用本质上是阻塞的。必须保持一个流分区内的事件顺序。
我的想法是创建一个 RichMapFunction,它设置连接,然后轮询每个事件的外部源。阻塞调用通常不需要很长时间,但在最坏的情况下,服务可能会关闭。
理论上,这是可行的,但我觉得这样做不太好,因为我不知道如果流中有一些阻塞操作,Flink 会如何反应。如果您有很多并行流阻塞会发生什么,即我的线程用完了吗?或者在流并行化的点上行为如何向上流?
其他人是否可能有类似的问题以及我的问题的答案或一些解决方法的想法?
首先,当我重新运行Flink消费者时,这与Kafka再次消费最新消息非常类似,但它不一样.这个问题的答案似乎并没有解决我的问题.如果我错过了答案,那么请重新解释答案,因为我显然错过了一些东西.
但问题完全相同--Flink(kafka连接器)重新运行它在关闭之前看到的最后3-9条消息.
Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
Run Code Online (Sandbox Code Playgroud)
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._
object Runner {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(500)
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "testing");
val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
env.addSource(kafkaConsumer)
.addSink(kafkaProducer)
env.execute()
}
}
Run Code Online (Sandbox Code Playgroud)
libraryDependencies ++= …
Run Code Online (Sandbox Code Playgroud) 我知道一个任务管理器可以有几个任务槽。
但是,什么是任务槽?JVM进程还是内存或线程中的对象?
apache-flink ×10
flink-streaming ×10
java ×3
apache-kafka ×2
scala ×2
aggregate ×1
data-stream ×1
duplicates ×1
fold ×1
streaming ×1