标签: apache-kafka

如何向受SSL保护的kafka主题发送消息

任何人都可以提出建议,使用java KafkaProducer,我们需要设置哪些属性来将消息发送到受SSL保护的kafka主题,这是kafka的新增功能,无法向受SSL保护的kafka发送一条消息

java ssl apache-kafka kafka-producer-api

0
推荐指数
1
解决办法
8599
查看次数

带Kafka连接器的Spark流式传输停止

我开始使用Spark流媒体.我想从Kafka获取一个流,其中包含我在Spark文档中找到的示例代码:https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html

这是我的代码:

object SparkStreaming {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Test_kafka_spark").setMaster("local[*]") // local parallelism 1
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9093",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("spark")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(record => (record.key, record.value))

  }
}
Run Code Online (Sandbox Code Playgroud)

所有人似乎都开始很好,但工作立即停止,记录如下:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka spark-streaming spark-streaming-kafka

0
推荐指数
1
解决办法
479
查看次数

融合Kafka Connect Elasticsearch文档ID创建

我正在使用汇合来连接我的数据库和ES,但出现以下异常:

org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id.
    at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我在卡夫卡连接-JDBC配置是:

name=task-view-list-stage
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10 
connection.url=jdbc:postgresql://localhost:5432/postgres?user=postgres&password=test
table.types=TABLE
query=select * from employee_master
mode=timestamp+incrementing
incrementing.column.name=employee_master_id
timestamp.column.name=modified_date
validate.non.null=false
topic.prefix=my-id-app
Run Code Online (Sandbox Code Playgroud)

我的kafka-connect Elasticsearch配置是:

name=es-id-view
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=my-id-app
topics.key.ignore=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id
connection.url=http://localhost:9200
type.name=type_id
Run Code Online (Sandbox Code Playgroud)

我的表结构是:

employee_master_id | emp_name | modified_date
-----------------------------------------------------------
1                  |  Bala    |  "2017-05-18 …
Run Code Online (Sandbox Code Playgroud)

jdbc elasticsearch apache-kafka confluent apache-kafka-connect

0
推荐指数
1
解决办法
962
查看次数

在Kafka Connect中使用自定义转换器吗?

我正在尝试将自定义转换器与Kafka Connect一起使用,但似乎无法正确使用。我希望有人对此有经验,可以帮助我解决!

初始情况

怎么了 ?

当连接器启动时,它们会正确加载罐子并找到自定义转换器。确实,这是我在日志中看到的内容:

[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
Run Code Online (Sandbox Code Playgroud)

然后,我将JSON配置发布到连接器节点之一以创建我的连接器:

{
  "name": "hdfsSinkCustom",
  "config": {
    "topics": "yellow",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "custom.CustomStringConverter",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
    "topics.dir": "yellow_storage",
    "flush.size": "1",
    "rotate.interval.ms": "1000"
  }
}
Run Code Online (Sandbox Code Playgroud)

并收到以下回复:

{
   "error_code": 400,
   "message": "Connector configuration is invalid and contains …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

0
推荐指数
1
解决办法
2666
查看次数

kafka-stream:获取CorruptRecordException

我根据本教程编写了一个简单的kafka-stream程序:http:
//kafka.apache.org/10/documentation/streams/tutorial


程序

Pipe.java:

package eric.kafka.stream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

/**
 * kafka-stream - pipe,
 */
public class Pipe {
    // topic names,
    public static final String TOPIC_INPUT = "streams-plaintext-input";
    public static final String TOPIC_OUTPUT = "streams-pipe-output";

    public static void pipe() {
        // set up properties,
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); // app id,
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // kafka server,
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serialization / …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
2509
查看次数

KSQL左连接不起作用

我是stackoverflow的新手,所以让我知道如果我在这里发布这个问题我有什么不对.

我已经尝试找到答案,但无法在网站上找到KSQL JOIN相关问题,所以我发布了这个.我已经尝试了不同的方法来运行此查询,但我一直得到空指针异常,所以在此处发布.

我有两个kafka avro主题交易和费用,但数据有很多空白,以清楚我已创建以下主题和表与修剪数据.DEAL_STREAMEXPENSE_TABLE

ksql> describe EXPENSE_TABLE;
Run Code Online (Sandbox Code Playgroud)

结果:

Field      | Type
ROWTIME    | BIGINT (system)
ROWKEY     | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)

ksql> describe deal_stream;
Run Code Online (Sandbox Code Playgroud)

结果:

Field      | Type
ROWTIME    | BIGINT (system)
ROWKEY     | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)

当我执行以下Query时,它给我空指针异常.我尝试了以下查询.

1:

ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
Run Code Online (Sandbox Code Playgroud)

2: …

apache-kafka confluent ksql

0
推荐指数
1
解决办法
307
查看次数

出现副本滞后时,Kafka副本如何成为领导者

例如,我有三个副本:replica1(leader),replica2(follower),replica3(follower)。但是现在领导者和追随者之间存在差距,而领导者现在已经死亡。

因此,关注者没有最新消息(由于滞后)。那么,卡夫卡如何选出新的领导人,无论如何,都会有数据丢失,因此卡夫卡如何处理这一问题。

apache-kafka

0
推荐指数
1
解决办法
445
查看次数

Kafka Connect无法使用主题策略

语境

我编写了几个小型的Kafka Connect连接器。一个每秒仅生成随机数据,另一个每秒将其记录在控制台中。它们与Schema Registry集成在一起,因此可以使用Avro序列化数据。

我使用Landoop提供fast-data-dev Docker映像将它们部署到本地Kafka环境中

基本设置有效,并每秒产生一条记录的消息

但是,我想更改主题名称策略。默认一生成两个主题:

  • ${topic}-key
  • ${topic}-value

根据我的用例,我将需要生成具有不同模式的事件,这些事件最终会出现在同一主题上。因此,我需要的主题名称是:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

根据文档,我的需求适合TopicRecordNameStrategy

我尝试了什么

我创建avroData用于发送值进行连接的对象:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }
Run Code Online (Sandbox Code Playgroud)

然后用它来创建SourceRecord响应对象

该文档指出,为了在Kafka Connect中使用架构注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,将它们添加:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Run Code Online (Sandbox Code Playgroud)

问题

连接器似乎忽略了这些属性,并继续使用旧的${topic}-key${topic}-value主题。

Kafka Connect应该支持不同的主题策略。我设法通过编写自己的版本AvroConverter和硬编码来解决此问题,该主题策略是我所需要的。但是,这似乎不是一种好方法,并且在尝试使用Sink …

avro apache-kafka apache-kafka-connect confluent-schema-registry

0
推荐指数
1
解决办法
658
查看次数

Scala如何订阅多个kafka主题

我想在scala中将字符串arry / list转换为util.Collection [String]对象。我尝试了多种方法,但没有解决。

import org.apache.kafka.clients.consumer.KafkaConsumer


object KafkaConsumerApp {

  def main(args: Array[String]): Unit = {

    val prop:Properties = new Properties()
    prop.put("bootstrap.servers","192.168.1.100:9092,192.168.1.141:9092,192.168.1.113:9092,192.168.1.118:9092")
    prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer(prop)

    val topics = List[String] ("my_topic_partition","my_topic_partition")
    val a = Collections.singletonList(topics)

    consumer.subscribe(a)

  }
}
Run Code Online (Sandbox Code Playgroud)

Consumer.subscribe(a)返回编译时错误

Error:(24, 14) overloaded method value subscribe with alternatives:
  (x$1: java.util.regex.Pattern)Unit <and>
  (x$1: java.util.Collection[String])Unit
 cannot be applied to (java.util.List[List[String]])
    consumer.subscribe(a)
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka kafka-consumer-api

0
推荐指数
1
解决办法
236
查看次数

当流应用程序有多个实例时,有状态操作如何在Kafka流中工作?

状态完整操作如何在具有多个实例的Kafka Stream应用程序中工作?让我们说我们有2个主题,每个A和B有2个分区.我们有一个流应用程序,它既消耗了两个主题,又有两个流之间的连接.

现在我们正在运行此流应用程序的2个实例.据我所知,每个实例将分配每个主题的2个分区之一.

现在,如果要连接的消息被应用程序的不同实例使用,联接将如何发生?我无法理解它.

虽然我测试了一个似乎工作正常的小流应用程序.我是否可以在不考虑流应用程序中定义的拓扑类型的情况下,始终增加任何类型应用程序的实例数量?

是否有任何文件可以让我了解其工作细节?

apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
99
查看次数