标签: apache-flink

Flink Scala API在泛型参数上起作用

这是关于Flink Scala API"没有足够的论据"的后续问题.

我希望能够传递Flink的DataSet并使用它做一些事情,但数据集的参数是通用的.

这是我现在遇到的问题:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import scala.reflect.ClassTag

object TestFlink {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val split = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    id(split).print()

    env.execute()
  }

  def id[K: ClassTag](ds: DataSet[K]): DataSet[K] = ds.map(r => r)
}
Run Code Online (Sandbox Code Playgroud)

我有这个错误ds.map(r => r):

Multiple markers at this line
    - not enough arguments for …
Run Code Online (Sandbox Code Playgroud)

apache-flink

2
推荐指数
1
解决办法
1129
查看次数

writeAsCSV()和writeAsText()是意外的

我通过Scala API使用apache flink,在某些时候我获得了一个 DataSet[(Int, Int, Int)].使用方法的结果 writeAsCSV()writeAsText()出乎意料.它创建了一个目录.该目录具有位置和名称方法调用的第一个参数(例如) filePath .在该目录中,两个文件以名称"1"和"2"出现.在这些文件中,我可以看到DataSet数据.他们似乎将DataSets内容划分为这两个文件.尝试重新创建此行为以显示更简洁的代码片段,我不能.那就是我目睹了在预期位置创建了一个具有预期名称的文件而没有创建目录.val mas = ma_ groupBy(0,1)sum(2)mas.writeAsCsv("c:\ flink\mas.csv")

导致创建名为"mas.csv"的目录,并在其中创建两个文件"1"和"2".什么时候发生这样的事情?使用flink 9.1本地模式,Windows 7,scala 2.10,eclipse3.0.3

scala writetofile apache-flink

2
推荐指数
1
解决办法
895
查看次数

为什么Flink SocketTextStreamWordCount不起作用?

我已经设置了示例项目并构建了它.我能按预期运行WordCount程序.但是当我运行SocketTextWordCount时,我没有打印出任何结果.

  • 我通过nc发送数据(localhost:9999两侧)
  • 在正在运行的作业的Web控制台中,我可以看到正在发送/接收消息

但我从未看到在任何地方打印出的counts.print()输出,即使在杀死nc会话之后也是如此.

编辑 - 当我改变它以将结果打印到文本文件时,没问题.所以问题似乎是count.print()没有正确写入我正在运行示例的控制台的stdout.

apache-flink flink-streaming

2
推荐指数
1
解决办法
633
查看次数

Apache Flink:创建一个滞后数据流

我刚刚开始使用Scala的Apache Flink.有人可以告诉我如何从我当前的数据流创建滞后流(滞后k事件或k单位时间)?

基本上,我想在数据流上实现自动回归模型(在流上使用时间滞后版本的线性回归).因此,需要一种类似于以下伪代码的方法.

val ds : DataStream = ...

val laggedDS : DataStream = ds.map(lag _)

def lag(ds : DataStream, k : Time) : DataStream = {

}
Run Code Online (Sandbox Code Playgroud)

如果每个事件的间隔为1秒,并且有2秒的延迟,我希望样本输入和输出如下.

输入:1,2,3,4,5,6,7 ...
输出:NA,NA,1,2,3,4,5 ......

scala apache-flink flink-streaming

2
推荐指数
1
解决办法
257
查看次数

使用S3AFileSystem的Flink不会从S3读取子文件夹

我们正在使用Flink 1.2.0和建议的S3AFileSystem配置.当源是S3存储桶中的单个文件夹时,简单的流式传输作业按预期工作.

该作业运行没有错误-但并没有产生输出-当它的来源是它本身包含子文件夹的文件夹.

为清楚起见,下面是S3存储桶的模型.运行作业以指向s3a://bucket/folder/2017/04/25/01/正确读取存储在存储桶中的所有三个对象和任何后续对象.将作业指向s3a://bucket/folder/2017/(或任何其他中间文件夹)会导致作业无法生成任何内容.

在绝望中,我们尝试了[in | ex]包含尾随的排列/.

.
`-- folder
    `-- 2017
        `-- 04
            |-- 25
            |   |-- 00
            |   |   |-- a.txt
            |   |   `-- b.txt
            |   `-- 01
            |       |-- c.txt
            |       |-- d.txt
            |       `-- e.txt
            `-- 26
Run Code Online (Sandbox Code Playgroud)

工作代码:

def main(args: Array[String]) {

  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")

  val path = s"s3a://$bucket/$folder"

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val lines: DataStream[String] = env.readFile(
    inputFormat …
Run Code Online (Sandbox Code Playgroud)

hadoop amazon-s3 apache-flink flink-streaming

2
推荐指数
1
解决办法
1104
查看次数

在Flink Mapstate中删除TTL过期密钥

我需要能够从地图状态中删除比固定时间还旧的旧键。我目前将每个事件的时间戳记保存在键状态图中,并且我希望有一个异步过程来删除这些陈旧的键。

我使用RocksDB作为国家后台,我不认为RocksDB的Java API的支持与TTL开放作为注意到这里

所以我的问题是:

  • 因为它在操作符函数中运行,所以根本不可能有一个访问Mapstate的异步线程吗?
  • 在这种情况下有更好的做法吗?

提前致谢,

apache-flink

2
推荐指数
1
解决办法
987
查看次数

Apache Flink:如何使用Table API查询关系数据库?

以下代码段摘自该博客文章

val sensorTable = ??? // can be a CSV file, Kafka topic, database, or ...

// register the table source
tEnv.registerTableSource("sensors", sensorTable)
Run Code Online (Sandbox Code Playgroud)

我想从关系数据库中读取数据。Flink是否有TableSource用于JDBC数据库的?

apache-flink flink-streaming flink-sql

2
推荐指数
1
解决办法
1067
查看次数

Apache Flink的分离模式是什么?

我在Flink文档中看到了这一行,但无法弄清楚“分离模式”的含义。请帮忙。谢谢。

在分离模式下运行示例程序:

./bin/flink run -d ./examples/batch/WordCount.jar
Run Code Online (Sandbox Code Playgroud)

apache-flink

2
推荐指数
1
解决办法
627
查看次数

如何在Apache Flink中加入两个流?

我正在开始使用flink,并查看其中的官方教程之一

据我了解,此练习的目标是将两个时间流加入time属性。

任务:

该练习的结果是Tuple2记录的数据流,每个不同的rideId一个。您应该忽略END事件,而仅将每次乘车的START事件及其相应的票价数据加入。

生成的流应打印为标准输出。

问题: EnrichmentFunction如何又可以加入两个流。它怎么知道参加哪个游乐项目的公平?我希望它可以缓冲多个博览会/竞赛,直到传入的博览会/竞赛有一个匹配的伙伴。

以我的理解,它只是保存了它看到的每一次乘车/展览,并将其与下一个最佳乘车/展览结合在一起。为什么这是适当的联接?

提供的解决方案:

/*
 * Copyright 2017 data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, …
Run Code Online (Sandbox Code Playgroud)

java apache-flink

2
推荐指数
1
解决办法
841
查看次数

Apache Flink:重新启动应用程序后,我的应用程序无法从检查点恢复

我有一个Flink作业,我在其中从文件夹读取文件并将其转储到数据库中。每天都有新文件进入该文件夹。

我已启用检查点,以便如果由于某种原因Flink作业停止并且需要再次启动它,则Flink作业不应读取已读取的文件。

我在代码中添加了以下几行,但是当我重新启动作业时,Flink作业再次读取所有文件。

env.setStateBackend(new FsStateBackend(“ file:/// C:// Users // folder”)); env.enableCheckpointing(10L);

apache-flink flink-streaming

2
推荐指数
1
解决办法
604
查看次数