I'm having trouble dropping rows from a dataframe based on a list of two-column value pairs to filter on. For example, for this dataframe:
df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id'])
df.show()
+------+------+---+
|number|letter| id|
+------+------+---+
|   100|     A|304|
|   200|     B|305|
|   300|     C|306|
+------+------+---+
I can easily drop rows using isin on a single column:
df.where(~col('number').isin([100, 200])).show()
+------+------+---+
|number|letter| id|
+------+------+---+
|   300|     C|306|
+------+------+---+
But when I try to drop rows using both columns, I get an exception:
df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show()
Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A]
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at org.apache.spark.sql.functions.lit(functions.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) …
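The exception comes from lit() not being able to turn a Python tuple into a Spark literal, so array(...).isin([...]) cannot be built this way. As a sketch of one workaround (not necessarily the only one), the unwanted pairs can go into a small DataFrame and be removed with a left_anti join; the name pairs_to_drop is illustrative and the pairs are taken from the example above:

# Sketch of a workaround: a left_anti join keeps only rows whose
# (number, letter) pair has no match in pairs_to_drop.
pairs_to_drop = spark.createDataFrame([(100, 'A'), (200, 'B')], ['number', 'letter'])
# Expected to leave only the (300, 'C', 306) row from the example dataframe.
df.join(pairs_to_drop, on=['number', 'letter'], how='left_anti').show()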
I have a PySpark job that runs successfully on a small cluster, but within the first few minutes after startup it begins producing many of the errors below. Any idea how I can fix this? This is with PySpark 2.2.0 on Mesos.
17/09/29 18:54:26 INFO Executor: Running task 5717.0 in stage 0.0 (TID 5717)
17/09/29 18:54:26 INFO CoarseGrainedExecutorBackend: Got assigned task 5813
17/09/29 18:54:26 INFO Executor: Running task 5813.0 in stage 0.0 (TID 5813)
17/09/29 18:54:26 INFO CoarseGrainedExecutorBackend: Got assigned task 5909
17/09/29 18:54:26 INFO Executor: Running task 5909.0 in stage 0.0 (TID 5909)
17/09/29 18:54:56 ERROR TransportClientFactory: Exception while bootstrapping client after 30001 ms
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
at …
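The 30001 ms in the bootstrap error suggests a 30-second connection/bootstrap timeout being hit while thousands of tasks start at once. As a hedged sketch only, not a confirmed fix for this cluster, one common mitigation is to raise the network-related timeouts when building the session; the values below are illustrative, not recommendations:

# Sketch: raise network-related timeouts (illustrative values, assumed mitigation).
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.network.timeout", "300s") \
    .config("spark.shuffle.io.connectionTimeout", "120s") \
    .config("spark.rpc.askTimeout", "300s") \
    .getOrCreate()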
I am getting strange performance results when comparing two APIs in PySpark 3.2.1 that both provide the ability to run a pandas UDF on the grouped results of a Spark DataFrame:
First, I run the following input generator code in local Spark mode (Spark 3.2.1):
import pyspark.sql.types as types
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import pyspark.pandas as ps
spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", True) \
    .getOrCreate()
ps.set_option("compute.default_index_type", "distributed")
spark.range(1000000).withColumn('group', (col('id') / 10).cast('int')) \
    .write.parquet('/tmp/sample_input', mode='overwrite')
Then I test applyInPandas:
def getsum(pdf):
    pdf['sum_in_group'] = pdf['id'].sum()
    return pdf

df = spark.read.parquet(f'/tmp/sample_input')
output_schema = types.StructType(
    df.schema.fields + [types.StructField('sum_in_group', types.FloatType())]
)
df.groupBy('group').applyInPandas(getsum, …
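The second API in the comparison is not shown in the snippet above. A minimal sketch of what the grouped apply on the pandas API on Spark (pyspark.pandas, imported above as ps) might look like, assuming the same per-group sum and an illustrative output path:

# Sketch of the pandas-API-on-Spark side of the comparison (assumed, not the
# author's exact code): the same per-group sum via ps.groupby().apply().
psdf = ps.read_parquet('/tmp/sample_input')

def getsum_ps(pdf):
    # pdf is a plain pandas DataFrame holding one group
    pdf['sum_in_group'] = pdf['id'].sum()
    return pdf

psdf.groupby('group').apply(getsum_ps).to_spark() \
    .write.parquet('/tmp/sample_output_ps', mode='overwrite')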
I want to run the random forest algorithm in PySpark. The PySpark documentation mentions that VectorAssembler accepts only numeric or boolean data types. So, if my data contains StringType variables such as city names, should I one-hot encode them before going further with random forest classification/regression?
Here is the code I have been trying; the input file is here:
train=sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('filename')
drop_list = ["Country", "Carrier", "TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
from pyspark.sql.types import DoubleType
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double"))#only this variable is actually double, rest of them are strings
junk = train.select([column for column in train.columns if column in drop_list])
transformed = assembler.transform(junk)
I keep getting the error message IllegalArgumentException: u'Data type StringType is not supported.'
PS: Sorry for the basic question; I come from an R background. In R, when we run a random forest, we don't need to convert categorical variables to numeric ones.
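For tree-based models in Spark ML, the usual approach is to pass each string column through a StringIndexer (one-hot encoding is optional for trees) and then assemble the indexed columns with VectorAssembler. A minimal sketch along those lines, assuming the column names from the snippet above and that Fraud is the label rather than a feature:

# Sketch, not a definitive pipeline: index string columns, then assemble features.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Assumed feature columns, taken from drop_list above; adjust to the real data.
categorical_cols = ["Country", "Carrier", "TrafficType", "Device", "Browser", "OS"]

# One StringIndexer per string column, producing <name>_idx numeric columns.
indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx") for c in categorical_cols]

assembler = VectorAssembler(
    inputCols=[c + "_idx" for c in categorical_cols] + ["ConversionPayOut"],
    outputCol="features")

pipeline = Pipeline(stages=indexers + [assembler])
transformed = pipeline.fit(train).transform(train)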