小编Mar*_*usz的帖子

无法在spark/pyspark中创建数组文字

我在尝试根据要过滤的两列项列表从数据框中删除行时遇到了麻烦.例如,对于此数据框:

df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id'])
df.show()
+------+------+---+
|number|letter| id|
+------+------+---+
|   100|     A|304|
|   200|     B|305|
|   300|     C|306|
+------+------+---+
Run Code Online (Sandbox Code Playgroud)

我可以使用isin一列轻松删除行:

df.where(~col('number').isin([100, 200])).show()
+------+------+---+
|number|letter| id|
+------+------+---+
|   300|     C|306|
+------+------+---+
Run Code Online (Sandbox Code Playgroud)

但是当我尝试将它们删除两列时,我得到一个例外:

df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show()

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A]
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
    at org.apache.spark.sql.functions$.lit(functions.scala:101)
    at org.apache.spark.sql.functions.lit(functions.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498) …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

7
推荐指数
1
解决办法
9605
查看次数

错误 TransportClientFactory:引导客户端时出现异常

我有一个 PySpark 作业,可以在小型集群上成功运行,但在启动时的前几分钟内开始出现许多以下错误。我知道如何解决它吗?这是 PySpark 2.2.0 和 mesos 的情况。

17/09/29 18:54:26 INFO Executor: Running task 5717.0 in stage 0.0 (TID 5717)
17/09/29 18:54:26 INFO CoarseGrainedExecutorBackend: Got assigned task 5813
17/09/29 18:54:26 INFO Executor: Running task 5813.0 in stage 0.0 (TID 5813)
17/09/29 18:54:26 INFO CoarseGrainedExecutorBackend: Got assigned task 5909
17/09/29 18:54:26 INFO Executor: Running task 5909.0 in stage 0.0 (TID 5909)
17/09/29 18:54:56 ERROR TransportClientFactory: Exception while bootstrapping client after 30001 ms
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
        at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
        at …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

6
推荐指数
0
解决办法
1426
查看次数

为什么 Pandas-API-on-Spark 在组上的应用比 pyspark API 慢得多?

在比较 pyspark 3.2.1 中的两个 API 时,我得到了奇怪的性能结果,这两个 API 提供了在 Spark Dataframe 的分组结果上运行 pandas UDF 的能力:

首先,我在本地 Spark 模式(Spark 3.2.1)下运行以下输入生成器代码:

import pyspark.sql.types as types
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import pyspark.pandas as ps

spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", True) \
    .getOrCreate()

ps.set_option("compute.default_index_type", "distributed")

spark.range(1000000).withColumn('group', (col('id') / 10).cast('int')) \
    .write.parquet('/tmp/sample_input', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

然后我测试applyInPandas

def getsum(pdf):
    pdf['sum_in_group'] = pdf['id'].sum()
    return pdf

df = spark.read.parquet(f'/tmp/sample_input')
output_schema = types.StructType(
    df.schema.fields + [types.StructField('sum_in_group', types.FloatType())]
)
df.groupBy('group').applyInPandas(getsum, …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-koalas

6
推荐指数
0
解决办法
2155
查看次数

如何在 Pyspark 的 VectorAssembler 中使用字符串变量

我想在 Pyspark 上运行随机森林算法。Pyspark 文档中提到VectorAssembler 仅接受数字或布尔数据类型。因此,如果我的数据包含 Stringtype 变量,例如城市名称,我是否应该对它们进行 one-hot 编码,以便进一步进行随机森林分类/回归?

这是我一直在尝试的代码,输入文件在这里

train=sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('filename')
drop_list = ["Country", "Carrier", "TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
from pyspark.sql.types import DoubleType
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double"))#only this variable is actually double, rest of them are strings
junk = train.select([column for column in train.columns if column in drop_list])
transformed = assembler.transform(junk)
Run Code Online (Sandbox Code Playgroud)

我不断收到错误消息IllegalArgumentException: u'Data type StringType is not supported.'

PS:抱歉问了一个基本问题。我来自 R 背景。在R中,当我们进行随机森林时,不需要将分类变量转换为数值变量。

random-forest pyspark

4
推荐指数
1
解决办法
6252
查看次数