我正在尝试同时使用 Flask Restful 和 Flask SocketIO。我已经制作了一个 Flask Restful 应用程序,但现在我想使用 Flask SocketIO 在客户端和我的服务器之间进行实时通信。
from flask import Flask
from flask_restful import Api
from flask_socketio import SocketIO
app = Flask(__name__)
api = Api(app)
socketio = SocketIO()
if __name__ == '__main__':
socketio.run(app, port=5000, host='0.0.0.0')
app.run(port=5000, host='0.0.0.0')
Run Code Online (Sandbox Code Playgroud)
一旦我运行这个,我得到
Traceback (most recent call last):
File "app.py", line 10, in <module>
socketio.run(app, port=5000, host='0.0.0.0')
File "C:\Python27\lib\site-packages\flask_socketio\__init__.py", line 475, in run
if self.server.eio.async_mode == 'threading':
AttributeError: 'NoneType' object has no attribute 'eio'
Run Code Online (Sandbox Code Playgroud)
我是 Flask 编码的初学者。希望你能帮助我。谢谢。
我试图从我的 Flask-socketio 服务器调用的函数
from flask_socketio import emit
import asyncio
async def myfunc():
for i in range(10):
j = 1*3
await emit('update', {'j':j})
Run Code Online (Sandbox Code Playgroud)
在我的服务器功能中,我正在运行
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = asyncio.gather(myfunc())
loop.run_until_complete(task)
Run Code Online (Sandbox Code Playgroud)
我在成功发出的循环的第一次迭代中遇到错误。
File "path\to\Python\Python37-32\Lib\threading.py", line 917, in _bootstrap_inner
self.run()
File "path\to\Python\Python37-32\Lib\threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "path\to\lib\site-packages\socketio\server.py", line 636, in _handle_event_internal
r = server._trigger_event(data[0], namespace, sid, *data[1:])
File "path\to\lib\site-packages\socketio\server.py", line 665, in _trigger_event
return self.handlers[namespace][event](*args)
File "path\to\lib\site-packages\flask_socketio\__init__.py", line 280, in _handler
*args)
File "path\to\lib\site-packages\flask_socketio\__init__.py", line 694, …Run Code Online (Sandbox Code Playgroud) 我有一个简单的 Flask 聊天应用程序,我正在从教程中测试它。据说我只需要使用“python app.py”命令运行它,但是当我通过http://localhost:5000访问它时,我收到一条连续的消息:
我希望得到 200。我已经安装了 Flask、Flask-SocketIO 和 eventlet。任何可以为我指明正确方向的想法?谢谢你。
我需要调试帮助 - 同源策略不允许读取https://some-domain.com 上的远程资源。(原因:CORS 请求没有成功)在 python flask-socketio 错误中。
我正在使用 python flask-socketio 开发一个聊天应用程序。以前我在本地创建了该应用程序,它按预期工作正常,当我将下面的代码移动到服务器时,它显示了上述错误。客户端代码在 https 服务器上运行,服务器代码也在 https 服务器上运行,我不知道为什么会显示该错误。
我在下面附上了我的代码,请给我一个更好的解决方案。
服务器.py
import json
import os
from flask import Flask, render_template, request,session
from flask_socketio import SocketIO, send, emit
from datetime import timedelta,datetime
from flask_cors import CORS
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secretkey'
app.config['DEBUG'] = True
app.config['CORS_HEADERS'] = 'Content-Type'
cors = CORS(app, resources={r"/*": {"origins": "*"}})
socketio = SocketIO(app)
users = {}
@app.before_request
def make_session_permanent():
session.permanent = True
app.permanent_session_lifetime = timedelta(minutes=1)
@app.route('/')
#@cross_origin(origin='*',headers=['Content- Type','Authorization']) …Run Code Online (Sandbox Code Playgroud) 我的应用程序是在客户端打开摄像头,获取帧,在后端对其执行机器学习过程,然后将其返回给客户端。
这部分代码(以粗体显示)抛出错误 - PngImageFile' 对象没有属性 'shape'。
此代码行有问题 -frame = imutils.resize( pimg, width=700)
我猜有些处理的格式不正确。请指导
@socketio.on('image')
def image(data_image):
sbuf = io.StringIO()
sbuf.write(data_image)
# decode and convert into image
b = io.BytesIO(base64.b64decode(data_image))
pimg = Image.open(b)
# Process the image frame
frame = imutils.resize(**pimg,** width=700)
frame = cv2.flip(frame, 1)
imgencode = cv2.imencode('.jpg', frame)[1]
# base64 encode
stringData = base64.b64encode(imgencode).decode('utf-8')
b64_src = 'data:image/jpg;base64,'
stringData = b64_src + stringData
# emit the frame back
emit('response_back', stringData)
Run Code Online (Sandbox Code Playgroud) 我正在尝试在flask socketio和react native socketio之间创建连接,我已经用react native socketio准备了客户端,但是我在rpi中导入flask_socketio时遇到了问题。我尝试尽可能使用最简单的实现,这是我的代码:
from flask import Flask
from flask_socketio import SocketIO, emit
app = Flask(__name__)
@app.route('/', methods=['GET'])
def hello_world():
return "Hello World"
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5005)Run Code Online (Sandbox Code Playgroud)
如果没有第 2 行,它可以完美工作,但我需要使用flask_socketio。这也是照片(第一次运行没有导入flask_socketio,然后我尝试导入它,但它不起作用。我尝试重新安装flask_socketio两次,重新启动但没有任何效果:/
:
我正在开发一个聊天应用程序,两个用户可以在其中相互交谈,我正在使用 Flask-socketio 和 React 来实现这一点。
我一直在努力解决我在使用 socket.io 时遇到的这个问题。这是我在浏览器控制台上偶尔遇到的错误连接:
与“ws://127.0.0.1:5000/socket.io/?EIO=4&transport=websocket&sid=s8DKBembmWoxgGAsAAqX”的 WebSocket 连接失败:资源不足
我很难找出导致此错误的原因。我解决这个问题的方法是刷新页面,然后重新建立连接。我想找到一个解决方案,让我不会一直与套接字断开连接并收到相同的错误消息。关于如何做到这一点有什么想法吗?
我正在使用Flask-SocketIO库开发Flask应用程序,以在服务器后台任务和客户端之间进行实时通信。
我需要在后台任务中访问我的数据库,但我不知道如何实现它,因为没有初始化应用程序上下文。
我的代码是:
@socketio.on('connect', namespace='/rt/notifications/')
def start_notifications_thread():
global thread
with thread_lock:
if thread is None:
thread = socketio.start_background_task(target=notifications_job)
def notifications_job():
last_alert_id = None
while True:
socketio.sleep(60)
last_id = Alert.get_last_alert_id() # Flask-SQLAlchemy model
if last_alert_id is not None and last_id != last_alert_id:
socketio.emit('new_alerts', {'msg': 'New alert', 'id': last_id}, namespace='/rt/notifications/')
last_alert_id = last_id
Run Code Online (Sandbox Code Playgroud) 目标:
socketio.run(app) 启动服务器并使用 while 循环将数据无限地发送到多个 JavaScript(客户端)。
数据来自另一个 while 循环,但该循环需要在脚本运行后启动(独立于客户端连接)以供其他使用。
当前的:
对于第一点,我已经有以下代码:
def background_thread():
while True:
socketio.emit('response',{'data': value},namespace='/test')
@socketio.on('connect', namespace='/test')
def test_connect():
global thread
with thread_lock:
if thread is None:
thread = socketio.start_background_task(target=background_thread)
emit('response', {'data': 'Connected'})
if __name__ == '__main__':
socketio.run(app, debug=True, host='localhost', port=args.portNum)
Run Code Online (Sandbox Code Playgroud)从上面,我仅在客户端连接到服务器后添加一个线程。我不知道如何从这一点实现第2点?我正在考虑使用另一个线程,但在组织代码时遇到问题,以便在客户端连接到服务器后socketio.start_background_task可以与默认 python 线程无限共享数据。
附加问题:如何让多个客户端连接到一台服务器?
谢谢大家!
我有一个非常简单的问题,但我还没有\xe2\x80\x99t找到一个非常简单的答案:何时应该使用start_background_task()而不是启动Python线程\xe2\x80\x9cnormally\xe2\x80\x9d?Flask-SocketIO 文档指出:
\n\n\n\n\n该函数返回一个与Python标准库中的Thread类兼容的对象。该函数已调用该对象的 start() 方法。
\n
它并没有过多说明是否有必要在初始化和启动每个模块的线程时使用它threading。
flask-socketio ×10
flask ×6
python ×6
cors ×1
flask-cors ×1
opencv ×1
socket.io ×1
sockets ×1
websocket ×1