标签: apache-flink

如何在Apache Flink中对GroupedDataSet上的函数进行flatMap

我想将一个函数应用于flatMap生成的每个组DataSet.groupBy.试图调用flatMap我得到编译器错误:

error: value flatMap is not a member of org.apache.flink.api.scala.GroupedDataSet
Run Code Online (Sandbox Code Playgroud)

我的代码:

var mapped = env.fromCollection(Array[(Int, Int)]())
var groups = mapped.groupBy("myGroupField")
groups.flatMap( myFunction: (Int, Array[Int]) => Array[(Int, Array[(Int, Int)])] )  // error: GroupedDataSet has no member flatMap
Run Code Online (Sandbox Code Playgroud)

实际上,在flink-scala 0.9-SNAPSHOT的文档中没有map列出或类似的.是否有类似的方法可以使用?如何在节点上单独实现每个组的所需分布式映射?

hadoop scala apache-flink

8
推荐指数
1
解决办法
786
查看次数

Flink:如何在一次转换中处理和输出两个数据集?

连接和coGroup转换可以读取2个输入数据集并输出一个("Y"通量)(如果我错了,请纠正我).

我想处理和更新2个数据集.为此,我计划使用2次coGroup转换.

但是,出于性能目的,这两种转换都可以在一个转换中完成("H"通量)吗?

此外,随着数据集的更新,我想迭代它们.如果目前无法实现,您是否计划在未来支持这种转型?

apache-flink

8
推荐指数
1
解决办法
863
查看次数

在Apache Flink中使用集合$ UnmodifiableCollection

使用Apache Flink时使用以下代码:

DataStream<List<String>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<String, List<String>>() {

    @Override
    public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
        List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
        collector.collect(top5);
    }
}).flatten();
Run Code Online (Sandbox Code Playgroud)

我得到了这个例外

Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我怎么UnmodifiableCollection能跟Flink一起玩?

kryo apache-flink

8
推荐指数
1
解决办法
1242
查看次数

如何正确处理自定义MapFunction中的错误?

我已经实现MapFunction了我的Apache Flink流程.它正在解析传入的元素并将它们转换为其他格式,但有时会出现错误(即传入的数据无效).

我看到了两种可能的处理方式:

  • 忽略无效元素,但似乎我不能忽略错误,因为对于任何传入元素我必须提供传出元素.
  • 将传入的元素拆分为有效和无效但似乎我应该使用其他功能.

所以,我有两个问题:

  1. 如何正确处理我的错误MapFunction
  2. 如何正确实现这样的转换功能?

apache-flink

8
推荐指数
1
解决办法
744
查看次数

如何在kafka 0.9.0中使用多线程消费者?

kafka的文档给出了以下描述的方法:

每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者>实例.

我的代码:

public class KafkaConsumerRunner implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CloudKafkaConsumer consumer;
    private final String topicName;

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
        this.consumer = consumer;
        this.topicName = topicName;
    }

    @Override
    public void run() {
        try {
            this.consumer.subscribe(topicName);
            ConsumerRecords<String, String> records;
            while (!closed.get()) {
                synchronized (consumer) {
                    records = consumer.poll(100);
                }
                for (ConsumerRecord<String, String> tmp : records) {
                    System.out.println(tmp.value());
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            System.out.println(e);
            //if …
Run Code Online (Sandbox Code Playgroud)

java multithreading distributed-computing apache-kafka apache-flink

8
推荐指数
1
解决办法
1万
查看次数

Apache Flink与Twitter Heron?

有很多问题比较Flink vs Spark Streaming,Flink vs Storm和Storm vs Heron.

这个问题的根源在于Apache Flink和Twitter Heron都是真正的流处理框架(不是微批处理,如Spark Streaming).Storm去年已经退役,他们正在使用Heron(基本上是Storm重做).

Slim Baltagi在Flink和Flink vs Spark上有很好的演示:https://www.youtube.com/watch?v = G77m6Ou_kFA

Ilya Ganelin对各种流媒体框架的精彩研究:https://www.youtube.com/watch?v = KkjhyBLupvs

关于Flink vs Storm的相当有趣的想法: Flink和Storm之间的主要区别是什么?

但我没有看到任何新的Storm/Heron与Apache Flink的比较.

这两个项目都很年轻,都支持使用以前编写的Storm应用程序和许多其他东西.Flink更适合Hadoop生态系统,Heron更多地融入基于Twitter的生态系统堆栈.

有什么想法吗?

twitter apache-storm apache-flink flink-streaming heron

8
推荐指数
1
解决办法
5177
查看次数

Apache Flink - 在作业中无法识别自定义Java选项

我已将以下行添加到flink-conf.yaml:

env.java.opts:" - Ddy.props.path =/PATH/TO/PROPS/FILE"

当启动jobmanager(jobmanager.sh启动集群)时,我在日志中看到jvm选项确实被识别

2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  JVM Options:
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xms256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xmx256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -XX:MaxPermSize=256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlog4j.configuration=file:/srv/flink-1.2.0/conf/log4j.properties
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlogback.configurationFile=file:/srv/flink-1.2.0/conf/logback.xml
Run Code Online (Sandbox Code Playgroud)

但是当我运行flink作业(flink run -d PROG.JAR)时,System.getProperty("dy.props.path")返回null(当打印系统属性时,我发现它确实不存在.)

问题是 - 如何设置flink-job代码中可用的系统属性?

java apache-flink flink-streaming

8
推荐指数
1
解决办法
1590
查看次数

Flink中的操作员并行性的一些困惑

我只是得到下面关于并行性的示例,并且有一些相关的问题:

  1. setParallelism(5)将Parallelism 5设置为求和或flatMap和求和?

  2. 是否可以分别为flatMap和sum等不同的运算符设置不同的Parallelism?例如将Parallelism 5设置为sum和10设置为flatMap。

  3. 根据我的理解,keyBy正在根据不同的密钥将DataStream划分为逻辑Stream \分区,并假设有10,000个不同的键值,因此有10,000个不同的分区,那么有多少个线程可以处理10,000个分区?只有5个线程?如果不设置setParallelism(5)怎么办?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html

final StreamExecutionEnvironment env =     
  StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
  .flatMap(new LineSplitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

8
推荐指数
1
解决办法
3113
查看次数

如何从程序中停止flink流式传输作业

我正在尝试为Flink流作业创建一个JUnit测试,该作业将数据写入kafka主题FlinkKafkaProducer09FlinkKafkaConsumer09分别使用和从相同的kafka主题读取数据.我正在传递产品中的测试数据:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
Run Code Online (Sandbox Code Playgroud)

并检查相同的数据是否来自消费者:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);
Run Code Online (Sandbox Code Playgroud)

使用TestListResultSink.

我可以通过打印流来查看来自消费者的数据.但无法获得Junit测试结果,因为消费者即使在消息完成后也会继续运行.所以它没有来测试部分.

是以任何方式进入FlinkFlinkKafkaConsumer09停止进程或运行特定时间?

junit apache-kafka apache-flink flink-streaming

8
推荐指数
1
解决办法
3021
查看次数

从IDE运行时Flink webui

我想在网上看到我的工作.

我使用createLocalEnvironmentWithWebUI,代码在IDE中运行良好,但无法在http:// localhost:8081 /#/ overview中看到我的工作

  val conf: Configuration = new Configuration()
  import org.apache.flink.configuration.ConfigConstants
  conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
  val env =  StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


  val rides = env.addSource(
    new TaxiRideSource("nycTaxiRides.gz", 1,100))//60, 600))

  val filteredRides = rides
    .filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))
    .map(r => (r.passengerCnt, 1))
    .keyBy(_._1)
    .window(TumblingTimeWindows.of(Time.seconds(5)))
    .sum(1)
    .map(r => (r._1.toString+"test", r._2))

  filteredRides.print()
  env.execute("Taxi Ride Cleansing")
Run Code Online (Sandbox Code Playgroud)

我需要设置其他东西吗?

apache-flink flink-streaming

8
推荐指数
2
解决办法
2486
查看次数