我的 Spark Streaming 工作正在消耗来自 Kafka 的数据
KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM),
prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap);
Run Code Online (Sandbox Code Playgroud)
每当我重新启动我的工作时,它就会开始从最后一个偏移存储开始消耗(我假设这是因为发送处理后的数据需要花费大量时间,并且如果我更改消费者组,它会立即处理新消息)
我是 kafka 8.1.1,其中 auto.offset.reset 默认为最大,这意味着每当我重新启动 kafka 都会从我离开的位置发送数据。
我的用例要求我忽略这些数据并仅处理到达的数据。我怎样才能做到这一点?任何建议
apache-kafka apache-spark spark-streaming kafka-consumer-api
我又来了,我尝试使用用 scala -2.10.5 编写的 Spark Streaming_1.6.1 类从 kafka_0.9.0.0 主题读取数据。这是一个简单的程序,我在 sbt_0.13.12 中构建了它。当我运行该程序时,我收到此异常
(run-main-0) org.apache.spark.SparkException: 由于阶段失败,作业中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0 (TID 1,本地主机):java.lang. lang.ClassCastException:[B 无法在 org.kafka.receiver.AvroCons$$anonfun$1.apply(AvroConsumer.scala:54) [错误] 在 org.kafka.receiver.AvroCons 处转换为 java.lang.String [错误] $$anonfun$1.apply(AvroConsumer.scala:54) [错误] 位于 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) [错误]
位于 org.apache.spark.util.Utils$。 getIteratorSize(Utils.scala:1597) [错误] 在 org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.rdd.RDD$ $anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark。 SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) [错误] 在 org.apache.spark.scheduler。 Task.run(Task.scala:89) [错误] 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) [错误] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1145) [错误] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [错误] 在 java.lang.Thread.run(Thread.java:745) [错误] [错误]驱动程序堆栈跟踪:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 …
我正在尝试通过 Apache Spark Streaming 读取 Kafka 主题,但无法弄清楚如何将 DStream 中的数据转换为 DataFrame,然后存储在临时表中。Kafka 中的消息采用 Avro 格式,由 Kafka JDBC Connect 从数据库创建。我有下面的代码,它工作正常,直到它执行spark.read.json读取 json 到数据帧。
package consumerTest
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import scala.util.parsing.json.{JSON, JSONObject}
object Consumer {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate();
import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "<kafka-server>:9092",
"key.deserializer" -> …Run Code Online (Sandbox Code Playgroud) apache-spark spark-streaming kafka-consumer-api spark-dataframe
我正在使用以下水槽。问题是它将elasticsearch索引名称设置为与主题相同。我想要一个不同的 elasticseach 索引名称。我怎样才能做到这一点。我正在使用汇合4
{
"name": "es-sink-mysql-foobar-02",
"config": {
"_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"_comment": "--- Elasticsearch-specific config ---",
"_comment": "Elasticsearch server address",
"connection.url": "http://localhost:9200",
"_comment": "Elasticsearch mapping name. Gets created automatically if doesn't exist ",
"type.name": "type.name=kafka-connect",
"index.name": "asimtest",
"_comment": "Which topic to stream data from into Elasticsearch",
"topics": "mysql-foobar",
"_comment": "If the Kafka message doesn't have a key …Run Code Online (Sandbox Code Playgroud) elasticsearch apache-kafka kafka-consumer-api apache-kafka-connect confluent-platform
我正在运行 ubuntu 18 并使用 java(intellij IDE)并编写一个基本的 kafka 应用程序。我正在尝试使用此处的基本示例,并将信息流式传输到应用程序,然后将某些内容打印到屏幕上,我正在使用 intellij“运行”命令运行应用程序。
当我将应用程序连接到输出流时,它工作正常并且我设法将信息输出到终端。
我尝试添加System.out.println()foreach 方法,在 apply 方法中,它不起作用,我在其中添加断点并运行调试模式,但它没有到达那里,我猜在运行期间流没有到达那里。
我正在使用正确的主题将信息流式传输到应用程序,并且我在应用程序中放置在 apply 和 foreach 之外的打印件工作正常。
每次向应用程序传输信息时,如何让应用程序打印某些内容?主要思想是处理某些内容并将结果打印到监视器而不是 kafka 流
这是代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.Printed;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class main {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// setting …Run Code Online (Sandbox Code Playgroud) 我的项目通过为每个租户和通配符消费者使用单独的主题来在 Kafka 级别实现多租户,即发布到主题“message.tenant1”或“message.tenant99”并从主题“message.*”消费
这工作正常,直到我们想要动态添加新租户,即添加“message.tenant100”主题。在重新启动之前,通配符使用者不会看到新主题。
有没有办法让通配符消费者在不重启整个应用程序的情况下看到新主题?
我们正在使用 Spring,但如果无法通过 Spring 获得解决方案,那么我们可以使用其他方法。
编辑:事实证明这确实有效,但在重新平衡之前有一个粗略的延迟 5 分钟。5 分钟对于我们的生产来说可能太长了。我尝试将 'leader.imbalance.check.interval.seconds' 设置为较低的值,但这似乎没有任何效果。
我如何配置或告诉 Kafka 尽快重新平衡?我希望重新平衡是一项昂贵的操作,而不是您想要经常做的操作。
我现在正在研究 Kafka java api。在关于消费者的内容中,我经常看到ConsumerRecords。我想确切地知道键和值的含义。我已经阅读了官方文档,但我找不到答案。请分享你的智慧。
试图理解复制因子和消费者组之间的关系。示例:分区数 = 2 复制数 = 3 消费者组中的消费者数 = 4。在这种情况下 ,
我的消费者绑定到匿名消费者组,而不是我指定的消费者组。
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
zkNodes: localhost
defaultZkPort: 2181
bindings:
inEvent:
group: eventin
destination: event
outEvent:
group: eventout
destination: processevent
Run Code Online (Sandbox Code Playgroud)
我的 Spring Boot 应用程序
@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(value = "inEvent")
public void getEvent(Event event){
System.out.println(event.name);
}
}
Run Code Online (Sandbox Code Playgroud)
我的输入输出通道接口
public interface EventStream {
@Input("inEvent")
SubscribableChannel inEvent();
@Output("outEvent")
MessageChannel outEvent();
}
Run Code Online (Sandbox Code Playgroud)
我的控制台日志--
:在 3.233 秒内启动 ConsumerApplication(JVM 运行 4.004):[ Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d]发现组协调员 singh:9092 (id: 2147483647 …
spring-boot spring-cloud kafka-consumer-api spring-cloud-stream spring-kafka
我们试图在指定的窗口时间从 Kafka 读取数据(所以我们有 Kafka 消费者),这意味着避免在其他时间读取数据。但是,我们不确定如何在时间段到期后关闭消费者。我想知道是否有任何示例可以说明如何做到这一点?非常感谢您帮助我们。
apache-kafka spring-scheduled kafka-consumer-api spring-kafka
apache-kafka ×7
spring-kafka ×3
apache-spark ×2
java ×1
sbt ×1
scala ×1
spring-boot ×1
spring-cloud ×1