golang 读取了大量的 websockets

sha*_*hat 2 sockets go websocket

在过去的几周里,我一直潜伏在 Stack Overflow 上寻找与阅读大量 websocket 相关的信息。基本上,我有许多主机都通过 websocket 发出消息,我需要聚合它们。

到目前为止,我已经使用 Golang 完成了单个 websocket 连接。我也使用 Python 完成了我正在寻找的东西,但我真的很想在 Go 中做到这一点!

我使用了 gorilla 的 websocket 示例以及其他一些示例,并且可以在 Go 中成功读取套接字。然而,websocket 服务器似乎并不完全符合典型的开发实践,因为使用 .forEach 或 .Each 等方法在 JS 中;导致握手失败。

原版



    package main

    import (
            "fmt"
            "golang.org/x/net/websocket"
            "log"
    )

    var url = "ws://10.0.1.19:5000/data/websocket"

    func main() {
            ws, err := websocket.Dial(url, "", origin)
            if err != nil {
                    log.Fatal(err)
            }

            var msg = make([]byte, 512)
            _, err = ws.Read(msg)
            if err != nil {
                    log.Fatal(err)
            }
            fmt.Printf("Receive: %s\n", msg)
    }

Run Code Online (Sandbox Code Playgroud)

我实际上不需要向套接字发送任何数据,我只需要连接并继续读取它,然后我会将这些数据聚合到单个流中以执行以后的操作。


更新 (2016-03-19)

在对大猩猩和旧的 x/net/websocket 库进行持续更改和测试后,我发现不幸的是,我连接的 websocket 服务器似乎没有正确遵守大猩猩想要用于握手的标准. 或者我没有告诉大猩猩如何正确连接。x/net/websocket 连接正常;我只是指定 localhost/ 作为原点,它似乎工作。我不确定如何告诉大猩猩如何做同样的事情,看看它是否以同样的方式工作。深入挖掘 DefaultDialer.Dial() 有一些配置选项,但以我对 Go 的有限了解,现在我还没有找到一种方法来利用它来做我想做的事情。

当前版本 (2016-03-19)

package main

import (
    "fmt"
    "golang.org/x/net/websocket"
    // "log"
    "time"
)

var origin = "http://localhost"

type url struct {
    host string
}

func processUrl(host string, messages chan []byte) {
    client, err := websocket.Dial(host, "", origin)
    if err != nil {
        // log.Printf("dial:", err)
    }
    // Clean up on exit from this goroutine
    defer client.Close()
    // Loop reading messages. Send each message to the channel.
    for {
        var msg = make([]byte, 512)
        _, err = client.Read(msg)
        if err != nil {
            // log.Fatal("read:", err)
            return
        }
        messages <- msg
    }
}

func main() {

    // Create an arry of hosts to read websockets from
    urls := []string{
        "ws://10.0.1.90:3000/data/websocket",
        "ws://10.0.2.90:3000/data/websocket",
        "ws://10.0.3.90:3000/data/websocket",
    }

    // Create channel to receive messages from all connections
    messages := make(chan []byte)

    // Run a goroutine for each URL that you want to dial.
    for _, host := range urls {
        go processUrl(host, messages)
    }

    // Print all messages received from the goroutines.
    for msg := range messages {
        fmt.Printf("%d %s\n", time.Now().Unix(), msg)
    }

}
Run Code Online (Sandbox Code Playgroud)

响应(来自 ws 的消息):


    {
        "src_city":"Wayne",
        "dest_city":"Amsterdam",
        "src_country":"US",
        "dest_country":"NL",
        "type":"view"
    }
Run Code Online (Sandbox Code Playgroud)

IOWait 问题

我遇到的一个问题是 IOWait 错误。我在一夜之间针对 10 个 websocket 运行了二进制文件,没有任何问题。我针对 al 488 运行它,我需要运行它,它达到了 IOWait 2 分钟,等等。我看到的一些常规错误:



    goroutine 72 [IO wait]:
    net.runtime_pollWait(0x7f356149b208, 0x72, 0x0)
        /usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
    net.(*pollDesc).Wait(0xc20804e610, 0x72, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
    net.(*pollDesc).WaitRead(0xc20804e610, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
    net.(*netFD).Read(0xc20804e5b0, 0xc2080d1000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
        /usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
    net.(*conn).Read(0xc20803a150, 0xc2080d1000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/net.go:122 +0xe7
    bufio.(*Reader).fill(0xc208005140)
        /usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
    bufio.(*Reader).ReadByte(0xc208005140, 0xc2080f22d0, 0x0, 0x0)
        /usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
    golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc208005140, 0x7f356149b908, 0xc2080f22d0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
    golang.org/x/net/websocket.(*Conn).Read(0xc2080d7050, 0xc2080f4c00, 0x200, 0x200, 0x0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
    main.processUrl(0x705010, 0x26, 0xc208004180)
        /home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
    created by main.main
        /home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126

    goroutine 73 [IO wait, 2 minutes]:
    net.runtime_pollWait(0x7f356149b158, 0x72, 0x0)
        /usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
    net.(*pollDesc).Wait(0xc20804e760, 0x72, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
    net.(*pollDesc).WaitRead(0xc20804e760, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
    net.(*netFD).Read(0xc20804e700, 0xc208015000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
        /usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
    net.(*conn).Read(0xc20803a018, 0xc208015000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/net.go:122 +0xe7
    bufio.(*Reader).fill(0xc2080042a0)
        /usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
    bufio.(*Reader).ReadByte(0xc2080042a0, 0x67d6e0, 0x0, 0x0)
        /usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
    golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc2080042a0, 0x7f356149b908, 0xc2080196d0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
    golang.org/x/net/websocket.(*Conn).Read(0xc208024240, 0xc208080000, 0x200, 0x200, 0x0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
    main.processUrl(0x705190, 0x25, 0xc208004180)
        /home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
    created by main.main
        /home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126

    goroutine 74 [IO wait]:
    net.runtime_pollWait(0x7f356149b0a8, 0x72, 0x0)
        /usr/lib/go/src/pkg/runtime/netpoll.goc:146 +0x66
    net.(*pollDesc).Wait(0xc20804e8b0, 0x72, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:84 +0x46
    net.(*pollDesc).WaitRead(0xc20804e8b0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/fd_poll_runtime.go:89 +0x42
    net.(*netFD).Read(0xc20804e850, 0xc2080d9000, 0x1000, 0x1000, 0x0, 0x7f3561498418, 0xb)
        /usr/lib/go/src/pkg/net/fd_unix.go:242 +0x34c
    net.(*conn).Read(0xc20803a160, 0xc2080d9000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
        /usr/lib/go/src/pkg/net/net.go:122 +0xe7
    bufio.(*Reader).fill(0xc208005200)
        /usr/lib/go/src/pkg/bufio/bufio.go:97 +0x1b3
    bufio.(*Reader).ReadByte(0xc208005200, 0xc2080f2320, 0x0, 0x0)
        /usr/lib/go/src/pkg/bufio/bufio.go:199 +0x7e
    golang.org/x/net/websocket.hybiFrameReaderFactory.NewFrameReader(0xc208005200, 0x7f356149b908, 0xc2080f2320, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/hybi.go:126 +0xd7
    golang.org/x/net/websocket.(*Conn).Read(0xc2080d70e0, 0xc2080f4e00, 0x200, 0x200, 0x0, 0x0, 0x0)
        /home/shat/go/src/golang.org/x/net/websocket/websocket.go:178 +0xfb
    main.processUrl(0x7052d0, 0x27, 0xc208004180)
        /home/shat/go/src/github.com/sh4t/scansock/main.go:26 +0x107
    created by main.main
        /home/shat/go/src/github.com/sh4t/scansock/main.go:101 +0x126

Run Code Online (Sandbox Code Playgroud)

我有另一个二进制文件,它尝试与每个 Web 套接字地址建立初始连接,以便我可以确保它是可访问的,但这又是另一个问题。我的悬而未决的问题是:

  1. 如何使用 Gorilla 的 websocket 实现从 SockJS 提供的 websocket 中读取数据,就像我使用 x/net/websocket 的实现一样。
  2. 我看到的 IOWait 问题可能或可能的原因是什么?
  3. 因为我使用 []byte 作为响应,我的日志文件(我只是将 stdout 传输到文件)包含末尾附加有二进制数据的行。我应该/如何将字节切片转换为仅将其作为文本/字符串写入 stdout 以避免这种情况?
  4. 使用 chan,如何最好地从我的 process 函数中传递一个额外的参数,以将消息从 websocket 的主机返回到通道,以便我可以同时记录主机和消息?我应该使用结构来定义通道并让通道包含我想要的内容:时间戳、主机、消息?

对于 IOWait 错误的问题,我不能只想象 (a) 无法建立连接并且例程将其保持打开状态并最终抛出错误;(b) 是否有可能我运行的例程太多?我已经尝试过 10、20、50 和所有 400+,其中甚至指定 10 的版本都可以工作(只要所有 10 都在响应)和 10 不工作的版本,因为主机没有响应。

我可能会有后续问题,但我感谢洞察力和帮助。频道建议绝对让我前进。我以前使用过它们,但并不总是了解如何最好地实现它们。我的另一个项目利用渠道和等待组 (wg),但老实说,我不明白其中一个的意义。

再次感谢,您的想法和建议很棒!

为这篇文章中奇怪的语法道歉,我似乎无法让编辑器删除我的代码元素周围的一些空行

Cer*_*món 6

启动一个 goroutine 来读取每个连接。将收到的消息发送到频道。从该通道接收以从所有连接获取消息。

// Create channel to receive messages from all connections
messages := make(chan []byte)

// Run a goroutine for each URL that you want to dial.
for _, u := range urls {
    go func(u string) {
        // Dial with Gorilla package. The x/net/websocket package has issues.
        c, _, err := websocket.DefaultDialer.Dial(u, http.Header{"Origin":{origin}})
        if err != nil {
            log.Fatal("dial:", err)
        }
        // Clean up on exit from this goroutine
        defer c.Close()
        // Loop reading messages. Send each message to the channel.
        for {
            _, m, err := c.ReadMessage()
            if err != nil {
                log.Fatal("read:", err)
                return
            }
            messages <- m
        }
    }(u)
}

// Print all messages received from the goroutines.
for m := range messages {
    fmt.Printf("%s\n", m)
}
Run Code Online (Sandbox Code Playgroud)

  • @starriet WeakPointer 指出,将 `m` 发送到答案中的另一个 goroutine 是安全的,因为 `c.ReadMessge()` 在每次调用时都会返回一个新切片。 (2认同)
  • 答案中的代码是安全的。在某些 API 中,返回切片的后备数组会被保留,并在下次调用 API 时被覆盖。有关示例,请参阅 bufio 包。将这些 API 之一返回的切片发送到另一个 goroutine 是不安全的。WeakPointer深入研究了代码,发现ReadMessage并不像那些API。ReadMessage 在每次调用时都会返回一个带有支持数组的新切片。将该切片发送到另一个 goroutine 是安全的。 (2认同)