I have a DataFrame in PySpark. Some of its numeric columns contain 'nan', so when I read the data and check the schema of the DataFrame, those columns come out with "string" type. How can I change them to int type? I replaced the 'nan' values with 0 and checked the schema again, but it still shows string type for those columns. I am following the code below:
data_df = sqlContext.read.format("csv").load('data.csv',header=True, inferSchema="true")
data_df.printSchema()
data_df = data_df.fillna(0)
data_df.printSchema()
Here the columns 'Plays' and 'drafts' contain integer values, but because of the nan values present in them, these columns are treated as string type.
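A minimal sketch of one possible approach, assuming the affected columns are indeed 'Plays' and 'drafts': map the literal string 'nan' to 0 and then cast to integer (this is only an illustration of the idea, not a verified answer):

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# "Plays" and "drafts" are the column names mentioned in the question.
for c in ["Plays", "drafts"]:
    data_df = data_df.withColumn(
        c,
        # Treat the literal string 'nan' as 0, then cast the column to int.
        F.when(F.col(c) == "nan", F.lit(0))
         .otherwise(F.col(c))
         .cast(IntegerType()),
    )

data_df.printSchema()  # the two columns should now show as integer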
I want to change the schema of an existing DataFrame, but while changing it I got an error. Is it possible to change the existing schema of a DataFrame?
val customSchema = StructType(
  Array(
    StructField("data_typ", StringType, nullable = false),
    StructField("data_typ", IntegerType, nullable = false),
    StructField("proc_date", IntegerType, nullable = false),
    StructField("cyc_dt", DateType, nullable = false)
  )
)
val readDF=
+------------+--------------------+-----------+--------------------+
|DatatypeCode| Description|monthColNam| timeStampColNam|
+------------+--------------------+-----------+--------------------+
| 03099|Volumetric/Expand...| 201867|2018-05-31 18:25:...|
| 03307| Elapsed Day Factor| 201867|2018-05-31 18:25:...|
+------------+--------------------+-----------+--------------------+
val rows= readDF.rdd
val readDF1 = sparkSession.createDataFrame(rows,customSchema)
Expected result:
val newdf=
+------------+--------------------+-----------+--------------------+
|data_typ_cd | data_typ_desc|proc_dt | cyc_dt |
+------------+--------------------+-----------+--------------------+
| 03099|Volumetric/Expand...| 201867|2018-05-31 18:25:...|
| 03307| Elapsed Day Factor| 201867|2018-05-31 18:25:...|
+------------+--------------------+-----------+--------------------+
Any help would be appreciated.
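As a side note, one possible way to reach the expected result is to rename the columns and cast the types directly rather than re-applying a schema to the RDD. A minimal sketch in PySpark (used here because the rest of this page is PySpark; the source and target column names are taken from the tables above, everything else is an assumption):

from pyspark.sql import functions as F

# Hypothetical sketch: select each source column under its new name
# and cast it to the type implied by the expected result.
newdf = readDF.select(
    F.col("DatatypeCode").alias("data_typ_cd"),
    F.col("Description").alias("data_typ_desc"),
    F.col("monthColNam").cast("int").alias("proc_dt"),
    F.col("timeStampColNam").cast("date").alias("cyc_dt"),
)
newdf.printSchema()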
In a recent SO post I found that using withColumn may improve the DAG when dealing with stacked/chained column expressions in combination with distinct window specifications. However, in this example withColumn actually makes the DAG worse, and the result differs from what using select gives instead.
First, some test data (PySpark 2.4.4 standalone):
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),
        "col5": np.random.randint(0, 5, size=100),
    }
)
df = spark.createDataFrame(dfp)
df.show(5)
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
| 0| 3| 2| 2| 2|
| …
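The kind of comparison the question refers to can be sketched roughly as follows; the concrete column expressions and window specification are assumptions, since the original example is cut off:

w = Window.partitionBy("col1")

# Stacked / chained withColumn calls over the same window (assumed shape).
df_wc = (
    df.withColumn("mean2", F.mean("col2").over(w))
      .withColumn("mean3", F.mean("col3").over(w))
)

# The same columns expressed in a single select.
df_sel = df.select(
    "*",
    F.mean("col2").over(w).alias("mean2"),
    F.mean("col3").over(w).alias("mean3"),
)

# explain() prints the physical plan, which is where the DAG difference shows up.
df_wc.explain()
df_sel.explain()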
I am using toPandas() on a DataFrame which is not very big, but I get the following exception:
18/10/31 19:13:19 ERROR Executor: Exception in task 127.2 in stage 13.0 (TID 2264)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
process()
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
batch = _create_batch(series, self._timezone)
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
arrs = [create_array(s, t) for s, t in series] …

I have created a DataFrame in the following way:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()
df = spark.read.csv("train.csv", header=True)
The schema of my DataFrame is as follows:
root
|-- PassengerId: string (nullable = true)
|-- Survived: string (nullable = true)
|-- Pclass: string (nullable = true)
|-- Name: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Age: string (nullable = true)
|-- SibSp: string (nullable = true)
|-- Parch: string (nullable = true)
|-- Ticket: string (nullable = true)
|-- Fare: …
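The schema above shows every column as string, which is what spark.read.csv does when inferSchema is not set. A minimal sketch of the two usual options under that assumption (re-read with type inference, or cast individual columns):

from pyspark.sql.types import DoubleType, IntegerType

# Option 1: re-read the CSV and let Spark infer the column types.
df = spark.read.csv("train.csv", header=True, inferSchema=True)

# Option 2: keep the string-typed DataFrame and cast selected columns;
# "Survived", "Pclass" and "Fare" are names taken from the schema above.
df = (
    df.withColumn("Survived", df["Survived"].cast(IntegerType()))
      .withColumn("Pclass", df["Pclass"].cast(IntegerType()))
      .withColumn("Fare", df["Fare"].cast(DoubleType()))
)
df.printSchema()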
I am fetching data from HBase and converting it into a DataFrame. Now I have a column in the DataFrame that is of string data type, but I need to convert it to int.

I tried the code below, but it gives me an error:
df.withColumn("order", 'order.cast(int)')
The error I am facing is as follows:
error:col should be column
I have given the correct column name here; do I need to change the syntax of the above code for PySpark?
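For reference: in PySpark the second argument of withColumn must be a Column object, not a string, which is what the error message is pointing at. A minimal sketch of how such a cast is usually written (the column name 'order' is taken from the snippet above):

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Pass a Column object (not a string) as the second argument to withColumn.
df = df.withColumn("order", F.col("order").cast(IntegerType()))
# or, equivalently:
df = df.withColumn("order", df["order"].cast("int"))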