标签: apache-pulsar

Apache pulsar无限保留

在Apache Pulsar主题文档中,它说我们可以将主题时间保留策略设置为-1,以实现无限时间保留,具有无限保留的缺点是什么?我们可以使用脉冲星作为消息存储,其中数据永远存在于主题中并构建事件源他们周围的应用?

apache-pulsar

4
推荐指数
1
解决办法
289
查看次数

如何避免在 Apache Pulsar 中自动删除非活动主题

我有一个应用程序,它在特定主题下向 Pulsar 生成消息,并在完成后关闭该应用程序;同时,不存在阅读此主题的消费者。

过了一会儿,当我创建一个consumer,想把写入的数据读出来的时候,发现我写的topic被Pulsar删除了,所有的数据都丢失了。

如何禁用 Pulsar 中非活动主题的自动删除?

message-queue apache-pulsar

4
推荐指数
1
解决办法
994
查看次数

如何在 Pulsar 中将订阅光标倒回到特定时间?

在文档中: https: //pulsar.apache.org/docs/en/concepts-clients/,它解释了您可以从最早或最晚的位置启动消费者,也可以指定消息id。能否支持这样的需求,即指定一个时间点并从那里开始,比如现在我想迭代昨天12:00到昨天13:00的消息?

apache-pulsar

3
推荐指数
1
解决办法
1217
查看次数

删除/使 Apache Pulsar 主题中的所有消息过期的最有效方法是什么?

我试图找出从 Pulsar 主题中删除所有消息(无论是逻辑上还是物理上)的最佳方法是什么,以便它们不再可以通过订阅使用?

我知道我们可以简单地做到$ pulsar-admin persistent delete persistent://tenant/namespace/topic

但是,这个解决方案有一些缺点:它完全删除了主题(因此我们必须稍后重新创建它),然后不应该有活动的客户端连接到它(即:订阅或生产者)。

或者,是否有一种方法可以以编程方式使两个 MessageId 之间的所有消息对订阅不可用?

谢谢

apache-pulsar

3
推荐指数
1
解决办法
4188
查看次数

如何将 Apache Pulsar 与 Spring Cloud Stream 结合使用?

Apache Pulsar 没有适用于 Spring Cloud Stream 的官方库。我找到了Spring for Apache Pulsar,但它仅与 Spring Boot 有关。在它的 GitHub 存储库中,有一个 [spring-pulsar-spring-cloud-stream-binder],但在 mvnrepository 中找不到。那么我该如何使用它呢?

有没有在 Spring Cloud Stream 中集成 Apache Pulsar 的好方法?

spring-cloud-stream apache-pulsar spring-pulsar

3
推荐指数
1
解决办法
325
查看次数

Debezium 错误,此连接器不知道架构

我有一个使用 Debezium 的项目,主要基于此示例,然后将其连接到 Apache Pulsar。

\n\n

我改变了一些配置。该文件现在如下所示:

\n\n
database.history=io.debezium.relational.history.MemoryDatabaseHistory\nconnector.class=io.debezium.connector.mysql.MySqlConnector\noffset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore\noffset.storage.file.filename=offset.dat\noffset.flush.interval.ms=5000\nname=mysql-dbz-connector\ndatabase.hostname={ip}\ndatabase.port=3308\ndatabase.user={user}\ndatabase.password={pass}\ndatabase.dbname=database\ndatabase.server.name=test\ntable.whitelist=database.history_table,database.project_table\nsnapshot.mode=schema_only\nschemas.enable=false        \ninclude.schema.changes=false\npulsar.topic=persistent://public/default/{0}\npulsar.broker.address=pulsar://{ip}:6650\ndatabase.history=io.debezium.relational.history.MemoryDatabaseHistory\n
Run Code Online (Sandbox Code Playgroud)\n\n

正如您可能理解的,我想要做的是监视数据库的修改history_table和修改,然后将有效负载写入 Apache Pulsar。project_table

\n\n

我的问题如下。无论我使用什么快照模式,当写入偏移量时,我都无法重新启动 Debezium,而不会在下一次数据库更新时出现错误。

\n\n
Encountered change event for table database.history_table whose schema isn\'t known to this connector\n
Run Code Online (Sandbox Code Playgroud)\n\n

它仅发生在现有offset.dat文件上。我认为这是因为文件中的架构为空offset.dat。以这个为例:

\n\n
\xc2\xa8\xc3\x8csrjava.util.HashMap\xe2\x81\x84\xc2\xa1\xe2\x88\x9a`\xe2\x80\x94F\nloadFactorI thresholdxp?@wur[B\xc2\xa8\xc3\x9b\xc2\xafT\xe2\x80\xa1xpG{"schema":null,"payload":["mysql-dbz-connector",{"server":"test"}]}uq~U{"ts_sec":1563802215,"file":"database-bin.000005","pos":79574,"server_id":1,"event":1}x\n
Run Code Online (Sandbox Code Playgroud)\n\n

我首先怀疑我用来使 JSON 更简洁的schemas.enable=falseinclude.schema.changes=false参数,但它们的值不会改变文件中的任何内容offset.dat

\n

database jboss-tools debezium apache-pulsar

2
推荐指数
1
解决办法
5006
查看次数

如何缩小 pulsar BookKeeper 集群

问题

一个月前,我向 pulsar 添加了 2 个 bookie,但我意识到这是浪费资源。我怎样才能将博彩公司从 7 个减少到 3 个。

我的努力

通过 bookkeeper 的文档,我发现当 bookie 崩溃时,自动恢复会将 BookKeeper 集群中的所有分类帐恢复为完全复制。但如果我一次合上四本书,我想有些账本可能会永远丢失。

bookkeeper-server/bin/bookkeeper shell recover \
zk1.example.com:2181 \ # IP and port for ZooKeeper
192.168.1.10:3181      # IP and port for the failed bookie
Run Code Online (Sandbox Code Playgroud)

所以我想我可以一一关闭它,但是我怎么知道恢复是否完成呢?

apache-pulsar

2
推荐指数
1
解决办法
575
查看次数

使用 Apache Pulsar 中注册的模式发布到主题

如Pulsar Schema 注册表文档中的示例所示

\n\n
Producer<User> producer = client.newProducer(JSONSchema.of(User.class))\n    .topic(topic)\n    .create();\nUser user = new User(\xe2\x80\x9cTom\xe2\x80\x9d, 28);\nproducer.send(User);\n
Run Code Online (Sandbox Code Playgroud)\n\n

您可以使用 Java 客户端注册生产者和消费者的架构。还提到其他语言的客户端不支持模式注册。

\n\n

现在是否可以从 Python API 生产者发送关于 Pulsar 主题的消息,该消息将由具有注册模式的消费者使用?例如

\n\n
processor = PulsarClient.builder()\n            .serviceUrl("pulsar://pulsarhost:6650")\n            .build()\n            .newConsumer(JSONSchema.of(User.class))\n            .topic("sometopic")\n            .subscriptionName("somesubscription")\n            .subscribe();\n
Run Code Online (Sandbox Code Playgroud)\n\n

Python:\n 导入脉冲星

\n\n
client = pulsar.Client(\'pulsar://pulsarhost:6650\')\n\nproducer = client.create_producer(\'sometopic\')\nclient.close()\n
Run Code Online (Sandbox Code Playgroud)\n

python-pulsar apache-pulsar

2
推荐指数
1
解决办法
1661
查看次数

Apache Pulsar 消息传递语义

我浏览了消息传递语义的 Apache Pulsar 文档。Apache 函数提到的交付语义(至少一次,最多一次和有效一次),如果我们不使用 Apache 函数,那么可用的所有不同交付语义是什么?

queue message-queue apache-pulsar

2
推荐指数
1
解决办法
902
查看次数

Pulsar 上的同一主题是否可以有多个制作者?

我知道你可以将主题订阅设置为共享订阅,以允许多个消费者关注同一主题。这也可以为多个生产者完成吗?

出于某种原因,当我尝试这样做时,我得到了Producer with name '<topic_name>' is already connected to topic

pulsar apache-pulsar

2
推荐指数
1
解决办法
1460
查看次数

Apache Pulsar 中的主题复制

有关 Pulsar 中复制的文档描述性并不强。我想知道复制的详细工作原理以及命名空间的持久性策略如何发挥作用。文档讨论了这些参数

  • bookkeeper-ack-quorom:等待每个条目的ack(保证副本)数量
  • bookkeeper-ensemble:用于某个主题的 bookie 数量
  • bookkeeper-write-quorum:每个条目要写入多少次

是否bookkeeper-ack-quorom意味着对客户端的确认会延迟,直到一定数量的博彩公司将条目写入磁盘?

bookkeeper-ensemble和 和有什么区别bookkeeper-write-quorum

假设我有 3 个 bookie,并且我希望命名空间中的主题驻留在每个 bookie 上,然后我将这两个值都设置为3?

apache-pulsar

1
推荐指数
1
解决办法
574
查看次数

使用 Kubernetes Ingress 公开 Pulsar 代理

我使用 helm 部署了 pulsar,但没有激活 tls。现在我想使用 nginx 入口来公开它,而不是第一次使用 tls。它不起作用,我在客户端得到这个:

Got exception TooLongFrameException : Adjusted frame length exceeds 5253120: 1213486164 - discarded
Run Code Online (Sandbox Code Playgroud)

是否缺少特定注释?

第二次,我想使用 ssl 证书以安全的方式公开它。Ingress 可以与 pulsar+ssl 协议一起使用吗?我还需要具体注释吗?

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: pulsar
  namespace: default
spec:
  rules:
  - host: pulsar.myurl.com
    http:
      paths:
      - backend:
          serviceName: pulsar-proxy
          servicePort: 6650
  tls:
  - hosts:
    - pulsar.myurl.com
    secretName: pulsar.myurl.com.crt
Run Code Online (Sandbox Code Playgroud)

ssl apache-pulsar kubernetes-ingress nginx-ingress

1
推荐指数
1
解决办法
1022
查看次数

Apache Bookkeeper 删除日志

在 Apache 簿记员中,我们如何删除分类帐中的日志条目?如果壁架是不可变的并且无法删除条目,pulsar 如何从博彩公司删除过期消息?

apache-pulsar apache-bookkeeper

0
推荐指数
1
解决办法
647
查看次数