我需要同时发送和接收。
哪个选项会更好:
或者
或者还有其他解决方案吗?
我预计最多有大约 50 个双向连接。这导致选项 #1 中有 50 个线程,选项 #2 中有 100 个线程。
我在python 2.7(在linux下)编写了一个类,它使用多个进程异步操作数据库。我遇到当使用一个很奇怪的阻塞行为multiprocessing.Queue.put()和multiprocessing.Queue.get()我无法解释。
这是我所做的简化版本:
from multiprocessing import Process, Queue
class MyDB(object):
def __init__(self):
self.inqueue = Queue()
p1 = Process(target = self._worker_process, kwargs={"inqueue": self.inqueue})
p1.daemon = True
started = False
while not started:
try:
p1.start()
started = True
except:
time.sleep(1)
#Sometimes I start a same second process but it makes no difference to my problem
p2 = Process(target = self._worker_process, kwargs={"inqueue": self.inqueue})
#blahblah... (same as above)
@staticmethod
def _worker_process(inqueue):
while True:
#--------------this blocks depite data having arrived------------
op …Run Code Online (Sandbox Code Playgroud) 情况是我有一个阻塞管道或套接字 fd,我不想write()阻塞,所以我先做了select()一个,但这仍然不能保证write()不会阻塞。
这是我收集的数据。即使select()指示可以写入,写入的PIPE_BUF字节数也会阻塞。然而,写入最多PIPE_BUF字节在实践中似乎不会阻塞,但POSIX 规范并未强制要求。
那只指定原子行为。Python(!) 文档指出:
报告为准备写入的文件
select(),poll()或此模块中的类似接口保证不会在写入最多PIPE_BUF字节时阻塞。POSIX 保证该值至少为512.
在下面的测试程序中,设置BUF_BYTES为100000在write()成功选择后在 Linux、FreeBSD 或 Solaris 上阻塞
。我假设命名管道与匿名管道具有相似的行为。
不幸的是,阻塞套接字也会发生同样的情况。通话
test_socket()中main(),并使用一个相当大的BUF_BYTES(100000好这里太)。目前尚不清楚是否有像PIPE_BUF套接字一样的安全缓冲区大小
。
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <limits.h>
#include <stdio.h> …Run Code Online (Sandbox Code Playgroud) 我使用 Oozie 创建了一个工作流,该工作流由多个操作节点组成,并且能够通过协调器成功运行这些节点。
我想通过包装器 shell 脚本调用 Oozie 工作流。
包装器脚本应调用 Oozie 命令,等待 oozie 作业完成(成功或错误)并返回 Oozie 成功状态代码 (0) 或失败的 oozie 操作节点的错误代码(如果 oozie 工作流的任何节点具有失败的)。
从我目前所见,我知道只要我调用 oozie 命令来运行工作流,该命令就会退出,并在 linux 控制台上打印作业 ID,而 oozie 作业将在后端异步运行。
我希望我的包装器脚本阻塞,直到 oozie 协调器作业完成并返回成功/错误代码。
您能否告诉我如何/是否可以使用任何 oozie 功能来实现这一目标?
我在 Linux 中使用 Oozie 3.3.2 版和 bash shell。
注意:如果有人对我为什么需要这样的功能感到好奇 - 要求是我的包装器 shell 脚本应该知道 oozie 作业运行了多长时间,oozie 作业何时完成,并相应地返回退出代码,以便调用包装器脚本的父进程知道作业是否成功完成,如果出错,则为支持团队发出警报/票。
如何在两个 python 进程之间创建一个 fifo,如果阅读器无法处理输入,则允许删除行?
read或readline比作者写得更快,它应该阻塞。readline尝试时只应接收写入的最后一行。使用命名的 fifo 是否可以做到这一点,或者还有其他简单的方法可以实现这一目标吗?
我有一个 Observable,它以 JSON 字符串的形式发出事件。我想订阅它阻塞并获取第一个字符串。不是问题。
但我也想超时并在超时触发时发出一个罐装 JSON 字符串。
这是我现有的代码:
String jsonString = rxEvents.subscribe().toBlocking().first();
Run Code Online (Sandbox Code Playgroud) 我正在编写一个需要显式解析所有键盘输入的 python 应用程序。因此我写了一个小循环,不断从标准输入读取。这工作正常,但是, stdin.read(1) 会阻塞,直到我们键入一个字符。现在我希望它在(例如)1 秒后超时,以便其他事情可能发生。我在 python 中阅读了 select 模块,现在我有以下内容:
def getch(timeout):
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
ch = None
try:
tty.setraw(fd)
rlist, _, _ = select([sys.stdin], [], [], timeout)
if len(rlist) > 0:
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
Run Code Online (Sandbox Code Playgroud)
这段代码的问题是,当我按下箭头键时,我只收到“\x1b”。select 函数永远不会为剩余的 '[' 和 'D' 触发。
如何正确读取这些箭头键字符?或者我怎样才能再次触发 select 函数(因为 stdin 上仍有可用的数据)。
谢谢!
我试图在从管道接收时读取 Python 脚本中的 stdin。
我使用了这些行:
for line in sys.stdin:
print line
Run Code Online (Sandbox Code Playgroud)
并运行脚本: echo "test" | script.py
到目前为止它工作正常。但是,如果我不使用管道,程序就会坚持for命令。这意味着调用:./script.py将使脚本不起作用。我怎样才能解决这个问题?
我正在寻找方法来了解进程或线程花时间等待的系统调用或子系统,即阻塞而不是安排在 CPU 上运行。
具体来说,如果我有一些未知的过程,或者一个我们只知道“它很慢”的过程,我希望能够学习以下内容:
sys_write()fd 13 上,即 /some/file”换句话说,当我的程序不在 CPU 上运行时,它在做什么?
这是一个惊人很难与回答perf,因为它不会出现有任何方式的记录系统调用从sys_enter持续时间sys_exit或以其他方式跟踪多久的事件。大概是由于其采样性质。
我知道一些针对 Linux 4.6 及更高版本的 eBPF 的实验性工作可能会有所帮助,Brendan Gregg 的off-cpu 工作。但在悲惨的运维世界中,支持 4.6 内核的独角兽却是难得一见的珍贵。
现实世界的选择是什么?
ftrace、systemtap 等在这里提供任何见解吗?
我有一个与多个客户端一起工作的多线程聊天服务器(每个客户端都在一个新线程中处理)。
如果客户端断开连接,服务器上的相关线程将挂在 inputstream.readLine() 上,这是一个阻塞调用。
如何设置某种超时,以便在 2000 毫秒后关闭套接字并释放线程而没有响应?
class Handler(socket: Socket) extends Runnable {
def run() : Unit = {
val inputstream = new BufferedReader(new InputStreamReader(socket.getInputStream()))
val outputstream = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()))
var break = false
while(break != true){
val input = inputstream.readLine() // blocking call
// how do i time out after 2000ms?
input match {
case "update" =>
outputstream.write("message")
outputstream.newLine()
outputstream.flush()
case _ => // any other input, break and close socket
break = true
}
}
socket.close()
} …Run Code Online (Sandbox Code Playgroud) blocking ×10
python ×4
sockets ×3
linux ×2
nonblocking ×2
pipe ×2
stdin ×2
error-code ×1
fifo ×1
networking ×1
observable ×1
oozie ×1
perf ×1
performance ×1
posix ×1
queue ×1
rx-java ×1
rx-java2 ×1
scala ×1
serversocket ×1
shell ×1
systemtap ×1
unix ×1