标签: apache-spark-2.0

使用Spark 2.0.2读取来自Kafka的Avro消息(结构化流式传输)

我有一个spark 2.0应用程序,它使用spark streaming(使用spark-streaming-kafka-0-10_2.11)从kafka读取消息.

结构化流看起来很酷,所以我想尝试迁移代码,但我无法弄清楚如何使用它.

在常规流媒体中,我使用kafkaUtils来创建Dstrean,在我传递的参数中是值deserializer.

在结构化流媒体中,doc说我应该使用DataFrame函数进行反序列化,但我无法确切地知道这意味着什么.

我查看了这个示例,例如我在Kafka中的Avro对象是退出复杂的,不能简单地像示例中的String一样进行转换.

到目前为止,我尝试了这种代码(我在这里看到了另一个问题):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()
Run Code Online (Sandbox Code Playgroud)

我得到"数据类型不匹配:无法将BinaryType转换为StructType(StructField(...."

我怎样才能反序化值?

scala avro apache-kafka spark-streaming apache-spark-2.0

8
推荐指数
1
解决办法
7297
查看次数

Livy Server:将数据帧作为JSON返回?

我正在使用HTTP POST调用在Livy Server中执行语句localhost:8998/sessions/0/statements,具有以下正文

{
  "code": "spark.sql(\"select * from test_table limit 10\")"
}
Run Code Online (Sandbox Code Playgroud)

我想以下列格式给出答案

(...)
"data": {
  "application/json": "[
    {"id": "123", "init_date": 1481649345, ...},
    {"id": "133", "init_date": 1481649333, ...},
    {"id": "155", "init_date": 1481642153, ...},
  ]"
}
(...)
Run Code Online (Sandbox Code Playgroud)

但我得到的是

(...)
"data": {
  "text/plain": "res0: org.apache.spark.sql.DataFrame = [id: string, init_date: timestamp ... 64 more fields]"
}
(...)
Run Code Online (Sandbox Code Playgroud)

哪个是toString()数据帧的版本.

有没有办法使用Livy Server将数据帧作为JSON返回?

编辑

找到解决问题的JIRA问题:https://issues.cloudera.org/browse/LIVY-72

根据评论,可以说Livy不会也不会支持这样的功能?

json cloudera apache-spark apache-spark-2.0 livy

8
推荐指数
1
解决办法
1741
查看次数

优化火花低挂水果,特别是催化剂优化和火花配置

我正在使用Spark 2.1.1,我正在使用Scala API,尽管语言不太重要.我有兴趣以有效的方式优化火花查询/管道.我已经阅读了很多材料(包括伟大的"学习星火"书,我对Spark网站,Jacek Laskowski的博客以及其他人非常熟悉,而且我已经和Spark一起工作了将近两年.

但是,有太多的信息和概念需要注意,而且我没有做足够的优化来了解它们.不幸的是,一旦一切工作100%,可能只需要几天甚至几小时才能交付代码.我需要优先考虑我可以应用的修复程序.我之前已经优化了工作火花代码,但我正在寻找最好的整体策略以及尝试熟悉最好的低挂水果.总有一天,我会记住所有要调整的旋钮,但至少现在有十个非常好的旋钮.我目前认为重要的一些事情是(不是按顺序排列,但前4个恰好是我认为最重要的)...

  1. 开发 - 通过重新分区数据集或从一个分区的配置单元表中检索来减少随机(交换).
  2. 策略 - 查看Spark UI以查看哪个作业和阶段占用时间最长,并且仔细观察该作业和阶段.
  3. 开发 - 尽可能在连接之前过滤数据集,以避免创建高基数"多对多"连接,并避免在连接期间发送更多数据.
  4. 配置 - 正确执行器和内存
  5. 开发 - 尽可能远离笛卡尔积和theta-join.
  6. 开发 - 如果可能,在创建UDF之前使用spark库函数.
  7. 开发 - 如果表足够小,请尝试强制进行广播散列连接.
  8. 策略 - 除非有特定原因(这意味着我从不使用RDD API),否则切勿使用RDD API代替数据集/数据帧.
  9. 开发 - 构建数据集过滤器,以便下推谓词可以与它们一起使用(制作更多,更简单的过滤器而不是多条件过滤器).
  10. 策略与开发 - 始终保持Spark源代码打开,以便更容易找到类型声明和其他代码实现.
  11. 我想念的东西......

对我来说最有趣的增强功能是那些通过查看查询计划或DAG可视化而显而易见的增强功能.此外,使火花用户/开发人员走向"啊哈!"的老生常谈 您可能愿意分享.免责声明:以上十件事对我来说并不完全是"前十名",比如使用火花库函数代替UDF并不是非常重要(当然不是至少十大),但我想帮助给出一个好的例子.提示可能看起来像某人.

scala apache-spark apache-spark-sql spark-dataframe apache-spark-2.0

8
推荐指数
0
解决办法
235
查看次数

Scala案例类忽略了Spark shell中的导入

我希望这个问题有一个明显的答案!

我刚刚升级到Spark v2.0并且遇到了spark-shell(Scala 2.11版本)的奇怪问题.

如果我输入以下最小的Scala,

import java.sql.Timestamp

case class Crime(caseNumber: String, date: Timestamp, description: String, detail: String, arrest: Boolean)

我收到以下错误,

<console>:11: error: not found: type Timestamp

如果我在其他地方使用Java Timestamp类,例如在函数中,则不会生成错误(正如您所期望的那样导致导入).

如果我完全符合条件并在case类中使用java.sql.Timestamp它就可以了!

我错过了一些明显的东西吗

scala apache-spark apache-spark-2.0

7
推荐指数
1
解决办法
4690
查看次数

使用Java在Spark 2.0中使用数据集的GroupByKey

我有一个包含如下数据的数据集:

|c1| c2|
---------
| 1 | a |
| 1 | b |
| 1 | c |
| 2 | a |
| 2 | b |
Run Code Online (Sandbox Code Playgroud)

...

现在,我希望将数据分组如下(col1:String Key,col2:List):

| c1| c2 |
-----------
| 1 |a,b,c|
| 2 | a, b|
...
Run Code Online (Sandbox Code Playgroud)

我认为使用goupByKey是一个足够的解决方案,但我找不到任何例子,如何使用它.

任何人都可以帮我找到使用groupByKey或使用任何其他转换和动作组合的解决方案来通过使用数据集获得此输出,而不是RDD?

java group-by dataset apache-spark apache-spark-2.0

7
推荐指数
1
解决办法
5586
查看次数

Spark 2.0:从RDD迁移到Dataset

我想调整我的Java Spark应用程序(实际上使用RDD进行某些计算)来Datasets代替RDDs.我是数据集的新手,不知道如何将哪个事务映射到相应的数据集操作.

目前我将它们映射为:

JavaSparkContext.textFile(...)                       -> SQLContext.read().textFile(...)
JavaRDD.filter(Function)                             -> Dataset.filter(FilterFunction)
JavaRDD.map(Function)                                -> Dataset.map(MapFunction)
JavaRDD.mapToPair(PairFunction)                      -> Dataset.groupByKey(MapFunction) ???
JavaPairRDD.aggregateByKey(U, Function2, Function2)  -> KeyValueGroupedDataset.???
Run Code Online (Sandbox Code Playgroud)

相应的问题是:

  • 等于JavaRDD.mapToPairDataset.groupByKey方法?
  • 是否JavaPairRDD映射到KeyValueGroupedDataset
  • 哪种方法等于JavaPairRDD.aggregateByKey方法?

但是,我想将以下RDD代码移植到数据集中:

JavaRDD<Article> goodRdd = ...

JavaPairRDD<String, Article> ArticlePairRdd = goodRdd.mapToPair(new PairFunction<Article, String, Article>() {              // Build PairRDD<<Date|Store|Transaction><Article>>
    public Tuple2<String, Article> call(Article article) throws Exception {
        String key = article.getKeyDate() + "|" + article.getKeyStore() + "|" + article.getKeyTransaction() + "|" + article.getCounter(); …
Run Code Online (Sandbox Code Playgroud)

dataset rdd apache-spark-dataset apache-spark-2.0

7
推荐指数
0
解决办法
900
查看次数

将镶木地板读入Spark数据集而忽略缺少的字段

假设我创建一个实木复合地板文件,如下所示:

case class A (i:Int,j:Double,s:String)

var l1 = List(A(1,2.0,"s1"),A(2,3.0,"S2"))

val ds = spark.createDataset(l1)
ds.write.parquet("/tmp/test.parquet")
Run Code Online (Sandbox Code Playgroud)

是否可以将其读入具有不同架构的类型的数据集中,其中唯一的区别是很少有其他字段?

例如:

case class B (i:Int,j:Double,s:String,d:Double=1.0)  // d is extra and has a default value 
Run Code Online (Sandbox Code Playgroud)

有什么办法可以使我工作吗?:

val ds2 = spark.read.parquet("/tmp/test.parquet").as[B]
Run Code Online (Sandbox Code Playgroud)

apache-spark parquet apache-spark-sql apache-spark-dataset apache-spark-2.0

7
推荐指数
1
解决办法
1677
查看次数

Spark 2.x数据集的Kryo序列化

使用Dataset API时是否仍需要Kryo序列化?

因为数据集使用编码器进行序列化和反序列化:

  1. Kyro序列化是否适用于数据集?(如果正确的配置传递给Spark,并且类已正确注册)
  2. 如果它有效,它会提供多少性能改进?谢谢.

kryo apache-spark-dataset apache-spark-2.0

7
推荐指数
1
解决办法
317
查看次数

Spark 2.0时间戳使用Scala以毫秒为单位的差异

我正在使用Spark 2.0并寻找在Scala中实现以下功能的方法:

需要两个数据框列值之间的时间戳差异(以毫秒为单位).

Value_1 = 06/13/2017 16:44:20.044
Value_2 = 06/13/2017 16:44:21.067
Run Code Online (Sandbox Code Playgroud)

两者的数据类型都是时间戳.

注意:对两个值应用函数unix_timestamp(列s)并减去工作但不要达到要求的毫秒值.

最终查询如下所示:

Select **timestamp_diff**(Value_2,Value_1) from table1
Run Code Online (Sandbox Code Playgroud)

这应该返回以下输出:

1023毫秒

在哪里timestamp_diff是计算差异的函数,以毫秒为单位.

timestamp scala user-defined-functions apache-spark-sql apache-spark-2.0

7
推荐指数
1
解决办法
3721
查看次数

java.lang.IllegalStateException:读取增量文件时出错,使用 kafka 进行 Spark 结构化流处理

我在我们的项目中使用结构化流+ Kafka 进行实时数据分析。我使用的是 Spark 2.2,kafka 0.10.2。

我在应用程序启动时从检查点进行流式查询恢复期间遇到问题。由于有多个流查询源自单个 kafka 流点,并且每个流查询都有不同的 checkpint 目录。因此,如果作业失败,当我们重新启动作业时,会有一些流查询无法从检查点位置恢复,因此会抛出 Error Reading Delta file异常。这是日志:


Job aborted due to stage failure: Task 2 in stage 13.0 failed 4 times, most recent failure: Lost task 2.3 in stage 13.0 (TID 831, ip-172-31-10-246.us-west-2.compute.internal, executor 3): java.lang.IllegalStateException: Error reading delta file /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /checkpointing/wifiHealthPerUserPerMinute/state/0/2]: /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta does not exist
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360) …
Run Code Online (Sandbox Code Playgroud)

checkpoint apache-kafka apache-spark-2.0 spark-structured-streaming

6
推荐指数
1
解决办法
4021
查看次数