标签: pyspark

Spark功能与UDF性能有关?

Spark现在提供可在数据帧中使用的预定义函数,并且它们似乎已经过高度优化.我最初的问题是更快,但我自己做了一些测试,发现至少在一个实例中,spark函数的速度提高了大约10倍.有谁知道为什么会这样,什么时候udf会更快(仅适用于存在相同spark函数的情况)?

这是我的测试代码(在Databricks社区上运行):

# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)

# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
  name = fake.name().split()
  return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)

# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in xrange(times):
        yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print …
Run Code Online (Sandbox Code Playgroud)

performance user-defined-functions apache-spark apache-spark-sql pyspark

32
推荐指数
2
解决办法
2万
查看次数

在pyspark中找不到col函数

在pyspark 1.6.2中,我可以导入col函数

from pyspark.sql.functions import col
Run Code Online (Sandbox Code Playgroud)

但是当我尝试在Github源代码中查找它时,我发现文件中没有col函数,functions.pypython如何导入一个不存在的函数?

python apache-spark apache-spark-sql pyspark pyspark-sql

32
推荐指数
5
解决办法
3万
查看次数

当值匹配pyspark中字符串的一部分时,过滤df

我有一个很大的pyspark.sql.dataframe.DataFrame,我想保留(所以filter)列中保存的URL location包含预定字符串的所有行,例如'google.com'.

我试过了,df.filter(sf.col('location').contains('google.com') 但这会抛出一个

TypeError: _TypeError: 'Column' object is not callable'
Run Code Online (Sandbox Code Playgroud)

我该如何绕过并正确过滤我的df?提前谢谢了!

python apache-spark apache-spark-sql pyspark

32
推荐指数
3
解决办法
5万
查看次数

如何在Pyspark中加入多个列?

我正在使用Spark 1.3,并希望使用python接口(SparkSQL)加入多个列

以下作品:

我首先将它们注册为临时表.

numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")

test  = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')
Run Code Online (Sandbox Code Playgroud)

我现在想基于多个列加入它们.

我得到SyntaxError:语法无效:

test  = numeric.join(Ref,
   numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
   numeric.STATUS == Ref.STATUS ,  joinType='inner')
Run Code Online (Sandbox Code Playgroud)

python join apache-spark apache-spark-sql pyspark

31
推荐指数
3
解决办法
5万
查看次数

我如何对PySpark程序进行单元测试?

我当前的Java /星火单元测试方法效果(详细点击这里)通过使用JUnit"本地"和运行单元测试实例化一个SparkContext.

必须组织代码以在一个函数中执行I/O,然后使用多个RDD调用另一个函数.

这非常有效.我有一个用Java + Spark编写的高度测试的数据转换.

我可以用Python做同样的事吗?

我如何用Python运行Spark单元测试?

python unit-testing apache-spark pyspark

31
推荐指数
6
解决办法
2万
查看次数

RDD中的分区数和Spark中的性能

在Pyspark中,我可以从列表中创建RDD并确定要有多少分区:

sc = SparkContext()
sc.parallelize(xrange(0, 10), 4)
Run Code Online (Sandbox Code Playgroud)

我决定对RDD进行分区的分区数量如何影响性能?这取决于我的机器核心数量如何?

performance apache-spark rdd pyspark

31
推荐指数
2
解决办法
3万
查看次数

Spark Error:构造ClassDict的预期零参数(对于numpy.core.multiarray._reconstruct)

我在Spark中有一个数据框,其中一个列包含一个数组.现在,我编写了一个单独的UDF,它将数组转换为另一个只有不同值的数组.见下面的例子:

例:[24,23,27,23]应转换为[24,23,27 ] 代码:

def uniq_array(col_array):
    x = np.unique(col_array)
    return x
uniq_array_udf = udf(uniq_array,ArrayType(IntegerType()))

Df3 = Df2.withColumn("age_array_unique",uniq_array_udf(Df2.age_array))
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,Df2.age_array是我在其上应用UDF以获取不同列的数组,该列"age_array_unique"应该只包含数组中的唯一值.

但是,只要我运行该命令Df3.show(),我就会收到错误:

net.razorvine.pickle.PickleException:构造ClassDict的预期零参数(对于numpy.core.multiarray._reconstruct)

任何人都可以让我知道为什么会这样吗?

谢谢!

arrays user-defined-functions apache-spark apache-spark-sql pyspark

31
推荐指数
3
解决办法
2万
查看次数

AttributeError:'DataFrame'对象没有属性'map'

我想使用下面的代码转换spark数据框:

from pyspark.mllib.clustering import KMeans
spark_df = sqlContext.createDataFrame(pandas_df)
rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)

详细的错误消息是:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-11-a19a1763d3ac> in <module>()
      1 from pyspark.mllib.clustering import KMeans
      2 spark_df = sqlContext.createDataFrame(pandas_df)
----> 3 rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
      4 model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")

/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in __getattr__(self, name)
    842         if name not in self.columns:
    843             raise AttributeError(
--> 844                 "'%s' object …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark spark-dataframe apache-spark-mllib

31
推荐指数
1
解决办法
5万
查看次数

spark 2.1.0 session config settings(pyspark)

我试图覆盖spark会话/ spark上下文默认配置,但它正在挑选整个节点/群集资源.

 spark  = SparkSession.builder
                      .master("ip")
                      .enableHiveSupport()
                      .getOrCreate()

 spark.conf.set("spark.executor.memory", '8g')
 spark.conf.set('spark.executor.cores', '3')
 spark.conf.set('spark.cores.max', '3')
 spark.conf.set("spark.driver.memory",'8g')
 sc = spark.sparkContext
Run Code Online (Sandbox Code Playgroud)

当我将配置放入spark提交时,它工作正常

spark-submit --master ip --executor-cores=3 --diver 10G code.py
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark spark-dataframe

31
推荐指数
2
解决办法
4万
查看次数

聚合函数计算Spark中groupBy的使用情况

我试图在pySpark中的一行代码中进行多个操作,并且不确定这是否适用于我的情况.

我的意图是不必将输出保存为新的数据帧.

我目前的代码很简单:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)
Run Code Online (Sandbox Code Playgroud)

我的目的是count()在使用之后添加groupBy,以获得与timePeriod列的每个值匹配的记录计数,打印\显示为输出.

在尝试使用时,groupBy(..).count().agg(..)我得到例外.

是否有任何方法可以实现这两个count()agg() .show()打印,而无需将代码拆分为两行命令,例如:

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()
Run Code Online (Sandbox Code Playgroud)

或者更好的是,将合并的输出输出到agg.show()输出 - 一个额外的列,它表示与行的值匹配的计数记录数.例如:

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315
Run Code Online (Sandbox Code Playgroud)

java scala apache-spark apache-spark-sql pyspark

30
推荐指数
1
解决办法
5万
查看次数