我需要帮助使用 kafka 生产者向主题发布消息。我的 kafka 生产者客户端是用运行在 spark 上的 Scala 编写的。
我的工作运行成功,但我的消息似乎没有发布。
这是代码
val response = info.producer.asInstanceOf[KafkaProducer[K, V]].send(new ProducerRecord(info.props.getProperty(s"$topicNickName.topic"), keyMessage._1, keyMessage._2))
Run Code Online (Sandbox Code Playgroud)
生产者配置值
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [x.data.edh:6667, y.data.edh:6667, z.data.edh:6667, a.data.edh:6667, b.data.edh:6667]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null …Run Code Online (Sandbox Code Playgroud) 我有一个主题,假设名称为“测试”。假设有4个分区P1、P2、P3、P4。现在,我正在发送一条消息假设来自 Kafka Producer 的 M1。我希望消息 M1 写入所有分区 P1、P2、P3、P4。是否可以?如果是的话我该怎么做?(我对此很陌生,我正在使用 Kafka-Node 来执行此操作。)
基于 Spark 3.0 中的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html。应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失。根据我之前的问题,我觉得 Spark 3.0 中的 kafka.group.id 会有所帮助。
如何为 Spark 结构化流指定 kafka 消费者的组 ID?
如何通过 Spark Structured Streaming 确保 kafka 数据摄取不会丢失数据?
但是,我在 spark 3.0 中尝试了如下设置。
package com.example
/**
* @author ${user.name}
*/
import scala.math.random
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import …Run Code Online (Sandbox Code Playgroud) scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration
据我了解,消费者读取特定主题的消息,并且消费者客户端将定期提交偏移量。
因此,如果由于某种原因消费者未能发送特定消息,则该偏移量将不会被提交,然后您可以返回并重新处理该消息。
是否有任何东西可以跟踪您刚刚消耗的偏移量以及您随后提交的偏移量?
我正在使用 Spark 3.1.1 并分别加入文件大小为 8.6Gb 和 25.2Mb 的两个 Dataframe,并且不应用任何过滤器。Spark 会自动使用 BroadcastHashJoin 来实现此目的,尽管spark.sql.autoBroadcastJoinThreshold默认值为 10Mb。
如何在不应用任何过滤器的情况下将 25.2Mb 转换为 8.1Mb 以获得广播资格?
val df1 = spark.read
.option("header",true)
.csv("s3a://data/staging/received/data/spark/3/KernelVersionOutputFiles.csv")
.withColumn("Pid",substring(rand(),3,4).cast("bigint"))
val df2 = spark.read
.option("header",true)
.csv("s3a://data/staging/received/data/spark/3/ForumTopics.csv")
.withColumn("Cid",substring(rand(),3,4).cast("bigint"))
val df3 = df2.coalesce(1)
val joinDf = df1.join(df3, df1("Pid") === df3("Cid"))
val cnt = joinDf.count()
Run Code Online (Sandbox Code Playgroud)
DAG 看起来像这样:
带有 spark-streaming 的 Kafka 抛出一个错误:
from pyspark.streaming.kafka import KafkaUtils ImportError: No module named kafka
Run Code Online (Sandbox Code Playgroud)
我已经设置了一个 kafka 代理和一个工作的 Spark 环境,一个主人和一个工人。
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__=="__main__":
sc = SparkContext(appName="SparkStreamAISfromKAFKA")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc,1)
kvs = KafkaUtils.createStream(ssc,"my-kafka-broker","raw-event-streaming-consumer",{'enriched_ais_messages':1})
lines = kvs.map(lambda x: x[1])
lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint()
ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
我假设错误是缺少与 kafka ans 相关的特定版本。有人能帮忙吗?
火花版本:版本 3.0.0-preview2
我执行:
/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 --jars …Run Code Online (Sandbox Code Playgroud) apache-kafka apache-spark pyspark spark-structured-streaming spark-kafka-integration
以下两个表达式之间是否存在功能差异?结果对我来说看起来一样,但很好奇是否有未知的未知数。该$符号表示什么/如何读取?
df1.orderBy($"reasonCode".asc).show(10, false)
df1.orderBy(asc("reasonCode")).show(10, false)
Run Code Online (Sandbox Code Playgroud) 在过去的几周里,我一直在通过工作中所做的所有测试来扩展我对 Spark 的了解,但我有点困惑,什么时候适合使用 UDF,什么时候不适合。查看一些同行代码,他们在使用数据帧时使用了很多UDF' ,但它们非常占用资源。因为我重构了他们的很多代码,所以我使用spark.sql() 重写了很多代码,而且速度更快,而且我只使用 Spark 功能。话虽如此,什么时候使用 UDF 比较合适,什么时候只使用 Spark 的内置功能比较合适?
user-defined-functions apache-spark apache-spark-sql pyspark
我有以下 Kafka 消费者,如果将 分配group_id给 None,它会很好地工作 - 它收到了所有历史消息和我新测试的消息。
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
group_id=group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:
Run Code Online (Sandbox Code Playgroud)
group_id但是,如果我将其设置为某个值,它不会收到任何内容。我尝试运行测试生产者来发送新消息,但没有收到任何消息。
消费者控制台确实显示以下消息:
2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py(重新)加入组 my_group 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py 成功加入第 497 代组 my_group 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py 更新的分区分配:[] 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 Consumer.py 为组 my_group 设置新分配的分区 set()
我正在使用 spring-kafka 模板编写 kafka 消费者。当我实例化消费者时,Spring kafka 接受如下参数。
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
Run Code Online (Sandbox Code Playgroud)
我阅读了文档,看起来还有很多其他参数也可以作为消费者配置传递。有趣的是,每个参数都有一个默认值。我的问题是
任何指示或答案都会对澄清我的疑问有很大帮助。
有Kafka cluster和Kafka broker一样的意思吗?
我知道集群有多个代理(这是错误的吗?)。
但是当我编写代码来生成消息时,我发现很尴尬option。
props.put("bootstrap.servers", "kafka001:9092, kafka002:9092, kafka003:9092");
Run Code Online (Sandbox Code Playgroud)
这是代理地址还是集群地址?如果这是经纪人地址,我认为这不好,因为当经纪人计数变化时我们必须修改上面的地址。
(但这似乎是经纪人地址..)
此外,我在亚马逊看到MSK,我们可以向每个AZ.
这意味着,我们不能有很多经纪人。(最多三四个?)
他们指导我们应该将此代理地址写入 bootstrap.server option as a,`单独的列表。
为什么他们不指导我们使用集群地址或ARN?
apache-kafka ×8
apache-spark ×6
scala ×3
pyspark ×2
aws-msk ×1
kafka-python ×1
python ×1
spring-kafka ×1