标签: confluent

需要Java代码才能使用Confluent kafka connect HDFS API

我是卡夫卡的初学者.试图将数据从kafka写入hdfs.在任何地方都没有记录如何使用Confluent的kafka-connect-hdfs Java API.任何链接或代码段都会对我有所帮助.提前致谢.

apache-kafka confluent

3
推荐指数
1
解决办法
946
查看次数

使用Avro Converter运行Kafka Connect:ConfigException:“缺少架构注册表URL”

嗨,我正在运行Kafka Connect码头工人图像

docker run -d \
  --name=kafka-connect \
  --net=host \
  -e CONNECT_BOOTSTRAP_SERVERS=localhost:39092 \
  -e CONNECT_REST_PORT=28082 \
  -e CONNECT_GROUP_ID="quickstart" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="quickstart-status" \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_ZOOKEEPER_CONNECT="localhost:2181" \
  tim/kafka-connect
Run Code Online (Sandbox Code Playgroud)

并得到

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing Schema registry url!
    at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:64)
    at org.apache.kafka.connect.runtime.Worker.<init>(Worker.java:93)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:74)
Run Code Online (Sandbox Code Playgroud)

无法确定在哪里添加“ schema.registry.url” conf!

apache-kafka confluent apache-kafka-connect

3
推荐指数
1
解决办法
4316
查看次数

Kafka Connect未能启动

我在新的linux centos 7上安装了kafka confluent oss 4.0,但是kafka connect无法启动.

重现步骤 :

 - Install Oracle JDK 8
 - Copy confluent-4.0.0 folder on opt/confluent-4.0.0
 - Run /opt/confluent-4.0.0/confluent start
Run Code Online (Sandbox Code Playgroud)

结果:

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
\Kafka Connect failed to start
connect is [DOWN]
Run Code Online (Sandbox Code Playgroud)

错误日志(connect.stderr):

Exception in thread "main" java.lang.NoClassDefFoundError: io/confluent/connect/storage/StorageSinkConnectorConfig
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka confluent apache-kafka-connect

3
推荐指数
1
解决办法
3879
查看次数

添加额外的Schema Registry层可以带来好处吗?

在生成/使用消息时,添加架构注册表的附加层(即故障点)是否有任何好处?如果服务中断,则消息将不会被使用或产生。通过使用不减少故障点的Schema Registry,使用Kafka的系统难道不会更容易出错吗?

apache-kafka confluent

2
推荐指数
1
解决办法
450
查看次数

将新使用者添加到正在运行的使用者组时会发生什么?

假设一个3个分区的kafka主题被3个消费者组成的消费者组消费。在云环境中,如果有新的消费者扩大规模,现在该组中有4个消费者。在这种情况下会发生什么?

  • Kafka是否创建另一个分区,以便新使用者可以访问它

要么

  • 新消费者是否闲置并且不消费任何东西?

apache-kafka confluent

2
推荐指数
1
解决办法
628
查看次数

Kafka AVRO-从长到日期时间的转换

当我要发送包含字段类型为long的AVRO消息时,出现以下错误:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
Run Code Online (Sandbox Code Playgroud)

我使用Confluent 3.2.0和Apache Spark 2.2.0。在Spark Job中抛出此错误,该Spark Job处理AVRO消息并将其打印在控制台中。在AVRO模式中,相应的字段定义如下:

{\"name\": \"event_time\", \"type\": { \"type\" : \"long\", \"logicalType\": \"timestamp-millis\"}}
Run Code Online (Sandbox Code Playgroud)

在从.avsc文件生成的Java类中,该字段的定义如下:

private DateTime event_time;
Run Code Online (Sandbox Code Playgroud)

avro deserialization apache-kafka confluent

2
推荐指数
1
解决办法
1517
查看次数

你能从远程主机运行KSQL吗?

我在我的集​​群的一个节点上运行了confluent-ksql-server.我们可以让ksql由kafka集群外的特定主机/机器连接吗?

PS-这是为开发人员提供ksql访问权限

谢谢 !

apache-kafka confluent ksql

2
推荐指数
1
解决办法
228
查看次数

暂停流消耗

我正在开发一个在一分钟内处理很少记录的应用程序.请求率约为每分钟2次.这些请求是为一组数据创建和更新的.要求是交付保证,可靠的交付,订购保证和防止任何消息丢失.

  1. 我们的团队决定使用Kafka,我认为它不适合用例,因为Kafka最适合流数据.相反,我们可以更好地使用传统的消息模型.虽然卡夫卡确实提供每个分区排序,同样可以是传统的邮件系统上,如果消息的数目是低的和数据的来源也低实现.这是一个公平的陈述吗?

  2. 我们使用Kafka流来处理数据,处理要求我们对外部系统进行查找.如果外部系统不可用,那么当外部查找系统可用时,我们将停止处理并自动将消息传递到目标系统.目前,我们通过在处理过程中不断循环并检查系统是否可用来停止处理.a)这是在处理过程中中途停止流的最佳方法,以便它不再接收任何消息吗?b)数据流框架是否设计为在中途停止或暂停,以便它们在一段时间内完全停止使用流?

streaming integration messaging apache-kafka confluent

1
推荐指数
1
解决办法
3182
查看次数

尝试连接表和流时发生错误

我正在尝试连接一个表和一个流,并创建另一个表,如下所示:

CREATE TABLE table_fx_latest AS
   SELECT t1.currencyid,
          t1.maxtimestamp,
          t2.midprice 
  FROM stream_fx2 t2 LEFT JOIN table_fx_latest3 t1 
  ON t1.currencyid = t2.currencyid AND 
     t1.timestamp = t2.maxtimestamp 
  GROUP BY t1.currencyid, 
           t1.maxtimestamp, 
           t2.midprice;
Run Code Online (Sandbox Code Playgroud)

但是报告了以下错误:

Cannot RUN execution plan for this statement, CreateTableAsSelect{name=TABLE_FX_LATEST_PRICE6, query=Query{queryBody=QuerySpecification{select=Select{distinct=false, selectItems=[T1.CURRENCYID T1_CURRENCYID, T1.MAXTIMESTAMP MAXTIMESTAMP, T2.MIDPRICE MIDPRICE]}, from=Join{type=LEFT, left=AliasedRelation{relation=STREAM_FX2, alias=T2}, right=AliasedRelation{relation=TABLE_FX_LATEST3, alias=T1}, criteria=Optional[JoinOn{((T1.CURRENCYID = T2.CURRENCYID) AND (T2.TIMESTAMP = T1.MAXTIMESTAMP))}]}, =null, where=null, groupBy=Optional[GroupBy{isDistinct=false, groupingElements=[SimpleGroupBy{columns=[T1.CURRENCYID]}, SimpleGroupBy{columns=[T1.MAXTIMESTAMP]}, SimpleGroupBy{columns=[T2.MIDPRICE]}]}], having=null, orderBy=[], limit=null}, orderBy=[]}, notExists=false, properties={}}
Caused by: io.confluent.ksql.parser.tree.LogicalBinaryExpression cannot be cast to io.confluent.ksql.parser.tree.ComparisonExpression
Run Code Online (Sandbox Code Playgroud)

这是stream_fx2流和 …

apache-kafka confluent ksql

1
推荐指数
1
解决办法
455
查看次数

融合Kafka Connect Elasticsearch文档ID创建

我正在使用汇合来连接我的数据库和ES,但出现以下异常:

org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id.
    at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我在卡夫卡连接-JDBC配置是:

name=task-view-list-stage
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10 
connection.url=jdbc:postgresql://localhost:5432/postgres?user=postgres&password=test
table.types=TABLE
query=select * from employee_master
mode=timestamp+incrementing
incrementing.column.name=employee_master_id
timestamp.column.name=modified_date
validate.non.null=false
topic.prefix=my-id-app
Run Code Online (Sandbox Code Playgroud)

我的kafka-connect Elasticsearch配置是:

name=es-id-view
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=my-id-app
topics.key.ignore=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id
connection.url=http://localhost:9200
type.name=type_id
Run Code Online (Sandbox Code Playgroud)

我的表结构是:

employee_master_id | emp_name | modified_date
-----------------------------------------------------------
1                  |  Bala    |  "2017-05-18 …
Run Code Online (Sandbox Code Playgroud)

jdbc elasticsearch apache-kafka confluent apache-kafka-connect

0
推荐指数
1
解决办法
962
查看次数

KSQL左连接不起作用

我是stackoverflow的新手,所以让我知道如果我在这里发布这个问题我有什么不对.

我已经尝试找到答案,但无法在网站上找到KSQL JOIN相关问题,所以我发布了这个.我已经尝试了不同的方法来运行此查询,但我一直得到空指针异常,所以在此处发布.

我有两个kafka avro主题交易和费用,但数据有很多空白,以清楚我已创建以下主题和表与修剪数据.DEAL_STREAMEXPENSE_TABLE

ksql> describe EXPENSE_TABLE;
Run Code Online (Sandbox Code Playgroud)

结果:

Field      | Type
ROWTIME    | BIGINT (system)
ROWKEY     | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)

ksql> describe deal_stream;
Run Code Online (Sandbox Code Playgroud)

结果:

Field      | Type
ROWTIME    | BIGINT (system)
ROWKEY     | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)

当我执行以下Query时,它给我空指针异常.我尝试了以下查询.

1:

ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
Run Code Online (Sandbox Code Playgroud)

2: …

apache-kafka confluent ksql

0
推荐指数
1
解决办法
307
查看次数