例如,假设我有DataFrame:
var myDF = sc.parallelize(Seq(("one",1),("two",2),("three",3))).toDF("a", "b")
Run Code Online (Sandbox Code Playgroud)
我可以将它转换为RDD[(String, Int)]带有地图的a:
var myRDD = myDF.map(r => (r(0).asInstanceOf[String], r(1).asInstanceOf[Int]))
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法来实现这一点,可能使用DF架构?
所有,
我试图通过Databricks在Spark 1.6.0上使用Kinesis和Spark Streaming,我的ssc.start()命令正在挂起.
我使用以下函数来制作我的Spark Streaming上下文:
def creatingFunc(sc: SparkContext): StreamingContext =
{
// Create a StreamingContext
val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
// Creata a Kinesis stream
val kinesisStream = KinesisUtils.createStream(ssc,
kinesisAppName, kinesisStreamName,
kinesisEndpointUrl, RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName,
InitialPositionInStream.LATEST, Seconds(kinesisCheckpointIntervalSeconds),
StorageLevel.MEMORY_AND_DISK_SER_2, config.awsAccessKeyId, config.awsSecretKey)
kinesisStream.print()
ssc.remember(Minutes(1))
ssc.checkpoint(checkpointDir)
ssc
}
Run Code Online (Sandbox Code Playgroud)
但是当我运行以下命令来启动流上下文时:
// Stop any existing StreamingContext
val stopActiveContext = true
if (stopActiveContext) {
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
}
// Get or create a streaming context.
val ssc = StreamingContext.getActiveOrCreate(() => main.creatingFunc(sc)) …Run Code Online (Sandbox Code Playgroud) 我正在尝试在Windows 10上安装SDK 7.1.这对许多人来说似乎是一个棘手的任务,我尝试了以下解决方案:
在每种情况下,我都会收到以下错误之一:
错误1304.写入文件时出错:Microsoft.Build.CPPTasks.Common.dll.验证您是否有权访问该目录.
要么
错误1304.写入文件时出错:mscorcfg.dll.验证您是否有权访问该目录.
我正在运行具有管理员权限的安装程序,并且我已验证我的.NET安装正在运行(使用.NET安装验证工具).
我也尝试禁用所有AV和防火墙,甚至尝试干净启动,但所有这些步骤都导致了同样的错误.
编辑:
我也尝试过安装Visual Studio [2015,2013,2012],但都失败了Error writing to file: ****.dll.
我有一个spark数据帧,我需要为特定列过滤空值和空格.
可以说dataframe有两列.col2既有空值也有空格.
col1 col2
1 abc
2 null
3 null
4
5 def
Run Code Online (Sandbox Code Playgroud)
我想应用过滤掉col2为空或空白的记录.任何人都可以帮忙解决这个问题.
版本:Spark1.6.2 Scala 2.10
我正在尝试做一个尾递归方法,但我正在使用Map,我不知道如何使用模式匹配来检查Map是否为空/ null并得到head/tail:
def aa(a:Map[String, Seq[Operation]]): Map[String, (Seq[Operation], Double)] = {
def aaRec(xx:Map[String, Seq[Operation]],
res:Map[String, (Seq[Operation], Double)],
acc:Double = 0): Map[String, (Seq[Operation], Double)] = xx match {
case ? =>
res
case _ =>
val head = xx.head
val balance = head._2.foldLeft(acc)(_ + _.amount)
aaRec(xx.tail, res + (head._1 -> (head._2, balance)), balance)
}
aaRec(a, Map[String, (Seq[Operation], Double)]())
}
}
Run Code Online (Sandbox Code Playgroud)
案例空地图和案例h :: t的正确语法是什么?
提前致谢