我的 Spark 程序读取一个文件,其中包含编码为 64 的 gzip 压缩字符串。我必须解码和解压缩。我使用spark unbase64来解码并生成字节数组
bytedf=df.withColumn("unbase",unbase64(col("value")) )
Run Code Online (Sandbox Code Playgroud)
Spark中有没有可用的spark方法来解压缩字节码?
我正在尝试使用Spark Structured流从Kafka主题中读取XML数据.
我尝试使用Databricks spark-xml包,但我得到一个错误,说这个包不支持流式读取.有什么办法可以使用结构化流媒体从Kafka主题中提取XML数据吗?
我目前的代码:
df = spark \
.readStream \
.format("kafka") \
.format('com.databricks.spark.xml') \
.options(rowTag="MainElement")\
.option("kafka.bootstrap.servers", "localhost:9092") \
.option(subscribeType, "test") \
.load()
Run Code Online (Sandbox Code Playgroud)
错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
Run Code Online (Sandbox Code Playgroud) xml-parsing apache-spark pyspark-sql spark-structured-streaming
我正在尝试从 Kafka 进行结构化流式传输。我打算在 HDFS 中存储检查点。我读了一篇 Cloudera 博客,建议不要在 HDFS 中为 Spark 流存储检查点。结构流检查点是否存在相同的问题。 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/。
在结构化流中,如果我的 spark 程序停机了一段时间,我如何从检查点目录中获取最新的偏移量并在该偏移量之后加载数据。我将检查点存储在一个目录中,如下所示。
df.writeStream\
.format("text")\
.option("path", '\files') \
.option("checkpointLocation", 'checkpoints\chkpt') \
.start()
Run Code Online (Sandbox Code Playgroud)
更新:
这是我的结构化流程序读取 Kafka 消息,解压缩并写入 HDFS。
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.option("failOnDataLoss", "false")\
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream
query = decomp.writeStream\
.format("text")\
.option("path", \Data_directory_inHDFS) \
.option("checkpointLocation", \pathinDHFS\) \
.start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud) 我有如图所示的输入交易
apples,mangos,eggs
milk,oranges,eggs
milk, cereals
mango,apples
Run Code Online (Sandbox Code Playgroud)
我必须像这样生成一个共现矩阵的 Spark 数据帧。
apple mango milk cereals eggs
apple 2 2 0 0 1
mango 2 2 0 0 1
milk 0 0 2 1 1
cereals 0 0 1 1 0
eggs 1 1 1 0 2
Run Code Online (Sandbox Code Playgroud)
苹果和芒果一起买两次,所以矩阵[apple][mango] =2。
我被困在实现这一点的想法中?任何建议都会有很大帮助。我正在使用 PySpark 来实现这一点。
我正在使用 Spark Structured Streaming 从 Kafka 队列中读取数据。从卡夫卡我看完后正在申请filter的dataframe。我正在将此过滤后的数据框保存到镶木地板文件中。这会生成许多空的镶木地板文件。有什么办法可以停止写入空文件吗?
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
decompDF = Transaction_DF.select(zip_extract("value").alias("decompress"))
filterDF = decomDF.filter(.....)
query = filterDF .writeStream \
.option("path", outputpath) \
.option("checkpointLocation", RawXMLCheckpoint) \
.start()
Run Code Online (Sandbox Code Playgroud)