更改相机流时烧瓶损坏的管道

gaw*_*103 5 python multithreading pipe flask python-3.x

在我的烧瓶应用程序中,我经常遇到管道损坏错误(Errno 32)。我有一个在 RPi 上运行的 Flask 服务器,它有摄像头。应用程序非常简单:读取相机帧并在网站上显示流。但是我想要两个摄像头流:一个有对象检测,一个没有,所以为了实现这一点,我创建了两个类来表示它们。他们有带有相机帧的队列,他们所做的就是在 html 中生成帧。相机和流类在线程中工作。

下面是 Camera 类的样子:

from threading import Thread
from copy import deepcopy

import queue
import cv2

class Camera(Thread):
    def __init__(self, cam, normalQue, detectedQue):
        Thread.__init__(self)
        self.__cam = cam
        self.__normalQue = normalQue
        self.__detectedQue = detectedQue
        self.__shouldStop = False
        
    def __del__(self):
        self.__cam.release()
        print('Camera released')
        
    def run(self):
        while True:
            rval, frame = self.__cam.read()

            if rval:
                frame = cv2.resize(frame, None, fx=0.5, fy=0.5, interpolation=cv2.INTER_AREA)
                _, jpeg = cv2.imencode('.jpg', frame)

                self.__normalQue.put(jpeg.tobytes())
                self.__detectedQue.put(deepcopy(frame))

            if self.__shouldStop:
                break

    def stopCamera(self):
        self.__shouldStop = True
Run Code Online (Sandbox Code Playgroud)

下面是 NormalStream 类的样子:

from threading import Thread

import traceback
import cv2

class NormalVideoStream(Thread):
    def __init__(self, framesQue):
        Thread.__init__(self)
        self.__frames = framesQue
        self.__img = None

    def run(self):
        while True:
            if self.__frames.empty():
                continue

            self.__img = self.__frames.get()

    def gen(self):
        while True:
            try:
                if self.__img is None:
                    print('Normal stream frame is none')
                    continue

                # print(f'Type of self.__img: {type(self.__img)}')

                yield (b'--frame\r\n'
                    b'Content-Type: image/jpeg\r\n\r\n' + self.__img + b'\r\n')
            except:
                traceback.print_exc()
                print('Normal video stream genenation exception')
Run Code Online (Sandbox Code Playgroud)

下面是 DetectionStream 类的样子:

from threading import Thread

import cv2
import traceback

class DetectionVideoStream(Thread):
    def __init__(self, framesQue):
        Thread.__init__(self)
        
        self.__frames = framesQue
        self.__img = None
        self.__faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    def run(self):
        while True:
            if self.__frames.empty():
                continue
            
            self.__img = self.__detectFace()

    def gen(self):
        while True:
            try:
                if self.__img is None:
                    print('Detected stream frame is none')

                yield (b'--frame\r\n'
                    b'Content-Type: image/jpeg\r\n\r\n' + self.__img + b'\r\n')
            except:
                traceback.print_exc()
                print('Detection video stream genenation exception')
    
    def __detectFace(self):
        retImg = None

        try:
            img = self.__frames.get()

            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

            faces = self.__faceCascade.detectMultiScale(gray, 1.1, 4)

            for (x, y, w, h) in faces:
                cv2.rectangle(img, (x, y), (x + w, y + h), (255, 0, 0), 2)

            (_, encodedImage) = cv2.imencode('.jpg', img)

            retImg = encodedImage.tobytes()
        except:
            traceback.print_exc()
            print('Face detection exception')

        return retImg

Run Code Online (Sandbox Code Playgroud)

这是我的主要外观:

from flask import Blueprint, render_template, Response, abort, redirect, url_for
from flask_login import login_required, current_user
from queue import Queue
from . import db
from .Camera import Camera
from .NormalVideoStream import NormalVideoStream
from .DetectionVideoStream import DetectionVideoStream
from .models import User

import cv2

main = Blueprint('main', __name__)

# Queues for both streams
framesNormalQue = Queue(maxsize=0)
framesDetectionQue = Queue(maxsize=0)
print('Queues created')

# RPi camera instance
camera = Camera(cv2.VideoCapture(0), framesNormalQue, framesDetectionQue)
camera.start()
print('Camera thread started')

# Streams
normalStream = NormalVideoStream(framesNormalQue)
detectionStream = DetectionVideoStream(framesDetectionQue)
print('Streams created')

normalStream.start()
print('Normal stream thread started')
detectionStream.start()
print('Detection stream thread started')

@main.route('/')
def index():
    return render_template('index.html')

@main.route('/profile', methods=["POST", "GET"])
def profile():
    if not current_user.is_authenticated:
        abort(403)

    return render_template('profile.html', name=current_user.name, id=current_user.id, detectionState=current_user.detectionState)

@main.route('/video_stream/<int:stream_id>')
def video_stream(stream_id):
    if not current_user.is_authenticated:
        abort(403)

    print(f'Current user detection: {current_user.detectionState}')

    global detectionStream
    global normalStream

    stream = None

    if current_user.detectionState:
        stream = detectionStream
        print('Stream set to detection one')
    else:
        stream = normalStream
        print('Stream set to normal one')

    return Response(stream.gen(), mimetype='multipart/x-mixed-replace; boundary=frame')

@main.route('/detection')
def detection():
    if not current_user.is_authenticated:
        abort(403)

    if current_user.detectionState:
        current_user.detectionState = False
    else:
        current_user.detectionState = True

    user = User.query.filter_by(id=current_user.id)
    user.detectionState = current_user.detectionState

    db.session.commit()

    return redirect(url_for('main.profile', id=current_user.id, user_name=current_user.name))

@main.errorhandler(404)
def page_not_found(e):
    return render_template('404.html'), 404

@main.errorhandler(403)
def page_forbidden(e):
    return render_template('403.html'), 403
Run Code Online (Sandbox Code Playgroud)

我启动的应用程序是这样的:python3 -m flask run —host=ip —port=port —with-threads。对我来说奇怪的是应用程序能够一直正确运行。但是当我尝试更改呈现给用户的流时会显示损坏的管道。我正在更改 def detection() 中的流,这真的很简单,所以我猜问题出在其他地方。

这是有问题的日志:

127.0.0.1 - - [12/Aug/2020 15:40:06] "GET /video_stream/3 HTTP/1.1" 200 -
Traceback (most recent call last):
  File "/Users/piotrekgawronski/Library/Python/3.7/lib/python/site-packages/werkzeug/serving.py", line 295, in execute
    write(data)
  File "/Users/piotrekgawronski/Library/Python/3.7/lib/python/site-packages/werkzeug/serving.py", line 276, in write
    self.wfile.write(data)
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/socketserver.py", line 799, in write
    self._sock.sendall(b)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/piotrekgawronski/Documents/programowanie/git_projects/Peephole/server/NormalVideoStream.py", line 29, in gen
    b'Content-Type: image/jpeg\r\n\r\n' + self.__img + b'\r\n')
GeneratorExit
Normal video stream genenation exception
Exception ignored in: <generator object NormalVideoStream.gen at 0x11b8d7750>
RuntimeError: generator ignored GeneratorExit
Run Code Online (Sandbox Code Playgroud)

之后我可以看到 CPU 使用率为 100%,整个应用程序开始滞后。我相信我的 RPi 足够强大,可以正确处理这个应用程序 (RPi 4B 4GB)。

也许有人知道问题出在哪里?