标签: spark-dataframe

Spark数据帧组合到列表中

我正在尝试对集合进行一些分析.我有一个示例数据集,如下所示:

orders.json

{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}
Run Code Online (Sandbox Code Playgroud)

它只是一个字段,它是一个代表ID的数字列表.

这是我试图运行的Spark脚本:

val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Dataframe Test")

val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)

val dataframe = sql.read.json("orders.json")

val expanded = dataframe
  .explode[::[Long], Long]("items", "item1")(row => row)
  .explode[::[Long], Long]("items", "item2")(row => row)

val grouped = expanded
  .where(expanded("item1") !== expanded("item2"))
  .groupBy("item1", "item2")
  .count()

val recs = grouped
  .groupBy("item1")
Run Code Online (Sandbox Code Playgroud)

创建expanded并且grouped很好,简而言之,expanded是两个ID在同一原始集中的所有可能的两个ID的列表.grouped过滤掉与自身匹配的ID,然后将所有唯一ID组合在一起并为每个ID生成计数.架构和数据样本grouped是:

root
 |-- item1: long (nullable = true)
 |-- item2: long …
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark apache-spark-sql spark-dataframe

10
推荐指数
1
解决办法
9065
查看次数

如何检索从Spark UI写入的输出大小和记录等指标?

如何在任务或作业完成后立即在控制台(Spark Shell或Spark提交作业)上收集这些指标.

我们使用Spark将数据从Mysql加载到Cassandra并且它非常庞大(例如:~200 GB和600M行).当任务完成后,我们想验证火花过程究竟完成了多少行?我们可以从Spark UI获取数字,但是如何从spark shell或spark-submit作业中检索该数字("Output Records Written").

示例命令从Mysql加载到Cassandra.

val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load()

pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "payment_types", "keyspace" -> "test"))
Run Code Online (Sandbox Code Playgroud)

我想在上面的任务中检索所有Spark UI指标,主要是输出大小和记录写入.

请帮忙.

谢谢你的时间!

apache-spark codahale-metrics apache-spark-sql spark-cassandra-connector spark-dataframe

10
推荐指数
1
解决办法
2200
查看次数

为什么df.limit在Pyspark中不断变化?

我创建了一些数据帧数据样本df

rdd = df.limit(10000).rdd
Run Code Online (Sandbox Code Playgroud)

这个操作需要相当长的时间(实际上为什么?在10000行之后它不能短路?),所以我假设我现在有了一个新的RDD.

但是,当我现在工作时rdd,每次访问它时都会有不同的行.好像它重新重新采样一样.缓存RDD有点帮助,但肯定不是保存?

它背后的原因是什么?

更新:这是Spark 1.5.2的再现

from operator import add
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],100)
rdd1=rdd.toDF().limit(1000).rdd
for _ in range(3):
    print(rdd1.map(lambda row:row.i).reduce(add))
Run Code Online (Sandbox Code Playgroud)

输出是

499500
19955500
49651500
Run Code Online (Sandbox Code Playgroud)

我很惊讶.rdd没有修复数据.

编辑:为了表明它比重​​新执行问题更棘手,这里是一个单一的操作,在Spark 2.0.0.2.5.0上产生不正确的结果

from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],200)
rdd1=rdd.toDF().limit(12345).rdd
rdd2=rdd1.map(lambda x:(x,x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join
Run Code Online (Sandbox Code Playgroud)

基本上,每当您使用limit结果时可能会出错.我并不是指"只是众多样本中的一个",而是非常不正确(因为在这种情况下结果应始终为12345).

apache-spark pyspark spark-dataframe

10
推荐指数
3
解决办法
8732
查看次数

如何在Spark Scala中从多个数组创建DataFrame?

val tvalues: Array[Double] = Array(1.866393526974307, 2.864048126935307, 4.032486069215076, 7.876169953355888, 4.875333799256043, 14.316322626848278)
val pvalues: Array[Double] = Array(0.064020056478447, 0.004808399479386827, 8.914865448939047E-5, 7.489564524121306E-13, 2.8363794106756046E-6, 0.0)
Run Code Online (Sandbox Code Playgroud)

我有两个如上所述的数组,我需要从这个数组构建一个DataFrame,如下所示,

Tvalues                Pvalues
1.866393526974307      0.064020056478447
2.864048126935307      0.004808399479386827
......                 .....
Run Code Online (Sandbox Code Playgroud)

截至目前我StringBuilder在Scala 尝试.没有按预期进行.请帮帮我.

arrays scala linear-regression spark-dataframe

10
推荐指数
1
解决办法
2万
查看次数

什么是版本库spark支持的SparkSession

使用SparkSession代码Spark.

   import org.apache.spark.SparkConf
   import org.apache.spark.SparkContext 

   val conf = SparkSession.builder
  .master("local")
  .appName("testing")
  .enableHiveSupport()  // <- enable Hive support.
  .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

代码pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cms.spark</groupId>
    <artifactId>cms-spark</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cms-spark</name>

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.10</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.8.3</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this …
Run Code Online (Sandbox Code Playgroud)

hadoop scala apache-spark apache-spark-sql spark-dataframe

10
推荐指数
2
解决办法
2万
查看次数

如何使用PySpark将CSV文件作为dataFrame读取时跳过行?

我有一个以这种方式构建的CSV文件:

Header
Blank Row
"Col1","Col2"
"1,200","1,456"
"2,000","3,450"
Run Code Online (Sandbox Code Playgroud)

我在阅读此文件时遇到两个问题.

  1. 我想忽略标题并忽略空白行
  2. 值中的逗号不是分隔符

这是我尝试过的:

df = sc.textFile("myFile.csv")\
              .map(lambda line: line.split(","))\ #Split By comma
              .filter(lambda line: len(line) == 2).collect() #This helped me ignore the first two rows
Run Code Online (Sandbox Code Playgroud)

但是,这不起作用,因为值中的逗号被读作分隔符而len(line)返回4而不是2.

我尝试了另一种方法:

data = sc.textFile("myFile.csv")
headers = data.take(2) #First two rows to be skipped
Run Code Online (Sandbox Code Playgroud)

我的想法是使用过滤器而不是读取标题.但是,当我尝试打印标题时,我得到了编码值.

[\x00A\x00Y\x00 \x00J\x00u\x00l\x00y\x00 \x002\x000\x001\x006\x00]
Run Code Online (Sandbox Code Playgroud)

读取CSV文件并跳过前两行的正确方法是什么?

apache-spark pyspark spark-dataframe pyspark-sql

10
推荐指数
3
解决办法
1万
查看次数

通过行中非空元素的计数对PySpark Dataframe进行统一分区

我知道有一千个问题涉及如何通过盐键等来最好地划分你的DataFrames或者RDDs,但我认为这种情况不同,足以保证自己的问题.

我正在PySpark中构建协同过滤推荐引擎,这意味着需要比较每个用户(行)的唯一项目评级.因此,对于一DataFrame维的M (rows) x N (columns),这意味着该数据集变得M x (K choose 2)其中K << N为非空为用户(即,评估)元素的数量.

我的算法对于用户评估了大约一定数量项目的数据集非常有效.但是,对于一部分用户评估了大量项目(数量级比同一分区中的其他用户大)的情况,我的数据变得非常偏斜,最后几个分区开始花费大量时间.举一个简单的例子,考虑以下内容DataFrame:

cols = ['id', 'Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']
ratings = [
    (1, 4.5,  3.5,  None, 1.0,  None),  # user 1
    (2, 2.0,  None, 5.0,  4.0,  3.0),   # user 2
    (3, 3.5,  5.0,  1.0,  None, 1.0),   # user 3
    (4, None, None, 4.5,  3.5,  4.0),   # user 4
    (5, …
Run Code Online (Sandbox Code Playgroud)

python performance machine-learning pyspark spark-dataframe

10
推荐指数
1
解决办法
1327
查看次数

16个任务(1048.5 MB)的序列化结果总大小大于spark.driver.maxResultSize(1024.0 MB)

当我添加--conf spark.driver.maxResultSize=2050到我的spark-submit命令时,我收到以下错误.

17/12/27 18:33:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /XXX.XX.XXX.XX:36245 is closed
17/12/27 18:33:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:726)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:755)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /XXX.XX.XXX.XX:36245 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
Run Code Online (Sandbox Code Playgroud)

添加此配置的原因是错误:

py4j.protocol.Py4JJavaError: An …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark spark-dataframe

10
推荐指数
2
解决办法
1万
查看次数

计算Spark数据帧的大小 - SizeEstimator会产生意外结果

我试图找到一种可靠的方法来以编程方式计算Spark数据帧的大小(以字节为单位).

原因是我希望有一种方法来计算"最佳"分区数量("最佳"可能意味着不同的东西:它可能意味着 具有最佳分区大小,或者写入Parquet时产生最佳文件大小表 - 但两者都可以假设为数据帧大小的某些线性函数.换句话说,我想调用coalesce(n)repartition(n)在数据帧上,其中n不是固定数字,而是数据帧大小的函数.

关于SO的其他主题建议使用SizeEstimator.estimatefrom org.apache.spark.util来获取数据帧的字节大小,但我得到的结果是不一致的.

首先,我将数据帧保存到内存中:

df.cache().count 
Run Code Online (Sandbox Code Playgroud)

Spark UI在"存储"选项卡中显示大小为4.8GB.然后,我运行以下命令来获取大小SizeEstimator:

import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
Run Code Online (Sandbox Code Playgroud)

这给出了115'715'808字节= ~116MB的结果.但是,应用于SizeEstimator不同的对象会导致非常不同的结果.例如,我尝试分别为数据帧中的每一行计算大小并将它们相加:

df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)

这导致12'084'698'256字节= ~12GB的大小.或者,我可以尝试应用于SizeEstimator每个分区:

df.mapPartitions(
    iterator => Seq(SizeEstimator.estimate(
        iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)

这又导致10'792'965'376字节的不同大小=〜10.8GB.

我知道存在内存优化/内存开销,但在执行这些测试之后,我没有看到如何SizeEstimator使用它来获得足够好的数据帧大小估计(以及因此分区大小或结果Parquet文件大小).

SizeEstimator为了获得对数据帧大小或其分区的良好估计,应用适当的方法(如果有的话)是什么?如果没有,这里建议的方法是什么?

apache-spark spark-dataframe

10
推荐指数
3
解决办法
1万
查看次数

TypeError:列不可迭代-如何遍历ArrayType()?

考虑以下DataFrame:

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)

可以使用以下代码创建:

import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]

df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
Run Code Online (Sandbox Code Playgroud)

有没有一种方法可以通过对每个元素应用函数而不使用?来直接修改ArrayType()列?"names"udf

例如,假设我想将该函数foo应用于"names"列。(我将使用其中的例子foostr.upper只用于说明目的,但我的问题是关于可以应用到一个可迭代的元素任何有效的功能。)

foo = lambda x: x.upper()  # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
Run Code Online (Sandbox Code Playgroud)

TypeError:列不可迭代

我可以使用udf

foo_udf = f.udf(lambda row: [foo(x) …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-dataframe pyspark-sql

9
推荐指数
1
解决办法
4438
查看次数