我想将一个函数应用于flatMap生成的每个组DataSet.groupBy.试图调用flatMap我得到编译器错误:
error: value flatMap is not a member of org.apache.flink.api.scala.GroupedDataSet
Run Code Online (Sandbox Code Playgroud)
我的代码:
var mapped = env.fromCollection(Array[(Int, Int)]())
var groups = mapped.groupBy("myGroupField")
groups.flatMap( myFunction: (Int, Array[Int]) => Array[(Int, Array[(Int, Int)])] ) // error: GroupedDataSet has no member flatMap
Run Code Online (Sandbox Code Playgroud)
实际上,在flink-scala 0.9-SNAPSHOT的文档中没有map列出或类似的.是否有类似的方法可以使用?如何在节点上单独实现每个组的所需分布式映射?
连接和coGroup转换可以读取2个输入数据集并输出一个("Y"通量)(如果我错了,请纠正我).
我想处理和更新2个数据集.为此,我计划使用2次coGroup转换.
但是,出于性能目的,这两种转换都可以在一个转换中完成("H"通量)吗?
此外,随着数据集的更新,我想迭代它们.如果目前无法实现,您是否计划在未来支持这种转型?
使用Apache Flink时使用以下代码:
DataStream<List<String>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<String, List<String>>() {
@Override
public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
collector.collect(top5);
}
}).flatten();
Run Code Online (Sandbox Code Playgroud)
我得到了这个例外
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我怎么UnmodifiableCollection能跟Flink一起玩?
我已经实现MapFunction了我的Apache Flink流程.它正在解析传入的元素并将它们转换为其他格式,但有时会出现错误(即传入的数据无效).
我看到了两种可能的处理方式:
所以,我有两个问题:
MapFunction?kafka的文档给出了以下描述的方法:
每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者>实例.
我的代码:
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final CloudKafkaConsumer consumer;
private final String topicName;
public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
this.consumer = consumer;
this.topicName = topicName;
}
@Override
public void run() {
try {
this.consumer.subscribe(topicName);
ConsumerRecords<String, String> records;
while (!closed.get()) {
synchronized (consumer) {
records = consumer.poll(100);
}
for (ConsumerRecord<String, String> tmp : records) {
System.out.println(tmp.value());
}
}
} catch (WakeupException e) {
// Ignore exception if closing
System.out.println(e);
//if …Run Code Online (Sandbox Code Playgroud) java multithreading distributed-computing apache-kafka apache-flink
有很多问题比较Flink vs Spark Streaming,Flink vs Storm和Storm vs Heron.
这个问题的根源在于Apache Flink和Twitter Heron都是真正的流处理框架(不是微批处理,如Spark Streaming).Storm去年已经退役,他们正在使用Heron(基本上是Storm重做).
Slim Baltagi在Flink和Flink vs Spark上有很好的演示:https://www.youtube.com/watch?v = G77m6Ou_kFA
Ilya Ganelin对各种流媒体框架的精彩研究:https://www.youtube.com/watch?v = KkjhyBLupvs
关于Flink vs Storm的相当有趣的想法: Flink和Storm之间的主要区别是什么?
但我没有看到任何新的Storm/Heron与Apache Flink的比较.
这两个项目都很年轻,都支持使用以前编写的Storm应用程序和许多其他东西.Flink更适合Hadoop生态系统,Heron更多地融入基于Twitter的生态系统堆栈.
有什么想法吗?
我已将以下行添加到flink-conf.yaml:
env.java.opts:" - Ddy.props.path =/PATH/TO/PROPS/FILE"
当启动jobmanager(jobmanager.sh启动集群)时,我在日志中看到jvm选项确实被识别
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xms256m
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xmx256m
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -XX:MaxPermSize=256m
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlog4j.configuration=file:/srv/flink-1.2.0/conf/log4j.properties
2017-02-20 12:19:23,536 INFO org.apache.flink.runtime.jobmanager.JobManager - -Dlogback.configurationFile=file:/srv/flink-1.2.0/conf/logback.xml
Run Code Online (Sandbox Code Playgroud)
但是当我运行flink作业(flink run -d PROG.JAR)时,System.getProperty("dy.props.path")返回null(当打印系统属性时,我发现它确实不存在.)
问题是 - 如何设置flink-job代码中可用的系统属性?
我只是得到下面关于并行性的示例,并且有一些相关的问题:
setParallelism(5)将Parallelism 5设置为求和或flatMap和求和?
是否可以分别为flatMap和sum等不同的运算符设置不同的Parallelism?例如将Parallelism 5设置为sum和10设置为flatMap。
根据我的理解,keyBy正在根据不同的密钥将DataStream划分为逻辑Stream \分区,并假设有10,000个不同的键值,因此有10,000个不同的分区,那么有多少个线程可以处理10,000个分区?只有5个线程?如果不设置setParallelism(5)怎么办?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
Run Code Online (Sandbox Code Playgroud) 我正在尝试为Flink流作业创建一个JUnit测试,该作业将数据写入kafka主题FlinkKafkaProducer09并FlinkKafkaConsumer09分别使用和从相同的kafka主题读取数据.我正在传递产品中的测试数据:
DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
Run Code Online (Sandbox Code Playgroud)
并检查相同的数据是否来自消费者:
List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result = resultSink.getResult();
assertEquals(expected, result);
Run Code Online (Sandbox Code Playgroud)
使用TestListResultSink.
我可以通过打印流来查看来自消费者的数据.但无法获得Junit测试结果,因为消费者即使在消息完成后也会继续运行.所以它没有来测试部分.
是以任何方式进入Flink或FlinkKafkaConsumer09停止进程或运行特定时间?
我想在网上看到我的工作.
我使用createLocalEnvironmentWithWebUI,代码在IDE中运行良好,但无法在http:// localhost:8081 /#/ overview中看到我的工作
val conf: Configuration = new Configuration()
import org.apache.flink.configuration.ConfigConstants
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val rides = env.addSource(
new TaxiRideSource("nycTaxiRides.gz", 1,100))//60, 600))
val filteredRides = rides
.filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))
.map(r => (r.passengerCnt, 1))
.keyBy(_._1)
.window(TumblingTimeWindows.of(Time.seconds(5)))
.sum(1)
.map(r => (r._1.toString+"test", r._2))
filteredRides.print()
env.execute("Taxi Ride Cleansing")
Run Code Online (Sandbox Code Playgroud)
我需要设置其他东西吗?
apache-flink ×10
apache-kafka ×2
java ×2
apache-storm ×1
hadoop ×1
heron ×1
junit ×1
kryo ×1
scala ×1
twitter ×1