我有一个Dataframe
带有strings
. 如何检查其中的哪些行是数字。我在 PySpark 的官方文档中找不到任何功能-
values = [('25q36',),('75647',),('13864',),('8758K',),('07645',)]
df = sqlContext.createDataFrame(values,['ID',])
df.show()
+-----+
| ID|
+-----+
|25q36|
|75647|
|13864|
|8758K|
|07645|
+-----+
Run Code Online (Sandbox Code Playgroud)
在 Python 中,有一个函数.isDigit()
可以返回True
或者False
是否string
只包含数字。
预期数据帧 -
+-----+-------+
| ID| Value |
+-----+-------+
|25q36| False |
|75647| True |
|13864| True |
|8758K| False |
|07645| True |
+-----+-------+
Run Code Online (Sandbox Code Playgroud)
我想避免创建一个UDF
.
这是我面临的问题的缩影,我遇到了错误。让我尝试在这里重现它。
我将 a 保存DataFrame
为 a parquet
,但是当我重新加载DataFrame
fromparquet
文件并再次将其保存为 时parquet
,出现错误。
valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
df = spark.createDataFrame(valuesCol,['sex','date'])
# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
# Load it back
df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex')=='Male')
# Save it back - This produces ERROR
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
Run Code Online (Sandbox Code Playgroud)
错误信息-
执行器 22): java.io.FileNotFoundException: 请求的文件 maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet 不存在。底层文件可能已更新。您可以通过在 SQL 中运行“REFRESH TABLE tableName”命令或重新创建所涉及的数据集/数据帧来显式使 Spark 中的缓存失效。
另一个 SO问题解决了这个问题。建议的解决方案是像下面的代码一样刷新表格,但这没有帮助。问题在于元数据的刷新。我不知道如何刷新它。
df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
Run Code Online (Sandbox Code Playgroud)
此问题的解决方法:解决此问题的一种不太优雅的方法是使用不同的名称保存DataFrame
as文件,然后删除原始文件,最后将此文件重命名为旧名称。parquet
parquet
parquet
# Workaround
import os …
Run Code Online (Sandbox Code Playgroud) 我正在使用 PySpark 并加载csv
文件。我有一列带有欧洲格式的数字,这意味着逗号替换了点,反之亦然。
例如:我有2.416,67
而不是2,416.67
.
My data in .csv file looks like this -
ID; Revenue
21; 2.645,45
23; 31.147,05
.
.
55; 1.009,11
Run Code Online (Sandbox Code Playgroud)
在 Pandas 中,通过在内部指定decimal=','
和thousands='.'
选项pd.read_csv()
来读取欧洲格式,可以轻松读取这样的文件。
熊猫代码:
import pandas as pd
df=pd.read_csv("filepath/revenues.csv",sep=';',decimal=',',thousands='.')
Run Code Online (Sandbox Code Playgroud)
我不知道如何在 PySpark 中做到这一点。
PySpark 代码:
from pyspark.sql.types import StructType, StructField, FloatType, StringType
schema = StructType([
StructField("ID", StringType(), True),
StructField("Revenue", FloatType(), True)
])
df=spark.read.csv("filepath/revenues.csv",sep=';',encoding='UTF-8', schema=schema, header=True)
Run Code Online (Sandbox Code Playgroud)
谁能建议我们如何使用上述.csv()
函数在 PySpark 中加载这样的文件?
我有收入数据,我需要在德国地图上映射.
我的数据包含邮政编码,相应的纬度和经度,以及与该邮政编码相对应的收入价值,如下所示 -
data = {'35447': {50.6022608,8.861908900000001,434224.45€}}
Run Code Online (Sandbox Code Playgroud)
我正在使用plotly
Python库,这有助于创建Choropleth/Point Maps.我提到了Pandas Choropleth Maps的例子,它解释了如何使用plotly
,但是对于USA来说.在示例中通过设置
locationmode = 'USA-states'
locations = 'AL'# AL for Alabama, AZ for Arizona
Run Code Online (Sandbox Code Playgroud)
我们可以为美国创建Choropleth地图.同样,通过设置
locationmode = 'USA-states'
lon = -74.25908989999999 # Longitude
lat = 40.4773991 # Latitude
Run Code Online (Sandbox Code Playgroud)
我们可以为美国创建Point Maps.
问题:我们如何使用库plotly
为德国创建Choropleth/Point地图?或者plotly
只是为美国实施.pyGeoDb可以解决这个问题,但它不能用于Python 3. +
作为附注,可能有用也可能没有帮助 - 没有为邮政编码实施等值线图并plotly
确认.
你可以看到lscpu
命令的输出-
jack@042:~$ lscpu
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 56
On-line CPU(s) list: 0-55
Thread(s) per core: 2
Core(s) per socket: 14
Socket(s): 2
NUMA node(s): 2
Vendor ID: GenuineIntel
CPU family: 6
Model: 79
Model name: Intel(R) Xeon(R) CPU E5-2690 v4 @ 2.60GHz
Stepping: 1
CPU MHz: 2600.000
CPU max MHz: 2600.0000
CPU min MHz: 1200.0000
BogoMIPS: 5201.37
Virtualization: VT-x
Hypervisor vendor: vertical
Virtualization type: full
L1d cache: 32K
L1i cache: 32K …
Run Code Online (Sandbox Code Playgroud) 我.csv
有几列,我希望'n'
在使用spark.read.csv()
函数将此文件导入数据帧时跳过 4 行(或一般情况下)。我有一个.csv
这样的文件 -
ID;Name;Revenue
Identifier;Customer Name;Euros
cust_ID;cust_name;€
ID132;XYZ Ltd;2825
ID150;ABC Ltd;1849
Run Code Online (Sandbox Code Playgroud)
在普通的 Python 中,使用read_csv()
函数时,很简单,可以使用以下skiprow=n
选项来完成-
import pandas as pd
df=pd.read_csv('filename.csv',sep=';',skiprows=3) # Since we wish to skip top 3 lines
Run Code Online (Sandbox Code Playgroud)
使用 PySpark,我按如下方式导入这个 .csv 文件 -
df=spark.read.csv("filename.csv",sep=';')
This imports the file as -
ID |Name |Revenue
Identifier |Customer Name|Euros
cust_ID |cust_name |€
ID132 |XYZ Ltd |2825
ID150 |ABC Ltd 1849
Run Code Online (Sandbox Code Playgroud)
这是不正确的,因为我希望忽略前三行。我不能使用选项,'header=True'
因为它只会排除第一行。可以使用'comment='
选项,但为此需要行以特定字符开头,而我的文件并非如此。我在文档中找不到任何内容。有没有办法做到这一点?
我有一个 DataFrame df
,PySpark
如下所示 -
+-----+--------------------+-------+\n| ID| customers|country|\n+-----+--------------------+-------+\n|56 |xyz Limited |U.K. |\n|66 |ABC Limited |U.K. |\n|16 |Sons & Sons |U.K. |\n|51 |T\xc3\x9cV GmbH |Germany|\n|23 |Mueller GmbH |Germany|\n|97 |Schneider AG |Germany|\n|69 |Sahm UG |Austria|\n+-----+--------------------+-------+\n
Run Code Online (Sandbox Code Playgroud)\n\n我只想保留ID
从 5 或 6 开始的那些行。所以,我希望我的最终数据框看起来像这样 -
+-----+--------------------+-------+\n| ID| customers|country|\n+-----+--------------------+-------+\n|56 |xyz Limited |U.K. |\n|66 |ABC Limited |U.K. |\n|51 |T\xc3\x9cV GmbH |Germany|\n|69 |Sahm UG |Austria|\n+-----+--------------------+-------+\n
Run Code Online (Sandbox Code Playgroud)\n\n这可以通过多种方式实现,而且这不是问题。但是,我有兴趣了解如何使用语句来完成此操作LIKE
。
如果我只对从 5 开始的那些行感兴趣ID
,就可以像这样轻松完成 -
df=df.where("ID like (\'5%\')")\n
Run Code Online (Sandbox Code Playgroud)\n\n我的问题: …
我正在从 RDD 创建一个 DataFrame,其中一个值是date
. 我不知道如何DateType()
在模式中指定。
让我来说明手头的问题——
我们可以将 加载date
到 DataFrame 的一种方法是首先将其指定为字符串并date
使用to_date()函数将其转换为正确的。
from pyspark.sql.types import Row, StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import col, to_date
values=sc.parallelize([(3,'2012-02-02'),(5,'2018-08-08')])
rdd= values.map(lambda t: Row(A=t[0],date=t[1]))
# Importing date as String in Schema
schema = StructType([StructField('A', IntegerType(), True), StructField('date', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
# Finally converting the string into date using to_date() function.
df = df.withColumn('date',to_date(col('date'), 'yyyy-MM-dd'))
df.show()
+---+----------+
| A| date|
+---+----------+
| 3|2012-02-02| …
Run Code Online (Sandbox Code Playgroud) 我有monthly
过去 5 年的收入数据,并且我parquet
以append
模式但列的格式存储各个月份的数据帧。这是下面的伪代码 -partitioned by
month
def Revenue(filename):
df = spark.read.load(filename)
.
.
df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')
Revenue('Revenue_201501.csv')
Revenue('Revenue_201502.csv')
Revenue('Revenue_201503.csv')
Revenue('Revenue_201504.csv')
Revenue('Revenue_201505.csv')
Run Code Online (Sandbox Code Playgroud)
df
每月以格式存储,parquet
如下所示 -
问:如何删除parquet
特定月份对应的文件夹?
一种方法是将所有这些parquet
文件加载到一个大文件中df
,然后使用.where()
子句过滤掉该特定月份,然后将其保存回模式月份parquet
格式,如下所示 -partitionBy
overwrite
# If we want to remove data from Feb, 2015
df = spark.read.format('parquet').load('Revenue.parquet')
df = df.where(col('month') != lit('2015-02-01'))
df.write.format('parquet').mode('overwrite').partitionBy('month').save('/path/Revenue')
Run Code Online (Sandbox Code Playgroud)
但是,这种方法相当麻烦。
另一种方法是直接删除该特定月份的文件夹,但我不确定这是否是处理问题的正确方法,以免我们metadata
以不可预见的方式更改。
parquet
删除特定月份的数据的正确方法是什么?
我想用另一个值替换数据框列中的一个值,并且我必须为许多列(假设为 30/100 列)执行此操作
from pyspark.sql.functions import when, lit, col
df = sc.parallelize([(1, "foo", "val"), (2, "bar", "baz"), (3, "baz", "buz")]).toDF(["x", "y", "z"])
df.show()
# I can replace "baz" with Null separaely in column y and z
def replace(column, value):
return when(column != value, column).otherwise(lit(None))
df = df.withColumn("y", replace(col("y"), "baz"))\
.withColumn("z", replace(col("z"), "baz"))
df.show()
Run Code Online (Sandbox Code Playgroud)
我可以在 y 和 z 列中分别用 Null 替换“baz”。但我想对所有列都这样做 - 类似于下面的列表理解方式
[replace(df[col], "baz") for col in df.columns]
Run Code Online (Sandbox Code Playgroud) pyspark ×7
python ×6
apache-spark ×5
csv ×2
dataframe ×2
parquet ×2
choropleth ×1
comma ×1
core ×1
cpu ×1
format ×1
header ×1
metadata ×1
numeric ×1
pandas ×1
replaceall ×1
sql-like ×1
where-clause ×1