I am trying to extend the Flink example that demonstrates the use of streams. My goal is to use the windowing functionality (see the window function call). I assume the code below outputs the sum of the last 3 numbers of the stream (the stream is opened with nc -lk 9999 on Ubuntu). In practice, however, the output sums up all numbers ever entered. Switching to a time window produces the same result, i.e. no windowing seems to take place.
Is that a bug? (Version used: the latest from GitHub.)
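As a point of comparison (not a verdict on whether the 0.x behavior is a bug): the windowing API of that era was later completely reworked, and in the modern DataStream API "sum of the last 3 elements" is a sliding count window. A sketch in Java, assuming a hypothetical `DataStream<Integer> nums`; the time-based `.every(...)` trigger has no one-line equivalent and would need a custom trigger. This fragment is not runnable on its own:

```java
// Sliding count window: the last 3 elements, re-evaluated for each new element.
// In the reworked API this replaces window(Count.of(3)).every(...).
nums.countWindowAll(3, 1)
    .reduce(Integer::sum)
    .print();
```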
object SocketTextStreamWordCount {
def main(args: Array[String]) {
val hostName = args(0)
val port = args(1).toInt
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create streams for names and ages by mapping the inputs to the corresponding objects
val text = env.socketTextStream(hostName, port)
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
.filter { (x:String) => x.nonEmpty }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
// .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
.map { (x:String) => ("not used; just to have a tuple for …

I am writing an application with the DataSet API of Flink 0.10.1. Can I obtain multiple collectors from a single operator in Flink?
What I would like to do is the following:
val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
(iterator, collector1, collector2) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector1.collect(elem1)
collector2.collect(elem2)
}
}
}
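A common DataSet-era workaround is to emit a tagged union from a single mapPartition and then split it with two cheap filter operations, so the parsing runs only once. Below is a plain-Java model of that tagging idea (the tags and the parser are made up for illustration; this is not Flink API code):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TaggedSplit {
    // Hypothetical parser: each line yields a "small" and a "large" element.
    static Map.Entry<String, Integer> doParsing(String line) {
        String[] parts = line.split(",");
        return new AbstractMap.SimpleEntry<>(parts[0], Integer.parseInt(parts[1]));
    }

    // One pass over the input emits tagged records (tag, payload)...
    static List<Object[]> parseOnce(List<String> lines) {
        List<Object[]> tagged = new ArrayList<>();
        for (String line : lines) {
            Map.Entry<String, Integer> parsed = doParsing(line);
            tagged.add(new Object[] {"small", parsed.getKey()});
            tagged.add(new Object[] {"large", parsed.getValue()});
        }
        return tagged;
    }

    // ...and cheap filters split them afterwards, so parsing is not repeated.
    static List<Object> filterByTag(List<Object[]> tagged, String tag) {
        List<Object> out = new ArrayList<>();
        for (Object[] t : tagged)
            if (t[0].equals(tag)) out.add(t[1]);
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> tagged = parseOnce(List.of("a,1", "b,2"));
        System.out.println(filterByTag(tagged, "small")); // [a, b]
        System.out.println(filterByTag(tagged, "large")); // [1, 2]
    }
}
```

In the actual job this corresponds to one mapPartition emitting tagged tuples followed by two filters; Flink chains the filters onto the mapPartition, so the extra pass over the tagged records is cheap compared to parsing twice.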
Currently I am calling mapPartition twice to generate two datasets from one source dataset:
val lines = env.readTextFile(...)
val out_small = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem1)
}
}
}
val out_large = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val …

I am trying to run an example program with Flink. I downloaded the sample project using
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=0.8.1
Then I ran the following command in a terminal:
mvn package && java -cp target/test-1.0-SNAPSHOT.jar adfin.WordCount
I get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeinfo/TypeInformation
at adfin.WordCount.main(WordCount.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.typeinfo.TypeInformation
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
I tried updating the dependencies from the default 0.8.1 to 1.0.0, but that did not change anything. I suppose an extra jar may need to be added. Can anyone help?
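The NoClassDefFoundError means the Flink runtime classes are not on the classpath of the plain `java` invocation; the quickstart jar only contains your own classes unless a fat jar is built. Two command sketches (the fat-jar name is an assumption, check your `target/` directory and the shade/assembly configuration in the pom):

```shell
# Option 1: submit through the Flink CLI, which provides the runtime classes
bin/flink run -c adfin.WordCount target/test-1.0-SNAPSHOT.jar

# Option 2: build a fat jar that bundles the Flink dependencies and run it
# directly with java
mvn clean package
java -cp target/test-1.0-SNAPSHOT-jar-with-dependencies.jar adfin.WordCount
```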
I want to read data from multiple Kafka clusters in Flink.
But the result is that kafkaMessageStream only reads from the first Kafka cluster.
I can only read from both Kafka clusters when I have two separate streams, one per Kafka cluster, which is not what I want.
Is it possible to connect multiple sources to a single reader?
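A single FlinkKafkaConsumer instance talks to one cluster (one bootstrap/ZooKeeper configuration), so one source per cluster is expected; the sources can still be merged into one logical stream with union. A sketch reusing the identifiers from the example code below (not runnable on its own):

```java
DataStream<T> streamK1 = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
        properties_k1.getProperty(Constants.TOPIC), deserializationSchema, properties_k1));
DataStream<T> streamK2 = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
        properties_k2.getProperty(Constants.TOPIC), deserializationSchema, properties_k2));
// union() merges both sources into a single stream for the rest of the pipeline
DataStream<T> kafkaMessageStream = streamK1.union(streamK2);
```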
Example code:
public class KafkaReader<T> implements Reader<T>{
private StreamExecutionEnvironment executionEnvironment ;
public StreamExecutionEnvironment getExecutionEnvironment(Properties properties){
executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setRestartStrategy( RestartStrategies.fixedDelayRestart(3, 1500));
executionEnvironment.enableCheckpointing(
Integer.parseInt(properties.getProperty(Constants.SSE_CHECKPOINT_INTERVAL,"5000")), CheckpointingMode.EXACTLY_ONCE);
executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
//executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//try {
// executionEnvironment.setStateBackend(new FsStateBackend(new Path(Constants.SSE_CHECKPOINT_PATH)));
// The RocksDBStateBackend or The FsStateBackend
//} catch (IOException e) {
// LOGGER.error("Exception during initialization of stateBackend in execution environment"+e.getMessage());
//}
return executionEnvironment;
}
public DataStream<T> readFromMultiKafka(Properties properties_k1, Properties properties_k2 ,DeserializationSchema<T> deserializationSchema) {
DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
properties_k1.getProperty(Constants.TOPIC), deserializationSchema, …

For an operator whose input stream is faster than its output stream, its input buffer will block the output thread of the previous operator, which transfers data to that operator. Right?
Do Flink and Spark both handle backpressure by blocking threads? If so, what is the difference between them?
For a data source that continuously generates data, what happens if its output thread is blocked? Will the buffer overflow?
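A bounded buffer between operators is the essence of blocking-style backpressure: the buffer cannot overflow precisely because the producer's put blocks once it is full, and only a take by the slow consumer unblocks it, so the pressure propagates upstream all the way to the source. A minimal plain-Java model (an analogy only, not Flink's or Spark's actual mechanism; newer Flink versions use credit-based flow control rather than blocking on TCP):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureDemo {
    public static void main(String[] args) throws InterruptedException {
        // A small bounded buffer between two "operators".
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.put(i); // blocks while the buffer is full -> backpressure
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(10); // slow consumer
                    buffer.take();    // frees one slot, unblocking the producer
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        producer.start(); consumer.start();
        producer.join(); consumer.join();
        System.out.println("buffer size after run: " + buffer.size()); // 0
    }
}
```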
I have set up a Flink 1.2 standalone cluster with 2 JobManagers and 3 TaskManagers, and I am load-testing it with JMeter by generating Kafka messages/events that are then processed. The processing job runs on a TaskManager and typically handles ~15K events/second.
The job has EXACTLY_ONCE checkpointing enabled and persists state and checkpoints to Amazon S3. If I shut down the TaskManager running the job, it takes a few seconds and the job then resumes on another TaskManager. The job mainly logs the event IDs of consecutive integers (e.g. from 0 to 1200000).
When I checked the output on the TaskManager I shut down, the last count was e.g. 500000; when I then checked the output of the recovered job on the other TaskManager, it started at ~400000. That means ~100K duplicated events. Depending on the speed of the test, this number can be higher or lower.
Not sure if I am missing something, but I would expect the job to print the next consecutive number (such as 500001) after restarting on a different TaskManager.
Does anyone know why this happens / what extra settings I have to configure to get exactly-once?
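What the test observes is expected: EXACTLY_ONCE checkpointing makes Flink's internal state exactly-once, but on recovery the source rewinds to the last completed checkpoint and re-emits everything after it, so a non-transactional sink (such as plain logging) sees those records twice. End-to-end exactly-once needs a transactional or idempotent sink. A plain-Java model of the replay (the checkpoint position and counts are illustrative, not from the test above):

```java
import java.util.*;

public class ReplayDemo {
    public static void main(String[] args) {
        List<Integer> naiveSink = new ArrayList<>();          // plain logging: keeps duplicates
        Set<Integer> idempotentSink = new LinkedHashSet<>();  // keyed upsert: absorbs replays

        int checkpoint = 5; // last completed checkpoint covered records 0..4
        // First attempt: processes records 0..7, then the TaskManager "dies" at 8.
        for (int i = 0; i < 8; i++) { naiveSink.add(i); idempotentSink.add(i); }
        // Recovery: the source rewinds to the checkpoint, replaying records 5..7.
        for (int i = checkpoint; i < 10; i++) { naiveSink.add(i); idempotentSink.add(i); }

        System.out.println(naiveSink.size());      // 13 -> records 5, 6, 7 appear twice
        System.out.println(idempotentSink.size()); // 10 -> duplicates absorbed
    }
}
```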
For example, given a keyed stream:
val keyedStream: KeyedStream[event, Key] = env
.addSource(...)
.keyBy(...)
// several transformations on the same stream
keyedStream.map(....)
keyedStream.window(....)
keyedStream.split(....)
keyedStream...(....)
I believe this is reuse of the same stream in Flink. I found that when it is reused, the contents of the stream are not affected by the other transformations, so I think each consumer works on a copy of the same stream.
But I don't know whether that is correct.
If it is, doesn't keeping these copies consume a lot of resources (and which resources)?
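A model of what reuse does: each transformation applied to the same KeyedStream becomes its own downstream operator, and every record is delivered to each of them independently, which is why the consumers never affect one another. The cost is therefore per-record fan-out (serialization, network buffers, CPU for each extra consumer), not a stored copy of the whole stream. A plain-Java sketch of that fan-out (not Flink code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FanOutDemo {
    // Delivers every record to every downstream consumer, like Flink's fan-out.
    static List<List<Integer>> fanOut(List<Integer> records) {
        List<Integer> doubled = new ArrayList<>();
        List<Integer> shifted = new ArrayList<>();
        // Two downstream "operators" subscribed to the same upstream.
        List<Consumer<Integer>> downstream = List.of(
            x -> doubled.add(x * 2),    // e.g. keyedStream.map(...)
            x -> shifted.add(x + 100)   // e.g. another map on the same stream
        );
        for (int record : records)
            for (Consumer<Integer> op : downstream)
                op.accept(record);       // each consumer gets its own copy
        return List.of(doubled, shifted);
    }

    public static void main(String[] args) {
        List<List<Integer>> out = fanOut(List.of(1, 2, 3));
        System.out.println(out.get(0)); // [2, 4, 6]
        System.out.println(out.get(1)); // [101, 102, 103]
    }
}
```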
The DataStreamSink class has a name field. Why do FlinkKafkaConsumer and FlinkKafkaProducer not have a similar field? In the Flink dashboard, all of my sinks are shown as "Unnamed".
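For what it's worth, the dashboard label is not a field on the connector itself: `addSource`/`addSink` return a `DataStreamSource`/`DataStreamSink`, and calling `.name(...)` on that return value sets the display name. A sketch (identifiers are placeholders; not runnable on its own):

```java
env.addSource(new FlinkKafkaConsumer08<>(topic, schema, props))
   .name("kafka-source");   // label shown in the Flink dashboard

stream.addSink(new FlinkKafkaProducer08<>(brokerList, topic, schema))
      .name("kafka-sink");  // replaces the default "Unnamed" label
```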
I wrote a simple program with Flink in Java that takes a file or text as input and then uses a flatMap function to print all the words.
Here is my code:
final ParameterTool params = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
// show user defined parameters in the apache flink dashboard
DataStream<String> dataStream;
if(params.has("input"))
{
System.out.println("Executing Words example with file input");
dataStream = env.readTextFile(params.get("input"));
}else if (params.has("host") && params.has("port"))
{
System.out.println("Executing Words example with socket stream");
dataStream = env.socketTextStream(params.get("host"), Integer.parseInt(params.get("port")));
}
else {
System.exit(1);
return;
}
DataStream<String> wordDataStream = dataStream.flatMap(
(String sentence, Collector<String> out) -> {
for(String word: sentence.split(" "))
out.collect(word);
});
wordDataStream.print();
env.execute("Word Split");
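The command and the error message are cut off, but a flatMap written as a Java lambda with a `Collector` argument typically fails at submission with "The return type of function ... could not be determined automatically" due to type erasure. If that is the error here, an explicit type hint fixes it; a sketch of the modified fragment (assumes `org.apache.flink.api.common.typeinfo.Types` is available in this Flink version):

```java
DataStream<String> wordDataStream = dataStream.flatMap(
        (String sentence, Collector<String> out) -> {
            for (String word : sentence.split(" "))
                out.collect(word);
        })
        .returns(Types.STRING); // explicit hint: the lambda's
                                // Collector<String> type is erased at runtime
```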
But when I run it with the following command:
bin/flink run -c …

Flink 1.5.3: when I submit a Flink job to the Flink cluster (on YARN), it always throws an AskTimeoutException. In the Flink configuration file I have already set the parameter "akka.ask.timeout = 1000s", but the exception below still occurs.
That is, I increased the timeout parameter to "akka.ask.timeout = 1000s", but it does not take effect.
org.apache.flink.runtime.rest.handler.RestHandlerException: Job submission failed.
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$handleRequest$2(JobSubmitHandler.java:116)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1851759541]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) …
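One likely explanation for the trace above (timing out after the default 10000 ms even though akka.ask.timeout was raised): since Flink 1.5 job submission goes through the REST endpoint, whose handler timeout is controlled by `web.timeout` (in milliseconds), not by `akka.ask.timeout`. A configuration sketch with illustrative values; defaults may differ per release:

```yaml
# flink-conf.yaml
web.timeout: 300000        # REST handler timeout in milliseconds
akka.ask.timeout: 1000s    # internal RPC timeout, as already configured
```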