我有一个Spark DataFrame查询,保证返回单个Int值的单列.从生成的DataFrame中将此值作为Int提取的最佳方法是什么?
这个问题并不新鲜,但我在Spark中发现了令人惊讶的行为.我需要向DataFrame添加一列行ID.我使用了DataFrame方法monotonically_increasing_id(),它确实给了我一个额外的uniques行ID(顺便说一句,它们不是连续的,但是是唯一的).
我遇到的问题是,当我过滤DataFrame时,重新分配生成的DataFrame中的行ID.两个DataFrame如下所示.
第一个是添加了行ID的初始DataFrame,如下所示:
df.withColumn("rowId", monotonically_increasing_id())
Run Code Online (Sandbox Code Playgroud)第二个DataFrame是在col P via上过滤后获得的数据帧df.filter(col("P")).
问题由custId 169的rowId说明,在初始DataFrame中为5,但在过滤后,当custId 169被过滤掉时,rowId(5)被重新分配给custmId 773!我不知道为什么这是默认行为.
我希望rowIds它"粘"; 如果我从DataFrame中删除行,我不希望他们的ID"重新使用",我希望它们与行一起消失.有可能吗?我没有看到任何标志从monotonically_increasing_id方法请求此行为.
+---------+--------------------+-------+
| custId | features| P |rowId|
+---------+--------------------+-------+
|806 |[50,5074,...| true| 0|
|832 |[45,120,1...| true| 1|
|216 |[6691,272...| true| 2|
|926 |[120,1788...| true| 3|
|875 |[54,120,1...| true| 4|
|169 |[19406,21...| false| 5|
after filtering on P:
+---------+--------------------+-------+
| custId| features| P |rowId|
+---------+--------------------+-------+
| 806|[50,5074,...| true| 0|
| 832|[45,120,1...| true| 1|
| 216|[6691,272...| true| 2|
| 926|[120,1788...| true| 3|
| …Run Code Online (Sandbox Code Playgroud) 我想知道是否有某种方法可以为多列上的spark数据帧指定自定义聚合函数.
我有一个类似这样的表(名称,项目,价格):
john | tomato | 1.99
john | carrot | 0.45
bill | apple | 0.99
john | banana | 1.29
bill | taco | 2.59
Run Code Online (Sandbox Code Playgroud)
至:
我想将每个人的项目和成本汇总到这样的列表中:
john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)
Run Code Online (Sandbox Code Playgroud)
这在数据帧中是否可行?我最近了解到collect_list它,但它似乎只适用于一个专栏.
Spark现在提供可在数据帧中使用的预定义函数,并且它们似乎已经过高度优化.我最初的问题是更快,但我自己做了一些测试,发现至少在一个实例中,spark函数的速度提高了大约10倍.有谁知道为什么会这样,什么时候udf会更快(仅适用于存在相同spark函数的情况)?
这是我的测试代码(在Databricks社区上运行):
# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print …Run Code Online (Sandbox Code Playgroud) performance user-defined-functions apache-spark apache-spark-sql pyspark
在pyspark 1.6.2中,我可以导入col函数
from pyspark.sql.functions import col
Run Code Online (Sandbox Code Playgroud)
但是当我尝试在Github源代码中查找它时,我发现文件中没有col函数,functions.pypython如何导入一个不存在的函数?
我有一个很大的pyspark.sql.dataframe.DataFrame,我想保留(所以filter)列中保存的URL location包含预定字符串的所有行,例如'google.com'.
我试过了,df.filter(sf.col('location').contains('google.com')
但这会抛出一个
TypeError: _TypeError: 'Column' object is not callable'
Run Code Online (Sandbox Code Playgroud)
我该如何绕过并正确过滤我的df?提前谢谢了!
...通过检查列的值是否在a中seq.
也许我没有解释得很好,我基本上希望这(使用常规的SQL表达出来)DF_Column IN seq?
首先,我使用a broadcast var(我放置seq),UDF(检查完成)和registerTempTable.
问题是,我没有测试它,因为我遇到了一个已知的bug,显然只有使用时,会出现registerTempTable与ScalaIDE.
我最终创建了一个新DataFrame的seq并与它进行内连接(交集),但我怀疑这是完成任务的最高效的方式.
谢谢
编辑:(回应@YijieShen):
如何filter根据一个DataFrame列的元素是否在另一个DF的列(如SQL select * from A where login in (select username from B))中?
例如:第一个DF:
login count
login1 192
login2 146
login3 72
Run Code Online (Sandbox Code Playgroud)
第二个DF:
username
login2
login3
login4
Run Code Online (Sandbox Code Playgroud)
结果:
login count
login2 146
login3 72
Run Code Online (Sandbox Code Playgroud)
尝试:
EDIT-2:我认为,现在修复了这个bug,这些应该可行.结束编辑-2
ordered.select("login").filter($"login".contains(empLogins("username")))
Run Code Online (Sandbox Code Playgroud)
和
ordered.select("login").filter($"login" in empLogins("username"))
Run Code Online (Sandbox Code Playgroud)
两者Exception …
在标准SQL中,当您将表连接到自身时,可以为表创建别名以跟踪您引用的列:
SELECT a.column_name, b.column_name...
FROM table1 a, table1 b
WHERE a.common_field = b.common_field;
Run Code Online (Sandbox Code Playgroud)
我可以通过两种方式来使用Spark DataFrameAPI 实现相同的功能:
解决方案#1:重命名列
在回答这个问题时,有几种不同的方法可以解决这个问题.这个只是重命名具有特定后缀的所有列:
df.toDF(df.columns.map(_ + "_R"):_*)
Run Code Online (Sandbox Code Playgroud)
例如,您可以这样做:
df.join(df.toDF(df.columns.map(_ + "_R"):_*), $"common_field" === $"common_field_R")
Run Code Online (Sandbox Code Playgroud)
解决方案#2:将引用复制到 DataFrame
另一个简单的解决方案就是这样做:
val df: DataFrame = ....
val df_right = df
df.join(df_right, df("common_field") === df_right("common_field"))
Run Code Online (Sandbox Code Playgroud)
这两种解决方案都有效,我可以看到每种解决方案在某些情况下都很有用.我应该注意两者之间是否存在内部差异?
我有以下Scala值:
val values: List[Iterable[Any]] = Traces().evaluate(features).toList
Run Code Online (Sandbox Code Playgroud)
我想将其转换为DataFrame.
当我尝试以下内容时:
sqlContext.createDataFrame(values)
Run Code Online (Sandbox Code Playgroud)
我收到了这个错误:
error: overloaded method value createDataFrame with alternatives:
[A <: Product](data: Seq[A])(implicit evidence$2: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
[A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$1: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
cannot be applied to (List[Iterable[Any]])
sqlContext.createDataFrame(values)
Run Code Online (Sandbox Code Playgroud)
为什么?
我在Spark中有一个数据框,其中一个列包含一个数组.现在,我编写了一个单独的UDF,它将数组转换为另一个只有不同值的数组.见下面的例子:
例:[24,23,27,23]应转换为[24,23,27 ] 代码:
def uniq_array(col_array):
x = np.unique(col_array)
return x
uniq_array_udf = udf(uniq_array,ArrayType(IntegerType()))
Df3 = Df2.withColumn("age_array_unique",uniq_array_udf(Df2.age_array))
Run Code Online (Sandbox Code Playgroud)
在上面的代码中,Df2.age_array是我在其上应用UDF以获取不同列的数组,该列"age_array_unique"应该只包含数组中的唯一值.
但是,只要我运行该命令Df3.show(),我就会收到错误:
net.razorvine.pickle.PickleException:构造ClassDict的预期零参数(对于numpy.core.multiarray._reconstruct)
任何人都可以让我知道为什么会这样吗?
谢谢!
arrays user-defined-functions apache-spark apache-spark-sql pyspark
apache-spark ×10
apache-spark-sql ×10
pyspark ×4
dataframe ×3
scala ×3
python ×2
arrays ×1
orc ×1
performance ×1
pyspark-sql ×1