小编shi*_*404的帖子

在Spark中分解结构列时出错

我有一个数据框架,其架构如下所示:

event: struct (nullable = true)
|    | event_category: string (nullable = true)
|    | event_name: string (nullable = true)
|    | properties: struct (nullable = true)
|    |    | ErrorCode: string (nullable = true)
|    |    | ErrorDescription: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我正在尝试使用以下代码爆炸该structproperties

df_json.withColumn("event_properties", explode($"event.properties"))
Run Code Online (Sandbox Code Playgroud)

但这引发了以下异常:

cannot resolve 'explode(`event`.`properties`)' due to data type mismatch: 
input to function explode should be array or map type, 
not StructType(StructField(IDFA,StringType,true),
Run Code Online (Sandbox Code Playgroud)

如何爆炸列properties

scala apache-spark apache-spark-sql pyspark spark-dataframe

5
推荐指数
2
解决办法
7758
查看次数

Kafka消费者端故障处理和重新投递

如果某些消息在 kafka 中没有被确认会发生什么?假设我正在同时消费消息。并且一位消费者无法处理 offset=20 的消息并且没有发回 Ack。但是其他具有 offset=21 的消息已被消耗并返回。我怎样才能只重播 20 个?

我是否需要将消息放入 DLQ 并再次消费?如果失败也发生在那里怎么办?

我对保证交货有点困惑。

apache-kafka kafka-consumer-api spring-kafka

3
推荐指数
1
解决办法
1304
查看次数