Spark现在提供可在数据帧中使用的预定义函数,并且它们似乎已经过高度优化.我最初的问题是更快,但我自己做了一些测试,发现至少在一个实例中,spark函数的速度提高了大约10倍.有谁知道为什么会这样,什么时候udf会更快(仅适用于存在相同spark函数的情况)?
这是我的测试代码(在Databricks社区上运行):
# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print …Run Code Online (Sandbox Code Playgroud) performance user-defined-functions apache-spark apache-spark-sql pyspark
在pyspark 1.6.2中,我可以导入col函数
from pyspark.sql.functions import col
Run Code Online (Sandbox Code Playgroud)
但是当我尝试在Github源代码中查找它时,我发现文件中没有col函数,functions.pypython如何导入一个不存在的函数?
我有一个很大的pyspark.sql.dataframe.DataFrame,我想保留(所以filter)列中保存的URL location包含预定字符串的所有行,例如'google.com'.
我试过了,df.filter(sf.col('location').contains('google.com')
但这会抛出一个
TypeError: _TypeError: 'Column' object is not callable'
Run Code Online (Sandbox Code Playgroud)
我该如何绕过并正确过滤我的df?提前谢谢了!
我正在使用Spark 1.3,并希望使用python接口(SparkSQL)加入多个列
以下作品:
我首先将它们注册为临时表.
numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")
test = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')
Run Code Online (Sandbox Code Playgroud)
我现在想基于多个列加入它们.
我得到SyntaxError:语法无效:
test = numeric.join(Ref,
numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
numeric.STATUS == Ref.STATUS , joinType='inner')
Run Code Online (Sandbox Code Playgroud) 我当前的Java /星火单元测试方法效果(详细点击这里)通过使用JUnit"本地"和运行单元测试实例化一个SparkContext.
必须组织代码以在一个函数中执行I/O,然后使用多个RDD调用另一个函数.
这非常有效.我有一个用Java + Spark编写的高度测试的数据转换.
我可以用Python做同样的事吗?
我如何用Python运行Spark单元测试?
在Pyspark中,我可以从列表中创建RDD并确定要有多少分区:
sc = SparkContext()
sc.parallelize(xrange(0, 10), 4)
Run Code Online (Sandbox Code Playgroud)
我决定对RDD进行分区的分区数量如何影响性能?这取决于我的机器核心数量如何?
我在Spark中有一个数据框,其中一个列包含一个数组.现在,我编写了一个单独的UDF,它将数组转换为另一个只有不同值的数组.见下面的例子:
例:[24,23,27,23]应转换为[24,23,27 ] 代码:
def uniq_array(col_array):
x = np.unique(col_array)
return x
uniq_array_udf = udf(uniq_array,ArrayType(IntegerType()))
Df3 = Df2.withColumn("age_array_unique",uniq_array_udf(Df2.age_array))
Run Code Online (Sandbox Code Playgroud)
在上面的代码中,Df2.age_array是我在其上应用UDF以获取不同列的数组,该列"age_array_unique"应该只包含数组中的唯一值.
但是,只要我运行该命令Df3.show(),我就会收到错误:
net.razorvine.pickle.PickleException:构造ClassDict的预期零参数(对于numpy.core.multiarray._reconstruct)
任何人都可以让我知道为什么会这样吗?
谢谢!
arrays user-defined-functions apache-spark apache-spark-sql pyspark
我想使用下面的代码转换spark数据框:
from pyspark.mllib.clustering import KMeans
spark_df = sqlContext.createDataFrame(pandas_df)
rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)
详细的错误消息是:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-11-a19a1763d3ac> in <module>()
1 from pyspark.mllib.clustering import KMeans
2 spark_df = sqlContext.createDataFrame(pandas_df)
----> 3 rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
4 model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")
/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in __getattr__(self, name)
842 if name not in self.columns:
843 raise AttributeError(
--> 844 "'%s' object …Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark spark-dataframe apache-spark-mllib
我试图覆盖spark会话/ spark上下文默认配置,但它正在挑选整个节点/群集资源.
spark = SparkSession.builder
.master("ip")
.enableHiveSupport()
.getOrCreate()
spark.conf.set("spark.executor.memory", '8g')
spark.conf.set('spark.executor.cores', '3')
spark.conf.set('spark.cores.max', '3')
spark.conf.set("spark.driver.memory",'8g')
sc = spark.sparkContext
Run Code Online (Sandbox Code Playgroud)
当我将配置放入spark提交时,它工作正常
spark-submit --master ip --executor-cores=3 --diver 10G code.py
Run Code Online (Sandbox Code Playgroud) 我试图在pySpark中的一行代码中进行多个操作,并且不确定这是否适用于我的情况.
我的意图是不必将输出保存为新的数据帧.
我目前的代码很简单:
encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
.groupBy('timePeriod')
.agg(
mean('DOWNSTREAM_SIZE').alias("Mean"),
stddev('DOWNSTREAM_SIZE').alias("Stddev")
)
.show(20, False)
Run Code Online (Sandbox Code Playgroud)
我的目的是count()在使用之后添加groupBy,以获得与timePeriod列的每个值匹配的记录计数,打印\显示为输出.
在尝试使用时,groupBy(..).count().agg(..)我得到例外.
是否有任何方法可以实现这两个count()和agg() .show()打印,而无需将代码拆分为两行命令,例如:
new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()
Run Code Online (Sandbox Code Playgroud)
或者更好的是,将合并的输出输出到agg.show()输出 - 一个额外的列,它表示与行的值匹配的计数记录数.例如:
timePeriod | Mean | Stddev | Num Of Records
X | 10 | 20 | 315
Run Code Online (Sandbox Code Playgroud) apache-spark ×10
pyspark ×10
python ×6
performance ×2
arrays ×1
java ×1
join ×1
pyspark-sql ×1
rdd ×1
scala ×1
unit-testing ×1