Flink has been compared to Spark, which, as I see it, is the wrong comparison because it compares a windowed event-processing system to micro-batching; likewise, comparing Flink to Samza doesn't make much sense to me. In both cases it compares a real-time versus a batched event-processing strategy, even if at a smaller "scale" in the case of Samza. But I'd like to know how Flink compares to Storm, which seems conceptually much more similar to it.
I found this (slide #4), which documents the main difference as Flink's "adjustable latency". Another hint seems to be an article by Silicon Angle suggesting that Flink integrates better into a Spark or Hadoop MR world, but no actual details are mentioned or referenced. Finally, Fabian Hueske notes in an interview: "Compared to Apache Storm, the stream analysis functionality of Flink offers a high-level API and uses a more light-weight fault tolerance strategy to provide exactly-once processing guarantees."
That's all a bit sparse for me, and I don't quite get the point. Can someone explain which problem(s) with stream processing in Storm are solved by Flink? What is Hueske referring to with the API issues and the "more light-weight fault tolerance strategy"?
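For background on the fault-tolerance point in Hueske's quote: Flink takes periodic distributed snapshots of the whole pipeline (checkpoint barriers flowing with the data) rather than acknowledging every individual record the way Storm does. A minimal sketch of enabling it; the interval value here is an arbitrary choice of mine:

```scala
import org.apache.flink.streaming.api.scala._

// Sketch: Flink's fault tolerance is based on periodic distributed snapshots
// (checkpoint barriers), not per-record acknowledgements as in Storm.
// The 5000 ms interval is arbitrary.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000) // exactly-once checkpoints every 5 seconds
```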
I'm trying to write some use cases for Apache Flink. An error I run into frequently is:
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[SomeType]
My problem is that I can't pin down when these errors occur and when they don't.
The most recent example is the following:
...
val largeJoinDataGen = new LargeJoinDataGen(dataSetSize, dataGen, hitRatio)
val see = StreamExecutionEnvironment.getExecutionEnvironment
val newStreamInput = see.addSource(largeJoinDataGen)
...
where LargeJoinDataGen extends GeneratorSource[(Int, String)] and GeneratorSource[T] extends SourceFunction[T], both defined in separate files.
When I try to build this, I get:
Error:(22, 39) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val newStreamInput = see.addSource(largeJoinDataGen)
1. Why is there an error in the given example?
2. What are the general guidelines for when these errors occur and how to avoid them in the future?
PS: This is my first Scala project and my first Flink project, so please bear with me.
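For reference, in the Scala API the TypeInformation evidence parameter is normally materialized by a macro that only comes into scope via a wildcard import of the Scala API package object. A minimal sketch of the usual fix, reusing the names from the snippet above (this assumes flink-streaming-scala is on the classpath):

```scala
// The implicit TypeInformation[(Int, String)] is generated by a macro that
// is only in scope after this wildcard import from flink-streaming-scala:
import org.apache.flink.streaming.api.scala._

val see = StreamExecutionEnvironment.getExecutionEnvironment
// With the import in scope, addSource can resolve its implicit
// TypeInformation evidence parameter for the (Int, String) elements.
val newStreamInput = see.addSource(new LargeJoinDataGen(dataSetSize, dataGen, hitRatio))
```

Importing the package object where the source is *used* (not where it is defined) is what matters, since that is where the implicit is resolved.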
In Apache Flink I have a stream of tuples. Let's assume a really simple Tuple1<String>. The tuples can have arbitrary values in their value field (e.g. 'P1', 'P2', etc.). The set of possible values is finite, but I don't know the full set beforehand (so there could be a 'P362'). I want to write each tuple to a certain output location depending on the value inside it, so I'd like to end up with the following file structure:
/output/P1
/output/P2
In the documentation I only found possibilities to write to locations that I know beforehand (e.g. stream.writeCsv("/output/somewhere")), but no way of letting the contents of the data decide where the data actually ends up.
I read about output splitting in the documentation, but that doesn't seem to provide a way to redirect the output the way I want it (or I just don't understand how this would work).
Can this be done with the Flink API, and if so, how? If not, is there maybe a third-party library that can do it, or would I have to build such a thing myself?
Update
Following Matthias's suggestion, I came up with a sifting sink function which determines the output path and then writes the tuple to the respective file after serializing it. I'm putting it here for reference; maybe it is useful for somebody else:
public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {
private final OutputSelector<IT> outputSelector;
private final MapFunction<IT, String> serializationFunction;
private final String basePath;
Map<String, TextOutputFormat<String>> formats = new HashMap<>();
/**
* @param outputSelector the selector which determines into which output(s) a record is written.
* @param serializationFunction a function which serializes the record to a string.
* @param basePath the base path for writing the records. It will be appended …
I want to send the computed results of my DataStream to another service over the HTTP protocol. I see two possible ways of implementing this:
public class SyncHttpSink extends RichSinkFunction<SessionItem> {
private static final String URL = "http://httpbin.org/post";
private CloseableHttpClient httpClient;
private Histogram httpStatusesAccumulator;
@Override
public void open(Configuration parameters) throws Exception {
httpClient = HttpClients.custom()
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
.build();
httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
}
@Override
public void close() throws Exception {
httpClient.close();
httpStatusesAccumulator.resetLocal();
}
@Override
public void invoke(SessionItem value) throws Exception {
List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);
HttpPost httpPost = …
Is there a way to count the number of unique words in a stream with Flink Streaming? The result would be a stream of numbers that keeps increasing.
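One possible sketch (an assumed approach on my part, not a confirmed answer): deduplicate the words with keyed state so that only each word's first occurrence survives, then funnel those first occurrences into a single running counter. All names here are illustrative; the `words` source is left abstract:

```scala
import org.apache.flink.streaming.api.scala._

// Sketch: emit only the first occurrence of each word via keyed state,
// then maintain an ever-increasing count of distinct words.
val words: DataStream[String] = ??? // e.g. lines.flatMap(_.toLowerCase.split("\\W+"))

val uniqueWordCount: DataStream[Long] = words
  .keyBy(w => w)                        // all duplicates of a word share one key
  .flatMapWithState[String, Boolean] {  // per-key flag: seen before?
    case (word, None) => (Seq(word), Some(true)) // first occurrence: emit it
    case (word, seen) => (Seq.empty, seen)       // duplicate: drop it
  }
  .keyBy(_ => true)                     // funnel everything into one logical counter
  .mapWithState[Long, Long] { case (_, acc) =>
    val next = acc.getOrElse(0L) + 1L   // running count of distinct words
    (next, Some(next))
  }
```

Note the single-key counter stage is a parallelism bottleneck by construction; that is inherent to producing one global, monotonically increasing number.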
I'm a bit stuck with a CoFlatMapFunction. It seems to work fine if I place it on a DataStream before a window, but it fails if placed after the window's "apply" function.
I'm testing two streams: the main "Features" stream on flatMap1 constantly ingests data, and the control stream "Model" on flatMap2 changes the model upon request.
I'm able to set and see b0/b1 correctly in flatMap2, but flatMap1 always sees b0 and b1 set to 0 as at initialization.
Am I missing something obvious?
public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
private static final long serialVersionUID = 1L;
Double b0;
Double b1;
public applyModel(){
b0=0.0;
b1=0.0;
}
@Override
public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
System.out.print("Main: " + this + "\n");
}
@Override
public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
System.out.print("Old Model: " + this + "\n");
b0 = value.getB0();
b1 = value.getB1();
System.out.print("New Model: " …
I'm having some trouble understanding the semantics around event-time windows. The following program generates some tuples with timestamps that are used as event time and performs a simple window aggregation. I would expect the output to be in the same order as the input, but the output is ordered differently. Why is the output not ordered by event time?
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
object WindowExample extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.enableTimestamps()
env.setParallelism(1)
val start = 1449597577379L
val tuples = (1 to 10).map(t => (start + t * 1000, t))
env.fromCollection(tuples)
.assignAscendingTimestamps(_._1)
.timeWindowAll(Time.of(1, TimeUnit.SECONDS))
.sum(1)
.print()
env.execute()
}
Input:
(1449597578379,1)
(1449597579379,2)
(1449597580379,3)
(1449597581379,4)
(1449597582379,5)
(1449597583379,6)
(1449597584379,7)
(1449597585379,8)
(1449597586379,9)
(1449597587379,10)
Result:
[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
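For what it's worth, the window *assignment* itself is deterministic: tumbling event-time windows are aligned to the epoch, not to the first element. A sketch of the start computation (mirroring Flink's `TimeWindow.getWindowStartWithOffset` with a zero offset):

```scala
// Sketch of tumbling-window alignment: with zero offset, a timestamp t falls
// into the window starting at t - (t % size), aligned to the epoch.
def windowStart(timestamp: Long, size: Long): Long =
  timestamp - (timestamp % size)

// Each input timestamp above (…578379, …579379, …) therefore lands in its
// own 1-second window, so every printed result is a single-element window.
val starts = Seq(1449597578379L, 1449597579379L).map(windowStart(_, 1000L))
// starts == Seq(1449597578000L, 1449597579000L)
```

So the per-window sums are correct; the question that remains is only about the order in which the fired windows are emitted.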
This is Flink 1.13.2 running in an Amazon Kinesis Data Analytics Flink environment.
The application runs against a Kafka topic. It runs fine when the topic has little traffic, but when traffic is higher I get this error. How can I troubleshoot, tune, and fix it?
I see similar questions, but this was apparently a separate issue in older versions of Flink: Apache Flink - WordCount - NoResourceAvailableException
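As I understand it (an assumption on my part, not verified against KDA specifics): this exception means the job requested more task slots than the runtime could provide, and the number of slots a job needs is roughly its maximum operator parallelism. One mitigation is to cap the job's parallelism below the available slot count; a sketch (the value 2 is arbitrary):

```scala
import org.apache.flink.streaming.api.scala._

// Sketch: cap the job's overall parallelism below the number of available
// task slots so the scheduler can always acquire the required resources.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2) // arbitrary; must not exceed the slots the cluster offers
```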
The exception trace is:
2021-12-30 18:16:45
java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.failRequest(DeclarativeSlotPoolBridge.java:535)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:128)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:362)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:351)
at org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:824)
at jdk.internal.reflect.GeneratedMethodAccessor133.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:301)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) …
I just came across the example below regarding parallelism and have some related questions:
Does setParallelism(5) set parallelism 5 for sum only, or for both flatMap and sum?
Is it possible to set a different parallelism for different operators such as flatMap and sum separately? For example, parallelism 5 for sum and 10 for flatMap.
As I understand it, keyBy partitions the DataStream into logical streams/partitions according to the different keys. Assuming there are 10,000 distinct key values, and hence 10,000 partitions, how many threads will process those 10,000 partitions? Only 5 threads? And what if setParallelism(5) is not set at all?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
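Regarding the second question: setParallelism applies to the operator it immediately follows, so a per-operator setting is possible. A sketch of the example above with both operators configured (the values are arbitrary; shown in Scala to keep the code here in one language):

```scala
// Sketch: setParallelism configures only the operator it directly follows,
// so flatMap and sum can each be given their own parallelism.
val wordCounts = text
  .flatMap(new LineSplitter).setParallelism(10) // 10 parallel flatMap subtasks
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5)                     // 5 parallel window/sum subtasks
```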
I want to see my job in the web UI.
I use createLocalEnvironmentWithWebUI; the code runs fine in the IDE, but I can't see my job at http://localhost:8081/#/overview
val conf: Configuration = new Configuration()
import org.apache.flink.configuration.ConfigConstants
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val rides = env.addSource(
new TaxiRideSource("nycTaxiRides.gz", 1,100))//60, 600))
val filteredRides = rides
.filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))
.map(r => (r.passengerCnt, 1))
.keyBy(_._1)
.window(TumblingTimeWindows.of(Time.seconds(5)))
.sum(1)
.map(r => (r._1.toString+"test", r._2))
filteredRides.print()
env.execute("Taxi Ride Cleansing")
Do I need to set something else?
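One thing worth checking (an assumption on my part): createLocalEnvironmentWithWebUI only starts the dashboard if the flink-runtime-web module is on the classpath, which it typically is not when running from an IDE. A sketch of the sbt dependency; `flinkVersion` is a placeholder for whatever Flink version the project uses:

```scala
// build.sbt sketch: the local web UI requires flink-runtime-web on the
// classpath; "flinkVersion" is a placeholder for your Flink version.
libraryDependencies += "org.apache.flink" %% "flink-runtime-web" % flinkVersion
```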