我正在评估 Flink,专门针对可能生成警报的流窗口支持。我担心的是内存使用情况,因此如果有人可以提供帮助,我们将不胜感激。
例如,该应用程序可能会在给定的滚动窗口(例如 5 分钟)内消耗来自流的大量数据。在评估时,如果有一百万个文档符合标准,它们是否都会被加载到内存中?
一般流程是:
producer -> kafka -> flinkkafkaconsumer -> table.window(Tumble.over("5.minutes").select("...").where("...").writeToSink(someKafkaSink)
此外,如果有一些明确的文档描述了在这些情况下如何处理内存,我可能会忽略有人可能会有所帮助。
谢谢
我正在 Flink 项目上工作,想将源 JSON 字符串数据解析为 Json 对象。我正在使用jackson-module-scala进行 JSON 解析。但是,我在 Flink API 中使用 JSON 解析器时遇到了一些问题(map例如)。
以下是代码的一些示例,我无法理解其行为背后的原因。
在这种情况下,我正在做jackson-module-scala 的官方 exmaple 代码告诉我要做的事情:
ObjectMapperDefaultScalaModule
DefaultScalaModule是一个 Scala 对象,包含对所有当前支持的 Scala 数据类型的支持。
readValue以将 JSON 解析为Map我得到的错误是:org.apache.flink.api.common.InvalidProgramException:Task not serializable。
object JsonProcessing {
def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.readTextFile("xxx")
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
val …Run Code Online (Sandbox Code Playgroud) 我在 Flink 中有一个流,它从源发送多维数据集,对多维数据集进行转换(为多维数据集中的每个元素添加 1),然后最后将其发送到下游以打印每秒的吞吐量。
该流通过 4 个线程并行化。
如果我理解正确的话,该windowAll运算符是一个非并行转换,因此应该将并行度缩小到 1,并通过将其与 一起使用TumblingProcessingTimeWindows.of(Time.seconds(1)),对最近一秒内所有并行子任务的吞吐量求和并打印它。我不确定是否得到正确的输出,因为每秒的吞吐量打印如下:
1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...
Run Code Online (Sandbox Code Playgroud)
问题:流打印机是否打印每个线程(1、2、3 和 4)的吞吐量,还是仅选择线程 3 来打印所有子任务的吞吐量总和?
当我一开始将环境的并行度设置为 1 时env.setParallelism(1),我在吞吐量之前没有得到“x>”,但我似乎获得了与设置为 4 时相同(甚至更好)的吞吐量。这:
45
429
499
505
1
503
524
530
...
Run Code Online (Sandbox Code Playgroud)
这是该程序的代码片段:
imports...
public class StreamingCase {
public static void main(String[] args) throws Exception {
int parallelism = 4;
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(parallelism);
DataStream<Cube> start = env
.addSource(new …Run Code Online (Sandbox Code Playgroud) 我们正在使用 Apache Flink(1.4.2) 进行批处理,出于性能原因,我们希望在输出之前对 100 个项目进行分组,而不是直接输出每个项目。
如果我们要使用 DataStream API,我们将能够使用诸如翻滚窗口之类的东西(https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html #翻滚窗口)
但这在进行批处理时不可用。
是否可以使用 DataSet Api 或许通过某些 group/reduce 函数来做到这一点?
我的问题可能会引起一些混乱,因此请先查看说明。确定我的问题可能会有所帮助。我稍后将在问题末尾添加我的代码(也欢迎有关我的代码结构/实现的任何建议)。\n感谢您提前提供的任何帮助!
\n\n我的问题:
\n\n如何在 Flink Batch 处理中定义多个接收器而不让它重复从一个源获取数据?
createCollectionEnvironment()和 和有什么区别getExecutionEnvironment()?我应该在本地环境中使用哪一个?
有什么用env.execute()?我的代码将输出没有这句话的结果。如果我添加这句话,它会弹出一个异常:
-
\n\nException in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to \'execute()\', \'count()\', \'collect()\', or \'print()\'. \n at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940) \n at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922) \n at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:34) \n at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) \n at MainClass.main(MainClass.java:114)\nRun Code Online (Sandbox Code Playgroud)\n\n描述: \n编程新手。最近我需要使用 Flink Batch 处理一些数据(对数据进行分组、计算标准差等)。\n但是我遇到了需要输出两个 DataSet 的情况。\n结构是这样的
\n\n\n从 …
我开发了一个 Flink 作业并使用 Apache Flink 仪表板提交了我的作业。根据我的理解,当我提交作业时,我的 jar 应该在 Flink 服务器上可用。我试图找出我的罐子的路径,但无法。Flink 是否将这些 jar 文件保留在服务器上?如果有的话,我在哪里可以找到?有什么文档吗?请帮忙。谢谢!
我使用官方 flink 存储库中的以下 docker-compose.yml 启动了 flink。我只添加了到外部hadoop网络的连接。
version: "2.1"
networks:
hadoop:
external:
name: flink_hadoop
services:
jobmanager:
image: flink:1.7.1-hadoop27-scala_2.11
container_name: flink-jobmanager
domainname: hadoop
networks:
- hadoop
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:1.7.1-hadoop27-scala_2.11
container_name: flink-taskmanager
domainname: hadoop
networks:
- hadoop
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
Run Code Online (Sandbox Code Playgroud)
此后一切运行,我可以访问 WebUI。
然后我打包了以下工作。
import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory
import stoff.schnaps.pojo.ActorMovie
object HdfsJob {
private lazy val logger = LoggerFactory.getLogger(getClass)
def …Run Code Online (Sandbox Code Playgroud) Flink Web UI 有一个出色的背压部分。但我看不到 Prometheus 记者给出的任何指标,这些指标可用于以与 Grafana 仪表板相同的方式检测背压。
有没有办法在 Flink Web UI 之外获取相同的指标?使用此处描述的指标https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html。或者甚至有一个 prometheus scraper 来抓取 web api?
我正在寻找有关延迟监控的帮助(flink 1.8.0)。
假设我有一个简单的流数据流,具有以下运算符:FlinkKafkaConsumer -> Map -> print。
如果我想测量数据流中记录处理的延迟,最好的机会是什么?我想获取处理源中接收到的输入的持续时间,直到接收器/完成接收器操作接收到输入为止。
我添加了我的代码: env.getConfig().setLatencyTrackingInterval(100);
然后,可以使用以下延迟指标:
但我不明白他们到底在测量什么?此外,据我所知,延迟平均值似乎与延迟无关。
我还尝试使用 codahale 指标来获取某些方法的持续时间,但这并不能帮助我获取在整个管道中处理的记录的延迟。
该解决方案与 LatencyMarker 相关吗?如果是,我如何在接收器操作中到达它以检索它?
谢谢,罗伊。
根据我的理解,Flink 中运算符的示例有 Source 运算符、Transformation 运算符等。我对 Flink 中运算符的理解是否正确?
在算子状态中,Flink 是维护每个算子的状态(例如每个作业/任务的 map()、reduce() 等)还是维护一个完整作业/任务的状态?另外,如果我的作业以多个并行方式提交,每个槽是否都有自己的状态?
假设,我提交了两个带键流的作业,并且两个作业具有相同的键“颜色”,但两个作业完全不同。Flink 是否会维护两种不同的状态,或者为这两项工作维护一种状态。
apache-flink ×10
java ×2
scala ×2
docker ×1
grafana ×1
jackson ×1
latency ×1
logging ×1
metrics ×1
monitoring ×1
prometheus ×1