我是Spark和Spark SQL的新手.
如何createOrReplaceTempView在Spark工作?
如果我们将一个RDD对象注册为一个表,那么火花会将所有数据保存在内存中吗?
我试图将Spark RDD转换为DataFrame.我已经看到了将方案传递给sqlContext.CreateDataFrame(rdd,schema)函数的文档和示例
.
但我有38列或字段,这将进一步增加.如果我手动给出指定每个字段信息的模式,那将会是如此繁琐的工作.
有没有其他方法可以在不知道先前列的信息的情况下指定模式.
我正在尝试将Pandas DF转换为Spark.DF头:
10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
Run Code Online (Sandbox Code Playgroud)
码:
dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)
Run Code Online (Sandbox Code Playgroud)
我收到一个错误:
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
Run Code Online (Sandbox Code Playgroud) 我正在尝试获取输入数据:
A B C
--------------
4 blah 2
2 3
56 foo 3
Run Code Online (Sandbox Code Playgroud)
并根据B是否为空来在末尾添加一列:
A B C D
--------------------
4 blah 2 1
2 3 0
56 foo 3 1
Run Code Online (Sandbox Code Playgroud)
我可以通过将输入数据帧注册为临时表,然后键入SQL查询来轻松完成此操作.
但我真的想知道如何使用Scala方法执行此操作,而不必在Scala中键入SQL查询.
我已经尝试过了.withColumn,但我无法做到我想做的事情.
我有以下Scala值:
val values: List[Iterable[Any]] = Traces().evaluate(features).toList
Run Code Online (Sandbox Code Playgroud)
我想将其转换为DataFrame.
当我尝试以下内容时:
sqlContext.createDataFrame(values)
Run Code Online (Sandbox Code Playgroud)
我收到了这个错误:
error: overloaded method value createDataFrame with alternatives:
[A <: Product](data: Seq[A])(implicit evidence$2: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
[A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$1: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
cannot be applied to (List[Iterable[Any]])
sqlContext.createDataFrame(values)
Run Code Online (Sandbox Code Playgroud)
为什么?
我想使用下面的代码转换spark数据框:
from pyspark.mllib.clustering import KMeans
spark_df = sqlContext.createDataFrame(pandas_df)
rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)
详细的错误消息是:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-11-a19a1763d3ac> in <module>()
1 from pyspark.mllib.clustering import KMeans
2 spark_df = sqlContext.createDataFrame(pandas_df)
----> 3 rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
4 model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")
/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in __getattr__(self, name)
842 if name not in self.columns:
843 raise AttributeError(
--> 844 "'%s' object …Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark spark-dataframe apache-spark-mllib
我试图覆盖spark会话/ spark上下文默认配置,但它正在挑选整个节点/群集资源.
spark = SparkSession.builder
.master("ip")
.enableHiveSupport()
.getOrCreate()
spark.conf.set("spark.executor.memory", '8g')
spark.conf.set('spark.executor.cores', '3')
spark.conf.set('spark.cores.max', '3')
spark.conf.set("spark.driver.memory",'8g')
sc = spark.sparkContext
Run Code Online (Sandbox Code Playgroud)
当我将配置放入spark提交时,它工作正常
spark-submit --master ip --executor-cores=3 --diver 10G code.py
Run Code Online (Sandbox Code Playgroud) 我正在尝试为每个用户构建一个包含每小时每小时平均记录数的向量.因此,矢量必须具有24维.
我的原始DataFrame有userID和hour列,我开始做一个groupBy并计算每个用户每小时的记录数,如下所示:
val hourFreqDF = df.groupBy("userID", "hour").agg(count("*") as "hfreq")
Run Code Online (Sandbox Code Playgroud)
现在,为了根据本答案中的第一个建议,我按照每个用户生成一个向量.
val hours = (0 to 23 map { n => s"$n" } toArray)
val assembler = new VectorAssembler()
.setInputCols(hours)
.setOutputCol("hourlyConnections")
val exprs = hours.map(c => avg(when($"hour" === c, $"hfreq").otherwise(lit(0))).alias(c))
val transformed = assembler.transform(hourFreqDF.groupBy($"userID")
.agg(exprs.head, exprs.tail: _*))
Run Code Online (Sandbox Code Playgroud)
当我运行此示例时,我收到以下警告:
Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
Run Code Online (Sandbox Code Playgroud)
我认为这是因为表达太长了?
我的问题是:我能安全地忽略这个警告吗?
使用Spark 1.6.1版本我需要在列上获取不同的值,然后在其上执行一些特定的转换.该列包含超过5000万条记录,并且可以变大.
我知道做一个distinct.collect()会把呼叫带回驱动程序.目前我正在执行如下任务,是否有更好的方法?
import sqlContext.implicits._
preProcessedData.persist(StorageLevel.MEMORY_AND_DISK_2)
preProcessedData.select(ApplicationId).distinct.collect().foreach(x => {
val applicationId = x.getAs[String](ApplicationId)
val selectedApplicationData = preProcessedData.filter($"$ApplicationId" === applicationId)
// DO SOME TASK PER applicationId
})
preProcessedData.unpersist()
Run Code Online (Sandbox Code Playgroud) scala dataframe apache-spark apache-spark-sql spark-dataframe
我正在编写一个用户定义的函数,它将获取除数据帧中第一个之外的所有列并进行求和(或任何其他操作).现在数据框有时可以有3列或4列或更多列.它会有所不同.
我知道我可以硬编码4个列名作为UDF传递,但在这种情况下它会有所不同所以我想知道如何完成它?
以下是第一个示例中的两个示例,我们有两列要添加,第二个示例中我们有三列要添加.