sha*_*hat 2 sockets go websocket
在过去的几周里,我一直潜伏在 Stack Overflow 上寻找与阅读大量 websocket 相关的信息。基本上,我有许多主机都通过 websocket 发出消息,我需要聚合它们。
到目前为止,我已经使用 Golang 完成了单个 websocket 连接。我也使用 Python 完成了我正在寻找的东西,但我真的很想在 Go 中做到这一点!
我使用了 gorilla 的 websocket 示例以及其他一些示例,并且可以在 Go 中成功读取套接字。然而,websocket 服务器似乎并不完全符合典型的开发实践,因为使用 .forEach 或 .Each 等方法在 JS 中;导致握手失败。
原版
package main
import (
"fmt"
"golang.org/x/net/websocket"
"log"
)
var url = "ws://10.0.1.19:5000/data/websocket"
func main() {
ws, err := websocket.Dial(url, "", origin)
if err != nil {
log.Fatal(err)
}
var msg = make([]byte, 512)
_, err = ws.Read(msg)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Receive: %s\n", msg)
}
Run Code Online (Sandbox Code Playgroud)
我实际上不需要向套接字发送任何数据,我只需要连接并继续读取它,然后我会将这些数据聚合到单个流中以执行以后的操作。
在对大猩猩和旧的 x/net/websocket 库进行持续更改和测试后,我发现不幸的是,我连接的 websocket 服务器似乎没有正确遵守大猩猩想要用于握手的标准. 或者我没有告诉大猩猩如何正确连接。x/net/websocket 连接正常;我只是指定 localhost/ 作为原点,它似乎工作。我不确定如何告诉大猩猩如何做同样的事情,看看它是否以同样的方式工作。深入挖掘 DefaultDialer.Dial() 有一些配置选项,但以我对 Go 的有限了解,现在我还没有找到一种方法来利用它来做我想做的事情。
当前版本 (2016-03-19)
package main
import (
"fmt"
"golang.org/x/net/websocket"
// "log"
"time"
)
var origin = "http://localhost"
type url struct {
host string
}
func processUrl(host string, messages chan []byte) {
client, err := websocket.Dial(host, "", origin)
if err != nil {
// log.Printf("dial:", err)
}
// Clean up on exit from this goroutine
defer client.Close()
// Loop reading messages. Send each message to the channel.
for {
var msg = make([]byte, 512)
_, err = client.Read(msg)
if err != nil {
// log.Fatal("read:", err)
return
}
messages <- msg
}
}
func main() {
// Create an arry of hosts to read websockets from
urls := []string{
"ws://10.0.1.90:3000/data/websocket",
"ws://10.0.2.90:3000/data/websocket",
"ws://10.0.3.90:3000/data/websocket",
}
// Create channel to receive messages from all connections
messages := make(chan []byte)
// Run a goroutine for each URL that you want to dial.
for _, host := range urls {
go processUrl(host, messages)
}
// Print all messages received from the goroutines.
for msg := range messages {
fmt.Printf("%d %s\n", time.Now().Unix(), msg)
}
}
Run Code Online (Sandbox Code Playgroud)
响应(来自 ws 的消息):
{
"src_city":"Wayne",
"dest_city":"Amsterdam",
"src_country":"US",
"dest_country":"NL",
"type":"view"
}
Run Code Online (Sandbox Code Playgroud)
IOWait 问题
我遇到的一个问题是 IOWait 错误。我在一夜之间针对 10 个 websocket 运行了二进制文件,没有任何问题。我针对 al 488 运行它,我需要运行它,它达到了 IOWait 2 分钟,等等。我看到的一些常规错误:
goroutine 72 [IO wait]:
net.runtime_pollWait(0x7f356149b208, 0x72, 0x0)
/usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
net.(*pollDesc).Wait(0xc20804e610, 0x72, 0x0, 0x0)
/usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
net.(*pollDesc).WaitRead(0xc20804e610, 0x0, 0x0)
/usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
net.(*netFD).Read(0xc20804e5b0, 0xc2080d1000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
/usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
net.(*conn).Read(0xc20803a150, 0xc2080d1000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/lib/go/src/pkg/net/net.go:122 +0xe7
bufio.(*Reader).fill(0xc208005140)
/usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
bufio.(*Reader).ReadByte(0xc208005140, 0xc2080f22d0, 0x0, 0x0)
/usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc208005140, 0x7f356149b908, 0xc2080f22d0, 0x0, 0x0)
/home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
golang.org/x/net/websocket.(*Conn).Read(0xc2080d7050, 0xc2080f4c00, 0x200, 0x200, 0x0, 0x0, 0x0)
/home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
main.processUrl(0x705010, 0x26, 0xc208004180)
/home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
created by main.main
/home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126
goroutine 73 [IO wait, 2 minutes]:
net.runtime_pollWait(0x7f356149b158, 0x72, 0x0)
/usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
net.(*pollDesc).Wait(0xc20804e760, 0x72, 0x0, 0x0)
/usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
net.(*pollDesc).WaitRead(0xc20804e760, 0x0, 0x0)
/usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
net.(*netFD).Read(0xc20804e700, 0xc208015000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
/usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
net.(*conn).Read(0xc20803a018, 0xc208015000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/lib/go/src/pkg/net/net.go:122 +0xe7
bufio.(*Reader).fill(0xc2080042a0)
/usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
bufio.(*Reader).ReadByte(0xc2080042a0, 0x67d6e0, 0x0, 0x0)
/usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc2080042a0, 0x7f356149b908, 0xc2080196d0, 0x0, 0x0)
/home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
golang.org/x/net/websocket.(*Conn).Read(0xc208024240, 0xc208080000, 0x200, 0x200, 0x0, 0x0, 0x0)
/home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
main.processUrl(0x705190, 0x25, 0xc208004180)
/home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
created by main.main
/home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126
goroutine 74 [IO wait]:
net.runtime_pollWait(0x7f356149b0a8, 0x72, 0x0)
/usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
net.(*pollDesc).Wait(0xc20804e8b0, 0x72, 0x0, 0x0)
/usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
net.(*pollDesc).WaitRead(0xc20804e8b0, 0x0, 0x0)
/usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
net.(*netFD).Read(0xc20804e850, 0xc2080d9000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
/usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
net.(*conn).Read(0xc20803a160, 0xc2080d9000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/lib/go/src/pkg/net/net.go:122 +0xe7
bufio.(*Reader).fill(0xc208005200)
/usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
bufio.(*Reader).ReadByte(0xc208005200, 0xc2080f2320, 0x0, 0x0)
/usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc208005200, 0x7f356149b908, 0xc2080f2320, 0x0, 0x0)
/home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
golang.org/x/net/websocket.(*Conn).Read(0xc2080d70e0, 0xc2080f4e00, 0x200, 0x200, 0x0, 0x0, 0x0)
/home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
main.processUrl(0x7052d0, 0x27, 0xc208004180)
/home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
created by main.main
/home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126
Run Code Online (Sandbox Code Playgroud)
我有另一个二进制文件,它尝试与每个 Web 套接字地址建立初始连接,以便我可以确保它是可访问的,但这又是另一个问题。我的悬而未决的问题是:
对于 IOWait 错误的问题,我不能只想象 (a) 无法建立连接并且例程将其保持打开状态并最终抛出错误;(b) 是否有可能我运行的例程太多?我已经尝试过 10、20、50 和所有 400+,其中甚至指定 10 的版本都可以工作(只要所有 10 都在响应)和 10 不工作的版本,因为主机没有响应。
我可能会有后续问题,但我感谢洞察力和帮助。频道建议绝对让我前进。我以前使用过它们,但并不总是了解如何最好地实现它们。我的另一个项目利用渠道和等待组 (wg),但老实说,我不明白其中一个的意义。
再次感谢,您的想法和建议很棒!
为这篇文章中奇怪的语法道歉,我似乎无法让编辑器删除我的代码元素周围的一些空行
启动一个 goroutine 来读取每个连接。将收到的消息发送到频道。从该通道接收以从所有连接获取消息。
// Create channel to receive messages from all connections
messages := make(chan []byte)
// Run a goroutine for each URL that you want to dial.
for _, u := range urls {
go func(u string) {
// Dial with Gorilla package. The x/net/websocket package has issues.
c, _, err := websocket.DefaultDialer.Dial(u, http.Header{"Origin":{origin}})
if err != nil {
log.Fatal("dial:", err)
}
// Clean up on exit from this goroutine
defer c.Close()
// Loop reading messages. Send each message to the channel.
for {
_, m, err := c.ReadMessage()
if err != nil {
log.Fatal("read:", err)
return
}
messages <- m
}
}(u)
}
// Print all messages received from the goroutines.
for m := range messages {
fmt.Printf("%s\n", m)
}
Run Code Online (Sandbox Code Playgroud)