我有一个使用Spark 2.0新API的Spark应用程序SparkSession.我正在使用另一个应用程序之上构建此应用程序SparkContext.我想传递SparkContext给我的应用程序并SparkSession使用现有的初始化SparkContext.
但是我找不到怎么做的方法.我发现SparkSession构造函数SparkContext是私有的,所以我不能以这种方式初始化它,构建器不提供任何setSparkContext方法.你认为有一些解决方法吗?
我正在尝试使用基于DataFrame/Dataset API的Spark-Streaming来加载来自Kafka的数据流的结构化流方法.
我用:
Spark Kafka DataSource定义了底层架构:
|key|value|topic|partition|offset|timestamp|timestampType|
Run Code Online (Sandbox Code Playgroud)
我的数据采用json格式,并存储在值列中.我正在寻找一种方法如何从值列中提取底层模式并将接收到的数据帧更新为存储在值中的列?我尝试了下面的方法,但它不起作用:
val columns = Array("column1", "column2") // column names
val rawKafkaDF = sparkSession.sqlContext.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe",topic)
.load()
val columnsToSelect = columns.map( x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)
// some analytics using stream dataframe kafkaDF
val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
在这里我得到了Exception,org.apache.spark.sql.AnalysisException: Can't extract value from value#337;因为在创建流时,里面的值是未知的...
你有什么建议吗?
scala apache-kafka apache-spark apache-spark-sql spark-structured-streaming
我正在使用Apache Spark 2.0 Dataframe/Dataset API我想从值列表中向我的数据框添加一个新列.我的列表具有与给定数据帧相同数量的值.
val list = List(4,5,10,7,2)
val df = List("a","b","c","d","e").toDF("row1")
Run Code Online (Sandbox Code Playgroud)
我想做的事情如下:
val appendedDF = df.withColumn("row2",somefunc(list))
df.show()
// +----+------+
// |row1 |row2 |
// +----+------+
// |a |4 |
// |b |5 |
// |c |10 |
// |d |7 |
// |e |2 |
// +----+------+
Run Code Online (Sandbox Code Playgroud)
对于任何想法我会很高兴,我的数据帧实际上包含更多列.
我是Kafka Streams的新手,正在使用1.0.0版。我想从一个值中为KTable设置一个新键。
使用KStream时,可以通过使用像这样的selectKey()方法来完成。
kstream.selectKey ((k,v) -> v.newKey)
Run Code Online (Sandbox Code Playgroud)
但是,KTable中缺少这种方法。唯一的方法是将给定的KTable转换为KStream。对这个问题有什么想法吗?它改变了反对KTable设计的关键吗?