你能帮我理解 Spark SQl 和 Hive 之间的区别吗?
所以我有一个看起来像这样的数据框:
|-- id: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
我想实现一个看起来像:
|-- id: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
我怎样才能做到这一点?我为第二个结构创建了案例类,但我不知道映射第一个数据框。
谢谢!
我有一个员工文件,其中包含以下数据:
Name: Age:
David 25
Jag 32
Paul 33
Sam 18
Run Code Online (Sandbox Code Playgroud)
我加载到dataframeApache Spark 中,我正在过滤如下值:
Name: Age:
David 25
Jag 32
Paul 33
Sam 18
Run Code Online (Sandbox Code Playgroud)
employee_rdd=sc.textFile("employee.txt")
employee_df=employee_rdd.toDF()
employee_data = employee_df.filter("Name = 'David'").collect()
Run Code Online (Sandbox Code Playgroud)
但是当我尝试做这样的事情时:
emp_Name='Sam' 并将此名称传递给过滤器,如下所示:
employee_data = employee_df.filter("Name = 'emp_Name'").collect
Run Code Online (Sandbox Code Playgroud)
但这给了我空清单。
我有一个列“年龄”的字符串类型的数据框,我想获得一个包含字符串格式范围的新列
范围如下
[-1, 12, 17, 24, 34, 44, 54, 64, 100, 1000]
例如输入值
Age
=====
-1
12
18
28
38
46
======
Run Code Online (Sandbox Code Playgroud)
需要输出
Age Age-Range
===== =========
-1 (-1,12)
12 (-1,12)
18 (12-17)
28 (24-34)
38 (34-44)
46 (44-54)
====== ==========
Run Code Online (Sandbox Code Playgroud)
任何建议或帮助都受到高度赞赏
我之前已经成功地将 pyspark 用于 Spark Streaming(Spark 2.0.2)和 Kafka(0.10.1.0),但我的目的更适合结构化流。我尝试在线使用示例:https : //spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
使用以下类似代码:
ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
query = ds1
.writeStream
.outputMode('append')
.format('console')
.start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
但是,我总是以以下错误告终:
: org.apache.kafka.common.config.ConfigException:
Missing required configuration "partition.assignment.strategy" which has no default value
Run Code Online (Sandbox Code Playgroud)
我还尝试在创建 ds1 时将其添加到我的选项集中:
.option("partition.assignment.strategy", "range")
Run Code Online (Sandbox Code Playgroud)
但即使明确地为其分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也没有。
我也用“assign”选项尝试了这个并实现了同样的错误(我们的Kafka主机设置为assign——每个消费者只分配一个分区,我们没有任何重新平衡)。
知道这里发生了什么吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,是否有使用 KafkaUtils 进行结构化流处理?或者这是唯一的网关?
apache-kafka apache-spark apache-spark-sql pyspark spark-structured-streaming
我有以下数据框显示购买收入。
+-------+--------+-------+
|user_id|visit_id|revenue|
+-------+--------+-------+
| 1| 1| 0|
| 1| 2| 0|
| 1| 3| 0|
| 1| 4| 100|
| 1| 5| 0|
| 1| 6| 0|
| 1| 7| 200|
| 1| 8| 0|
| 1| 9| 10|
+-------+--------+-------+
Run Code Online (Sandbox Code Playgroud)
最终,我希望新列purch_revenue在每一行中显示购买产生的收入。作为一种解决方法,我还尝试引入一个购买标识符purch_id,每次购买时都会增加该标识符。所以这只是作为参考列出。
+-------+--------+-------+-------------+--------+
|user_id|visit_id|revenue|purch_revenue|purch_id|
+-------+--------+-------+-------------+--------+
| 1| 1| 0| 100| 1|
| 1| 2| 0| 100| 1|
| 1| 3| 0| 100| 1|
| 1| 4| 100| 100| 1|
| 1| 5| 0| 100| 2|
| …Run Code Online (Sandbox Code Playgroud) window-functions apache-spark apache-spark-sql pyspark pyspark-sql
我正在尝试通过该sqlContext.read.format("json")方法连接到 Oracle 。一切顺利,但在创建 JDBC 字符串时,我必须在字符串中指定数据库的用户名和密码:
val jdbcString = "jdbc:oracle:thin:USERNAME/PASSWORD@//HOSTNAME:PORT/SID"
Run Code Online (Sandbox Code Playgroud)
但是,我jceks在 HDFS 上确实有一个包含密码的文件。我想知道是否有任何方法可以利用该文件连接到 JDBC 而不是纯文本密码?就像在 Sqoop 中一样,我们可以这样做:
sqoop import -Dhadoop.security.credential.provider.path=jceks://hdfs/data/credentials/oracle.password.jceks
Run Code Online (Sandbox Code Playgroud)
谢谢。
我正在使用 Apache spark 中的巨大数据集(包含 332 个字段)与大约 10M 记录的 scala(除了一个字段,其余 331 个可以为空)。但我想用空白字符串(“”)替换 null。由于我有大量字段,实现这一目标的最佳方法是什么?我想在导入此数据集时处理空值,因此在执行转换或导出到 DF 时我会很安全。所以我创建了具有 332 个字段的案例类,处理这些空值的最佳方法是什么?我可以使用 Option(field).getOrElse(""),但我想这不是最好的方法,因为我有大量的字段。谢谢!!
我正在尝试将多行合并为一列,作为 spark 数据帧(spark 1.6.1)中的有效 json 格式。然后我希望它存储在 mysql 表中。
我的原始火花数据框如下所示:
|user_id |product_id|price |
|A |p1 |3000 |
|A |p2 |1500 |
|B |P1 |3000 |
|B |P3 |2000 |
Run Code Online (Sandbox Code Playgroud)
我想像这样转换上表:
|user_id |contents_json
|A |{(product_id:p1, price:3000), (product_id:p2, price:1500)}
|B |{{product_id:p1, price:3000), (product_id:p3, price:2000)}
Run Code Online (Sandbox Code Playgroud)
然后把上面的表放到mysql表中。
这是完全相反的爆炸方式,但我找不到正确的方法。
我需要从源表创建一个表(hive 表/spark 数据框),该表将多行用户的数据存储到单行列表中。
User table:
Schema: userid: string | transactiondate:string | charges: string |events:array<struct<name:string,value:string>>
----|------------|-------| ---------------------------------------
123 | 2017-09-01 | 20.00 | [{"name":"chargeperiod","value":"this"}]
123 | 2017-09-01 | 30.00 | [{"name":"chargeperiod","value":"last"}]
123 | 2017-09-01 | 20.00 | [{"name":"chargeperiod","value":"recent"}]
123 | 2017-09-01 | 30.00 | [{"name":"chargeperiod","value":"0"}]
456 | 2017-09-01 | 20.00 | [{"name":"chargeperiod","value":"this"}]
456 | 2017-09-01 | 30.00 | [{"name":"chargeperiod","value":"last"}]
456 | 2017-09-01 | 20.00 | [{"name":"chargeperiod","value":"recent"}]
456 | 2017-09-01 | 30.00 | [{"name":"chargeperiod","value":"0"}]
Run Code Online (Sandbox Code Playgroud)
输出表应该是
userid:String | concatenatedlist :List[Row]
-------|-----------------
123 | …Run Code Online (Sandbox Code Playgroud) apache-spark ×10
apache-spark-sql ×10
pyspark ×4
scala ×4
hive ×2
apache-kafka ×1
dataframe ×1
hadoop ×1
jdbc ×1
json ×1
pyspark-sql ×1
python-2.7 ×1