小编use*_*411的帖子

尝试将数据帧行映射到更新行时出现编码器错误

当我试图在我的代码中做同样的事情,如下所述

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})
Run Code Online (Sandbox Code Playgroud)

我从这里采取了上述参考: Scala:如何使用scala替换Dataframs中的值 但是我收到编码器错误

无法找到存储在数据集中的类型的编码器.导入spark.im plicits支持原始类型(Int,S tring等)和产品类型(case类)._将在以后的版本中添加对序列化其他类型的支持.

注意:我正在使用spark 2.0!

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

33
推荐指数
2
解决办法
3万
查看次数

如何在Airflow中运行Spark代码?

你好地球人!我正在使用Airflow来安排和运行Spark任务.我此时发现的只是Airflow可以管理的python DAG.
DAG示例:

spark_count_lines.py
import logging

from airflow import DAG
from airflow.operators import PythonOperator

from datetime import datetime

args = {
  'owner': 'airflow'
  , 'start_date': datetime(2016, 4, 17)
  , 'provide_context': True
}

dag = DAG(
  'spark_count_lines'
  , start_date = datetime(2016, 4, 17)
  , schedule_interval = '@hourly'
  , default_args = args
)

def run_spark(**kwargs):
  import pyspark
  sc = pyspark.SparkContext()
  df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
  logging.info('Number of lines in people.txt = {0}'.format(df.count()))
  sc.stop()

t_main = PythonOperator(
  task_id = 'call_spark'
  , dag = dag
  , …
Run Code Online (Sandbox Code Playgroud)

python java directed-acyclic-graphs apache-spark airflow

33
推荐指数
3
解决办法
2万
查看次数

使用Spark中的自定义函数聚合多个列

我想知道是否有某种方法可以为多列上的spark数据帧指定自定义聚合函数.

我有一个类似这样的表(名称,项目,价格):

john | tomato | 1.99
john | carrot | 0.45
bill | apple  | 0.99
john | banana | 1.29
bill | taco   | 2.59
Run Code Online (Sandbox Code Playgroud)

至:

我想将每个人的项目和成本汇总到这样的列表中:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)
Run Code Online (Sandbox Code Playgroud)

这在数据帧中是否可行?我最近了解到collect_list它,但它似乎只适用于一个专栏.

scala dataframe apache-spark apache-spark-sql orc

32
推荐指数
4
解决办法
2万
查看次数

Spark功能与UDF性能有关?

Spark现在提供可在数据帧中使用的预定义函数,并且它们似乎已经过高度优化.我最初的问题是更快,但我自己做了一些测试,发现至少在一个实例中,spark函数的速度提高了大约10倍.有谁知道为什么会这样,什么时候udf会更快(仅适用于存在相同spark函数的情况)?

这是我的测试代码(在Databricks社区上运行):

# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)

# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
  name = fake.name().split()
  return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)

# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in xrange(times):
        yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print …
Run Code Online (Sandbox Code Playgroud)

performance user-defined-functions apache-spark apache-spark-sql pyspark

32
推荐指数
2
解决办法
2万
查看次数

如何将Vector拆分为列 - 使用PySpark

上下文:我有DataFrame2列:单词和向量.其中"vector"的列类型是VectorUDT.

一个例子:

word    |  vector
assert  | [435,323,324,212...]
Run Code Online (Sandbox Code Playgroud)

我希望得到这个:

word   |  v1 | v2  | v3 | v4 | v5 | v6 ......
assert | 435 | 5435| 698| 356|....
Run Code Online (Sandbox Code Playgroud)

题:

如何使用PySpark为每个维度拆分包含多列向量的列?

提前致谢

python apache-spark apache-spark-sql pyspark apache-spark-ml

32
推荐指数
1
解决办法
2万
查看次数

AWS S3存储桶策略编辑器访问被拒绝

我使用root帐户登录,尝试使用内置设置静态s3网站的说明来公开访问存储桶.但是,在运行存储桶策略时,我收到了拒绝访问的消息.

消息没有更详细的信息.

错误

amazon-s3 bucket amazon-web-services

31
推荐指数
1
解决办法
5779
查看次数

Spark sql如何在不丢失空值的情况下爆炸

我有一个Dataframe,我试图压扁.作为整个过程的一部分,我想爆炸它,所以如果我有一列数组,那么数组的每个值都将用于创建一个单独的行.例如,

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
Run Code Online (Sandbox Code Playgroud)

应该成为

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
Run Code Online (Sandbox Code Playgroud)

这是我的代码

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}
Run Code Online (Sandbox Code Playgroud)

问题是在我的数据中,一些数组列有空值.在这种情况下,整个行都将被删除.所以这个数据帧:

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null
Run Code Online (Sandbox Code Playgroud)

变 …

java null apache-spark apache-spark-sql

30
推荐指数
2
解决办法
1万
查看次数

解决Apache Spark中的依赖性问题

构建和部署Spark应用程序时的常见问题是:

  • java.lang.ClassNotFoundException.
  • object x is not a member of package y 编译错误.
  • java.lang.NoSuchMethodError

如何解决这些问题?

java scala nosuchmethoderror classnotfoundexception apache-spark

30
推荐指数
2
解决办法
1万
查看次数

PySpark:when子句中的多个条件

我想修改数据帧列(Age)的单元格值,其中当前它是空白的,我只会在另一列(Survived)的值为0时为相应的行进行修改,其中Age为空白.如果它在Survived列中为1但在Age列中为空,那么我将它保持为null.

我试图使用&&运算符,但它没有用.这是我的代码:

tdata.withColumn("Age",  when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
Run Code Online (Sandbox Code Playgroud)

任何建议如何处理?谢谢.

错误信息:

SyntaxError: invalid syntax
  File "<ipython-input-33-3e691784411c>", line 1
    tdata.withColumn("Age",  when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
                                                    ^
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

29
推荐指数
2
解决办法
8万
查看次数

DataFrame/Dataset groupBy行为/优化

假设我们有DataFrame,df包含以下列:

名称,姓氏,大小,宽度,长度,重量

现在我们想要执行几个操作,例如我们想要创建一些包含Size和Width数据的DataFrame.

val df1 = df.groupBy("surname").agg( sum("size") )
val df2 = df.groupBy("surname").agg( sum("width") )
Run Code Online (Sandbox Code Playgroud)

正如您所注意到的,其他列(如Length)不会在任何地方使用.Spark是否足够聪明,可以在洗牌阶段之前丢弃多余的列,还是随身携带?威尔跑:

val dfBasic = df.select("surname", "size", "width")
Run Code Online (Sandbox Code Playgroud)

在分组之前以某种方式影响性能?

performance dataframe apache-spark apache-spark-sql apache-spark-dataset

28
推荐指数
1
解决办法
1万
查看次数