当我试图在我的代码中做同样的事情,如下所述
dataframe.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
Run Code Online (Sandbox Code Playgroud)
我从这里采取了上述参考: Scala:如何使用scala替换Dataframs中的值 但是我收到编码器错误
无法找到存储在数据集中的类型的编码器.导入spark.im plicits支持原始类型(Int,S tring等)和产品类型(case类)._将在以后的版本中添加对序列化其他类型的支持.
注意:我正在使用spark 2.0!
scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
你好地球人!我正在使用Airflow来安排和运行Spark任务.我此时发现的只是Airflow可以管理的python DAG.
DAG示例:
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, …
Run Code Online (Sandbox Code Playgroud) 我想知道是否有某种方法可以为多列上的spark数据帧指定自定义聚合函数.
我有一个类似这样的表(名称,项目,价格):
john | tomato | 1.99
john | carrot | 0.45
bill | apple | 0.99
john | banana | 1.29
bill | taco | 2.59
Run Code Online (Sandbox Code Playgroud)
至:
我想将每个人的项目和成本汇总到这样的列表中:
john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)
Run Code Online (Sandbox Code Playgroud)
这在数据帧中是否可行?我最近了解到collect_list
它,但它似乎只适用于一个专栏.
Spark现在提供可在数据帧中使用的预定义函数,并且它们似乎已经过高度优化.我最初的问题是更快,但我自己做了一些测试,发现至少在一个实例中,spark函数的速度提高了大约10倍.有谁知道为什么会这样,什么时候udf会更快(仅适用于存在相同spark函数的情况)?
这是我的测试代码(在Databricks社区上运行):
# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print …
Run Code Online (Sandbox Code Playgroud) performance user-defined-functions apache-spark apache-spark-sql pyspark
上下文:我有DataFrame
2列:单词和向量.其中"vector"的列类型是VectorUDT
.
一个例子:
word | vector
assert | [435,323,324,212...]
Run Code Online (Sandbox Code Playgroud)
我希望得到这个:
word | v1 | v2 | v3 | v4 | v5 | v6 ......
assert | 435 | 5435| 698| 356|....
Run Code Online (Sandbox Code Playgroud)
题:
如何使用PySpark为每个维度拆分包含多列向量的列?
提前致谢
python apache-spark apache-spark-sql pyspark apache-spark-ml
我有一个Dataframe,我试图压扁.作为整个过程的一部分,我想爆炸它,所以如果我有一列数组,那么数组的每个值都将用于创建一个单独的行.例如,
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
Run Code Online (Sandbox Code Playgroud)
应该成为
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
Run Code Online (Sandbox Code Playgroud)
这是我的代码
private DataFrame explodeDataFrame(DataFrame df) {
DataFrame resultDf = df;
for (StructField field : df.schema().fields()) {
if (field.dataType() instanceof ArrayType) {
resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
resultDf.show();
}
}
return resultDf;
}
Run Code Online (Sandbox Code Playgroud)
问题是在我的数据中,一些数组列有空值.在这种情况下,整个行都将被删除.所以这个数据帧:
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
2 | Lucy | null
Run Code Online (Sandbox Code Playgroud)
变 …
构建和部署Spark应用程序时的常见问题是:
java.lang.ClassNotFoundException
.object x is not a member of package y
编译错误.java.lang.NoSuchMethodError
如何解决这些问题?
java scala nosuchmethoderror classnotfoundexception apache-spark
我想修改数据帧列(Age)的单元格值,其中当前它是空白的,我只会在另一列(Survived)的值为0时为相应的行进行修改,其中Age为空白.如果它在Survived列中为1但在Age列中为空,那么我将它保持为null.
我试图使用&&
运算符,但它没有用.这是我的代码:
tdata.withColumn("Age", when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
Run Code Online (Sandbox Code Playgroud)
任何建议如何处理?谢谢.
错误信息:
SyntaxError: invalid syntax
File "<ipython-input-33-3e691784411c>", line 1
tdata.withColumn("Age", when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
^
Run Code Online (Sandbox Code Playgroud) 假设我们有DataFrame,df
包含以下列:
名称,姓氏,大小,宽度,长度,重量
现在我们想要执行几个操作,例如我们想要创建一些包含Size和Width数据的DataFrame.
val df1 = df.groupBy("surname").agg( sum("size") )
val df2 = df.groupBy("surname").agg( sum("width") )
Run Code Online (Sandbox Code Playgroud)
正如您所注意到的,其他列(如Length)不会在任何地方使用.Spark是否足够聪明,可以在洗牌阶段之前丢弃多余的列,还是随身携带?威尔跑:
val dfBasic = df.select("surname", "size", "width")
Run Code Online (Sandbox Code Playgroud)
在分组之前以某种方式影响性能?
performance dataframe apache-spark apache-spark-sql apache-spark-dataset