From 5418fbb7f44a7b63aec5ea768c6069045078d5fd Mon Sep 17 00:00:00 2001 From: Niklas Haas Date: Sat, 27 Sep 2025 20:02:07 +0200 Subject: [PATCH] fftools/ffmpeg_sched: prevent demuxers from getting stuck When the furthest-behind stream is being fed by a demuxer that is also feeding packets to a choked filter graph, we need to unchoke that filter graph to prevent the demuxer from getting stuck trying to write packets to the choked filter graph. This situation can also apply recursively - if the demuxer is also writing to a filtergraph that is also reading from a choked demuxer, there is a similar deadlock. Solve all such deadlocks by just brute-force recursively unchoking all nodes that can somehow prevent this demuxer from writing packets. This should normally not result in any change in behavior, unless audio/video streams are badly desynchronized, in which case it may result in extra memory usage from the too-far-ahead stream buffering packets inside the muxer. (But this is, of course, preferable to a deadlock) Fixes: https://code.ffmpeg.org/FFmpeg/FFmpeg/issues/20611 Backported-From: 133a0bcb1385f3214e501970f0ced52dcde906cb --- fftools/ffmpeg_sched.c | 46 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index ef0b6e2897..6b338409a0 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1242,6 +1242,45 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_ return 0; } +static void unchoke_for_stream(Scheduler *sch, SchedulerNode src); + +// Unchoke any filter graphs that are downstream of this node, to prevent it +// from getting stuck trying to push data to a full queue +static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst) +{ + SchFilterGraph *fg; + SchDec *dec; + SchEnc *enc; + switch (dst->type) { + case SCH_NODE_TYPE_DEC: + dec = &sch->dec[dst->idx]; + for (int i = 0; i < dec->nb_outputs; i++) + unchoke_downstream(sch, dec->outputs[i].dst); + break; + case SCH_NODE_TYPE_ENC: + enc = &sch->enc[dst->idx]; + for (int i = 0; i < enc->nb_dst; i++) + unchoke_downstream(sch, &enc->dst[i]); + break; + case SCH_NODE_TYPE_MUX: + // muxers are never choked + break; + case SCH_NODE_TYPE_FILTER_IN: + fg = &sch->filters[dst->idx]; + if (fg->best_input == fg->nb_inputs) { + fg->waiter.choked_next = 0; + } else { + // ensure that this filter graph is not stuck waiting for + // input from a different upstream demuxer + unchoke_for_stream(sch, fg->inputs[fg->best_input].src_sched); + } + break; + default: + av_assert0(!"Invalid destination node type?"); + break; + } +} + static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) { while (1) { @@ -1249,7 +1288,12 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) // fed directly by a demuxer (i.e. not through a filtergraph) if (src.type == SCH_NODE_TYPE_DEMUX) { - sch->demux[src.idx].waiter.choked_next = 0; + SchDemux *demux = &sch->demux[src.idx]; + if (demux->waiter.choked_next == 0) + return; // prevent infinite loop + demux->waiter.choked_next = 0; + for (int i = 0; i < demux->nb_streams; i++) + unchoke_downstream(sch, demux->streams[i].dst); return; }