我想将一个函数应用于flatMap生成的每个组DataSet.groupBy.试图调用flatMap我得到编译器错误:
error: value flatMap is not a member of org.apache.flink.api.scala.GroupedDataSet
Run Code Online (Sandbox Code Playgroud)
我的代码:
var mapped = env.fromCollection(Array[(Int, Int)]())
var groups = mapped.groupBy("myGroupField")
groups.flatMap( myFunction: (Int, Array[Int]) => Array[(Int, Array[(Int, Int)])] ) // error: GroupedDataSet has no member flatMap
Run Code Online (Sandbox Code Playgroud)
实际上,在flink-scala 0.9-SNAPSHOT的文档中没有map列出或类似的.是否有类似的方法可以使用?如何在节点上单独实现每个组的所需分布式映射?
使用Apache Flink时使用以下代码:
DataStream<List<String>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<String, List<String>>() {
@Override
public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
collector.collect(top5);
}
}).flatten();
Run Code Online (Sandbox Code Playgroud)
我得到了这个例外
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我怎么UnmodifiableCollection能跟Flink一起玩?
kafka的文档给出了以下描述的方法:
每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者>实例.
我的代码:
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final CloudKafkaConsumer consumer;
private final String topicName;
public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
this.consumer = consumer;
this.topicName = topicName;
}
@Override
public void run() {
try {
this.consumer.subscribe(topicName);
ConsumerRecords<String, String> records;
while (!closed.get()) {
synchronized (consumer) {
records = consumer.poll(100);
}
for (ConsumerRecord<String, String> tmp : records) {
System.out.println(tmp.value());
}
}
} catch (WakeupException e) {
// Ignore exception if closing
System.out.println(e);
//if …Run Code Online (Sandbox Code Playgroud) java multithreading distributed-computing apache-kafka apache-flink
有很多问题比较Flink vs Spark Streaming,Flink vs Storm和Storm vs Heron.
这个问题的根源在于Apache Flink和Twitter Heron都是真正的流处理框架(不是微批处理,如Spark Streaming).Storm去年已经退役,他们正在使用Heron(基本上是Storm重做).
Slim Baltagi在Flink和Flink vs Spark上有很好的演示:https://www.youtube.com/watch?v = G77m6Ou_kFA
Ilya Ganelin对各种流媒体框架的精彩研究:https://www.youtube.com/watch?v = KkjhyBLupvs
关于Flink vs Storm的相当有趣的想法: Flink和Storm之间的主要区别是什么?
但我没有看到任何新的Storm/Heron与Apache Flink的比较.
这两个项目都很年轻,都支持使用以前编写的Storm应用程序和许多其他东西.Flink更适合Hadoop生态系统,Heron更多地融入基于Twitter的生态系统堆栈.
有什么想法吗?
我只是得到下面关于并行性的示例,并且有一些相关的问题:
setParallelism(5)将Parallelism 5设置为求和或flatMap和求和?
是否可以分别为flatMap和sum等不同的运算符设置不同的Parallelism?例如将Parallelism 5设置为sum和10设置为flatMap。
根据我的理解,keyBy正在根据不同的密钥将DataStream划分为逻辑Stream \分区,并假设有10,000个不同的键值,因此有10,000个不同的分区,那么有多少个线程可以处理10,000个分区?只有5个线程?如果不设置setParallelism(5)怎么办?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
Run Code Online (Sandbox Code Playgroud) 我想在网上看到我的工作.
我使用createLocalEnvironmentWithWebUI,代码在IDE中运行良好,但无法在http:// localhost:8081 /#/ overview中看到我的工作
val conf: Configuration = new Configuration()
import org.apache.flink.configuration.ConfigConstants
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val rides = env.addSource(
new TaxiRideSource("nycTaxiRides.gz", 1,100))//60, 600))
val filteredRides = rides
.filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))
.map(r => (r.passengerCnt, 1))
.keyBy(_._1)
.window(TumblingTimeWindows.of(Time.seconds(5)))
.sum(1)
.map(r => (r._1.toString+"test", r._2))
filteredRides.print()
env.execute("Taxi Ride Cleansing")
Run Code Online (Sandbox Code Playgroud)
我需要设置其他东西吗?
在我的项目中,我想在执行流之前访问Flink User Classloader.在流执行之前,我一直在实例化我自己的类加载器以反序列化类(尽量避免与多个类加载器相关的问题).
然而,我正在进一步推进更多的问题,我不得不写(坏)代码,以避免这个问题.
如果我可以访问Flink用户类加载器并使用它,这可以解决,但是在"RichFunctions"之外我没有看到这样做的机制(https://ci.apache.org/projects/flink/flink-docs -stable/api/java/org/apache/flink/api/common/functions/RichFunction.html),它们要求流运行.
这里的任何指导将不胜感激
我有一个用bzip2压缩的维基百科转储(从http://dumps.wikimedia.org/enwiki/下载),但我不想解压缩它:我想在动态解压缩时处理它.
我知道可以用普通的Java做到这一点(参见例如Java - 读取BZ2文件并动态解压缩/解析),但我想知道它是如何在Apache Flink中做到的?我可能需要的是像https://github.com/whym/wikihadoop,但对于Flink,而不是Hadoop.
在我们的项目中,我们有一个Flink(1.1.3)流式作业,它从一个kafka队列中读取,执行映射函数转换并写入另一个队列.
在我们作为流程的一部分引入传出REST请求之前,这一点很有效.为此,我们使用了PlayFramework WSClient(因为它在我们堆栈的其他位置使用),并以这种方式在代码中创建它:
val config = new AhcWSClientConfig(wsClientConfig = WSClientConfig())
val builder = new AhcConfigBuilder(config)
val ahcConfig = builder.configure().build()
new AhcWSClient(ahcConfig)(ActorMaterializer()(ActorSystem()))
Run Code Online (Sandbox Code Playgroud)
这在本地运行良好,但在部署并在群集上运行时,我遇到了以下异常:
java.lang.NoSuchMethodError: akka.util.Helpers$.toRootLowerCase(Ljava/lang/String;)Ljava/lang/String;
at akka.stream.StreamSubscriptionTimeoutSettings$.apply(ActorMaterializer.scala:491)
at akka.stream.ActorMaterializerSettings$.apply(ActorMaterializer.scala:243)
at akka.stream.ActorMaterializerSettings$.apply(ActorMaterializer.scala:232)
at akka.stream.ActorMaterializer$$anonfun$1.apply(ActorMaterializer.scala:41)
at akka.stream.ActorMaterializer$$anonfun$1.apply(ActorMaterializer.scala:41)
at scala.Option.getOrElse(Option.scala:121)
at akka.stream.ActorMaterializer$.apply(ActorMaterializer.scala:41)
at com.ourstuff.etl.core.utils.web.GlobalWSClient$.generateClient(WSClientFactory.scala:32)
Run Code Online (Sandbox Code Playgroud)
调查一下,我认为这是Akka 2.3.x(由Flink 1.1.X带来)和Akka 2.4.x(由PlayFramework带来)之间的碰撞.
我们将Flink集群升级到1.3.1(以及我们的代码对Flink的依赖),假设这将解决问题.但同样的问题似乎仍然存在.
什么可能仍然导致这个?
akka playframework apache-flink flink-streaming playframework-2.5
apache-flink ×10
hadoop ×2
java ×2
scala ×2
akka ×1
apache-kafka ×1
apache-spark ×1
apache-storm ×1
apache-tez ×1
bzip2 ×1
heron ×1
kryo ×1
tez ×1
twitter ×1