小编mik*_*ike的帖子

如何在 Scala 中编写 Kafka Producer

我需要帮助使用 kafka 生产者向主题发布消息。我的 kafka 生产者客户端是用运行在 spark 上的 Scala 编写的。

我的工作运行成功,但我的消息似乎没有发布。

这是代码

val response = info.producer.asInstanceOf[KafkaProducer[K, V]].send(new ProducerRecord(info.props.getProperty(s"$topicNickName.topic"), keyMessage._1, keyMessage._2))
Run Code Online (Sandbox Code Playgroud)

生产者配置值

metric.reporters = []
    metadata.max.age.ms = 300000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [x.data.edh:6667, y.data.edh:6667, z.data.edh:6667, a.data.edh:6667, b.data.edh:6667]
    ssl.keystore.type = JKS
    sasl.mechanism = GSSAPI
    max.block.ms = 60000
    interceptor.classes = null
    ssl.truststore.password = null
    client.id = 
    ssl.endpoint.identification.algorithm = null
    request.timeout.ms = 30000
    acks = 1
    receive.buffer.bytes = 32768
    ssl.truststore.type = JKS
    retries = 0
    ssl.truststore.location = null
    ssl.keystore.password = null …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark kafka-producer-api

3
推荐指数
1
解决办法
3758
查看次数

如何在单个kafka主题的所有分区中写入相同的消息?

我有一个主题,假设名称为“测试”。假设有4个分区P1、P2、P3、P4。现在,我正在发送一条消息假设来自 Kafka Producer 的 M1。我希望消息 M1 写入所有分区 P1、P2、P3、P4。是否可以?如果是的话我该怎么做?(我对此很陌生,我正在使用 Kafka-Node 来执行此操作。)

apache-kafka kafka-consumer-api kafka-producer-api

3
推荐指数
1
解决办法
6720
查看次数

如何在 spark 3.0 结构化流中使用 kafka.group.id 和检查点以继续从重启后停止的 Kafka 读取?

基于 Spark 3.0 中的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html。应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失。根据我之前的问题,我觉得 Spark 3.0 中的 kafka.group.id 会有所帮助。

如何为 Spark 结构化流指定 kafka 消费者的组 ID?

如何通过 Spark Structured Streaming 确保 kafka 数据摄取不会丢失数据?

但是,我在 spark 3.0 中尝试了如下设置。

package com.example

/**
 * @author ${user.name}
 */
import scala.math.random

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration

3
推荐指数
1
解决办法
1159
查看次数

kafka区分消耗的偏移量和提交的偏移量吗?

据我了解,消费者读取特定主题的消息,并且消费者客户端将定期提交偏移量。

因此,如果由于某种原因消费者未能发送特定消息,则该偏移量将不会被提交,然后您可以返回并重新处理该消息。

是否有任何东西可以跟踪您刚刚消耗的偏移量以及您随后提交的偏移量?

apache-kafka kafka-consumer-api

3
推荐指数
1
解决办法
3131
查看次数

为什么 Spark 对大小大于 autoBroadcastJoinThreshold 的文件应用广播连接?

我正在使用 Spark 3.1.1 并分别加入文件大小为 8.6Gb 和 25.2Mb 的两个 Dataframe,并且不应用任何过滤器。Spark 会自动使用 BroadcastHashJoin 来实现此目的,尽管spark.sql.autoBroadcastJoinThreshold默认值为 10Mb。

如何在不应用任何过滤器的情况下将 25.2Mb 转换为 8.1Mb 以获得广播资格?

val df1 = spark.read
  .option("header",true)
  .csv("s3a://data/staging/received/data/spark/3/KernelVersionOutputFiles.csv")
  .withColumn("Pid",substring(rand(),3,4).cast("bigint"))


val df2 = spark.read
  .option("header",true)
  .csv("s3a://data/staging/received/data/spark/3/ForumTopics.csv")
  .withColumn("Cid",substring(rand(),3,4).cast("bigint"))

val df3 = df2.coalesce(1)
val joinDf = df1.join(df3, df1("Pid") === df3("Cid"))
val cnt = joinDf.count()
Run Code Online (Sandbox Code Playgroud)

DAG 看起来像这样:

在此输入图像描述

apache-spark apache-spark-sql

3
推荐指数
1
解决办法
1078
查看次数

Spark 3.x 与 Python 中的 Kafka 集成

带有 spark-streaming 的 Kafka 抛出一个错误:

from pyspark.streaming.kafka import KafkaUtils ImportError: No module named kafka
Run Code Online (Sandbox Code Playgroud)

我已经设置了一个 kafka 代理和一个工作的 Spark 环境,一个主人和一个工人。

import os

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils



if __name__=="__main__":
    sc = SparkContext(appName="SparkStreamAISfromKAFKA")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc,1)
    kvs = KafkaUtils.createStream(ssc,"my-kafka-broker","raw-event-streaming-consumer",{'enriched_ais_messages':1})
    lines = kvs.map(lambda x: x[1])
    lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint()
    ssc.start()
    ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我假设错误是缺少与 kafka ans 相关的特定版本。有人能帮忙吗?

火花版本:版本 3.0.0-preview2

我执行:

/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 --jars …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark pyspark spark-structured-streaming spark-kafka-integration

2
推荐指数
1
解决办法
1327
查看次数

Spark Scala:使用 $ 符号的功能差异?

以下两个表达式之间是否存在功能差异?结果对我来说看起来一样,但很好奇是否有未知的未知数。该$符号表示什么/如何读取?

df1.orderBy($"reasonCode".asc).show(10, false)
    
df1.orderBy(asc("reasonCode")).show(10, false)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

2
推荐指数
1
解决办法
102
查看次数

何时适合使用 UDF 与使用 Spark 功能?

在过去的几周里,我一直在通过工作中所做的所有测试来扩展我对 Spark 的了解,但我有点困惑,什么时候适合使用 UDF,什么时候不适合。查看一些同行代码,他们在使用数据帧时使用了很多UDF' ,但它们非常占用资源。因为我重构了他们的很多代码,所以我使用spark.sql() 重写了很多代码,而且速度更快,而且我只使用 Spark 功能。话虽如此,什么时候使用 UDF 比较合适,什么时候只使用 Spark 的内置功能比较合适?

user-defined-functions apache-spark apache-spark-sql pyspark

2
推荐指数
1
解决办法
2698
查看次数

如果将group_id设置为None,Kafka消费者会收到消息,但如果不是None,它不会收到任何消息?

我有以下 Kafka 消费者,如果将 分配group_id给 None,它会很好地工作 - 它收到了所有历史消息和我新测试的消息。

consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset=auto_offset_reset,
        enable_auto_commit=enable_auto_commit,
        group_id=group_id,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

for m in consumer:
Run Code Online (Sandbox Code Playgroud)

group_id但是,如果我将其设置为某个值,它不会收到任何内容。我尝试运行测试生产者来发送新消息,但没有收到任何消息。

消费者控制台确实显示以下消息:

2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py(重新)加入组 my_group
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py 成功加入第 497 代组 my_group
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py 更新的分区分配:[]
2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 Consumer.py 为组 my_group 设置新分配的分区 set()

python apache-kafka kafka-consumer-api kafka-python

1
推荐指数
1
解决办法
4504
查看次数

Spring Kafka Consumer Configs - 默认值和至少一次语义

我正在使用 spring-kafka 模板编写 kafka 消费者。当我实例化消费者时,Spring kafka 接受如下参数。

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
Run Code Online (Sandbox Code Playgroud)

我阅读了文档,看起来还有很多其他参数也可以作为消费者配置传递。有趣的是,每个参数都有一个默认值。我的问题是

  1. 这些是在什么基础上到达的?
  2. 是否真的需要改变这些值,如果是的话,这些值是什么
    (恕我直言,这是根据具体情况而定的。但仍然想听听专家的意见)
  3. 我们拥有的传递语义是至少一次。因此,对于这种(至少一次)传递语义,如果这些保持不变,它仍然会处理大量数据。

任何指示或答案都会对澄清我的疑问有很大帮助。

apache-kafka kafka-consumer-api spring-kafka

1
推荐指数
1
解决办法
3010
查看次数

Kafka集群和Kafka Broker有什么区别?

Kafka clusterKafka broker一样的意思吗?

我知道集群有多个代理(这是错误的吗?)。

但是当我编写代码来生成消息时,我发现很尴尬option

props.put("bootstrap.servers", "kafka001:9092, kafka002:9092, kafka003:9092");
Run Code Online (Sandbox Code Playgroud)

这是代理地址还是集群地址?如果这是经纪人地址,我认为这不好,因为当经纪人计数变化时我们必须修改上面的地址。
(但这似乎是经纪人地址..)

此外,我在亚马逊看到MSK,我们可以向每个AZ.
这意味着,我们不能有很多经纪人。(最多三四个?)

他们指导我们应该将此代理地址写入 bootstrap.server option as a,`单独的列表。

为什么他们不指导我们使用集群地址或ARN

apache-kafka kafka-cluster aws-msk

0
推荐指数
1
解决办法
4535
查看次数