UDF causes warning: CachedKafkaConsumer is not running in UninterruptibleThread (KAFKA-1894)

okw*_*wap 8 apache-kafka apache-spark spark-streaming apache-spark-sql pyspark

In the usual structured_kafka_wordcount.py code, when I split the lines into words with a udf like this:

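from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType

# Python UDF that splits each line into words
my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))

words = lines.select(
    explode(
        my_split(lines.value)
    )
)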

The following warning keeps showing up:

WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894

On the other hand, when I split the lines into words with pyspark.sql.functions.split, everything works fine:

from pyspark.sql.functions import explode, split

# Built-in split function instead of a Python UDF
words = lines.select(
    explode(
        split(lines.value, ' ')
    )
)

Why does this happen, and how can I fix the warning?

This is the code I'm actually trying to run:

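import re

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType

# Matches lines like "<prefix> message repeated <N> times: [ <body>]"
pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)


def _unfold(x):
    """Expand a "message repeated N times" line into N copies of "<prefix> <body>"."""
    ret = []
    result = prog.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        times = result.group(2)
        for _ in range(int(times)):
            ret.append(log)
    else:
        # Lines that don't match are passed through unchanged
        ret.append(x)

    return ret


_udf = udf(_unfold, ArrayType(StringType()))
# Replace each line with its expanded copies (one output row per copy)
lines = lines.withColumn('value', explode(_udf(lines['value'])))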
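To see what the UDF does to each row independently of Spark, here is a minimal pure-Python sketch of the same logic. The names `PATTERN` and `unfold` are hypothetical and used only for this illustration. One thing it makes visible: `(\d)` matches a single digit, so a line such as "... repeated 12 times ..." does not match and is passed through unchanged.

```python
import re

# Same pattern as in the question (raw string, so \d and \[ need no doubling).
# Note: (\d) matches exactly one digit, so counts of 10 or more do not match.
PATTERN = re.compile(r"(.+) message repeated (\d) times: \[ (.+)\]")


def unfold(x):
    """Pure-Python version of _unfold: expand a 'message repeated N times' line."""
    result = PATTERN.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        return [log] * int(result.group(2))
    # Lines that don't match are passed through unchanged.
    return [x]


print(unfold("asd message repeated 3 times: [ 12]"))  # ['asd 12', 'asd 12', 'asd 12']
print(unfold("some other message"))                   # ['some other message']
```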

hi-*_*zir 5

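There is nothing you can do in your code about this issue, other than ditching Python UDFs*. As the warning message indicates, UninterruptibleThread is a workaround for a Kafka bug (KAFKA-1894), designed to prevent an infinite loop when a KafkaConsumer is interrupted.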

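Python UDFs are evaluated through PythonUDFRunner, which pulls the input rows (and with them the Kafka reads) on a separate writer thread that feeds the Python worker. That thread is a plain Thread, not an UninterruptibleThread, so the workaround is not used with PythonUDFRunner (and introducing a special case there probably wouldn't make sense). Built-in functions such as split run in the task thread itself, which is why that version doesn't log the warning.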
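The mechanism can be illustrated with a pure-Python analogy. This is not Spark code. It assumes, as a simplification, that the consumer only checks the type of the calling thread, and that a Python UDF causes rows to be pulled on a separate plain helper thread. `UninterruptibleThread`, `fetch_from_kafka`, `task_body` and `run_task` are hypothetical names used only for this sketch.

```python
import threading


class UninterruptibleThread(threading.Thread):
    """Hypothetical stand-in for the thread type Spark runs tasks on (illustration only)."""


def fetch_from_kafka(log):
    # Hypothetical analogue of the consumer's check: warn unless the calling
    # thread is an UninterruptibleThread.
    if isinstance(threading.current_thread(), UninterruptibleThread):
        log.append("ok")
    else:
        log.append("WARN: not running in UninterruptibleThread")


def task_body(log, use_python_udf):
    if use_python_udf:
        # With a Python UDF, input rows are pulled on a separate helper thread,
        # which here is a plain threading.Thread.
        writer = threading.Thread(target=fetch_from_kafka, args=(log,))
        writer.start()
        writer.join()
    else:
        # With built-in functions, rows are read on the task thread itself.
        fetch_from_kafka(log)


def run_task(use_python_udf):
    log = []
    task = UninterruptibleThread(target=task_body, args=(log, use_python_udf))
    task.start()
    task.join()
    return log


print(run_task(use_python_udf=False))  # ['ok']
print(run_task(use_python_udf=True))   # ['WARN: not running in UninterruptibleThread']
```

The only difference between the two runs is which thread performs the read, which mirrors why the UDF version warns and the split version does not.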

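Personally, I wouldn't worry about it unless you run into related problems. Your Python code never interacts with KafkaConsumer directly. If you do run into problems, they should be fixed upstream; in that case I'd recommend opening a JIRA ticket.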


* Your unfold function could be rewritten with SQL functions, but it would be a hack. First, add the message count as an integer:

from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when

p = "(.+) message repeated (\\d) times: \\[ (.+)\\]"

lines = spark.createDataFrame(
    ["asd message repeated 3 times: [ 12]", "some other message"], "string"
)

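# regexp_extract returns "" when there is no match; casting "" to int gives null,
# so coalesce falls back to a count of 1
lines_with_count = lines.withColumn(
    "message_count", coalesce(regexp_extract("value", p, 2).cast("int"), lit(1)))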
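The count column can be mirrored in plain Python to make the fallback explicit. This is a sketch under two assumptions: `regexp_extract` behaves like an unanchored regex search that returns an empty string when nothing matches, and `cast("int")` turns that empty string into null (the default, non-ANSI cast behaviour). The helper names below are hypothetical.

```python
import re

P = re.compile(r"(.+) message repeated (\d) times: \[ (.+)\]")


def regexp_extract(value, pattern, idx):
    """Mimics regexp_extract for this case: "" when the pattern does not match."""
    m = pattern.search(value)
    return m.group(idx) if m else ""


def to_int_or_none(s):
    """Mimics cast("int") for this case: a non-numeric string becomes null (None)."""
    try:
        return int(s)
    except ValueError:
        return None


def coalesce(*values):
    """Returns the first non-None value, like SQL coalesce."""
    return next((v for v in values if v is not None), None)


def message_count(value):
    return coalesce(to_int_or_none(regexp_extract(value, P, 2)), 1)


print(message_count("asd message repeated 3 times: [ 12]"))  # 3
print(message_count("some other message"))                   # 1
```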

Use it to explode:

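exploded = lines_with_count.withColumn(
    "i",
    # repeat('1', message_count - 1) builds a string of message_count - 1 characters;
    # the hack relies on split(..., '') returning message_count elements for it,
    # so explode emits message_count rows per input line
    expr("explode(split(repeat('1', message_count - 1), ''))")
).drop("message_count", "i")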
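The row multiplication done by `withColumn(..., explode(...))` followed by `drop` can be sketched in plain Python. This assumes, as the hack does, that the SQL expression yields an array with `message_count` elements; a plain list of that length stands in for it, and `explode_rows` is a hypothetical name. Note that `explode` emits no rows for an empty array, so a count of 0 would drop the line entirely.

```python
def explode_rows(rows):
    """Pure-Python mirror of withColumn("i", explode(arr)).drop(..., "i").

    rows is a list of (value, message_count) pairs. explode emits one output
    row per array element, so each value ends up repeated message_count times.
    """
    out = []
    for value, count in rows:
        # Assumption: the SQL expression yields an array with `count` elements.
        helper_array = ["1"] * count
        for _element in helper_array:
            out.append(value)
    return out


rows = [("asd message repeated 3 times: [ 12]", 3), ("some other message", 1)]
exploded = explode_rows(rows)
print(len(exploded))  # 4
```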

And extract:

exploded.withColumn(
    "value",
    when(
        col("value").rlike(p),
        # Rebuild "<prefix> <body>" from capture groups 1 and 3
        concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
    ).otherwise(col("value"))).show(4, False)


# +------------------+
# |value             |
# +------------------+
# |asd 12            |
# |asd 12            |
# |asd 12            |
# |some other message|
# +------------------+
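As a cross-check, the final `when(rlike(...), concat_ws(...)).otherwise(...)` step can be mirrored in plain Python on the four exploded rows, reproducing the table shown. This sketch assumes `rlike` is an unanchored match, like `re.search`; `extract` is a hypothetical helper name.

```python
import re

P = re.compile(r"(.+) message repeated (\d) times: \[ (.+)\]")


def extract(value):
    """Mirror of when(rlike(p), concat_ws(" ", group 1, group 3)).otherwise(value)."""
    m = P.search(value)  # unanchored, like rlike
    if m:
        return " ".join((m.group(1), m.group(3)))
    return value


exploded = ["asd message repeated 3 times: [ 12]"] * 3 + ["some other message"]
print([extract(v) for v in exploded])
# ['asd 12', 'asd 12', 'asd 12', 'some other message']
```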