我是 Flink 的新手,试图了解如何最有效地使用它。
我正在尝试使用 Window API,从 CSV 文件中读取。读取的行被转换为案例类,因此
case class IncomingDataUnit (
sensorUUID: String, radiationLevel: Int,photoSensor: Float,
humidity: Float,timeStamp: Long, ambientTemperature: Float)
extends Serializable {
}
Run Code Online (Sandbox Code Playgroud)
而且,这就是我阅读行的方式:
env.readTextFile(inputPath).map(datum => {
val fields = datum.split(",")
IncomingDataUnit(
fields(0), // sensorUUID
fields(1).toInt, // radiationLevel
fields(2).toFloat, // photoSensor
fields(3).toFloat, // humidity
fields(4).toLong, // timeStamp
fields(5).toFloat // ambientTemperature
)
})
Run Code Online (Sandbox Code Playgroud)
后来,使用一个简单的窗口,我尝试打印最大环境温度,因此:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val readings =
readIncomingReadings(env,"./sampleIOTTiny.csv")
.map(e => (e.sensorUUID,e.ambientTemperature))
.timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
.trigger(CountTrigger.of(5))
.evictor(CountEvictor.of(4))
.max(1)
readings.print
Run Code Online (Sandbox Code Playgroud)
输出包含这些(来自一堆 DEBUG 日志语句):
1> (probe-987f2cb6,29.43)
1> (probe-987f2cb6,29.43)
3> (probe-dccefede,30.02) …Run Code Online (Sandbox Code Playgroud) 我想在 Flink 中的 WindowedStream 上执行一些操作,比如平均操作。但可用的预定义操作非常有限,例如求和、最小值、最大值等。
val windowedStream = valueStream
.keyBy(0)
.timeWindow(Time.minutes(5))
.sum(2) //Change this to average?
Run Code Online (Sandbox Code Playgroud)
假设我想求平均值,我该怎么做呢?
我有来自 Kafka 的消息流,如下所示
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props));
Run Code Online (Sandbox Code Playgroud)
如何迭代流中的每条消息并对其执行某些操作?我看到一个iterate()方法,DataStream但它不返回Iterator<String>.
我尝试使用 flink 的表和 sql api 作为一个简单的示例,我从文件中读取字符串,将其转换为 Tuple2 并尝试将其插入表中。这是我的代码。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class table_streaming_test
{
public static void main (String[] args) throws Exception
{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //create execution environment
StreamTableEnvironment tEnv= StreamTableEnvironment.getTableEnvironment(env);
env.setParallelism(1);
DataStream<String> datastream_in= env.readTextFile("file:/home/rishikesh/new_workspace1/table_streaming/stocks.txt");
DataStream<Tuple2<String,Integer>> ds= datastream_in
.flatMap(new Splitter()); // transformation flatmap
Table msg=tEnv.fromDataStream(ds).as("symbol,price");
Table result = msg.select("symbol ='A'");
DataStream<String> ds2 =tEnv.toDataStream(result, String.class);
ds2.print();
env.execute();
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, …Run Code Online (Sandbox Code Playgroud) 我正在使用 Flink FlinkKafkaConsumer09,我想知道 kafka 消费者的偏移量存储在哪里?
我在 Zookeeper 和 Kafka 的偏移主题中都找不到它们。kafka-consumer-offset工具也找不到。
这是 Flink 内部处理的吗?
我试图理解 .yaml 文档中 flink 插槽和并行配置背后的逻辑。
Flink 官方文档指出,对于 cpu 中的每个核心,您必须分配 1 个插槽,同时将并行度提高 1 个。
但我认为这只是一个建议。例如,如果我有一个强大的核心(例如具有最大 GHz 的最新 i7),那么它与具有有限 GHz 的旧 CPU 不同。因此,运行比我的系统 CPU 最大核心数更多的插槽和并行度并不是不合理的。
但是,除了测试不同的配置之外,还有其他方法来检查我的系统使用 flink 的最大功能吗?
仅供记录,我使用 Flink 的 Batch Python API。
我的问题是,如果我们有两个原始事件流,即烟雾和温度,并且我们想通过将运算符应用于原始流来查明复杂事件(即火灾)是否发生,我们可以在 Flink 中执行此操作吗?
我问这个问题是因为到目前为止我所看到的 Flink CEP 的所有示例都只包含一个输入流。如果我错了,请纠正我。
开发apache flink有状态函数时使用python或java有什么优缺点。
b/w Native Kubernetes和Kubernetes部署的主要区别是什么?
我是 Kubernetes 的新手,并试图了解它们上的 Flink 部署有何不同。如果对内部结构有任何见解,那将有很大帮助。
我最近在 kubernetes 集群中将我的 Apache Flink 升级到 1.11 版,但今天我发现一个任务检查点总是失败。该任务从 RabbitMQ 读取数据并计算结果并调用 HTTP 请求将数据保存到 MySQL。这是任务管理器错误日志输出:
2020-08-21 15:53:28
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)
这是 Apache Flink UI 错误消息:
Failure Message: Checkpoint expired before completing.
Run Code Online (Sandbox Code Playgroud)
作业总是在一段时间内重新启动。我有 2 个任务,另一个任务检查点保持成功。那么问题出在哪里,我应该怎么做才能解决这个问题?
这是我的 flink 配置:
public static void initEnv(StreamExecutionEnvironment env) {
env.setParallelism(1);
env.enableCheckpointing(10000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(10000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
StateBackend stateBackend = new …Run Code Online (Sandbox Code Playgroud)