标签: apache-flink

如何在Apache Flink中对GroupedDataSet上的函数进行flatMap

我想将一个函数应用于flatMap生成的每个组DataSet.groupBy.试图调用flatMap我得到编译器错误:

error: value flatMap is not a member of org.apache.flink.api.scala.GroupedDataSet
Run Code Online (Sandbox Code Playgroud)

我的代码:

var mapped = env.fromCollection(Array[(Int, Int)]())
var groups = mapped.groupBy("myGroupField")
groups.flatMap( myFunction: (Int, Array[Int]) => Array[(Int, Array[(Int, Int)])] )  // error: GroupedDataSet has no member flatMap
Run Code Online (Sandbox Code Playgroud)

实际上,在flink-scala 0.9-SNAPSHOT的文档中没有map列出或类似的.是否有类似的方法可以使用?如何在节点上单独实现每个组的所需分布式映射?

hadoop scala apache-flink

8
推荐指数
1
解决办法
786
查看次数

在Apache Flink中使用集合$ UnmodifiableCollection

使用Apache Flink时使用以下代码:

DataStream<List<String>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<String, List<String>>() {

    @Override
    public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
        List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
        collector.collect(top5);
    }
}).flatten();
Run Code Online (Sandbox Code Playgroud)

我得到了这个例外

Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我怎么UnmodifiableCollection能跟Flink一起玩?

kryo apache-flink

8
推荐指数
1
解决办法
1242
查看次数

如何在kafka 0.9.0中使用多线程消费者?

kafka的文档给出了以下描述的方法:

每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者>实例.

我的代码:

public class KafkaConsumerRunner implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CloudKafkaConsumer consumer;
    private final String topicName;

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
        this.consumer = consumer;
        this.topicName = topicName;
    }

    @Override
    public void run() {
        try {
            this.consumer.subscribe(topicName);
            ConsumerRecords<String, String> records;
            while (!closed.get()) {
                synchronized (consumer) {
                    records = consumer.poll(100);
                }
                for (ConsumerRecord<String, String> tmp : records) {
                    System.out.println(tmp.value());
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            System.out.println(e);
            //if …
Run Code Online (Sandbox Code Playgroud)

java multithreading distributed-computing apache-kafka apache-flink

8
推荐指数
1
解决办法
1万
查看次数

Apache Flink与Twitter Heron?

有很多问题比较Flink vs Spark Streaming,Flink vs Storm和Storm vs Heron.

这个问题的根源在于Apache Flink和Twitter Heron都是真正的流处理框架(不是微批处理,如Spark Streaming).Storm去年已经退役,他们正在使用Heron(基本上是Storm重做).

Slim Baltagi在Flink和Flink vs Spark上有很好的演示:https://www.youtube.com/watch?v = G77m6Ou_kFA

Ilya Ganelin对各种流媒体框架的精彩研究:https://www.youtube.com/watch?v = KkjhyBLupvs

关于Flink vs Storm的相当有趣的想法: Flink和Storm之间的主要区别是什么?

但我没有看到任何新的Storm/Heron与Apache Flink的比较.

这两个项目都很年轻,都支持使用以前编写的Storm应用程序和许多其他东西.Flink更适合Hadoop生态系统,Heron更多地融入基于Twitter的生态系统堆栈.

有什么想法吗?

twitter apache-storm apache-flink flink-streaming heron

8
推荐指数
1
解决办法
5177
查看次数

为什么有人会在Tez上运行Spark/Flink?

在Saha等人的Tez论文中,显示了Hadoop 2与Tez的以下模块化架构:

Hadoop 2与Tez

为什么有人会在Tez上运行Spark/Flink?

有什么好处?更好地利用YARN?

hadoop apache-spark apache-tez apache-flink tez

8
推荐指数
1
解决办法
422
查看次数

Flink中的操作员并行性的一些困惑

我只是得到下面关于并行性的示例,并且有一些相关的问题:

  1. setParallelism(5)将Parallelism 5设置为求和或flatMap和求和?

  2. 是否可以分别为flatMap和sum等不同的运算符设置不同的Parallelism?例如将Parallelism 5设置为sum和10设置为flatMap。

  3. 根据我的理解,keyBy正在根据不同的密钥将DataStream划分为逻辑Stream \分区,并假设有10,000个不同的键值,因此有10,000个不同的分区,那么有多少个线程可以处理10,000个分区?只有5个线程?如果不设置setParallelism(5)怎么办?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html

final StreamExecutionEnvironment env =     
  StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
  .flatMap(new LineSplitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

8
推荐指数
1
解决办法
3113
查看次数

从IDE运行时Flink webui

我想在网上看到我的工作.

我使用createLocalEnvironmentWithWebUI,代码在IDE中运行良好,但无法在http:// localhost:8081 /#/ overview中看到我的工作

  val conf: Configuration = new Configuration()
  import org.apache.flink.configuration.ConfigConstants
  conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
  val env =  StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


  val rides = env.addSource(
    new TaxiRideSource("nycTaxiRides.gz", 1,100))//60, 600))

  val filteredRides = rides
    .filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))
    .map(r => (r.passengerCnt, 1))
    .keyBy(_._1)
    .window(TumblingTimeWindows.of(Time.seconds(5)))
    .sum(1)
    .map(r => (r._1.toString+"test", r._2))

  filteredRides.print()
  env.execute("Taxi Ride Cleansing")
Run Code Online (Sandbox Code Playgroud)

我需要设置其他东西吗?

apache-flink flink-streaming

8
推荐指数
2
解决办法
2486
查看次数

在流启动之前访问Flink类加载器

在我的项目中,我想在执行流之前访问Flink User Classloader.在流执行之前,我一直在实例化我自己的类加载器以反序列化类(尽量避免与多个类加载器相关的问题).

然而,我正在进一步推进更多的问题,我不得不写(坏)代码,以避免这个问题.

如果我可以访问Flink用户类加载器并使用它,这可以解决,但是在"RichFunctions"之外我没有看到这样做的机制(https://ci.apache.org/projects/flink/flink-docs -stable/api/java/org/apache/flink/api/common/functions/RichFunction.html),它们要求流运行.

这里的任何指导将不胜感激

java scala apache-flink flink-streaming

8
推荐指数
1
解决办法
117
查看次数

用于Apache Flink的BZip2压缩输入

我有一个用bzip2压缩的维基百科转储(从http://dumps.wikimedia.org/enwiki/下载),但我不想解压缩它:我想在动态解压缩时处理它.

我知道可以用普通的Java做到这一点(参见例如Java - 读取BZ2文件并动态解压缩/解析),但我想知道它是如何在Apache Flink中做到的?我可能需要的是像https://github.com/whym/wikihadoop,但对于Flink,而不是Hadoop.

bzip2 apache-flink

7
推荐指数
1
解决办法
659
查看次数

Flink和Play 2.5之间的Akka版本碰撞

在我们的项目中,我们有一个Flink(1.1.3)流式作业,它从一个kafka队列中读取,执行映射函数转换并写入另一个队列.

在我们作为流程的一部分引入传出REST请求之前,这一点很有效.为此,我们使用了PlayFramework WSClient(因为它在我们堆栈的其他位置使用),并以这种方式在代码中创建它:

  val config = new AhcWSClientConfig(wsClientConfig = WSClientConfig())
  val builder = new AhcConfigBuilder(config)
  val ahcConfig = builder.configure().build()
  new AhcWSClient(ahcConfig)(ActorMaterializer()(ActorSystem()))
Run Code Online (Sandbox Code Playgroud)

这在本地运行良好,但在部署并在群集上运行时,我遇到了以下异常:

java.lang.NoSuchMethodError: akka.util.Helpers$.toRootLowerCase(Ljava/lang/String;)Ljava/lang/String;
    at akka.stream.StreamSubscriptionTimeoutSettings$.apply(ActorMaterializer.scala:491)
    at akka.stream.ActorMaterializerSettings$.apply(ActorMaterializer.scala:243)
    at akka.stream.ActorMaterializerSettings$.apply(ActorMaterializer.scala:232)
    at akka.stream.ActorMaterializer$$anonfun$1.apply(ActorMaterializer.scala:41)
    at akka.stream.ActorMaterializer$$anonfun$1.apply(ActorMaterializer.scala:41)
    at scala.Option.getOrElse(Option.scala:121)
    at akka.stream.ActorMaterializer$.apply(ActorMaterializer.scala:41)
    at com.ourstuff.etl.core.utils.web.GlobalWSClient$.generateClient(WSClientFactory.scala:32)
Run Code Online (Sandbox Code Playgroud)

调查一下,我认为这是Akka 2.3.x(由Flink 1.1.X带来)和Akka 2.4.x(由PlayFramework带来)之间的碰撞.

我们将Flink集群升级到1.3.1(以及我们的代码对Flink的依赖),假设这将解决问题.但同样的问题似乎仍然存在.

什么可能仍然导致这个?

akka playframework apache-flink flink-streaming playframework-2.5

7
推荐指数
1
解决办法
373
查看次数