I am running Kafka locally following the instructions in the quickstart guide here, and I defined my consumer group configuration in config/consumer.properties so that my consumers consume with the group.id defined there.
I ran the following command:
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
The result was:
test-consumer-group <-- group.id defined in config/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh
I was able to connect to Kafka through a Python-based consumer configured to use the provided group.id, i.e. test-consumer-group.
First of all, I can't understand how or when Kafka creates consumer groups. It seems to load config/consumer.properties at some point, and it additionally creates console-consumer-67807 when I connect via kafka-console-consumer.sh.
How can I explicitly create my own consumer group, say my-created-consumer-group?
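For reference, there is no explicit "create group" call in Kafka: a consumer group comes into existence the first time a consumer joins with that group.id, which is also why console-consumer-67807 only appeared once kafka-console-consumer.sh connected. A minimal sketch, assuming the kafka-python client and a hypothetical topic named test:

from kafka import KafkaConsumer

# Joining with a new group.id is what creates the group on the broker side.
consumer = KafkaConsumer(
    "test",                                   # hypothetical topic name
    bootstrap_servers="localhost:9092",
    group_id="my-created-consumer-group",
    auto_offset_reset="earliest",
)
for record in consumer:
    print(record.value)

Once this consumer has joined, my-created-consumer-group should show up in the kafka-consumer-groups.sh --list output.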
Does Kafka's rebalancing algorithm work across topics?
Say I have 5 topics, each with 10 partitions, and 20 instances of a consumer application in the same consumer group, each subscribed to all 5 topics.
Will Kafka try to balance the 50 partitions evenly across the 20 instances?
Or will it balance only within each topic, so that the first 10 instances might receive all 50 partitions while the other 10 stay idle?
I know that in the past Kafka did not balance across topics, but what about current versions?
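Which instance gets which partitions is decided by the client-side assignor, not by the broker. The default RangeAssignor assigns each topic independently and sorts the members the same way every time, so in the 5×10 scenario above the same 10 of the 20 instances would receive one partition of every topic while the other 10 sit idle; RoundRobinAssignor (and, in newer Java clients, StickyAssignor/CooperativeStickyAssignor) spreads all 50 partitions across all 20 members. A sketch of choosing the assignor, assuming the kafka-python client and hypothetical group/topic names:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

# Round-robin assignment interleaves the partitions of every subscribed
# topic across all group members, instead of dividing each topic separately.
consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",
    group_id="my-group",                                  # hypothetical group
    partition_assignment_strategy=[RoundRobinPartitionAssignor],
)
consumer.subscribe(["topic-1", "topic-2", "topic-3", "topic-4", "topic-5"])

With the plain Java client, the equivalent knob is the partition.assignment.strategy consumer property.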
I'm trying to find out whether offsets exist at the consumer-group level as well. In Kafka, are consumer offsets kept per consumer group, or per individual consumer within that group?
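For background, Kafka stores committed offsets per (group.id, topic, partition) in the internal __consumer_offsets topic; individual consumers don't own offsets, they inherit whatever the group last committed for the partitions they happen to be assigned. A small sketch for inspecting this, assuming kafka-python and placeholder group/topic names:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",
    group_id="test-consumer-group",           # placeholder group
)
tp = TopicPartition("test", 0)                # placeholder topic/partition
# committed() returns the group's last committed offset for this partition,
# regardless of which member of the group committed it.
print(consumer.committed(tp))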
I'm working on a Kafka consumer program. We recently deployed it in the PROD environment, where we ran into issues like the following:
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - No. of records fetched: 1
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Discovered group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null)
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Attempt to heartbeat failed for since member id consumer-otm-opl-group-1-953dfa46-9ced-472f-b24f-36d78c6b940b is not valid.
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch …

How are messages acknowledged when using pub/sub?
When a message is delivered to one of the consumers in a single group, does the message count as acknowledged once that consumer acknowledges it, or only after every consumer has acknowledged it?
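For context, in Kafka "acknowledgment" means committing an offset, and it is tracked once per group: each partition is assigned to exactly one consumer in the group, and that consumer's commit marks the message consumed for the entire group (other groups keep their own, independent offsets). A manual-commit sketch, assuming the kafka-python client and placeholder topic/group names:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test",                              # hypothetical topic name
    bootstrap_servers="localhost:9092",
    group_id="test-consumer-group",      # hypothetical group
    enable_auto_commit=False,            # commit explicitly instead of auto-commit
)
batch = consumer.poll(timeout_ms=1000)   # {TopicPartition: [records]}
for records in batch.values():
    for record in records:
        print(record.value)
# One commit marks everything returned by poll() as consumed
# for the whole group, not just for this process.
consumer.commit()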
I'm trying to do stateful structured streaming over an incoming data stream using the mapGroupsWithState method. The problem I'm facing is that the key I chose for groupByKey makes my state grow too big too fast. The obvious way out would be to change the key, but the business logic I apply in the update method requires the key to stay exactly as it is now, unless it were somehow possible to access the GroupState of all keys.
For example, I have streams of data coming in from various organizations, and an organization typically contains a userId, personId, and so on. See the code below:
val stream: Dataset[User] = dataFrame.as[User]
val noTimeout = GroupStateTimeout.NoTimeout

val statisticStream = stream
  .groupByKey(key => key.orgId)
  .mapGroupsWithState(noTimeout)(updateUserStatistic)

val df = statisticStream.toDF()
val query = df
  .writeStream
  .outputMode(Update())
  .option("checkpointLocation", s"$checkpointLocation/$name")
  .foreach(new UserCountWriter(spark.sparkContext.getConf))
  .queryName(name)
  .trigger(Trigger.ProcessingTime(Duration.apply("10 seconds")))
  .start()
The case classes:
case class User(
  orgId: Long,
  profileId: Long,
  userId: Long)

case class UserStatistic(
  orgId: Long,
  known: Long,
  uknown: Long,
  userSeq: Seq[User])
The update method:
def updateUserStatistic(
    orgId: Long,
    newEvents: Iterator[User],
    oldState: GroupState[UserStatistic]): UserStatistic = { …
I'm running PySpark against a Spark cluster in local mode and trying to write a streaming DataFrame to a Kafka topic.
When I run the query, I get the following message:
java.lang.IllegalStateException: Set(topicname-0) are gone. Some data may have been missed..
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
Here is my code:
java.lang.IllegalStateException: Set(topicname-0) …
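The message itself points at the workaround: set failOnDataLoss to false on the Kafka source, so the query skips offset ranges that Kafka has already aged out or deleted instead of failing. A minimal sketch of the source definition, with the broker address assumed and the topic name taken from the error:

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")   # assumed broker address
      .option("subscribe", "topicname")                      # topic from the error message
      .option("failOnDataLoss", "false")                     # tolerate aged-out/deleted offsets
      .load())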
I have kafka_2.13-2.7.0 on Ubuntu 20.04. I run the Kafka server and ZooKeeper, then create a topic and send a text file into it via nc -lk 9999. The topic fills with data. I also have spark-3.0.1-bin-hadoop2.7 on my system. In fact, I want to use the Kafka topic as a source for Spark Structured Streaming with Python. My code looks like this:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("APP") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sparktest") \
    .option("startingOffsets", "earliest") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
I run the code above with spark-submit, using the following command:
./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 /home/spark/PycharmProjects/testSparkStream/KafkaToSpark.py
The code runs without any exception, and I get the following output:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
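Note that the selectExpr result above is never assigned back, which is why printSchema still shows the raw binary key/value columns. To actually see rows, the projection needs to be kept and a streaming query started; a minimal sketch with a console sink (the sink choice is an assumption):

query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .format("console")
         .outputMode("append")
         .start())

query.awaitTermination()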
I'm trying to merge two streams into one and write the result to a topic.
My code: 1 - reading the two topics:
val PERSONINFORMATION_df: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xx:9092")
  .option("subscribe", "PERSONINFORMATION")
  .option("group.id", "info")
  .option("maxOffsetsPerTrigger", 1000)
  .option("startingOffsets", "earliest")
  .load()

val CANDIDATEINFORMATION_df: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xxx:9092")
  .option("subscribe", "CANDIDATEINFORMATION")
  .option("group.id", "candent")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 1000)
  .option("failOnDataLoss", "false")
  .load()
2 - parsing the data in order to join them:
val parsed_PERSONINFORMATION_df: DataFrame = PERSONINFORMATION_df
  .select(from_json(expr("cast(value as string) as actualValue"), schemaPERSONINFORMATION).as("s")).select("s.*")

val parsed_CANDIDATEINFORMATION_df: DataFrame = CANDIDATEINFORMATION_df
  .select(from_json(expr("cast(value as string) as actualValue"), schemaCANDIDATEINFORMATION).as("s")).select("s.*")
val df_person = parsed_PERSONINFORMATION_df.as("dfperson")
val df_candidate = parsed_CANDIDATEINFORMATION_df.as("dfcandidate")
3 - joining the two frames:
val joined_df : DataFrame = df_candidate.join(df_person, col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner")
val …
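The write-to-topic step is cut off above. For reference, Spark's Kafka sink expects a value column (string or binary) plus a checkpointLocation; a minimal sketch of that final step, written in PySpark here for brevity (the Scala chain is analogous, and the output topic and checkpoint path are made-up placeholders):

out = (joined_df
       .selectExpr("to_json(struct(*)) AS value")             # serialize all joined columns
       .writeStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "xx:9092")          # broker as in the source options
       .option("topic", "JOINEDINFORMATION")                  # hypothetical output topic
       .option("checkpointLocation", "/tmp/checkpoints/join") # hypothetical path
       .start())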
I have a DataFrame, and I want to dynamically pass column names to a select statement through widgets in a Databricks notebook. How can I do that?

I am using the code below:
df1 = spark.sql("select * from tableraw")
where df1 has the columns tablename and layer.
df = df1.select("tablename", "layer")
Now the requirement is to select these columns using the widgets' values, like this:
df = df1.select(dbutils.widgets.get("tablename"), dbutils.widgets.get("datalayer"))
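For completeness, a widget has to exist before its value can be read. A minimal sketch, assuming text widgets named tablename and datalayer to match the snippet above (the default values are placeholders):

# Create the widgets once in the notebook; get() then returns plain strings.
dbutils.widgets.text("tablename", "tablename")
dbutils.widgets.text("datalayer", "layer")

# select() accepts column-name strings, so widget values drop straight in.
df = df1.select(dbutils.widgets.get("tablename"), dbutils.widgets.get("datalayer"))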