我正在尝试对主题数据进行一些丰富.因此,从Kafka读取使用Spark结构化流媒体回到Kafka.
val ds = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("group.id", groupId)
.option("subscribe", "topicname")
.load()
val enriched = ds.select("key", "value", "topic").as[(String, String, String)].map(record => enrich(record._1,
record._2, record._3)
val query = enriched.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("group.id", groupId)
.option("topic", "desttopic")
.start()
Run Code Online (Sandbox Code Playgroud)
但我得到一个例外:
Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Run Code Online (Sandbox Code Playgroud)
任何解决方法?
剧透:我更倾向于 Webpack 问题而不是 React(因为使用简单的 React 应用程序,直接形式create-react-app工作)。但是,它只发生在 React 中......
主要问题是一个简单的事件(例如onClick永远不会执行),这扩展到没有使用 js(手风琴、复杂的导航行为等)。所以以下不起作用:
<button onClick={() => alert("TEST YEAH!")}>Click Me!</button>
Run Code Online (Sandbox Code Playgroud)
使用Button诸如react-bootstrap或之类的库组件时也不会semantic-ui-react(在两者中都进行了测试)。
webpack 配置如下所示:
const path = require("path");
const HtmlWebPackPlugin = require("html-webpack-plugin");
module.exports = {
entry: path.resolve(__dirname, "./src/index.js"),
module: {
rules: [
{
test: /\.(js|jsx)$/,
use: ["babel-loader"]
},
{
test: /\.(scss|css)$/,
use: ["style-loader", "css-loader", "sass-loader"]
}
]
},
resolve: {
extensions: ["*", ".js", ".jsx"]
},
output: {
path: path.resolve(__dirname, "dist"),
filename: "./bundle.js"
},
plugins: [ …Run Code Online (Sandbox Code Playgroud) 我正在使用 Kafka 的 Structured Streaming 源(集成指南),如前所述,它没有提交任何偏移量。
我的目标之一是监控它(检查它是否落后等)。即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:
它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每个记录附带的偏移量不适合用例) ?
干杯
offset apache-kafka apache-spark spark-streaming spark-structured-streaming