它说:
启用检查点:如果启用检查点,Flink Kafka Consumer 将在检查点完成时提交存储在检查点状态中的偏移量。这确保了 Kafka 代理中提交的偏移量与检查点状态中的偏移量一致。用户可以通过调用消费者上的 setCommitOffsetsOnCheckpoints(boolean) 方法来选择禁用或启用偏移量提交(默认情况下,该行为为 true)。请注意,在这种情况下,属性中的自动定期偏移提交设置将被完全忽略。
如果我以 10 秒的间隔启用检查点,我也会设置为 true,并在 Kafka 消费者属性中have setCommitOffsetsOnCheckpoints设置 enable.auto.commit=true和 。auto.commit.interval.ms=5000
那么偏移量提交会发生什么行为呢?偏移量会每 10 秒提交 3 次吗?一次来自 flink 执行检查点时,两次来自 Kafka 消费者的自动提交?
Yarn 上的 Flink 1.13.2 (Flink SQL)。
\n有点困惑 - 我发现了两个(据我所知)不同规格的文件系统连接器(Ververica.com 与 ci.apache.org):
\nhttps://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/overview/#supported-connectors \xe2\x80\x94 文件系统是“有界和无界扫描、查找”
\nhttps://docs.ververica.com/user_guide/sql_development/connectors.html#packaged-connectors \xe2\x80\x94 仅 JDBC 标记为可用于查找。
\n我可以使用文件系统连接器 (csv) 创建查找(维度)表来丰富 Kafka 事件表吗?如果是的话——如何使用 Flink SQL?
\n(我尝试过简单的左连接FOR SYSTEM_TIME AS OF a.event_datetime- 它在具有少量 Kafka 事件的测试环境中有效,但在生产中我收到GC overhead limit exceeded错误。我猜这是因为没有将小型 csv 表广播到工作节点。在 Spark 中,我曾经使用相关提示来解决这些类型问题。)
我按照第一步安装 Flink。我可以毫无问题地启动集群
$ start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host DESKTOP-....
Starting taskexecutor daemon on host DESKTOP-....
Run Code Online (Sandbox Code Playgroud)
但我没有得到任何状态
$ ps aux | grep flink
Run Code Online (Sandbox Code Playgroud)
我也无法通过 localhost:8081 访问仪表板。
有一篇较旧的帖子存在这些问题,但该解决方案对我不起作用,因为显然所描述的conf文件不再存在。
我的 JAVA_HOME 设置为 C:\Progra~1\Java\jdk1.8.0_311 以避免 Program Files 中的空间问题。
我使用KeyedCoProcessFunction函数用来自另一个流的数据来丰富主数据流
代码:
class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {
case class AssetStateDoc(assetId: Option[String])
private var associatedDevices: ValueState[AssetStateDoc] = _
override def open(parameters: Configuration): Unit = {
val associatedDevicesDescriptor =
new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
}
override def processElement1(
packet: PacketData,
ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
out: Collector[AssetData]): Unit = {
val tmpState = associatedDevices.value
val state = if (tmpState == null) AssetStateDoc(None) else tmpState
state.assetId match {
case Some(assetId) =>
logger.debug(s"There are state for …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 flink table api (1.15.1 版本)写入本地文件系统。我正在使用 TableDescriptor.forConnector('filesystem') 但出现异常:无法找到在类路径中实现 DynamicTableFactory 的标识符“filesystem”的任何工厂。
谢谢:)
apache-flink flink-streaming flink-sql flink-batch flink-table-api
我正在使用 intellij、maven 3 和 flink 1.15.1 编写有状态流作业。我正在尝试为我的自定义 KeyedProcessFunction 编写单元测试,并尝试遵循此处的文档以及添加此处提到的依赖项。我对使用 KeyedOneInputStreamOperatorTestHarness 类感兴趣,但我在任何依赖项中都找不到该类(下面发布的经过编辑的 pom)。我唯一能找到该课程的方法是包含
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.5</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
Run Code Online (Sandbox Code Playgroud)
这是来自以前的版本。Flink >= 1.15.0 现在不需要 scala 依赖库,因此 1.15.1 没有 flink-streaming-java_2.12。
我想要使用的类是否被移到其他地方或故意排除?我已经尝试了我所知道的所有 google foo,并且可以在 flink 存储库中找到该类,但在我迄今为止尝试过的任何 flink 依赖项中都找不到该类,除了 <=1.14.5 之外。我是否做错了什么或缺少一些文档?使用 1.14.5 库是我唯一的选择还是有一些我不知道的新型测试实用程序?
没有该类的 Pom 依赖项:
...
<flink.version>1.15.1</flink.version>
...
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
<scope>test</scope> …Run Code Online (Sandbox Code Playgroud) 我想创建一个Trigger第一次在20秒内触发,此后每5秒触发一次的触发。我已经习惯GlobalWindows了Trigger
val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())
Run Code Online (Sandbox Code Playgroud)
这是中的代码TradeTrigger:
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
static boolean flag=false;
static long ctime = System.currentTimeMillis();
private TradeTrigger() {
}
@Override
public TriggerResult onElement(
Object arg0,
long arg1,
W arg2,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
throws Exception {
// TODO Auto-generated method stub
if(flag == false){
if((System.currentTimeMillis()-ctime) >= 20000){
flag = true;
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
} …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Apache Flink使用两种不同的算法处理数据流.我的伪代码如下:
env = getEnvironment();
DataStream<Event> inputStream = getInputStream();
// How to replicate the input stream?
Array[DataStream<Event>] inputStreams = inputStream.clone()
// apply different operations on the replicated streams
outputOne = inputStreams[0].map(func1);
outputTwo = inputStreams[1].map(func2);
...
outputOne.addSink(sink1);
outputTwo.addSink(sink2);
env.execute();
Run Code Online (Sandbox Code Playgroud)
我用Flink文档做了一些研究.似乎没有克隆流的概念.无论DataStream.iterate()也不DataStream.split()正在做的正是我想要的.是否有从源代码中多次创建流的替代方法?谢谢您的帮助.
我在读一个简单的JSON字符串作为输入和键控基于两个领域流A和B。但KeyBy产生了对不同值的相同键合流B,但为的特定组合A和B。
输入:
{
"A": "352580084349898",
"B": "1546559127",
"C": "A"
}
Run Code Online (Sandbox Code Playgroud)
这是我的Flink代码的核心逻辑:
DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
.map(new MapFunction<String, GenericDataObject>() {
@Override
public GenericDataObject map(String s) throws Exception {
JSONObject jsonObject = new JSONObject(s);
GenericDataObject genericDataObject = new GenericDataObject();
genericDataObject.setA(jsonObject.getString("A"));
genericDataObject.setB(jsonObject.getString("B"));
genericDataObject.setC(jsonObject.getString("C"));
return genericDataObject;
}
});
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
.keyBy("A", "B")
.map(new MapFunction<GenericDataObject, GenericDataObject>() {
@Override
public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
return genericDataObject;
}
});
testStream.print();
Run Code Online (Sandbox Code Playgroud)
GenericDataObject是一个POJO,具有三个字段A, …
我正在尝试做一个flinkscala api hello世界,但是无法复制从https://www.slideshare.net/dataArtisans/apache-flink-datastream-api-basics/20看到的开始的教程片段。
我尝试的副本是:
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Order(user: String, product: String, amount: Double, proctime: Int, rowtime: Int)
def basic() = {
val seq = (1 to 50).map { i => Order(s"User" + (i % 10).toString, "Product" + (i % 20), 2.0 * (4 * i +.5 * i * i -.1 * i * i * i), i * 10, i * 3) }
val ds: DataStream[Order] = env.fromElements(seq:_*)
Run Code Online (Sandbox Code Playgroud)
隐式不起作用,但是
错误:(21,30)找不到类型为org.apache.flink.api.common.typeinfo.TypeInformation [com.blazedb.spark.FlinkDemo.Order]的证据参数的隐含值
这里需要更改什么?
apache-flink ×10
flink-sql ×2
flink-batch ×1
implicit ×1
localhost ×1
maven ×1
scala ×1
windows ×1