我有一个spark 2.0应用程序,它使用spark streaming(使用spark-streaming-kafka-0-10_2.11)从kafka读取消息.
结构化流看起来很酷,所以我想尝试迁移代码,但我无法弄清楚如何使用它.
在常规流媒体中,我使用kafkaUtils来创建Dstrean,在我传递的参数中是值deserializer.
在结构化流媒体中,doc说我应该使用DataFrame函数进行反序列化,但我无法确切地知道这意味着什么.
我查看了这个示例,例如我在Kafka中的Avro对象是退出复杂的,不能简单地像示例中的String一样进行转换.
到目前为止,我尝试了这种代码(我在这里看到了另一个问题):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
Run Code Online (Sandbox Code Playgroud)
我得到"数据类型不匹配:无法将BinaryType转换为StructType(StructField(...."
我怎样才能反序化值?
我正在使用HTTP POST调用在Livy Server中执行语句localhost:8998/sessions/0/statements
,具有以下正文
{
"code": "spark.sql(\"select * from test_table limit 10\")"
}
Run Code Online (Sandbox Code Playgroud)
我想以下列格式给出答案
(...)
"data": {
"application/json": "[
{"id": "123", "init_date": 1481649345, ...},
{"id": "133", "init_date": 1481649333, ...},
{"id": "155", "init_date": 1481642153, ...},
]"
}
(...)
Run Code Online (Sandbox Code Playgroud)
但我得到的是
(...)
"data": {
"text/plain": "res0: org.apache.spark.sql.DataFrame = [id: string, init_date: timestamp ... 64 more fields]"
}
(...)
Run Code Online (Sandbox Code Playgroud)
哪个是toString()
数据帧的版本.
有没有办法使用Livy Server将数据帧作为JSON返回?
找到解决问题的JIRA问题:https://issues.cloudera.org/browse/LIVY-72
根据评论,可以说Livy不会也不会支持这样的功能?
我正在使用Spark 2.1.1,我正在使用Scala API,尽管语言不太重要.我有兴趣以有效的方式优化火花查询/管道.我已经阅读了很多材料(包括伟大的"学习星火"书,我对Spark网站,Jacek Laskowski的博客以及其他人非常熟悉,而且我已经和Spark一起工作了将近两年.
但是,有太多的信息和概念需要注意,而且我没有做足够的优化来了解它们.不幸的是,一旦一切工作100%,可能只需要几天甚至几小时才能交付代码.我需要优先考虑我可以应用的修复程序.我之前已经优化了工作火花代码,但我正在寻找最好的整体策略以及尝试熟悉最好的低挂水果.总有一天,我会记住所有要调整的旋钮,但至少现在有十个非常好的旋钮.我目前认为重要的一些事情是(不是按顺序排列,但前4个恰好是我认为最重要的)...
对我来说最有趣的增强功能是那些通过查看查询计划或DAG可视化而显而易见的增强功能.此外,使火花用户/开发人员走向"啊哈!"的老生常谈 您可能愿意分享.免责声明:以上十件事对我来说并不完全是"前十名",比如使用火花库函数代替UDF并不是非常重要(当然不是至少十大),但我想帮助给出一个好的例子.提示可能看起来像某人.
scala apache-spark apache-spark-sql spark-dataframe apache-spark-2.0
我希望这个问题有一个明显的答案!
我刚刚升级到Spark v2.0并且遇到了spark-shell(Scala 2.11版本)的奇怪问题.
如果我输入以下最小的Scala,
import java.sql.Timestamp case class Crime(caseNumber: String, date: Timestamp, description: String, detail: String, arrest: Boolean)
我收到以下错误,
<console>:11: error: not found: type Timestamp
如果我在其他地方使用Java Timestamp类,例如在函数中,则不会生成错误(正如您所期望的那样导致导入).
如果我完全符合条件并在case类中使用java.sql.Timestamp它就可以了!
我错过了一些明显的东西吗
我有一个包含如下数据的数据集:
|c1| c2|
---------
| 1 | a |
| 1 | b |
| 1 | c |
| 2 | a |
| 2 | b |
Run Code Online (Sandbox Code Playgroud)
...
现在,我希望将数据分组如下(col1:String Key,col2:List):
| c1| c2 |
-----------
| 1 |a,b,c|
| 2 | a, b|
...
Run Code Online (Sandbox Code Playgroud)
我认为使用goupByKey是一个足够的解决方案,但我找不到任何例子,如何使用它.
任何人都可以帮我找到使用groupByKey或使用任何其他转换和动作组合的解决方案来通过使用数据集获得此输出,而不是RDD?
我想调整我的Java Spark应用程序(实际上使用RDD进行某些计算)来Datasets
代替RDDs
.我是数据集的新手,不知道如何将哪个事务映射到相应的数据集操作.
目前我将它们映射为:
JavaSparkContext.textFile(...) -> SQLContext.read().textFile(...)
JavaRDD.filter(Function) -> Dataset.filter(FilterFunction)
JavaRDD.map(Function) -> Dataset.map(MapFunction)
JavaRDD.mapToPair(PairFunction) -> Dataset.groupByKey(MapFunction) ???
JavaPairRDD.aggregateByKey(U, Function2, Function2) -> KeyValueGroupedDataset.???
Run Code Online (Sandbox Code Playgroud)
相应的问题是:
JavaRDD.mapToPair
该Dataset.groupByKey
方法?JavaPairRDD
映射到KeyValueGroupedDataset
?JavaPairRDD.aggregateByKey
方法?但是,我想将以下RDD代码移植到数据集中:
JavaRDD<Article> goodRdd = ...
JavaPairRDD<String, Article> ArticlePairRdd = goodRdd.mapToPair(new PairFunction<Article, String, Article>() { // Build PairRDD<<Date|Store|Transaction><Article>>
public Tuple2<String, Article> call(Article article) throws Exception {
String key = article.getKeyDate() + "|" + article.getKeyStore() + "|" + article.getKeyTransaction() + "|" + article.getCounter(); …
Run Code Online (Sandbox Code Playgroud) 假设我创建一个实木复合地板文件,如下所示:
case class A (i:Int,j:Double,s:String)
var l1 = List(A(1,2.0,"s1"),A(2,3.0,"S2"))
val ds = spark.createDataset(l1)
ds.write.parquet("/tmp/test.parquet")
Run Code Online (Sandbox Code Playgroud)
是否可以将其读入具有不同架构的类型的数据集中,其中唯一的区别是很少有其他字段?
例如:
case class B (i:Int,j:Double,s:String,d:Double=1.0) // d is extra and has a default value
Run Code Online (Sandbox Code Playgroud)
有什么办法可以使我工作吗?:
val ds2 = spark.read.parquet("/tmp/test.parquet").as[B]
Run Code Online (Sandbox Code Playgroud) apache-spark parquet apache-spark-sql apache-spark-dataset apache-spark-2.0
使用Dataset API时是否仍需要Kryo序列化?
因为数据集使用编码器进行序列化和反序列化:
我正在使用Spark 2.0并寻找在Scala中实现以下功能的方法:
需要两个数据框列值之间的时间戳差异(以毫秒为单位).
Value_1 = 06/13/2017 16:44:20.044
Value_2 = 06/13/2017 16:44:21.067
Run Code Online (Sandbox Code Playgroud)
两者的数据类型都是时间戳.
注意:对两个值应用函数unix_timestamp(列s)并减去工作但不要达到要求的毫秒值.
最终查询如下所示:
Select **timestamp_diff**(Value_2,Value_1) from table1
Run Code Online (Sandbox Code Playgroud)
这应该返回以下输出:
1023毫秒
在哪里timestamp_diff
是计算差异的函数,以毫秒为单位.
timestamp scala user-defined-functions apache-spark-sql apache-spark-2.0
我在我们的项目中使用结构化流+ Kafka 进行实时数据分析。我使用的是 Spark 2.2,kafka 0.10.2。
我在应用程序启动时从检查点进行流式查询恢复期间遇到问题。由于有多个流查询源自单个 kafka 流点,并且每个流查询都有不同的 checkpint 目录。因此,如果作业失败,当我们重新启动作业时,会有一些流查询无法从检查点位置恢复,因此会抛出 Error Reading Delta file异常。这是日志:
Job aborted due to stage failure: Task 2 in stage 13.0 failed 4 times, most recent failure: Lost task 2.3 in stage 13.0 (TID 831, ip-172-31-10-246.us-west-2.compute.internal, executor 3): java.lang.IllegalStateException: Error reading delta file /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /checkpointing/wifiHealthPerUserPerMinute/state/0/2]: /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta does not exist
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360) …
Run Code Online (Sandbox Code Playgroud) checkpoint apache-kafka apache-spark-2.0 spark-structured-streaming
apache-spark-2.0 ×10
apache-spark ×5
scala ×4
apache-kafka ×2
dataset ×2
avro ×1
checkpoint ×1
cloudera ×1
group-by ×1
java ×1
json ×1
kryo ×1
livy ×1
parquet ×1
rdd ×1
timestamp ×1