我是卡夫卡的初学者.试图将数据从kafka写入hdfs.在任何地方都没有记录如何使用Confluent的kafka-connect-hdfs Java API.任何链接或代码段都会对我有所帮助.提前致谢.
嗨,我正在运行Kafka Connect码头工人图像
docker run -d \
--name=kafka-connect \
--net=host \
-e CONNECT_BOOTSTRAP_SERVERS=localhost:39092 \
-e CONNECT_REST_PORT=28082 \
-e CONNECT_GROUP_ID="quickstart" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-status" \
-e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
-e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="http://localhost:8081" \
-e CONNECT_INTERNAL_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_ZOOKEEPER_CONNECT="localhost:2181" \
tim/kafka-connect
Run Code Online (Sandbox Code Playgroud)
并得到
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing Schema registry url!
at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:64)
at org.apache.kafka.connect.runtime.Worker.<init>(Worker.java:93)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:74)
Run Code Online (Sandbox Code Playgroud)
无法确定在哪里添加“ schema.registry.url” conf!
我在新的linux centos 7上安装了kafka confluent oss 4.0,但是kafka connect无法启动.
重现步骤 :
- Install Oracle JDK 8
- Copy confluent-4.0.0 folder on opt/confluent-4.0.0
- Run /opt/confluent-4.0.0/confluent start
Run Code Online (Sandbox Code Playgroud)
结果:
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
\Kafka Connect failed to start
connect is [DOWN]
Run Code Online (Sandbox Code Playgroud)
错误日志(connect.stderr):
Exception in thread "main" java.lang.NoClassDefFoundError: io/confluent/connect/storage/StorageSinkConnectorConfig
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method) …Run Code Online (Sandbox Code Playgroud) 在生成/使用消息时,添加架构注册表的附加层(即故障点)是否有任何好处?如果服务中断,则消息将不会被使用或产生。通过使用不减少故障点的Schema Registry,使用Kafka的系统难道不会更容易出错吗?
假设一个3个分区的kafka主题被3个消费者组成的消费者组消费。在云环境中,如果有新的消费者扩大规模,现在该组中有4个消费者。在这种情况下会发生什么?
要么
当我要发送包含字段类型为long的AVRO消息时,出现以下错误:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
Run Code Online (Sandbox Code Playgroud)
我使用Confluent 3.2.0和Apache Spark 2.2.0。在Spark Job中抛出此错误,该Spark Job处理AVRO消息并将其打印在控制台中。在AVRO模式中,相应的字段定义如下:
{\"name\": \"event_time\", \"type\": { \"type\" : \"long\", \"logicalType\": \"timestamp-millis\"}}
Run Code Online (Sandbox Code Playgroud)
在从.avsc文件生成的Java类中,该字段的定义如下:
private DateTime event_time;
Run Code Online (Sandbox Code Playgroud) 我在我的集群的一个节点上运行了confluent-ksql-server.我们可以让ksql由kafka集群外的特定主机/机器连接吗?
PS-这是为开发人员提供ksql访问权限
谢谢 !
我正在开发一个在一分钟内处理很少记录的应用程序.请求率约为每分钟2次.这些请求是为一组数据创建和更新的.要求是交付保证,可靠的交付,订购保证和防止任何消息丢失.
我们的团队决定使用Kafka,我认为它不适合用例,因为Kafka最适合流数据.相反,我们可以更好地使用传统的消息模型.虽然卡夫卡确实提供每个分区排序,同样可以是传统的邮件系统上,如果消息的数目是低的和数据的来源也低实现.这是一个公平的陈述吗?
我们使用Kafka流来处理数据,处理要求我们对外部系统进行查找.如果外部系统不可用,那么当外部查找系统可用时,我们将停止处理并自动将消息传递到目标系统.目前,我们通过在处理过程中不断循环并检查系统是否可用来停止处理.a)这是在处理过程中中途停止流的最佳方法,以便它不再接收任何消息吗?b)数据流框架是否设计为在中途停止或暂停,以便它们在一段时间内完全停止使用流?
我正在尝试连接一个表和一个流,并创建另一个表,如下所示:
CREATE TABLE table_fx_latest AS
SELECT t1.currencyid,
t1.maxtimestamp,
t2.midprice
FROM stream_fx2 t2 LEFT JOIN table_fx_latest3 t1
ON t1.currencyid = t2.currencyid AND
t1.timestamp = t2.maxtimestamp
GROUP BY t1.currencyid,
t1.maxtimestamp,
t2.midprice;
Run Code Online (Sandbox Code Playgroud)
但是报告了以下错误:
Cannot RUN execution plan for this statement, CreateTableAsSelect{name=TABLE_FX_LATEST_PRICE6, query=Query{queryBody=QuerySpecification{select=Select{distinct=false, selectItems=[T1.CURRENCYID T1_CURRENCYID, T1.MAXTIMESTAMP MAXTIMESTAMP, T2.MIDPRICE MIDPRICE]}, from=Join{type=LEFT, left=AliasedRelation{relation=STREAM_FX2, alias=T2}, right=AliasedRelation{relation=TABLE_FX_LATEST3, alias=T1}, criteria=Optional[JoinOn{((T1.CURRENCYID = T2.CURRENCYID) AND (T2.TIMESTAMP = T1.MAXTIMESTAMP))}]}, =null, where=null, groupBy=Optional[GroupBy{isDistinct=false, groupingElements=[SimpleGroupBy{columns=[T1.CURRENCYID]}, SimpleGroupBy{columns=[T1.MAXTIMESTAMP]}, SimpleGroupBy{columns=[T2.MIDPRICE]}]}], having=null, orderBy=[], limit=null}, orderBy=[]}, notExists=false, properties={}}
Caused by: io.confluent.ksql.parser.tree.LogicalBinaryExpression cannot be cast to io.confluent.ksql.parser.tree.ComparisonExpression
Run Code Online (Sandbox Code Playgroud)
这是stream_fx2流和 …
我正在使用汇合来连接我的数据库和ES,但出现以下异常:
org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id.
at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我在卡夫卡连接-JDBC配置是:。
name=task-view-list-stage
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:postgresql://localhost:5432/postgres?user=postgres&password=test
table.types=TABLE
query=select * from employee_master
mode=timestamp+incrementing
incrementing.column.name=employee_master_id
timestamp.column.name=modified_date
validate.non.null=false
topic.prefix=my-id-app
Run Code Online (Sandbox Code Playgroud)
我的kafka-connect Elasticsearch配置是:
name=es-id-view
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=my-id-app
topics.key.ignore=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id
connection.url=http://localhost:9200
type.name=type_id
Run Code Online (Sandbox Code Playgroud)
我的表结构是:
employee_master_id | emp_name | modified_date
-----------------------------------------------------------
1 | Bala | "2017-05-18 …Run Code Online (Sandbox Code Playgroud) jdbc elasticsearch apache-kafka confluent apache-kafka-connect
我是stackoverflow的新手,所以让我知道如果我在这里发布这个问题我有什么不对.
我已经尝试找到答案,但无法在网站上找到KSQL JOIN相关问题,所以我发布了这个.我已经尝试了不同的方法来运行此查询,但我一直得到空指针异常,所以在此处发布.
我有两个kafka avro主题交易和费用,但数据有很多空白,以清楚我已创建以下主题和表与修剪数据.DEAL_STREAM和EXPENSE_TABLE
ksql> describe EXPENSE_TABLE;
Run Code Online (Sandbox Code Playgroud)
结果:
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)
和
ksql> describe deal_stream;
Run Code Online (Sandbox Code Playgroud)
结果:
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)
当我执行以下Query时,它给我空指针异常.我尝试了以下查询.
1:
ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
Run Code Online (Sandbox Code Playgroud)
2: …
apache-kafka ×11
confluent ×11
ksql ×3
avro ×1
integration ×1
jdbc ×1
messaging ×1
streaming ×1