我有一个场景,我需要非常快速地分配和处理作业.我将在队列中快速填充大约45个作业,我可以同时处理大约20个作业(5台机器,每台4个核心).每个作业花费不同的时间,并且使事情变得复杂,垃圾收集是一个问题所以我需要能够让消费者离线进行垃圾收集.
目前,我有一切都在使用pop(每个消费者每隔5ms弹出一次).这似乎是不合需要的,因为它转换为每秒600次popm请求到rabbitmq.
我很乐意,如果有一个pop命令会像订阅一样,但只有一条消息.(进程会阻塞,等待来自rabbitMQ连接的输入,通过类似于Kernel.select的东西)
我试图欺骗AMQP gem来做这样的事情,但它不起作用:在队列为空并且不再向消费者发送消息之前,我似乎无法取消订阅.取消订阅的其他方法我担心会失去信息.
consume_1.rb:
require "amqp"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
queue.unsubscribe { puts "unbound" }
sleep 3
end
end
Run Code Online (Sandbox Code Playgroud)
consumer_many.rb:
require "amqp"
# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"
EventMachine.run …
Run Code Online (Sandbox Code Playgroud) ElasticSearch能够将值复制到其他字段(在索引时),使您能够在多个字段上搜索,就像它是一个字段一样(核心类型:copy_to).
但是,似乎没有任何方法可以指定应该复制这些值的顺序.短语匹配时这可能很重要:
curl -XDELETE 'http://10.11.12.13:9200/helloworld'
curl -XPUT 'http://10.11.12.13:9200/helloworld'
# copy_to is ordered alphabetically!
curl -XPUT 'http://10.11.12.13:9200/helloworld/_mapping/people' -d '
{
"people": {
"properties": {
"last_name": {
"type": "string",
"copy_to": "full_name"
},
"first_name": {
"type": "string",
"copy_to": "full_name"
},
"state": {
"type": "string"
},
"city": {
"type": "string"
},
"full_name": {
"type": "string"
}
}
}
}
'
curl -X POST "10.11.12.13:9200/helloworld/people/dork" -d '{"first_name": "Jim", "last_name": "Bob", "state": "California", "city": "San Jose"}'
curl -X POST "10.11.12.13:9200/helloworld/people/face" -d '{"first_name": "Bob", "last_name": …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用对Mesos的持久卷支持,并且很难让它工作.
我已按如下方式配置了每个从属服务器,并确认已成功使用此新配置重新启动:
的/ etc/mesos-从/资源
[ ?
{
"name" : "disk",
"type" : "SCALAR",
"scalar" : { "value" : 4194304 },
"disk" : {
"source" : {
"type" : "PATH",
"path" : { "root" : "/mnt/disk1" }
}
}
},
{
"name" : "disk",
"type" : "SCALAR",
"scalar" : { "value" : 4194304 },
"disk" : {
"source" : {
"type" : "PATH",
"path" : { "root" : "/mnt/disk2" }
}
}
},
{
"name" : "disk",
"type" : "SCALAR",
"scalar" …
Run Code Online (Sandbox Code Playgroud) 嗨其他Scala开发人员,
任何人都可以向我解释下面的代码中的类型推断有什么问题,以及如何修复它.
以下代码是使用Scala 2.10.2的Play 2.2的自定义操作
class Test {
trait Entity
class NodeRequest[A,K <:Entity](val entity: K,
val request: Request[A])
extends WrappedRequest[A](request)
def LocateResource[A,K](itemId: Int, v: List[K],
forceOwners:Boolean = true) =
new ActionBuilder[NodeRequest[A,K]]() {
def invokeBlock[A](request: Request[A],
block: (NodeRequest[A,K]) => Future[SimpleResult]) = {
Future.successful(Ok)
}
}
[error] Test.this.NodeRequest[A,K] takes no type parameters, expected: one
[error] def LocateResource[A,K](itemId: Int, v: List[K] , forceOwners:Boolean = true) = new ActionBuilder[NodeRequest[A,K]]() {
[error] ^
Run Code Online (Sandbox Code Playgroud)