Mel*_*ssa 2 multiprocessing go goroutine
我目前有一个MQTT代码,可以订阅主题,打印出收到的消息,然后将更多指令发布到新主题.在订阅/打印在一个够程完成,出版在另一个goroutine中进行.这是我的代码:
var wg, pg sync.WaitGroup
// All messages are handled here - printing published messages and publishing new messages
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
wg.Add(1)
pg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("%s\n", msg.Payload())
//fmt.Println(os.Getpid())
}()
go func(){
defer pg.Done()
message := ""
//Changing configurations
if strings.Contains(string(msg.Payload()), "arduinoLED") == true {
message = fmt.Sprintf("change configuration")
}
if strings.Contains(string(msg.Payload()), "NAME CHANGED") == true{
message = fmt.Sprintf("change back")
}
// Publish further instructions to "sensor/instruction"
token := client.Publish("sensor/instruction", 0, false, message)
//fmt.Println(os.Getpid())
token.Wait()
}()
}
func main() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
opts := MQTT.NewClientOptions().AddBroker("tcp://test.mosquitto.org:1883")
opts.SetDefaultPublishHandler(f)
// Topic to subscribe to for sensor data
topic := "sensor/data"
opts.OnConnect = func(c MQTT.Client) {
if token := c.Subscribe(topic, 0, f); token.Wait() && token.Error() != nil {
panic(token.Error())
}
}
// Creating new client
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
} else {
fmt.Printf("Connected to server\n")
}
wg.Wait()
pg.Wait()
<-c
}
Run Code Online (Sandbox Code Playgroud)
注释掉的os.Getpid()行是检查我正在运行Goroutine的进程.现在它们都显示相同的数字(这意味着它们都在同一个进程上运行?).
我的问题是:如何在不同的进程中运行两个Goroutines ?有办法吗?
编辑:如果无法完成,我想使用频道编写此代码.这是我的代码:
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
sensorData := make(chan []byte)
wg.Add(1)
pg.Add(1)
go func() {
defer wg.Done()
//fmt.Printf("%s\n", msg.Payload())
sensorData <- string(msg.Payload())
fmt.Println(<-sensorData) //currently not printing anything
}()
go func(){
defer pg.Done()
message := ""
//Changing configurations
if strings.Contains(<-sensorData, "arduinoLED") == true{
message = fmt.Sprintf("change configuration")
}
if strings.Contains(<-sensorData, "NAME CHANGED") == true{
message = fmt.Sprintf("change back")
}
// Publish further instructions to "sensor/instruction"
token := client.Publish("sensor/instruction", 0, false, message)
token.Wait()
}()
}
Run Code Online (Sandbox Code Playgroud)
但是,我无法使用频道打印任何数据.我究竟做错了什么?
你可能来自Python,对吗?;-)
它有一个multiprocessing
在stdlib中命名的模块
,这可能很好地解释了为什么你在问题的标题中使用了这个名字,以及为什么你在解释@JimB所说的内容时遇到了麻烦
如果您需要单独的流程,则需要自己执行
问题是,Python multiprocessing是一个相当高级别的东西,隐藏在它的引擎盖下很多东西.当你产生一个multiprocessing.Process并让它运行一个函数时,真正发生的是:
Python解释器创建另一个操作系统的进程(
fork(2)在类Unix系统或CreateProcessWindows上使用)并安排它执行Python interpter.
关键的一点是,您现在将有两个进程运行两个Python interpters.
它被安排为子进程中的Python interpterer有一种与父进程中的Python解释器通信的方式.
这种"通信链接" 必然涉及某种形式的IPC @JimB.没有其他方法可以在不同的流程之间传递数据和操作,因为商品当代操作系统提供了严格的流程分离.
当您在进程之间交换Python对象时,两个通信的Python解释器会在它们背后对它们进行序列化和反序列化,然后通过它们的IPC链接发送它们并相应地从它们接收它们之后.这是使用该pickle模块实现的.
Go没有任何与Python紧密匹配的直接解决方案multiprocessing,我真的怀疑它可以合理地实现.
造成这种情况的主要原因主要是因为Go比Python更低级,因此它没有Python对其管理的价值类型做出纯粹假设的奢侈,并且它也努力减少隐藏成本在其构造中尽可能.
Go还努力避开"框架式"方法来解决问题,并在可能的情况下使用"库式"解决方案.(例如,这里给出了"框架与库"的一个很好的概述.)Go在其标准库中有一些东西可以实现类似于Python的东西,multiprocessing但是没有现成的frakework-y解决方案.
所以你能做的就是沿着这些方向前进:
使用os/exec运行自己的进程的另一个副本.
encoding/在交换时,使用层次结构中的任何包来序列化和反序列化数据.
据说是"首选"解决方案encoding/gob.
创建并实现一个简单的协议,告诉子进程要做什么,以及使用哪些数据,以及如何将结果传回master.
我会说不,不是 - 原因有很多:
Go没有像可怕的GIL那样,所以没有必要回避它以实现真正的并行性.
记忆安全在你手中,当你尽职地遵守通过频道发送的内容现在由接收者拥有的原则时,实现它并不是那么难.换句话说,通过通道发送值也是这些值的所有权转移.
Go工具链集成了竞争检测器,因此您可以使用-race标志运行测试套件并使用go build -race相同的目的创建程序的评估版本:当以这种方式进行检测的程序运行时,竞争检测器会立即崩溃它检测任何未同步的读/写内存访问.该崩溃产生的打印输出包括有关堆栈跟踪的内容和错误的解释性消息.
IPC很慢,因此损失可以抵消收益.
总而言之,我认为没有真正的理由来分离流程,除非你正在编写像电子邮件处理服务器这样的概念自然而然的东西.