I am collecting some system data in a flat file, in the following format:
YYYY-MM-DD-HH24:MI:SS DD1 DD2 DD3 DD4
where DD1-DD4 are four items of data. A sample of the file is:
2011-02-01-13:29:53 16 8 7 68
2011-02-01-13:29:58 13 8 6 110
2011-02-01-13:30:03 26 25 1 109
2011-02-01-13:30:08 13 12 1 31
2011-02-01-13:30:14 192 170 22 34
2011-02-01-13:30:19 16 16 0 10
2011-02-01-13:30:24 137 61 76 9
2011-02-01-13:30:29 452 167 286 42
2011-02-01-13:30:34 471 177 295 11
2011-02-01-13:30:39 502 192 309 10
The file is over 2 million lines long, with a data point every five seconds.
I need to plot this data so that I can get some meaning out of it.
What I have tried
So far I have tried gnuplot and rrdtool, along with various unix tools (awk, sed, etc.). Both approaches work, but every time I want to look at the data in a different way there seems to be a lot of cutting and re-editing of the data. My gut feeling is that rrdtool is the right approach, but at the moment I am struggling to get the data into it quickly enough, partly because I have to convert my timestamps to Unix epoch time. My understanding is also that if I decide I want a different aggregation granularity, I have to rebuild the rrd (which makes sense for live collection, but not for a retrospective load like this). These things make me think I may be using the wrong tool.
The collection of the data into the flat file is fixed - for example, I cannot pipe the collection directly into rrdtool.
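The epoch conversion mentioned above is mechanical; a minimal Java sketch, assuming the timestamps follow exactly the 2011-02-01-13:29:53 layout from the sample and are in UTC (illustrative only, not part of the original tooling):

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EpochConvert {
    // Matches the flat-file timestamp layout, e.g. 2011-02-01-13:29:53
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH:mm:ss");

    public static void main(String[] args) {
        String line = "2011-02-01-13:29:53 16 8 7 68";   // one sample line from the file
        String[] f = line.split("\\s+");
        // Assumes the timestamps are UTC; change the offset if they are local time.
        long epoch = LocalDateTime.parse(f[0], FMT).toEpochSecond(ZoneOffset.UTC);
        System.out.println(epoch + " " + f[1] + " " + f[2] + " " + f[3] + " " + f[4]);
    }
}
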
My question
I would like people's views on the best way to do the charting. I have these requirements:
Any suggestions?
When starting Kafka Connect, I see a lot of warnings:
10:33:56.706 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'config.storage.topic' was supplied but isn't a known config.
10:33:56.707 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'group.id' was supplied but isn't a known config.
10:33:56.708 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'status.storage.topic' was supplied but isn't a known config.
10:33:56.709 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'internal.key.converter.schemas.enable' was supplied but isn't a known config.
10:33:56.710 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'config.storage.replication.factor' was supplied but isn't a known config.
10:33:56.710 [DistributedHerder] …

I am developing a Kafka Streams application that reads messages from an input Kafka topic, filters out unwanted data, and pushes the result to an output Kafka topic.
Kafka Streams configuration:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> streamsConfiguration = new HashMap<>();
    streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
    streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
    streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    streamsConfiguration.put(SASL_MECHANISM, "PLAIN");
    return new KafkaStreamsConfiguration(streamsConfiguration);
}
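For context, a hedged sketch of how a KafkaStreamsConfiguration bean like this is normally wired up with spring-kafka; the class and annotation below are an assumption about the surrounding application, not code from this post:

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

// Assumed surrounding wiring: @EnableKafkaStreams looks up the KafkaStreamsConfiguration bean
// registered under DEFAULT_STREAMS_CONFIG_BEAN_NAME and uses it to create the StreamsBuilder
// that gets injected into kStreamJson(StreamsBuilder) below.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsWiring {
    // the kStreamsConfigs() bean above and the kStreamJson(StreamsBuilder) bean below would live here
}
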
KStream filtering logic:
@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {
    KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
    /** Printing the source message */
    stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " …
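The filtering step itself is cut off above; a minimal sketch of the read-filter-write pattern the question describes, continuing the stream defined above (the output topic name and the predicate are placeholders, not the original logic):

// Illustrative only: drop unwanted records and forward the rest to an output topic.
KStream<String, String> filtered = stream.filter(
        (key, value) -> value != null && !value.isEmpty());   // placeholder predicate
filtered.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
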
I am trying to set up Kafka Connect to run the ElasticsearchSinkConnector.
The Kafka setup consists of 3 brokers secured with Kerberos, SSL and ACLs.
So far I have been trying to run the Connect framework and the Elasticsearch server locally with docker/docker-compose (Confluent docker image 5.4 with Kafka 2.4), connecting to a remote Kafka installation (Kafka 2.0.1 - actually our production environment).
KAFKA_OPTS: -Djava.security.krb5.conf=/etc/kafka-connect/secrets/krb5.conf
CONNECT_BOOTSTRAP_SERVERS: srv-kafka-1.XXX.com:9093,srv-kafka-2.XXX.com:9093,srv-kafka-3.XXX.com:9093
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: user-grp
CONNECT_CONFIG_STORAGE_TOPIC: test.internal.connect.configs
CONNECT_OFFSET_STORAGE_TOPIC: test.internal.connect.offsets
CONNECT_STATUS_STORAGE_TOPIC: test.internal.connect.status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_ZOOKEEPER_CONNECT: srv-kafka-1.XXX.com:2181,srv-kafka-2.XXX.com:2181,srv-kafka-3.XXX.com:2181
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_KERBEROS_SERVICE_NAME: "kafka"
CONNECT_SASL_JAAS_CONFIG: com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/kafka-connect/secrets/kafka-connect.keytab" \
principal="<principal>";
CONNECT_SASL_MECHANISM: GSSAPI
CONNECT_SSL_TRUSTSTORE_LOCATION: <path_to_truststore.jks>
CONNECT_SSL_TRUSTSTORE_PASSWORD: <PWD> …

When trying to describe a kafka-consumer-group, what does the message "Consumer group has no active members" mean?
An in-depth explanation would be appreciated.
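For reference, the same membership information that kafka-consumer-groups.sh prints can be read programmatically; a hedged AdminClient sketch (broker address and group id are placeholders), in which a group that exists but has no connected consumers reports an empty member list:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

import java.util.Collections;
import java.util.Properties;

public class DescribeGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            ConsumerGroupDescription desc = admin
                    .describeConsumerGroups(Collections.singletonList("my-group")) // placeholder group id
                    .describedGroups().get("my-group").get();
            // state() and members() show whether any consumer is currently attached to the group.
            System.out.println("state=" + desc.state() + ", active members=" + desc.members().size());
        }
    }
}
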
I have worked on a few Kafka Streams applications and Kafka consumer applications. In the end, a Kafka Streams application is nothing but a consumer that consumes real-time events from Kafka. So I cannot figure out when to use Kafka Streams, or why we should use it at all, since we can perform all the transformations on the consumer side.
I want to understand the main differences between Kafka Streams and a plain Kafka consumer in terms of implementation, and how to decide which one to use in different use cases.
Thanks in advance for your answers.
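A hedged side-by-side sketch of the same transform job written both ways; topic names, the transformation, and the Properties objects are placeholders (the properties are assumed to carry bootstrap.servers, serializers/deserializers and group.id/application.id as appropriate):

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerVsStreams {

    // Plain consumer/producer: you own the poll loop, the transformation and the re-publish.
    static void plainConsumer(Properties consumerProps, Properties producerProps) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("input-topic"));   // placeholder topic
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                    producer.send(new ProducerRecord<>("output-topic", rec.key(), rec.value().toUpperCase()));
                }
            }
        }
    }

    // Kafka Streams: the same dataflow declared as a topology; the library manages the
    // underlying consumers/producers, threading, rebalancing and any state stores.
    static void streamsTopology(Properties streamsProps) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v.toUpperCase())
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        new KafkaStreams(builder.build(), streamsProps).start();
    }
}
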
I am running Kafka on WSL. I am trying to make a simple producer like this (I am using IntelliJ):
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerDemo {
    public static void main(String[] args) {
        String bootstrapServers = "127.0.0.1:9092";

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // create a producer record
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("first_topic", "hallo world");

        // send data
        producer.send(record);

        // flush + close
        producer.flush();
        producer.close();
    }
}
But there is a problem: when I try to run the code, it shows this error:
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] …
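An illustrative sketch, not from the original post: one way to surface why a send never reaches the broker is to pass a callback to send(), or block on the returned Future; this continues the producer and record defined above:

// Sketch: make send failures visible instead of letting them pass silently.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();   // e.g. a TimeoutException when no broker is reachable
    } else {
        System.out.println("written to " + metadata.topic() + "-" + metadata.partition()
                + " at offset " + metadata.offset());
    }
});
// Or fail fast by blocking on the Future:
// producer.send(record).get();
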
I am configuring SSO in OBIEE 11.1.1.7.14, and I have run into a problem at the step of configuring krb5.conf and executing the kinit command.
A few notes about Active Directory:
I have created the keytab file and placed it in the OBIEE domain home directory, and then modified krb5.conf, setting kdc to one of the domain controllers' IP addresses and admin_server to the domain controller's name. Then, when executing
kinit -V -k -t /location/keytabfile.keytab HTTP/obiee_host_name
I get the error message "kinit(v5): Client not found in Kerberos database while getting initial credentials". Please share your thoughts/suggestions for resolving this issue.
Thanks in advance
I have deployed Strimzi Kafka, Strimzi Zookeeper and the Debezium MongoDB connector, and configured Debezium MongoDB as follows:
curl 'http://my-connect-cluster-connect-api:8083/connectors' -X POST -i -H "Content-Type:application/json" -d '{
"name": "mongodb-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "MainRepSet/mongod-0.mongodb-service.kafka.svc.cluster.local:27017,mongod-1.mongodb-service.kafka.svc.cluster.local:27017,mongod-2.mongodb-service.kafka.svc.cluster.local:27017",
"mongodb.name": "MainRepSet",
"collection.whitelist": "springdatabase[.]*",
"mongodb.user": "springuser",
"mongodb.password": "password"
}
}'
But I got an authentication exception:
2019-01-29 13:13:40,170 ERROR Error while reading the 'shards' collection in the 'config' database: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=mongod-2.mongodb-service.kafka.svc.cluster.local:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=null, userName='springuser', source='admin', password=<hidden>, mechanismProperties={}}}, caused by …
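For reference, the source='admin' in the stack trace is the authentication database the MongoDB driver is using for springuser. A hedged sketch of how that credential is expressed with the MongoDB Java driver, useful for checking the user/auth-database combination outside Debezium (host and auth database are placeholders):

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

import java.util.Collections;

public class MongoAuthCheck {
    public static void main(String[] args) {
        // The second argument is the database the user was created in ("admin" in the log above);
        // placeholder value - adjust it to wherever springuser actually exists.
        MongoCredential credential = MongoCredential.createCredential(
                "springuser", "admin", "password".toCharArray());
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyToClusterSettings(b -> b.hosts(Collections.singletonList(
                        new ServerAddress("mongod-0.mongodb-service.kafka.svc.cluster.local", 27017))))
                .credential(credential)
                .build();
        try (MongoClient client = MongoClients.create(settings)) {
            client.listDatabaseNames().first(); // forces a round-trip, so auth errors surface here
        }
    }
}
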
I am trying to set up Flink SQL with an Apache Iceberg catalog and the Hive Metastore, without success. Below are the steps I took on a clean Flink 1.18.1 installation, and the errors I got.

Run the Hive Metastore:

docker run --rm --detach --name hms-standalone \
    --publish 9083:9083 \
    ghcr.io/recap-build/hive-metastore-standalone:latest

Run MinIO with Docker:

docker run --rm --detach --name minio \
    -p 9001:9001 -p 9000:9000 \
    -e "MINIO_ROOT_USER=admin" \
    -e "MINIO_ROOT_PASSWORD=password" \
    minio/minio server /data --console-address ":9001"

Configure a bucket:

docker exec minio \
    mc config host add minio http://localhost:9000 admin password
docker exec minio \
    mc mb …
apache-flink ×1
debezium ×1
flat-file ×1
flink-sql ×1
gnuplot ×1
graph ×1
java ×1
kerberos ×1
mongodb ×1
obiee ×1
r ×1
rrdtool ×1
spring-kafka ×1
windows ×1