我使用的是 Apache Flink 版本1.13.1
我编写了一个自定义指标报告器,但 JobManager 似乎无法识别它。启动时,JobManager 显示以下警告日志:
2021-08-25 14:54:06,243 WARN org.apache.flink.runtime.metrics.ReporterSetup [] - The reporter factory (org.apache.flink.metrics.kafka.KafkaReporterFactory) could not be found for reporter kafka. Available factories: [org.apache.flink.metrics.slf4j.Slf4jReporterFactory, org.apache.flink.metrics.datadog.DatadogHttpReporterFactory, org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory, org.apache.flink.metrics.graphite.GraphiteReporterFactory, org.apache.flink.metrics.statsd.StatsDReporterFactory, org.apache.flink.metrics.prometheus.PrometheusReporterFactory, org.apache.flink.metrics.jmx.JMXReporterFactory, org.apache.flink.metrics.influxdb.InfluxdbReporterFactory].
2021-08-25 14:54:06,245 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl [] - No metrics reporter configured, no metrics will be exposed/reported.
Run Code Online (Sandbox Code Playgroud)
我在 Flink 插件文件夹中有一个名为的文件夹metrics-kafka,其中包含指标报告器的打包 jar。我也把这个jar复制到lib文件夹中,这两个都不起作用。请参阅下面使用的配置和代码。
Flink配置文件:
2021-08-25 14:54:06,243 WARN org.apache.flink.runtime.metrics.ReporterSetup [] - The reporter factory (org.apache.flink.metrics.kafka.KafkaReporterFactory) could not be found for reporter kafka. Available factories: [org.apache.flink.metrics.slf4j.Slf4jReporterFactory, org.apache.flink.metrics.datadog.DatadogHttpReporterFactory, …Run Code Online (Sandbox Code Playgroud) 假设我开始一个新的 Flink Java 项目,如果我寻找“稳定的 Flink Java 生产体验”,我应该使用哪个版本?官方文档说它从 Flink 1.10 开始可以支持 Java-11,但很多用户仍在使用 Java-8,因此尝试了解我是否需要使用AdoptOpenJDK-8或AdoptOpenJDK-11。
我正在测量Flink中的应用程序(WordCount)的内存使用情况ps -p TaskManagerPID -o rss.然而,结果没有任何意义.因为对于每个数据量(1MB,10MB,100MB,1GB,10GB),所使用的内存量相同.对于10GB数据,测量结果甚至小于10GB.TaskManager是否是测量内存使用量的错误过程?Flink Process Model的哪个进程负责内存分配?
我正在构建一个关于Flink 1.2的教程,我想运行一些简单的窗口示例.其中之一是Session Windows.
我想要运行的代码如下:
import <package>.Session
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import scala.util.Try
object SessionWindowExample {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("localhost", 9000)
//session map
val values = source.map(value => {
val columns = value.split(",")
val endSignal = Try(Some(columns(2))).getOrElse(None)
Session(columns(0), columns(1).toDouble, endSignal)
})
val keyValue = values.keyBy(_.sessionId)
// create global window
val sessionWindowStream = keyValue.
window(GlobalWindows.create()).
trigger(PurgingTrigger.of(new SessionTrigger[GlobalWindow]()))
sessionWindowStream.sum("value").print()
env.execute()
}
}
Run Code Online (Sandbox Code Playgroud)
正如您将注意到的,我需要new SessionTrigger基于此类实例化我所做的对象:
import <package>.Session
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import …Run Code Online (Sandbox Code Playgroud) 我正在做一个项目,在该项目中我制作了一个flink数据流程序,并将代码的第一次运行交给另一个团队,以便他们查看。他们都是gradle的大用户,问我在将来的flink项目中使用gradle代替maven是否有任何不利之处或好处。
现在我知道有很多关于Maven和Gradle比较的文章,但是我的意思是专门针对Apache Flink。我发现有两个git hub仓库,乍一看就像他们在flink项目上实现了gradle一样,仅此而已。
而且,如果您能说出您使用flink的经验,那也将是有益的。
所以我想知道的是:
1.)是否有类似“如果flink项目试图切换到gradle,x,y和z根本无法工作”的内容?
2)将flink项目切换到gradle会比在maven和gradle之间切换任何其他Java项目(应该在Java中)更麻烦吗?
3.)gradle的repo是否具有与maven的repo相同的flink依赖项?
4.)您建议进行开关还是不进行开关?
感谢您提前提供的所有帮助!
我想验证我的时间戳.我附上了代码片段:
import org.apache.flink.streaming.api.windowing.time.Time;
Date date = new Date(Time.seconds(30).toMilliseconds());
DateFormat formatter = new SimpleDateFormat("HH:mm:ss:SSS");
String dateFormatted = formatter.format(date);
logger.debug(Time.seconds(30).toMilliseconds() + " " + dateFormatted);
Run Code Online (Sandbox Code Playgroud)
事实证明,结果是:
30000 01:00:30:000
Run Code Online (Sandbox Code Playgroud)
编辑:
[user@hdp ~]$ date
Thu Jun 29 11:39:27 CEST 2017
Run Code Online (Sandbox Code Playgroud)
我错过了什么?谢谢!
我正在实现具有大状态(可能不适合内存)的自定义运算符。我试图为此目的使用ListState。我在用
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
Run Code Online (Sandbox Code Playgroud)
在上面的链接中实现snapshotState()会清除checkpointedState,然后将内存数据结构中的元素添加到checkpointedState。
相反,我在snapshotState()中需要以下内容:
有什么方法可以有选择地从ListState中删除项目吗?
使用Apache Flink版本1.3.2和Cassandra 3.11,我编写了一个简单的代码,使用Apache Flink Cassandra连接器将数据写入Cassandra.以下是代码:
final Collection<String> collection = new ArrayList<>(50);
for (int i = 1; i <= 50; ++i) {
collection.add("element " + i);
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<UUID, String>> dataStream = env
.fromCollection(collection)
.map(new MapFunction<String, Tuple2<UUID, String>>() {
final String mapped = " mapped ";
String[] splitted;
@Override
public Tuple2<UUID, String> map(String s) throws Exception {
splitted = s.split("\\s+");
return new Tuple2(
UUID.randomUUID(),
splitted[0] + mapped + splitted[1]
);
}
});
dataStream.print();
CassandraSink.addSink(dataStream)
.setQuery("INSERT INTO test.phases (id, …Run Code Online (Sandbox Code Playgroud) 我是Apache Flink的新手,并且正在研究Apache Flink的示例。我发现,如果发生故障,Flink可以从检查点恢复流处理。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000L);
Run Code Online (Sandbox Code Playgroud)
现在,我的问题是Flink在默认情况下将检查点保留在哪里?
任何帮助表示赞赏!
我正在尝试为GCS中的flink作业配置检查点。如果我在本地运行测试作业(没有docker和任何群集设置),则一切正常,但是如果我使用docker-compose或群集设置运行它,并在flink仪表板中部署带有工作的胖子,则它会失败并显示错误。
有什么想法吗?谢谢!
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
... 33 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
Run Code Online (Sandbox Code Playgroud)
环境配置是这样的:
StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setFailOnCheckpointingErrors(false);
checkpointConfig.setCheckpointInterval(10000);
checkpointConfig.setMinPauseBetweenCheckpoints(5000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
RocksDBStateBackend …Run Code Online (Sandbox Code Playgroud)