标签: apache-spark

如何创建具有结构类型的scala case类?

我的数据框架构如下所示,要创建手动架构,请创建案例类。

 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- message: string (nullable = true)
 |-- powerData: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- current: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- delayStartTime: double (nullable = true)
 |    |    |-- idSub1: string (nullable = true)
 |    |    |-- motorNumber: integer (nullable = true)
 |    |    |-- power: array (nullable …
Run Code Online (Sandbox Code Playgroud)

scala case-class apache-spark

1
推荐指数
1
解决办法
2158
查看次数

Scala的distinct()和spark的distinct(),哪个更有效?

当处理大量数据(大约10,000行数据)时,我想确保数据中没有重复项

我们的工作流程是先读取镶木地板文件以生成火花dataframes,然后将其转换dataframesscala case classes

有两个选项可以消除重复项:

  1. 呼叫阶distinct()Seq[caseClasses]
  2. 呼叫火花distinct()dataframes

深入研究scala源代码,我发现它遍历了每条记录,并将它们放在set。与spark相比,哪种方法更有效?

谢谢!

编辑:原始帖子说数据大小是10,000,这是我们的测试数据的大小。后来数据大小可以达到500万。

scala apache-spark

1
推荐指数
1
解决办法
64
查看次数

将镶木地板文件存储到PostgreSQL数据库中

我想将镶木地板文件写入PostgreSQL。我正在使用Spark并使用Spark Dataframe的write.jdbc函数写入文件。对于镶木地板列类型,如长,十进制或文本,一切正常。问题出在诸如Map之类的复杂类型上。我想将Map作为json存储在我的PostgreSQL中。由于我知道PostgreSQL可以将文本数据类型自动转换为json(使用强制转换操作),因此我将map转储为json字符串。

但是spark程序抱怨我们试图将“字符变化”数据类型插入“ json”类型的列中。这清楚表明PostgreSQL不会自动将“字符变化”转换为JSON。

我继续并登录到数据库,然后手动尝试将JSON字符串插入到表的JSON数据类型列中,并且它可以正常工作。

我的问题是为什么我的spark程序抱怨强制转换操作?

我正在使用Spark版本1.6.1,PostgreSQL 4.3和JDBC 42.1.1

这是代码片段

url = "jdbc:postgresql://host_name:host_port/db_name"
data_frame.write.jdbc(url, table_name, properties={"user": user, "password": password})
Run Code Online (Sandbox Code Playgroud)

错误堆栈跟踪:

Hint: You will need to rewrite or cast the expression.
  Position: 66  Call getNextException to see other errors in the batch.
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
    at org.postgresql.core.ResultHandlerDelegate.handleError(ResultHandlerDelegate.java:50)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2190)
    at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1325)
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1350)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:458)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:791)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1547)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:215)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:277)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:276)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at …
Run Code Online (Sandbox Code Playgroud)

postgresql jdbc apache-spark parquet pyspark

1
推荐指数
1
解决办法
2621
查看次数

如何将时间戳类型的PySpark数据帧截断到当天?

我有一个PySpark数据框,在列中包含时间戳(调用列'dt'),如下所示:

2018-04-07 16:46:00
2018-03-06 22:18:00
Run Code Online (Sandbox Code Playgroud)

当我执行:

SELECT trunc(dt, 'day') as day
Run Code Online (Sandbox Code Playgroud)

...我期望:

2018-04-07 00:00:00
2018-03-06 00:00:00
Run Code Online (Sandbox Code Playgroud)

但我得到了:

null
null
Run Code Online (Sandbox Code Playgroud)

我如何截断到一天而不是一小时?

apache-spark apache-spark-sql pyspark pyspark-sql

1
推荐指数
1
解决办法
6050
查看次数

在Spark中选择DISTINCT Cassandra

我需要一个查询列出spark内部唯一的复合分区键.
CASSANDRA中的查询SELECT DISTINCT key1, key2, key3 FROM schema.table;非常快,但是在RDD中使用相同类型的数据过滤器或者spark.sql在比较中检索结果的速度非常慢.

例如

---- SPARK ----
var t1 = sc.cassandraTable("schema","table").select("key1", "key2", "key3").distinct()
var t2 = spark.sql("SELECT DISTINCT key1, key2, key3 FROM schema.table")

t1.count // takes 20 minutes
t2.count // takes 20 minutes

---- CASSANDRA ----
// takes < 1 minute while also printing out all results
SELECT DISTINCT key1, key2, key3 FROM schema.table; 
Run Code Online (Sandbox Code Playgroud)

表格格式如下:

CREATE TABLE schema.table (
    key1 text,
    key2 text,
    key3 text,
    ckey1 text,
    ckey2 text,
    v1 int, …
Run Code Online (Sandbox Code Playgroud)

distinct cassandra apache-spark

1
推荐指数
1
解决办法
595
查看次数

在Spark Dataframe的窗口上创建组ID

我有一个数据框,我想在每个Window分区中提供ID。例如我有

id | col |
1  |  a  |
2  |  a  |
3  |  b  |
4  |  c  |
5  |  c  |
Run Code Online (Sandbox Code Playgroud)

所以我想要(基于与列col分组)

id | group |
1  |  1    |
2  |  1    |
3  |  2    |
4  |  3    |
5  |  3    |
Run Code Online (Sandbox Code Playgroud)

我想使用窗口函数,但是无论如何我都找不到为每个窗口分配ID的方法。我需要类似的东西:

w = Window().partitionBy('col')
df = df.withColumn("group", id().over(w)) 
Run Code Online (Sandbox Code Playgroud)

有什么办法可以达到这样的目标吗?(我不能简单地将col用作组ID,因为我有兴趣在多个列上创建一个窗口)

window-functions apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
1928
查看次数

Scala和Spark中的"zip"方法是什么?

在Scala,Spark和许多其他"大数据"类型的框架,语言,库中,我看到了名为" zip*"的方法.例如,在Scala中,List类型有一个固有的zipWithIndex方法,您可以这样使用:

val listOfNames : List[String] = getSomehow()
for((name,i) <- listOfNames.zipWithIndex) {
  println(s"Names #${i+1}: ${name}")
}
Run Code Online (Sandbox Code Playgroud)

同样星火有RDD类似的方法zip,zipPartitions等等.

但方法名称"zip"完全让我失望.这是计算或离散数学的概念吗?!所有这些方法的名称中都有" zip " 的动机是什么?

scala apache-spark

1
推荐指数
1
解决办法
1001
查看次数

将值分配给PySpark dataFrame中的特定单元格

我想在我的特定细胞更改值Spark DataFrame使用PySpark

琐碎的例子-我创建一个模拟Spark DataFrame

df = spark.createDataFrame(
    [
     (1, 1.87, 'new_york'), 
     (4, 2.76, 'la'), 
     (6, 3.3, 'boston'), 
     (8, 4.1, 'detroit'), 
     (2, 5.70, 'miami'), 
     (3, 6.320, 'atlanta'), 
     (1, 6.1, 'houston')
    ],
    ('variable_1', "variable_2", "variable_3")
)
Run Code Online (Sandbox Code Playgroud)

运行display(df)我得到此表:

variable_1   variable_2   variable_3
    1           1.87    new_york
    4           2.76    la
    6           3.3     boston
    8           4.1     detroit
    2           5.7     miami
    3           6.32    atlanta
    1           6.1     houston
Run Code Online (Sandbox Code Playgroud)

Let's说,例如,我想分配为第4行和第3列的单元格的新值,即改变detroitnew_orleans。我知道作业在中有效df.iloc[4, 3] = 'new_orleans'df.loc[4, 'detroit'] = …

python dataframe apache-spark pyspark

1
推荐指数
2
解决办法
2696
查看次数

在pyspark数据框中复制一列

我在pyspark下面的示例中有一个数据框.我想复制数据框中的列并重命名为另一个列名.

Name    Age    Rate
Aira     23     90
Ben      32     98
Cat      27     95
Run Code Online (Sandbox Code Playgroud)

期望的输出是:

Name    Age     Rate     Rate2
Aira    23      90       90
Ben     32      98       98
Cat     27      95       95
Run Code Online (Sandbox Code Playgroud)

我该怎么做?

apache-spark pyspark

1
推荐指数
1
解决办法
5147
查看次数

从S3存储桶读取文件到PySpark Dataframe Boto3

如何将S3存储桶中的一堆文件加载到单个PySpark数据帧中?我在EMR实例上运行。如果文件是本地文件,则可以使用SparkContext textFile方法。但是,当文件位于S3上时,如何使用boto3将多个类型(CSV,JSON等)的多个文件加载到单个数据框中进行处理?

amazon-s3 apache-spark boto3 pyspark

1
推荐指数
1
解决办法
5339
查看次数