标签: apache-flink

如何在Flink中读取.bsq文件?

正确设置项目后,我需要读取/导入几个.bsq文件到我的环境中.我尝试使用这样的env.readFile()方法:

DataSet<T> data = env.readFile(*insertFileInputFormatHere*, filePath);
Run Code Online (Sandbox Code Playgroud)

但我无法得到合适的FileInputFormat.由于它是抽象的,我不能拥有自己的实例.我应该扩展abstract class FileInputFormat并使用我自己的扩展来实例化一个FileInputFormat?还是有另一种我不认识的方式?

java apache-flink

2
推荐指数
1
解决办法
464
查看次数

kafka - > flink - 性能问题

我正在看一些生成~30K消息/秒的kafka主题.我有一个flink拓扑设置来读取其中一个,聚合一点(5秒窗口)然后(最终)写入DB.

当我运行拓扑并删除除了读取 - >聚合步骤之外的所有内容时,我每分钟只能获得~30K消息.背压不会发生在任何地方.

我究竟做错了什么?


编辑:

  1. 我无法改变主题空间的任何内容.每个主题都有一个分区,其中有数百个分区.
  2. 每条消息都是一个压缩的thrift对象,平均为2-3Kb

看起来我只能得到~1.5 MB/s.不是接近提到的100MB/s.

当前的代码路径:

DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);  
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);
Run Code Online (Sandbox Code Playgroud)
public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private String mapId;
    public mapper2(String mapId) {
        this.mapId = mapId;
    }

    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        TimeData timeData = (TimeData)ts_thriftDecoder.fromBytes(bytes);
        Tuple4 tuple4 = new Tuple4<Long, Long, Integer, String>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-flink flink-streaming

2
推荐指数
1
解决办法
1639
查看次数

将 reduceByKey 从 Spark 转换为 Flink

如何将此示例 scala spark 代码转换为 apache flink?

reduceByKey( (x, y) => (x._1 + y._1, ( (x._2) ++ y._2) ) )
Run Code Online (Sandbox Code Playgroud)

我意识到reduceByKeyflink 中不存在,但它显示了我想要实现的目标。

谢谢任何帮助!

scala apache-spark apache-flink

2
推荐指数
1
解决办法
1134
查看次数

如何将流程的新变化部署到Apache Flink集群?

例如,我在流程中上传了JAR,并通过Apache Flink仪表板运行它。然后,我对流程进行了一些更改,并希望部署它们。

谁能一步一步地向我解释如何将我的流程的新版本正确地部署到Apache Flink集群(不造成停机,状态丢失等)?我没有在官方文档中找到有关部署过程的描述。

apache-flink

2
推荐指数
1
解决办法
671
查看次数

为什么 env.readTextFile(...).first(10).print 在 Flink 中读取所有数据?

当我只想在 Apache Spark 中获取前 N 条记录时,我使用:

sc.textFile(path_to_files).take(10)
Run Code Online (Sandbox Code Playgroud)

它将快速返回并给我前 10 行文本。当我在 Apache Flink 中做类似的事情时:

env.readTextFile(path_to_files).first(10).print()
Run Code Online (Sandbox Code Playgroud)

它将在返回结果之前完全读取所有文件。这是为什么?

apache-flink

2
推荐指数
1
解决办法
980
查看次数

从Apache Flink中的HDFS地址流式传输文件

在我的Flink代码中,我正在流式传输一个位于HDFS文件夹的文件,我收到错误"(没有这样的文件或目录)",但我确定文件名和地址是正确的,因为我在批处理中使用了相同的方法和每件事都顺利进行.有谁知道可能是什么问题?这是我的代码:

DataStream<FebrlObject> myStream = 
env.addSource(new MyObjectGenerator("hdfs://../Data/Dataset1.csv"));
Run Code Online (Sandbox Code Playgroud)

及其相关课程:

public class MyObjectGenerator implements SourceFunction<MyObject> {

    private String dataFilePath;
    private float servingSpeedFactor;
    private Integer rowNo ; 
    private transient BufferedReader reader;
    private transient InputStream inputStream;

    public MyObjectGenerator(String dataFilePath) {
        this(dataFilePath, 1.0f);
    }

    public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) {
        this.dataFilePath = dataFilePath;
        this.servingSpeedFactor = servingSpeedFactor;
        rowNo = 0 ;
    }

    @Override
    public void run(SourceContext<MyObject> sourceContext) throws Exception {
        long servingStartTime = Calendar.getInstance().getTimeInMillis();
        inputStream = new DataInputStream(new FileInputStream(dataFilePath));
        reader = new BufferedReader(new InputStreamReader(inputStream));
        String line;
        long …
Run Code Online (Sandbox Code Playgroud)

apache-flink

2
推荐指数
1
解决办法
808
查看次数

如何在其他的基础上过滤Apache flink流?

我有两个流,一个是 Int ,另一个是 json 。在 json Schema 中有一个键是一些 int 。所以我需要通过与另一个整数流的键比较来过滤 json 流,所以它可能在 Flink 中吗?

apache-flink flink-streaming

2
推荐指数
1
解决办法
3696
查看次数

Flink 是否保证所有情况下的任务容错?

我正在寻找一个容错的流处理引擎。出于这个原因,我用一个简单的工作来测试 Flink:从文本套接字读取单词的 SocketTextStreamWordCount 示例!我在一个有 3 个任务管理器的独立集群上运行它,我找到了负责从套接字读取的任务管理器!我杀死了 TaskManger (kill -9) 并等待查看结果:大约 30 秒后,JobManger 删除了死的 TaskManger!并将工作分配为失败

看来容错保证不是一般的东西,取决于Job!我对吗?有没有可以解释的参考资料?

apache-flink

2
推荐指数
1
解决办法
263
查看次数

Flink 中键控流中的记录排序

我有一个流,其中记录按顺序到达。我应用了一个 map 函数,然后在它上面应用了 keyBy 函数。记录的顺序是否会在每个具有相同键的记录流中保持?

Ordering of Records in Stream 中有一个类似的问题。但是我在那里给出的答案和从链接“ https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html ”复制的以下描述之间感到困惑。

" 在重新分配交换中,元素之间的排序仅保留在每对发送和接收子任务中(例如,map() 的子任务 [1] 和 keyBy/window 的子任务 [2])。所以在这个例子中,保留每个键内的排序,但并行性确实引入了关于不同键的聚合结果到达接收器的顺序的不确定性。”

在给出的示例中,keyBy 的子任务 [2] 接收来自 map 的子任务 [1] 和子任务 [2] 的元素。如果仅在子任务之间维护排序,如何保留每个键内的排序?

apache-flink flink-streaming

2
推荐指数
1
解决办法
819
查看次数

Flink JobExecutionException:akka.client.timeout

我正在使用Flink v.1.4.0.

我正在尝试使用DataSet APIthrough运行作业IntelliJ。请注意,如果我通过Flink UI该作业运行相同的作业,则该作业运行良好。为了运行作业,我需要首先通过环境变量指定将要处理的数据量。当数量相对较小时,作业运行良好。但随着它变大,我开始收到以下错误:

ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
31107 [main] ERROR com.company.someLib.SomeClass - Error executing pipeline
org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:193)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.lambda$runPipeline$1(EmailAnalyserPipeline.java:120)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at …
Run Code Online (Sandbox Code Playgroud)

java intellij-idea akka apache-flink

2
推荐指数
1
解决办法
1386
查看次数