gstreamer 中的无缝视频循环

vin*_*tch 1 python gstreamer python-gstreamer

我正在尝试使用 gstreamer 循环播放视频,它是 python 绑定。第一次尝试是挂钩 EOS消息并为管道生成搜索消息:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(video_sink)

######################################################

def main():
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        if message_bus.have_pending():  # Working without glib mainloop
            message = message_bus.pop()
            if message.type == Gst.MessageType.EOS:  # End-Of-Stream: loop the video, seek to beginning
                pipeline0.seek(1.0,
                              Gst.Format.TIME,
                              Gst.SeekFlags.FLUSH,
                              Gst.SeekType.SET, 0,
                              Gst.SeekType.NONE, 0)
            elif message.type == Gst.MessageType.ERROR:
                print("ERROR", message)
                break
        time.sleep(0.01) # Tried 0.001 - same result

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

它实际上工作得很好,除了一件事 - 寻找开始并不是真正无缝的。我可以看到微小的故障。因为视频是无限动画,所以这个小故障实际上变得明显。我的第二次尝试是使用队列解码帧和挂钩 EOS事件

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

######################################################

def cb_event(pad, info, *user_data):
    event = info.get_event()
    if event is not None and event.type == Gst.EventType.EOS:
        decoder0.seek(1.0,
                      Gst.Format.TIME,
                      Gst.SeekFlags.FLUSH,
                      Gst.SeekType.SET, 0,
                      Gst.SeekType.NONE, 0)
        return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.PASS

def main():
    dec0_src_pad = decoder0.get_static_pad("src")
    dec0_src_pad.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM, cb_event)

    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        # do nothing
        time.sleep(1)

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

在第一个 EOS 事件之后,播放就停止了。我尝试了几种不同的方法,例如:传递EOS 事件、删除EOS 并将偏移量添加到解码器的源焊盘、将搜索事件发送到管道本身等。但我无法让它工作。

为了理解,我还尝试启用调试模式并使用焊盘探针编写我自己的管道活动记录器。调试模式不是很有用,日志非常庞大并且缺少一些细节。我自己的日志包括上游/下游事件和缓冲区计时信息。但是,我仍然无法理解出了什么问题以及如何使其工作。

显然,我不仅错过了一些东西,而且不了解有关 gstreamer 管道如何工作的一些基本知识。

所以,问题是:我应该如何处理第二个版本的代码才能让它工作?
附加问题:是否有一些工具或技术可以清楚地了解管道内部发生的事情及其包含的元素?

我将非常感谢详细的答案。对我来说,了解我做错了什么比仅仅让程序运行更重要。

ps程序在 GNU/Linux 下在 NanoPi S2 板上运行。视频存储在 MP4 容器中(不含音频)并使用 h264 压缩。请随意发布任何语言的代码示例,不一定是 Python。

vin*_*tch 7

哦,那好吧。我没有得到答案,所以我继续研究并最终找到了解决方案。下面我将展示两种不同的方法。首先 - 使用工作代码示例直接回答问题。第二种 - 不同的方法,它似乎更适合 gstreamer,而且绝对更简单。两者都给出了预期的结果 - 无缝视频循环。

更正的代码(答案,但不是最好的方法)

变化:

  1. 添加了视频时长查询。每个循环我们都应该增加视频持续时间值的时间偏移。它可以模拟无限连续的流。
  2. 发射的搜索事件移动到一个单独的线程。根据这篇文章,我们不能从流线程发出seek事件。另外,看看这个文件(来自提到的帖子的链接)。
  3. 事件回调现在丢弃FLUSH事件(连续流不应有FLUSH事件)。
  4. 视频解码器从 nxvideodec 更改为 avdec_h264。这与最初的问题无关,并且是出于非常特殊的原因

代码:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time
import threading

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("avdec_h264", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

# UPD: Get video duration
pipeline0.set_state(Gst.State.PAUSED)
assert pipeline0.get_state(Gst.CLOCK_TIME_NONE).state == Gst.State.PAUSED
duration_ok, duration = pipeline0.query_duration(Gst.Format.TIME)
assert duration_ok

######################################################

seek_requested = threading.Event()
# UPD: Seek thread. Wait for seek request from callback and generate seek event
def seek_thread_func(queue_sink_pad):
    cumulative_offset = 0
    while True:
        seek_requested.wait()
        seek_requested.clear()
        decoder0.seek(1.0,
                      Gst.Format.TIME,
                      Gst.SeekFlags.FLUSH,
                      Gst.SeekType.SET, 0,
                      Gst.SeekType.NONE, 0)
        # Add offset. It is important step to ensure that downstream elements will 'see' infinite contiguous stream
        cumulative_offset += duration
        queue_sink_pad.set_offset(cumulative_offset)

def cb_event(pad, info):
    event = info.get_event()
    if event is not None:
        if event.type == Gst.EventType.EOS:  # UPD: Set 'seek_requested' flag
            seek_requested.set()
            return Gst.PadProbeReturn.DROP
        elif event.type == Gst.EventType.FLUSH_START or event.type == Gst.EventType.FLUSH_STOP:  # UPD: Drop FLUSH
            return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.OK

def main():
    queue_sink_pad = queue.get_static_pad("sink")

    # UPD: Create separate 'seek thread'
    threading.Thread(target=seek_thread_func, daemon=True, args=(queue_sink_pad,)).start()

    dec0_src_pad = decoder0.get_static_pad("src")
    dec0_src_pad.add_probe(Gst.PadProbeType.EVENT_DOWNSTREAM | Gst.PadProbeType.EVENT_FLUSH,
                           cb_event)

    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        # do nothing
        time.sleep(1)

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

此代码有效。当队列中的缓冲区仍在播放时有效地执行了查找。但是,我相信它可能包含一些缺陷甚至错误。例如,SEGMENT事件随RESET标志向下游传递;似乎不对。实现这种方法的更清晰(并且可能更正确/可靠)的方法是创建一个 gstreamer 插件。插件将管理事件并调整事件和缓冲区的时间戳。

但是有一个更简单和原生的解决方案:

使用段查找和SEGMENT_DONE消息

根据文档

段搜索(使用GST_SEEK_FLAG_SEGMENT)不会EOS 在播放段的末尾发出,但会SEGMENT_DONE 在总线上发布一条消息。此消息由在管道中驱动播放的元素发布,通常是分路器。应用程序收到消息后,可以重新连接管道或在管道中发出其他搜索事件。由于消息在管道中尽早发布,因此应用程序有一些时间发出新的搜索以实现无缝转换。通常,允许的延迟由接收器的缓冲区大小以及管道中任何队列的大小定义。

消息SEGMENT_DONE确实在队列变空之前发布。这提供了足够的时间来执行下一次搜索。所以我们需要做的就是在播放的最开始发出段搜索。然后等待SEGMENT_DONE消息并发送下一个非刷新搜索事件。这是工作示例:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly first one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

######################################################

def main():
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)
    pipeline0.get_state(Gst.CLOCK_TIME_NONE)
    pipeline0.seek(1.0,
                   Gst.Format.TIME,
                   Gst.SeekFlags.SEGMENT,
                   Gst.SeekType.SET, 0,
                   Gst.SeekType.NONE, 0)

    while True:
        if message_bus.have_pending():  # Working without glib mainloop
            message = message_bus.pop()
            if message.type == Gst.MessageType.SEGMENT_DONE:
                pipeline0.seek(1.0,
                              Gst.Format.TIME,
                              Gst.SeekFlags.SEGMENT,
                              Gst.SeekType.SET, 0,
                              Gst.SeekType.NONE, 0)
            elif message.type == Gst.MessageType.ERROR:
                print("bus ERROR", message)
                break
        time.sleep(0.01)

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

使用默认队列配置,SEGMENT_DONE消息在播放最后一个视频帧前大约 1 秒发布。非刷新查找确保不会丢失任何帧。这一起提供了完美的结果 - 真正无缝的视频循环。

注意:我将管道切换到 PLAYING 状态,然后执行初始非刷新查找。或者,我们可以将管道切换到 PAUSED 状态,执行刷新段查找,然后将管道切换到 PLAYING 状态。

注 2:不同来源建议的解决方案略有不同。请参阅下面的链接。


相关主题和来源:

  1. http://gstreamer-devel.966125.n4.nabble.com/Flushing-the-data-in-partial-pipeline-tp4681893p4681899.html
    • https://cgit.freedesktop.org/gstreamer/gst-editing-services/tree/plugins/nle/nlesource.c
  2. http://gstreamer-devel.966125.n4.nabble.com/Loop-a-file-using-playbin-without-artefacts-td4671952.html