任何人都可以提出建议,使用java KafkaProducer,我们需要设置哪些属性来将消息发送到受SSL保护的kafka主题,这是kafka的新增功能,无法向受SSL保护的kafka发送一条消息
我开始使用Spark流媒体.我想从Kafka获取一个流,其中包含我在Spark文档中找到的示例代码:https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html
这是我的代码:
object SparkStreaming {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Test_kafka_spark").setMaster("local[*]") // local parallelism 1
val ssc = new StreamingContext(conf, Seconds(1))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9093",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
}
}
Run Code Online (Sandbox Code Playgroud)
所有人似乎都开始很好,但工作立即停止,记录如下:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties …Run Code Online (Sandbox Code Playgroud) 我正在使用汇合来连接我的数据库和ES,但出现以下异常:
org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id.
at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我在卡夫卡连接-JDBC配置是:。
name=task-view-list-stage
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:postgresql://localhost:5432/postgres?user=postgres&password=test
table.types=TABLE
query=select * from employee_master
mode=timestamp+incrementing
incrementing.column.name=employee_master_id
timestamp.column.name=modified_date
validate.non.null=false
topic.prefix=my-id-app
Run Code Online (Sandbox Code Playgroud)
我的kafka-connect Elasticsearch配置是:
name=es-id-view
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=my-id-app
topics.key.ignore=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id
connection.url=http://localhost:9200
type.name=type_id
Run Code Online (Sandbox Code Playgroud)
我的表结构是:
employee_master_id | emp_name | modified_date
-----------------------------------------------------------
1 | Bala | "2017-05-18 …Run Code Online (Sandbox Code Playgroud) jdbc elasticsearch apache-kafka confluent apache-kafka-connect
我正在尝试将自定义转换器与Kafka Connect一起使用,但似乎无法正确使用。我希望有人对此有经验,可以帮助我解决!
我的自定义转换器的类路径为custom.CustomStringConverter。
为避免出现任何错误,我的自定义转换器当前只是先前存在的StringConverter的副本/粘贴(当然,在我开始使用它时,它将改变)。 https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
我有一个由3个节点组成的kafka connect集群,这些节点正在运行confluent的官方docker映像(confluentinc/cp-kafka-connect:3.3.0)。
每个节点都配置为使用我的转换器加载一个jar(使用docker卷)。
当连接器启动时,它们会正确加载罐子并找到自定义转换器。确实,这是我在日志中看到的内容:
[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
Run Code Online (Sandbox Code Playgroud)
然后,我将JSON配置发布到连接器节点之一以创建我的连接器:
{
"name": "hdfsSinkCustom",
"config": {
"topics": "yellow",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "custom.CustomStringConverter",
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
"topics.dir": "yellow_storage",
"flush.size": "1",
"rotate.interval.ms": "1000"
}
}
Run Code Online (Sandbox Code Playgroud)
并收到以下回复:
{
"error_code": 400,
"message": "Connector configuration is invalid and contains …Run Code Online (Sandbox Code Playgroud) 我根据本教程编写了一个简单的kafka-stream程序:http:
//kafka.apache.org/10/documentation/streams/tutorial
Pipe.java:
package eric.kafka.stream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
/**
* kafka-stream - pipe,
*/
public class Pipe {
// topic names,
public static final String TOPIC_INPUT = "streams-plaintext-input";
public static final String TOPIC_OUTPUT = "streams-pipe-output";
public static void pipe() {
// set up properties,
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); // app id,
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // kafka server,
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // serialization / …Run Code Online (Sandbox Code Playgroud) 我是stackoverflow的新手,所以让我知道如果我在这里发布这个问题我有什么不对.
我已经尝试找到答案,但无法在网站上找到KSQL JOIN相关问题,所以我发布了这个.我已经尝试了不同的方法来运行此查询,但我一直得到空指针异常,所以在此处发布.
我有两个kafka avro主题交易和费用,但数据有很多空白,以清楚我已创建以下主题和表与修剪数据.DEAL_STREAM和EXPENSE_TABLE
ksql> describe EXPENSE_TABLE;
Run Code Online (Sandbox Code Playgroud)
结果:
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)
和
ksql> describe deal_stream;
Run Code Online (Sandbox Code Playgroud)
结果:
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)
当我执行以下Query时,它给我空指针异常.我尝试了以下查询.
1:
ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
Run Code Online (Sandbox Code Playgroud)
2: …
例如,我有三个副本:replica1(leader),replica2(follower),replica3(follower)。但是现在领导者和追随者之间存在差距,而领导者现在已经死亡。
因此,关注者没有最新消息(由于滞后)。那么,卡夫卡如何选出新的领导人,无论如何,都会有数据丢失,因此卡夫卡如何处理这一问题。
语境
我编写了几个小型的Kafka Connect连接器。一个每秒仅生成随机数据,另一个每秒将其记录在控制台中。它们与Schema Registry集成在一起,因此可以使用Avro序列化数据。
我使用Landoop提供的fast-data-dev Docker映像将它们部署到本地Kafka环境中
基本设置有效,并每秒产生一条记录的消息
但是,我想更改主题名称策略。默认一生成两个主题:
${topic}-key${topic}-value根据我的用例,我将需要生成具有不同模式的事件,这些事件最终会出现在同一主题上。因此,我需要的主题名称是:
${topic}-${keyRecordName}${topic}-${valueRecordName}根据文档,我的需求适合TopicRecordNameStrategy
我尝试了什么
我创建avroData用于发送值进行连接的对象:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
Run Code Online (Sandbox Code Playgroud)
然后用它来创建SourceRecord响应对象
该文档指出,为了在Kafka Connect中使用架构注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,将它们添加:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Run Code Online (Sandbox Code Playgroud)
问题
连接器似乎忽略了这些属性,并继续使用旧的${topic}-key和${topic}-value主题。
题
Kafka Connect应该支持不同的主题策略。我设法通过编写自己的版本AvroConverter和硬编码来解决此问题,该主题策略是我所需要的。但是,这似乎不是一种好方法,并且在尝试使用Sink …
avro apache-kafka apache-kafka-connect confluent-schema-registry
我想在scala中将字符串arry / list转换为util.Collection [String]对象。我尝试了多种方法,但没有解决。
import org.apache.kafka.clients.consumer.KafkaConsumer
object KafkaConsumerApp {
def main(args: Array[String]): Unit = {
val prop:Properties = new Properties()
prop.put("bootstrap.servers","192.168.1.100:9092,192.168.1.141:9092,192.168.1.113:9092,192.168.1.118:9092")
prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer(prop)
val topics = List[String] ("my_topic_partition","my_topic_partition")
val a = Collections.singletonList(topics)
consumer.subscribe(a)
}
}
Run Code Online (Sandbox Code Playgroud)
Consumer.subscribe(a)返回编译时错误
Error:(24, 14) overloaded method value subscribe with alternatives:
(x$1: java.util.regex.Pattern)Unit <and>
(x$1: java.util.Collection[String])Unit
cannot be applied to (java.util.List[List[String]])
consumer.subscribe(a)
Run Code Online (Sandbox Code Playgroud) 状态完整操作如何在具有多个实例的Kafka Stream应用程序中工作?让我们说我们有2个主题,每个A和B有2个分区.我们有一个流应用程序,它既消耗了两个主题,又有两个流之间的连接.
现在我们正在运行此流应用程序的2个实例.据我所知,每个实例将分配每个主题的2个分区之一.
现在,如果要连接的消息被应用程序的不同实例使用,联接将如何发生?我无法理解它.
虽然我测试了一个似乎工作正常的小流应用程序.我是否可以在不考虑流应用程序中定义的拓扑类型的情况下,始终增加任何类型应用程序的实例数量?
是否有任何文件可以让我了解其工作细节?