我正在尝试将两个流合并为一个并将结果写入主题
代码:1-阅读两个主题
val PERSONINFORMATION_df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "xx:9092")
.option("subscribe", "PERSONINFORMATION")
.option("group.id", "info")
.option("maxOffsetsPerTrigger", 1000)
.option("startingOffsets", "earliest")
.load()
val CANDIDATEINFORMATION_df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "xxx:9092")
.option("subscribe", "CANDIDATEINFORMATION")
.option("group.id", "candent")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1000)
.option("failOnDataLoss", "false")
.load()
Run Code Online (Sandbox Code Playgroud)
2-解析数据以加入它们:
val parsed_PERSONINFORMATION_df: DataFrame = PERSONINFORMATION_df
.select(from_json(expr("cast(value as string) as actualValue"), schemaPERSONINFORMATION).as("s")).select("s.*")
val parsed_CANDIDATEINFORMATION_df: DataFrame = CANDIDATEINFORMATION_df
.select(from_json(expr("cast(value as string) as actualValue"), schemaCANDIDATEINFORMATION).as("s")).select("s.*")
val df_person = parsed_PERSONINFORMATION_df.as("dfperson")
val df_candidate = parsed_CANDIDATEINFORMATION_df.as("dfcandidate")
Run Code Online (Sandbox Code Playgroud)
3-连接两个框架
val joined_df : DataFrame = df_candidate.join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner")
val …Run Code Online (Sandbox Code Playgroud) 我一直在这里关注这个例子,我想知道这个精度函数到底是如何工作的:
def compute_accuracy(y_true, y_pred):
'''Compute classification accuracy with a fixed threshold on distances.
'''
pred = y_pred.ravel() < 0.5
return np.mean(pred == y_true)
Run Code Online (Sandbox Code Playgroud)
据我所知,在这种情况下网络的输出将是两对之间的距离。那么在这种情况下我们如何计算准确率呢?“0.5”阈值指的是什么?另外,如何计算错误率?