在Apache Pulsar主题文档中,它说我们可以将主题时间保留策略设置为-1,以实现无限时间保留,具有无限保留的缺点是什么?我们可以使用脉冲星作为消息存储,其中数据永远存在于主题中并构建事件源他们周围的应用?
我有一个应用程序,它在特定主题下向 Pulsar 生成消息,并在完成后关闭该应用程序;同时,不存在阅读此主题的消费者。
过了一会儿,当我创建一个consumer,想把写入的数据读出来的时候,发现我写的topic被Pulsar删除了,所有的数据都丢失了。
如何禁用 Pulsar 中非活动主题的自动删除?
在文档中: https: //pulsar.apache.org/docs/en/concepts-clients/,它解释了您可以从最早或最晚的位置启动消费者,也可以指定消息id。能否支持这样的需求,即指定一个时间点并从那里开始,比如现在我想迭代昨天12:00到昨天13:00的消息?
我试图找出从 Pulsar 主题中删除所有消息(无论是逻辑上还是物理上)的最佳方法是什么,以便它们不再可以通过订阅使用?
我知道我们可以简单地做到$ pulsar-admin persistent delete persistent://tenant/namespace/topic。
但是,这个解决方案有一些缺点:它完全删除了主题(因此我们必须稍后重新创建它),然后不应该有活动的客户端连接到它(即:订阅或生产者)。
或者,是否有一种方法可以以编程方式使两个 MessageId 之间的所有消息对订阅不可用?
谢谢
Apache Pulsar 没有适用于 Spring Cloud Stream 的官方库。我找到了Spring for Apache Pulsar,但它仅与 Spring Boot 有关。在它的 GitHub 存储库中,有一个 [spring-pulsar-spring-cloud-stream-binder],但在 mvnrepository 中找不到。那么我该如何使用它呢?
有没有在 Spring Cloud Stream 中集成 Apache Pulsar 的好方法?
我有一个使用 Debezium 的项目,主要基于此示例,然后将其连接到 Apache Pulsar。
\n\n我改变了一些配置。该文件现在如下所示:
\n\ndatabase.history=io.debezium.relational.history.MemoryDatabaseHistory\nconnector.class=io.debezium.connector.mysql.MySqlConnector\noffset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore\noffset.storage.file.filename=offset.dat\noffset.flush.interval.ms=5000\nname=mysql-dbz-connector\ndatabase.hostname={ip}\ndatabase.port=3308\ndatabase.user={user}\ndatabase.password={pass}\ndatabase.dbname=database\ndatabase.server.name=test\ntable.whitelist=database.history_table,database.project_table\nsnapshot.mode=schema_only\nschemas.enable=false \ninclude.schema.changes=false\npulsar.topic=persistent://public/default/{0}\npulsar.broker.address=pulsar://{ip}:6650\ndatabase.history=io.debezium.relational.history.MemoryDatabaseHistory\nRun Code Online (Sandbox Code Playgroud)\n\n正如您可能理解的,我想要做的是监视数据库的修改history_table和修改,然后将有效负载写入 Apache Pulsar。project_table
我的问题如下。无论我使用什么快照模式,当写入偏移量时,我都无法重新启动 Debezium,而不会在下一次数据库更新时出现错误。
\n\nEncountered change event for table database.history_table whose schema isn\'t known to this connector\nRun Code Online (Sandbox Code Playgroud)\n\n它仅发生在现有offset.dat文件上。我认为这是因为文件中的架构为空offset.dat。以这个为例:
\xc2\xa8\xc3\x8csrjava.util.HashMap\xe2\x81\x84\xc2\xa1\xe2\x88\x9a`\xe2\x80\x94F\nloadFactorI thresholdxp?@wur[B\xc2\xa8\xc3\x9b\xc2\xafT\xe2\x80\xa1xpG{"schema":null,"payload":["mysql-dbz-connector",{"server":"test"}]}uq~U{"ts_sec":1563802215,"file":"database-bin.000005","pos":79574,"server_id":1,"event":1}x\nRun Code Online (Sandbox Code Playgroud)\n\n我首先怀疑我用来使 JSON 更简洁的schemas.enable=false或include.schema.changes=false参数,但它们的值不会改变文件中的任何内容offset.dat。
一个月前,我向 pulsar 添加了 2 个 bookie,但我意识到这是浪费资源。我怎样才能将博彩公司从 7 个减少到 3 个。
通过 bookkeeper 的文档,我发现当 bookie 崩溃时,自动恢复会将 BookKeeper 集群中的所有分类帐恢复为完全复制。但如果我一次合上四本书,我想有些账本可能会永远丢失。
bookkeeper-server/bin/bookkeeper shell recover \
zk1.example.com:2181 \ # IP and port for ZooKeeper
192.168.1.10:3181 # IP and port for the failed bookie
Run Code Online (Sandbox Code Playgroud)
所以我想我可以一一关闭它,但是我怎么知道恢复是否完成呢?
如Pulsar Schema 注册表文档中的示例所示
\n\nProducer<User> producer = client.newProducer(JSONSchema.of(User.class))\n .topic(topic)\n .create();\nUser user = new User(\xe2\x80\x9cTom\xe2\x80\x9d, 28);\nproducer.send(User);\nRun Code Online (Sandbox Code Playgroud)\n\n您可以使用 Java 客户端注册生产者和消费者的架构。还提到其他语言的客户端不支持模式注册。
\n\n现在是否可以从 Python API 生产者发送关于 Pulsar 主题的消息,该消息将由具有注册模式的消费者使用?例如
\n\nprocessor = PulsarClient.builder()\n .serviceUrl("pulsar://pulsarhost:6650")\n .build()\n .newConsumer(JSONSchema.of(User.class))\n .topic("sometopic")\n .subscriptionName("somesubscription")\n .subscribe();\nRun Code Online (Sandbox Code Playgroud)\n\nPython:\n 导入脉冲星
\n\nclient = pulsar.Client(\'pulsar://pulsarhost:6650\')\n\nproducer = client.create_producer(\'sometopic\')\nclient.close()\nRun Code Online (Sandbox Code Playgroud)\n 我浏览了消息传递语义的 Apache Pulsar 文档。Apache 函数提到的交付语义(至少一次,最多一次和有效一次),如果我们不使用 Apache 函数,那么可用的所有不同交付语义是什么?
我知道你可以将主题订阅设置为共享订阅,以允许多个消费者关注同一主题。这也可以为多个生产者完成吗?
出于某种原因,当我尝试这样做时,我得到了Producer with name '<topic_name>' is already connected to topic
有关 Pulsar 中复制的文档描述性并不强。我想知道复制的详细工作原理以及命名空间的持久性策略如何发挥作用。文档讨论了这些参数
bookkeeper-ack-quorom:等待每个条目的ack(保证副本)数量bookkeeper-ensemble:用于某个主题的 bookie 数量bookkeeper-write-quorum:每个条目要写入多少次是否bookkeeper-ack-quorom意味着对客户端的确认会延迟,直到一定数量的博彩公司将条目写入磁盘?
bookkeeper-ensemble和 和有什么区别bookkeeper-write-quorum?
假设我有 3 个 bookie,并且我希望命名空间中的主题驻留在每个 bookie 上,然后我将这两个值都设置为3?
我使用 helm 部署了 pulsar,但没有激活 tls。现在我想使用 nginx 入口来公开它,而不是第一次使用 tls。它不起作用,我在客户端得到这个:
Got exception TooLongFrameException : Adjusted frame length exceeds 5253120: 1213486164 - discarded
Run Code Online (Sandbox Code Playgroud)
是否缺少特定注释?
第二次,我想使用 ssl 证书以安全的方式公开它。Ingress 可以与 pulsar+ssl 协议一起使用吗?我还需要具体注释吗?
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
name: pulsar
namespace: default
spec:
rules:
- host: pulsar.myurl.com
http:
paths:
- backend:
serviceName: pulsar-proxy
servicePort: 6650
tls:
- hosts:
- pulsar.myurl.com
secretName: pulsar.myurl.com.crt
Run Code Online (Sandbox Code Playgroud) 在 Apache 簿记员中,我们如何删除分类帐中的日志条目?如果壁架是不可变的并且无法删除条目,pulsar 如何从博彩公司删除过期消息?