标签: apache-flink

Flink和Storm之间的主要区别是什么?

Flink已被比作Spark,正如我所看到的那样,它是错误的比较,因为它将窗口事件处理系统与微批处理进行比较; 同样,将Flink与Samza进行比较对我来说没有多大意义.在这两种情况下,它都会比较实时与批量事件处理策略,即使在Samza的情况下规模较小的"规模".但我想知道Flink与Storm的比较,它在概念上看起来与它更相似.

我发现这个(幻灯片#4)将主要区别记录为Flink的"可调延迟".另一个提示似乎是Slicon Angle的一篇文章,该文章表明Flink更好地集成到Spark或HadoopMR世界中,但没有提及或引用实际细节.最后,Fabian Hueske 在接受采访时指出:"与Apache Storm相比,Flink的流分析功能提供了一个高级API,并使用更轻量级的容错策略来提供一次性处理保证."

这对我来说有点稀疏,我不太清楚.有人可以通过Flink解释Storm中的流处理是什么问题(??)?什么是Hueske所指的API问题及其"更轻量级的容错策略"?

apache-storm apache-flink flink-streaming

131
推荐指数
3
解决办法
4万
查看次数

Apache Beam相对于Spark/Flink进行批处理有什么好处?

Apache Beam支持多个运行后端,包括Apache Spark和Flink.我熟悉Spark/Flink,我试图看到Beam的批处理优缺点.

看一下Beam字数统计示例,它感觉它与原生的Spark/Flink等价物非常相似,可能有一个稍微冗长的语法.

我目前没有看到为这样的任务选择Beam over Spark/Flink的一大好处.到目前为止我能做的唯一观察:

  • Pro:不同执行后端的抽象.
  • Con:这种抽象的代价是对Spark/Flink中执行的内容的控制较少.

是否有更好的例子突出了梁模型的其他优点/缺点?是否有关于失控如何影响性能的信息?

请注意,我并不是要求在流方面存在差异,这些问题本文中已部分涵盖并在本文中进行了总结(由于Spark 1.X而过时).

apache-spark apache-flink apache-beam

63
推荐指数
2
解决办法
2万
查看次数

Apache Flink和Apache Spark作为大规模机器学习的平台?

有人可以比较Flink和Spark作为机器学习的平台吗?哪种迭代算法可能更好?链接到一般的Flink vs Spark讨论:Apache Spark和Apache Flink有什么区别?

machine-learning apache-spark apache-flink

23
推荐指数
1
解决办法
3575
查看次数

找不到类型org.apache.flink.api.common.typeinfo.TypeInformation [...]的证据参数的隐含值

我正在尝试为Apache Flink编写一些用例.我经常遇到的一个错误是

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[SomeType]
Run Code Online (Sandbox Code Playgroud)

我的问题是,当它们发生时以及它们不发生时我无法确定.

最近的例子如下

...
val largeJoinDataGen = new LargeJoinDataGen(dataSetSize, dataGen, hitRatio)
val see = StreamExecutionEnvironment.getExecutionEnvironment
val newStreamInput = see.addSource(largeJoinDataGen)
...
Run Code Online (Sandbox Code Playgroud)

where LargeJoinDataGen extends GeneratorSource[(Int, String)]GeneratorSource[T] extends SourceFunction[T],都在单独的文件中定义.

当我试图建立这个时,我得到了

Error:(22, 39) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val newStreamInput = see.addSource(largeJoinDataGen)
Run Code Online (Sandbox Code Playgroud)

1.为什么给定的例子中有错误?

2.当这些错误发生时以及如何在将来避免它们时,一般指导原则是什么?

PS:第一个scala项目和第一个flink项目所以请耐心等待

scala apache-flink flink-streaming

19
推荐指数
3
解决办法
1万
查看次数

Flink Streaming:如何根据数据将一个数据流输出到不同的输出?

在Apache Flink中,我有一组元组.让我们假设一个非常简单Tuple1<String>.元组可以在其值字段中具有任意值(例如,"P1","P2"等).可能值的集合是有限的,但我事先并不知道全集(所以可能有'P362').我想根据元组内部的值将该元组写入某个输出位置.所以我希望有以下文件结构:

  • /output/P1
  • /output/P2

在文档中我只发现了写入我事先知道的位置的可能性(例如stream.writeCsv("/output/somewhere")),但没有办法让数据的内容决定数据实际结束的位置.

我在文档中读到了关于输出拆分的内容,但这似乎没有提供一种方法将输出重定向到我希望拥有它的方式(或者我只是不明白这是如何工作的).

可以使用Flink API完成,如果是这样,怎么做?如果没有,是否可能有第三方图书馆可以做到这一点,还是我必须自己构建这样的东西?

更新

根据Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在序列化之后将元组写入相应的文件.我把它放在这里作为参考,也许对其他人有用:

public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {

    private final OutputSelector<IT> outputSelector;
    private final MapFunction<IT, String> serializationFunction;
    private final String basePath;
    Map<String, TextOutputFormat<String>> formats = new HashMap<>();

    /**
     * @param outputSelector        the selector which determines into which output(s) a record is written.
     * @param serializationFunction a function which serializes the record to a string.
     * @param basePath              the base path for writing the records. It will be appended …
Run Code Online (Sandbox Code Playgroud)

java bigdata apache-flink flink-streaming

18
推荐指数
1
解决办法
5293
查看次数

无论窗口时间如何,都可以在Apache Flink中组合两个流

我有两个想要组合的数据流.问题是一个数据流的频率比另一个高得多,并且有时一个流根本没有接收事件.是否可以使用来自一个流的最后一个事件,并在即将到来的每个事件上将其与另一个流连接?

我找到的唯一解决方案是使用join函数,但您必须指定一个公共窗口,您可以在其中应用join函数.当一个流没有接收到任何事件时,这是未达到的窗口.

是否有可能对来自任何一个流或另一个流的每个事件应用join函数并维护上次使用的事件的状态并将此事件用于join函数?

提前感谢任何有用的提示!

apache streaming join stream apache-flink

15
推荐指数
1
解决办法
6278
查看次数

Spark vs Flink内存不足

我已经构建了Spark和Flink k-means应用程序.我的测试用例是在3节点集群上对100万个点进行聚类.

当内存中的瓶颈开始时,Flink开始外包到磁盘并且工作缓慢但工作正常.但是,如果内存已满并且再次启动(无限循环?),Spark会丢失执行程序.

我试着在邮件列表的帮助下自定义内存设置,谢谢.但Spark仍然无效.

是否有必要设置任何配置?我的意思是Flink工作的内存很低,Spark也必须能够; 或不?

memory apache-spark apache-flink

14
推荐指数
1
解决办法
2384
查看次数

在确定分区位置之前 60000 毫秒的 Kafka 客户端超时已过期

我正在尝试将 Flink 连接到 Kafka 消费者

我正在使用 Docker Compose 构建 4 个容器 zookeeper、kafka、Flink JobManager 和 Flink TaskManager。

对于 zookeeper 和 Kafka,我使用 wurstmeister 图像,对于 Flink,我使用官方图像。

docker-compose.yml

version: '3.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    hostname: zookeeper
    expose:
      - "2181"
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    hostname: kafka
    links:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: 'pipeline:1:1:compact'

  jobmanager:
    build: ./flink_pipeline
    depends_on:
      - kafka
    links:
      - zookeeper
      - kafka
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager …
Run Code Online (Sandbox Code Playgroud)

docker docker-compose apache-flink

14
推荐指数
1
解决办法
1万
查看次数

如何正确实现HTTP接收器?

我想通过HTTP协议将我的DataStream流的计算结果发送到其他服务.我看到了两种可能的实现方式:

  1. 在接收器中使用同步Apache HttpClient客户端
public class SyncHttpSink extends RichSinkFunction<SessionItem> {
    private static final String URL = "http://httpbin.org/post";

    private CloseableHttpClient httpClient;

    private Histogram httpStatusesAccumulator;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpClients.custom()
            .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
            .build();

        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }

    @Override
    public void close() throws Exception {
        httpClient.close();

        httpStatusesAccumulator.resetLocal();
    }

    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));

        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);

        HttpPost httpPost = …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

13
推荐指数
1
解决办法
2624
查看次数

如何从Apache Flink中的数据库中查找和更新记录的状态?

我正在研究数据流应用程序,我正在调查使用Apache Flink进行此项目的可能性.主要原因是它支持漂亮的高级流构造,非常类似于Java 8的Stream API.

我将接收与数据库中的特定记录相对应的事件,并且我希望能够处理这些事件(来自诸如RabbitMQ或Kafka之类的消息代理)并最终更新数据库中的记录并推送已处理的/将事件转换为另一个接收器(可能是另一个消息代理)

理想情况下,与特定记录相关的事件需要以FIFO顺序进行处理(尽管会有一个时间戳有助于检测无序事件),但可以并行处理与不同记录相关的事件.我打算使用keyBy()构造来按记录分区流.

需要完成的处理取决于数据库中有关记录的当前信息.但是,我无法找到一个示例或建议的方法来查询数据库以获取此类记录,以便使用我需要处理它的其他信息来丰富正在处理的事件.

我想到的管道如下:

- > keyBy()对接收到的id - >从数据库中检索对应id的记录 - >对记录执行处理步骤 - >将处理后的事件推送到外部队列并更新数据库记录

需要更新数据库记录,因为另一个应用程序将查询数据.

在实现此管道之后,可能会有额外的优化措施.例如,可以将(更新的)记录缓存在托管状态,以便同一记录上的下一个事件不需要另一个数据库查询.但是,如果应用程序不知道特定记录,则需要从数据库中检索它.

Apache Flink中用于此类场景的最佳方法是什么?

java stream apache-flink

13
推荐指数
1
解决办法
1020
查看次数