标签: flink-streaming

Apache Flink与Twitter Heron?

有很多问题比较Flink vs Spark Streaming,Flink vs Storm和Storm vs Heron.

这个问题的根源在于Apache Flink和Twitter Heron都是真正的流处理框架(不是微批处理,如Spark Streaming).Storm去年已经退役,他们正在使用Heron(基本上是Storm重做).

Slim Baltagi在Flink和Flink vs Spark上有很好的演示:https://www.youtube.com/watch?v = G77m6Ou_kFA

Ilya Ganelin对各种流媒体框架的精彩研究:https://www.youtube.com/watch?v = KkjhyBLupvs

关于Flink vs Storm的相当有趣的想法: Flink和Storm之间的主要区别是什么?

但我没有看到任何新的Storm/Heron与Apache Flink的比较.

这两个项目都很年轻,都支持使用以前编写的Storm应用程序和许多其他东西.Flink更适合Hadoop生态系统,Heron更多地融入基于Twitter的生态系统堆栈.

有什么想法吗?

twitter apache-storm apache-flink flink-streaming heron

8
推荐指数
1
解决办法
5177
查看次数

Apache Flink - 在作业中无法识别自定义Java选项

我已将以下行添加到flink-conf.yaml:

env.java.opts:" - Ddy.props.path =/PATH/TO/PROPS/FILE"

当启动jobmanager(jobmanager.sh启动集群)时,我在日志中看到jvm选项确实被识别

2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  JVM Options:
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xms256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xmx256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -XX:MaxPermSize=256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlog4j.configuration=file:/srv/flink-1.2.0/conf/log4j.properties
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlogback.configurationFile=file:/srv/flink-1.2.0/conf/logback.xml
Run Code Online (Sandbox Code Playgroud)

但是当我运行flink作业(flink run -d PROG.JAR)时,System.getProperty("dy.props.path")返回null(当打印系统属性时,我发现它确实不存在.)

问题是 - 如何设置flink-job代码中可用的系统属性?

java apache-flink flink-streaming

8
推荐指数
1
解决办法
1590
查看次数

Flink中的操作员并行性的一些困惑

我只是得到下面关于并行性的示例,并且有一些相关的问题:

  1. setParallelism(5)将Parallelism 5设置为求和或flatMap和求和?

  2. 是否可以分别为flatMap和sum等不同的运算符设置不同的Parallelism?例如将Parallelism 5设置为sum和10设置为flatMap。

  3. 根据我的理解,keyBy正在根据不同的密钥将DataStream划分为逻辑Stream \分区,并假设有10,000个不同的键值,因此有10,000个不同的分区,那么有多少个线程可以处理10,000个分区?只有5个线程?如果不设置setParallelism(5)怎么办?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html

final StreamExecutionEnvironment env =     
  StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
  .flatMap(new LineSplitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

8
推荐指数
1
解决办法
3113
查看次数

如何从程序中停止flink流式传输作业

我正在尝试为Flink流作业创建一个JUnit测试,该作业将数据写入kafka主题FlinkKafkaProducer09FlinkKafkaConsumer09分别使用和从相同的kafka主题读取数据.我正在传递产品中的测试数据:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
Run Code Online (Sandbox Code Playgroud)

并检查相同的数据是否来自消费者:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);
Run Code Online (Sandbox Code Playgroud)

使用TestListResultSink.

我可以通过打印流来查看来自消费者的数据.但无法获得Junit测试结果,因为消费者即使在消息完成后也会继续运行.所以它没有来测试部分.

是以任何方式进入FlinkFlinkKafkaConsumer09停止进程或运行特定时间?

junit apache-kafka apache-flink flink-streaming

8
推荐指数
1
解决办法
3021
查看次数

从IDE运行时Flink webui

我想在网上看到我的工作.

我使用createLocalEnvironmentWithWebUI,代码在IDE中运行良好,但无法在http:// localhost:8081 /#/ overview中看到我的工作

  val conf: Configuration = new Configuration()
  import org.apache.flink.configuration.ConfigConstants
  conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
  val env =  StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


  val rides = env.addSource(
    new TaxiRideSource("nycTaxiRides.gz", 1,100))//60, 600))

  val filteredRides = rides
    .filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))
    .map(r => (r.passengerCnt, 1))
    .keyBy(_._1)
    .window(TumblingTimeWindows.of(Time.seconds(5)))
    .sum(1)
    .map(r => (r._1.toString+"test", r._2))

  filteredRides.print()
  env.execute("Taxi Ride Cleansing")
Run Code Online (Sandbox Code Playgroud)

我需要设置其他东西吗?

apache-flink flink-streaming

8
推荐指数
2
解决办法
2486
查看次数

在流启动之前访问Flink类加载器

在我的项目中,我想在执行流之前访问Flink User Classloader.在流执行之前,我一直在实例化我自己的类加载器以反序列化类(尽量避免与多个类加载器相关的问题).

然而,我正在进一步推进更多的问题,我不得不写(坏)代码,以避免这个问题.

如果我可以访问Flink用户类加载器并使用它,这可以解决,但是在"RichFunctions"之外我没有看到这样做的机制(https://ci.apache.org/projects/flink/flink-docs -stable/api/java/org/apache/flink/api/common/functions/RichFunction.html),它们要求流运行.

这里的任何指导将不胜感激

java scala apache-flink flink-streaming

8
推荐指数
1
解决办法
117
查看次数

Flink 1.11.1 中找不到执行应用程序的 ExecutorFactory

首先,我读过这篇关于同一问题的文章,并尝试遵循适用于他的相同解决方案(使用 mvn 创建一个新的快速入门并将代码迁移到那里),并且在开箱即用时无法正常工作IntelliJ。

这是我的 pom.xml 与其他 pom.xml 的依赖项混合在一起。我究竟做错了什么?

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming flink-cep

8
推荐指数
3
解决办法
1万
查看次数

Apache Flume与Apache Flink的区别

我需要从某些源读取数据流(在我的例子中它是UDP流,但它应该无关紧要),转换每个记录并将其写入HDFS.

为此目的使用FlumeFlink有什么区别吗?

我知道我可以使用Flume和自定义拦截器来转换每个事件.

但我是Flink的新手,所以对我而言,Flink看起来也会如此.

哪一个更好选择?性能有差异吗?

请帮忙!

flume flume-ng apache-flink flink-streaming

7
推荐指数
2
解决办法
2992
查看次数

Flink Streaming:如何实现由start和end元素定义的窗口?

我有以下格式的数据,

SIP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | INVITE RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 0 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 1 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 2 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58: 58 IST 2016 …

apache-flink flink-streaming

7
推荐指数
1
解决办法
251
查看次数

Flink和Play 2.5之间的Akka版本碰撞

在我们的项目中,我们有一个Flink(1.1.3)流式作业,它从一个kafka队列中读取,执行映射函数转换并写入另一个队列.

在我们作为流程的一部分引入传出REST请求之前,这一点很有效.为此,我们使用了PlayFramework WSClient(因为它在我们堆栈的其他位置使用),并以这种方式在代码中创建它:

  val config = new AhcWSClientConfig(wsClientConfig = WSClientConfig())
  val builder = new AhcConfigBuilder(config)
  val ahcConfig = builder.configure().build()
  new AhcWSClient(ahcConfig)(ActorMaterializer()(ActorSystem()))
Run Code Online (Sandbox Code Playgroud)

这在本地运行良好,但在部署并在群集上运行时,我遇到了以下异常:

java.lang.NoSuchMethodError: akka.util.Helpers$.toRootLowerCase(Ljava/lang/String;)Ljava/lang/String;
    at akka.stream.StreamSubscriptionTimeoutSettings$.apply(ActorMaterializer.scala:491)
    at akka.stream.ActorMaterializerSettings$.apply(ActorMaterializer.scala:243)
    at akka.stream.ActorMaterializerSettings$.apply(ActorMaterializer.scala:232)
    at akka.stream.ActorMaterializer$$anonfun$1.apply(ActorMaterializer.scala:41)
    at akka.stream.ActorMaterializer$$anonfun$1.apply(ActorMaterializer.scala:41)
    at scala.Option.getOrElse(Option.scala:121)
    at akka.stream.ActorMaterializer$.apply(ActorMaterializer.scala:41)
    at com.ourstuff.etl.core.utils.web.GlobalWSClient$.generateClient(WSClientFactory.scala:32)
Run Code Online (Sandbox Code Playgroud)

调查一下,我认为这是Akka 2.3.x(由Flink 1.1.X带来)和Akka 2.4.x(由PlayFramework带来)之间的碰撞.

我们将Flink集群升级到1.3.1(以及我们的代码对Flink的依赖),假设这将解决问题.但同样的问题似乎仍然存在.

什么可能仍然导致这个?

akka playframework apache-flink flink-streaming playframework-2.5

7
推荐指数
1
解决办法
373
查看次数