diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index ef0b6e2897..6b338409a0 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1242,6 +1242,45 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_ return 0; } +static void unchoke_for_stream(Scheduler *sch, SchedulerNode src); + +// Unchoke any filter graphs that are downstream of this node, to prevent it +// from getting stuck trying to push data to a full queue +static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst) +{ + SchFilterGraph *fg; + SchDec *dec; + SchEnc *enc; + switch (dst->type) { + case SCH_NODE_TYPE_DEC: + dec = &sch->dec[dst->idx]; + for (int i = 0; i < dec->nb_outputs; i++) + unchoke_downstream(sch, dec->outputs[i].dst); + break; + case SCH_NODE_TYPE_ENC: + enc = &sch->enc[dst->idx]; + for (int i = 0; i < enc->nb_dst; i++) + unchoke_downstream(sch, &enc->dst[i]); + break; + case SCH_NODE_TYPE_MUX: + // muxers are never choked + break; + case SCH_NODE_TYPE_FILTER_IN: + fg = &sch->filters[dst->idx]; + if (fg->best_input == fg->nb_inputs) { + fg->waiter.choked_next = 0; + } else { + // ensure that this filter graph is not stuck waiting for + // input from a different upstream demuxer + unchoke_for_stream(sch, fg->inputs[fg->best_input].src_sched); + } + break; + default: + av_assert0(!"Invalid destination node type?"); + break; + } +} + static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) { while (1) { @@ -1249,7 +1288,12 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) // fed directly by a demuxer (i.e. not through a filtergraph) if (src.type == SCH_NODE_TYPE_DEMUX) { - sch->demux[src.idx].waiter.choked_next = 0; + SchDemux *demux = &sch->demux[src.idx]; + if (demux->waiter.choked_next == 0) + return; // prevent infinite loop + demux->waiter.choked_next = 0; + for (int i = 0; i < demux->nb_streams; i++) + unchoke_downstream(sch, demux->streams[i].dst); return; }