我withColumn在Spark-Scala环境中遇到了一些问题.我想在我的DataFrame中添加一个新列,如下所示:
+---+----+---+
| A| B| C|
+---+----+---+
| 4|blah| 2|
| 2| | 3|
| 56| foo| 3|
|100|null| 5|
+---+----+---+
Run Code Online (Sandbox Code Playgroud)
成为:
+---+----+---+-----+
| A| B| C| D |
+---+----+---+-----+
| 4|blah| 2| 750|
| 2| | 3| 750|
| 56| foo| 3| 750|
|100|null| 5| 750|
+---+----+---+-----+
Run Code Online (Sandbox Code Playgroud)
对于我的DataFrame中的每一行,一列中的列D重复N次.
代码是这样的:
var totVehicles : Double = df_totVehicles(0).getDouble(0); //return 750
Run Code Online (Sandbox Code Playgroud)
变量totVehicles返回正确的值,它的工作原理!
第二个DataFrame必须计算2个字段(id_zipcode,n_vehicles),并添加第三列(具有相同的值-750):
var df_nVehicles =
df_carPark.filter(
substring($"id_time",1,4) < 2013
).groupBy(
$"id_zipcode"
).agg(
sum($"n_vehicles") as 'n_vehicles
).select(
$"id_zipcode" as 'id_zipcode,
'n_vehicles …Run Code Online (Sandbox Code Playgroud) 社区!
请帮助我了解如何使用 Spark 获得更好的压缩率?
让我描述一下案例:
我有数据集,让我们把它的产品在其上的实木复合地板文件使用的编解码器使用Sqoop ImportTool进口HDFS瞬间。作为导入的结果,我有 100 个文件,总大小为46 GB,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。记录总数超过80 亿条,有84 列
我也在使用snappy对 Spark 进行简单的读取/重新分区/写入,结果我得到:
~ 100 GB输出大小,具有相同的文件数、相同的编解码器、相同的数量和相同的列。
代码片段:
val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")
productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
Run Code Online (Sandbox Code Playgroud)
摄取:
creator: parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber})
extra: parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"
and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1
row group 1: RC:3640100 TS:36454739 OFFSET:4
AVAILABLE: INT64 SNAPPY …Run Code Online (Sandbox Code Playgroud) snappy apache-spark parquet apache-spark-sql spark-dataframe
我写了一个方法,必须考虑一个随机数来模拟伯努利分布.我random.nextDouble用来生成0到1之间的数字,然后根据我的概率参数给出我的决定.
我的问题是Spark在我的for循环映射函数的每次迭代中生成相同的随机数.我正在使用DataFrameAPI.我的代码遵循以下格式:
val myClass = new MyClass()
val M = 3
val myAppSeed = 91234
val rand = new scala.util.Random(myAppSeed)
for (m <- 1 to M) {
val newDF = sqlContext.createDataFrame(myDF
.map{row => RowFactory
.create(row.getString(0),
myClass.myMethod(row.getString(2), rand.nextDouble())
}, myDF.schema)
}
Run Code Online (Sandbox Code Playgroud)
这是班级:
class myClass extends Serializable {
val q = qProb
def myMethod(s: String, rand: Double) = {
if (rand <= q) // do something
else // do something else
}
}
Run Code Online (Sandbox Code Playgroud)
每次myMethod调用时我都需要一个新的随机数.我也尝试在我的方法中生成数字java.util.Random( …
我有以下两个数据框:
DF1:
Id | field_A | field_B | field_C | field_D
1 | cat | 12 | black | 11
2 | dog | 128 | white | 19
3 | dog | 35 | yellow | 20
4 | dog | 21 | brown | 4
5 | bird | 10 | blue | 7
6 | cow | 99 | brown | 34
Run Code Online (Sandbox Code Playgroud)
和
DF2:
Id | field_B | field_C | field_D | field_E
3 | 35 | …Run Code Online (Sandbox Code Playgroud) 我在Java应用程序中使用SparkSQL使用Databricks对CSV文件进行一些处理以进行解析.
我正在处理的数据来自不同的来源(远程URL,本地文件,谷歌云存储),我习惯将所有内容都变成一个InputStream,以便我可以解析和处理数据,而无需知道它来自何处.
我在Spark上看到的所有文档都从路径中读取文件,例如
SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);
DataFrame df = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load("path/to/file.csv");
DataFrame dfGrouped = df.groupBy("varA","varB")
.avg("varC","varD");
dfGrouped.show();
Run Code Online (Sandbox Code Playgroud)
我想要做的是从InputStream中读取,或者甚至只读取已经在内存中的字符串.类似于以下内容:
InputStream stream = new URL(
"http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
).openStream();
DataFrame dfRemote = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load(stream);
String someString = "imagine,some,csv,data,here";
DataFrame dfFromString = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.read(someString);
Run Code Online (Sandbox Code Playgroud)
这里有什么简单的东西吗?
我已经阅读了一些关于Spark Streaming和自定义接收器的文档,但据我所知,这是为了打开一个连续提供数据的连接.Spark Streaming似乎将数据分解为块并对其进行一些处理,期望更多的数据进入无休止的流中.
我最好的猜测是,Spark作为Hadoop的后代,期望大量的数据可能存在于某个文件系统中.但是由于Spark无论如何都要在内存中进行处理,因此我认为SparkSQL能够解析内存中的数据.
任何帮助,将不胜感激.
java apache-spark apache-spark-sql spark-dataframe databricks
我想了解 pyspark 代码中的分析。
在此之后:https : //github.com/apache/spark/pull/2351
>>> sc._conf.set("spark.python.profile", "true")
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
100
>>> sc.show_profiles()
============================================================
Profile of RDD<id=1>
============================================================
284 function calls (276 primitive calls) in 0.001 seconds
Ordered by: internal time, cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
4 0.000 0.000 0.000 0.000 serializers.py:198(load_stream)
4 0.000 0.000 0.000 0.000 {reduce}
12/4 0.000 0.000 0.001 0.000 rdd.py:2092(pipeline_func)
4 0.000 0.000 0.000 0.000 {cPickle.loads}
4 0.000 0.000 0.000 0.000 {cPickle.dumps}
104 0.000 0.000 0.000 0.000 …Run Code Online (Sandbox Code Playgroud) profiler apache-spark apache-spark-sql pyspark spark-dataframe
我有一个看起来像这样的数据框:
A B C
---------------
A1 B1 0.8
A1 B2 0.55
A1 B3 0.43
A2 B1 0.7
A2 B2 0.5
A2 B3 0.5
A3 B1 0.2
A3 B2 0.3
A3 B3 0.4
Run Code Online (Sandbox Code Playgroud)
如何将列'C'转换为每列A的相对等级(更高的分数 - >更好的等级)?预期产出:
A B Rank
---------------
A1 B1 1
A1 B2 2
A1 B3 3
A2 B1 1
A2 B2 2
A2 B3 2
A3 B1 3
A3 B2 2
A3 B3 1
Run Code Online (Sandbox Code Playgroud)
我想要达到的最终状态是聚合列B并存储每个A的等级:
例:
B Ranks
B1 [1,1,3]
B2 [2,2,2]
B3 [3,2,1]
Run Code Online (Sandbox Code Playgroud) 我在pyspark中有一个数据框,其中包含大写的列ID,COMPANY依此类推
我想将这些列名称设为id company等等.根据要求将所有列细菌转换为小写或大写.
我想这样做,列的数据类型保持不变.
我们怎么做?
我正在尝试从Spark 2.1升级到2.2.当我尝试将数据帧读取或写入某个位置(CSV或JSON)时,我收到此错误:
Illegal pattern component: XXX
java.lang.IllegalArgumentException: Illegal pattern component: XXX
at org.apache.commons.lang3.time.FastDatePrinter.parsePattern(FastDatePrinter.java:282)
at org.apache.commons.lang3.time.FastDatePrinter.init(FastDatePrinter.java:149)
at org.apache.commons.lang3.time.FastDatePrinter.<init>(FastDatePrinter.java:142)
at org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:384)
at org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:369)
at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:91)
at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:88)
at org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
at org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:165)
at org.apache.spark.sql.catalyst.json.JSONOptions.<init>(JSONOptions.scala:81)
at org.apache.spark.sql.catalyst.json.JSONOptions.<init>(JSONOptions.scala:43)
at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:53)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:176)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:333)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:279)
Run Code Online (Sandbox Code Playgroud)
我没有为dateFormat设置默认值,所以我不知道它来自哪里.
spark.createDataFrame(objects.map((o) => MyObject(t.source, t.table, o.partition, o.offset, d)))
.coalesce(1)
.write
.mode(SaveMode.Append)
.partitionBy("source", "table")
.json(path)
Run Code Online (Sandbox Code Playgroud)
我仍然得到这个错误:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder.appName("Spark2.2Test").master("local").getOrCreate()
import spark.implicits._
val agesRows = List(Person("alice", 35), …Run Code Online (Sandbox Code Playgroud) 我试图找到一种可靠的方法来以编程方式计算Spark数据帧的大小(以字节为单位).
原因是我希望有一种方法来计算"最佳"分区数量("最佳"可能意味着不同的东西:它可能意味着 具有最佳分区大小,或者在写入Parquet时产生最佳文件大小表 - 但两者都可以假设为数据帧大小的某些线性函数.换句话说,我想调用coalesce(n)或repartition(n)在数据帧上,其中n不是固定数字,而是数据帧大小的函数.
关于SO的其他主题建议使用SizeEstimator.estimatefrom org.apache.spark.util来获取数据帧的字节大小,但我得到的结果是不一致的.
首先,我将数据帧保存到内存中:
df.cache().count
Run Code Online (Sandbox Code Playgroud)
Spark UI在"存储"选项卡中显示大小为4.8GB.然后,我运行以下命令来获取大小SizeEstimator:
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
Run Code Online (Sandbox Code Playgroud)
这给出了115'715'808字节= ~116MB的结果.但是,应用于SizeEstimator不同的对象会导致非常不同的结果.例如,我尝试分别为数据帧中的每一行计算大小并将它们相加:
df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)
这导致12'084'698'256字节= ~12GB的大小.或者,我可以尝试应用于SizeEstimator每个分区:
df.mapPartitions(
iterator => Seq(SizeEstimator.estimate(
iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)
这又导致10'792'965'376字节的不同大小=〜10.8GB.
我知道存在内存优化/内存开销,但在执行这些测试之后,我没有看到如何SizeEstimator使用它来获得足够好的数据帧大小估计(以及因此分区大小或结果Parquet文件大小).
SizeEstimator为了获得对数据帧大小或其分区的良好估计,应用适当的方法(如果有的话)是什么?如果没有,这里建议的方法是什么?
apache-spark ×10
spark-dataframe ×10
pyspark ×4
scala ×3
python ×2
databricks ×1
java ×1
parquet ×1
profiler ×1
random ×1
snappy ×1