小编Til*_*ann的帖子

Flink Streaming:如何根据数据将一个数据流输出到不同的输出?

在Apache Flink中,我有一组元组.让我们假设一个非常简单Tuple1<String>.元组可以在其值字段中具有任意值(例如,"P1","P2"等).可能值的集合是有限的,但我事先并不知道全集(所以可能有'P362').我想根据元组内部的值将该元组写入某个输出位置.所以我希望有以下文件结构:

  • /output/P1
  • /output/P2

在文档中我只发现了写入我事先知道的位置的可能性(例如stream.writeCsv("/output/somewhere")),但没有办法让数据的内容决定数据实际结束的位置.

我在文档中读到了关于输出拆分的内容,但这似乎没有提供一种方法将输出重定向到我希望拥有它的方式(或者我只是不明白这是如何工作的).

可以使用Flink API完成,如果是这样,怎么做?如果没有,是否可能有第三方图书馆可以做到这一点,还是我必须自己构建这样的东西?

更新

根据Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在序列化之后将元组写入相应的文件.我把它放在这里作为参考,也许对其他人有用:

public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {

    private final OutputSelector<IT> outputSelector;
    private final MapFunction<IT, String> serializationFunction;
    private final String basePath;
    Map<String, TextOutputFormat<String>> formats = new HashMap<>();

    /**
     * @param outputSelector        the selector which determines into which output(s) a record is written.
     * @param serializationFunction a function which serializes the record to a string.
     * @param basePath              the base path for writing the records. It will be appended …
Run Code Online (Sandbox Code Playgroud)

java bigdata apache-flink flink-streaming

18
推荐指数
1
解决办法
5293
查看次数

Flink流媒体事件时间窗口排序

我遇到了一些麻烦,理解事件时间窗口周围的语义.以下程序生成一些带有时间戳的元组,这些时间戳用作事件时间并执行简单的窗口聚合.我希望输出与输入的顺序相同,但输出的排序方式不同.为什么输出与事件时间无关?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()

    env.execute()
}
Run Code Online (Sandbox Code Playgroud)

输入:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)
Run Code Online (Sandbox Code Playgroud)

结果:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

10
推荐指数
1
解决办法
2377
查看次数

flink作业不是跨机器分布的

我在Apache flink中有一个小用例,即批处理系统.我需要处理一组文件.每个文件的处理必须由一台机器处理.我有以下代码.始终只占用一个任务槽,并且一个接一个地处理文件.我有6个节点(所以6个任务管理器),并在每个节点配置4个任务槽.所以,我希望一次处理24个文件.

class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
  override def mapPartition(
      myfiles: java.lang.Iterable[java.io.File],
      out:org.apache.flink.util.Collector[Int])
    : Unit  =  {
    var temp = myfiles.iterator()
    while(temp.hasNext()){
      val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
      val file = new File(temp.next().toURI)
      Process(
        "/bin/bash ./run.sh  " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
        new File(fp1.getAbsoluteFile.getParent))
        .lines
        .foreach{println}
      out.collect(1)
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

我启动了flink as ./bin/start-cluster.sh命令,Web用户界面显示它有6个任务管理器,24个任务槽.

这些文件夹包含大约49个文件.当我在这个集合上创建mapPartition时,我希望跨越49个并行进程.但是,在我的基础设施中,它们都是一个接一个地处理的.这意味着只有一台机器(一个任务管理器)处理所有49个文件名.我想要的是,每个插槽配置2个任务,我希望同时处理24个文件.

任何指针肯定会有所帮助.我在flink-conf.yaml文件中有这些参数

jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24
Run Code Online (Sandbox Code Playgroud)

提前致谢.谁能让我知道我哪里出错了?

scala batch-processing apache-flink

9
推荐指数
1
解决办法
393
查看次数

在Apache Flink中使用集合$ UnmodifiableCollection

使用Apache Flink时使用以下代码:

DataStream<List<String>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<String, List<String>>() {

    @Override
    public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
        List<String> top5 = Ordering.natural().greatestOf(iterable, 5);
        collector.collect(top5);
    }
}).flatten();
Run Code Online (Sandbox Code Playgroud)

我得到了这个例外

Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我怎么UnmodifiableCollection能跟Flink一起玩?

kryo apache-flink

8
推荐指数
1
解决办法
1242
查看次数

Flink中的操作员并行性的一些困惑

我只是得到下面关于并行性的示例,并且有一些相关的问题:

  1. setParallelism(5)将Parallelism 5设置为求和或flatMap和求和?

  2. 是否可以分别为flatMap和sum等不同的运算符设置不同的Parallelism?例如将Parallelism 5设置为sum和10设置为flatMap。

  3. 根据我的理解,keyBy正在根据不同的密钥将DataStream划分为逻辑Stream \分区,并假设有10,000个不同的键值,因此有10,000个不同的分区,那么有多少个线程可以处理10,000个分区?只有5个线程?如果不设置setParallelism(5)怎么办?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html

final StreamExecutionEnvironment env =     
  StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
  .flatMap(new LineSplitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

8
推荐指数
1
解决办法
3113
查看次数

运行Apache Flink作业时链接失败

我在Flink 0.9中开发了一个使用图形模块(Gelly)的工作.作业在IDE(Eclipse)中成功运行,但在使用maven(mvn clean install)将其导出到JAR后,无法在本地flink实例上执行,并出现以下错误

"由于链接失败,无法加载程序的入口点类'myclass'"

java.lang.NoClassDefFoundError: org/apache/flink/graph/GraphAlgorithm
Run Code Online (Sandbox Code Playgroud)

知道为什么会发生这种情况以及如何解决它?

maven apache-flink gelly

5
推荐指数
1
解决办法
736
查看次数

如何在scala中使用flink fold函数

这是一个非常有效的尝试使用Flink折叠与scala匿名函数:

val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double])
Run Code Online (Sandbox Code Playgroud)

它汇编得很好,但在执行时,我得到了"类型擦除问题"(见下文).在Java中这样做很好,但当然更冗长.我喜欢简洁明了的lambda.我怎么能在scala中做到这一点?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined. 
This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
Run Code Online (Sandbox Code Playgroud)

scala apache-flink

5
推荐指数
1
解决办法
551
查看次数

如何在Zeppelin中将Flink var的内容写入屏幕?

我尝试在Apache Zeppelin中运行以下简单命令.

%flink

var rabbit = env.fromElements(
"ARTHUR:  What, behind the rabbit?",
"TIM:  It is the rabbit!", 
"ARTHUR:  You silly sod!  You got us all worked up!",
"TIM:  Well, that's no ordinary rabbit.  That's the most foul, cruel, and bad-tempered rodent you ever set eyes on.",
"ROBIN:  You tit!  I soiled my armor I was so scared!", 
"TIM:  Look, that rabbit's got a vicious streak a mile wide, it's a killer!")

var counts = rabbit.flatMap { _.toLowerCase.split("\\W+")}.map{ (_,1)}.groupBy(0).sum(1) 

counts.print()
Run Code Online (Sandbox Code Playgroud)

我尝试在笔记本中打印出结果.但不幸的是,我只得到以下输出. …

apache-zeppelin apache-flink

5
推荐指数
2
解决办法
803
查看次数

在Apache-spark中,如何添加稀疏向量?

我正在尝试使用spark开发自己的前馈神经网络.但是我无法在spark的稀疏向量中找到乘法,加法或除法等操作.该文件表示它是使用breeze vector实现的.但我可以在微风中找到添加操作,但不能在spark矢量中找到.如何解决这个问题呢?

scala scala-breeze apache-spark

5
推荐指数
1
解决办法
1197
查看次数

如何在Apache Flink Streaming 0.10.0中为OVERWRITE指定writeAsText?

我在scala中有一种方法

counts.writeAsText(path_to_file)

当文件已经存在并建议指定时,它将引发异常 File or directory already exists. Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.。但是我没有在DataStream类中找到可以接受的方法org.apache.flink.core.fs.FileSystem.WriteMode。只有一个签名可以接受Long毫秒。

scala apache-flink flink-streaming

5
推荐指数
1
解决办法
1515
查看次数