After joining on ID, my dataframe looks like this:
ID | Features | Vector
1 | (50,[...] | Array[1.1,2.3,...]
2 | (50,[...] | Null
I end up with null values in the "Vector" column for some IDs. I want to replace these nulls with a 300-dimensional array of zeros (the same format as the non-null vector entries). df.fillna does not work here, because it is an array I want to insert. Any idea how to accomplish this in PySpark?
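For reference, df.fillna only accepts int, float, string, or bool replacement values (or a dict of those), which is why an array value is rejected. A minimal sketch of the limitation, using a toy dataframe in place of mine:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the joined dataframe above
df = spark.createDataFrame(
    [(1, [1.1, 2.3]), (2, None)],
    "ID: long, Vector: array<double>",
)

# fillna only supports scalar replacement values, so trying to
# fill the array column with a list raises an error
df.fillna({"Vector": [0.0] * 300})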
---- EDIT ----
Similar to this post, my current approach is:
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

df_joined = id_feat_vec.join(new_vec_df, "id", how="left_outer")
fill_with_vector = udf(lambda x: x if x is not None else np.zeros(300),
                       ArrayType(DoubleType()))
df_new = df_joined.withColumn("vector", fill_with_vector("vector"))
Unfortunately, with little success:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 848.0 failed 4 times, most recent failure: Lost task 0.3 in stage 848.0 (TID 692199, 10.179.224.107, executor 16): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-193-e55fed27fcd8> in <module>()
5 a = df_joined.withColumn("vector", fill_with_vector("vector"))
6
----> 7 a.show()
/databricks/spark/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
316 """
317 if isinstance(truncate, bool) and truncate:
--> 318 print(self._jdf.showString(n, 20))
319 else:
320 print(self._jdf.showString(n, int(truncate)))
/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
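The PickleException above is the usual symptom of a Python UDF returning a NumPy object: Spark can only unpickle plain Python types into an ArrayType column. A sketch of a fix, keeping everything else the same but returning a plain list of floats instead of np.zeros(300):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# Return a plain Python list rather than a numpy array, which
# Spark cannot reconstruct when reading the UDF's output back
fill_with_vector = udf(
    lambda x: x if x is not None else [0.0] * 300,
    ArrayType(DoubleType()),
)

df_new = df_joined.withColumn("vector", fill_with_vector("vector"))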
Update: I was not able to use the SQL expression form to create an array of doubles. 'array(0.0, ...)' appears to create an array of Decimal types. Using the Python function, however, you can get it to correctly create an array of doubles.
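A quick way to see the difference is to compare the inferred schemas; a sketch (the exact decimal precision may vary by Spark version):

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, expr, lit

spark = SparkSession.builder.getOrCreate()

# SQL expression form: the literal 0.0 is parsed as a decimal,
# so the element type comes out as decimal
spark.range(1).select(expr("array(0.0, 0.0)").alias("a")).printSchema()

# Python function form: lit(0.0) is a double literal,
# so the element type comes out as double
spark.range(1).select(array(lit(0.0), lit(0.0)).alias("a")).printSchema()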
The general idea is to use the when/otherwise functions to selectively update only the rows you want. You can define the literal value you want ahead of time as a column, and then dump it into the "THEN" clause.
from pyspark.sql.types import StructType, StructField, LongType, ArrayType, DoubleType
from pyspark.sql.functions import array, lit, when

schema = StructType([StructField("f1", LongType()), StructField("f2", ArrayType(DoubleType(), False))])
data = [(1, [10.0, 11.0]), (2, None), (3, None)]
df = sqlContext.createDataFrame(sc.parallelize(data), schema)

# Create a column object storing the value you want in the NULL case
num_elements = 300
null_value = array([lit(0.0)] * num_elements)

# If you want a different type you can change it like this
# null_value = null_value.cast('array<float>')

# Keep the value when there is one, replace it when it's null
df2 = df.withColumn('f2', when(df['f2'].isNull(), null_value).otherwise(df['f2']))
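Applied back to the joined dataframe from the question (assuming its null-prone column is named "vector", as above), the same pattern would look like:

from pyspark.sql.functions import array, col, lit, when

# 300-dimensional zero vector to use wherever "vector" is null
null_value = array([lit(0.0)] * 300)

df_new = df_joined.withColumn(
    "vector",
    when(col("vector").isNull(), null_value).otherwise(col("vector")),
)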