I am using PySpark with Python 2.7. I have a date column stored as a string (with milliseconds) and want to convert it to a timestamp.
This is what I have tried so far:
df = df.withColumn('end_time', from_unixtime(unix_timestamp(df.end_time, '%Y-%M-%d %H:%m:%S.%f')) )
printSchema() still shows
end_time: string (nullable = true)
even when I try to use timestamp as the column type.
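A minimal sketch of one way to do the conversion, assuming the strings look like 2019-01-01 12:34:56.789. Note that unix_timestamp expects a Java SimpleDateFormat pattern (e.g. yyyy-MM-dd HH:mm:ss), not Python strptime codes, and it drops sub-second precision, so a plain cast is often simpler:

from pyspark.sql import functions as F

# cast() preserves the fractional seconds that unix_timestamp would truncate
df = df.withColumn('end_time', F.col('end_time').cast('timestamp'))

# alternative with an explicit Java-style pattern (milliseconds are lost here):
# df = df.withColumn('end_time',
#                    F.from_unixtime(F.unix_timestamp('end_time', 'yyyy-MM-dd HH:mm:ss'))
#                     .cast('timestamp'))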
I'm trying to read a PySpark DataFrame from Google Cloud Storage, but I keep getting an error that the service account has no storage.objects.create permission. The account indeed has no WRITER permission, but it is only reading parquet files:
spark_session.read.parquet(input_path)
18/12/25 13:12:00 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Repairing batch of 1 missing directories.
18/12/25 13:12:01 ERROR com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Failed to repair some missing directories.
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
{
"code" : 403,
"errors" : [ {
"domain" : "global",
"message" : "***.gserviceaccount.com does not have storage.objects.create access …
google-cloud-storage google-cloud-platform apache-spark-sql pyspark airflow
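One workaround to sketch here (assuming a GCS connector version that honours the fs.gs.implicit.dir.repair.enable property) is to turn off the "repair missing directories" step, which is what issues the storage.objects.create call even though the job only reads:

from pyspark.sql import SparkSession

# Disable the connector's implicit-directory repair so a read-only
# service account never triggers an object-create request.
spark_session = (SparkSession.builder
                 .appName('gcs-read-only')
                 .config('spark.hadoop.fs.gs.implicit.dir.repair.enable', 'false')
                 .getOrCreate())

df = spark_session.read.parquet(input_path)  # e.g. input_path = 'gs://bucket/path/'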
Suppose I have a dataframe:
a b
0 1 2
1 2 3
2 4 2
3 4 3
I want to filter the dataframe so that I get a result like this:
a b
0 1 2
3 4 3
That is, I want the combinations (1,2) and (4,3), filtering on both columns together.
If I try this:
df1 = df[df['a'].isin([1,4]) & df['b'].isin([2,3])]
I get the whole dataframe back, because the combinations (1,3) and (4,2) are also matched by this approach, but I only need the given combinations. I have a huge list of tuples for the two columns, and I want to filter the dataframe by the corresponding tuple combinations.
Also, I don't want to merge the two columns into a single string and then filter.
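A short sketch of the pair-wise filter, assuming a pandas DataFrame (the indexing syntax above suggests pandas): build a MultiIndex over the two columns and test membership against the list of tuples, so (1,3) and (4,2) are not matched the way two independent isin() calls would match them.

import pandas as pd

df = pd.DataFrame({'a': [1, 2, 4, 4], 'b': [2, 3, 2, 3]})
pairs = [(1, 2), (4, 3)]          # the exact (a, b) combinations to keep

# membership test on (a, b) pairs rather than on each column separately
mask = pd.MultiIndex.from_frame(df[['a', 'b']]).isin(pairs)
df1 = df[mask]
#    a  b
# 0  1  2
# 3  4  3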
def main(inputs, output):
sdf = spark.read.csv(inputs, schema=observation_schema)
sdf.registerTempTable('filtertable')
result = spark.sql("""
SELECT * FROM filtertable WHERE qflag IS NULL
""").show()
temp_max = spark.sql(""" SELECT date, station, value FROM filtertable WHERE (observation = 'TMAX')""").show()
temp_min = spark.sql(""" SELECT date, station, value FROM filtertable WHERE (observation = 'TMIN')""").show()
result = temp_max.join(temp_min, condition1).select(temp_max('date'), temp_max('station'), ((temp_max('TMAX')-temp_min('TMIN'))/10)).alias('Range'))
Error:
Traceback (most recent call last):
File "/Users/syedikram/Documents/temp_range_sql.py", line 96, in <module>
main(inputs, output)
File "/Users/syedikram/Documents/temp_range_sql.py", line 52, in main
result = temp_max.join(temp_min, condition1).select(temp_max('date'), temp_max('station'), ((temp_max('TMAX')-temp_min('TMIN')/10)).alias('Range'))
AttributeError: 'NoneType' …
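What likely goes wrong here is that DataFrame.show() prints the rows and returns None, so temp_max and temp_min are None by the time join() is called. A rough sketch of the intended flow, keeping the DataFrames and only calling show() at the end (the tmax/tmin aliases are placeholders for this sketch):

def main(inputs, output):
    sdf = spark.read.csv(inputs, schema=observation_schema)
    sdf.createOrReplaceTempView('filtertable')

    # keep the DataFrames; never assign the result of .show()
    temp_max = spark.sql("""SELECT date, station, value AS tmax
                            FROM filtertable
                            WHERE qflag IS NULL AND observation = 'TMAX'""")
    temp_min = spark.sql("""SELECT date, station, value AS tmin
                            FROM filtertable
                            WHERE qflag IS NULL AND observation = 'TMIN'""")

    result = (temp_max.join(temp_min, ['date', 'station'])
              .select('date', 'station',
                      ((temp_max['tmax'] - temp_min['tmin']) / 10).alias('range')))
    result.show()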
I am brand new to pyspark and want to convert my existing pandas/python code to PySpark.
I want to subset my dataframe so that only rows containing the specific keywords I'm looking for in the 'original_problem' field are returned.
Below is the Python code I tried in PySpark:
def pilot_discrep(input_file):
df = input_file
searchfor = ['cat', 'dog', 'frog', 'fleece']
df = df[df['original_problem'].str.contains('|'.join(searchfor))]
return df
When I try to run the above, I get the following error:
AnalysisException: u"Can't extract value from original_problem#207: need struct type but got string;"
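A minimal PySpark sketch of the same keyword filter, assuming original_problem is a plain string column: pandas' .str.contains has no direct counterpart, but Column.rlike with the joined pattern filters the same way.

from pyspark.sql import functions as F

def pilot_discrep(df):
    searchfor = ['cat', 'dog', 'frog', 'fleece']
    # 'cat|dog|frog|fleece' as a regex matches any row whose
    # original_problem contains one of the keywords
    return df.filter(F.col('original_problem').rlike('|'.join(searchfor)))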
I am working on a machine learning model with data of shape 1,456,354 x 53. I want to do feature selection for my dataset. I know how to do feature selection in Python using the code below:
from sklearn.feature_selection import RFECV, RFE
from sklearn.linear_model import LogisticRegression
import numpy as np

logreg = LogisticRegression()
rfe = RFE(logreg, step=1, n_features_to_select=28)
rfe = rfe.fit(df.values, arrythmia.values)
features_bool = np.array(rfe.support_)
features = np.array(df.columns)
result = features[features_bool]
print(result)
However, I can't find any article that shows how to do recursive feature elimination in pyspark.
I tried importing the sklearn library in pyspark, but it errors out saying the sklearn module could not be found. I am running pyspark on a Google Dataproc cluster.
Could someone please help me achieve this in pyspark?
python machine-learning feature-selection pyspark google-cloud-dataproc
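Spark ML has no built-in recursive feature elimination, so any answer will be an approximation. One sketch under those constraints is model-based selection with pyspark.ml, e.g. ChiSqSelector (feature_cols and 'label' below are placeholder names, and the chi-squared test assumes categorical or binned features):

from pyspark.ml.feature import VectorAssembler, ChiSqSelector

# pack the 53 predictor columns into a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
assembled = assembler.transform(sdf)

# keep the top 28 features by chi-squared score against the label,
# roughly playing the role of RFE's n_features_to_select=28
selector = ChiSqSelector(numTopFeatures=28, featuresCol='features',
                         labelCol='label', outputCol='selected_features')
selected = selector.fit(assembled).transform(assembled)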