在单独的进程上运行Goroutines(多处理)

Mel*_*ssa 2 multiprocessing go goroutine

我目前有一个MQTT代码,可以订阅主题,打印出收到的消息,然后将更多指令发布到新主题.在订阅/打印在一个够程完成,出版在另一个goroutine中进行.这是我的代码:

var wg, pg sync.WaitGroup
// All messages are handled here - printing published messages and publishing new messages
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {

wg.Add(1)
pg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Printf("%s\n", msg.Payload())
        //fmt.Println(os.Getpid())
    }()
go func(){
    defer pg.Done()
    message := ""
    //Changing configurations
    if strings.Contains(string(msg.Payload()), "arduinoLED") == true {
        message = fmt.Sprintf("change configuration")
    }
    if  strings.Contains(string(msg.Payload()), "NAME CHANGED") == true{
        message = fmt.Sprintf("change back")
    }
    // Publish further instructions to "sensor/instruction"
    token := client.Publish("sensor/instruction", 0, false, message)
    //fmt.Println(os.Getpid())
    token.Wait()

}()
}

func main() {

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    opts := MQTT.NewClientOptions().AddBroker("tcp://test.mosquitto.org:1883")

    opts.SetDefaultPublishHandler(f)
    // Topic to subscribe to for sensor data
    topic := "sensor/data"

    opts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(topic, 0, f); token.Wait() && token.Error() != nil {
            panic(token.Error())
        }
    }
    // Creating new client
    client := MQTT.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    } else {
        fmt.Printf("Connected to server\n")
    }
    wg.Wait()
    pg.Wait()
    <-c
}
Run Code Online (Sandbox Code Playgroud)

注释掉的os.Getpid()行是检查我正在运行Goroutine的进程.现在它们都显示相同的数字(这意味着它们都在同一个进程上运行?).

我的问题是:如何在不同的进程中运行两个Goroutines ?有办法吗?

编辑:如果无法完成,我想使用频道编写此代码.这是我的代码:

var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    sensorData := make(chan []byte)
wg.Add(1)
pg.Add(1)
    go func() {
        defer wg.Done()
        //fmt.Printf("%s\n", msg.Payload())
        sensorData <- string(msg.Payload())
        fmt.Println(<-sensorData) //currently not printing anything
    }()
go func(){
    defer pg.Done()
    message := ""
    //Changing configurations
    if strings.Contains(<-sensorData, "arduinoLED") == true{
        message = fmt.Sprintf("change configuration")
    }
    if strings.Contains(<-sensorData, "NAME CHANGED") == true{
        message = fmt.Sprintf("change back")
    }
    // Publish further instructions to "sensor/instruction"
    token := client.Publish("sensor/instruction", 0, false, message)
    token.Wait()

}()

}
Run Code Online (Sandbox Code Playgroud)

但是,我无法使用频道打印任何数据.我究竟做错了什么?

kos*_*tix 8

你可能来自Python,对吗?;-)

它有一个multiprocessing 在stdlib中命名的模块 ,这可能很好地解释了为什么你在问题的标题中使用了这个名字,以及为什么你在解释@JimB所说的内容时遇到了麻烦

如果您需要单独的流程,则需要自己执行

Python中的"多处理"

问题是,Python multiprocessing是一个相当高级别的东西,隐藏在它的引擎盖下很多东西.当你产生一个multiprocessing.Process并让它运行一个函数时,真正发生的是:

  1. Python解释器创建另一个操作系统的进程( fork(2)在类Unix系统或CreateProcessWindows上使用)并安排它执行Python interpter.

    关键的一点是,您现在将有两个进程运行两个Python interpters.

  2. 它被安排为子进程中的Python interpterer有一种与父进程中的Python解释器通信的方式.

    这种"通信链接" 必然涉及某种形式的IPC @JimB.没有其他方法可以在不同的流程之间传递数据和操作,因为商品当代操作系统提供了严格的流程分离.

  3. 当您在进程之间交换Python对象时,两个通信的Python解释器会在它们背后对它们进行序列化序列化,然后通过它们的IPC链接发送它们并相应地从它们接收它们之后.这是使用该pickle模块实现的.

回去吧

Go没有任何与Python紧密匹配的直接解决方案multiprocessing,我真的怀疑它可以合理地实现.

造成这种情况的主要原因主要是因为Go比Python更低级,因此它没有Python对其管理的价值类型做出纯粹假设的奢侈,并且它也努力减少隐藏成本在其构造中尽可能.

Go还努力避开"框架式"方法来解决问题,并在可能的情况下使用"库式"解决方案.(例如,这里给出了"框架与库"的一个很好的概述.)Go在其标准库中有一些东西可以实现类似于Python的东西,multiprocessing但是没有现成的frakework-y解决方案.

所以你能做的就是沿着这些方向前进:

  1. 使用os/exec运行自己的进程的另一个副本.

    • 确保生成的进程"知道"它以特殊的"从属"模式启动 - 以便相应地采取行动.
    • 使用任何形式的IPC与新流程进行通信.通过 子进程的标准I/O流交换数据被认为是最简单的滚动方式(除非你需要交换打开的文件,但这是一个更黑的话题,所以我们不要离题).
  2. encoding/在交换时,使用层次结构中的任何包来序列化和反序列化数据.

    据说是"首选"解决方案encoding/gob.

  3. 创建并实现一个简单的协议,告诉子进程要做什么,以及使用哪些数据,以及如何将结果传回master.

这真的值得吗?

我会说不,不是 - 原因有很多:

  • Go没有像可怕的GIL那样,所以没有必要回避它以实现真正的并行性.

  • 记忆安全在你手中,当你尽职地遵守通过频道发送的内容现在由接收者拥有的原则时,实现它并不是那么难.换句话说,通过通道发送值也是这些值的所有权转移.

  • Go工具链集成了竞争检测器,因此您可以使用-race标志运行测试套件并使用go build -race相同的目的创建程序的评估版本:当以这种方式进行检测的程序运行时,竞争检测器会立即崩溃它检测任何未同步的读/写内存访问.该崩溃产生的打印输出包括有关堆栈跟踪的内容错误的解释性消息.

  • IPC很慢,因此损失可以抵消收益.

总而言之,我认为没有真正的理由来分离流程,除非你正在编写像电子邮件处理服务器这样的概念自然而然的东西.