标签: apache-spark-sql

Spark - 从DataFrame中提取单个值

我有一个Spark DataFrame查询,保证返回单个Int值的单列.从生成的DataFrame中将此值作为Int提取的最佳方法是什么?

scala apache-spark apache-spark-sql

32
推荐指数
3
解决办法
4万
查看次数

如何向Spark DataFrame添加持久的行ID列?

这个问题并不新鲜,但我在Spark中发现了令人惊讶的行为.我需要向DataFrame添加一列行ID.我使用了DataFrame方法monotonically_increasing_id(),它确实给了我一个额外的uniques行ID(顺便说一句,它们不是连续的,但是是唯一的).

我遇到的问题是,当我过滤DataFrame时,重新分配生成的DataFrame中的行ID.两个DataFrame如下所示.

  • 第一个是添加了行ID的初始DataFrame,如下所示:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
    Run Code Online (Sandbox Code Playgroud)
  • 第二个DataFrame是在col P via上过滤后获得的数据帧df.filter(col("P")).

问题由custId 169的rowId说明,在初始DataFrame中为5,但在过滤后,当custId 169被过滤掉时,rowId(5)被重新分配给custmId 773!我不知道为什么这是默认行为.

我希望rowIds它"粘"; 如果我从DataFrame中删除行,我不希望他们的ID"重新使用",我希望它们与行一起消失.有可能吗?我没有看到任何标志从monotonically_increasing_id方法请求此行为.

+---------+--------------------+-------+
| custId  |    features|    P  |rowId|
+---------+--------------------+-------+
|806      |[50,5074,...|   true|    0|
|832      |[45,120,1...|   true|    1|
|216      |[6691,272...|   true|    2|
|926      |[120,1788...|   true|    3|
|875      |[54,120,1...|   true|    4|
|169      |[19406,21...|  false|    5|

after filtering on P:
+---------+--------------------+-------+
|   custId|    features|    P  |rowId|
+---------+--------------------+-------+
|      806|[50,5074,...|   true|    0|
|      832|[45,120,1...|   true|    1|
|      216|[6691,272...|   true|    2|
|      926|[120,1788...|   true|    3|
| …
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark apache-spark-sql

32
推荐指数
1
解决办法
3万
查看次数

使用Spark中的自定义函数聚合多个列

我想知道是否有某种方法可以为多列上的spark数据帧指定自定义聚合函数.

我有一个类似这样的表(名称,项目,价格):

john | tomato | 1.99
john | carrot | 0.45
bill | apple  | 0.99
john | banana | 1.29
bill | taco   | 2.59
Run Code Online (Sandbox Code Playgroud)

至:

我想将每个人的项目和成本汇总到这样的列表中:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)
Run Code Online (Sandbox Code Playgroud)

这在数据帧中是否可行?我最近了解到collect_list它,但它似乎只适用于一个专栏.

scala dataframe apache-spark apache-spark-sql orc

32
推荐指数
4
解决办法
2万
查看次数

Spark功能与UDF性能有关?

Spark现在提供可在数据帧中使用的预定义函数,并且它们似乎已经过高度优化.我最初的问题是更快,但我自己做了一些测试,发现至少在一个实例中,spark函数的速度提高了大约10倍.有谁知道为什么会这样,什么时候udf会更快(仅适用于存在相同spark函数的情况)?

这是我的测试代码(在Databricks社区上运行):

# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)

# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
  name = fake.name().split()
  return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)

# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in xrange(times):
        yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print …
Run Code Online (Sandbox Code Playgroud)

performance user-defined-functions apache-spark apache-spark-sql pyspark

32
推荐指数
2
解决办法
2万
查看次数

在pyspark中找不到col函数

在pyspark 1.6.2中,我可以导入col函数

from pyspark.sql.functions import col
Run Code Online (Sandbox Code Playgroud)

但是当我尝试在Github源代码中查找它时,我发现文件中没有col函数,functions.pypython如何导入一个不存在的函数?

python apache-spark apache-spark-sql pyspark pyspark-sql

32
推荐指数
5
解决办法
3万
查看次数

当值匹配pyspark中字符串的一部分时,过滤df

我有一个很大的pyspark.sql.dataframe.DataFrame,我想保留(所以filter)列中保存的URL location包含预定字符串的所有行,例如'google.com'.

我试过了,df.filter(sf.col('location').contains('google.com') 但这会抛出一个

TypeError: _TypeError: 'Column' object is not callable'
Run Code Online (Sandbox Code Playgroud)

我该如何绕过并正确过滤我的df?提前谢谢了!

python apache-spark apache-spark-sql pyspark

32
推荐指数
3
解决办法
5万
查看次数

过滤DataFrame最有效的方法是什么

...通过检查列的值是否在a中seq.
也许我没有解释得很好,我基本上希望这(使用常规的SQL表达出来)DF_Column IN seq

首先,我使用a broadcast var(我放置seq),UDF(检查完成)和registerTempTable.
问题是,我没有测试它,因为我遇到了一个已知的bug,显然只有使用时,会出现registerTempTableScalaIDE.

我最终创建了一个新DataFrameseq并与它进行内连接(交集),但我怀疑这是完成任务的最高效的方式.

谢谢

编辑:(回应@YijieShen):
如何filter根据一个DataFrame列的元素是否在另一个DF的列(如SQL select * from A where login in (select username from B))中?

例如:第一个DF:

login      count
login1     192  
login2     146  
login3     72   
Run Code Online (Sandbox Code Playgroud)

第二个DF:

username
login2
login3
login4
Run Code Online (Sandbox Code Playgroud)

结果:

login      count
login2     146  
login3     72   
Run Code Online (Sandbox Code Playgroud)

尝试:
EDIT-2:我认为,现在修复了这个bug,这些应该可行.结束编辑-2

ordered.select("login").filter($"login".contains(empLogins("username")))
Run Code Online (Sandbox Code Playgroud)

ordered.select("login").filter($"login" in empLogins("username"))
Run Code Online (Sandbox Code Playgroud)

两者Exception …

apache-spark apache-spark-sql

31
推荐指数
2
解决办法
3万
查看次数

在Spark中执行DataFrame自联接的最干净,最有效的语法

在标准SQL中,当您将表连接到自身时,可以为表创建别名以跟踪您引用的列:

SELECT a.column_name, b.column_name...
FROM table1 a, table1 b
WHERE a.common_field = b.common_field;
Run Code Online (Sandbox Code Playgroud)

我可以通过两种方式来使用Spark DataFrameAPI 实现相同的功能:

解决方案#1:重命名列

在回答这个问题时,有几种不同的方法可以解决这个问题.这个只是重命名具有特定后缀的所有列:

df.toDF(df.columns.map(_ + "_R"):_*)
Run Code Online (Sandbox Code Playgroud)

例如,您可以这样做:

df.join(df.toDF(df.columns.map(_ + "_R"):_*), $"common_field" === $"common_field_R")
Run Code Online (Sandbox Code Playgroud)

解决方案#2:将引用复制到 DataFrame

另一个简单的解决方案就是这样做:

val df: DataFrame = ....
val df_right = df

df.join(df_right, df("common_field") === df_right("common_field"))
Run Code Online (Sandbox Code Playgroud)

这两种解决方案都有效,我可以看到每种解决方案在某些情况下都很有用.我应该注意两者之间是否存在内部差异?

dataframe apache-spark apache-spark-sql

31
推荐指数
1
解决办法
2万
查看次数

如何从Scala的Iterables列表创建DataFrame?

我有以下Scala值:

val values: List[Iterable[Any]] = Traces().evaluate(features).toList
Run Code Online (Sandbox Code Playgroud)

我想将其转换为DataFrame.

当我尝试以下内容时:

sqlContext.createDataFrame(values)
Run Code Online (Sandbox Code Playgroud)

我收到了这个错误:

error: overloaded method value createDataFrame with alternatives:

[A <: Product](data: Seq[A])(implicit evidence$2: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame 
[A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$1: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
cannot be applied to (List[Iterable[Any]])
          sqlContext.createDataFrame(values)
Run Code Online (Sandbox Code Playgroud)

为什么?

scala apache-spark apache-spark-sql spark-dataframe

31
推荐指数
3
解决办法
8万
查看次数

Spark Error:构造ClassDict的预期零参数(对于numpy.core.multiarray._reconstruct)

我在Spark中有一个数据框,其中一个列包含一个数组.现在,我编写了一个单独的UDF,它将数组转换为另一个只有不同值的数组.见下面的例子:

例:[24,23,27,23]应转换为[24,23,27 ] 代码:

def uniq_array(col_array):
    x = np.unique(col_array)
    return x
uniq_array_udf = udf(uniq_array,ArrayType(IntegerType()))

Df3 = Df2.withColumn("age_array_unique",uniq_array_udf(Df2.age_array))
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,Df2.age_array是我在其上应用UDF以获取不同列的数组,该列"age_array_unique"应该只包含数组中的唯一值.

但是,只要我运行该命令Df3.show(),我就会收到错误:

net.razorvine.pickle.PickleException:构造ClassDict的预期零参数(对于numpy.core.multiarray._reconstruct)

任何人都可以让我知道为什么会这样吗?

谢谢!

arrays user-defined-functions apache-spark apache-spark-sql pyspark

31
推荐指数
3
解决办法
2万
查看次数