标签: apache-flink

apache flink session支持

我正在调查Apache Flink Streaming,用于我们的ETL和机器学习平台.我还没想到的是如何将事件流式传输到"会话"中.更具描述性:所有事件都包含会话ID,为了丰富数据,我需要将属于会话的所有事件组合在一起.请注意事件是连续流入的(因此没有批量支持,之后您可以简单地执行groupBy例如)

一种可能的解决方案是维护会话的LRU缓存并将所有传入事件排序到其关联会话.然后,在每个会话不活动X分钟后,可以从缓存中"关闭"或逐出会话.问题是如何在多租户系统中处理此缓存; flink是否具有分布式缓存的概念,还是包含某种智能负载均衡器,其中事件被定向到网格中的同一分区?

更一般地说:使用流式api建立会话支持的最佳方式(用例和陷阱)是什么?这有可能吗?以及如何处理重放流?(即从事件流入不完整会话的特定时间点开始(即在时间点之前发生事件)

对任何反馈,想法和/或指针感兴趣.

提前致谢

streaming session apache-flink

3
推荐指数
1
解决办法
1176
查看次数

Flink:写入dev/null

我想执行我的Flink Scala程序而不将结果写入文件.无法将结果保存DataSet到a中运行程序DataSink.所以我试过了<result dataset name>.writeAsText("file:///dev/null", WriteMode.OVERWRITE).那没用.发生以下错误:

> java.io.IOException: Output path 'file:/dev/null' could not be
> initialized. Canceling task...    at
> org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:228)
>   at
> org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:77)
>   at
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:187)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)     at
> java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

有没有办法将DataSink与dev/null路径一起使用?

apache-flink

3
推荐指数
1
解决办法
321
查看次数

如何在 flink 独立安装上进行 kerberos 身份验证?

我有一个独立的 Flink 安装,我想在其上运行一个将数据写入 HDFS 安装的流作业。HDFS 安装是 Cloudera 部署的一部分,需要 Kerberos 身份验证才能读取和写入 HDFS。由于我没有找到有关如何使 Flink 与受 Kerberos 保护的 HDFS 连接的文档,因此我不得不对该过程进行一些有根据的猜测。这是我到目前为止所做的:

  • 我为我的用户创建了一个密钥表文件。
  • 在我的 Flink 工作中,我添加了以下代码:

    UserGroupInformation.loginUserFromKeytab("myusername", "/path/to/keytab");
    
    Run Code Online (Sandbox Code Playgroud)
  • 最后我使用 aTextOutputFormat将数据写入 HDFS。

当我运行作业时,出现以下错误:

org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled.  Available:[TOKEN, KERBE
ROS]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
        at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1730)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1668)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1593)
        at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
        at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:405)
Run Code Online (Sandbox Code Playgroud)

出于某种奇怪的原因,Flink 似乎尝试了 SIMPLE 身份验证,即使我调用了loginUserFromKeytab …

authentication hadoop kerberos apache-flink

3
推荐指数
1
解决办法
2413
查看次数

Apache Flink:如何创建并行JDBC InputFormat?

有一个名为flink-jdbc仅支持非并行元组类型的模块JDBC InputFormat.

为了使用并行InputFormatfor JDBC,似乎需要通过实现接口来定制:org.apache.flink.core.io.InputSplit.

那么在我的情况下,我如何自定义实现JdbcInputSplit从数据库并行查询数据?

apache-flink

3
推荐指数
1
解决办法
598
查看次数

Flink:默认分区/改组策略/功能

Flink默认使用哪些策略(散列,排序)对数据集进行缩小/分组(例如groupBy或reduce函数)?Flink使用哪些API函数

  1. 在洗牌步骤中进行分区并
  2. 对分区中的元素进行排序

默认?

apache-flink

3
推荐指数
1
解决办法
1240
查看次数

如何通过JMX远程连接Flink?

对于我即将毕业的学士论文,我想开发一种工具,该工具可以从Apache Flink收集系统和应用程序数据,并将这些数据以某种“事件”的形式发送到另一个系统。该工具将安装在Flink作业和任务管理器节点上。除了来自dstat之类的linux系统实用程序的数据外,我还想收集JMX数据。

我的问题是,我无法弄清楚如何使用Flinks jobmanager的端口通过远程JMX连接进行连接。尽管收集器将在同一台计算机上,但我确实尝试避免使用--javaagent访问Flink JVM的JMX数据。

另一个问题是,我有一个基于https://github.com/apache/flink/tree/master/flink-contrib/docker-flink的本地docker 设置,并更新为flink-1.0.2,无法通过jconsole连接因为我不知道如何为作业和任务管理器“打开” JMX远程端口。

有什么办法可以做到这一点?

在此先感谢,任何想法都非常感谢。

jmx apache-flink

3
推荐指数
1
解决办法
1568
查看次数

Flink + Kafka + JSON - java示例

我正在尝试使用以下代码从Kafka主题获取JSON:

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream messageStream = env.addSource(
                new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
                , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

        messageStream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        });

        env.execute();
    }
}
Run Code Online (Sandbox Code Playgroud)

}

问题是:

1)此程序未运行到期

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function …
Run Code Online (Sandbox Code Playgroud)

java json apache-kafka apache-flink

3
推荐指数
1
解决办法
7295
查看次数

将Kafka消费者和生产者集成到一个功能中

我们需要开发一个代码,让消费者在运行时侦听特定的kafka生产者,然后在同一函数中产生从当前消耗的数据到另一个生产者主题的已处理数据。

这是为了将代码与Java集成在一起,使Java将代码链接到Java,在Java中Java生成到一个主题的消息,而Flink使用它并为另一个主题生成新数据,以供Java进一步处理。

请让我们知道是否还有另一种方法可以执行此过程。

scala apache-kafka kafka-consumer-api apache-flink kafka-producer-api

3
推荐指数
1
解决办法
501
查看次数

Scala案例类中的init方法的java.lang.NoSuchMethodException

我正在编写一个Apache Flink流应用程序,该程序反序列化从Kafka总线读取的数据(Avro格式)(有关更多详细信息,请参见此处)。数据将反序列化为Scala案例类。运行程序时出现异常,它收到来自Kafka的第一条消息

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: java.lang.NoSuchMethodException: org.myorg.quickstart.DeviceData.<init>()
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at org.myorg.quickstart.StreamingKafkaClient$.main(StreamingKafkaClient.scala:26)
    at org.myorg.quickstart.StreamingKafkaClient.main(StreamingKafkaClient.scala)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: org.myorg.quickstart.DeviceData.<init>()
    at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
    at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
    at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.myorg.quickstart.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.scala:20)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: org.myorg.quickstart.DeviceData.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.getDeclaredConstructor(Class.java:2178)
    at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
    ... 16 more

Process finished with exit code 1 …
Run Code Online (Sandbox Code Playgroud)

scala avro apache-kafka apache-flink

3
推荐指数
1
解决办法
822
查看次数

对动态缩放flink作业有任何想法吗?

If there is a kafka topic with 10 partitions and we'd like to use flink to consume the topic. We want the system to allocate slots dynamically according to workload, which means if the workload is low, the flink job can use less slots(with less parallelism), and if the workload is high it can run with higher parallelism. Is there a good way to achieve this? It seems that the parallelism can be changed with stopping the job first. if …

apache-flink

3
推荐指数
1
解决办法
366
查看次数