标签: apache-flink

在Flink中,流窗口似乎不起作用?

我试图增强显示流的使用的Flink示例.我的目标是使用窗口功能(请参阅window函数调用).我假设下面的代码输出流的最后3个数字的总和.(由于nc -lk 9999在ubuntu上打开了流)实际上,输出总结了输入的所有数字.切换到时间窗口会产生相同的结果,即不会产生窗口.

那是一个错误吗?(使用的版本:github上的最新版本)

object SocketTextStreamWordCount {
  def main(args: Array[String]) {
    val hostName = args(0)
    val port = args(1).toInt
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Create streams for names and ages by mapping the inputs to the corresponding objects
    val text = env.socketTextStream(hostName, port)    
    val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
    .filter { (x:String) => x.nonEmpty }      
    .window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
    //  .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
      .map { (x:String) => ("not used; just to have a tuple for …
Run Code Online (Sandbox Code Playgroud)

apache-flink

2
推荐指数
1
解决办法
787
查看次数

Apache Flink:如何使用Flink DataSet API从一个数据集创建两个数据集

我正在使用Flink 0.10.1的DataSet API编写应用程序.我可以在Flink中使用单个运算符获得多个收集器吗?

我想做的是如下:

val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
  (iterator, collector1, collector2) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector1.collect(elem1)
      collector2.collect(elem2)
    }
  } 
} 
Run Code Online (Sandbox Code Playgroud)

目前我正在调用mapPartition两次从一个源数据集生成两个数据集.

val lines = env.readTextFile(...)
val out_small = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem1)
    }
  } 
}
val out_large = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val …
Run Code Online (Sandbox Code Playgroud)

apache-flink

2
推荐指数
1
解决办法
476
查看次数

Flink字数统计示例缺少库

我正在尝试使用Flink运行示例程序。我使用下载了示例项目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=0.8.1

然后,我在终端上运行以下命令

mvn package && java -cp target/test-1.0-SNAPSHOT.jar adfin.WordCount

我收到以下错误

线程“主要”中的异常java.lang.NoClassDefFoundError:adfin.WordCount.main(WordCount.scala)处的org / apache / flink / api / common / typeinfo / TypeInformation造成原因:java.lang.ClassNotFoundException:org.apache.flink .api.common.typeinfo.TypeInformation at java.net.URLClassLoader.findClass(URLClassLoader.java:381)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher $ AppClassLoader.loadClass(Launcher .java:331),位于java.lang.ClassLoader.loadClass(ClassLoader.java:357)

我尝试将库从默认的0.8.1更新为1.0.0,但这没有任何改变。我想可能需要添加一个额外的jar。有人可以帮忙吗?

scala maven apache-flink

2
推荐指数
1
解决办法
1595
查看次数

FLINK:如何使用相同的StreamExecutionEnvironment从多个kafka集群中读取

我想从FLINK中的多个KAFKA集群中读取数据.

但结果是kafkaMessageStream只从第一个Kafka读取.

只有当我为Kafka分别2个流时,我才能从两个Kafka集群中读取,这不是我想要的.

是否可以将多个源连接到单个阅读器.

示例代码

public class KafkaReader<T> implements Reader<T>{

private StreamExecutionEnvironment executionEnvironment ;

public StreamExecutionEnvironment getExecutionEnvironment(Properties properties){
    executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    executionEnvironment.setRestartStrategy( RestartStrategies.fixedDelayRestart(3, 1500));

    executionEnvironment.enableCheckpointing(
            Integer.parseInt(properties.getProperty(Constants.SSE_CHECKPOINT_INTERVAL,"5000")), CheckpointingMode.EXACTLY_ONCE);
    executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
    //executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    //try {
    //  executionEnvironment.setStateBackend(new FsStateBackend(new Path(Constants.SSE_CHECKPOINT_PATH)));
        // The RocksDBStateBackend or The FsStateBackend
    //} catch (IOException e) {
        // LOGGER.error("Exception during initialization of stateBackend in execution environment"+e.getMessage());
    }

    return executionEnvironment;
}
public DataStream<T> readFromMultiKafka(Properties properties_k1, Properties properties_k2 ,DeserializationSchema<T> deserializationSchema) {


    DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>( 
            properties_k1.getProperty(Constants.TOPIC),deserializationSchema, …
Run Code Online (Sandbox Code Playgroud)

java apache-flink flink-streaming

2
推荐指数
1
解决办法
837
查看次数

Apache Flink:如何处理背压?

对于操作员,输入流比其输出流快,因此其输入缓冲区将阻塞前一个操作员的输出线程,该线程将数据传输到该操作员。对?

Flink和Spark是否都通过阻塞线程来处理背压?那么它们之间有什么区别?

对于数据源,它会持续生成数据,如果其输出线程被阻塞怎么办?缓冲区会溢出吗?

backpressure apache-flink

2
推荐指数
1
解决办法
1231
查看次数

Flink完全一次消息处理

我已经设置了一个Flink 1.2独立集群,其中包含2个JobManagers和3个TaskManagers,我正在使用JMeter通过生成Kafka消息/事件对其进行加载测试,然后对其进行处理.处理作业在TaskManager上运行,通常需要~15K事件/秒.
该作业已设置EXACTLY_ONCE检查点,并将状态和检查点持久保存到Amazon S3.如果我关闭运行作业的TaskManager,它需要几秒钟,然后在另一个TaskManager上恢复作业.该作业主要记录连续整数的事件ID(例如,从0到1200000).
当我检查TaskManager上的输出时,我关闭了最后一次计数,例如500000,然后当我在另一个TaskManager上检查恢复作业的输出时,它以~400000开始.这意味着~100K的重复事件.这个数字取决于测试的速度可以更高或更低.
不确定我是否遗漏了一些东西,但我希望在重新启动不同的TaskManager后,该作业会显示下一个连续的数字(如500001).
有谁知道为什么这发生/额外的设置我必须配置,以获得一次?

apache-flink flink-streaming flink-cep

2
推荐指数
1
解决办法
1649
查看次数

流的重用是否是流的副本

例如,有一个密钥流:

val keyedStream: KeyedStream[event, Key] = env
    .addSource(...)
    .keyBy(...)

// several transformations on the same stream
keyedStream.map(....)
keyedStream.window(....)
keyedStream.split(....)
keyedStream...(....)
Run Code Online (Sandbox Code Playgroud)

我认为这是Flink中相同流的重用,我发现重用它时,流的内容不受其他转换的影响,因此我认为它是同一流的副本。

  • 但我不知道这是否正确。

  • 如果是,这将使用大量资源(哪些资源?)来保存副本?

apache-flink flink-streaming

2
推荐指数
1
解决办法
634
查看次数

命名FlinkKafka消费者或生产者

DataStreamSink类具有一个名称字段。为什么FlinkKafkaConsumer和FlinkKafkaProducer没有相似的字段?在“ Flink仪表板”中,我的洗手池所有人都说“未命名”

apache-flink flink-streaming

2
推荐指数
1
解决办法
348
查看次数

Apache Flink:由于类型擦除,无法自动确定函数的返回类型

我使用Java中的Flink编写了一个简单程序,该程序将文件或文本作为输入,然后使用flatMap函数打印所有单词。

这是我的代码:

        final ParameterTool params = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(params);
        // show user defined parameters in the apache flink dashboard

        DataStream<String> dataStream;

        if(params.has("input")) 
        {
            System.out.println("Executing Words example with file input");
            dataStream = env.readTextFile(params.get("input"));
        }else if (params.has("host") && params.has("port")) 
        {
            System.out.println("Executing Words example with socket stream");
            dataStream = env.socketTextStream(params.get("host"), Integer.parseInt(params.get("port")));
        }
        else {
            System.exit(1);
            return;
        }

        DataStream<String> wordDataStream = dataStream.flatMap(
                (String sentence, Collector<String> out) -> {
                    for(String word: sentence.split(" "))
                        out.collect(word);
        });

        wordDataStream.print();

        env.execute("Word Split");  
Run Code Online (Sandbox Code Playgroud)

但是当我使用以下命令运行它时:

bin/flink run -c …
Run Code Online (Sandbox Code Playgroud)

java apache-flink

2
推荐指数
1
解决办法
1459
查看次数

提交flink作业时如何处理akka AskTimeoutException

Flink 1.5.3,当我将Flink作业提交到Flink群集(在Yarn上)时,它总是抛出AskTimeoutException。在flink配置文件中,我已经配置了参数“ akka.ask.timeout = 1000s”,但是下面的异常仍然是这样。

这意味着我增加了超时参数“ akka.ask.timeout=1000s”,但是它不起作用。

org.apache.flink.runtime.rest.handler.RestHandlerException: Job submission failed.
    at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$handleRequest$2(JobSubmitHandler.java:116)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:770)
    at akka.dispatch.OnComplete.internal(Future.scala:258)
    at akka.dispatch.OnComplete.internal(Future.scala:256)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1851759541]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) …
Run Code Online (Sandbox Code Playgroud)

java akka apache-flink flink-streaming

2
推荐指数
1
解决办法
1120
查看次数