标签: apache-flink

如果我将自动提交设置为 true 且

我正在阅读https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-offset-committing-behaviour-configuration

它说:

启用检查点:如果启用检查点,Flink Kafka Consumer 将在检查点完成时提交存储在检查点状态中的偏移量。这确保了 Kafka 代理中提交的偏移量与检查点状态中的偏移量一致。用户可以通过调用消费者上的 setCommitOffsetsOnCheckpoints(boolean) 方法来选择禁用或启用偏移量提交(默认情况下,该行为为 true)。请注意,在这种情况下,属性中的自动定期偏移提交设置将被完全忽略。

如果我以 10 秒的间隔启用检查点,我也会设置为 true,并在 Kafka 消费者属性中have setCommitOffsetsOnCheckpoints设置 enable.auto.commit=true和 。auto.commit.interval.ms=5000

那么偏移量提交会发生什么行为呢?偏移量会每 10 秒提交 3 次吗?一次来自 flink 执行检查点时,两次来自 Kafka 消费者的自动提交?

apache-flink

1
推荐指数
1
解决办法
2067
查看次数

我可以使用 Flink 的文件系统连接器作为查找表吗?

Yarn 上的 Flink 1.13.2 (Flink SQL)。

\n

有点困惑 - 我发现了两个(据我所知)不同规格的文件系统连接器(Ververica.com 与 ci.apache.org):

\n
    \n
  1. https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/overview/#supported-connectors \xe2\x80\x94 文件系统是“有界和无界扫描、查找

    \n
  2. \n
  3. https://docs.ververica.com/user_guide/sql_development/connectors.html#packaged-connectors \xe2\x80\x94 仅 JDBC 标记为可用于查找。

    \n
  4. \n
\n

我可以使用文件系统连接器 (csv) 创建查找(维度)表来丰富 Kafka 事件表吗?如果是的话——如何使用 Flink SQL?

\n

(我尝试过简单的左连接FOR SYSTEM_TIME AS OF a.event_datetime- 它在具有少量 Kafka 事件的测试环境中有效,但在生产中我收到GC overhead limit exceeded错误。我猜这是因为没有将小型 csv 表广播到工作节点。在 Spark 中,我曾经使用相关提示来解决这些类型问题。)

\n

apache-flink flink-streaming flink-sql

1
推荐指数
1
解决办法
485
查看次数

无法在 Windows 上访问 Flink 仪表板 localhost:8081

我按照第一步安装 Flink。我可以毫无问题地启动集群

$ start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host DESKTOP-....
Starting taskexecutor daemon on host DESKTOP-....
Run Code Online (Sandbox Code Playgroud)

但我没有得到任何状态

$ ps aux | grep flink
Run Code Online (Sandbox Code Playgroud)

我也无法通过 localhost:8081 访问仪表板。

有一篇较旧的帖子存在这些问题,但该解决方案对我不起作用,因为显然所描述的conf文件不再存在。

我的 JAVA_HOME 设置为 C:\Progra~1\Java\jdk1.8.0_311 以避免 Program Files 中的空间问题。

windows localhost apache-flink

1
推荐指数
1
解决办法
3306
查看次数

Flink KeyedCoProcessFunction 处理状态

我使用KeyedCoProcessFunction函数用来自另一个流的数据来丰富主数据流

代码:

class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {

  case class AssetStateDoc(assetId: Option[String])
  private var associatedDevices: ValueState[AssetStateDoc] = _

  override def open(parameters: Configuration): Unit = {
    val associatedDevicesDescriptor =
      new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
    associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
  }

  override def processElement1(
                                packet: PacketData,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    val tmpState = associatedDevices.value
    val state = if (tmpState == null) AssetStateDoc(None) else tmpState
    
    state.assetId match {
      case Some(assetId) =>
        logger.debug(s"There are state for …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

1
推荐指数
1
解决办法
1164
查看次数

Flink 表 api 文件系统连接器不可用

我正在尝试使用 flink table api (1.15.1 版本)写入本地文件系统。我正在使用 TableDescriptor.forConnector('filesystem') 但出现异常:无法找到在类路径中实现 DynamicTableFactory 的标识符“filesystem”的任何工厂。

谢谢:)

apache-flink flink-streaming flink-sql flink-batch flink-table-api

1
推荐指数
1
解决办法
588
查看次数

Apache Flink 是否放弃了 StreamOperatorTestHarness 类,或者它们是否转移到了不同​​的工件?

我正在使用 intellij、maven 3 和 flink 1.15.1 编写有状态流作业。我正在尝试为我的自定义 KeyedProcessFunction 编写单元测试,并尝试遵循此处的文档以及添加此处提到的依赖项。我对使用 KeyedOneInputStreamOperatorTestHarness 类感兴趣,但我在任何依赖项中都找不到该类(下面发布的经过编辑的 pom)。我唯一能找到该课程的方法是包含

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.5</version>
            <scope>test</scope>
            <type>test-jar</type>
        </dependency>
Run Code Online (Sandbox Code Playgroud)

这是来自以前的版本。Flink >= 1.15.0 现在不需要 scala 依赖库,因此 1.15.1 没有 flink-streaming-java_2.12。

我想要使​​用的类是否被移到其他地方或故意排除?我已经尝试了我所知道的所有 google foo,并且可以在 flink 存储库中找到该类,但在我迄今为止尝试过的任何 flink 依赖项中都找不到该类,除了 <=1.14.5 之外。我是否做错了什么或缺少一些文档?使用 1.14.5 库是我唯一的选择还是有一些我不知道的新型测试实用程序?

没有该类的 Pom 依赖项:

...
        <flink.version>1.15.1</flink.version>
...
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils-junit</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope> …
Run Code Online (Sandbox Code Playgroud)

maven apache-flink flink-streaming

1
推荐指数
1
解决办法
465
查看次数

Flink自定义触发器提供了意外的输出

我想创建一个Trigger第一次在20秒内触发,此后每5秒触发一次的触发。我已经习惯GlobalWindowsTrigger

val windowedStream = valueStream
                          .keyBy(0)
                          .window(GlobalWindows.create())
                          .trigger(TradeTrigger.of())
Run Code Online (Sandbox Code Playgroud)

这是中的代码TradeTrigger

@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    static boolean flag=false;
    static long ctime = System.currentTimeMillis();

    private TradeTrigger() {
    }

    @Override
    public TriggerResult onElement(
            Object arg0,
            long arg1,
            W arg2,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
            throws Exception {
        // TODO Auto-generated method stub

        if(flag == false){
            if((System.currentTimeMillis()-ctime) >= 20000){
               flag = true;
               ctime = System.currentTimeMillis();
               return TriggerResult.FIRE;
            } …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

0
推荐指数
1
解决办法
970
查看次数

Apache Flink流程多次流

我正在尝试使用Apache Flink使用两种不同的算法处理数据流.我的伪代码如下:

env = getEnvironment();
DataStream<Event> inputStream = getInputStream();
// How to replicate the input stream?
Array[DataStream<Event>] inputStreams = inputStream.clone()

// apply different operations on the replicated streams
outputOne = inputStreams[0].map(func1);
outputTwo = inputStreams[1].map(func2);
 ...
outputOne.addSink(sink1);
outputTwo.addSink(sink2);
env.execute();
Run Code Online (Sandbox Code Playgroud)

我用Flink文档做了一些研究.似乎没有克隆流的概念.无论DataStream.iterate()也不DataStream.split()正在做的正是我想要的.是否有从源代码中多次创建流的替代方法?谢谢您的帮助.

apache-flink

0
推荐指数
1
解决办法
964
查看次数

KeyBy不会为不同的键创建不同的键流

我在读一个简单的JSON字符串作为输入和键控基于两个领域流AB。但KeyBy产生了对不同值的相同键合流B,但为的特定组合AB

输入:

{
    "A": "352580084349898",
    "B": "1546559127",
    "C": "A"
}
Run Code Online (Sandbox Code Playgroud)

这是我的Flink代码的核心逻辑:

DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
            .map(new MapFunction<String, GenericDataObject>() {
                @Override
                public GenericDataObject map(String s) throws Exception {
                    JSONObject jsonObject = new JSONObject(s);
                    GenericDataObject genericDataObject = new GenericDataObject();
                    genericDataObject.setA(jsonObject.getString("A"));
                    genericDataObject.setB(jsonObject.getString("B"));
                    genericDataObject.setC(jsonObject.getString("C"));
                    return genericDataObject;
                }
            });
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
            .keyBy("A", "B")
            .map(new MapFunction<GenericDataObject, GenericDataObject>() {
                @Override
                public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
                    return genericDataObject;
                }
            });
testStream.print();
Run Code Online (Sandbox Code Playgroud)

GenericDataObject是一个POJO,具有三个字段A, …

apache-flink flink-streaming

0
推荐指数
1
解决办法
329
查看次数

从案例类的集合创建Flink DataStream时,“未找到隐式”

我正在尝试做一个flinkscala api hello世界,但是无法复制从https://www.slideshare.net/dataArtisans/apache-flink-datastream-api-basics/20看到的开始的教程片段。

在此处输入图片说明

我尝试的副本是:

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  case class Order(user: String, product: String, amount: Double, proctime: Int, rowtime: Int)

  def basic() = {
    val seq = (1 to 50).map { i => Order(s"User" + (i % 10).toString, "Product" + (i % 20), 2.0 * (4 * i +.5 * i * i -.1 * i * i * i), i * 10, i * 3) }
    val ds: DataStream[Order] = env.fromElements(seq:_*)
Run Code Online (Sandbox Code Playgroud)

隐式不起作用,但是

错误:(21,30)找不到类型为org.apache.flink.api.common.typeinfo.TypeInformation [com.blazedb.spark.FlinkDemo.Order]的证据参数的隐含值

C

这里需要更改什么?

scala implicit apache-flink

0
推荐指数
1
解决办法
45
查看次数