Flink已被比作Spark,正如我所看到的那样,它是错误的比较,因为它将窗口事件处理系统与微批处理进行比较; 同样,将Flink与Samza进行比较对我来说没有多大意义.在这两种情况下,它都会比较实时与批量事件处理策略,即使在Samza的情况下规模较小的"规模".但我想知道Flink与Storm的比较,它在概念上看起来与它更相似.
我发现这个(幻灯片#4)将主要区别记录为Flink的"可调延迟".另一个提示似乎是Slicon Angle的一篇文章,该文章表明Flink更好地集成到Spark或HadoopMR世界中,但没有提及或引用实际细节.最后,Fabian Hueske 在接受采访时指出:"与Apache Storm相比,Flink的流分析功能提供了一个高级API,并使用更轻量级的容错策略来提供一次性处理保证."
这对我来说有点稀疏,我不太清楚.有人可以通过Flink解释Storm中的流处理是什么问题(??)?什么是Hueske所指的API问题及其"更轻量级的容错策略"?
Apache Beam支持多个运行后端,包括Apache Spark和Flink.我熟悉Spark/Flink,我试图看到Beam的批处理优缺点.
看一下Beam字数统计示例,它感觉它与原生的Spark/Flink等价物非常相似,可能有一个稍微冗长的语法.
我目前没有看到为这样的任务选择Beam over Spark/Flink的一大好处.到目前为止我能做的唯一观察:
是否有更好的例子突出了梁模型的其他优点/缺点?是否有关于失控如何影响性能的信息?
有人可以比较Flink和Spark作为机器学习的平台吗?哪种迭代算法可能更好?链接到一般的Flink vs Spark讨论:Apache Spark和Apache Flink有什么区别?
我正在尝试为Apache Flink编写一些用例.我经常遇到的一个错误是
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[SomeType]
Run Code Online (Sandbox Code Playgroud)
我的问题是,当它们发生时以及它们不发生时我无法确定.
最近的例子如下
...
val largeJoinDataGen = new LargeJoinDataGen(dataSetSize, dataGen, hitRatio)
val see = StreamExecutionEnvironment.getExecutionEnvironment
val newStreamInput = see.addSource(largeJoinDataGen)
...
Run Code Online (Sandbox Code Playgroud)
where LargeJoinDataGen extends GeneratorSource[(Int, String)]
和GeneratorSource[T] extends SourceFunction[T]
,都在单独的文件中定义.
当我试图建立这个时,我得到了
Error:(22, 39) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val newStreamInput = see.addSource(largeJoinDataGen)
Run Code Online (Sandbox Code Playgroud)
1.为什么给定的例子中有错误?
2.当这些错误发生时以及如何在将来避免它们时,一般指导原则是什么?
PS:第一个scala项目和第一个flink项目所以请耐心等待
在Apache Flink中,我有一组元组.让我们假设一个非常简单Tuple1<String>
.元组可以在其值字段中具有任意值(例如,"P1","P2"等).可能值的集合是有限的,但我事先并不知道全集(所以可能有'P362').我想根据元组内部的值将该元组写入某个输出位置.所以我希望有以下文件结构:
/output/P1
/output/P2
在文档中我只发现了写入我事先知道的位置的可能性(例如stream.writeCsv("/output/somewhere")
),但没有办法让数据的内容决定数据实际结束的位置.
我在文档中读到了关于输出拆分的内容,但这似乎没有提供一种方法将输出重定向到我希望拥有它的方式(或者我只是不明白这是如何工作的).
可以使用Flink API完成,如果是这样,怎么做?如果没有,是否可能有第三方图书馆可以做到这一点,还是我必须自己构建这样的东西?
更新
根据Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在序列化之后将元组写入相应的文件.我把它放在这里作为参考,也许对其他人有用:
public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {
private final OutputSelector<IT> outputSelector;
private final MapFunction<IT, String> serializationFunction;
private final String basePath;
Map<String, TextOutputFormat<String>> formats = new HashMap<>();
/**
* @param outputSelector the selector which determines into which output(s) a record is written.
* @param serializationFunction a function which serializes the record to a string.
* @param basePath the base path for writing the records. It will be appended …
Run Code Online (Sandbox Code Playgroud) 我有两个想要组合的数据流.问题是一个数据流的频率比另一个高得多,并且有时一个流根本没有接收事件.是否可以使用来自一个流的最后一个事件,并在即将到来的每个事件上将其与另一个流连接?
我找到的唯一解决方案是使用join函数,但您必须指定一个公共窗口,您可以在其中应用join函数.当一个流没有接收到任何事件时,这是未达到的窗口.
是否有可能对来自任何一个流或另一个流的每个事件应用join函数并维护上次使用的事件的状态并将此事件用于join函数?
提前感谢任何有用的提示!
我已经构建了Spark和Flink k-means应用程序.我的测试用例是在3节点集群上对100万个点进行聚类.
当内存中的瓶颈开始时,Flink开始外包到磁盘并且工作缓慢但工作正常.但是,如果内存已满并且再次启动(无限循环?),Spark会丢失执行程序.
我试着在邮件列表的帮助下自定义内存设置,谢谢.但Spark仍然无效.
是否有必要设置任何配置?我的意思是Flink工作的内存很低,Spark也必须能够; 或不?
我正在尝试将 Flink 连接到 Kafka 消费者
我正在使用 Docker Compose 构建 4 个容器 zookeeper、kafka、Flink JobManager 和 Flink TaskManager。
对于 zookeeper 和 Kafka,我使用 wurstmeister 图像,对于 Flink,我使用官方图像。
docker-compose.yml
version: '3.1'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
hostname: zookeeper
expose:
- "2181"
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.11-2.0.0
depends_on:
- zookeeper
ports:
- "9092:9092"
hostname: kafka
links:
- zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: 'pipeline:1:1:compact'
jobmanager:
build: ./flink_pipeline
depends_on:
- kafka
links:
- zookeeper
- kafka
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager …
Run Code Online (Sandbox Code Playgroud) 我想通过HTTP协议将我的DataStream流的计算结果发送到其他服务.我看到了两种可能的实现方式:
public class SyncHttpSink extends RichSinkFunction<SessionItem> {
private static final String URL = "http://httpbin.org/post";
private CloseableHttpClient httpClient;
private Histogram httpStatusesAccumulator;
@Override
public void open(Configuration parameters) throws Exception {
httpClient = HttpClients.custom()
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
.build();
httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
}
@Override
public void close() throws Exception {
httpClient.close();
httpStatusesAccumulator.resetLocal();
}
@Override
public void invoke(SessionItem value) throws Exception {
List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);
HttpPost httpPost = …
Run Code Online (Sandbox Code Playgroud) 我正在研究数据流应用程序,我正在调查使用Apache Flink进行此项目的可能性.主要原因是它支持漂亮的高级流构造,非常类似于Java 8的Stream API.
我将接收与数据库中的特定记录相对应的事件,并且我希望能够处理这些事件(来自诸如RabbitMQ或Kafka之类的消息代理)并最终更新数据库中的记录并推送已处理的/将事件转换为另一个接收器(可能是另一个消息代理)
理想情况下,与特定记录相关的事件需要以FIFO顺序进行处理(尽管会有一个时间戳有助于检测无序事件),但可以并行处理与不同记录相关的事件.我打算使用keyBy()
构造来按记录分区流.
需要完成的处理取决于数据库中有关记录的当前信息.但是,我无法找到一个示例或建议的方法来查询数据库以获取此类记录,以便使用我需要处理它的其他信息来丰富正在处理的事件.
我想到的管道如下:
- > keyBy()对接收到的id - >从数据库中检索对应id的记录 - >对记录执行处理步骤 - >将处理后的事件推送到外部队列并更新数据库记录
需要更新数据库记录,因为另一个应用程序将查询数据.
在实现此管道之后,可能会有额外的优化措施.例如,可以将(更新的)记录缓存在托管状态,以便同一记录上的下一个事件不需要另一个数据库查询.但是,如果应用程序不知道特定记录,则需要从数据库中检索它.
Apache Flink中用于此类场景的最佳方法是什么?
apache-flink ×10
apache-spark ×3
java ×2
stream ×2
apache ×1
apache-beam ×1
apache-storm ×1
bigdata ×1
docker ×1
join ×1
memory ×1
scala ×1
streaming ×1