标签: spark-dataframe

使用pyspark从元组列表创建DataFrame

我正在处理使用simple-salesforce包从SFDC中提取的数据.我使用Python3编写脚本和Spark 1.5.2.

我创建了一个包含以下数据的rdd:

[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')]
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')]
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')]
...
Run Code Online (Sandbox Code Playgroud)

此数据位于RDD中,名为v_rdd

我的架构如下所示:

StructType(List(StructField(Id,StringType,true),StructField(PackSize,StringType,true),StructField(Name,StringType,true)))
Run Code Online (Sandbox Code Playgroud)

我试图从这个RDD创建DataFrame:

sqlDataFrame = sqlContext.createDataFrame(v_rdd, schema)
Run Code Online (Sandbox Code Playgroud)

我打印我的DataFrame:

sqlDataFrame.printSchema()
Run Code Online (Sandbox Code Playgroud)

并获得以下内容:

+--------------------+--------------------+--------------------+
|                  Id|  PackSize|                          Name|
+--------------------+--------------------+--------------------+
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|
Run Code Online (Sandbox Code Playgroud)

我期待看到实际数据,如下所示:

+------------------+------------------+--------------------+
|                Id|PackSize|                          Name|
+------------------+------------------+--------------------+
|a0w1a0000003xB1A  |               1.0|       A            |
|a0w1a0000003xAAI  |               1.0|       B            |
|a0w1a00000xB3AAI  |              30.0|       C            |
Run Code Online (Sandbox Code Playgroud)

你能帮我辨认一下我在做错了吗?

我的Python脚本很长,我不确定人们对它进行筛选是否方便,所以我只发布了我遇到问题的部分.

提前谢谢!

python-3.x pyspark spark-dataframe

12
推荐指数
1
解决办法
3万
查看次数

在PySpark中按一列中的不同值过滤行

假设我有下表:

+--------------------+--------------------+------+------------+--------------------+
|                host|                path|status|content_size|                time|
+--------------------+--------------------+------+------------+--------------------+
|js002.cc.utsunomi...|/shuttle/resource...|   404|           0|1995-08-01 00:07:...|
|    tia1.eskimo.com |/pub/winvn/releas...|   404|           0|1995-08-01 00:28:...|
|grimnet23.idirect...|/www/software/win...|   404|           0|1995-08-01 00:50:...|
|miriworld.its.uni...|/history/history.htm|   404|           0|1995-08-01 01:04:...|
|      ras38.srv.net |/elv/DELTA/uncons...|   404|           0|1995-08-01 01:05:...|
| cs1-06.leh.ptd.net |                    |   404|           0|1995-08-01 01:17:...|
|dialip-24.athenet...|/history/apollo/a...|   404|           0|1995-08-01 01:33:...|
|  h96-158.ccnet.com |/history/apollo/a...|   404|           0|1995-08-01 01:35:...|
|  h96-158.ccnet.com |/history/apollo/a...|   404|           0|1995-08-01 01:36:...|
|  h96-158.ccnet.com |/history/apollo/a...|   404|           0|1995-08-01 01:36:...|
|  h96-158.ccnet.com |/history/apollo/a...|   404|           0|1995-08-01 01:36:...|
|  h96-158.ccnet.com |/history/apollo/a...|   404|           0|1995-08-01 01:36:...|
|  h96-158.ccnet.com |/history/apollo/a...|   404|           0|1995-08-01 01:36:...|
|  h96-158.ccnet.com …
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark apache-spark-sql pyspark spark-dataframe

12
推荐指数
1
解决办法
8622
查看次数

Apache Spark Dataframe Groupby agg()用于多列

我有DataFrame3列,即Id, First Name, Last Name

我想申请GroupBy的基础上,Id并希望收集First Name, Last Name列作为列表.

示例: - 我有这样的DF

+---+-------+--------+
|id |fName  |lName   |
+---+-------+--------+
|1  |Akash  |Sethi   |
|2  |Kunal  |Kapoor  |
|3  |Rishabh|Verma   |
|2  |Sonu   |Mehrotra|
+---+-------+--------+
Run Code Online (Sandbox Code Playgroud)

我希望我的输出像这样

+---+-------+--------+--------------------+
|id |fname           |lName               |
+---+-------+--------+--------------------+
|1  |[Akash]         |[Sethi]             |
|2  |[Kunal, Sonu]   |[Kapoor, Mehrotra]  |
|3  |[Rishabh]       |[Verma]             |
+---+-------+--------+--------------------+
Run Code Online (Sandbox Code Playgroud)

提前致谢

scala apache-spark spark-dataframe

12
推荐指数
1
解决办法
7555
查看次数

Spark SQL:如何将REST服务中的json数据作为DataFrame使用

我需要从提供REST接口的Web服务中读取一些JSON数据,以便从我的SPARK SQL代码中查询数据以进行分析.我能够读取存储在blob存储中的JSON并使用它.

我想知道什么是从REST服务读取数据的最佳方式,并像其他任何方式一样使用它DataFrame.

BTW我正在使用,SPARK 1.6 of Linux cluster on HD insight如果这有帮助.如果有人可以共享任何代码片段,我也会很感激,因为我对SPARK环境仍然很新.

hdinsight apache-spark-sql spark-dataframe

11
推荐指数
1
解决办法
1万
查看次数

Spark,在Scala中添加具有相同值的新列

withColumn在Spark-Scala环境中遇到了一些问题.我想在我的DataFrame中添加一个新列,如下所示:

+---+----+---+
|  A|   B|  C|
+---+----+---+
|  4|blah|  2|
|  2|    |  3|
| 56| foo|  3|
|100|null|  5|
+---+----+---+
Run Code Online (Sandbox Code Playgroud)

成为:

+---+----+---+-----+
|  A|   B|  C|  D  |
+---+----+---+-----+
|  4|blah|  2|  750|
|  2|    |  3|  750|
| 56| foo|  3|  750|
|100|null|  5|  750|
+---+----+---+-----+
Run Code Online (Sandbox Code Playgroud)

对于我的DataFrame中的每一行,一列中的列D重复N次.

代码是这样的:

var totVehicles : Double = df_totVehicles(0).getDouble(0); //return 750
Run Code Online (Sandbox Code Playgroud)

变量totVehicles返回正确的值,它的工作原理!

第二个DataFrame必须计算2个字段(id_zipcode,n_vehicles),并添加第三列(具有相同的值-750):

var df_nVehicles =
df_carPark.filter(
      substring($"id_time",1,4) < 2013
    ).groupBy(
      $"id_zipcode"
    ).agg(
      sum($"n_vehicles") as 'n_vehicles
    ).select(
      $"id_zipcode" as 'id_zipcode,
      'n_vehicles …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-dataframe

11
推荐指数
1
解决办法
2万
查看次数

Spark:查找至少具有n个共同属性的对?

我有一个数据集,包括(sensor_id, timestamp, data)(在sensor_id是的IoT设备的ID,时间戳是UNIX时间和数据是它们的输出在当时的MD5散列).表中没有主键,但每行都是唯一的.

我需要找到所有sensor_ids 对,s1并且s2这两个传感器在它们之间至少具有n(n=50)条目(timestamp, data),即在n不同的情况下它们在相同的时间戳发出相同的数据.

对于数据的大小感,我有10B行和~50M不同sensor_ids,我相信大约有~5M对传感器ID,它们在同一时间戳发出相同数据至少50次.

Spark中最好的方法是什么?我尝试了各种方法(分组(timestamp, data)和/或自连接),但它们的复杂性非常昂贵.

algorithm apache-spark spark-streaming apache-spark-sql spark-dataframe

11
推荐指数
1
解决办法
332
查看次数

Spark SQL SaveMode.Overwrite,获取java.io.FileNotFoundException并要求'REFRESH TABLE tableName'

对于spark sql,我们应该如何从HDFS中的一个文件夹中获取数据,进行一些修改,并通过覆盖保存模式将更新的数据保存到HDFS中的同一文件夹而不会得到FileNotFoundException?

import org.apache.spark.sql.{SparkSession,SaveMode}
import org.apache.spark.SparkConf

val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a","b","c")

newDF.write.mode(SaveMode.Overwrite)
     .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
     .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works
Run Code Online (Sandbox Code Playgroud)

当我们从hdfs目录"d = 2017-03-20"读取数据时,会发生FileNotFoundException,并将(SaveMode.Overwrite)更新的数据保存到相同的hdfs目录"d = 2017-03-20"

Caused by: org.apache.spark.SparkException: Task failed while writing rows
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

11
推荐指数
2
解决办法
1万
查看次数

PySpark:当另一个列值满足条件时修改列值

我有一个PySpark Dataframe,它有两列Id和rank,

+---+----+
| Id|Rank|
+---+----+
|  a|   5|
|  b|   7|
|  c|   8|
|  d|   1|
+---+----+
Run Code Online (Sandbox Code Playgroud)

对于每一行,如果Rank大于5,我希望用"other"替换Id.

如果我使用伪代码来解释:

For row in df:
  if row.Rank>5:
     then replace(row.Id,"other")
Run Code Online (Sandbox Code Playgroud)

结果应该是这样的,

+-----+----+
|   Id|Rank|
+-----+----+
|    a|   5|
|other|   7|
|other|   8|
|    d|   1|
+-----+----+
Run Code Online (Sandbox Code Playgroud)

任何线索如何实现这一目标?谢谢!!!


要创建此Dataframe:

df = spark.createDataFrame([('a',5),('b',7),('c',8),('d',1)], ["Id","Rank"])
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql

11
推荐指数
2
解决办法
2万
查看次数

获取Spark数据帧列列表

如何将spark数据帧中的所有列名称转换为Seq变量.

输入数据和架构

val dataset1 = Seq(("66", "a", "4"), ("67", "a", "0"), ("70", "b", "4"), ("71", "d", "4")).toDF("KEY1", "KEY2", "ID")

dataset1.printSchema()
root
|-- KEY1: string (nullable = true)
|-- KEY2: string (nullable = true)
|-- ID: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我需要使用scala编程将所有列名存储在变量中.我试过如下,但它不起作用.

val selectColumns = dataset1.schema.fields.toSeq

selectColumns: Seq[org.apache.spark.sql.types.StructField] = WrappedArray(StructField(KEY1,StringType,true),StructField(KEY2,StringType,true),StructField(ID,StringType,true))
Run Code Online (Sandbox Code Playgroud)

预期产量:

val selectColumns = Seq(
  col("KEY1"),
  col("KEY2"),
  col("ID")
)

selectColumns: Seq[org.apache.spark.sql.Column] = List(KEY1, KEY2, ID)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe

11
推荐指数
3
解决办法
2万
查看次数

PySpark - 将列表作为参数传递给UDF

我需要将列表传递给UDF,列表将确定距离的分数/类别.就目前而言,我很难将所有距离编码为第4分.

a= spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"])

from pyspark.sql.functions import udf
def cate(label, feature_list):
    if feature_list == 0:
        return label[4]
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]
udf_score=udf(cate, StringType())
a.withColumn("category", udf_score(label_list,a["distances"])).show(10)
Run Code Online (Sandbox Code Playgroud)

当我尝试这样的事情时,我得到了这个错误.

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

python user-defined-functions pyspark spark-dataframe

11
推荐指数
2
解决办法
1万
查看次数