标签: flink-streaming

Flink和Storm之间的主要区别是什么?

Flink已被比作Spark,正如我所看到的那样,它是错误的比较,因为它将窗口事件处理系统与微批处理进行比较; 同样,将Flink与Samza进行比较对我来说没有多大意义.在这两种情况下,它都会比较实时与批量事件处理策略,即使在Samza的情况下规模较小的"规模".但我想知道Flink与Storm的比较,它在概念上看起来与它更相似.

我发现这个(幻灯片#4)将主要区别记录为Flink的"可调延迟".另一个提示似乎是Slicon Angle的一篇文章,该文章表明Flink更好地集成到Spark或HadoopMR世界中,但没有提及或引用实际细节.最后,Fabian Hueske 在接受采访时指出:"与Apache Storm相比,Flink的流分析功能提供了一个高级API,并使用更轻量级的容错策略来提供一次性处理保证."

这对我来说有点稀疏,我不太清楚.有人可以通过Flink解释Storm中的流处理是什么问题(??)?什么是Hueske所指的API问题及其"更轻量级的容错策略"?

apache-storm apache-flink flink-streaming

131
推荐指数
3
解决办法
4万
查看次数

找不到类型org.apache.flink.api.common.typeinfo.TypeInformation [...]的证据参数的隐含值

我正在尝试为Apache Flink编写一些用例.我经常遇到的一个错误是

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[SomeType]
Run Code Online (Sandbox Code Playgroud)

我的问题是,当它们发生时以及它们不发生时我无法确定.

最近的例子如下

...
val largeJoinDataGen = new LargeJoinDataGen(dataSetSize, dataGen, hitRatio)
val see = StreamExecutionEnvironment.getExecutionEnvironment
val newStreamInput = see.addSource(largeJoinDataGen)
...
Run Code Online (Sandbox Code Playgroud)

where LargeJoinDataGen extends GeneratorSource[(Int, String)]GeneratorSource[T] extends SourceFunction[T],都在单独的文件中定义.

当我试图建立这个时,我得到了

Error:(22, 39) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
val newStreamInput = see.addSource(largeJoinDataGen)
Run Code Online (Sandbox Code Playgroud)

1.为什么给定的例子中有错误?

2.当这些错误发生时以及如何在将来避免它们时,一般指导原则是什么?

PS:第一个scala项目和第一个flink项目所以请耐心等待

scala apache-flink flink-streaming

19
推荐指数
3
解决办法
1万
查看次数

Flink Streaming:如何根据数据将一个数据流输出到不同的输出?

在Apache Flink中,我有一组元组.让我们假设一个非常简单Tuple1<String>.元组可以在其值字段中具有任意值(例如,"P1","P2"等).可能值的集合是有限的,但我事先并不知道全集(所以可能有'P362').我想根据元组内部的值将该元组写入某个输出位置.所以我希望有以下文件结构:

  • /output/P1
  • /output/P2

在文档中我只发现了写入我事先知道的位置的可能性(例如stream.writeCsv("/output/somewhere")),但没有办法让数据的内容决定数据实际结束的位置.

我在文档中读到了关于输出拆分的内容,但这似乎没有提供一种方法将输出重定向到我希望拥有它的方式(或者我只是不明白这是如何工作的).

可以使用Flink API完成,如果是这样,怎么做?如果没有,是否可能有第三方图书馆可以做到这一点,还是我必须自己构建这样的东西?

更新

根据Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在序列化之后将元组写入相应的文件.我把它放在这里作为参考,也许对其他人有用:

public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {

    private final OutputSelector<IT> outputSelector;
    private final MapFunction<IT, String> serializationFunction;
    private final String basePath;
    Map<String, TextOutputFormat<String>> formats = new HashMap<>();

    /**
     * @param outputSelector        the selector which determines into which output(s) a record is written.
     * @param serializationFunction a function which serializes the record to a string.
     * @param basePath              the base path for writing the records. It will be appended …
Run Code Online (Sandbox Code Playgroud)

java bigdata apache-flink flink-streaming

18
推荐指数
1
解决办法
5293
查看次数

如何正确实现HTTP接收器?

我想通过HTTP协议将我的DataStream流的计算结果发送到其他服务.我看到了两种可能的实现方式:

  1. 在接收器中使用同步Apache HttpClient客户端
public class SyncHttpSink extends RichSinkFunction<SessionItem> {
    private static final String URL = "http://httpbin.org/post";

    private CloseableHttpClient httpClient;

    private Histogram httpStatusesAccumulator;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpClients.custom()
            .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
            .build();

        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }

    @Override
    public void close() throws Exception {
        httpClient.close();

        httpStatusesAccumulator.resetLocal();
    }

    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));

        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);

        HttpPost httpPost = …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

13
推荐指数
1
解决办法
2624
查看次数

如何计算流中的唯一单词?

有没有办法用Flink Streaming计算流中唯一单词的数量?结果将是一个不断增加的数字流.

apache-flink flink-streaming

11
推荐指数
1
解决办法
2097
查看次数

Flink:在CoFlatMapFunction中共享状态

卡住了一下CoFlatMapFunction.如果我把它放在DataStream之前的窗口上似乎工作正常但是如果放在窗口的"应用"功能之后就失败了.

我正在测试两个流,主要的"功能"在flatMap1不断摄取数据和控制流"模型"时flatMap2根据请求更改模型.

我能够设置并看到b0/b1正确设置flatMap2,但flatMap1始终看到b0和b1在初始化时设置为0.

我错过了一些明显的东西吗?

public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    Double b0;
    Double b1;

    public applyModel(){
        b0=0.0;
        b1=0.0;
    }

    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.print("Main: " + this + "\n");
    }

    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.print("Old Model: " + this + "\n");
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.print("New Model: " …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

10
推荐指数
1
解决办法
1319
查看次数

Flink流媒体事件时间窗口排序

我遇到了一些麻烦,理解事件时间窗口周围的语义.以下程序生成一些带有时间戳的元组,这些时间戳用作事件时间并执行简单的窗口聚合.我希望输出与输入的顺序相同,但输出的排序方式不同.为什么输出与事件时间无关?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()

    env.execute()
}
Run Code Online (Sandbox Code Playgroud)

输入:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)
Run Code Online (Sandbox Code Playgroud)

结果:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

10
推荐指数
1
解决办法
2377
查看次数

Flink 1.13.2:NoResourceAvailableException

这是在 Amazon Kinesis Data Analytics Flink 环境中运行的 Flink 1.13.2。

该应用程序在 Kafka 主题上运行。当主题的流量较小时,该应用程序运行良好,当流量较大时,我收到此错误。如何排除故障、调整和修复?

我看到类似的问题,但这在旧版本的 Flink 中显然是一个单独的问题: Apache Flink - WordCount - NoResourceAvailableException

异常跟踪是:

2021-12-30 18:16:45
java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.failRequest(DeclarativeSlotPoolBridge.java:535)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:128)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:362)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:351)
    at org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:824)
    at jdk.internal.reflect.GeneratedMethodAccessor133.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:301)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

9
推荐指数
3
解决办法
1万
查看次数

Flink中的操作员并行性的一些困惑

我只是得到下面关于并行性的示例,并且有一些相关的问题:

  1. setParallelism(5)将Parallelism 5设置为求和或flatMap和求和?

  2. 是否可以分别为flatMap和sum等不同的运算符设置不同的Parallelism?例如将Parallelism 5设置为sum和10设置为flatMap。

  3. 根据我的理解,keyBy正在根据不同的密钥将DataStream划分为逻辑Stream \分区,并假设有10,000个不同的键值,因此有10,000个不同的分区,那么有多少个线程可以处理10,000个分区?只有5个线程?如果不设置setParallelism(5)怎么办?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html

final StreamExecutionEnvironment env =     
  StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
  .flatMap(new LineSplitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

8
推荐指数
1
解决办法
3113
查看次数

从IDE运行时Flink webui

我想在网上看到我的工作.

我使用createLocalEnvironmentWithWebUI,代码在IDE中运行良好,但无法在http:// localhost:8081 /#/ overview中看到我的工作

  val conf: Configuration = new Configuration()
  import org.apache.flink.configuration.ConfigConstants
  conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
  val env =  StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


  val rides = env.addSource(
    new TaxiRideSource("nycTaxiRides.gz", 1,100))//60, 600))

  val filteredRides = rides
    .filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))
    .map(r => (r.passengerCnt, 1))
    .keyBy(_._1)
    .window(TumblingTimeWindows.of(Time.seconds(5)))
    .sum(1)
    .map(r => (r._1.toString+"test", r._2))

  filteredRides.print()
  env.execute("Taxi Ride Cleansing")
Run Code Online (Sandbox Code Playgroud)

我需要设置其他东西吗?

apache-flink flink-streaming

8
推荐指数
2
解决办法
2486
查看次数

标签 统计

apache-flink ×10

flink-streaming ×10

apache-storm ×1

bigdata ×1

java ×1

scala ×1