相关疑难解决方法(0)

如何在pyspark数据框中将字符串类型的列转换为int形式?

我在pyspark中有数据框.它的一些数字列包含'nan',因此当我读取数据并检查数据帧的模式时,这些列将具有"字符串"类型.我怎样才能改变他们为int type.I取代了"南"值是0,并再次检查的模式,但后来又它显示是跟着下面的代码对那些columns.I字符串类型:

data_df = sqlContext.read.format("csv").load('data.csv',header=True, inferSchema="true")
data_df.printSchema()
data_df = data_df.fillna(0)
data_df.printSchema()
Run Code Online (Sandbox Code Playgroud)

我的数据看起来像这样: 在此输入图像描述

这里列'Plays'和'drafts'包含整数值,但由于这些列中存在nan,它们被视为字符串类型.

python dataframe pyspark

20
推荐指数
3
解决办法
7万
查看次数

更改现有数据框的架构

我想更改现有数据帧的架构,同时更改我遇到错误的架构。我是否可以更改数据帧的现有架构。

val customSchema=StructType(
      Array(
        StructField("data_typ", StringType, nullable=false),
        StructField("data_typ", IntegerType, nullable=false),
        StructField("proc_date", IntegerType, nullable=false),
        StructField("cyc_dt", DateType, nullable=false),
        ));

val readDF=
+------------+--------------------+-----------+--------------------+
|DatatypeCode|         Description|monthColNam|     timeStampColNam|
+------------+--------------------+-----------+--------------------+
|       03099|Volumetric/Expand...|     201867|2018-05-31 18:25:...|
|       03307|  Elapsed Day Factor|     201867|2018-05-31 18:25:...|
+------------+--------------------+-----------+--------------------+

val rows= readDF.rdd
val readDF1 = sparkSession.createDataFrame(rows,customSchema)
Run Code Online (Sandbox Code Playgroud)

预期结果

val newdf=
    +------------+--------------------+-----------+--------------------+
    |data_typ_cd |       data_typ_desc|proc_dt    |     cyc_dt         |
    +------------+--------------------+-----------+--------------------+
    |       03099|Volumetric/Expand...|     201867|2018-05-31 18:25:...|
    |       03307|  Elapsed Day Factor|     201867|2018-05-31 18:25:...|
    +------------+--------------------+-----------+--------------------+
Run Code Online (Sandbox Code Playgroud)

任何帮助将被应用

scala dataframe apache-spark

9
推荐指数
2
解决办法
4万
查看次数

Spark DAG 与“withColumn”和“select”不同

语境

在最近的SO-post 中,我发现withColumn在处理堆叠/链列表达式以及不同的窗口规范时,使用可能会改进 DAG。然而,在这个例子中,withColumn实际上使 DAG 变得更糟,并且与使用select相反的结果不同。

可重现的例子

首先,一些测试数据(PySpark 2.4.4 独立版):

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),      
        "col5": np.random.randint(0, 5, size=100),        

    }
)

df = spark.createDataFrame(dfp)
df.show(5)

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|   0|   3|   2|   2|   2|
| …
Run Code Online (Sandbox Code Playgroud)

python dataframe directed-acyclic-graphs apache-spark pyspark

8
推荐指数
1
解决办法
3019
查看次数

在 pyspark 中的 DataFrame 上使用 toPandas() 时出现神秘的“pyarrow.lib.ArrowInvalid:浮点值被截断”错误

我在不是很大的 DataFrame 上使用 toPandas() ,但出现以下异常:

18/10/31 19:13:19 ERROR Executor: Exception in task 127.2 in stage 13.0 (TID 2264)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
    File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
      process()
    File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
      serializer.dump_stream(func(split_index, iterator), outfile)
    File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
      batch = _create_batch(series, self._timezone)
    File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
      arrs = [create_array(s, t) for s, t in series]
    File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
      arrs = [create_array(s, t) for s, t in series] …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark apache-arrow pyarrow

5
推荐指数
2
解决办法
9683
查看次数

如何转换Spark DataFrame列?使用pyspark

我已经通过以下方式创建了一个DataFrame:

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

df = spark.read.csv("train.csv", header=True)
Run Code Online (Sandbox Code Playgroud)

我的DataFrame的架构如下:

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

0
推荐指数
1
解决办法
1750
查看次数

如何在数据框中投射一列?

我正在从 hbase 获取数据并将其转换为数据帧。现在,我在数据框中有一列是string数据类型。但我需要将其数据类型转换为Int.

尝试了下面的代码,但它给我一个错误

df.withColumn("order", 'order.cast(int)')
Run Code Online (Sandbox Code Playgroud)

我面临的错误如下

error:col should be column
Run Code Online (Sandbox Code Playgroud)

我在这里给出了正确的列名,我需要在 pyspark 中更改上述代码的语法吗?

dataframe apache-spark apache-spark-sql pyspark

-2
推荐指数
1
解决办法
3万
查看次数