小编bug*_*ggy的帖子

如何将流数据集写入Kafka?

我正在尝试对主题数据进行一些丰富.因此,从Kafka读取使用Spark结构化流媒体回到Kafka.

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()


val enriched = ds.select("key", "value", "topic").as[(String, String, String)].map(record => enrich(record._1,
      record._2, record._3)

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()
Run Code Online (Sandbox Code Playgroud)

但我得到一个例外:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Run Code Online (Sandbox Code Playgroud)

任何解决方法?

apache-kafka apache-spark spark-structured-streaming

5
推荐指数
1
解决办法
2513
查看次数

js 事件未在 webpack 构建的 React 应用程序中触发

剧透:我更倾向于 Webpack 问题而不是 React(因为使用简单的 React 应用程序,直接形式create-react-app工作)。但是,它只发生在 React 中......

主要问题是一个简单的事件(例如onClick永远不会执行),这扩展到没有使用 js(手风琴、复杂的导航行为等)。所以以下不起作用:

<button onClick={() => alert("TEST YEAH!")}>Click Me!</button>
Run Code Online (Sandbox Code Playgroud)

使用Button诸如react-bootstrap或之类的库组件时也不会semantic-ui-react(在两者中都进行了测试)。

webpack 配置如下所示:

const path = require("path");
const HtmlWebPackPlugin = require("html-webpack-plugin");

module.exports = {
  entry: path.resolve(__dirname, "./src/index.js"),
  module: {
    rules: [
      {
        test: /\.(js|jsx)$/,
        use: ["babel-loader"]
      },
      {
        test: /\.(scss|css)$/,
        use: ["style-loader", "css-loader", "sass-loader"]
      }
    ]
  },
  resolve: {
    extensions: ["*", ".js", ".jsx"]
  },
  output: {
    path: path.resolve(__dirname, "dist"),
    filename: "./bundle.js"
  },
  plugins: [ …
Run Code Online (Sandbox Code Playgroud)

reactjs webpack webpack-5

4
推荐指数
1
解决办法
502
查看次数

结构化流式 Kafka 源偏移存储

我正在使用 Kafka 的 Structured Streaming 源(集成指南),如前所述,它没有提交任何偏移量。

我的目标之一是监控它(检查它是否落后等)。即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:

它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每个记录附带的偏移量不适合用例) ?

干杯

offset apache-kafka apache-spark spark-streaming spark-structured-streaming

0
推荐指数
1
解决办法
2527
查看次数