标签: spark-streaming

kafka只消费新消息

我的 Spark Streaming 工作正在消耗来自 Kafka 的数据

KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM),
                        prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap);
Run Code Online (Sandbox Code Playgroud)

每当我重新启动我的工作时,它就会开始从最后一个偏移存储开始消耗(我假设这是因为发送处理后的数据需要花费大量时间,并且如果我更改消费者组,它会立即处理新消息)

我是 kafka 8.1.1,其中 auto.offset.reset 默认为最大,这意味着每当我重新启动 kafka 都会从我离开的位置发送数据。

我的用例要求我忽略这些数据并仅处理到达的数据。我怎样才能做到这一点?任何建议

apache-kafka apache-spark spark-streaming kafka-consumer-api

1
推荐指数
1
解决办法
3385
查看次数

使用Spark Streaming后无输出

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");

String topics = "test4";
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" ")));


JavaDStream<String> stream1 = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class,
    StringDecoder.class, kafkaParams, topicsSet)
    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
      @Override
      public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
        rdd.saveAsTextFile("output");
        return rdd;
      }
    }).map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> kv) {
        return kv._2();
      }
    });
stream1.print();
jssc.start();
jssc.awaitTermination();
Run Code Online (Sandbox Code Playgroud)

交叉检查主题“test4”中是否有有效数据。

在此输入图像描述

我期望从 kafka 集群流式传输的字符串在控制台中打印。控制台中没有异常,但也没有输出。我这里缺少什么吗?

apache-kafka apache-spark spark-streaming

1
推荐指数
1
解决办法
1243
查看次数

如何仅在 Spark Streaming 中的分区内进行“减少”(也许使用combineByKey)?

我已经借助 Kafka 按键将数据排序到我的 Spark Streaming 分区中,即在一个节点上找到的键在任何其他节点上都找不到。

\n\n

我想使用 redis 及其incrby(增量)命令作为状态引擎,并减少发送到 redis 的请求数量,我想通过对每个工作节点本身进行字数统计来部分减少我的数据。(关键是标签+时间戳来从字数统计中获取我的功能)。\n我想避免洗牌并让 Redis 负责跨工作节点添加数据。

\n\n

即使我检查数据在工作节点之间干净地分割,.reduce(_ + _)(Scala 语法)也需要很长时间(映射任务需要几秒而不是亚秒),因为 HashPartitioner 似乎将我的数据洗牌到随机节点以添加它那里。

\n\n

如何使用 Spark Streaming 在每个分区器上编写一个简单的字数减少而不触发 Scala 中的洗牌步骤?

\n\n

注意 DStream 对象缺少一些 RDD 方法,这些方法只能通过该transform方法获得。

\n\n

看来我或许可以使用combineByKey。我想跳过这mergeCombiners()一步,而是将累积的元组保留在原处。\n《Learning Spark》一书神秘地说:

\n\n
\n

如果我们知道我们的数据不会从中受益,我们可以在combineByKey()中禁用地图端聚合。例如,groupByKey() 禁用映射端聚合,因为聚合函数(附加到列表)不会节省任何空间。如果我们想禁用映射端联合,我们需要指定分区器;现在,您可以通过传递 rdd.partitioner 在源 RDD 上使用分区器。

\n
\n\n

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

\n\n

然后,这本书仍然没有提供如何执行此操作的语法,到目前为止我也没有在谷歌上找到任何运气。

\n\n

更糟糕的是,据我所知,Spark Streaming 中没有为 DStream RDD 设置分区器,所以我不知道如何为ombineByKey 提供分区器,而不会最终打乱数据。

\n\n

另外,“地图端”实际上意味着什么以及mapSideCombine = false到底会产生什么后果?

\n\n

的 scala 实现combineByKey位于\n …

scala partitioning redis apache-spark spark-streaming

1
推荐指数
1
解决办法
4097
查看次数

java.lang.ClassCastException: [解析 json[String,String] 时无法将 B 转换为 java.lang.String

我又来了,我尝试使用用 scala -2.10.5 编写的 Spark Streaming_1.6.1 类从 kafka_0.9.0.0 主题读取数据。这是一个简单的程序,我在 sbt_0.13.12 中构建了它。当我运行该程序时,我收到此异常

(run-main-0) org.apache.spark.SparkException: 由于阶段失败,作业中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0 (TID 1,本地主机):java.lang. lang.ClassCastException:[B 无法在 org.kafka.receiver.AvroCons$$anonfun$1.apply(AvroConsumer.scala:54) [错误] 在 org.kafka.receiver.AvroCons 处转换为 java.lang.String [错误] $$anonfun$1.apply(AvroConsumer.scala:54) [错误] 位于 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) [错误]
位于 org.apache.spark.util.Utils$。 getIteratorSize(Utils.scala:1597) [错误] 在 org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.rdd.RDD$ $anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark。 SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) [错误] 在 org.apache.spark.scheduler。 Task.run(Task.scala:89) [错误] 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) [错误] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1145) [错误] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [错误] 在 java.lang.Thread.run(Thread.java:745) [错误] [错误]驱动程序堆栈跟踪:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 …

scala sbt spark-streaming kafka-consumer-api

1
推荐指数
1
解决办法
9408
查看次数

如何将 Spark 流输出转换为数据帧或存储在表中

我的代码是:

val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data=lines.map(_._2)
data.print()
Run Code Online (Sandbox Code Playgroud)

我的输出有 50 个不同的值,格式如下

{"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"}
Run Code Online (Sandbox Code Playgroud)

任何人都可以帮助我将这些数据存储在表格形式中

| id |date               |temp|press|   
|st01|26-02-2018 20:30:40| 30 |20   |  
|st01|26-02-2018 20:30:45| 80 |70   |  
Run Code Online (Sandbox Code Playgroud)

我会非常感激。

scala apache-spark spark-streaming apache-spark-sql

1
推荐指数
1
解决办法
5926
查看次数

Spark Streaming - 将 json 格式的消息 Dstream 到 DataFrame

我正在尝试通过 Apache Spark Streaming 读取 Kafka 主题,但无法弄清楚如何将 DStream 中的数据转换为 DataFrame,然后存储在临时表中。Kafka 中的消息采用 Avro 格式,由 Kafka JDBC Connect 从数据库创建。我有下面的代码,它工作正常,直到它执行spark.read.json读取 json 到数据帧。

package consumerTest


import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._

import scala.util.parsing.json.{JSON, JSONObject}

object Consumer {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .master("local")
      .appName("my-spark-app")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate();

    import spark.implicits._


    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<kafka-server>:9092",
      "key.deserializer" -> …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming kafka-consumer-api spark-dataframe

1
推荐指数
1
解决办法
6397
查看次数

如何推断 JSON 文件的模式?

我在 Java 中有以下字符串

{
    "header": {
        "gtfs_realtime_version": "1.0",
        "incrementality": 0,
        "timestamp": 1528460625,
        "user-data": "metra"
    },
    "entity": [{
            "id": "8424",
            "vehicle": {
                "trip": {
                    "trip_id": "UP-N_UN314_V1_D",
                    "route_id": "UP-N",
                    "start_time": "06:17:00",
                    "start_date": "20180608",
                    "schedule_relationship": 0
                },
                "vehicle": {
                    "id": "8424",
                    "label": "314"
                },
                "position": {
                    "latitude": 42.10085,
                    "longitude": -87.72896
                },
                "current_status": 2,
                "timestamp": 1528460601
            }
        }
    ]
}
Run Code Online (Sandbox Code Playgroud)

表示 JSON 文档。我想为流应用程序推断Spark Dataframe 中的模式。

如何将 String 的字段拆分为类似于 CSV 文档(我可以在其中调用.split(""))?

java json apache-spark spark-streaming

1
推荐指数
1
解决办法
9802
查看次数

如何将 AWS Kinesis 流用于多个不同的数据源

我们有一个传统的批处理应用程序,我们从多个来源(Oracle、Salesforce、FTP 文件、Web 日志等)获取数据。我们将传入数据存储在 S3 存储桶中,并在 EMR 上运行 Spark 来处理数据并在 S3 和 Redshift 上加载。

现在,我们正在考虑引入 AWS Kinesis,然后使用 EMR 中的 Spark Structured Streaming 来处理流数据并将其加载到 S3 和 Redshift,从而使该应用程序接近实时。鉴于我们有不同种类的数据,例如来自 Oracle 的 100 多个表、100 多个 salesforce 对象、来自 FTP 位置的 20 多个文件、Web 日志等。这里使用 AWS Kinesis 的最佳方法是什么。

1) 对每个源(Salesforce、Oracle、FTP)使用单独的流,然后对每个表/对象使用单独的分片(在流内) - 每个消费者从自己的分片中读取数据,该分片具有特定的表/文件 2) 使用每个表/对象都有单独的流 - 在这种情况下我们最终将拥有 500 多个流。3)对所有内容使用单个流 - 不确定消费者应用程序将如何在这种情况下读取数据。

amazon-emr amazon-kinesis spark-streaming

1
推荐指数
1
解决办法
2788
查看次数

java.lang.NoClassDefFoundError:org/apache/spark/internal/Logging

我的 Spark Streaming 程序收到以下错误: Exception in thread "main" java.lang.NoClassDefFoundError:org/apache/spark/internal/Logging 我的 Spark 版本是 2.1,与集群中运行的版本相同。

网上查到的资料提示我旧版本的org.apache.spark.Logging在新版本变成了org.apache.spark.internal.Logging,导致找不到jar包。但是我的pom中引入的依赖是新版本。为什么找不到jar包?

    <properties>
            <spark.version>2.1.0</spark.version>
            <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
            <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_${scala.version}</artifactId>
                    <version>${spark.version}</version>
            </dependency>
            <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming_${scala.version}</artifactId>
                    <version>${spark.version}</version>
            </dependency>
            <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_${scala.version}</artifactId>
                    <version>${spark.version}</version>
            </dependency>
            <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-hive_${scala.version}</artifactId>
                    <version>${spark.version}</version>
            </dependency>
            <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                    <version>2.1.0</version>
            </dependency>
            <dependency>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                    <version>2.6.0</version>
            </dependency>
            <dependency>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
            </dependency>
            <dependency>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                    <version>1.2.17</version>
            </dependency>
    </dependencies>
    <build>
            <plugins>
                    <plugin>
                            <groupId>org.scala-tools</groupId>
                            <artifactId>maven-scala-plugin</artifactId>
                            <version>2.15.2</version>
                            <executions>
                                    <execution>
                                            <goals>
                                                    <goal>compile</goal>
                                                    <goal>testCompile</goal>
                                            </goals>
                                    </execution>
                            </executions>
                    </plugin>

                    <plugin>
                            <artifactId>maven-compiler-plugin</artifactId>
                            <version>3.6.0</version>
                            <configuration>
                                    <source>1.8</source> …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-streaming

1
推荐指数
1
解决办法
6672
查看次数

在火花结构化流中动态更改 hdfs 写入路径

我有一个 Spark 结构化流应用程序,它从 kafka 读取数据并将其写入 hdfs。我想根据当前日期动态更改 hdfs 写入路径,但结构化流媒体似乎不能那样工作。它只是创建一个应用程序启动日期的文件夹,即使日期发生变化,也会继续写入同一文件夹。有什么办法可以根据当前日期动态更改路径?

下面是我的 writestream 的样子

 val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
 val outPath = "maindir/sb_topic/data/loaddate="

val dswWriteStream =dfresult.writeStream
    .outputMode(outputMode) 
    .format(writeformat) 
    .option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
    .option("checkpointLocation", checkpointdir) 
    .option("maxRecordsPerFile", 999999999) 
    .trigger(Trigger.ProcessingTime("10 minutes")) 
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming spark-structured-streaming

1
推荐指数
1
解决办法
954
查看次数