我的数据框架构如下所示,要创建手动架构,请创建案例类。
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- message: string (nullable = true)
|-- powerData: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- current: array (nullable = true)
| | | |-- element: double (containsNull = true)
| | |-- delayStartTime: double (nullable = true)
| | |-- idSub1: string (nullable = true)
| | |-- motorNumber: integer (nullable = true)
| | |-- power: array (nullable …Run Code Online (Sandbox Code Playgroud) 当处理大量数据(大约10,000行数据)时,我想确保数据中没有重复项。
我们的工作流程是先读取镶木地板文件以生成火花dataframes,然后将其转换dataframes为scala case classes。
有两个选项可以消除重复项:
distinct()上Seq[caseClasses]distinct()上dataframes深入研究scala源代码,我发现它遍历了每条记录,并将它们放在set。与spark相比,哪种方法更有效?
谢谢!
编辑:原始帖子说数据大小是10,000,这是我们的测试数据的大小。后来数据大小可以达到500万。
我想将镶木地板文件写入PostgreSQL。我正在使用Spark并使用Spark Dataframe的write.jdbc函数写入文件。对于镶木地板列类型,如长,十进制或文本,一切正常。问题出在诸如Map之类的复杂类型上。我想将Map作为json存储在我的PostgreSQL中。由于我知道PostgreSQL可以将文本数据类型自动转换为json(使用强制转换操作),因此我将map转储为json字符串。
但是spark程序抱怨我们试图将“字符变化”数据类型插入“ json”类型的列中。这清楚表明PostgreSQL不会自动将“字符变化”转换为JSON。
我继续并登录到数据库,然后手动尝试将JSON字符串插入到表的JSON数据类型列中,并且它可以正常工作。
我的问题是为什么我的spark程序抱怨强制转换操作?
我正在使用Spark版本1.6.1,PostgreSQL 4.3和JDBC 42.1.1
这是代码片段
url = "jdbc:postgresql://host_name:host_port/db_name"
data_frame.write.jdbc(url, table_name, properties={"user": user, "password": password})
Run Code Online (Sandbox Code Playgroud)
错误堆栈跟踪:
Hint: You will need to rewrite or cast the expression.
Position: 66 Call getNextException to see other errors in the batch.
at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
at org.postgresql.core.ResultHandlerDelegate.handleError(ResultHandlerDelegate.java:50)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2190)
at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1325)
at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1350)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:458)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:791)
at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1547)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:215)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:277)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:276)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at …Run Code Online (Sandbox Code Playgroud) 我有一个PySpark数据框,在列中包含时间戳(调用列'dt'),如下所示:
2018-04-07 16:46:00
2018-03-06 22:18:00
Run Code Online (Sandbox Code Playgroud)
当我执行:
SELECT trunc(dt, 'day') as day
Run Code Online (Sandbox Code Playgroud)
...我期望:
2018-04-07 00:00:00
2018-03-06 00:00:00
Run Code Online (Sandbox Code Playgroud)
但我得到了:
null
null
Run Code Online (Sandbox Code Playgroud)
我如何截断到一天而不是一小时?
我需要一个查询列出spark内部唯一的复合分区键.
CASSANDRA中的查询SELECT DISTINCT key1, key2, key3 FROM schema.table;非常快,但是在RDD中使用相同类型的数据过滤器或者spark.sql在比较中检索结果的速度非常慢.
例如
---- SPARK ----
var t1 = sc.cassandraTable("schema","table").select("key1", "key2", "key3").distinct()
var t2 = spark.sql("SELECT DISTINCT key1, key2, key3 FROM schema.table")
t1.count // takes 20 minutes
t2.count // takes 20 minutes
---- CASSANDRA ----
// takes < 1 minute while also printing out all results
SELECT DISTINCT key1, key2, key3 FROM schema.table;
Run Code Online (Sandbox Code Playgroud)
表格格式如下:
CREATE TABLE schema.table (
key1 text,
key2 text,
key3 text,
ckey1 text,
ckey2 text,
v1 int, …Run Code Online (Sandbox Code Playgroud) 我有一个数据框,我想在每个Window分区中提供ID。例如我有
id | col |
1 | a |
2 | a |
3 | b |
4 | c |
5 | c |
Run Code Online (Sandbox Code Playgroud)
所以我想要(基于与列col分组)
id | group |
1 | 1 |
2 | 1 |
3 | 2 |
4 | 3 |
5 | 3 |
Run Code Online (Sandbox Code Playgroud)
我想使用窗口函数,但是无论如何我都找不到为每个窗口分配ID的方法。我需要类似的东西:
w = Window().partitionBy('col')
df = df.withColumn("group", id().over(w))
Run Code Online (Sandbox Code Playgroud)
有什么办法可以达到这样的目标吗?(我不能简单地将col用作组ID,因为我有兴趣在多个列上创建一个窗口)
在Scala,Spark和许多其他"大数据"类型的框架,语言,库中,我看到了名为" zip*"的方法.例如,在Scala中,List类型有一个固有的zipWithIndex方法,您可以这样使用:
val listOfNames : List[String] = getSomehow()
for((name,i) <- listOfNames.zipWithIndex) {
println(s"Names #${i+1}: ${name}")
}
Run Code Online (Sandbox Code Playgroud)
同样星火有RDD类似的方法zip,zipPartitions等等.
但方法名称"zip"完全让我失望.这是计算或离散数学的概念吗?!所有这些方法的名称中都有" zip " 的动机是什么?
我想在我的特定细胞更改值Spark DataFrame使用PySpark。
琐碎的例子-我创建一个模拟Spark DataFrame:
df = spark.createDataFrame(
[
(1, 1.87, 'new_york'),
(4, 2.76, 'la'),
(6, 3.3, 'boston'),
(8, 4.1, 'detroit'),
(2, 5.70, 'miami'),
(3, 6.320, 'atlanta'),
(1, 6.1, 'houston')
],
('variable_1', "variable_2", "variable_3")
)
Run Code Online (Sandbox Code Playgroud)
运行display(df)我得到此表:
variable_1 variable_2 variable_3
1 1.87 new_york
4 2.76 la
6 3.3 boston
8 4.1 detroit
2 5.7 miami
3 6.32 atlanta
1 6.1 houston
Run Code Online (Sandbox Code Playgroud)
Let's说,例如,我想分配为第4行和第3列的单元格的新值,即改变detroit了new_orleans。我知道作业在中有效df.iloc[4, 3] = 'new_orleans'或df.loc[4, 'detroit'] = …
我在pyspark下面的示例中有一个数据框.我想复制数据框中的列并重命名为另一个列名.
Name Age Rate
Aira 23 90
Ben 32 98
Cat 27 95
Run Code Online (Sandbox Code Playgroud)
期望的输出是:
Name Age Rate Rate2
Aira 23 90 90
Ben 32 98 98
Cat 27 95 95
Run Code Online (Sandbox Code Playgroud)
我该怎么做?
如何将S3存储桶中的一堆文件加载到单个PySpark数据帧中?我在EMR实例上运行。如果文件是本地文件,则可以使用SparkContext textFile方法。但是,当文件位于S3上时,如何使用boto3将多个类型(CSV,JSON等)的多个文件加载到单个数据框中进行处理?
apache-spark ×10
pyspark ×6
scala ×3
amazon-s3 ×1
boto3 ×1
case-class ×1
cassandra ×1
dataframe ×1
distinct ×1
jdbc ×1
parquet ×1
postgresql ×1
pyspark-sql ×1
python ×1