FFmpeg
ffmpeg_sched.c
Go to the documentation of this file.
1 /*
2  * Inter-thread scheduling/synchronization.
3  * Copyright (c) 2023 Anton Khirnov
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31 
32 #include "libavcodec/packet.h"
33 
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
41 #include "libavutil/time.h"
42 
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46 
47 enum QueueType {
50 };
51 
52 typedef struct SchWaiter {
56 
57  // the following are internal state of schedule_update_locked() and must not
58  // be accessed outside of it
61 } SchWaiter;
62 
63 typedef struct SchTask {
66 
68  void *func_arg;
69 
72 } SchTask;
73 
74 typedef struct SchDec {
75  const AVClass *class;
76 
79  uint8_t *dst_finished;
80  unsigned nb_dst;
81 
83  // Queue for receiving input packets, one stream.
85 
86  // Queue for sending post-flush end timestamps back to the source
89 
90  // temporary storage used by sch_dec_send()
92 } SchDec;
93 
94 typedef struct SchSyncQueue {
98 
99  unsigned *enc_idx;
100  unsigned nb_enc_idx;
101 } SchSyncQueue;
102 
103 typedef struct SchEnc {
104  const AVClass *class;
105 
108  uint8_t *dst_finished;
109  unsigned nb_dst;
110 
111  // [0] - index of the sync queue in Scheduler.sq_enc,
112  // [1] - index of this encoder in the sq
113  int sq_idx[2];
114 
115  /* Opening encoders is somewhat nontrivial due to their interaction with
116  * sync queues, which are (among other things) responsible for maintaining
117  * constant audio frame size, when it is required by the encoder.
118  *
119  * Opening the encoder requires stream parameters, obtained from the first
120  * frame. However, that frame cannot be properly chunked by the sync queue
121  * without knowing the required frame size, which is only available after
122  * opening the encoder.
123  *
124  * This apparent circular dependency is resolved in the following way:
125  * - the caller creating the encoder gives us a callback which opens the
126  * encoder and returns the required frame size (if any)
127  * - when the first frame is sent to the encoder, the sending thread
128  * - calls this callback, opening the encoder
129  * - passes the returned frame size to the sync queue
130  */
131  int (*open_cb)(void *opaque, const AVFrame *frame);
132  int opened;
133 
135  // Queue for receiving input frames, one stream.
137  // tq_send() to queue returned EOF
139 
140  // temporary storage used by sch_enc_send()
142 } SchEnc;
143 
144 typedef struct SchDemuxStream {
146  uint8_t *dst_finished;
147  unsigned nb_dst;
149 
150 typedef struct SchDemux {
151  const AVClass *class;
152 
154  unsigned nb_streams;
155 
158 
159  // temporary storage used by sch_demux_send()
161 
162  // protected by schedule_lock
164 } SchDemux;
165 
166 typedef struct PreMuxQueue {
167  /**
168  * Queue for buffering the packets before the muxer task can be started.
169  */
171  /**
172  * Maximum number of packets in fifo.
173  */
175  /*
176  * The size of the AVPackets' buffers in queue.
177  * Updated when a packet is either pushed or pulled from the queue.
178  */
179  size_t data_size;
180  /* Threshold after which max_packets will be in effect */
182 } PreMuxQueue;
183 
184 typedef struct SchMuxStream {
187 
188  unsigned *sub_heartbeat_dst;
190 
192 
193  // an EOF was generated while flushing the pre-mux queue
194  int init_eof;
195 
196  ////////////////////////////////////////////////////////////
197  // The following are protected by Scheduler.schedule_lock //
198 
199  /* dts+duration of the last packet sent to this stream
200  in AV_TIME_BASE_Q */
201  int64_t last_dts;
202  // this stream no longer accepts input
204  ////////////////////////////////////////////////////////////
205 } SchMuxStream;
206 
207 typedef struct SchMux {
208  const AVClass *class;
209 
211  unsigned nb_streams;
213 
214  int (*init)(void *arg);
215 
217  /**
218  * Set to 1 after starting the muxer task and flushing the
219  * pre-muxing queues.
220  * Set either before any tasks have started, or with
221  * Scheduler.mux_ready_lock held.
222  */
225  unsigned queue_size;
226 
228 } SchMux;
229 
230 typedef struct SchFilterIn {
235 } SchFilterIn;
236 
237 typedef struct SchFilterOut {
239 } SchFilterOut;
240 
241 typedef struct SchFilterGraph {
242  const AVClass *class;
243 
245  unsigned nb_inputs;
248 
250  unsigned nb_outputs;
251 
253  // input queue, nb_inputs+1 streams
254  // last stream is control
257 
258  // protected by schedule_lock
259  unsigned best_input;
262 
267 };
268 
269 struct Scheduler {
270  const AVClass *class;
271 
273  unsigned nb_demux;
274 
276  unsigned nb_mux;
277 
278  unsigned nb_mux_ready;
280 
281  unsigned nb_mux_done;
284 
285 
287  unsigned nb_dec;
288 
290  unsigned nb_enc;
291 
293  unsigned nb_sq_enc;
294 
296  unsigned nb_filters;
297 
299  int sdp_auto;
300 
304 
306 
308 };
309 
310 /**
311  * Wait until this task is allowed to proceed.
312  *
313  * @retval 0 the caller should proceed
314  * @retval 1 the caller should terminate
315  */
316 static int waiter_wait(Scheduler *sch, SchWaiter *w)
317 {
318  int terminate;
319 
320  if (!atomic_load(&w->choked))
321  return 0;
322 
323  pthread_mutex_lock(&w->lock);
324 
325  while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
326  pthread_cond_wait(&w->cond, &w->lock);
327 
328  terminate = atomic_load(&sch->terminate);
329 
330  pthread_mutex_unlock(&w->lock);
331 
332  return terminate;
333 }
334 
335 static void waiter_set(SchWaiter *w, int choked)
336 {
337  pthread_mutex_lock(&w->lock);
338 
339  atomic_store(&w->choked, choked);
340  pthread_cond_signal(&w->cond);
341 
342  pthread_mutex_unlock(&w->lock);
343 }
344 
345 static int waiter_init(SchWaiter *w)
346 {
347  int ret;
348 
349  atomic_init(&w->choked, 0);
350 
351  ret = pthread_mutex_init(&w->lock, NULL);
352  if (ret)
353  return AVERROR(ret);
354 
355  ret = pthread_cond_init(&w->cond, NULL);
356  if (ret)
357  return AVERROR(ret);
358 
359  return 0;
360 }
361 
362 static void waiter_uninit(SchWaiter *w)
363 {
364  pthread_mutex_destroy(&w->lock);
365  pthread_cond_destroy(&w->cond);
366 }
367 
368 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
369  enum QueueType type)
370 {
371  ThreadQueue *tq;
372  ObjPool *op;
373 
374  if (queue_size <= 0) {
375  if (type == QUEUE_FRAMES)
376  queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
377  else
379  }
380 
381  if (type == QUEUE_FRAMES) {
382  // This queue length is used in the decoder code to ensure that
383  // there are enough entries in fixed-size frame pools to account
384  // for frames held in queues inside the ffmpeg utility. If this
385  // can ever dynamically change then the corresponding decode
386  // code needs to be updated as well.
388  }
389 
392  if (!op)
393  return AVERROR(ENOMEM);
394 
395  tq = tq_alloc(nb_streams, queue_size, op,
397  if (!tq) {
398  objpool_free(&op);
399  return AVERROR(ENOMEM);
400  }
401 
402  *ptq = tq;
403  return 0;
404 }
405 
406 static void *task_wrapper(void *arg);
407 
408 static int task_start(SchTask *task)
409 {
410  int ret;
411 
412  av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
413 
414  av_assert0(!task->thread_running);
415 
416  ret = pthread_create(&task->thread, NULL, task_wrapper, task);
417  if (ret) {
418  av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
419  strerror(ret));
420  return AVERROR(ret);
421  }
422 
423  task->thread_running = 1;
424  return 0;
425 }
426 
427 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
428  SchThreadFunc func, void *func_arg)
429 {
430  task->parent = sch;
431 
432  task->node.type = type;
433  task->node.idx = idx;
434 
435  task->func = func;
436  task->func_arg = func_arg;
437 }
438 
439 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
440 {
441  int64_t min_dts = INT64_MAX;
442 
443  for (unsigned i = 0; i < sch->nb_mux; i++) {
444  const SchMux *mux = &sch->mux[i];
445 
446  for (unsigned j = 0; j < mux->nb_streams; j++) {
447  const SchMuxStream *ms = &mux->streams[j];
448 
449  if (ms->source_finished && !count_finished)
450  continue;
451  if (ms->last_dts == AV_NOPTS_VALUE)
452  return AV_NOPTS_VALUE;
453 
454  min_dts = FFMIN(min_dts, ms->last_dts);
455  }
456  }
457 
458  return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
459 }
460 
461 void sch_free(Scheduler **psch)
462 {
463  Scheduler *sch = *psch;
464 
465  if (!sch)
466  return;
467 
468  sch_stop(sch, NULL);
469 
470  for (unsigned i = 0; i < sch->nb_demux; i++) {
471  SchDemux *d = &sch->demux[i];
472 
473  for (unsigned j = 0; j < d->nb_streams; j++) {
474  SchDemuxStream *ds = &d->streams[j];
475  av_freep(&ds->dst);
476  av_freep(&ds->dst_finished);
477  }
478  av_freep(&d->streams);
479 
480  av_packet_free(&d->send_pkt);
481 
482  waiter_uninit(&d->waiter);
483  }
484  av_freep(&sch->demux);
485 
486  for (unsigned i = 0; i < sch->nb_mux; i++) {
487  SchMux *mux = &sch->mux[i];
488 
489  for (unsigned j = 0; j < mux->nb_streams; j++) {
490  SchMuxStream *ms = &mux->streams[j];
491 
492  if (ms->pre_mux_queue.fifo) {
493  AVPacket *pkt;
494  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
497  }
498 
500  }
501  av_freep(&mux->streams);
502 
504 
505  tq_free(&mux->queue);
506  }
507  av_freep(&sch->mux);
508 
509  for (unsigned i = 0; i < sch->nb_dec; i++) {
510  SchDec *dec = &sch->dec[i];
511 
512  tq_free(&dec->queue);
513 
515 
516  av_freep(&dec->dst);
517  av_freep(&dec->dst_finished);
518 
519  av_frame_free(&dec->send_frame);
520  }
521  av_freep(&sch->dec);
522 
523  for (unsigned i = 0; i < sch->nb_enc; i++) {
524  SchEnc *enc = &sch->enc[i];
525 
526  tq_free(&enc->queue);
527 
528  av_packet_free(&enc->send_pkt);
529 
530  av_freep(&enc->dst);
531  av_freep(&enc->dst_finished);
532  }
533  av_freep(&sch->enc);
534 
535  for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
536  SchSyncQueue *sq = &sch->sq_enc[i];
537  sq_free(&sq->sq);
538  av_frame_free(&sq->frame);
540  av_freep(&sq->enc_idx);
541  }
542  av_freep(&sch->sq_enc);
543 
544  for (unsigned i = 0; i < sch->nb_filters; i++) {
545  SchFilterGraph *fg = &sch->filters[i];
546 
547  tq_free(&fg->queue);
548 
549  av_freep(&fg->inputs);
550  av_freep(&fg->outputs);
551 
552  waiter_uninit(&fg->waiter);
553  }
554  av_freep(&sch->filters);
555 
556  av_freep(&sch->sdp_filename);
557 
559 
561 
564 
565  av_freep(psch);
566 }
567 
568 static const AVClass scheduler_class = {
569  .class_name = "Scheduler",
570  .version = LIBAVUTIL_VERSION_INT,
571 };
572 
574 {
575  Scheduler *sch;
576  int ret;
577 
578  sch = av_mallocz(sizeof(*sch));
579  if (!sch)
580  return NULL;
581 
582  sch->class = &scheduler_class;
583  sch->sdp_auto = 1;
584 
586  if (ret)
587  goto fail;
588 
590  if (ret)
591  goto fail;
592 
594  if (ret)
595  goto fail;
596 
598  if (ret)
599  goto fail;
600 
601  return sch;
602 fail:
603  sch_free(&sch);
604  return NULL;
605 }
606 
607 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
608 {
609  av_freep(&sch->sdp_filename);
610  sch->sdp_filename = av_strdup(sdp_filename);
611  return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
612 }
613 
614 static const AVClass sch_mux_class = {
615  .class_name = "SchMux",
616  .version = LIBAVUTIL_VERSION_INT,
617  .parent_log_context_offset = offsetof(SchMux, task.func_arg),
618 };
619 
620 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
621  void *arg, int sdp_auto, unsigned thread_queue_size)
622 {
623  const unsigned idx = sch->nb_mux;
624 
625  SchMux *mux;
626  int ret;
627 
628  ret = GROW_ARRAY(sch->mux, sch->nb_mux);
629  if (ret < 0)
630  return ret;
631 
632  mux = &sch->mux[idx];
633  mux->class = &sch_mux_class;
634  mux->init = init;
635  mux->queue_size = thread_queue_size;
636 
637  task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
638 
639  sch->sdp_auto &= sdp_auto;
640 
641  return idx;
642 }
643 
644 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
645 {
646  SchMux *mux;
647  SchMuxStream *ms;
648  unsigned stream_idx;
649  int ret;
650 
651  av_assert0(mux_idx < sch->nb_mux);
652  mux = &sch->mux[mux_idx];
653 
654  ret = GROW_ARRAY(mux->streams, mux->nb_streams);
655  if (ret < 0)
656  return ret;
657  stream_idx = mux->nb_streams - 1;
658 
659  ms = &mux->streams[stream_idx];
660 
661  ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
662  if (!ms->pre_mux_queue.fifo)
663  return AVERROR(ENOMEM);
664 
665  ms->last_dts = AV_NOPTS_VALUE;
666 
667  return stream_idx;
668 }
669 
670 static const AVClass sch_demux_class = {
671  .class_name = "SchDemux",
672  .version = LIBAVUTIL_VERSION_INT,
673  .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
674 };
675 
677 {
678  const unsigned idx = sch->nb_demux;
679 
680  SchDemux *d;
681  int ret;
682 
683  ret = GROW_ARRAY(sch->demux, sch->nb_demux);
684  if (ret < 0)
685  return ret;
686 
687  d = &sch->demux[idx];
688 
689  task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
690 
691  d->class = &sch_demux_class;
692  d->send_pkt = av_packet_alloc();
693  if (!d->send_pkt)
694  return AVERROR(ENOMEM);
695 
696  ret = waiter_init(&d->waiter);
697  if (ret < 0)
698  return ret;
699 
700  return idx;
701 }
702 
703 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
704 {
705  SchDemux *d;
706  int ret;
707 
708  av_assert0(demux_idx < sch->nb_demux);
709  d = &sch->demux[demux_idx];
710 
711  ret = GROW_ARRAY(d->streams, d->nb_streams);
712  return ret < 0 ? ret : d->nb_streams - 1;
713 }
714 
715 static const AVClass sch_dec_class = {
716  .class_name = "SchDec",
717  .version = LIBAVUTIL_VERSION_INT,
718  .parent_log_context_offset = offsetof(SchDec, task.func_arg),
719 };
720 
722  int send_end_ts)
723 {
724  const unsigned idx = sch->nb_dec;
725 
726  SchDec *dec;
727  int ret;
728 
729  ret = GROW_ARRAY(sch->dec, sch->nb_dec);
730  if (ret < 0)
731  return ret;
732 
733  dec = &sch->dec[idx];
734 
735  task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
736 
737  dec->class = &sch_dec_class;
738  dec->send_frame = av_frame_alloc();
739  if (!dec->send_frame)
740  return AVERROR(ENOMEM);
741 
742  ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
743  if (ret < 0)
744  return ret;
745 
746  if (send_end_ts) {
748  if (ret < 0)
749  return ret;
750  }
751 
752  return idx;
753 }
754 
755 static const AVClass sch_enc_class = {
756  .class_name = "SchEnc",
757  .version = LIBAVUTIL_VERSION_INT,
758  .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
759 };
760 
762  int (*open_cb)(void *opaque, const AVFrame *frame))
763 {
764  const unsigned idx = sch->nb_enc;
765 
766  SchEnc *enc;
767  int ret;
768 
769  ret = GROW_ARRAY(sch->enc, sch->nb_enc);
770  if (ret < 0)
771  return ret;
772 
773  enc = &sch->enc[idx];
774 
775  enc->class = &sch_enc_class;
776  enc->open_cb = open_cb;
777  enc->sq_idx[0] = -1;
778  enc->sq_idx[1] = -1;
779 
780  task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
781 
782  enc->send_pkt = av_packet_alloc();
783  if (!enc->send_pkt)
784  return AVERROR(ENOMEM);
785 
786  ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
787  if (ret < 0)
788  return ret;
789 
790  return idx;
791 }
792 
793 static const AVClass sch_fg_class = {
794  .class_name = "SchFilterGraph",
795  .version = LIBAVUTIL_VERSION_INT,
796  .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
797 };
798 
799 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
800  SchThreadFunc func, void *ctx)
801 {
802  const unsigned idx = sch->nb_filters;
803 
804  SchFilterGraph *fg;
805  int ret;
806 
807  ret = GROW_ARRAY(sch->filters, sch->nb_filters);
808  if (ret < 0)
809  return ret;
810  fg = &sch->filters[idx];
811 
812  fg->class = &sch_fg_class;
813 
814  task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
815 
816  if (nb_inputs) {
817  fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
818  if (!fg->inputs)
819  return AVERROR(ENOMEM);
820  fg->nb_inputs = nb_inputs;
821  }
822 
823  if (nb_outputs) {
824  fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
825  if (!fg->outputs)
826  return AVERROR(ENOMEM);
827  fg->nb_outputs = nb_outputs;
828  }
829 
830  ret = waiter_init(&fg->waiter);
831  if (ret < 0)
832  return ret;
833 
834  ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
835  if (ret < 0)
836  return ret;
837 
838  return idx;
839 }
840 
841 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
842 {
843  SchSyncQueue *sq;
844  int ret;
845 
846  ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
847  if (ret < 0)
848  return ret;
849  sq = &sch->sq_enc[sch->nb_sq_enc - 1];
850 
851  sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
852  if (!sq->sq)
853  return AVERROR(ENOMEM);
854 
855  sq->frame = av_frame_alloc();
856  if (!sq->frame)
857  return AVERROR(ENOMEM);
858 
859  ret = pthread_mutex_init(&sq->lock, NULL);
860  if (ret)
861  return AVERROR(ret);
862 
863  return sq - sch->sq_enc;
864 }
865 
866 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
867  int limiting, uint64_t max_frames)
868 {
869  SchSyncQueue *sq;
870  SchEnc *enc;
871  int ret;
872 
873  av_assert0(sq_idx < sch->nb_sq_enc);
874  sq = &sch->sq_enc[sq_idx];
875 
876  av_assert0(enc_idx < sch->nb_enc);
877  enc = &sch->enc[enc_idx];
878 
879  ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
880  if (ret < 0)
881  return ret;
882  sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
883 
884  ret = sq_add_stream(sq->sq, limiting);
885  if (ret < 0)
886  return ret;
887 
888  enc->sq_idx[0] = sq_idx;
889  enc->sq_idx[1] = ret;
890 
891  if (max_frames != INT64_MAX)
892  sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
893 
894  return 0;
895 }
896 
898 {
899  int ret;
900 
901  switch (src.type) {
902  case SCH_NODE_TYPE_DEMUX: {
903  SchDemuxStream *ds;
904 
905  av_assert0(src.idx < sch->nb_demux &&
906  src.idx_stream < sch->demux[src.idx].nb_streams);
907  ds = &sch->demux[src.idx].streams[src.idx_stream];
908 
909  ret = GROW_ARRAY(ds->dst, ds->nb_dst);
910  if (ret < 0)
911  return ret;
912 
913  ds->dst[ds->nb_dst - 1] = dst;
914 
915  // demuxed packets go to decoding or streamcopy
916  switch (dst.type) {
917  case SCH_NODE_TYPE_DEC: {
918  SchDec *dec;
919 
920  av_assert0(dst.idx < sch->nb_dec);
921  dec = &sch->dec[dst.idx];
922 
923  av_assert0(!dec->src.type);
924  dec->src = src;
925  break;
926  }
927  case SCH_NODE_TYPE_MUX: {
928  SchMuxStream *ms;
929 
930  av_assert0(dst.idx < sch->nb_mux &&
931  dst.idx_stream < sch->mux[dst.idx].nb_streams);
932  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
933 
934  av_assert0(!ms->src.type);
935  ms->src = src;
936 
937  break;
938  }
939  default: av_assert0(0);
940  }
941 
942  break;
943  }
944  case SCH_NODE_TYPE_DEC: {
945  SchDec *dec;
946 
947  av_assert0(src.idx < sch->nb_dec);
948  dec = &sch->dec[src.idx];
949 
950  ret = GROW_ARRAY(dec->dst, dec->nb_dst);
951  if (ret < 0)
952  return ret;
953 
954  dec->dst[dec->nb_dst - 1] = dst;
955 
956  // decoded frames go to filters or encoding
957  switch (dst.type) {
959  SchFilterIn *fi;
960 
961  av_assert0(dst.idx < sch->nb_filters &&
962  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
963  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
964 
965  av_assert0(!fi->src.type);
966  fi->src = src;
967  break;
968  }
969  case SCH_NODE_TYPE_ENC: {
970  SchEnc *enc;
971 
972  av_assert0(dst.idx < sch->nb_enc);
973  enc = &sch->enc[dst.idx];
974 
975  av_assert0(!enc->src.type);
976  enc->src = src;
977  break;
978  }
979  default: av_assert0(0);
980  }
981 
982  break;
983  }
985  SchFilterOut *fo;
986 
987  av_assert0(src.idx < sch->nb_filters &&
988  src.idx_stream < sch->filters[src.idx].nb_outputs);
989  fo = &sch->filters[src.idx].outputs[src.idx_stream];
990 
991  av_assert0(!fo->dst.type);
992  fo->dst = dst;
993 
994  // filtered frames go to encoding or another filtergraph
995  switch (dst.type) {
996  case SCH_NODE_TYPE_ENC: {
997  SchEnc *enc;
998 
999  av_assert0(dst.idx < sch->nb_enc);
1000  enc = &sch->enc[dst.idx];
1001 
1002  av_assert0(!enc->src.type);
1003  enc->src = src;
1004  break;
1005  }
1006  case SCH_NODE_TYPE_FILTER_IN: {
1007  SchFilterIn *fi;
1008 
1009  av_assert0(dst.idx < sch->nb_filters &&
1010  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1011  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1012 
1013  av_assert0(!fi->src.type);
1014  fi->src = src;
1015  break;
1016  }
1017  default: av_assert0(0);
1018  }
1019 
1020 
1021  break;
1022  }
1023  case SCH_NODE_TYPE_ENC: {
1024  SchEnc *enc;
1025 
1026  av_assert0(src.idx < sch->nb_enc);
1027  enc = &sch->enc[src.idx];
1028 
1029  ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1030  if (ret < 0)
1031  return ret;
1032 
1033  enc->dst[enc->nb_dst - 1] = dst;
1034 
1035  // encoding packets go to muxing or decoding
1036  switch (dst.type) {
1037  case SCH_NODE_TYPE_MUX: {
1038  SchMuxStream *ms;
1039 
1040  av_assert0(dst.idx < sch->nb_mux &&
1041  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1042  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1043 
1044  av_assert0(!ms->src.type);
1045  ms->src = src;
1046 
1047  break;
1048  }
1049  case SCH_NODE_TYPE_DEC: {
1050  SchDec *dec;
1051 
1052  av_assert0(dst.idx < sch->nb_dec);
1053  dec = &sch->dec[dst.idx];
1054 
1055  av_assert0(!dec->src.type);
1056  dec->src = src;
1057 
1058  break;
1059  }
1060  default: av_assert0(0);
1061  }
1062 
1063  break;
1064  }
1065  default: av_assert0(0);
1066  }
1067 
1068  return 0;
1069 }
1070 
1071 static int mux_task_start(SchMux *mux)
1072 {
1073  int ret = 0;
1074 
1075  ret = task_start(&mux->task);
1076  if (ret < 0)
1077  return ret;
1078 
1079  /* flush the pre-muxing queues */
1080  for (unsigned i = 0; i < mux->nb_streams; i++) {
1081  SchMuxStream *ms = &mux->streams[i];
1082  AVPacket *pkt;
1083 
1084  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) {
1085  if (pkt) {
1086  if (!ms->init_eof)
1087  ret = tq_send(mux->queue, i, pkt);
1088  av_packet_free(&pkt);
1089  if (ret == AVERROR_EOF)
1090  ms->init_eof = 1;
1091  else if (ret < 0)
1092  return ret;
1093  } else
1094  tq_send_finish(mux->queue, i);
1095  }
1096  }
1097 
1098  atomic_store(&mux->mux_started, 1);
1099 
1100  return 0;
1101 }
1102 
1103 int print_sdp(const char *filename);
1104 
1105 static int mux_init(Scheduler *sch, SchMux *mux)
1106 {
1107  int ret;
1108 
1109  ret = mux->init(mux->task.func_arg);
1110  if (ret < 0)
1111  return ret;
1112 
1113  sch->nb_mux_ready++;
1114 
1115  if (sch->sdp_filename || sch->sdp_auto) {
1116  if (sch->nb_mux_ready < sch->nb_mux)
1117  return 0;
1118 
1119  ret = print_sdp(sch->sdp_filename);
1120  if (ret < 0) {
1121  av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1122  return ret;
1123  }
1124 
1125  /* SDP is written only after all the muxers are ready, so now we
1126  * start ALL the threads */
1127  for (unsigned i = 0; i < sch->nb_mux; i++) {
1128  ret = mux_task_start(&sch->mux[i]);
1129  if (ret < 0)
1130  return ret;
1131  }
1132  } else {
1133  ret = mux_task_start(mux);
1134  if (ret < 0)
1135  return ret;
1136  }
1137 
1138  return 0;
1139 }
1140 
1141 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1142  size_t data_threshold, int max_packets)
1143 {
1144  SchMux *mux;
1145  SchMuxStream *ms;
1146 
1147  av_assert0(mux_idx < sch->nb_mux);
1148  mux = &sch->mux[mux_idx];
1149 
1150  av_assert0(stream_idx < mux->nb_streams);
1151  ms = &mux->streams[stream_idx];
1152 
1153  ms->pre_mux_queue.max_packets = max_packets;
1154  ms->pre_mux_queue.data_threshold = data_threshold;
1155 }
1156 
1157 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1158 {
1159  SchMux *mux;
1160  int ret = 0;
1161 
1162  av_assert0(mux_idx < sch->nb_mux);
1163  mux = &sch->mux[mux_idx];
1164 
1165  av_assert0(stream_idx < mux->nb_streams);
1166 
1168 
1169  av_assert0(mux->nb_streams_ready < mux->nb_streams);
1170 
1171  // this may be called during initialization - do not start
1172  // threads before sch_start() is called
1173  if (++mux->nb_streams_ready == mux->nb_streams &&
1174  sch->state >= SCH_STATE_STARTED)
1175  ret = mux_init(sch, mux);
1176 
1178 
1179  return ret;
1180 }
1181 
1182 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1183  unsigned dec_idx)
1184 {
1185  SchMux *mux;
1186  SchMuxStream *ms;
1187  int ret = 0;
1188 
1189  av_assert0(mux_idx < sch->nb_mux);
1190  mux = &sch->mux[mux_idx];
1191 
1192  av_assert0(stream_idx < mux->nb_streams);
1193  ms = &mux->streams[stream_idx];
1194 
1196  if (ret < 0)
1197  return ret;
1198 
1199  av_assert0(dec_idx < sch->nb_dec);
1200  ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1201 
1202  if (!mux->sub_heartbeat_pkt) {
1204  if (!mux->sub_heartbeat_pkt)
1205  return AVERROR(ENOMEM);
1206  }
1207 
1208  return 0;
1209 }
1210 
1212 {
1213  while (1) {
1214  SchFilterGraph *fg;
1215 
1216  // fed directly by a demuxer (i.e. not through a filtergraph)
1217  if (src.type == SCH_NODE_TYPE_DEMUX) {
1218  sch->demux[src.idx].waiter.choked_next = 0;
1219  return;
1220  }
1221 
1223  fg = &sch->filters[src.idx];
1224 
1225  // the filtergraph contains internal sources and
1226  // requested to be scheduled directly
1227  if (fg->best_input == fg->nb_inputs) {
1228  fg->waiter.choked_next = 0;
1229  return;
1230  }
1231 
1232  src = fg->inputs[fg->best_input].src_sched;
1233  }
1234 }
1235 
1237 {
1238  int64_t dts;
1239  int have_unchoked = 0;
1240 
1241  // on termination request all waiters are choked,
1242  // we are not to unchoke them
1243  if (atomic_load(&sch->terminate))
1244  return;
1245 
1246  dts = trailing_dts(sch, 0);
1247 
1248  atomic_store(&sch->last_dts, dts);
1249 
1250  // initialize our internal state
1251  for (unsigned type = 0; type < 2; type++)
1252  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1253  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1254  w->choked_prev = atomic_load(&w->choked);
1255  w->choked_next = 1;
1256  }
1257 
1258  // figure out the sources that are allowed to proceed
1259  for (unsigned i = 0; i < sch->nb_mux; i++) {
1260  SchMux *mux = &sch->mux[i];
1261 
1262  for (unsigned j = 0; j < mux->nb_streams; j++) {
1263  SchMuxStream *ms = &mux->streams[j];
1264 
1265  // unblock sources for output streams that are not finished
1266  // and not too far ahead of the trailing stream
1267  if (ms->source_finished)
1268  continue;
1269  if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1270  continue;
1271  if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1272  continue;
1273 
1274  // resolve the source to unchoke
1275  unchoke_for_stream(sch, ms->src_sched);
1276  have_unchoked = 1;
1277  }
1278  }
1279 
1280  // make sure to unchoke at least one source, if still available
1281  for (unsigned type = 0; !have_unchoked && type < 2; type++)
1282  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1283  int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1284  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1285  if (!exited) {
1286  w->choked_next = 0;
1287  have_unchoked = 1;
1288  break;
1289  }
1290  }
1291 
1292 
1293  for (unsigned type = 0; type < 2; type++)
1294  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1295  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1296  if (w->choked_prev != w->choked_next)
1297  waiter_set(w, w->choked_next);
1298  }
1299 
1300 }
1301 
1302 enum {
1306 };
1307 
1308 static int
1310  uint8_t *filters_visited, SchedulerNode *filters_stack)
1311 {
1312  unsigned nb_filters_stack = 0;
1313 
1314  memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1315 
1316  while (1) {
1317  const SchFilterGraph *fg = &sch->filters[src.idx];
1318 
1319  filters_visited[src.idx] = CYCLE_NODE_STARTED;
1320 
1321  // descend into every input, depth first
1322  if (src.idx_stream < fg->nb_inputs) {
1323  const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1324 
1325  // connected to demuxer, no cycles possible
1326  if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1327  continue;
1328 
1329  // otherwise connected to another filtergraph
1331 
1332  // found a cycle
1333  if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1334  return AVERROR(EINVAL);
1335 
1336  // place current position on stack and descend
1337  av_assert0(nb_filters_stack < sch->nb_filters);
1338  filters_stack[nb_filters_stack++] = src;
1339  src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1340  continue;
1341  }
1342 
1343  filters_visited[src.idx] = CYCLE_NODE_DONE;
1344 
1345  // previous search finished,
1346  if (nb_filters_stack) {
1347  src = filters_stack[--nb_filters_stack];
1348  continue;
1349  }
1350  return 0;
1351  }
1352 }
1353 
1354 static int check_acyclic(Scheduler *sch)
1355 {
1356  uint8_t *filters_visited = NULL;
1357  SchedulerNode *filters_stack = NULL;
1358 
1359  int ret = 0;
1360 
1361  if (!sch->nb_filters)
1362  return 0;
1363 
1364  filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1365  if (!filters_visited)
1366  return AVERROR(ENOMEM);
1367 
1368  filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1369  if (!filters_stack) {
1370  ret = AVERROR(ENOMEM);
1371  goto fail;
1372  }
1373 
1374  // trace the transcoding graph upstream from every filtegraph
1375  for (unsigned i = 0; i < sch->nb_filters; i++) {
1376  ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1377  filters_visited, filters_stack);
1378  if (ret < 0) {
1379  av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1380  goto fail;
1381  }
1382  }
1383 
1384 fail:
1385  av_freep(&filters_visited);
1386  av_freep(&filters_stack);
1387  return ret;
1388 }
1389 
1390 static int start_prepare(Scheduler *sch)
1391 {
1392  int ret;
1393 
1394  for (unsigned i = 0; i < sch->nb_demux; i++) {
1395  SchDemux *d = &sch->demux[i];
1396 
1397  for (unsigned j = 0; j < d->nb_streams; j++) {
1398  SchDemuxStream *ds = &d->streams[j];
1399 
1400  if (!ds->nb_dst) {
1402  "Demuxer stream %u not connected to any sink\n", j);
1403  return AVERROR(EINVAL);
1404  }
1405 
1406  ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1407  if (!ds->dst_finished)
1408  return AVERROR(ENOMEM);
1409  }
1410  }
1411 
1412  for (unsigned i = 0; i < sch->nb_dec; i++) {
1413  SchDec *dec = &sch->dec[i];
1414 
1415  if (!dec->src.type) {
1416  av_log(dec, AV_LOG_ERROR,
1417  "Decoder not connected to a source\n");
1418  return AVERROR(EINVAL);
1419  }
1420  if (!dec->nb_dst) {
1421  av_log(dec, AV_LOG_ERROR,
1422  "Decoder not connected to any sink\n");
1423  return AVERROR(EINVAL);
1424  }
1425 
1426  dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished));
1427  if (!dec->dst_finished)
1428  return AVERROR(ENOMEM);
1429  }
1430 
1431  for (unsigned i = 0; i < sch->nb_enc; i++) {
1432  SchEnc *enc = &sch->enc[i];
1433 
1434  if (!enc->src.type) {
1435  av_log(enc, AV_LOG_ERROR,
1436  "Encoder not connected to a source\n");
1437  return AVERROR(EINVAL);
1438  }
1439  if (!enc->nb_dst) {
1440  av_log(enc, AV_LOG_ERROR,
1441  "Encoder not connected to any sink\n");
1442  return AVERROR(EINVAL);
1443  }
1444 
1445  enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1446  if (!enc->dst_finished)
1447  return AVERROR(ENOMEM);
1448  }
1449 
1450  for (unsigned i = 0; i < sch->nb_mux; i++) {
1451  SchMux *mux = &sch->mux[i];
1452 
1453  for (unsigned j = 0; j < mux->nb_streams; j++) {
1454  SchMuxStream *ms = &mux->streams[j];
1455 
1456  switch (ms->src.type) {
1457  case SCH_NODE_TYPE_ENC: {
1458  SchEnc *enc = &sch->enc[ms->src.idx];
1459  if (enc->src.type == SCH_NODE_TYPE_DEC) {
1460  ms->src_sched = sch->dec[enc->src.idx].src;
1462  } else {
1463  ms->src_sched = enc->src;
1465  }
1466  break;
1467  }
1468  case SCH_NODE_TYPE_DEMUX:
1469  ms->src_sched = ms->src;
1470  break;
1471  default:
1472  av_log(mux, AV_LOG_ERROR,
1473  "Muxer stream #%u not connected to a source\n", j);
1474  return AVERROR(EINVAL);
1475  }
1476  }
1477 
1478  ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1479  QUEUE_PACKETS);
1480  if (ret < 0)
1481  return ret;
1482  }
1483 
1484  for (unsigned i = 0; i < sch->nb_filters; i++) {
1485  SchFilterGraph *fg = &sch->filters[i];
1486 
1487  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1488  SchFilterIn *fi = &fg->inputs[j];
1489  SchDec *dec;
1490 
1491  if (!fi->src.type) {
1492  av_log(fg, AV_LOG_ERROR,
1493  "Filtergraph input %u not connected to a source\n", j);
1494  return AVERROR(EINVAL);
1495  }
1496 
1497  if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
1498  fi->src_sched = fi->src;
1499  else {
1501  dec = &sch->dec[fi->src.idx];
1502 
1503  switch (dec->src.type) {
1504  case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1505  case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1506  default: av_assert0(0);
1507  }
1508  }
1509  }
1510 
1511  for (unsigned j = 0; j < fg->nb_outputs; j++) {
1512  SchFilterOut *fo = &fg->outputs[j];
1513 
1514  if (!fo->dst.type) {
1515  av_log(fg, AV_LOG_ERROR,
1516  "Filtergraph %u output %u not connected to a sink\n", i, j);
1517  return AVERROR(EINVAL);
1518  }
1519  }
1520  }
1521 
1522  // Check that the transcoding graph has no cycles.
1523  ret = check_acyclic(sch);
1524  if (ret < 0)
1525  return ret;
1526 
1527  return 0;
1528 }
1529 
1531 {
1532  int ret;
1533 
1534  ret = start_prepare(sch);
1535  if (ret < 0)
1536  return ret;
1537 
1539  sch->state = SCH_STATE_STARTED;
1540 
1541  for (unsigned i = 0; i < sch->nb_mux; i++) {
1542  SchMux *mux = &sch->mux[i];
1543 
1544  if (mux->nb_streams_ready == mux->nb_streams) {
1545  ret = mux_init(sch, mux);
1546  if (ret < 0)
1547  goto fail;
1548  }
1549  }
1550 
1551  for (unsigned i = 0; i < sch->nb_enc; i++) {
1552  SchEnc *enc = &sch->enc[i];
1553 
1554  ret = task_start(&enc->task);
1555  if (ret < 0)
1556  goto fail;
1557  }
1558 
1559  for (unsigned i = 0; i < sch->nb_filters; i++) {
1560  SchFilterGraph *fg = &sch->filters[i];
1561 
1562  ret = task_start(&fg->task);
1563  if (ret < 0)
1564  goto fail;
1565  }
1566 
1567  for (unsigned i = 0; i < sch->nb_dec; i++) {
1568  SchDec *dec = &sch->dec[i];
1569 
1570  ret = task_start(&dec->task);
1571  if (ret < 0)
1572  goto fail;
1573  }
1574 
1575  for (unsigned i = 0; i < sch->nb_demux; i++) {
1576  SchDemux *d = &sch->demux[i];
1577 
1578  if (!d->nb_streams)
1579  continue;
1580 
1581  ret = task_start(&d->task);
1582  if (ret < 0)
1583  goto fail;
1584  }
1585 
1589 
1590  return 0;
1591 fail:
1592  sch_stop(sch, NULL);
1593  return ret;
1594 }
1595 
1596 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1597 {
1598  int ret, err;
1599 
1600  // convert delay to absolute timestamp
1601  timeout_us += av_gettime();
1602 
1604 
1605  if (sch->nb_mux_done < sch->nb_mux) {
1606  struct timespec tv = { .tv_sec = timeout_us / 1000000,
1607  .tv_nsec = (timeout_us % 1000000) * 1000 };
1609  }
1610 
1611  ret = sch->nb_mux_done == sch->nb_mux;
1612 
1614 
1615  *transcode_ts = atomic_load(&sch->last_dts);
1616 
1617  // abort transcoding if any task failed
1618  err = atomic_load(&sch->task_failed);
1619 
1620  return ret || err;
1621 }
1622 
1623 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1624 {
1625  int ret;
1626 
1627  ret = enc->open_cb(enc->task.func_arg, frame);
1628  if (ret < 0)
1629  return ret;
1630 
1631  // ret>0 signals audio frame size, which means sync queue must
1632  // have been enabled during encoder creation
1633  if (ret > 0) {
1634  SchSyncQueue *sq;
1635 
1636  av_assert0(enc->sq_idx[0] >= 0);
1637  sq = &sch->sq_enc[enc->sq_idx[0]];
1638 
1639  pthread_mutex_lock(&sq->lock);
1640 
1641  sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1642 
1643  pthread_mutex_unlock(&sq->lock);
1644  }
1645 
1646  return 0;
1647 }
1648 
1650 {
1651  int ret;
1652 
1653  if (!frame) {
1654  tq_send_finish(enc->queue, 0);
1655  return 0;
1656  }
1657 
1658  if (enc->in_finished)
1659  return AVERROR_EOF;
1660 
1661  ret = tq_send(enc->queue, 0, frame);
1662  if (ret < 0)
1663  enc->in_finished = 1;
1664 
1665  return ret;
1666 }
1667 
1668 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1669 {
1670  SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1671  int ret = 0;
1672 
1673  // inform the scheduling code that no more input will arrive along this path;
1674  // this is necessary because the sync queue may not send an EOF downstream
1675  // until other streams finish
1676  // TODO: consider a cleaner way of passing this information through
1677  // the pipeline
1678  if (!frame) {
1679  for (unsigned i = 0; i < enc->nb_dst; i++) {
1680  SchMux *mux;
1681  SchMuxStream *ms;
1682 
1683  if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1684  continue;
1685 
1686  mux = &sch->mux[enc->dst[i].idx];
1687  ms = &mux->streams[enc->dst[i].idx_stream];
1688 
1690 
1691  ms->source_finished = 1;
1693 
1695  }
1696  }
1697 
1698  pthread_mutex_lock(&sq->lock);
1699 
1700  ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1701  if (ret < 0)
1702  goto finish;
1703 
1704  while (1) {
1705  SchEnc *enc;
1706 
1707  // TODO: the SQ API should be extended to allow returning EOF
1708  // for individual streams
1709  ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1710  if (ret < 0) {
1711  ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1712  break;
1713  }
1714 
1715  enc = &sch->enc[sq->enc_idx[ret]];
1716  ret = send_to_enc_thread(sch, enc, sq->frame);
1717  if (ret < 0) {
1718  av_frame_unref(sq->frame);
1719  if (ret != AVERROR_EOF)
1720  break;
1721 
1722  sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1723  continue;
1724  }
1725  }
1726 
1727  if (ret < 0) {
1728  // close all encoders fed from this sync queue
1729  for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1730  int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1731 
1732  // if the sync queue error is EOF and closing the encoder
1733  // produces a more serious error, make sure to pick the latter
1734  ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1735  }
1736  }
1737 
1738 finish:
1739  pthread_mutex_unlock(&sq->lock);
1740 
1741  return ret;
1742 }
1743 
1744 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1745 {
1746  if (enc->open_cb && frame && !enc->opened) {
1747  int ret = enc_open(sch, enc, frame);
1748  if (ret < 0)
1749  return ret;
1750  enc->opened = 1;
1751 
1752  // discard empty frames that only carry encoder init parameters
1753  if (!frame->buf[0]) {
1755  return 0;
1756  }
1757  }
1758 
1759  return (enc->sq_idx[0] >= 0) ?
1760  send_to_enc_sq (sch, enc, frame) :
1761  send_to_enc_thread(sch, enc, frame);
1762 }
1763 
1765 {
1766  PreMuxQueue *q = &ms->pre_mux_queue;
1767  AVPacket *tmp_pkt = NULL;
1768  int ret;
1769 
1770  if (!av_fifo_can_write(q->fifo)) {
1771  size_t packets = av_fifo_can_read(q->fifo);
1772  size_t pkt_size = pkt ? pkt->size : 0;
1773  int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1774  size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1775  size_t new_size = FFMIN(2 * packets, max_packets);
1776 
1777  if (new_size <= packets) {
1778  av_log(mux, AV_LOG_ERROR,
1779  "Too many packets buffered for output stream.\n");
1780  return AVERROR(ENOSPC);
1781  }
1782  ret = av_fifo_grow2(q->fifo, new_size - packets);
1783  if (ret < 0)
1784  return ret;
1785  }
1786 
1787  if (pkt) {
1788  tmp_pkt = av_packet_alloc();
1789  if (!tmp_pkt)
1790  return AVERROR(ENOMEM);
1791 
1792  av_packet_move_ref(tmp_pkt, pkt);
1793  q->data_size += tmp_pkt->size;
1794  }
1795  av_fifo_write(q->fifo, &tmp_pkt, 1);
1796 
1797  return 0;
1798 }
1799 
1800 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1801  AVPacket *pkt)
1802 {
1803  SchMuxStream *ms = &mux->streams[stream_idx];
1804  int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1807 
1808  // queue the packet if the muxer cannot be started yet
1809  if (!atomic_load(&mux->mux_started)) {
1810  int queued = 0;
1811 
1812  // the muxer could have started between the above atomic check and
1813  // locking the mutex, then this block falls through to normal send path
1815 
1816  if (!atomic_load(&mux->mux_started)) {
1817  int ret = mux_queue_packet(mux, ms, pkt);
1818  queued = ret < 0 ? ret : 1;
1819  }
1820 
1822 
1823  if (queued < 0)
1824  return queued;
1825  else if (queued)
1826  goto update_schedule;
1827  }
1828 
1829  if (pkt) {
1830  int ret;
1831 
1832  if (ms->init_eof)
1833  return AVERROR_EOF;
1834 
1835  ret = tq_send(mux->queue, stream_idx, pkt);
1836  if (ret < 0)
1837  return ret;
1838  } else
1839  tq_send_finish(mux->queue, stream_idx);
1840 
1841 update_schedule:
1842  // TODO: use atomics to check whether this changes trailing dts
1843  // to avoid locking unnecesarily
1844  if (dts != AV_NOPTS_VALUE || !pkt) {
1846 
1847  if (pkt) ms->last_dts = dts;
1848  else ms->source_finished = 1;
1849 
1851 
1853  }
1854 
1855  return 0;
1856 }
1857 
1858 static int
1860  uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1861 {
1862  int ret;
1863 
1864  if (*dst_finished)
1865  return AVERROR_EOF;
1866 
1867  if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1870  pkt = NULL;
1871  }
1872 
1873  if (!pkt)
1874  goto finish;
1875 
1876  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1877  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1878  tq_send(sch->dec[dst.idx].queue, 0, pkt);
1879  if (ret == AVERROR_EOF)
1880  goto finish;
1881 
1882  return ret;
1883 
1884 finish:
1885  if (dst.type == SCH_NODE_TYPE_MUX)
1886  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1887  else
1888  tq_send_finish(sch->dec[dst.idx].queue, 0);
1889 
1890  *dst_finished = 1;
1891  return AVERROR_EOF;
1892 }
1893 
1895  AVPacket *pkt, unsigned flags)
1896 {
1897  unsigned nb_done = 0;
1898 
1899  for (unsigned i = 0; i < ds->nb_dst; i++) {
1900  AVPacket *to_send = pkt;
1901  uint8_t *finished = &ds->dst_finished[i];
1902 
1903  int ret;
1904 
1905  // sending a packet consumes it, so make a temporary reference if needed
1906  if (pkt && i < ds->nb_dst - 1) {
1907  to_send = d->send_pkt;
1908 
1909  ret = av_packet_ref(to_send, pkt);
1910  if (ret < 0)
1911  return ret;
1912  }
1913 
1914  ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1915  if (to_send)
1916  av_packet_unref(to_send);
1917  if (ret == AVERROR_EOF)
1918  nb_done++;
1919  else if (ret < 0)
1920  return ret;
1921  }
1922 
1923  return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1924 }
1925 
1927 {
1928  Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1929 
1930  av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
1931 
1932  for (unsigned i = 0; i < d->nb_streams; i++) {
1933  SchDemuxStream *ds = &d->streams[i];
1934 
1935  for (unsigned j = 0; j < ds->nb_dst; j++) {
1936  const SchedulerNode *dst = &ds->dst[j];
1937  SchDec *dec;
1938  int ret;
1939 
1940  if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
1941  continue;
1942 
1943  dec = &sch->dec[dst->idx];
1944 
1945  ret = tq_send(dec->queue, 0, pkt);
1946  if (ret < 0)
1947  return ret;
1948 
1949  if (dec->queue_end_ts) {
1950  Timestamp ts;
1952  if (ret < 0)
1953  return ret;
1954 
1955  if (max_end_ts.ts == AV_NOPTS_VALUE ||
1956  (ts.ts != AV_NOPTS_VALUE &&
1957  av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
1958  max_end_ts = ts;
1959 
1960  }
1961  }
1962  }
1963 
1964  pkt->pts = max_end_ts.ts;
1965  pkt->time_base = max_end_ts.tb;
1966 
1967  return 0;
1968 }
1969 
1970 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
1971  unsigned flags)
1972 {
1973  SchDemux *d;
1974  int terminate;
1975 
1976  av_assert0(demux_idx < sch->nb_demux);
1977  d = &sch->demux[demux_idx];
1978 
1979  terminate = waiter_wait(sch, &d->waiter);
1980  if (terminate)
1981  return AVERROR_EXIT;
1982 
1983  // flush the downstreams after seek
1984  if (pkt->stream_index == -1)
1985  return demux_flush(sch, d, pkt);
1986 
1987  av_assert0(pkt->stream_index < d->nb_streams);
1988 
1989  return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
1990 }
1991 
1992 static int demux_done(Scheduler *sch, unsigned demux_idx)
1993 {
1994  SchDemux *d = &sch->demux[demux_idx];
1995  int ret = 0;
1996 
1997  for (unsigned i = 0; i < d->nb_streams; i++) {
1998  int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
1999  if (err != AVERROR_EOF)
2000  ret = err_merge(ret, err);
2001  }
2002 
2004 
2005  d->task_exited = 1;
2006 
2008 
2010 
2011  return ret;
2012 }
2013 
2014 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2015 {
2016  SchMux *mux;
2017  int ret, stream_idx;
2018 
2019  av_assert0(mux_idx < sch->nb_mux);
2020  mux = &sch->mux[mux_idx];
2021 
2022  ret = tq_receive(mux->queue, &stream_idx, pkt);
2023  pkt->stream_index = stream_idx;
2024  return ret;
2025 }
2026 
2027 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2028 {
2029  SchMux *mux;
2030 
2031  av_assert0(mux_idx < sch->nb_mux);
2032  mux = &sch->mux[mux_idx];
2033 
2034  av_assert0(stream_idx < mux->nb_streams);
2035  tq_receive_finish(mux->queue, stream_idx);
2036 
2038  mux->streams[stream_idx].source_finished = 1;
2039 
2041 
2043 }
2044 
2045 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2046  const AVPacket *pkt)
2047 {
2048  SchMux *mux;
2049  SchMuxStream *ms;
2050 
2051  av_assert0(mux_idx < sch->nb_mux);
2052  mux = &sch->mux[mux_idx];
2053 
2054  av_assert0(stream_idx < mux->nb_streams);
2055  ms = &mux->streams[stream_idx];
2056 
2057  for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2058  SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2059  int ret;
2060 
2062  if (ret < 0)
2063  return ret;
2064 
2065  tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2066  }
2067 
2068  return 0;
2069 }
2070 
2071 static int mux_done(Scheduler *sch, unsigned mux_idx)
2072 {
2073  SchMux *mux = &sch->mux[mux_idx];
2074 
2076 
2077  for (unsigned i = 0; i < mux->nb_streams; i++) {
2078  tq_receive_finish(mux->queue, i);
2079  mux->streams[i].source_finished = 1;
2080  }
2081 
2083 
2085 
2087 
2088  av_assert0(sch->nb_mux_done < sch->nb_mux);
2089  sch->nb_mux_done++;
2090 
2092 
2094 
2095  return 0;
2096 }
2097 
2098 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2099 {
2100  SchDec *dec;
2101  int ret, dummy;
2102 
2103  av_assert0(dec_idx < sch->nb_dec);
2104  dec = &sch->dec[dec_idx];
2105 
2106  // the decoder should have given us post-flush end timestamp in pkt
2107  if (dec->expect_end_ts) {
2108  Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2110  if (ret < 0)
2111  return ret;
2112 
2113  dec->expect_end_ts = 0;
2114  }
2115 
2116  ret = tq_receive(dec->queue, &dummy, pkt);
2117  av_assert0(dummy <= 0);
2118 
2119  // got a flush packet, on the next call to this function the decoder
2120  // will give us post-flush end timestamp
2121  if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2122  dec->expect_end_ts = 1;
2123 
2124  return ret;
2125 }
2126 
2128  unsigned in_idx, AVFrame *frame)
2129 {
2130  if (frame)
2131  return tq_send(fg->queue, in_idx, frame);
2132 
2133  if (!fg->inputs[in_idx].send_finished) {
2134  fg->inputs[in_idx].send_finished = 1;
2135  tq_send_finish(fg->queue, in_idx);
2136 
2137  // close the control stream when all actual inputs are done
2138  if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2139  tq_send_finish(fg->queue, fg->nb_inputs);
2140  }
2141  return 0;
2142 }
2143 
2144 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2145  uint8_t *dst_finished, AVFrame *frame)
2146 {
2147  int ret;
2148 
2149  if (*dst_finished)
2150  return AVERROR_EOF;
2151 
2152  if (!frame)
2153  goto finish;
2154 
2155  ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2156  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2157  send_to_enc(sch, &sch->enc[dst.idx], frame);
2158  if (ret == AVERROR_EOF)
2159  goto finish;
2160 
2161  return ret;
2162 
2163 finish:
2164  if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2165  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2166  else
2167  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2168 
2169  *dst_finished = 1;
2170 
2171  return AVERROR_EOF;
2172 }
2173 
2174 int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
2175 {
2176  SchDec *dec;
2177  int ret = 0;
2178  unsigned nb_done = 0;
2179 
2180  av_assert0(dec_idx < sch->nb_dec);
2181  dec = &sch->dec[dec_idx];
2182 
2183  for (unsigned i = 0; i < dec->nb_dst; i++) {
2184  uint8_t *finished = &dec->dst_finished[i];
2185  AVFrame *to_send = frame;
2186 
2187  // sending a frame consumes it, so make a temporary reference if needed
2188  if (i < dec->nb_dst - 1) {
2189  to_send = dec->send_frame;
2190 
2191  // frame may sometimes contain props only,
2192  // e.g. to signal EOF timestamp
2193  ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2194  av_frame_copy_props(to_send, frame);
2195  if (ret < 0)
2196  return ret;
2197  }
2198 
2199  ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send);
2200  if (ret < 0) {
2201  av_frame_unref(to_send);
2202  if (ret == AVERROR_EOF) {
2203  nb_done++;
2204  ret = 0;
2205  continue;
2206  }
2207  return ret;
2208  }
2209  }
2210 
2211  return (nb_done == dec->nb_dst) ? AVERROR_EOF : 0;
2212 }
2213 
2214 static int dec_done(Scheduler *sch, unsigned dec_idx)
2215 {
2216  SchDec *dec = &sch->dec[dec_idx];
2217  int ret = 0;
2218 
2219  tq_receive_finish(dec->queue, 0);
2220 
2221  // make sure our source does not get stuck waiting for end timestamps
2222  // that will never arrive
2223  if (dec->queue_end_ts)
2225 
2226  for (unsigned i = 0; i < dec->nb_dst; i++) {
2227  int err = dec_send_to_dst(sch, dec->dst[i], &dec->dst_finished[i], NULL);
2228  if (err < 0 && err != AVERROR_EOF)
2229  ret = err_merge(ret, err);
2230  }
2231 
2232  return ret;
2233 }
2234 
2235 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2236 {
2237  SchEnc *enc;
2238  int ret, dummy;
2239 
2240  av_assert0(enc_idx < sch->nb_enc);
2241  enc = &sch->enc[enc_idx];
2242 
2243  ret = tq_receive(enc->queue, &dummy, frame);
2244  av_assert0(dummy <= 0);
2245 
2246  return ret;
2247 }
2248 
2249 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2250  uint8_t *dst_finished, AVPacket *pkt)
2251 {
2252  int ret;
2253 
2254  if (*dst_finished)
2255  return AVERROR_EOF;
2256 
2257  if (!pkt)
2258  goto finish;
2259 
2260  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2261  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2262  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2263  if (ret == AVERROR_EOF)
2264  goto finish;
2265 
2266  return ret;
2267 
2268 finish:
2269  if (dst.type == SCH_NODE_TYPE_MUX)
2270  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2271  else
2272  tq_send_finish(sch->dec[dst.idx].queue, 0);
2273 
2274  *dst_finished = 1;
2275 
2276  return AVERROR_EOF;
2277 }
2278 
2279 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2280 {
2281  SchEnc *enc;
2282  int ret;
2283 
2284  av_assert0(enc_idx < sch->nb_enc);
2285  enc = &sch->enc[enc_idx];
2286 
2287  for (unsigned i = 0; i < enc->nb_dst; i++) {
2288  uint8_t *finished = &enc->dst_finished[i];
2289  AVPacket *to_send = pkt;
2290 
2291  // sending a packet consumes it, so make a temporary reference if needed
2292  if (i < enc->nb_dst - 1) {
2293  to_send = enc->send_pkt;
2294 
2295  ret = av_packet_ref(to_send, pkt);
2296  if (ret < 0)
2297  return ret;
2298  }
2299 
2300  ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2301  if (ret < 0) {
2302  av_packet_unref(to_send);
2303  if (ret == AVERROR_EOF)
2304  continue;
2305  return ret;
2306  }
2307  }
2308 
2309  return 0;
2310 }
2311 
2312 static int enc_done(Scheduler *sch, unsigned enc_idx)
2313 {
2314  SchEnc *enc = &sch->enc[enc_idx];
2315  int ret = 0;
2316 
2317  tq_receive_finish(enc->queue, 0);
2318 
2319  for (unsigned i = 0; i < enc->nb_dst; i++) {
2320  int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2321  if (err < 0 && err != AVERROR_EOF)
2322  ret = err_merge(ret, err);
2323  }
2324 
2325  return ret;
2326 }
2327 
2328 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2329  unsigned *in_idx, AVFrame *frame)
2330 {
2331  SchFilterGraph *fg;
2332 
2333  av_assert0(fg_idx < sch->nb_filters);
2334  fg = &sch->filters[fg_idx];
2335 
2336  av_assert0(*in_idx <= fg->nb_inputs);
2337 
2338  // update scheduling to account for desired input stream, if it changed
2339  //
2340  // this check needs no locking because only the filtering thread
2341  // updates this value
2342  if (*in_idx != fg->best_input) {
2344 
2345  fg->best_input = *in_idx;
2347 
2349  }
2350 
2351  if (*in_idx == fg->nb_inputs) {
2352  int terminate = waiter_wait(sch, &fg->waiter);
2353  return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2354  }
2355 
2356  while (1) {
2357  int ret, idx;
2358 
2359  ret = tq_receive(fg->queue, &idx, frame);
2360  if (idx < 0)
2361  return AVERROR_EOF;
2362  else if (ret >= 0) {
2363  *in_idx = idx;
2364  return 0;
2365  }
2366 
2367  // disregard EOFs for specific streams - they should always be
2368  // preceded by an EOF frame
2369  }
2370 }
2371 
2372 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2373 {
2374  SchFilterGraph *fg;
2375  SchFilterIn *fi;
2376 
2377  av_assert0(fg_idx < sch->nb_filters);
2378  fg = &sch->filters[fg_idx];
2379 
2380  av_assert0(in_idx < fg->nb_inputs);
2381  fi = &fg->inputs[in_idx];
2382 
2383  if (!fi->receive_finished) {
2384  fi->receive_finished = 1;
2385  tq_receive_finish(fg->queue, in_idx);
2386 
2387  // close the control stream when all actual inputs are done
2388  if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2389  tq_receive_finish(fg->queue, fg->nb_inputs);
2390  }
2391 }
2392 
2393 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2394 {
2395  SchFilterGraph *fg;
2396  SchedulerNode dst;
2397 
2398  av_assert0(fg_idx < sch->nb_filters);
2399  fg = &sch->filters[fg_idx];
2400 
2401  av_assert0(out_idx < fg->nb_outputs);
2402  dst = fg->outputs[out_idx].dst;
2403 
2404  return (dst.type == SCH_NODE_TYPE_ENC) ?
2405  send_to_enc (sch, &sch->enc[dst.idx], frame) :
2406  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2407 }
2408 
2409 static int filter_done(Scheduler *sch, unsigned fg_idx)
2410 {
2411  SchFilterGraph *fg = &sch->filters[fg_idx];
2412  int ret = 0;
2413 
2414  for (unsigned i = 0; i <= fg->nb_inputs; i++)
2415  tq_receive_finish(fg->queue, i);
2416 
2417  for (unsigned i = 0; i < fg->nb_outputs; i++) {
2418  SchedulerNode dst = fg->outputs[i].dst;
2419  int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2420  send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2421  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2422 
2423  if (err < 0 && err != AVERROR_EOF)
2424  ret = err_merge(ret, err);
2425  }
2426 
2428 
2429  fg->task_exited = 1;
2430 
2432 
2434 
2435  return ret;
2436 }
2437 
2438 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2439 {
2440  SchFilterGraph *fg;
2441 
2442  av_assert0(fg_idx < sch->nb_filters);
2443  fg = &sch->filters[fg_idx];
2444 
2445  return send_to_filter(sch, fg, fg->nb_inputs, frame);
2446 }
2447 
2448 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2449 {
2450  switch (node.type) {
2451  case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2452  case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2453  case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2454  case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2455  case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2456  default: av_assert0(0);
2457  }
2458 }
2459 
2460 static void *task_wrapper(void *arg)
2461 {
2462  SchTask *task = arg;
2463  Scheduler *sch = task->parent;
2464  int ret;
2465  int err = 0;
2466 
2467  ret = task->func(task->func_arg);
2468  if (ret < 0)
2469  av_log(task->func_arg, AV_LOG_ERROR,
2470  "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2471 
2472  err = task_cleanup(sch, task->node);
2473  ret = err_merge(ret, err);
2474 
2475  // EOF is considered normal termination
2476  if (ret == AVERROR_EOF)
2477  ret = 0;
2478  if (ret < 0)
2479  atomic_store(&sch->task_failed, 1);
2480 
2482  "Terminating thread with return code %d (%s)\n", ret,
2483  ret < 0 ? av_err2str(ret) : "success");
2484 
2485  return (void*)(intptr_t)ret;
2486 }
2487 
2488 static int task_stop(Scheduler *sch, SchTask *task)
2489 {
2490  int ret;
2491  void *thread_ret;
2492 
2493  if (!task->thread_running)
2494  return task_cleanup(sch, task->node);
2495 
2496  ret = pthread_join(task->thread, &thread_ret);
2497  av_assert0(ret == 0);
2498 
2499  task->thread_running = 0;
2500 
2501  return (intptr_t)thread_ret;
2502 }
2503 
2504 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2505 {
2506  int ret = 0, err;
2507 
2508  if (sch->state != SCH_STATE_STARTED)
2509  return 0;
2510 
2511  atomic_store(&sch->terminate, 1);
2512 
2513  for (unsigned type = 0; type < 2; type++)
2514  for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2515  SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2516  waiter_set(w, 1);
2517  }
2518 
2519  for (unsigned i = 0; i < sch->nb_demux; i++) {
2520  SchDemux *d = &sch->demux[i];
2521 
2522  err = task_stop(sch, &d->task);
2523  ret = err_merge(ret, err);
2524  }
2525 
2526  for (unsigned i = 0; i < sch->nb_dec; i++) {
2527  SchDec *dec = &sch->dec[i];
2528 
2529  err = task_stop(sch, &dec->task);
2530  ret = err_merge(ret, err);
2531  }
2532 
2533  for (unsigned i = 0; i < sch->nb_filters; i++) {
2534  SchFilterGraph *fg = &sch->filters[i];
2535 
2536  err = task_stop(sch, &fg->task);
2537  ret = err_merge(ret, err);
2538  }
2539 
2540  for (unsigned i = 0; i < sch->nb_enc; i++) {
2541  SchEnc *enc = &sch->enc[i];
2542 
2543  err = task_stop(sch, &enc->task);
2544  ret = err_merge(ret, err);
2545  }
2546 
2547  for (unsigned i = 0; i < sch->nb_mux; i++) {
2548  SchMux *mux = &sch->mux[i];
2549 
2550  err = task_stop(sch, &mux->task);
2551  ret = err_merge(ret, err);
2552  }
2553 
2554  if (finish_ts)
2555  *finish_ts = trailing_dts(sch, 1);
2556 
2557  sch->state = SCH_STATE_STOPPED;
2558 
2559  return ret;
2560 }
Scheduler::sq_enc
SchSyncQueue * sq_enc
Definition: ffmpeg_sched.c:292
func
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:68
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
SchWaiter
Definition: ffmpeg_sched.c:52
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:427
mux_task_start
static int mux_task_start(SchMux *mux)
Definition: ffmpeg_sched.c:1071
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
SchedulerNode::idx_stream
unsigned idx_stream
Definition: ffmpeg_sched.h:106
waiter_init
static int waiter_init(SchWaiter *w)
Definition: ffmpeg_sched.c:345
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
sch_filter_send
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
Definition: ffmpeg_sched.c:2393
err_merge
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Definition: ffmpeg_utils.h:41
SchDec::task
SchTask task
Definition: ffmpeg_sched.c:82
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
Scheduler::enc
SchEnc * enc
Definition: ffmpeg_sched.c:289
Scheduler::nb_mux_done
unsigned nb_mux_done
Definition: ffmpeg_sched.c:281
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SchedulerState
SchedulerState
Definition: ffmpeg_sched.c:263
sch_mux_receive_finish
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
Definition: ffmpeg_sched.c:2027
SCH_NODE_TYPE_ENC
@ SCH_NODE_TYPE_ENC
Definition: ffmpeg_sched.h:98
SchSyncQueue::sq
SyncQueue * sq
Definition: ffmpeg_sched.c:95
SchTask::thread
pthread_t thread
Definition: ffmpeg_sched.c:70
demux_flush
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
Definition: ffmpeg_sched.c:1926
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
Scheduler::nb_sq_enc
unsigned nb_sq_enc
Definition: ffmpeg_sched.c:293
SchMux::sub_heartbeat_pkt
AVPacket * sub_heartbeat_pkt
Definition: ffmpeg_sched.c:227
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
Definition: sync_queue.c:649
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
SchEnc::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:141
SCHEDULE_TOLERANCE
#define SCHEDULE_TOLERANCE
Definition: ffmpeg_sched.c:45
sch_add_demux
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
Definition: ffmpeg_sched.c:676
PreMuxQueue::data_size
size_t data_size
Definition: ffmpeg_sched.c:179
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:264
SchTask::func
SchThreadFunc func
Definition: ffmpeg_sched.c:67
mux_done
static int mux_done(Scheduler *sch, unsigned mux_idx)
Definition: ffmpeg_sched.c:2071
Scheduler::nb_enc
unsigned nb_enc
Definition: ffmpeg_sched.c:290
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:160
SQFRAME
#define SQFRAME(frame)
Definition: sync_queue.h:38
check_acyclic_for_output
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
Definition: ffmpeg_sched.c:1309
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:374
w
uint8_t w
Definition: llviddspenc.c:38
task_cleanup
static int task_cleanup(Scheduler *sch, SchedulerNode node)
Definition: ffmpeg_sched.c:2448
frame_move
static void frame_move(void *dst, void *src)
Definition: ffmpeg_utils.h:51
sync_queue.h
AVPacket::data
uint8_t * data
Definition: packet.h:524
SchMux::nb_streams_ready
unsigned nb_streams_ready
Definition: ffmpeg_sched.c:212
SchDemux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:154
send_to_mux
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
Definition: ffmpeg_sched.c:1800
CYCLE_NODE_STARTED
@ CYCLE_NODE_STARTED
Definition: ffmpeg_sched.c:1304
Scheduler::nb_mux_ready
unsigned nb_mux_ready
Definition: ffmpeg_sched.c:278
atomic_int
intptr_t atomic_int
Definition: stdatomic.h:55
objpool_free
void objpool_free(ObjPool **pop)
Definition: objpool.c:54
enc_done
static int enc_done(Scheduler *sch, unsigned enc_idx)
Definition: ffmpeg_sched.c:2312
SCH_NODE_TYPE_MUX
@ SCH_NODE_TYPE_MUX
Definition: ffmpeg_sched.h:96
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:196
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:542
SchWaiter::choked_prev
int choked_prev
Definition: ffmpeg_sched.c:59
QUEUE_PACKETS
@ QUEUE_PACKETS
Definition: ffmpeg_sched.c:48
SchMux
Definition: ffmpeg_sched.c:207
Scheduler::class
const AVClass * class
Definition: ffmpeg_sched.c:270
objpool_alloc_packets
ObjPool * objpool_alloc_packets(void)
Definition: objpool.c:124
SchFilterOut
Definition: ffmpeg_sched.c:237
SCH_STATE_UNINIT
@ SCH_STATE_UNINIT
Definition: ffmpeg_sched.c:264
Timestamp::ts
int64_t ts
Definition: ffmpeg_utils.h:31
PreMuxQueue::fifo
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
Definition: ffmpeg_sched.c:170
SchMuxStream::last_dts
int64_t last_dts
Definition: ffmpeg_sched.c:201
av_packet_free
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
Definition: packet.c:74
DEFAULT_FRAME_THREAD_QUEUE_SIZE
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
Definition: ffmpeg_sched.h:250
Scheduler::last_dts
atomic_int_least64_t last_dts
Definition: ffmpeg_sched.c:307
SchDemux::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:160
sch_mux_stream_ready
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
Definition: ffmpeg_sched.c:1157
send_to_enc_thread
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1649
task_stop
static int task_stop(Scheduler *sch, SchTask *task)
Definition: ffmpeg_sched.c:2488
SchFilterIn::receive_finished
int receive_finished
Definition: ffmpeg_sched.c:234
SchedulerNode::type
enum SchedulerNodeType type
Definition: ffmpeg_sched.h:104
fifo.h
finish
static void finish(void)
Definition: movenc.c:373
sch_stop
int sch_stop(Scheduler *sch, int64_t *finish_ts)
Definition: ffmpeg_sched.c:2504
SchEnc::sq_idx
int sq_idx[2]
Definition: ffmpeg_sched.c:113
fail
#define fail()
Definition: checkasm.h:179
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
sch_dec_send
int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
Definition: ffmpeg_sched.c:2174
SchThreadFunc
int(* SchThreadFunc)(void *arg)
Definition: ffmpeg_sched.h:109
SchFilterOut::dst
SchedulerNode dst
Definition: ffmpeg_sched.c:238
SchEnc
Definition: ffmpeg_sched.c:103
dummy
int dummy
Definition: motion.c:66
av_fifo_grow2
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
Definition: fifo.c:99
SchDec::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:84
sch_add_mux_stream
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
Definition: ffmpeg_sched.c:644
SchMux::class
const AVClass * class
Definition: ffmpeg_sched.c:208
SchFilterGraph::nb_inputs_finished_send
atomic_uint nb_inputs_finished_send
Definition: ffmpeg_sched.c:246
SCH_STATE_STOPPED
@ SCH_STATE_STOPPED
Definition: ffmpeg_sched.c:266
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:608
type
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf type
Definition: writing_filters.txt:86
Scheduler::nb_dec
unsigned nb_dec
Definition: ffmpeg_sched.c:287
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
SchDec::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:78
sch_add_filtergraph
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
Definition: ffmpeg_sched.c:799
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:148
SchFilterGraph
Definition: ffmpeg_sched.c:241
SchMuxStream::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:186
avassert.h
pkt
AVPacket * pkt
Definition: movenc.c:60
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:180
sch_free
void sch_free(Scheduler **psch)
Definition: ffmpeg_sched.c:461
Scheduler::state
enum SchedulerState state
Definition: ffmpeg_sched.c:301
SchMux::streams
SchMuxStream * streams
Definition: ffmpeg_sched.c:210
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
Scheduler::mux_done_cond
pthread_cond_t mux_done_cond
Definition: ffmpeg_sched.c:283
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
SchMuxStream
Definition: ffmpeg_sched.c:184
sch_add_mux
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
Definition: ffmpeg_sched.c:620
waiter_set
static void waiter_set(SchWaiter *w, int choked)
Definition: ffmpeg_sched.c:335
SchFilterGraph::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:250
SchDec::expect_end_ts
int expect_end_ts
Definition: ffmpeg_sched.c:88
enc_open
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Definition: ffmpeg_sched.c:1623
sch_alloc
Scheduler * sch_alloc(void)
Definition: ffmpeg_sched.c:573
dec_send_to_dst
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
Definition: ffmpeg_sched.c:2144
task_init
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
Definition: ffmpeg_sched.c:427
SchMuxStream::nb_sub_heartbeat_dst
unsigned nb_sub_heartbeat_dst
Definition: ffmpeg_sched.c:189
op
static int op(uint8_t **dst, const uint8_t *dst_end, GetByteContext *gb, int pixel, int count, int *x, int width, int linesize)
Perform decode operation.
Definition: anm.c:76
SchEnc::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:107
sch_add_demux_stream
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
Definition: ffmpeg_sched.c:703
SchMuxStream::src
SchedulerNode src
Definition: ffmpeg_sched.c:185
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:40
SchedulerNodeType
SchedulerNodeType
Definition: ffmpeg_sched.h:93
ctx
AVFormatContext * ctx
Definition: movenc.c:49
nb_streams
static int nb_streams
Definition: ffprobe.c:384
SchMuxStream::source_finished
int source_finished
Definition: ffmpeg_sched.c:203
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
ffmpeg_utils.h
filter_done
static int filter_done(Scheduler *sch, unsigned fg_idx)
Definition: ffmpeg_sched.c:2409
SchFilterGraph::class
const AVClass * class
Definition: ffmpeg_sched.c:242
sch_enc_receive
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
Definition: ffmpeg_sched.c:2235
AVThreadMessageQueue
Definition: threadmessage.c:30
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
objpool_alloc_frames
ObjPool * objpool_alloc_frames(void)
Definition: objpool.c:128
sch_sdp_filename
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
Definition: ffmpeg_sched.c:607
SchEnc::class
const AVClass * class
Definition: ffmpeg_sched.c:104
demux_send_for_stream
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1894
arg
const char * arg
Definition: jacosubdec.c:67
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
SchMuxStream::pre_mux_queue
PreMuxQueue pre_mux_queue
Definition: ffmpeg_sched.c:191
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:620
SchMux::mux_started
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
Definition: ffmpeg_sched.c:223
SCH_NODE_TYPE_DEMUX
@ SCH_NODE_TYPE_DEMUX
Definition: ffmpeg_sched.h:95
Scheduler::demux
SchDemux * demux
Definition: ffmpeg_sched.c:272
pkt_move
static void pkt_move(void *dst, void *src)
Definition: ffmpeg_utils.h:46
AVPacket::buf
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
Definition: packet.h:507
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:55
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
waiter_uninit
static void waiter_uninit(SchWaiter *w)
Definition: ffmpeg_sched.c:362
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:66
Scheduler::sdp_filename
char * sdp_filename
Definition: ffmpeg_sched.c:298
send_to_enc_sq
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1668
NULL
#define NULL
Definition: coverity.c:32
SchEnc::open_cb
int(* open_cb)(void *opaque, const AVFrame *frame)
Definition: ffmpeg_sched.c:131
av_frame_copy_props
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
Definition: frame.c:709
SchFilterGraph::nb_inputs
unsigned nb_inputs
Definition: ffmpeg_sched.c:245
CYCLE_NODE_NEW
@ CYCLE_NODE_NEW
Definition: ffmpeg_sched.c:1303
Scheduler::mux
SchMux * mux
Definition: ffmpeg_sched.c:275
schedule_update_locked
static void schedule_update_locked(Scheduler *sch)
Definition: ffmpeg_sched.c:1236
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:241
sch_add_enc
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
Definition: ffmpeg_sched.c:761
SchSyncQueue::enc_idx
unsigned * enc_idx
Definition: ffmpeg_sched.c:99
SCH_STATE_STARTED
@ SCH_STATE_STARTED
Definition: ffmpeg_sched.c:265
dec_done
static int dec_done(Scheduler *sch, unsigned dec_idx)
Definition: ffmpeg_sched.c:2214
SchFilterGraph::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:255
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
SchEnc::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:108
sch_add_dec
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
Definition: ffmpeg_sched.c:721
SchWaiter::choked
atomic_int choked
Definition: ffmpeg_sched.c:55
SchWaiter::cond
pthread_cond_t cond
Definition: ffmpeg_sched.c:54
time.h
DEMUX_SEND_STREAMCOPY_EOF
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations send normally to other types.
Definition: ffmpeg_sched.h:328
sch_fg_class
static const AVClass sch_fg_class
Definition: ffmpeg_sched.c:793
QUEUE_FRAMES
@ QUEUE_FRAMES
Definition: ffmpeg_sched.c:49
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: packet.c:435
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: packet.c:484
SchTask::thread_running
int thread_running
Definition: ffmpeg_sched.c:71
sch_enc_send
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
Definition: ffmpeg_sched.c:2279
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:82
error.h
Scheduler
Definition: ffmpeg_sched.c:269
SchMux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:211
SchSyncQueue::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:97
SchMuxStream::sub_heartbeat_dst
unsigned * sub_heartbeat_dst
Definition: ffmpeg_sched.c:188
SchDec::class
const AVClass * class
Definition: ffmpeg_sched.c:75
sq_frame_samples
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
Definition: sync_queue.c:661
SchEnc::in_finished
int in_finished
Definition: ffmpeg_sched.c:138
SchDemux::task
SchTask task
Definition: ffmpeg_sched.c:156
SchDemuxStream::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:147
SchFilterGraph::nb_inputs_finished_receive
unsigned nb_inputs_finished_receive
Definition: ffmpeg_sched.c:247
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:120
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts.c:366
AVPacket::size
int size
Definition: packet.h:525
AVFifo
Definition: fifo.c:35
SchSyncQueue::nb_enc_idx
unsigned nb_enc_idx
Definition: ffmpeg_sched.c:100
SchFilterGraph::task_exited
int task_exited
Definition: ffmpeg_sched.c:260
av_frame_ref
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
Definition: frame.c:384
threadmessage.h
PreMuxQueue::max_packets
int max_packets
Maximum number of packets in fifo.
Definition: ffmpeg_sched.c:174
SchFilterGraph::task
SchTask task
Definition: ffmpeg_sched.c:252
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
SchWaiter::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:53
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:343
PreMuxQueue::data_threshold
size_t data_threshold
Definition: ffmpeg_sched.c:181
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:699
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:248
sch_dec_class
static const AVClass sch_dec_class
Definition: ffmpeg_sched.c:715
SchFilterGraph::inputs
SchFilterIn * inputs
Definition: ffmpeg_sched.c:244
Scheduler::schedule_lock
pthread_mutex_t schedule_lock
Definition: ffmpeg_sched.c:305
frame.h
SchTask::func_arg
void * func_arg
Definition: ffmpeg_sched.c:68
SCH_NODE_TYPE_FILTER_OUT
@ SCH_NODE_TYPE_FILTER_OUT
Definition: ffmpeg_sched.h:100
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:523
ObjPool
Definition: objpool.c:30
enc_send_to_dst
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
Definition: ffmpeg_sched.c:2249
sch_mux_class
static const AVClass sch_mux_class
Definition: ffmpeg_sched.c:614
SchFilterIn
Definition: ffmpeg_sched.c:230
sch_filter_receive
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
Definition: ffmpeg_sched.c:2328
Scheduler::nb_mux
unsigned nb_mux
Definition: ffmpeg_sched.c:276
av_packet_alloc
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
Definition: packet.c:63
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, ObjPool *obj_pool, void(*obj_move)(void *dst, void *src))
Allocate a queue for sending data between threads.
Definition: thread_queue.c:79
SchEnc::opened
int opened
Definition: ffmpeg_sched.c:132
scheduler_class
static const AVClass scheduler_class
Definition: ffmpeg_sched.c:568
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
Scheduler::nb_demux
unsigned nb_demux
Definition: ffmpeg_sched.c:273
Scheduler::task_failed
atomic_int task_failed
Definition: ffmpeg_sched.c:303
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_packet_copy_props
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
Definition: packet.c:390
SchDemuxStream::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:146
SchDemux::task_exited
int task_exited
Definition: ffmpeg_sched.c:163
SCH_NODE_TYPE_FILTER_IN
@ SCH_NODE_TYPE_FILTER_IN
Definition: ffmpeg_sched.h:99
task_start
static int task_start(SchTask *task)
Definition: ffmpeg_sched.c:408
Scheduler::filters
SchFilterGraph * filters
Definition: ffmpeg_sched.c:295
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:517
demux_done
static int demux_done(Scheduler *sch, unsigned demux_idx)
Definition: ffmpeg_sched.c:1992
packet.h
SchWaiter::choked_next
int choked_next
Definition: ffmpeg_sched.c:60
SchFilterGraph::best_input
unsigned best_input
Definition: ffmpeg_sched.c:259
av_malloc_array
#define av_malloc_array(a, b)
Definition: tableprint_vlc.h:31
Scheduler::mux_ready_lock
pthread_mutex_t mux_ready_lock
Definition: ffmpeg_sched.c:279
Scheduler::terminate
atomic_int terminate
Definition: ffmpeg_sched.c:302
SchDec
Definition: ffmpeg_sched.c:74
DEFAULT_PACKET_THREAD_QUEUE_SIZE
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
Definition: ffmpeg_sched.h:245
QueueType
QueueType
Definition: ffmpeg_sched.c:47
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:606
trailing_dts
static int64_t trailing_dts(const Scheduler *sch, int count_finished)
Definition: ffmpeg_sched.c:439
Scheduler::mux_done_lock
pthread_mutex_t mux_done_lock
Definition: ffmpeg_sched.c:282
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
SchFilterGraph::outputs
SchFilterOut * outputs
Definition: ffmpeg_sched.c:249
sch_enc_class
static const AVClass sch_enc_class
Definition: ffmpeg_sched.c:755
SchedulerNode
Definition: ffmpeg_sched.h:103
SCH_NODE_TYPE_DEC
@ SCH_NODE_TYPE_DEC
Definition: ffmpeg_sched.h:97
pthread_cond_t
Definition: os2threads.h:58
SchTask
Definition: ffmpeg_sched.c:63
mux_init
static int mux_init(Scheduler *sch, SchMux *mux)
Definition: ffmpeg_sched.c:1105
send_to_filter
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2127
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:196
SchDemuxStream::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:145
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
sch_connect
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
Definition: ffmpeg_sched.c:897
send_to_enc
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1744
sch_filter_command
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2438
SchDemuxStream
Definition: ffmpeg_sched.c:144
Timestamp::tb
AVRational tb
Definition: ffmpeg_utils.h:32
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
SchFilterIn::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:232
ret
ret
Definition: filter_design.txt:187
sch_dec_receive
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
Definition: ffmpeg_sched.c:2098
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:71
frame
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several the filter must be ready for frames arriving randomly on any input any filter with several inputs will most likely require some kind of queuing mechanism It is perfectly acceptable to have a limited queue and to drop frames when the inputs are too unbalanced request_frame For filters that do not use the this method is called when a frame is wanted on an output For a it should directly call filter_frame on the corresponding output For a if there are queued frames already one of these frames should be pushed If the filter should request a frame on one of its repeatedly until at least one frame has been pushed Return or at least make progress towards producing a frame
Definition: filter_design.txt:264
SchMuxStream::init_eof
int init_eof
Definition: ffmpeg_sched.c:194
mux_queue_packet
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
Definition: ffmpeg_sched.c:1764
SchMux::init
int(* init)(void *arg)
Definition: ffmpeg_sched.c:214
sch_demux_class
static const AVClass sch_demux_class
Definition: ffmpeg_sched.c:670
ThreadQueue
Definition: thread_queue.c:42
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
task_wrapper
static void * task_wrapper(void *arg)
Definition: ffmpeg_sched.c:2460
SchMux::task
SchTask task
Definition: ffmpeg_sched.c:216
SyncQueue
A sync queue provides timestamp synchronization between multiple streams.
Definition: sync_queue.c:88
sch_demux_send
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
Definition: ffmpeg_sched.c:1970
SchDemux
Definition: ffmpeg_sched.c:150
Scheduler::dec
SchDec * dec
Definition: ffmpeg_sched.c:286
atomic_fetch_add
#define atomic_fetch_add(object, operand)
Definition: stdatomic.h:131
SchDec::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:79
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
SchDec::queue_end_ts
AVThreadMessageQueue * queue_end_ts
Definition: ffmpeg_sched.c:87
demux_stream_send_to_dst
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1859
SchDec::src
SchedulerNode src
Definition: ffmpeg_sched.c:77
thread_queue.h
AVPacket::stream_index
int stream_index
Definition: packet.h:526
GROW_ARRAY
#define GROW_ARRAY(array, nb_elems)
Definition: cmdutils.h:465
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
SchMux::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:224
SchDemux::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:157
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
waiter_wait
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
Definition: ffmpeg_sched.c:316
av_strdup
char * av_strdup(const char *s)
Duplicate a string.
Definition: mem.c:272
SchSyncQueue::frame
AVFrame * frame
Definition: ffmpeg_sched.c:96
SchTask::node
SchedulerNode node
Definition: ffmpeg_sched.c:65
sch_sq_add_enc
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
Definition: ffmpeg_sched.c:866
print_sdp
int print_sdp(const char *filename)
Definition: ffmpeg_mux.c:508
mem.h
start_prepare
static int start_prepare(Scheduler *sch)
Definition: ffmpeg_sched.c:1390
sch_mux_sub_heartbeat_add
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
Definition: ffmpeg_sched.c:1182
SchedulerNode::idx
unsigned idx
Definition: ffmpeg_sched.h:105
sch_filter_receive_finish
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
Definition: ffmpeg_sched.c:2372
ffmpeg_sched.h
SchEnc::src
SchedulerNode src
Definition: ffmpeg_sched.c:106
sch_wait
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
Definition: ffmpeg_sched.c:1596
AVPacket
This structure stores compressed data.
Definition: packet.h:501
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
src
INIT_CLIP pixel * src
Definition: h264pred_template.c:418
cmdutils.h
SchSyncQueue
Definition: ffmpeg_sched.c:94
d
d
Definition: ffmpeg_filter.c:424
SchMux::queue_size
unsigned queue_size
Definition: ffmpeg_sched.c:225
SchTask::parent
Scheduler * parent
Definition: ffmpeg_sched.c:64
SchDec::send_frame
AVFrame * send_frame
Definition: ffmpeg_sched.c:91
queue_alloc
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
Definition: ffmpeg_sched.c:368
sch_start
int sch_start(Scheduler *sch)
Definition: ffmpeg_sched.c:1530
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:474
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sch_mux_sub_heartbeat
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
Definition: ffmpeg_sched.c:2045
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SchEnc::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:136
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
SYNC_QUEUE_FRAMES
@ SYNC_QUEUE_FRAMES
Definition: sync_queue.h:30
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
Definition: sync_queue.c:675
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
SchEnc::task
SchTask task
Definition: ffmpeg_sched.c:134
Timestamp
Definition: ffmpeg_utils.h:30
SchFilterIn::src
SchedulerNode src
Definition: ffmpeg_sched.c:231
sch_mux_stream_buffering
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
Definition: ffmpeg_sched.c:1141
check_acyclic
static int check_acyclic(Scheduler *sch)
Definition: ffmpeg_sched.c:1354
SchDemux::streams
SchDemuxStream * streams
Definition: ffmpeg_sched.c:153
PreMuxQueue
Definition: ffmpeg_sched.c:166
CYCLE_NODE_DONE
@ CYCLE_NODE_DONE
Definition: ffmpeg_sched.c:1305
int
int
Definition: ffmpeg_filter.c:424
SchDec::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:80
Scheduler::sdp_auto
int sdp_auto
Definition: ffmpeg_sched.c:299
SchFilterIn::send_finished
int send_finished
Definition: ffmpeg_sched.c:233
SchFilterGraph::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:256
AVPacket::time_base
AVRational time_base
Time base of the packet's timestamps.
Definition: packet.h:568
unchoke_for_stream
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1211
AVPacket::side_data_elems
int side_data_elems
Definition: packet.h:536
sch_mux_receive
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
Definition: ffmpeg_sched.c:2014
sch_add_sq_enc
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add an pre-encoding sync queue to the scheduler.
Definition: ffmpeg_sched.c:841
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:78
SchEnc::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:109
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:226
Scheduler::nb_filters
unsigned nb_filters
Definition: ffmpeg_sched.c:296