我尝试使用 flink 的表和 sql api 作为一个简单的示例,我从文件中读取字符串,将其转换为 Tuple2 并尝试将其插入表中。这是我的代码。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class table_streaming_test
{
public static void main (String[] args) throws Exception
{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //create execution environment
StreamTableEnvironment tEnv= StreamTableEnvironment.getTableEnvironment(env);
env.setParallelism(1);
DataStream<String> datastream_in= env.readTextFile("file:/home/rishikesh/new_workspace1/table_streaming/stocks.txt");
DataStream<Tuple2<String,Integer>> ds= datastream_in
.flatMap(new Splitter()); // transformation flatmap
Table msg=tEnv.fromDataStream(ds).as("symbol,price");
Table result = msg.select("symbol ='A'");
DataStream<String> ds2 =tEnv.toDataStream(result, String.class);
ds2.print();
env.execute();
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, …Run Code Online (Sandbox Code Playgroud) 我正在评估 Flink,专门针对可能生成警报的流窗口支持。我担心的是内存使用情况,因此如果有人可以提供帮助,我们将不胜感激。
例如,该应用程序可能会在给定的滚动窗口(例如 5 分钟)内消耗来自流的大量数据。在评估时,如果有一百万个文档符合标准,它们是否都会被加载到内存中?
一般流程是:
producer -> kafka -> flinkkafkaconsumer -> table.window(Tumble.over("5.minutes").select("...").where("...").writeToSink(someKafkaSink)
此外,如果有一些明确的文档描述了在这些情况下如何处理内存,我可能会忽略有人可能会有所帮助。
谢谢
我正在 Flink 项目上工作,想将源 JSON 字符串数据解析为 Json 对象。我正在使用jackson-module-scala进行 JSON 解析。但是,我在 Flink API 中使用 JSON 解析器时遇到了一些问题(map例如)。
以下是代码的一些示例,我无法理解其行为背后的原因。
在这种情况下,我正在做jackson-module-scala 的官方 exmaple 代码告诉我要做的事情:
ObjectMapperDefaultScalaModule
DefaultScalaModule是一个 Scala 对象,包含对所有当前支持的 Scala 数据类型的支持。
readValue以将 JSON 解析为Map我得到的错误是:org.apache.flink.api.common.InvalidProgramException:Task not serializable。
object JsonProcessing {
def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.readTextFile("xxx")
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
val …Run Code Online (Sandbox Code Playgroud) 我在 Flink 中有一个流,它从源发送多维数据集,对多维数据集进行转换(为多维数据集中的每个元素添加 1),然后最后将其发送到下游以打印每秒的吞吐量。
该流通过 4 个线程并行化。
如果我理解正确的话,该windowAll运算符是一个非并行转换,因此应该将并行度缩小到 1,并通过将其与 一起使用TumblingProcessingTimeWindows.of(Time.seconds(1)),对最近一秒内所有并行子任务的吞吐量求和并打印它。我不确定是否得到正确的输出,因为每秒的吞吐量打印如下:
1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...
Run Code Online (Sandbox Code Playgroud)
问题:流打印机是否打印每个线程(1、2、3 和 4)的吞吐量,还是仅选择线程 3 来打印所有子任务的吞吐量总和?
当我一开始将环境的并行度设置为 1 时env.setParallelism(1),我在吞吐量之前没有得到“x>”,但我似乎获得了与设置为 4 时相同(甚至更好)的吞吐量。这:
45
429
499
505
1
503
524
530
...
Run Code Online (Sandbox Code Playgroud)
这是该程序的代码片段:
imports...
public class StreamingCase {
public static void main(String[] args) throws Exception {
int parallelism = 4;
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(parallelism);
DataStream<Cube> start = env
.addSource(new …Run Code Online (Sandbox Code Playgroud) Flink Web UI 有一个出色的背压部分。但我看不到 Prometheus 记者给出的任何指标,这些指标可用于以与 Grafana 仪表板相同的方式检测背压。
有没有办法在 Flink Web UI 之外获取相同的指标?使用此处描述的指标https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html。或者甚至有一个 prometheus scraper 来抓取 web api?
我正在寻找有关延迟监控的帮助(flink 1.8.0)。
假设我有一个简单的流数据流,具有以下运算符:FlinkKafkaConsumer -> Map -> print。
如果我想测量数据流中记录处理的延迟,最好的机会是什么?我想获取处理源中接收到的输入的持续时间,直到接收器/完成接收器操作接收到输入为止。
我添加了我的代码: env.getConfig().setLatencyTrackingInterval(100);
然后,可以使用以下延迟指标:
但我不明白他们到底在测量什么?此外,据我所知,延迟平均值似乎与延迟无关。
我还尝试使用 codahale 指标来获取某些方法的持续时间,但这并不能帮助我获取在整个管道中处理的记录的延迟。
该解决方案与 LatencyMarker 相关吗?如果是,我如何在接收器操作中到达它以检索它?
谢谢,罗伊。
我正在尝试实现一个类,该类使用户能够操作 N 个输入流,而不受输入流类型的限制。
首先,我想将所有输入数据流转换为 keyedStreams。因此,我将输入数据流映射到元组中,然后应用 KeyBy 将其转换为密钥流。
我总是遇到序列化问题,我尝试遵循本指南https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html但它不起作用。
我想知道的是:
非常感谢。
主要类别:
public class CEP {
private Integer streamsIdComp = 0;
final private Map<Integer, DataStream<?> > dataStreams = new HashMap<>();
final private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();
public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){
Preconditions.checkNotNull(inputStream, "dataStream");
TypeInformation<T> streamType = inputStream.getType();
KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
map(new MapFunction<T, Tuple2<Integer,T>>() {
@Override
public Tuple2<Integer, T> map(T value) throws Exception {
return Tuple2.of(streamsIdComp, value);
}
}). …Run Code Online (Sandbox Code Playgroud) 根据我对Flink的理解,它引入了基于键(键组)的并行性。然而,假设一个人有大量未加密的流并且希望并行完成工作,那么实现这一目标的最佳方法是什么?
如果流有一些字段,人们可能会考虑任意地使用其中一个字段进行键控,但这并不能保证工作负载能够正确平衡。例如,因为该字段中的一个值可能出现在 90% 的消息中。因此我的问题是:
我能想到的一个可能的解决方案是为每条消息分配一个随机数(如果您希望并行度为 3,则为 1-3;如果您希望并行度更灵活,则为 1-1000)。然而,我想知道这是否是推荐的方法,因为它感觉不太优雅。
假设我在键控过程后得到一个流。
DataStream<T> stream= sourceStream.keyBy(key).window(window).apply(function);
Run Code Online (Sandbox Code Playgroud)
生成的流是否仍被键入?我可以在该流中使用某些 Keyed 状态吗?
apache-flink ×10
flink-streaming ×10
java ×3
grafana ×1
jackson ×1
latency ×1
metrics ×1
monitoring ×1
prometheus ×1
scala ×1