小编ran*_*ddy的帖子

Spark 解码并解压缩 gzip 嵌入的 Base 64 字符串

我的 Spark 程序读取一个文件,其中包含编码为 64 的 gzip 压缩字符串。我必须解码和解压缩。我使用spark unbase64来解码并生成字节数组

bytedf=df.withColumn("unbase",unbase64(col("value")) )
Run Code Online (Sandbox Code Playgroud)

Spark中有没有可用的spark方法来解压缩字节码?

apache-spark apache-spark-sql pyspark

6
推荐指数
1
解决办法
3352
查看次数

如何从Kafka读取XML格式的流数据?

我正在尝试使用Spark Structured流从Kafka主题中读取XML数据.

我尝试使用Databricks spark-xml包,但我得到一个错误,说这个包不支持流式读取.有什么办法可以使用结构化流媒体从Kafka主题中提取XML数据吗?

我目前的代码:

df = spark \
      .readStream \
      .format("kafka") \
      .format('com.databricks.spark.xml') \
      .options(rowTag="MainElement")\
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option(subscribeType, "test") \
      .load()
Run Code Online (Sandbox Code Playgroud)

错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
Run Code Online (Sandbox Code Playgroud)

xml-parsing apache-spark pyspark-sql spark-structured-streaming

5
推荐指数
2
解决办法
2881
查看次数

Kafka 结构化流检查点

我正在尝试从 Kafka 进行结构化流式传输。我打算在 HDFS 中存储检查点。我读了一篇 Cloudera 博客,建议不要在 HDFS 中为 Spark 流存储检查点。结构流检查点是否存在相同的问题。 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

在结构化流中,如果我的 spark 程序停机了一段时间,我如何从检查点目录中获取最新的偏移量并在该偏移量之后加载数据。我将检查点存储在一个目录中,如下所示。

 df.writeStream\
        .format("text")\
        .option("path", '\files') \
        .option("checkpointLocation", 'checkpoints\chkpt') \
        .start()
Run Code Online (Sandbox Code Playgroud)

更新:

这是我的结构化流程序读取 Kafka 消息,解压缩并写入 HDFS。

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KafkaServer) \
        .option("subscribe", KafkaTopics) \
        .option("failOnDataLoss", "false")\
         .load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()

decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream

query = decomp.writeStream\
    .format("text")\
    .option("path", \Data_directory_inHDFS) \
    .option("checkpointLocation", \pathinDHFS\) \
    .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

hadoop pyspark spark-structured-streaming

4
推荐指数
1
解决办法
8529
查看次数

Spark生成发生矩阵

我有如图所示的输入交易

apples,mangos,eggs
milk,oranges,eggs
milk, cereals
mango,apples
Run Code Online (Sandbox Code Playgroud)

我必须像这样生成一个共现矩阵的 Spark 数据帧。

     apple mango  milk cereals  eggs
apple    2     2      0     0       1
mango    2     2      0     0       1
milk     0     0      2     1       1
cereals  0     0      1     1       0
eggs     1     1      1     0       2
Run Code Online (Sandbox Code Playgroud)

苹果和芒果一起买两次,所以矩阵[apple][mango] =2。

我被困在实现这一点的想法中?任何建议都会有很大帮助。我正在使用 PySpark 来实现这一点。

apache-spark apache-spark-sql pyspark-sql

4
推荐指数
1
解决办法
982
查看次数

编写镶木地板文件时如何避免空文件?

我正在使用 Spark Structured Streaming 从 Kafka 队列中读取数据。从卡夫卡我看完后正在申请filterdataframe。我正在将此过滤后的数据框保存到镶木地板文件中。这会生成许多空的镶木地板文件。有什么办法可以停止写入空文件吗?

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .load()

Transaction_DF = df.selectExpr("CAST(value AS STRING)")

decompDF = Transaction_DF.select(zip_extract("value").alias("decompress"))
filterDF = decomDF.filter(.....) 

query = filterDF .writeStream \
    .option("path", outputpath) \
    .option("checkpointLocation", RawXMLCheckpoint) \
    .start()
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-structured-streaming

2
推荐指数
2
解决办法
9622
查看次数