我正在 Apache flink sql api 中构建管道。该管道执行简单的投影查询。但是,我需要在查询之前编写一次元组(确切地说是每个元组中的一些元素),在查询之后编写一次。事实证明,我用来写入 Redis 的代码严重降低了性能。即flink在很小的数据速率下做出反压。我的代码有什么问题以及如何改进。有什么建议请。
当我停止写入redis之前和之后,性能都非常好。这是我的管道代码:
public class QueryExample {
public static Long throughputCounterAfter=new Long("0");
public static void main(String[] args) {
int k_partitions = 10;
reamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(5 * 32);
Properties props = new Properties();
props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
// not to be shared with another job consuming the same topic
props.setProperty("group.id", "flink-group");
props.setProperty("enable.auto.commit","false");
FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
new SimpleStringSchema(),
props);
DataStream<String> purchasesStream = env
.addSource(purchasesConsumer)
.setParallelism(Math.min(5 * 32, k_partitions));
DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks …
Run Code Online (Sandbox Code Playgroud) 如何在 Flink 中跳过损坏的消息
我有 DAG:KafkaSrcConsumer > FlatMap > Window > SinkFunction
现在,如果我在操作员“KafkaSrcConsumer”中收到来自 Kafka 的corruptedMessage,我想抛出/跳过该消息,并且我不想将该损坏的消息转发给下一个操作员“FlatMap”
我们如何在 Apache Flink 中实现这一点?
(注意:从 KafkaSrcConsumer 抛出异常将重新启动 flink 作业,我想避免这种情况,因为我只想跳过消息并移至下一条消息)
假设我有两个任务管理器,每个任务管理器只有一个任务槽。现在,我有以下工作:
KeyedStream<...> streamA = env.addSource(...).keyBy(...);
KeyedStream<...> streamB = env.addSource(...).keyBy(...);
streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);
Run Code Online (Sandbox Code Playgroud)
一个任务管理器将使用来自 Kafka 主题的数据,另一个任务管理器将使用来自另一个 Kafka 主题的数据。
我将作业发送给作业管理器来执行它。Flink 分配两个任务管理器来处理 flatMap(因为任务管理器只有一个任务槽)。
flatMap 在事件之间进行简单的连接(使用两个键控状态):
public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
private ValueState<A> AState;
private ValueState<B> BState;
@Override
public void open(Configuration config) {
AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
}
@Override
public void flatMap1(A event, Collector<String> out) throws Exception {
B secondEvent = BState.value();
if (secondEvent == null)
AState.update(event);
else {
out.collect(...); …
Run Code Online (Sandbox Code Playgroud) 我正在尝试在我的 Flink 作业中使用事件时间,并BoundedOutOfOrdernessTimestampExtractor
用于提取时间戳并生成水印。但是我有一些输入Kafka具有稀疏流,它可以长时间没有数据,这使得根本没有调用getResult
in AggregateFunction
。我可以看到数据正在add
发挥作用。
我已经设置getEnv().getConfig().setAutoWatermarkInterval(1000L);
我试过了
eventsWithKey
.keyBy(entry -> (String) entry.get(key))
.window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
.allowedLateness(WINDOW_LATENESS)
.aggregate(new CountTask(basicMetricTags, windowInMinutes))
Run Code Online (Sandbox Code Playgroud)
还有会话窗口
eventsWithKey
.keyBy(entry -> (String) entry.get(key))
.window(EventTimeSessionWindows.withGap(Time.seconds(30)))
.aggregate(new CountTask(basicMetricTags, windowInMinutes))
Run Code Online (Sandbox Code Playgroud)
所有水印指标都显示No Watermark
如何让 Flink 忽略无水印的东西?
早些时候我询问了Flink 的一个简单的hello world 示例。这给了我一些很好的例子!
\n不过,我想询问更多 \xe2\x80\x98streaming\xe2\x80\x99 示例,其中我们每秒生成一个输入值。理想情况下,这应该是随机的,但即使每次都是相同的值也可以。
\n目标是获得一个在没有/最少外部接触的情况下 \xe2\x80\x98moves\xe2\x80\x99 的流。
\n因此我的问题是:
\n我找到了如何通过在外部生成数据并写入 Kafka 或收听公共源来展示这一点,但是我试图以最小的依赖来解决它(就像从 Nifi 中的GenerateFlowFile 开始)。
\n每个人。
请帮我。
我编写了 apache flink streraming 作业,它从 apache kafka 读取 json 消息(几秒钟内 500-1000 条消息),在 POJO 中反序列化它们并执行一些操作(filter-keyby-process-sink)。我使用具有 ExactlyOnce 语义的 RocksDB 状态后端。但我不明白我需要设置哪个检查点间隔?
有些论坛的人写的时间大多是 1000 或 5000 毫秒。我尝试将间隔设置为10ms、100ms、500ms、1000ms、5000ms。我没有注意到任何差异。
我使用的是flink 1.3,我定义了两个流源,它们将发出相同的事件以供后续运算符处理(我定义的进程运算符和接收器运算符)
但看起来在 source-process-pink 管道中,我只能指定一个源,我会问如何指定两个或多个源并执行相同的进程和接收器
object FlinkApplication {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.addSource(new MySource1()) //How to MySource2 here?
.setParallelism(1)
.name("source1")
.process(new MyProcess())
.setParallelism(4)
.addSink(new MySink())
.setParallelism(2)
env.execute("FlinkApplication")
}
}
Run Code Online (Sandbox Code Playgroud) 假设我有一个包含 3 个节点的 Flink 集群。一个节点用于作业管理器,另外 2 个节点用于任务管理器。每个任务管理器有 3 个任务槽。因此,当我提交并行度等于 2 的作业时,Flink 将分配两个任务槽。那么,我的问题是,Flink 将如何分配这些任务槽?
一些场景
Flink 是否为每个任务管理器分配一个任务槽?
两个任务槽是否有可能从同一个任务管理器分配?如果是,如果该特定节点由于某种原因关闭,我的作业将无法运行。在这种情况下如何避免停机?
我想在父类中将函数名称作为参数,以便子类可以设置它。此变量将用于父类的方法之一。
abstract class Parent[T: TypeInformation] {
val myfun: T => Unit
// A different method uses myfun
}
class Child1 extends Parent[User] {
val service = new Service()
val myfun: User => Unit = service.callme
}
class Service {
def callme(user: User) => Unit = {
println("We are here for user")
}
}
Run Code Online (Sandbox Code Playgroud)
我是 Scala 的新手,但这看起来不错。虽然编译器没有抱怨,但我收到运行时异常并且工作无法启动:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at …
Run Code Online (Sandbox Code Playgroud) Doc说这个模式会出现bug,但是没有告诉我这个模式的使用规则,什么情况下会出现bug?假设我有一份工作,
这种情况下,开启‘对象复用模式’可以吗?
apache-flink ×10
java ×2
backpressure ×1
flink-cep ×1
flink-sql ×1
join ×1
redis ×1
scala ×1
state ×1