I am working with data pulled from SFDC using the simple-salesforce package. I am writing the script in Python 3 and using Spark 1.5.2.
I have created an RDD containing the following data:
[('Id', 'a0w1a0000003xB1A'), ('PackSize', 1.0), ('Name', 'A')]
[('Id', 'a0w1a0000003xAAI'), ('PackSize', 1.0), ('Name', 'B')]
[('Id', 'a0w1a00000xB3AAI'), ('PackSize', 30.0), ('Name', 'C')]
...
This data is in an RDD named v_rdd.
My schema looks like this:
StructType(List(StructField(Id,StringType,true),StructField(PackSize,StringType,true),StructField(Name,StringType,true)))
I am trying to create a DataFrame from this RDD:
sqlDataFrame = sqlContext.createDataFrame(v_rdd, schema)
I display my DataFrame:
sqlDataFrame.show()
and I get the following:
+--------------------+--------------------+--------------------+
| Id| PackSize| Name|
+--------------------+--------------------+--------------------+
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|
|[Ljava.lang.Objec...|[Ljava.lang.Objec...|[Ljava.lang.Objec...|
I was expecting to see the actual data, like this:
+----------------+--------+----+
|              Id|PackSize|Name|
+----------------+--------+----+
|a0w1a0000003xB1A|     1.0|   A|
|a0w1a0000003xAAI|     1.0|   B|
|a0w1a00000xB3AAI|    30.0|   C|
Can you help me figure out what I am doing wrong?
My Python script is quite long, and I am not sure it would be convenient for people to sift through it, so I have only posted the part I am having trouble with.
Thanks in advance!
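For reference, the problem is likely that each record in v_rdd is a list of (field, value) tuples rather than a plain tuple or Row, so every column ends up holding an object array. A minimal sketch of one way to reshape the records before building the DataFrame; the helper name and the str() cast of PackSize are my assumptions (the schema declares it as StringType):

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("Id", StringType(), True),
    StructField("PackSize", StringType(), True),
    StructField("Name", StringType(), True),
])

def to_tuple(record):
    # record looks like [('Id', ...), ('PackSize', ...), ('Name', ...)]
    d = dict(record)
    return (d["Id"], str(d["PackSize"]), d["Name"])

sqlDataFrame = sqlContext.createDataFrame(v_rdd.map(to_tuple), schema)
sqlDataFrame.show()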
Suppose I have the following table:
+--------------------+--------------------+------+------------+--------------------+
| host| path|status|content_size| time|
+--------------------+--------------------+------+------------+--------------------+
|js002.cc.utsunomi...|/shuttle/resource...| 404| 0|1995-08-01 00:07:...|
| tia1.eskimo.com |/pub/winvn/releas...| 404| 0|1995-08-01 00:28:...|
|grimnet23.idirect...|/www/software/win...| 404| 0|1995-08-01 00:50:...|
|miriworld.its.uni...|/history/history.htm| 404| 0|1995-08-01 01:04:...|
| ras38.srv.net |/elv/DELTA/uncons...| 404| 0|1995-08-01 01:05:...|
| cs1-06.leh.ptd.net | | 404| 0|1995-08-01 01:17:...|
|dialip-24.athenet...|/history/apollo/a...| 404| 0|1995-08-01 01:33:...|
| h96-158.ccnet.com |/history/apollo/a...| 404| 0|1995-08-01 01:35:...|
| h96-158.ccnet.com |/history/apollo/a...| 404| 0|1995-08-01 01:36:...|
| h96-158.ccnet.com |/history/apollo/a...| 404| 0|1995-08-01 01:36:...|
| h96-158.ccnet.com |/history/apollo/a...| 404| 0|1995-08-01 01:36:...|
| h96-158.ccnet.com |/history/apollo/a...| 404| 0|1995-08-01 01:36:...|
| h96-158.ccnet.com |/history/apollo/a...| 404| 0|1995-08-01 01:36:...|
| h96-158.ccnet.com …
I have a DataFrame with 3 columns, namely Id, First Name, Last Name.
I want to apply a GroupBy on the basis of Id, and I want to collect the First Name and Last Name columns as lists.
Example: I have a DF like this:
+---+-------+--------+
|id |fName |lName |
+---+-------+--------+
|1 |Akash |Sethi |
|2 |Kunal |Kapoor |
|3 |Rishabh|Verma |
|2 |Sonu |Mehrotra|
+---+-------+--------+
I want my output to look like this:
+---+-------------+------------------+
|id |fname        |lName             |
+---+-------------+------------------+
|1  |[Akash]      |[Sethi]           |
|2  |[Kunal, Sonu]|[Kapoor, Mehrotra]|
|3  |[Rishabh]    |[Verma]           |
+---+-------------+------------------+
Thanks in advance.
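A minimal sketch of one way to do this with groupBy plus collect_list, shown in PySpark (the Scala API is analogous); the DataFrame name df and the column names are assumed from the example above:

from pyspark.sql import functions as F

result = df.groupBy("id").agg(
    F.collect_list("fName").alias("fName"),
    F.collect_list("lName").alias("lName"),
)
result.show(truncate=False)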
I need to read some JSON data from a web service that provides a REST interface, so that I can query the data from my Spark SQL code and analyse it. I am able to read JSON stored in blob storage and use it.
I am wondering what the best way is to read the data from the REST service and use it like any other DataFrame.
BTW, I am using Spark 1.6 on a Linux cluster on HDInsight, in case that helps. I would also appreciate it if someone could share a code snippet, as I am still very new to the Spark environment.
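For reference, a rough sketch of one possible approach in PySpark, assuming the JSON payload is small enough to fetch on the driver; the URL is a placeholder, and sc/sqlContext are the usual Spark 1.6 entry points:

import requests

response = requests.get("https://example.com/api/data")  # placeholder endpoint
json_rdd = sc.parallelize([response.text])
df = sqlContext.read.json(json_rdd)  # Spark 1.6 API; use spark.read.json on 2.x+
df.printSchema()
df.show()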
I am having some trouble with withColumn in a Spark-Scala environment. I want to add a new column to my DataFrame, so that this:
+---+----+---+
|  A|   B|  C|
+---+----+---+
|  4|blah|  2|
|  2|    |  3|
| 56| foo|  3|
|100|null|  5|
+---+----+---+
becomes:
+---+----+---+---+
|  A|   B|  C|  D|
+---+----+---+---+
|  4|blah|  2|750|
|  2|    |  3|750|
| 56| foo|  3|750|
|100|null|  5|750|
+---+----+---+---+
Column D should contain the same value (750), repeated once for every row of my DataFrame.
The code is something like this:
var totVehicles : Double = df_totVehicles(0).getDouble(0); //return 750
The variable totVehicles returns the correct value, so that part works!
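For reference, a minimal sketch of how such a scalar can be attached as a constant column with lit(); df here is just a placeholder name for the DataFrame shown above:

import org.apache.spark.sql.functions.lit

val dfWithD = df.withColumn("D", lit(totVehicles))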
The second DataFrame has to compute two fields (id_zipcode, n_vehicles) and add a third column (with the same value, 750):
var df_nVehicles =
df_carPark.filter(
substring($"id_time",1,4) < 2013
).groupBy(
$"id_zipcode"
).agg(
sum($"n_vehicles") as 'n_vehicles
).select(
$"id_zipcode" as 'id_zipcode,
'n_vehicles …

I have a dataset of (sensor_id, timestamp, data), where sensor_id is the ID of an IoT device, timestamp is UNIX time, and data is an MD5 hash of the device's output at that time. There is no primary key in the table, but each row is unique.
I need to find all pairs of sensor_ids, s1 and s2, such that the two sensors have at least n (n=50) entries of (timestamp, data) in common, i.e. on n different occasions they emitted the same data at the same timestamp.
To give a sense of the data size: I have 10B rows and ~50M distinct sensor_ids, and I believe there are roughly ~5M pairs of sensor IDs that emitted the same data at the same timestamp at least 50 times.
What is the best way to do this in Spark? I have tried various approaches (grouping on (timestamp, data) and/or self-joins), but their cost is prohibitively expensive.
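For concreteness, a sketch of the kind of self-join / co-occurrence counting approach mentioned above, in PySpark (column names assumed from the description; this is exactly the sort of query whose cost explodes on large (timestamp, data) groups):

from pyspark.sql import functions as F

# Pair up sensors that share a (timestamp, data) entry, keep each unordered
# pair once, count co-occurrences, and keep pairs seen at least 50 times.
pairs = (
    df.alias("a")
      .join(df.alias("b"), on=["timestamp", "data"])
      .where(F.col("a.sensor_id") < F.col("b.sensor_id"))
      .groupBy(F.col("a.sensor_id").alias("s1"), F.col("b.sensor_id").alias("s2"))
      .count()
      .where(F.col("count") >= 50)
)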
For Spark SQL, how should we read data from one folder in HDFS, make some modifications, and save the updated data back to the same folder in HDFS via Overwrite save mode, without getting a FileNotFoundException?
import org.apache.spark.sql.{SparkSession,SaveMode}
import org.apache.spark.SparkConf
val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a","b","c")
newDF.write.mode(SaveMode.Overwrite)
.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works
The FileNotFoundException occurs when we read data from the hdfs directory "d=2017-03-20" and save (SaveMode.Overwrite) the updated data to the same hdfs directory "d=2017-03-20":
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files …
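A workaround that is often suggested for this read-then-overwrite cycle is to materialize the result somewhere else first and only then rewrite the source directory; a sketch, with a made-up temporary path:

// Write to a temporary location first, so the overwrite of the source
// directory does not happen while the job is still (lazily) reading from it.
val tmpPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/tmp_d=2017-03-20" // placeholder path
newDF.write.mode(SaveMode.Overwrite).parquet(tmpPath)

sparkSession.read.parquet(tmpPath)
  .write.mode(SaveMode.Overwrite)
  .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")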
I have a PySpark DataFrame with two columns, Id and Rank:

+---+----+
| Id|Rank|
+---+----+
| a| 5|
| b| 7|
| c| 8|
| d| 1|
+---+----+
For each row, if Rank is greater than 5, I want to replace Id with "other".
To explain with pseudocode:
For row in df:
if row.Rank>5:
then replace(row.Id,"other")
The result should look like this:
+-----+----+
| Id|Rank|
+-----+----+
| a| 5|
|other| 7|
|other| 8|
| d| 1|
+-----+----+
Any clue how to achieve this? Thanks!!!
To create this DataFrame:
df = spark.createDataFrame([('a',5),('b',7),('c',8),('d',1)], ["Id","Rank"])
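A minimal sketch of one way to do this with when/otherwise, using the df defined above:

from pyspark.sql import functions as F

result = df.withColumn(
    "Id", F.when(F.col("Rank") > 5, "other").otherwise(F.col("Id"))
)
result.show()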
How can I get all the column names of a Spark DataFrame into a Seq variable?
Input data and schema:
val dataset1 = Seq(("66", "a", "4"), ("67", "a", "0"), ("70", "b", "4"), ("71", "d", "4")).toDF("KEY1", "KEY2", "ID")
dataset1.printSchema()
root
|-- KEY1: string (nullable = true)
|-- KEY2: string (nullable = true)
|-- ID: string (nullable = true)
I need to store all the column names in a variable using Scala. I tried the following, but it does not give what I want:
val selectColumns = dataset1.schema.fields.toSeq
selectColumns: Seq[org.apache.spark.sql.types.StructField] = WrappedArray(StructField(KEY1,StringType,true),StructField(KEY2,StringType,true),StructField(ID,StringType,true))
Expected output:
val selectColumns = Seq(
col("KEY1"),
col("KEY2"),
col("ID")
)
selectColumns: Seq[org.apache.spark.sql.Column] = List(KEY1, KEY2, ID)
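For reference, a small sketch of one way to get there: df.columns returns the column names as strings, which can then be mapped to Columns (dataset1 is the DataFrame defined above):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Map each column name to a Column and collect them into a Seq.
val selectColumns: Seq[Column] = dataset1.columns.map(col).toSeq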
I need to pass a list to a UDF; the list will determine the score/category of the distances. For now, I am hard-coding all the distances to the 4th score.
a= spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"])
from pyspark.sql.functions import udf
def cate(label, feature_list):
if feature_list == 0:
return label[4]
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]
udf_score=udf(cate, StringType())
a.withColumn("category", udf_score(label_list,a["distances"])).show(10)
When I try something like this, I get this error:
Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud) spark-dataframe ×10