我正在 Apache flink sql api 中构建管道。该管道执行简单的投影查询。但是,我需要在查询之前编写一次元组(确切地说是每个元组中的一些元素),在查询之后编写一次。事实证明,我用来写入 Redis 的代码严重降低了性能。即flink在很小的数据速率下做出反压。我的代码有什么问题以及如何改进。有什么建议请。
当我停止写入redis之前和之后,性能都非常好。这是我的管道代码:
public class QueryExample {
public static Long throughputCounterAfter=new Long("0");
public static void main(String[] args) {
int k_partitions = 10;
reamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(5 * 32);
Properties props = new Properties();
props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
// not to be shared with another job consuming the same topic
props.setProperty("group.id", "flink-group");
props.setProperty("enable.auto.commit","false");
FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
new SimpleStringSchema(),
props);
DataStream<String> purchasesStream = env
.addSource(purchasesConsumer)
.setParallelism(Math.min(5 * 32, k_partitions));
DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks …Run Code Online (Sandbox Code Playgroud) 我正在尝试在我的 Flink 作业中使用事件时间,并BoundedOutOfOrdernessTimestampExtractor用于提取时间戳并生成水印。但是我有一些输入Kafka具有稀疏流,它可以长时间没有数据,这使得根本没有调用getResultin AggregateFunction。我可以看到数据正在add发挥作用。
我已经设置getEnv().getConfig().setAutoWatermarkInterval(1000L);
我试过了
eventsWithKey
.keyBy(entry -> (String) entry.get(key))
.window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
.allowedLateness(WINDOW_LATENESS)
.aggregate(new CountTask(basicMetricTags, windowInMinutes))
Run Code Online (Sandbox Code Playgroud)
还有会话窗口
eventsWithKey
.keyBy(entry -> (String) entry.get(key))
.window(EventTimeSessionWindows.withGap(Time.seconds(30)))
.aggregate(new CountTask(basicMetricTags, windowInMinutes))
Run Code Online (Sandbox Code Playgroud)
所有水印指标都显示No Watermark
如何让 Flink 忽略无水印的东西?
早些时候我询问了Flink 的一个简单的hello world 示例。这给了我一些很好的例子!
\n不过,我想询问更多 \xe2\x80\x98streaming\xe2\x80\x99 示例,其中我们每秒生成一个输入值。理想情况下,这应该是随机的,但即使每次都是相同的值也可以。
\n目标是获得一个在没有/最少外部接触的情况下 \xe2\x80\x98moves\xe2\x80\x99 的流。
\n因此我的问题是:
\n我找到了如何通过在外部生成数据并写入 Kafka 或收听公共源来展示这一点,但是我试图以最小的依赖来解决它(就像从 Nifi 中的GenerateFlowFile 开始)。
\n每个人。
请帮我。
我编写了 apache flink streraming 作业,它从 apache kafka 读取 json 消息(几秒钟内 500-1000 条消息),在 POJO 中反序列化它们并执行一些操作(filter-keyby-process-sink)。我使用具有 ExactlyOnce 语义的 RocksDB 状态后端。但我不明白我需要设置哪个检查点间隔?
有些论坛的人写的时间大多是 1000 或 5000 毫秒。我尝试将间隔设置为10ms、100ms、500ms、1000ms、5000ms。我没有注意到任何差异。
我想在父类中将函数名称作为参数,以便子类可以设置它。此变量将用于父类的方法之一。
abstract class Parent[T: TypeInformation] {
val myfun: T => Unit
// A different method uses myfun
}
class Child1 extends Parent[User] {
val service = new Service()
val myfun: User => Unit = service.callme
}
class Service {
def callme(user: User) => Unit = {
println("We are here for user")
}
}
Run Code Online (Sandbox Code Playgroud)
我是 Scala 的新手,但这看起来不错。虽然编译器没有抱怨,但我收到运行时异常并且工作无法启动:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at …Run Code Online (Sandbox Code Playgroud)