我的 Spark Streaming 工作正在消耗来自 Kafka 的数据
KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM),
prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap);
Run Code Online (Sandbox Code Playgroud)
每当我重新启动我的工作时,它就会开始从最后一个偏移存储开始消耗(我假设这是因为发送处理后的数据需要花费大量时间,并且如果我更改消费者组,它会立即处理新消息)
我是 kafka 8.1.1,其中 auto.offset.reset 默认为最大,这意味着每当我重新启动 kafka 都会从我离开的位置发送数据。
我的用例要求我忽略这些数据并仅处理到达的数据。我怎样才能做到这一点?任何建议
apache-kafka apache-spark spark-streaming kafka-consumer-api
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
String topics = "test4";
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" ")));
JavaDStream<String> stream1 = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, kafkaParams, topicsSet)
.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
rdd.saveAsTextFile("output");
return rdd;
}
}).map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> kv) {
return kv._2();
}
});
stream1.print();
jssc.start();
jssc.awaitTermination();
Run Code Online (Sandbox Code Playgroud)
交叉检查主题“test4”中是否有有效数据。
我期望从 kafka 集群流式传输的字符串在控制台中打印。控制台中没有异常,但也没有输出。我这里缺少什么吗?
我已经借助 Kafka 按键将数据排序到我的 Spark Streaming 分区中,即在一个节点上找到的键在任何其他节点上都找不到。
\n\n我想使用 redis 及其incrby(增量)命令作为状态引擎,并减少发送到 redis 的请求数量,我想通过对每个工作节点本身进行字数统计来部分减少我的数据。(关键是标签+时间戳来从字数统计中获取我的功能)。\n我想避免洗牌并让 Redis 负责跨工作节点添加数据。
即使我检查数据在工作节点之间干净地分割,.reduce(_ + _)(Scala 语法)也需要很长时间(映射任务需要几秒而不是亚秒),因为 HashPartitioner 似乎将我的数据洗牌到随机节点以添加它那里。
如何使用 Spark Streaming 在每个分区器上编写一个简单的字数减少而不触发 Scala 中的洗牌步骤?
\n\n注意 DStream 对象缺少一些 RDD 方法,这些方法只能通过该transform方法获得。
看来我或许可以使用combineByKey。我想跳过这mergeCombiners()一步,而是将累积的元组保留在原处。\n《Learning Spark》一书神秘地说:
\n\n\n如果我们知道我们的数据不会从中受益,我们可以在combineByKey()中禁用地图端聚合。例如,groupByKey() 禁用映射端聚合,因为聚合函数(附加到列表)不会节省任何空间。如果我们想禁用映射端联合,我们需要指定分区器;现在,您可以通过传递 rdd.partitioner 在源 RDD 上使用分区器。
\n
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
\n\n然后,这本书仍然没有提供如何执行此操作的语法,到目前为止我也没有在谷歌上找到任何运气。
\n\n更糟糕的是,据我所知,Spark Streaming 中没有为 DStream RDD 设置分区器,所以我不知道如何为ombineByKey 提供分区器,而不会最终打乱数据。
\n\n另外,“地图端”实际上意味着什么以及mapSideCombine = false到底会产生什么后果?
的 scala 实现combineByKey位于\n …
我又来了,我尝试使用用 scala -2.10.5 编写的 Spark Streaming_1.6.1 类从 kafka_0.9.0.0 主题读取数据。这是一个简单的程序,我在 sbt_0.13.12 中构建了它。当我运行该程序时,我收到此异常
(run-main-0) org.apache.spark.SparkException: 由于阶段失败,作业中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0 (TID 1,本地主机):java.lang. lang.ClassCastException:[B 无法在 org.kafka.receiver.AvroCons$$anonfun$1.apply(AvroConsumer.scala:54) [错误] 在 org.kafka.receiver.AvroCons 处转换为 java.lang.String [错误] $$anonfun$1.apply(AvroConsumer.scala:54) [错误] 位于 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) [错误]
位于 org.apache.spark.util.Utils$。 getIteratorSize(Utils.scala:1597) [错误] 在 org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.rdd.RDD$ $anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark。 SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) [错误] 在 org.apache.spark.scheduler。 Task.run(Task.scala:89) [错误] 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) [错误] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1145) [错误] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [错误] 在 java.lang.Thread.run(Thread.java:745) [错误] [错误]驱动程序堆栈跟踪:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 …
我的代码是:
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data=lines.map(_._2)
data.print()
Run Code Online (Sandbox Code Playgroud)
我的输出有 50 个不同的值,格式如下
{"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"}
Run Code Online (Sandbox Code Playgroud)
任何人都可以帮助我将这些数据存储在表格形式中
| id |date |temp|press|
|st01|26-02-2018 20:30:40| 30 |20 |
|st01|26-02-2018 20:30:45| 80 |70 |
Run Code Online (Sandbox Code Playgroud)
我会非常感激。
我正在尝试通过 Apache Spark Streaming 读取 Kafka 主题,但无法弄清楚如何将 DStream 中的数据转换为 DataFrame,然后存储在临时表中。Kafka 中的消息采用 Avro 格式,由 Kafka JDBC Connect 从数据库创建。我有下面的代码,它工作正常,直到它执行spark.read.json读取 json 到数据帧。
package consumerTest
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import scala.util.parsing.json.{JSON, JSONObject}
object Consumer {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate();
import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "<kafka-server>:9092",
"key.deserializer" -> …Run Code Online (Sandbox Code Playgroud) apache-spark spark-streaming kafka-consumer-api spark-dataframe
我在 Java 中有以下字符串
{
"header": {
"gtfs_realtime_version": "1.0",
"incrementality": 0,
"timestamp": 1528460625,
"user-data": "metra"
},
"entity": [{
"id": "8424",
"vehicle": {
"trip": {
"trip_id": "UP-N_UN314_V1_D",
"route_id": "UP-N",
"start_time": "06:17:00",
"start_date": "20180608",
"schedule_relationship": 0
},
"vehicle": {
"id": "8424",
"label": "314"
},
"position": {
"latitude": 42.10085,
"longitude": -87.72896
},
"current_status": 2,
"timestamp": 1528460601
}
}
]
}
Run Code Online (Sandbox Code Playgroud)
表示 JSON 文档。我想为流应用程序推断Spark Dataframe 中的模式。
如何将 String 的字段拆分为类似于 CSV 文档(我可以在其中调用.split(""))?
我们有一个传统的批处理应用程序,我们从多个来源(Oracle、Salesforce、FTP 文件、Web 日志等)获取数据。我们将传入数据存储在 S3 存储桶中,并在 EMR 上运行 Spark 来处理数据并在 S3 和 Redshift 上加载。
现在,我们正在考虑引入 AWS Kinesis,然后使用 EMR 中的 Spark Structured Streaming 来处理流数据并将其加载到 S3 和 Redshift,从而使该应用程序接近实时。鉴于我们有不同种类的数据,例如来自 Oracle 的 100 多个表、100 多个 salesforce 对象、来自 FTP 位置的 20 多个文件、Web 日志等。这里使用 AWS Kinesis 的最佳方法是什么。
1) 对每个源(Salesforce、Oracle、FTP)使用单独的流,然后对每个表/对象使用单独的分片(在流内) - 每个消费者从自己的分片中读取数据,该分片具有特定的表/文件 2) 使用每个表/对象都有单独的流 - 在这种情况下我们最终将拥有 500 多个流。3)对所有内容使用单个流 - 不确定消费者应用程序将如何在这种情况下读取数据。
我的 Spark Streaming 程序收到以下错误: Exception in thread "main" java.lang.NoClassDefFoundError:org/apache/spark/internal/Logging 我的 Spark 版本是 2.1,与集群中运行的版本相同。
网上查到的资料提示我旧版本的org.apache.spark.Logging在新版本变成了org.apache.spark.internal.Logging,导致找不到jar包。但是我的pom中引入的依赖是新版本。为什么找不到jar包?
<properties>
<spark.version>2.1.0</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source> …Run Code Online (Sandbox Code Playgroud) 我有一个 Spark 结构化流应用程序,它从 kafka 读取数据并将其写入 hdfs。我想根据当前日期动态更改 hdfs 写入路径,但结构化流媒体似乎不能那样工作。它只是创建一个应用程序启动日期的文件夹,即使日期发生变化,也会继续写入同一文件夹。有什么办法可以根据当前日期动态更改路径?
下面是我的 writestream 的样子
val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
val outPath = "maindir/sb_topic/data/loaddate="
val dswWriteStream =dfresult.writeStream
.outputMode(outputMode)
.format(writeformat)
.option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
.option("checkpointLocation", checkpointdir)
.option("maxRecordsPerFile", 999999999)
.trigger(Trigger.ProcessingTime("10 minutes"))
Run Code Online (Sandbox Code Playgroud) spark-streaming ×10
apache-spark ×8
scala ×4
apache-kafka ×2
amazon-emr ×1
java ×1
json ×1
partitioning ×1
redis ×1
sbt ×1