我是一个Sql Server Service Broker新手,我正在尝试掌握为一个(看似)简单的用例设置Service Broker的最佳方法:我想创建一个简单的工作队列,其中一个应用程序将工作项丢弃到队列和单独的应用程序从该队列中获取工作项并处理它们.第一个应用程序不需要从第二个应用程序返回状态消息.我希望队列存在于单个Sql Server实例中.
最令我困惑的是对话/对话与这种情况的关系.我知道你只能在对话/对话的上下文中发送/接收消息,但由于这两个应用程序之间没有来回的喋喋不休,我对于何时是创建新会话的正确时间感到迷茫.两种极端的替代方案似乎是:
走这些路线的后果是什么?
此外,在第一种情况下,似乎我需要进行一些END CONVERSATION,以便Sql Server能够在内部清理资源.什么时候将它们放入正确的地方是否有任何指导?(或者最终依靠对话超时可能会更好?)
我正在使用线程和队列模块在Python中编写一个简单的爬虫程序.我获取一个页面,检查链接并将它们放入队列,当某个线程完成处理页面时,它从队列中抓取下一个.我正在为我已经访问过的页面使用一个数组来过滤我添加到队列中的链接,但是如果有多个线程并且它们在不同页面上获得相同的链接,则它们会将重复的链接放入队列.那么如何才能找出某个url是否已经在队列中以避免再次将其放入队列中?
我想std::copy用于将元素插入到队列中,如下所示:
vector<int> v;
v.push_back( 1 );
v.push_back( 2 );
queue<int> q;
copy( v.begin(), v.end(), insert_iterator< queue<int> >( q, q.front() ) );
Run Code Online (Sandbox Code Playgroud)
但这无法编译,抱怨begin不是其中的一员std::queue.
注意:我也试过std::inserter了 - 这也失败了,这次说'reference'不是'std :: queue'的成员. std::back_inserter并且std::back_insert_iterator还失败,相同的错误.
我错过了一些明显的东西,还是insert_iterator只是不能使用队列?
我最近听说过很多关于.NET 4.0中TPL的播客.他们中的大多数使用任务描述后台活动,如下载图像或进行计算,以便工作不会干扰GUI线程.
我工作的大多数代码都有更多的多生产者/单一消费者风格,其中来自多个来源的工作项必须排队,然后按顺序处理.一个示例是日志记录,其中来自多个线程的日志行被顺序化为单个队列,以便最终写入文件或数据库.来自任何单一来源的所有记录必须保持有序,并且来自同一时刻的记录应该在最终输出中彼此"接近".
因此,多个线程或任务或任何调用队列的任何东西:
lock( _queue ) // or use a lock-free queue!
{
_queue.enqueue( some_work );
_queueSemaphore.Release();
}
Run Code Online (Sandbox Code Playgroud)
专用工作线程处理队列:
while( _queueSemaphore.WaitOne() )
{
lock( _queue )
{
some_work = _queue.dequeue();
}
deal_with( some_work );
}
Run Code Online (Sandbox Code Playgroud)
将工作线程专门用于这些任务的消费者方面似乎总是合理的.我应该使用TPL中的某些构造来编写未来的程序吗?哪一个?为什么?
我的代码中有一个奇怪的错误.这是非常罕见的(可能每隔几周发生一次),但它就在那里,我不知道为什么.
我们有2个线程正在运行,1个线程获取网络消息并将它们添加到队列中,如下所示:
DataMessages.Enqueue(new DataMessage(client, msg));
Run Code Online (Sandbox Code Playgroud)
另一个线程将消息从此队列中取出并处理它们,如下所示:
while (NetworkingClient.DataMessages.Count > 0)
{
DataMessage message = NetworkingClient.DataMessages.Dequeue();
switch (message.messageType)
{
...
}
}
Run Code Online (Sandbox Code Playgroud)
但是,每隔一段时间我就会在行上得到一个NullReferenceException switch (message.messageType),我可以在调试器中看到该消息为null.
将空值放入队列是不可能的(参见代码的第一位),这些是使用队列的唯一两件事.
队列是不是线程安全的,是不是我在另一个线程入队的确切时刻出列,这会导致故障?
计划1:
#include <iostream>
#include <cstdlib>
#include <vector>
int main(){
//compiles successfully
std::vector<int> vec{1,2,3,4,5};
return EXIT_SUCCESS;
}
Run Code Online (Sandbox Code Playgroud)
计划2:
#include <iostream>
#include <cstdlib>
#include <queue>
int main(){
//compiler error
std::queue<int> que{1,2,3,4,5};
return EXIT_SUCCESS;
}
Run Code Online (Sandbox Code Playgroud)
错误信息:
main.cpp: In function ‘int main()’:
main.cpp:7:31: error: no matching function for call to ‘std::queue<int>::queue(<brace-enclosed initializer list>)’
main.cpp:7:31: note: candidates are:
/usr/include/c++/4.6/bits/stl_queue.h:141:7: note: std::queue<_Tp, _Sequence>::queue(_Sequence&&) [with _Tp = int, _Sequence = std::deque<int, std::allocator<int> >]
/usr/include/c++/4.6/bits/stl_queue.h:141:7: note: candidate expects 1 argument, 5 provided
/usr/include/c++/4.6/bits/stl_queue.h:137:7: note: std::queue<_Tp, _Sequence>::queue(const _Sequence&) [with …Run Code Online (Sandbox Code Playgroud) 在与某人就Python中的异常处理进行了短暂的辩论 - 由于处理队列对象而引发的 - 我以为我会把它扔出去......
import Queue
q = Queue.Queue()
try:
task=q.get(False)
#Opt 1: Handle task here and call q.task_done()
except Queue.Empty:
#Handle empty queue here
pass
#Opt2: Handle task here and call q.task_done()
Run Code Online (Sandbox Code Playgroud)
import Queue
q = Queue.Queue()
if q.empty():
#Handle empty queue here
else:
task = q.get()
#Handle task here
q.task_done()
Run Code Online (Sandbox Code Playgroud)
一个参数是方法1是错误的,因为队列为空不是错误,因此不应使用Queue.Empty异常处理.此外,如果您认为任务处理部分可能很大,那么在以这种方式编码时,它可能会使调试更加困难.
另一个论点是,在Python中可以接受任何一种方式,并且如果任务处理很大,处理try/except之外的任务可能有助于调试,尽管同意这可能看起来比使用方法2更丑.
意见?
更新:在回答1出现之后再多一点信息......争论是在方法1在一些多线程代码中使用之后开始的.在这种情况下,代码将获取锁(来自threading.Lock对象)并在其返回的任务或Queue.Empty被抛出时释放它
更新2:我们俩都不知道队列对象是线程安全的.看起来像尝试/除了是要走的路!
我在python中遇到这个问题:
到目前为止,我设法实现这个"手动"这样:
while 1:
self.updateQueue()
while not self.mainUrlQueue.empty():
domain = self.mainUrlQueue.get()
# if we didn't launched any process yet, we need to do so
if len(self.jobs) < maxprocess:
self.startJob(domain)
#time.sleep(1)
else:
# If we already have process started we need to clear the old process in our pool and start new ones
jobdone = 0
# We circle through each of the process, until we find one free ; only then leave the loop
while jobdone == …Run Code Online (Sandbox Code Playgroud) 我试图在python中使用Queue,这将是多线程的.我只是想知道我使用的方法是否正确.如果我正在做一些多余的事情或者如果有更好的方法我应该使用.
我试图从表中获取新请求,并使用某些逻辑来安排它们执行某些操作,如运行查询.
所以这里从主线程我为队列生成一个单独的线程.
if __name__=='__main__':
request_queue = SetQueue(maxsize=-1)
worker = Thread(target=request_queue.process_queue)
worker.setDaemon(True)
worker.start()
while True:
try:
#Connect to the database get all the new requests to be verified
db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0)
#Get new requests for verification
verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE JOB_STATUS='%s' ORDER BY JOB_ID" %
(username_testschema, 'INITIATED'))
#If there are some requests to be verified, put them in the queue.
if len(verify_these) > 0:
for row in verify_these:
print "verifying : %s" …Run Code Online (Sandbox Code Playgroud) 在使用多处理模块的Python中,有两种队列:
他们之间有什么区别?
from multiprocessing import Queue
q = Queue()
q.put(item) # Put an item on the queue
item = q.get() # Get an item from the queue
Run Code Online (Sandbox Code Playgroud)
from multiprocessing import JoinableQueue
q = JoinableQueue()
q.task_done() # Signal task completion
q.join() # Wait for completion
Run Code Online (Sandbox Code Playgroud)