RabbitMQ 客户端 (DotNet Core) 阻止应用程序关闭

Tim*_*ray 4 c# rabbitmq .net-core

我有一段代码,它使用rabbitMQ 来管理一段时间内的作业列表。因此,我有一个连接和一个向 RabbitMQ 服务器开放的通道来对这些作业进行操作。我使用以下内容对作业进行排队:

    public override void QueueJob(string qid, string jobId) {
        this.VerifyReadyToGo();

        this.CreateQueue(qid);
        byte[] messageBody = Encoding.UTF8.GetBytes(jobId);
        this.channel.BasicPublish(
            exchange: Exchange,
            routingKey: qid,
            body: messageBody,
            basicProperties: null
        );
        OLog.Debug($"Queued job {jobId} on {qid}");
    }

    public override string RetrieveJobID(string qid) {
        this.VerifyReadyToGo();

        this.CreateQueue(qid);

        BasicGetResult data = this.channel.BasicGet(qid, false);
        string jobData = Encoding.UTF8.GetString(data.Body);

        int addCount = 0;
        while (!this.jobWaitingAck.TryAdd(jobData, data.DeliveryTag)) {
            // try again.
            Thread.Sleep(10);
            if (addCount++ > 2) {
                throw new JobReceptionException("Failed to add job to waiting ack list.");
            }
        }
        OLog.Debug($"Found job {jobData} on queue {qid} with ackId {data.DeliveryTag}");
        return jobData;
    }
Run Code Online (Sandbox Code Playgroud)

问题是,在任何像这样的方法调用(发布、获取或确认)之后,都会创建某种后台线程,当通道和连接关闭时,该后台线程不会关闭。这意味着测试通过并且操作成功完成,但是当应用程序尝试关闭时它会挂起并且永远不会完成。

这是连接方法供参考

    public override void Connect() {
        if (this.Connected) {
            return;
        }
        this.factory = new ConnectionFactory {
            HostName = this.config.Hostname,
            Password = this.config.Password,
            UserName = this.config.Username,
            Port = this.config.Port,
            VirtualHost = VirtualHost
        };
        this.connection = this.factory.CreateConnection();
        this.channel = this.connection.CreateModel();
        this.channel.ExchangeDeclare(
            exchange: Exchange,
            type: "direct",
            durable: true
        );
    }
Run Code Online (Sandbox Code Playgroud)

我该怎么做才能纠正这个问题(rabbitmq 客户端阻止应用程序退出)?

Tim*_*ray 5

我不知道为什么,但对 Connect 方法的更改会产生不同:

    public override void Connect() {
        if (this.Connected) {
            return;
        }
        this.factory = new ConnectionFactory {
            HostName = this.config.Hostname,
            Password = this.config.Password,
            UserName = this.config.Username,
            Port = this.config.Port,
            UseBackgroundThreadsForIO = true
        };
        this.connection = this.factory.CreateConnection();
        this.channel = this.connection.CreateModel();
        this.channel.ExchangeDeclare(
            exchange: Exchange,
            type: "direct",
            durable: true
        );
    }
Run Code Online (Sandbox Code Playgroud)

  • 我刚刚为此损失了一整天。为什么默认值不是 UseBackgroundThreadsForIO = true ? (2认同)