Posts by Fab*_*ske

What are the main differences between Flink and Storm?

Flink has been compared to Spark, which, as I see it, is the wrong comparison because it compares a windowed event processing system against micro-batching; likewise, comparing Flink to Samza does not make much sense to me. In both cases it compares a real-time versus a batched event processing strategy, even if at a smaller "scale" in the case of Samza. But I would like to know how Flink compares to Storm, which conceptually seems much more similar to it.

I have found this (slide #4) documenting the main difference as "adjustable latency" for Flink. Another hint seems to be an article by Silicon Angle suggesting that Flink integrates better into a Spark or Hadoop MapReduce world, but no actual details are mentioned or referenced. Finally, Fabian Hueske himself noted in an interview: "Compared to Apache Storm, the stream analysis functionality of Flink offers a high-level API and uses a more light-weight fault tolerance strategy to provide exactly-once processing guarantees."

This is all a bit sparse for me, and I do not quite get the point. Can someone explain what problem(s) with stream processing in Storm are solved by Flink? What is Hueske referring to with the API issues and the "more light-weight fault tolerance strategy"?
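For reference, the "high-level API" in the quote can be illustrated with a short sketch. This is not from the question: it is a minimal per-key windowed word count in the Flink 1.x-era DataStream API, with invented input elements and window size, the kind of pipeline that in Storm would require wiring spouts, bolts, and hand-rolled state.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowedWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> counts = env
            .fromElements("a", "b", "a")                 // illustrative input
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) {
                    return Tuple2.of(word, 1);           // (word, 1)
                }
            })
            .keyBy(0)                                    // key by the word
            .timeWindow(Time.seconds(5))                 // tumbling 5-second windows
            .sum(1);                                     // count per key and window

        counts.print();
        env.execute("windowed word count");
    }
}
```

Grouping, windowing, and state handling are all expressed declaratively; the runtime, not the user, manages the per-key state that the fault tolerance strategy snapshots.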

apache-storm apache-flink flink-streaming

131 votes · 3 answers · 40,000 views

Degree of parallelism in Apache Flink

Can I set different degrees of parallelism for different parts of a task in a Flink program? For example, how does Flink interpret the following sample code? The two custom partitioners, MyPartitioner1 and MyPartitioner2, divide the input data into 4 and 2 partitions respectively.

DataSet<Tuple2<Integer, Integer>> partitionedData1 = inputData1
  .partitionCustom(new MyPartitioner1(), 1);
env.setParallelism(4);
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1
  .mapPartition(new calculateFun());

DataSet<Tuple2<Integer, Integer>> partitionedData2 = inputData2
  .partitionCustom(new MyPartitioner2(), 2);
env.setParallelism(2);
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2
  .mapPartition(new calculateFun());

I get the following error for this code:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 …
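Not an official diagnosis, but one common cause of an `ArrayIndexOutOfBoundsException: 2` in this kind of setup is a custom Partitioner returning a partition index outside `[0, numPartitions)` for the operator's actual parallelism, or a tuple field index that does not exist. Also, rather than flipping `env.setParallelism()` between statements, parallelism can be attached to each operator. A self-contained sketch under those assumptions (class and partitioner names are illustrative, not from the question):

```java
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PerOperatorParallelism {

    // Illustrative partitioner: always stays inside [0, numPartitions),
    // whatever parallelism the downstream operator actually runs with.
    static class ModPartitioner implements Partitioner<Integer> {
        private final int partitions;
        ModPartitioner(int partitions) { this.partitions = partitions; }

        @Override
        public int partition(Integer key, int numPartitions) {
            return Math.abs(key) % Math.min(partitions, numPartitions);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, Integer>> input =
                env.fromElements(Tuple2.of(1, 10), Tuple2.of(2, 20), Tuple2.of(3, 30));

        // Parallelism is set on the operator itself, not toggled globally:
        DataSet<Tuple2<Integer, Integer>> output = input
                .partitionCustom(new ModPartitioner(4), 0)   // partition on field 0
                .mapPartition(new MapPartitionFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public void mapPartition(Iterable<Tuple2<Integer, Integer>> values,
                                             Collector<Tuple2<Integer, Integer>> out) {
                        for (Tuple2<Integer, Integer> v : values) {
                            out.collect(v);                  // pass-through for the sketch
                        }
                    }
                })
                .setParallelism(4);                          // 4 instances of this operator only

        output.print();
    }
}
```

Guarding the returned index with the actual `numPartitions` argument, as above, avoids routing a record to a partition that does not exist.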

apache-flink

9 votes · 1 answer · 2,235 views

How are Apache Flink's JoinFunction and CoGroupFunction different?

What is the difference between a JoinFunction and a CoGroupFunction in Apache Flink? How do their semantics and execution differ?
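A sketch of the semantic difference, with invented input data: a JoinFunction is called once per matching pair of elements, so a key that appears on only one side produces no call at all (inner-join semantics), while a CoGroupFunction is called once per key with both groups as Iterables, even when one of them is empty, which is what makes outer-join-style logic possible.

```java
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class JoinVsCoGroup {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> left =
                env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));
        DataSet<Tuple2<Integer, String>> right =
                env.fromElements(Tuple2.of(1, "x"), Tuple2.of(3, "y"));

        // JoinFunction: one call per matching pair; only key 1 matches here.
        left.join(right).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                @Override
                public String join(Tuple2<Integer, String> l, Tuple2<Integer, String> r) {
                    return l.f1 + r.f1;
                }
            })
            .print();

        // CoGroupFunction: one call per key (1, 2, and 3), each side passed
        // as an Iterable that may be empty.
        left.coGroup(right).where(0).equalTo(0)
            .with(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                @Override
                public void coGroup(Iterable<Tuple2<Integer, String>> ls,
                                    Iterable<Tuple2<Integer, String>> rs,
                                    Collector<String> out) {
                    boolean hasLeft = ls.iterator().hasNext();
                    boolean hasRight = rs.iterator().hasNext();
                    out.collect("left=" + hasLeft + ", right=" + hasRight);
                }
            })
            .print();
    }
}
```

As for execution, the rough characterization (not a guarantee) is that the runtime can choose hash-based strategies for joins and build output pair by pair, whereas coGroup is generally sort-based and must collect both groups per key before the function is invoked.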

apache-flink

9 votes · 1 answer · 1,914 views

Why is RAID not recommended for Hadoop HDFS setups?

Various sites (such as Hortonworks) recommend not configuring RAID for HDFS setups, mainly for two reasons:

  1. Speed is limited by the slower disk (JBOD performs better).
  2. Reliability.

RAID is recommended on the NameNode.

But what about using RAID on each DataNode's storage disks?

raid hadoop distributed-system hdfs

7 votes · 1 answer · 10,000 views

What magic does Flink use in distinct()? How are surrogate keys generated?

Regarding surrogate key generation, the first step is to get the distinct values, and then to build an incrementing key for each tuple.

So I used a Java Set to get the distinct elements, and it ran out of heap space. Then I used Flink's distinct(), and it worked perfectly.

May I ask what makes the difference?

Another related question is: can Flink generate surrogate keys in a mapper?
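For what it's worth, the core difference is that a java.util.Set deduplicates inside a single JVM heap, while distinct() is a distributed aggregation: Flink partitions the data by value, pre-combines within each partition, and its DataSet runtime can spill to disk, so the full key set never has to fit in one heap. For the key-generation part, the DataSet API also ships a utility that assigns unique ids without a global counter. A small sketch with invented input values:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

public class SurrogateKeys {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Distributed dedup: no single JVM ever holds all distinct values.
        DataSet<String> distinct = env.fromElements("a", "b", "a", "c").distinct();

        // One unique Long per element, derived locally per parallel subtask,
        // so instances never need to coordinate a shared counter.
        DataSet<Tuple2<Long, String>> keyed = DataSetUtils.zipWithUniqueId(distinct);

        keyed.print();   // ids are unique but not necessarily dense
    }
}
```

There is also DataSetUtils.zipWithIndex for dense, consecutive ids, at the cost of an extra counting pass over the data.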

apache-flink

7 votes · 1 answer · 931 views

The Apache Flink DataStream API has no mapPartition transformation

Spark's DStream has a mapPartition API, while Flink's DataStream API does not. Can anyone help explain why? What I want to do is implement something similar to Spark's reduceByKey on Flink.
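On the reduceByKey part: the usual Flink counterpart is keyBy() followed by reduce(), either as a rolling reduce or on a window. (A plausible reason for the missing mapPartition, offered here as reasoning rather than an official statement, is that a partition of an unbounded stream never ends, so there is no point at which a whole partition could be handed to a function; per-element and per-window operations take its place.) A minimal sketch with invented input, in the Flink 1.x-era API:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceByKeyEquivalent {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> summed = env
            .fromElements("a", "b", "a")
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String w) {
                    return Tuple2.of(w, 1);               // (word, 1)
                }
            })
            .keyBy(0)                                     // like reduceByKey's grouping
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                      Tuple2<String, Integer> b) {
                    return Tuple2.of(a.f0, a.f1 + b.f1);  // running per-key sum
                }
            });

        summed.print();   // emits an updated count for each incoming element
        env.execute("reduceByKey equivalent");
    }
}
```

Note the difference to Spark: this is a continuous, per-element update rather than a per-micro-batch result; putting the reduce on a window recovers batched output.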

apache-flink

7 votes · 1 answer · 863 views

Flink Streaming: how to implement a window defined by a start and an end element?

I have data in the following format:

SIP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | INVITE
RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 0
RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 1
RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 2
RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 …

apache-flink flink-streaming

7 votes · 1 answer · 251 views

Apache Flink: in the DataStream API, what is the difference between side outputs and split()?

Apache Flink has a split API that can be used to branch a data stream:

val splitStream = datastream.split { i => i match {
   case i if ... => Seq("red", "blue")
   case _ => Seq("green")
}}

splitStream.select("green").flatMap { .... }

It also provides another mechanism called Side Output (https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/side_output.html) that lets you do the same thing!

What is the difference between these two approaches? Do they use the same lower-level constructs? Do they cost the same? When and how should we choose one of them?
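For context, a sketch of the same branching done with a side output (element values and tag name are invented). Unlike split(), which can only re-emit the input type and which later Flink releases deprecated in favor of side outputs, a side output is declared with an OutputTag and fed from a ProcessFunction, so each branch may even carry a different type:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Anonymous subclass so the OutputTag keeps its generic type at runtime.
        final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};

        SingleOutputStreamOperator<Integer> evens = env
            .fromElements(1, 2, 3, 4)
            .process(new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                    if (value % 2 == 0) {
                        out.collect(value);            // main output
                    } else {
                        ctx.output(oddTag, value);     // side output
                    }
                }
            });

        DataStream<Integer> odds = evens.getSideOutput(oddTag);

        evens.print();
        odds.print();
        env.execute("side output branching");
    }
}
```

Each element is routed in a single pass here, whereas chaining several select() calls on a split stream re-evaluates the selector logic per branch.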

apache-flink flink-streaming

7 votes · 1 answer · 1,022 views

Apache Flink: NullPointerException caused by TupleSerializer

When I execute my Flink application, it gives me the following NullPointerException:

 2017-08-08 13:21:57,690 INFO com.datastax.driver.core.Cluster  - New  Cassandra host /127.0.0.1:9042 added 
 2017-08-08 13:22:02,427 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(TumblingEventTimeWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@15d1c80b}, EventTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:302)) -> Filter -> Flat Map -> Sink: Cassandra Sink (1/1) (092a7ef50209f7a050d9d82be1e03d80) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) …

java apache-flink flink-streaming

6 votes · 1 answer · 2,357 views

Flink 1.4 AvroUtils error

I try to submit a job on Flink 1.4 and get the following exception.

Any idea how to solve the problem?

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.VerifyError: Bad type on operand stack
Exception Details:
  Location:
    org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial
  Reason:
Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer'
  Current Frame:
bci: @23
flags: { }
locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' }
stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized …

java avro kryo apache-flink

6 votes · 2 answers · 386 views