标签: apache-spark-sql

Spark SQL 和 Hive 的区别

你能帮我理解 Spark SQl 和 Hive 之间的区别吗?

hive apache-spark apache-spark-sql

1
推荐指数
1
解决办法
2644
查看次数

如何从列创建结构?

所以我有一个看起来像这样的数据框:

 |-- id: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我想实现一个看起来像:

 |-- id: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?我为第二个结构创建了案例类,但我不知道映射第一个数据框。

谢谢!

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
2631
查看次数

如何动态传递参数以过滤 Apache Spark 中的函数?

我有一个员工文件,其中包含以下数据:

Name:   Age:
David   25
Jag     32
Paul    33
Sam     18
Run Code Online (Sandbox Code Playgroud)

我加载到dataframeApache Spark 中,我正在过滤如下值:

Name:   Age:
David   25
Jag     32
Paul    33
Sam     18
Run Code Online (Sandbox Code Playgroud)
employee_rdd=sc.textFile("employee.txt")
employee_df=employee_rdd.toDF()
employee_data = employee_df.filter("Name = 'David'").collect() 
Run Code Online (Sandbox Code Playgroud)

但是当我尝试做这样的事情时:

emp_Name='Sam' 并将此名称传递给过滤器,如下所示:

employee_data = employee_df.filter("Name = 'emp_Name'").collect
Run Code Online (Sandbox Code Playgroud)

但这给了我空清单。

apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
1万
查看次数

如何根据其他列 spark 的值在 Dataframe 中添加列

我有一个列“年龄”的字符串类型的数据框,我想获得一个包含字符串格式范围的新列

范围如下

[-1, 12, 17, 24, 34, 44, 54, 64, 100, 1000]

例如输入值

Age
=====  
-1
12
18
28
38
46
======
Run Code Online (Sandbox Code Playgroud)

需要输出

  Age    Age-Range
 =====  ========= 
 -1     (-1,12)
 12     (-1,12)
 18     (12-17) 
 28     (24-34)
 38     (34-44)
 46     (44-54) 
======  ==========
Run Code Online (Sandbox Code Playgroud)

任何建议或帮助都受到高度赞赏

scala dataframe apache-spark apache-spark-sql

1
推荐指数
1
解决办法
3641
查看次数

Pyspark Structured Streaming Kafka 配置错误

我之前已经成功地将 pyspark 用于 Spark Streaming(Spark 2.0.2)和 Kafka(0.10.1.0),但我的目的更适合结构化流。我尝试在线使用示例:https : //spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html

使用以下类似代码:

ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
query = ds1
  .writeStream
  .outputMode('append')
  .format('console')
  .start()
query.awaitTermination() 
Run Code Online (Sandbox Code Playgroud)

但是,我总是以以下错误告终:

: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value
Run Code Online (Sandbox Code Playgroud)

我还尝试在创建 ds1 时将其添加到我的选项集中:

.option("partition.assignment.strategy", "range")
Run Code Online (Sandbox Code Playgroud)

但即使明确地为其分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也没有。

我也用“assign”选项尝试了这个并实现了同样的错误(我们的Kafka主机设置为assign——每个消费者只分配一个分区,我们没有任何重新平衡)。

知道这里发生了什么吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,是否有使用 KafkaUtils 进行结构化流处理?或者这是唯一的网关?

apache-kafka apache-spark apache-spark-sql pyspark spark-structured-streaming

1
推荐指数
1
解决办法
2664
查看次数

Spark - 带递归的窗口?- 有条件地跨行传播值

我有以下数据框显示购买收入。

+-------+--------+-------+
|user_id|visit_id|revenue|
+-------+--------+-------+
|      1|       1|      0|
|      1|       2|      0|
|      1|       3|      0|
|      1|       4|    100|
|      1|       5|      0|
|      1|       6|      0|
|      1|       7|    200|
|      1|       8|      0|
|      1|       9|     10|
+-------+--------+-------+
Run Code Online (Sandbox Code Playgroud)

最终,我希望新列purch_revenue在每一行中显示购买产生的收入。作为一种解决方法,我还尝试引入一个购买标识符purch_id,每次购买时都会增加该标识符。所以这只是作为参考列出。

+-------+--------+-------+-------------+--------+
|user_id|visit_id|revenue|purch_revenue|purch_id|
+-------+--------+-------+-------------+--------+
|      1|       1|      0|          100|       1|
|      1|       2|      0|          100|       1|
|      1|       3|      0|          100|       1|
|      1|       4|    100|          100|       1|
|      1|       5|      0|          100|       2|
| …
Run Code Online (Sandbox Code Playgroud)

window-functions apache-spark apache-spark-sql pyspark pyspark-sql

1
推荐指数
1
解决办法
1812
查看次数

使用 Spark JDBC 指定 jceks 文件

我正在尝试通过该sqlContext.read.format("json")方法连接到 Oracle 。一切顺利,但在创建 JDBC 字符串时,我必须在字符串中指定数据库的用户名和密码:

val jdbcString = "jdbc:oracle:thin:USERNAME/PASSWORD@//HOSTNAME:PORT/SID"
Run Code Online (Sandbox Code Playgroud)

但是,我jceks在 HDFS 上确实有一个包含密码的文件。我想知道是否有任何方法可以利用该文件连接到 JDBC 而不是纯文本密码?就像在 Sqoop 中一样,我们可以这样做:

sqoop import -Dhadoop.security.credential.provider.path=jceks://hdfs/data/credentials/oracle.password.jceks
Run Code Online (Sandbox Code Playgroud)

谢谢。

hadoop jdbc apache-spark apache-spark-sql

1
推荐指数
1
解决办法
2231
查看次数

如何使用scala在Apache spark中用空字符串(“”)替换空值

我正在使用 Apache spark 中的巨大数据集(包含 332 个字段)与大约 10M 记录的 scala(除了一个字段,其余 331 个可以为空)。但我想用空白字符串(“”)替换 null。由于我有大量字段,实现这一目标的最佳方法是什么?我想在导入此数据集时处理空值,因此在执行转换或导出到 DF 时我会很安全。所以我创建了具有 332 个字段的案例类,处理这些空值的最佳方法是什么?我可以使用 Option(field).getOrElse(""),但我想这不是最好的方法,因为我有大量的字段。谢谢!!

scala apache-spark apache-spark-sql spark-dataframe

1
推荐指数
2
解决办法
9874
查看次数

如何将行合并到 spark 数据帧的列中作为有效的 json 将其写入 mysql

我正在尝试将多行合并为一列,作为 spark 数据帧(spark 1.6.1)中的有效 json 格式。然后我希望它存储在 mysql 表中。

我的原始火花数据框如下所示:

|user_id   |product_id|price       | 
|A         |p1        |3000        |
|A         |p2        |1500        |
|B         |P1        |3000        |
|B         |P3        |2000        |
Run Code Online (Sandbox Code Playgroud)

我想像这样转换上表:

|user_id   |contents_json 
|A         |{(product_id:p1, price:3000), (product_id:p2, price:1500)} 
|B         |{{product_id:p1, price:3000), (product_id:p3, price:2000)} 
Run Code Online (Sandbox Code Playgroud)

然后把上面的表放到mysql表中。

这是完全相反的爆炸方式,但我找不到正确的方法。

json python-2.7 apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
3085
查看次数

Apache Spark 将多行连接成单行列表

我需要从源表创建一个表(hive 表/spark 数据框),该表将多行用户的数据存储到单行列表中。

User table:
Schema:  userid: string | transactiondate:string | charges: string |events:array<struct<name:string,value:string>> 
----|------------|-------| ---------------------------------------
123 | 2017-09-01 | 20.00 | [{"name":"chargeperiod","value":"this"}]
123 | 2017-09-01 | 30.00 | [{"name":"chargeperiod","value":"last"}]
123 | 2017-09-01 | 20.00 | [{"name":"chargeperiod","value":"recent"}]
123 | 2017-09-01 | 30.00 | [{"name":"chargeperiod","value":"0"}]
456 | 2017-09-01 | 20.00 | [{"name":"chargeperiod","value":"this"}]
456 | 2017-09-01 | 30.00 | [{"name":"chargeperiod","value":"last"}]
456 | 2017-09-01 | 20.00 | [{"name":"chargeperiod","value":"recent"}]
456 | 2017-09-01 | 30.00 | [{"name":"chargeperiod","value":"0"}]
Run Code Online (Sandbox Code Playgroud)

输出表应该是

userid:String | concatenatedlist :List[Row]
-------|-----------------
123    | …
Run Code Online (Sandbox Code Playgroud)

hive scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
9613
查看次数