小编Rob*_*att的帖子

绘制超过200万行平面文件数据的最快速,最灵活的方式?

我在flatfile中收集一些系统数据,其格式如下:

YYYY-MM-DD-HH24:MI:SS DD1 DD2 DD3 DD4
Run Code Online (Sandbox Code Playgroud)

其中DD1-DD4是四项数据.该文件的一个示例是:

2011-02-01-13:29:53 16 8 7 68
2011-02-01-13:29:58 13 8 6 110
2011-02-01-13:30:03 26 25 1 109
2011-02-01-13:30:08 13 12 1 31
2011-02-01-13:30:14 192 170 22 34
2011-02-01-13:30:19 16 16 0 10
2011-02-01-13:30:24 137 61 76 9
2011-02-01-13:30:29 452 167 286 42
2011-02-01-13:30:34 471 177 295 11
2011-02-01-13:30:39 502 192 309 10
Run Code Online (Sandbox Code Playgroud)

该文件超过200万行,每五秒钟有一个数据点.

我需要绘制这些数据,以便能够从中获得意义.

我试过的

目前我已尝试使用各种unix工具gnuplot和rrdtool(awk,sed等).这两种方法都有效,但每次我想以不同的方式查看数据时,似乎都需要大量的切割和重新编辑数据.我的直觉是rrdtool是正确的方法,但目前我正在努力将数据快速地加入其中,部分原因是因为我必须将我的时间戳转换为Unix时代.我的理解是,如果我决定我想要一个新的聚合粒度,我必须重建rrd(这对于实时收集是有意义的,但不是像这样的追溯加载).这些事情让我觉得我可能正在使用错误的工具.

将数据收集到平面文件是固定的 - 例如,我无法将集合直接传递到rrdtool.

我的问题

我希望人们对制图的最佳方式有所了解.我有这些要求:

  1. 它应该尽可能快地创建一个图形(不仅仅是渲染,而是设置为渲染)
  2. 它应该尽可能灵活 - 我需要使用图表来计算出数据的最佳粒度(5秒可能过于细化)
  3. 它应该能够在必要时聚合(MAX/AVG/etc)
  4. 它应该是可重复的,并且当它们进入时会有新的数据文件
  5. 理想情况下,我希望能够在本周与DD1重叠DD1与DD2或DD1
  6. Unix或Windows,不在乎.首选*nix虽然:-)

有什么建议?

r graph gnuplot flat-file rrdtool

29
推荐指数
3
解决办法
4414
查看次数

Kafka 连接:提供了配置 XXX 但不是 AdminClientConfig 中的已知配置

在启动 Kafka-Connect 时,我看到了很多警告

10:33:56.706 [DistributedHerder] WARN  org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'config.storage.topic' was supplied but isn't a known config.
10:33:56.707 [DistributedHerder] WARN  org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'group.id' was supplied but isn't a known config.
10:33:56.708 [DistributedHerder] WARN  org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'status.storage.topic' was supplied but isn't a known config.
10:33:56.709 [DistributedHerder] WARN  org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'internal.key.converter.schemas.enable' was supplied but isn't a known config.
10:33:56.710 [DistributedHerder] WARN  org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'config.storage.replication.factor' was supplied but isn't a known config.
10:33:56.710 [DistributedHerder] …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

17
推荐指数
1
解决办法
2万
查看次数

Kafka 流异常:GroupAuthorizationException

我正在开发一个 Kafka-Stream 应用程序,它将从输入 Kafka 主题中读取消息并过滤不需要的数据并推送到输出 Kafka 主题。

卡夫卡流配置:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {

    Map<String, Object> streamsConfiguration = new HashMap<>();
    streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
    streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
    streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    streamsConfiguration.put(SASL_MECHANISM, "PLAIN");

    return new KafkaStreamsConfiguration(streamsConfiguration);
}
Run Code Online (Sandbox Code Playgroud)

KStream过滤逻辑:

@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {

    KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
    /** Printing the source message */
    stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams spring-kafka

7
推荐指数
1
解决办法
1万
查看次数

Kafka 连接,Bootstrap 代理断开连接


我试图设置 Kafka Connect 以运行 ElasticsearchSinkConnector。
Kafka 设置,由 3 个使用 Kerberos、SSL 和 ACL 保护的代理组成。

到目前为止,我一直在尝试使用 docker/docker-compose(Confluent docker-image 5.4 with Kafka 2.4)连接到远程 kafka 安装(Kafka 2.0.1 - 实际上是我们的生产环境)运行连接框架和 elasticserch-server 本地)。

KAFKA_OPTS: -Djava.security.krb5.conf=/etc/kafka-connect/secrets/krb5.conf
      CONNECT_BOOTSTRAP_SERVERS: srv-kafka-1.XXX.com:9093,srv-kafka-2.XXX.com:9093,srv-kafka-3.XXX.com:9093
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: user-grp
      CONNECT_CONFIG_STORAGE_TOPIC: test.internal.connect.configs
      CONNECT_OFFSET_STORAGE_TOPIC: test.internal.connect.offsets
      CONNECT_STATUS_STORAGE_TOPIC: test.internal.connect.status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: srv-kafka-1.XXX.com:2181,srv-kafka-2.XXX.com:2181,srv-kafka-3.XXX.com:2181
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_KERBEROS_SERVICE_NAME: "kafka"
      CONNECT_SASL_JAAS_CONFIG: com.sun.security.auth.module.Krb5LoginModule required \
                                useKeyTab=true \
                                storeKey=true \
                                keyTab="/etc/kafka-connect/secrets/kafka-connect.keytab" \
                                principal="<principal>;
      CONNECT_SASL_MECHANISM: GSSAPI
      CONNECT_SSL_TRUSTSTORE_LOCATION: <path_to_truststore.jks>
      CONNECT_SSL_TRUSTSTORE_PASSWORD: <PWD> …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect confluent-platform

7
推荐指数
1
解决办法
1万
查看次数

消费者组没有活跃成员 - Kafka

当试图描述 kafka-consumer-group 时,消息“Consumer group has no active Members”是什么意思?

深入的解释表示赞赏。

apache-kafka

6
推荐指数
1
解决办法
2万
查看次数

Kafka Stream 与 Kafka Consumer 如何决定使用什么

我曾开发过一些 Kafka 流应用程序和 Kafka 消费者应用程序。最后,Kafka流只不过是消费者,它消费来自Kafka的实时事件。因此,我无法弄清楚何时使用 Kafka 流,或者为什么我们应该使用 Kafka 流,因为我们可以在消费者端执行所有转换。

我想了解 Kafka 流和 Kafka 消费者在实现方面的主要区别,以及如何决定在不同用例中应该使用什么。

预先感谢您的答复。

apache-kafka kafka-consumer-api apache-kafka-streams

6
推荐指数
2
解决办法
3124
查看次数

在 WSL 上运行 Kafka 并在 Windows 上创建生产者

我在 WSL 上运行 Kafka。我正在尝试制作这样的简单生产者(我正在使用 intellj)

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {

    public static void main(String[] args) {

        String bootstrapServers = "127.0.0.1:9092";

        //create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //create the producer
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        //create a producer record
        ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("first_topic","hallo world");

        //send data
        producer.send(record);

        //flush + close
        producer.flush();
        producer.close();
    }

}
Run Code Online (Sandbox Code Playgroud)

但有一个问题,当我尝试运行代码时,它显示此错误

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] …
Run Code Online (Sandbox Code Playgroud)

java windows apache-kafka

6
推荐指数
1
解决办法
2111
查看次数

kinit(v5):获取初始凭据时在 Kerberos 数据库中找不到客户端

我正在 obiee 11.1.1.7.14 中配置 SSO,其中我在配置 krb5.conf 和执行 kinit 命令的步骤中遇到了问题。

关于 Active Directory 的一些注意事项

  • 我们有多个域控制器,为了平衡请求,我们使用端口 3269 维护负载平衡器。
  • 并且 obiee 和 MSAD 之间的集成成功完成,负载均衡器名称为主机,端口为 3269。
  • 并且在 demotrust.jks 和 ovd 存储中添加了很少的证书,并且在新的提供程序中启用了 SSL。
  • 生成的keytab 文件放在obiee 域home 中,相应修改krb5.conf 和krb5Login.conf 文件。

我已经创建了 keytab 文件并将其放置在 obiee 域主目录中,然后通过将 kdc 作为域控制器的 ip 地址之一和 admin-server 作为域控制器的名称来修改 krb5.conf。并在执行

kinit -V -k -t /location/keytabfile.keytab HTTP/obiee_host_name

我收到错误消息“ kinit(v5):获取初始凭据时在 Kerberos 数据库中找不到客户端”。请分享您的想法/建议以解决此问题。

提前致谢

kerberos active-directory obiee

5
推荐指数
1
解决办法
3万
查看次数

通过 debezium mongodb 连接器连接到 mongodb 时身份验证失败

部署了 Strimzi Kafka、Strimzi Zookeeper 和 Debezium mongodb 连接器,并配置了 Debezium mongodb,

curl 'http://my-connect-cluster-connect-api:8083/connectors' -X POST -i -H "Content-Type:application/json" -d '{
  "name": "mongodb-connector", 
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "MainRepSet/mongod-0.mongodb-service.kafka.svc.cluster.local:27017,mongod-1.mongodb-service.kafka.svc.cluster.local:27017,mongod-2.mongodb-service.kafka.svc.cluster.local:27017", 
    "mongodb.name": "MainRepSet", 
    "collection.whitelist": "springdatabase[.]*",
    "mongodb.user": "springuser",
    "mongodb.password": "password"
  }
}'
Run Code Online (Sandbox Code Playgroud)

但得到了身份验证异常,

2019-01-29 13:13:40,170 ERROR Error while reading the 'shards' collection in the 'config' database: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=mongod-2.mongodb-service.kafka.svc.cluster.local:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=null, userName='springuser', source='admin', password=<hidden>, mechanismProperties={}}}, caused by …
Run Code Online (Sandbox Code Playgroud)

mongodb apache-kafka-connect debezium

5
推荐指数
1
解决办法
2754
查看次数

Flink 与 Iceberg Catalog 和 Hive Metastore:找不到 org.apache.hadoop.fs.s3a.S3AFileSystem

我正在尝试使用 Apache Iceberg 目录和 Hive Metastore 设置 Flink SQL,但没有成功。以下是我在干净的 Flink 1.18.1 安装中采取的步骤,以及我得到的错误。

\n

设置组件

\n

运行 Hive MetaStore:

\n
docker run --rm --detach --name hms-standalone \\\n           --publish 9083:9083 \\\n           ghcr.io/recap-build/hive-metastore-standalone:latest \n
Run Code Online (Sandbox Code Playgroud)\n

使用 Docker 运行 MinIO:

\n
docker run --rm --detach --name minio \\\n            -p 9001:9001 -p 9000:9000 \\\n            -e "MINIO_ROOT_USER=admin" \\\n            -e "MINIO_ROOT_PASSWORD=password" \\\n            minio/minio server /data --console-address ":9001"\n
Run Code Online (Sandbox Code Playgroud)\n

配置一个存储桶:

\n
docker exec minio \\\n    mc config host add minio http://localhost:9000 admin password\ndocker exec minio \\\n    mc mb …
Run Code Online (Sandbox Code Playgroud)

apache-flink hive-metastore flink-sql apache-iceberg

5
推荐指数
1
解决办法
314
查看次数