我正在开发一个由几个模块组成的应用程序,要求它们彼此共享信息。示例:发布/订阅场景,其中模块发布一些信息(例如状态变量),并且对特定信息感兴趣的模块将其获取。或者是一个请求/答复场景,感兴趣的模块会明确询问有关信息并得到答复。
我一直在研究不同的消息总线实现,即D-bus,ØMQ,RabbitMQ和QPID(后两者基于AMQP)。但是后来有人指出,为什么不尝试使用一些复杂而繁重的消息总线实现,我为什么不简单地使用多播来解决问题。
缺乏经验来查看多播是否真的可以解决我的问题,并了解这两种解决方案的优缺点,因此,我恳请专家帮助我。非常感谢。
我正处于一个项目的中间,我们将基于一系列基于OSGi服务的定制技术迁移一个主要的软件系统.为此,我们可能需要一种与OSGi服务兼容的消息总线.
开源解决方案将是首选,但不是必需的.
我查看了eventbus(按照/sf/answers/136741741/中的建议),但似乎效果不佳.
那么问题是,哪些技术与上述相匹配?
我在Microsoft.Azure.ServiceBus.Message
课堂上遇到了麻烦。我想创建一个包含有效负载对象的消息对象,然后从中读回该对象。在我当前的示例中,我什至没有通过真正的 Azure 总线发送消息;我只是在内存中创建它,然后尝试读取它。
我无法弄清楚我应该将消息正文读取为什么类型。我已经尝试过byte[]
,string
以及原始对象类型。在我的所有情况下,我都会收到一条XmlException
:“输入源格式不正确”。
有人可以告诉我在编码或解码消息时我做错了什么吗?
[DataContract]
public class Thingy
{
[DataMember]
public string Doodad { get; set; }
}
private static Message CreateMessage()
{
var entityMessage = new Thingy {Doodad = "foobar"};
var serializedMessageBody = JsonConvert.SerializeObject(entityMessage);
var contentType = typeof(Thingy).AssemblyQualifiedName;
var bytes = Encoding.UTF8.GetBytes(serializedMessageBody);
var message = new Message(bytes) {ContentType = contentType};
return message;
}
[Test]
public void ReadMessageBytes()
{
var message = CreateMessage();
var body = message.GetBody<byte[]>();
Console.WriteLine(body);
}
[Test]
public …
Run Code Online (Sandbox Code Playgroud) 我只是将gem添加到gemfile并写入js控制台:
MessageBus.subscribe('/my_channel', function(data){
alert(data);
});
Run Code Online (Sandbox Code Playgroud)
它抛出:
Started POST "/message-bus/4700f3c66b254e31875de6caeba011df/poll" for 127.0.0.1 at 2016-04-11 11:32:49 -0300
** [Airbrake] Notice was not sent due to configuration:
Environment Monitored? false
API key set? false
NotImplementedError (only partial hijack is supported.):
rack (1.6.4) lib/rack/handler/webrick.rb:76:in `block in service'
/home/alter/.rvm/gems/ruby-2.2.2@project/bundler/gems/message_bus-7a93b755b456/lib/message_bus/rack/middleware.rb:141:in `call'
/home/alter/.rvm/gems/ruby-2.2.2@project/bundler/gems/message_bus-7a93b755b456/lib/message_bus/rack/middleware.rb:141:in `call'
rack (1.6.4) lib/rack/session/abstract/id.rb:225:in `context'
rack (1.6.4) lib/rack/session/abstract/id.rb:220:in `call'
actionpack (4.2.1) lib/action_dispatch/middleware/cookies.rb:560:in `call'
activerecord (4.2.1) lib/active_record/query_cache.rb:36:in `call'
activerecord (4.2.1) lib/active_record/connection_adapters/abstract/connection_pool.rb:649:in `call'
activerecord (4.2.1) lib/active_record/migration.rb:378:in `call'
actionpack (4.2.1) lib/action_dispatch/middleware/callbacks.rb:29:in `block in call'
activesupport (4.2.1) lib/active_support/callbacks.rb:88:in …
Run Code Online (Sandbox Code Playgroud) 我希望优化当前使用HTTP/REST进行内部节点到节点通信的微服务架构.
一种选择是在服务中实现背压功能,(例如)通过将类似Quasar的东西集成到堆栈中.这无疑会改善一切.但我看到了几个挑战.一个是,异步客户端线程是瞬态的(在内存中),在客户端故障(崩溃)时,这些重试线程将丢失.第二,理论上,如果目标服务器停机一段时间,客户端最终可能会尝试重试OOM,因为线程最终受到限制,甚至是Quasar Fibers.
我知道这有点偏执,但我想知道基于队列的替代方案是否会在非常大的范围内更有利.
它仍然像Quasar /光纤一样异步工作,除了a)队列是集中管理的并且离开客户端JVM,以及b)队列可以是持久的,因此在客户端和/或目标服务器发生故障的情况下,没有在线消息迷路了.
排队的缺点当然是有更多的跳跃,它会减慢系统的速度.但我认为Quasar ROI可能达到峰值,而集中且持久的队列对于扩展和HA来说更为关键.
我的问题是:
是否讨论了这种权衡?是否有任何关于使用集中式外部队列/路由器方法进行内部服务通信的文章.
TL; DR; 我刚才意识到我可以将这个问题说成:
"什么时候使用基于消息总线的内部服务通信,而不是微服务架构中的直接HTTP."
如何在pykafka
主题的特定分区上发布消息.在下面的代码片段中,测试主题有四个分区,我打算在其中一个分区中编写每个消息,但显然它不是那样工作的.
from pykafka import KafkaClient
import logging
logging.basicConfig()
client = KafkaClient(hosts='localhost:9092')
print client.topics
topic = client.topics['test']
with topic.get_producer() as producer:
for i in range(4):
producer.produce('another test message ' + str(i ** 2), partition_key='{}'.format(0))
Run Code Online (Sandbox Code Playgroud) message-bus ×6
apache-kafka ×1
azure ×1
backpressure ×1
c# ×1
java ×1
kafka-python ×1
multicast ×1
osgi ×1
producer ×1
python ×1
quasar ×1