如何使用 FastApi 将 yt_dlp 的 Progress_hook 返回给最终用户?

Sci*_*nus 6 python fastapi yt-dlp

我的代码的相关部分看起来像这样:

@directory_router.get("/youtube-dl/{relative_path:path}", tags=["directory"])
def youtube_dl(relative_path, url, name=""):
    """
    Download
    """

    relative_path, _ = set_path(relative_path)

    logger.info(f"{DATA_PATH}{relative_path}")

    if name:
        name = f"{DATA_PATH}{relative_path}/{name}.%(ext)s"
    else:
        name = f"{DATA_PATH}{relative_path}/%(title)s.%(ext)s"

    ydl_opts = {
        "outtmpl": name,
        # "quiet": True
        "logger": logger,
        "progress_hooks": [yt_dlp_hook],
        # "force-overwrites": True
    }

    with yt.YoutubeDL(ydl_opts) as ydl:
        try:
            ydl.download([url])
        except Exception as exp:
            logger.info(exp)
            return str(exp)
Run Code Online (Sandbox Code Playgroud)

我正在使用此 webhook/端点来允许角度应用程序接受 url/名称输入并将文件下载到文件夹。我能够 logger.info .. 等输出 yt_dlp_hook 的值,如下所示:

def yt_dlp_hook(download):
    """
    download Hook

    Args:
        download (_type_): _description_
    """

    global TMP_KEYS

    if download.keys() != TMP_KEYS:
        logger.info(f'Status: {download["status"]}')
        logger.info(f'Dict Keys: {download.keys()}')
        TMP_KEYS = download.keys()
        logger.info(download)
Run Code Online (Sandbox Code Playgroud)

有没有办法将一串相关变量(如预计到达时间、下载速度等)传输到前端?有一个更好的方法吗?

小智 2

您可以使用队列对象在线程之间进行通信。因此,当您调用 youtube_dl 传入一个队列时,您可以在 yt_dlp_hook 内添加消息(您需要使用部分函数来构造它)。您最好使用 asyncio 在更新用户的同时运行下载,例如:

import asyncio
from functools import partial
import threading
from youtube_dl import YoutubeDL
from queue import LifoQueue, Empty


def main():
    # Set the url to download
    url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"

    # Get the current event loop
    loop = asyncio.get_event_loop()

    # Create a Last In First Out Queue to communicate between the threads
    queue = LifoQueue()

    # Create the future which will be marked as done once the file is downloaded
    coros = [youtube_dl(url, queue)]
    future = asyncio.gather(*coros)

    # Start a new thread to run the loop_in_thread function (with the positional arguments passed to it)
    t = threading.Thread(target=loop_in_thread, args=[loop, future])
    t.start()

    # While the future isn't finished yet continue
    while not future.done():
        try:
            # Get the latest status update from the que and print it
            data = queue.get_nowait()
            print(data)
        except Empty as e:
            print("no status updates available")
        finally:
            # Sleep between checking for updates
            asyncio.run(asyncio.sleep(0.1))


def loop_in_thread(loop, future):
    loop.run_until_complete(future)


async def youtube_dl(url, queue, name="temp.mp4"):
    """
    Download
    """

    yt_dlp_hook_partial = partial(yt_dlp_hook, queue)

    ydl_opts = {
        "outtmpl": name,
        "progress_hooks": [yt_dlp_hook_partial],
    }
    with YoutubeDL(ydl_opts) as ydl:
        return ydl.download([url])


def yt_dlp_hook(queue: LifoQueue, download):
    """
    download Hook

    Args:
        download (_type_): _description_
    """
    # Instead of logging the data just add the latest data to the queue
    queue.put(download)


if __name__ == "__main__":
    main()

Run Code Online (Sandbox Code Playgroud)