我正在构建一个cdc管道来通过maxwell读取mysql binlog并将它们放入kafka中,我的压缩类型在maxwell配置中是snappy。但是在我的spring项目的消费者端我收到了这个错误。
org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] no native library is found for os.name=Mac and os.arch=aarch64
at org.xerial.snappy.SnappyLoader.findNativeLibrary(SnappyLoader.java:361) ~[snappy-java-1.1.7.7.jar:1.1.7.7]
at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:195) ~[snappy-java-1.1.7.7.jar:1.1.7.7]
at org.xerial.snappy.SnappyLoader.loadSnappyApi(SnappyLoader.java:167) ~[snappy-java-1.1.7.7.jar:1.1.7.7]
at org.xerial.snappy.Snappy.init(Snappy.java:69) ~[snappy-java-1.1.7.7.jar:1.1.7.7]
at org.xerial.snappy.Snappy.<clinit>(Snappy.java:46) ~[snappy-java-1.1.7.7.jar:1.1.7.7]
at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435) ~[snappy-java-1.1.7.7.jar:1.1.7.7]
at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466) ~[snappy-java-1.1.7.7.jar:1.1.7.7]
at java.base/java.io.DataInputStream.readByte(DataInputStream.java:271) ~[na:na]
at org.apache.kafka.common.utils.ByteUtils.readUnsignedVarint(ByteUtils.java:170) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:205) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:296) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.common.record.DefaultRecordBatch$2.doReadRecord(DefaultRecordBatch.java:278) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.common.record.DefaultRecordBatch$StreamRecordIterator.readNext(DefaultRecordBatch.java:617) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:582) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:551) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1578) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1613) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:687) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:638) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1299) ~[kafka-clients-2.7.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[kafka-clients-2.7.2.jar:na]
at …Run Code Online (Sandbox Code Playgroud) 我的消费者绑定到匿名消费者组,而不是我指定的消费者组。
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
zkNodes: localhost
defaultZkPort: 2181
bindings:
inEvent:
group: eventin
destination: event
outEvent:
group: eventout
destination: processevent
Run Code Online (Sandbox Code Playgroud)
我的 Spring Boot 应用程序
@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(value = "inEvent")
public void getEvent(Event event){
System.out.println(event.name);
}
}
Run Code Online (Sandbox Code Playgroud)
我的输入输出通道接口
public interface EventStream {
@Input("inEvent")
SubscribableChannel inEvent();
@Output("outEvent")
MessageChannel outEvent();
}
Run Code Online (Sandbox Code Playgroud)
我的控制台日志--
:在 3.233 秒内启动 ConsumerApplication(JVM 运行 4.004):[ Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d]发现组协调员 singh:9092 (id: 2147483647 …
spring-boot spring-cloud kafka-consumer-api spring-cloud-stream spring-kafka