Python分布式计算(工作)

mot*_*oku 7 python sockets distributed-computing pickle multiprocessing

我正在使用旧线程发布新代码,试图解决同样的问题.什么构成安全的泡菜?这个?

sock.py

from socket import socket
from socket import AF_INET
from socket import SOCK_STREAM
from socket import gethostbyname
from socket import gethostname

class SocketServer:
  def __init__(self, port):
    self.sock = socket(AF_INET, SOCK_STREAM)
    self.port = port
  def listen(self, data):
    self.sock.bind(("127.0.0.1", self.port))
    self.sock.listen(len(data))
    while data:
      s = self.sock.accept()[0]
      siz, dat = data.pop()
      s.send(siz)
      s.send(dat)
      s.close()

class Socket:
  def __init__(self, host, port):
    self.sock = socket(AF_INET, SOCK_STREAM)
    self.sock.connect((host, port))
  def recv(self, size):
    return self.sock.recv(size)
Run Code Online (Sandbox Code Playgroud)

pack.py

#http://stackoverflow.com/questions/6234586/we-need-to-pickle-any-sort-of-callable
from marshal import dumps as marshal_dumps
from pickle import dumps as pickle_dumps
from struct import pack as struct_pack

class packer:
  def __init__(self):
    self.f = []
  def pack(self, what):
    if type(what) is type(lambda:None):
      self.f = []
      self.f.append(marshal_dumps(what.func_code))
      self.f.append(pickle_dumps(what.func_name))
      self.f.append(pickle_dumps(what.func_defaults))
      self.f.append(pickle_dumps(what.func_closure))
      self.f = pickle_dumps(self.f)
      return (struct_pack('Q', len(self.f)), self.f)
Run Code Online (Sandbox Code Playgroud)

unpack.py

from types import FunctionType
from pickle import loads as pickle_loads
from marshal import loads as marshal_loads
from struct import unpack as struct_unpack
from struct import calcsize

#http://stackoverflow.com/questions/6234586/we-need-to-pickle-any-sort-of-callable

class unpacker:
  def __init__(self):
    self.f = []
    self.fcompiled = lambda:None
    self.sizeofsize = calcsize('Q')
  def unpack(self, sock):
    size = struct_unpack('Q', sock.recv(self.sizeofsize))[0]
    self.f = pickle_loads(sock.recv(size))
    a = marshal_loads(self.f[0])
    b = globals() ##
    c = pickle_loads(self.f[1])
    d = pickle_loads(self.f[2])
    e = pickle_loads(self.f[3])
    self.fcompiled = FunctionType(a, b, c, d, e)
    return self.fcompiled
Run Code Online (Sandbox Code Playgroud)

test.py

from unpack import unpacker
from pack import packer
from sock import SocketServer
from sock import Socket
from threading import Thread
from time import sleep

count = 2
port = 4446

def f():
  print 42

def server():
  ss = SocketServer(port)
  pack = packer()
  functions = [pack.pack(f) for nothing in range(count)]
  ss.listen(functions)

if __name__ == "__main__":
  Thread(target=server).start()
  sleep(1)
  unpack = unpacker()
  for nothing in range(count):
    print unpack.unpack(Socket("127.0.0.1", port))
Run Code Online (Sandbox Code Playgroud)

输出:

<function f at 0x12917d0>
<function f at 0x12915f0>
Run Code Online (Sandbox Code Playgroud)

Art*_*par 2

ValueError: insecure string pickle当你的泡菜腐败时就会出现。您确定您收到的是整个腌制对象sock.recv()(unpack.py)吗?

\n\n

编辑:为了避免这种情况发生在你可以做的任何大小上(你的Socket类必须支持使用缓冲区大小参数调用recv(即

\n\n
 class Socket:\n    def recv(self, bufsize):\n        return self.sock.recv(bufsize)\n
Run Code Online (Sandbox Code Playgroud)\n\n

)):

\n\n
import struct\n\nstruct.pack(\'Q\', len(pickled_list))\n# Send it, and then send the pickled list.\n
Run Code Online (Sandbox Code Playgroud)\n\n

在接收程序中:

\n\n
import struct\n\nlength = struct.unpack(\'Q\', sock.recv(struct.calcsize(\'Q\')))[0]\npickled_list = sock.recv(length)\n
Run Code Online (Sandbox Code Playgroud)\n\n

“Q”是一个unsigned long long. 对于其他结构体,请参阅结构体模块文档

\n