标签: spark-dataframe

Spark,在Scala中添加具有相同值的新列

withColumn在Spark-Scala环境中遇到了一些问题.我想在我的DataFrame中添加一个新列,如下所示:

+---+----+---+
|  A|   B|  C|
+---+----+---+
|  4|blah|  2|
|  2|    |  3|
| 56| foo|  3|
|100|null|  5|
+---+----+---+
Run Code Online (Sandbox Code Playgroud)

成为:

+---+----+---+-----+
|  A|   B|  C|  D  |
+---+----+---+-----+
|  4|blah|  2|  750|
|  2|    |  3|  750|
| 56| foo|  3|  750|
|100|null|  5|  750|
+---+----+---+-----+
Run Code Online (Sandbox Code Playgroud)

对于我的DataFrame中的每一行,一列中的列D重复N次.

代码是这样的:

var totVehicles : Double = df_totVehicles(0).getDouble(0); //return 750
Run Code Online (Sandbox Code Playgroud)

变量totVehicles返回正确的值,它的工作原理!

第二个DataFrame必须计算2个字段(id_zipcode,n_vehicles),并添加第三列(具有相同的值-750):

var df_nVehicles =
df_carPark.filter(
      substring($"id_time",1,4) < 2013
    ).groupBy(
      $"id_zipcode"
    ).agg(
      sum($"n_vehicles") as 'n_vehicles
    ).select(
      $"id_zipcode" as 'id_zipcode,
      'n_vehicles …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-dataframe

11
推荐指数
1
解决办法
2万
查看次数

Spark + Parquet + Snappy:spark shuffle 数据后整体压缩率下降

社区!

请帮助我了解如何使用 Spark 获得更好的压缩率?

让我描述一下案例:

  1. 我有数据集,让我们把它的产品在其上的实木复合地板文件使用的编解码器使用Sqoop ImportTool进口HDFS瞬间。作为导入的结果,我有 100 个文件,总大小为46 GB,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。记录总数超过80 亿条,84 列

  2. 我也在使用snappy对 Spark 进行简单的读取/重新分区/写入,结果我得到:

~ 100 GB输出大小,具有相同的文件数、相同的编解码器、相同的数量和相同的列。

代码片段:

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
Run Code Online (Sandbox Code Playgroud)
  1. 使用镶木地板工具,我查看了摄取和处理的随机文件,它们如下所示:

摄取:

creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber}) 
extra:                          parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"

and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1

row group 1:                    RC:3640100 TS:36454739 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY …
Run Code Online (Sandbox Code Playgroud)

snappy apache-spark parquet apache-spark-sql spark-dataframe

11
推荐指数
1
解决办法
1万
查看次数

Spark - 随机数生成

我写了一个方法,必须考虑一个随机数来模拟伯努利分布.我random.nextDouble用来生成0到1之间的数字,然后根据我的概率参数给出我的决定.

我的问题是Spark在我的for循环映射函数的每次迭代中生成相同的随机数.我正在使用DataFrameAPI.我的代码遵循以下格式:

val myClass = new MyClass()
val M = 3
val myAppSeed = 91234
val rand = new scala.util.Random(myAppSeed)

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map{row => RowFactory
      .create(row.getString(0),
        myClass.myMethod(row.getString(2), rand.nextDouble())
    }, myDF.schema)
}
Run Code Online (Sandbox Code Playgroud)

这是班级:

class myClass extends Serializable {
  val q = qProb

  def myMethod(s: String, rand: Double) = {
    if (rand <= q) // do something
    else // do something else
  }
}
Run Code Online (Sandbox Code Playgroud)

每次myMethod调用时我都需要一个新的随机数.我也尝试在我的方法中生成数字java.util.Random( …

random scala apache-spark spark-dataframe

10
推荐指数
3
解决办法
2万
查看次数

pyspark:合并(外连接)两个数据帧

我有以下两个数据框:

DF1:

    Id | field_A | field_B | field_C | field_D
     1 |   cat   |  12     |   black | 11
     2 |   dog   | 128     |   white | 19
     3 |   dog   |  35     |  yellow | 20
     4 |   dog   |  21     |   brown |  4
     5 |  bird   |  10     |    blue |  7
     6 |   cow   |  99     |   brown | 34
Run Code Online (Sandbox Code Playgroud)

DF2:

    Id | field_B | field_C | field_D | field_E
     3 |  35     | …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark spark-dataframe

10
推荐指数
1
解决办法
2万
查看次数

Spark:读取inputStream而不是File

我在Java应用程序中使用SparkSQL使用Databricks对CSV文件进行一些处理以进行解析.

我正在处理的数据来自不同的来源(远程URL,本地文件,谷歌云存储),我习惯将所有内容都变成一个InputStream,以便我可以解析和处理数据,而无需知道它来自何处.

我在Spark上看到的所有文档都从路径中读取文件,例如

SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);

DataFrame df = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("path/to/file.csv");

DataFrame dfGrouped = df.groupBy("varA","varB")
    .avg("varC","varD");

dfGrouped.show();
Run Code Online (Sandbox Code Playgroud)

我想要做的是从InputStream中读取,或者甚至只读取已经在内存中的字符串.类似于以下内容:

InputStream stream = new URL(
    "http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
    ).openStream();

DataFrame dfRemote = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(stream);

String someString = "imagine,some,csv,data,here";

DataFrame dfFromString = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .read(someString);
Run Code Online (Sandbox Code Playgroud)

这里有什么简单的东西吗?

我已经阅读了一些关于Spark Streaming和自定义接收器的文档,但据我所知,这是为了打开一个连续提供数据的连接.Spark Streaming似乎将数据分解为块并对其进行一些处理,期望更多的数据进入无休止的流中.

我最好的猜测是,Spark作为Hadoop的后代,期望大量的数据可能存在于某个文件系统中.但是由于Spark无论如何都要在内存中进行处理,因此我认为SparkSQL能够解析内存中的数据.

任何帮助,将不胜感激.

java apache-spark apache-spark-sql spark-dataframe databricks

10
推荐指数
1
解决办法
5672
查看次数

如何分析 pyspark 工作

我想了解 pyspark 代码中的分析。

在此之后:https : //github.com/apache/spark/pull/2351

>>> sc._conf.set("spark.python.profile", "true")
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
100
>>> sc.show_profiles()
============================================================
Profile of RDD<id=1>
============================================================
         284 function calls (276 primitive calls) in 0.001 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
        4    0.000    0.000    0.000    0.000 {reduce}
     12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
        4    0.000    0.000    0.000    0.000 {cPickle.loads}
        4    0.000    0.000    0.000    0.000 {cPickle.dumps}
      104    0.000    0.000    0.000    0.000 …
Run Code Online (Sandbox Code Playgroud)

profiler apache-spark apache-spark-sql pyspark spark-dataframe

10
推荐指数
1
解决办法
5221
查看次数

使用pyspark分组By,Rank和聚合火花数据框

我有一个看起来像这样的数据框:

A     B    C
---------------
A1    B1   0.8
A1    B2   0.55
A1    B3   0.43

A2    B1   0.7
A2    B2   0.5
A2    B3   0.5

A3    B1   0.2
A3    B2   0.3
A3    B3   0.4
Run Code Online (Sandbox Code Playgroud)

如何将列'C'转换为每列A的相对等级(更高的分数 - >更好的等级)?预期产出:

A     B    Rank
---------------
A1    B1   1
A1    B2   2
A1    B3   3

A2    B1   1
A2    B2   2
A2    B3   2

A3    B1   3
A3    B2   2
A3    B3   1
Run Code Online (Sandbox Code Playgroud)

我想要达到的最终状态是聚合列B并存储每个A的等级:

例:

B    Ranks
B1   [1,1,3]
B2   [2,2,2]
B3   [3,2,1]
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-dataframe

10
推荐指数
1
解决办法
2万
查看次数

将pyspark数据框的列转换为小写

我在pyspark中有一个数据框,其中包含大写的列ID,COMPANY依此类推

我想将这些列名称设为id company等等.根据要求将所有列细菌转换为小写或大写.

我想这样做,列的数据类型保持不变.

我们怎么做?

python apache-spark pyspark spark-dataframe

10
推荐指数
1
解决办法
1万
查看次数

Spark 2.2非法模式组件:XXX java.lang.IllegalArgumentException:非法模式组件:XXX

我正在尝试从Spark 2.1升级到2.2.当我尝试将数据帧读取或写入某个位置(CSV或JSON)时,我收到此错误:

Illegal pattern component: XXX
java.lang.IllegalArgumentException: Illegal pattern component: XXX
at org.apache.commons.lang3.time.FastDatePrinter.parsePattern(FastDatePrinter.java:282)
at org.apache.commons.lang3.time.FastDatePrinter.init(FastDatePrinter.java:149)
at org.apache.commons.lang3.time.FastDatePrinter.<init>(FastDatePrinter.java:142)
at org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:384)
at org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:369)
at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:91)
at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:88)
at org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
at org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:165)
at org.apache.spark.sql.catalyst.json.JSONOptions.<init>(JSONOptions.scala:81)
at org.apache.spark.sql.catalyst.json.JSONOptions.<init>(JSONOptions.scala:43)
at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:53)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:176)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:333)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:279)
Run Code Online (Sandbox Code Playgroud)

我没有为dateFormat设置默认值,所以我不知道它来自哪里.

spark.createDataFrame(objects.map((o) => MyObject(t.source, t.table, o.partition, o.offset, d)))
    .coalesce(1)
    .write
    .mode(SaveMode.Append)
    .partitionBy("source", "table")
    .json(path)
Run Code Online (Sandbox Code Playgroud)

我仍然得到这个错误:

import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder.appName("Spark2.2Test").master("local").getOrCreate()
import spark.implicits._
val agesRows = List(Person("alice", 35), …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-dataframe

10
推荐指数
2
解决办法
9053
查看次数

计算Spark数据帧的大小 - SizeEstimator会产生意外结果

我试图找到一种可靠的方法来以编程方式计算Spark数据帧的大小(以字节为单位).

原因是我希望有一种方法来计算"最佳"分区数量("最佳"可能意味着不同的东西:它可能意味着 具有最佳分区大小,或者写入Parquet时产生最佳文件大小表 - 但两者都可以假设为数据帧大小的某些线性函数.换句话说,我想调用coalesce(n)repartition(n)在数据帧上,其中n不是固定数字,而是数据帧大小的函数.

关于SO的其他主题建议使用SizeEstimator.estimatefrom org.apache.spark.util来获取数据帧的字节大小,但我得到的结果是不一致的.

首先,我将数据帧保存到内存中:

df.cache().count 
Run Code Online (Sandbox Code Playgroud)

Spark UI在"存储"选项卡中显示大小为4.8GB.然后,我运行以下命令来获取大小SizeEstimator:

import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
Run Code Online (Sandbox Code Playgroud)

这给出了115'715'808字节= ~116MB的结果.但是,应用于SizeEstimator不同的对象会导致非常不同的结果.例如,我尝试分别为数据帧中的每一行计算大小并将它们相加:

df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)

这导致12'084'698'256字节= ~12GB的大小.或者,我可以尝试应用于SizeEstimator每个分区:

df.mapPartitions(
    iterator => Seq(SizeEstimator.estimate(
        iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)

这又导致10'792'965'376字节的不同大小=〜10.8GB.

我知道存在内存优化/内存开销,但在执行这些测试之后,我没有看到如何SizeEstimator使用它来获得足够好的数据帧大小估计(以及因此分区大小或结果Parquet文件大小).

SizeEstimator为了获得对数据帧大小或其分区的良好估计,应用适当的方法(如果有的话)是什么?如果没有,这里建议的方法是什么?

apache-spark spark-dataframe

10
推荐指数
3
解决办法
1万
查看次数