FFmpeg
thread_queue.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
#include <stdint.h>
#include <string.h>

#include "libavutil/avassert.h"
#include "libavutil/container_fifo.h"
#include "libavutil/error.h"
#include "libavutil/fifo.h"
#include "libavutil/frame.h"
#include "libavutil/intreadwrite.h"
#include "libavutil/mem.h"
#include "libavutil/thread.h"

#include "libavcodec/packet.h"

#include "thread_queue.h"
34 
/* Per-stream state bits stored in ThreadQueue.finished[]. */
enum {
    /* no more items will be sent for this stream */
    FINISHED_SEND = (1 << 0),
    /* no more items will be returned to the receiver for this stream */
    FINISHED_RECV = (1 << 1),
};
39 
40 struct ThreadQueue {
41  int *finished;
42  unsigned int nb_streams;
43 
45 
48 
51 };
52 
53 void tq_free(ThreadQueue **ptq)
54 {
55  ThreadQueue *tq = *ptq;
56 
57  if (!tq)
58  return;
59 
62 
63  av_freep(&tq->finished);
64 
67 
68  av_freep(ptq);
69 }
70 
71 ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size,
72  enum ThreadQueueType type)
73 {
74  ThreadQueue *tq;
75  int ret;
76 
77  tq = av_mallocz(sizeof(*tq));
78  if (!tq)
79  return NULL;
80 
81  ret = pthread_cond_init(&tq->cond, NULL);
82  if (ret) {
83  av_freep(&tq);
84  return NULL;
85  }
86 
88  if (ret) {
90  av_freep(&tq);
91  return NULL;
92  }
93 
94  tq->finished = av_calloc(nb_streams, sizeof(*tq->finished));
95  if (!tq->finished)
96  goto fail;
97  tq->nb_streams = nb_streams;
98 
99  tq->type = type;
100 
101  tq->fifo = (type == THREAD_QUEUE_FRAMES) ?
103  if (!tq->fifo)
104  goto fail;
105 
106  tq->fifo_stream_index = av_fifo_alloc2(queue_size, sizeof(unsigned), 0);
107  if (!tq->fifo_stream_index)
108  goto fail;
109 
110  return tq;
111 fail:
112  tq_free(&tq);
113  return NULL;
114 }
115 
116 int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
117 {
118  int *finished;
119  int ret;
120 
121  av_assert0(stream_idx < tq->nb_streams);
122  finished = &tq->finished[stream_idx];
123 
124  pthread_mutex_lock(&tq->lock);
125 
126  if (*finished & FINISHED_SEND) {
127  ret = AVERROR(EINVAL);
128  goto finish;
129  }
130 
131  while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo_stream_index))
132  pthread_cond_wait(&tq->cond, &tq->lock);
133 
134  if (*finished & FINISHED_RECV) {
135  ret = AVERROR_EOF;
136  *finished |= FINISHED_SEND;
137  } else {
138  ret = av_fifo_write(tq->fifo_stream_index, &stream_idx, 1);
139  if (ret < 0)
140  goto finish;
141 
143  if (ret < 0)
144  goto finish;
145 
147  }
148 
149 finish:
151 
152  return ret;
153 }
154 
155 static int receive_locked(ThreadQueue *tq, int *stream_idx,
156  void *data)
157 {
158  unsigned int nb_finished = 0;
159 
160  while (av_container_fifo_read(tq->fifo, data, 0) >= 0) {
161  unsigned idx;
162  int ret;
163 
164  ret = av_fifo_read(tq->fifo_stream_index, &idx, 1);
165  av_assert0(ret >= 0);
166  if (tq->finished[idx] & FINISHED_RECV) {
167  (tq->type == THREAD_QUEUE_FRAMES) ?
169  continue;
170  }
171 
172  *stream_idx = idx;
173  return 0;
174  }
175 
176  for (unsigned int i = 0; i < tq->nb_streams; i++) {
177  if (!tq->finished[i])
178  continue;
179 
180  /* return EOF to the consumer at most once for each stream */
181  if (!(tq->finished[i] & FINISHED_RECV)) {
182  tq->finished[i] |= FINISHED_RECV;
183  *stream_idx = i;
184  return AVERROR_EOF;
185  }
186 
187  nb_finished++;
188  }
189 
190  return nb_finished == tq->nb_streams ? AVERROR_EOF : AVERROR(EAGAIN);
191 }
192 
193 int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
194 {
195  int ret;
196 
197  *stream_idx = -1;
198 
199  pthread_mutex_lock(&tq->lock);
200 
201  while (1) {
202  size_t can_read = av_container_fifo_can_read(tq->fifo);
203 
204  ret = receive_locked(tq, stream_idx, data);
205 
206  // signal other threads if the fifo state changed
207  if (can_read != av_container_fifo_can_read(tq->fifo))
209 
210  if (ret == AVERROR(EAGAIN)) {
211  pthread_cond_wait(&tq->cond, &tq->lock);
212  continue;
213  }
214 
215  break;
216  }
217 
219 
220  return ret;
221 }
222 
223 void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
224 {
225  av_assert0(stream_idx < tq->nb_streams);
226 
227  pthread_mutex_lock(&tq->lock);
228 
229  /* mark the stream as send-finished;
230  * next time the consumer thread tries to read this stream it will get
231  * an EOF and recv-finished flag will be set */
232  tq->finished[stream_idx] |= FINISHED_SEND;
234 
236 }
237 
238 void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
239 {
240  av_assert0(stream_idx < tq->nb_streams);
241 
242  pthread_mutex_lock(&tq->lock);
243 
244  /* mark the stream as recv-finished;
245  * next time the producer thread tries to send for this stream, it will
246  * get an EOF and send-finished flag will be set */
247  tq->finished[stream_idx] |= FINISHED_RECV;
249 
251 }
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:430
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
av_container_fifo_write
int av_container_fifo_write(AVContainerFifo *cf, void *obj, unsigned flags)
Write the contents of obj to the FIFO.
Definition: container_fifo.c:162
AVERROR
Filter design: the word “frame” indicates either a video frame or a group of audio samples, as stored in an AVFrame structure. Format negotiation: for each input and each output, the list of supported formats. For video that means pixel format. For audio that means channel layout, sample format and sample rate. The lists are not just lists, they are references to shared objects. When the negotiation mechanism computes the intersection of the formats supported at each end of a link, all references to both lists are replaced with a reference to the intersection. And when a single format is eventually chosen for a link amongst the remaining list, all references to the list are updated. That means that if a filter requires that its input and output have the same format amongst a supported list, all it has to do is use a reference to the same list of formats. query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references, ownership and permissions.
av_container_fifo_alloc_avframe
AVContainerFifo * av_container_fifo_alloc_avframe(unsigned flags)
Allocate an AVContainerFifo instance for AVFrames.
Definition: container_fifo.c:215
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
FINISHED_RECV
@ FINISHED_RECV
Definition: thread_queue.c:37
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
ThreadQueue::lock
pthread_mutex_t lock
Definition: thread_queue.c:49
container_fifo.h
pthread_mutex_lock
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:119
ThreadQueue::cond
pthread_cond_t cond
Definition: thread_queue.c:50
data
const char data[16]
Definition: mxf.c:149
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, enum ThreadQueueType type)
Allocate a queue for sending data between threads.
Definition: thread_queue.c:71
fifo.h
finish
static void finish(void)
Definition: movenc.c:374
fail
#define fail()
Definition: checkasm.h:193
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
ThreadQueue::type
enum ThreadQueueType type
Definition: thread_queue.c:44
type
it's the only field you need to keep, assuming you have a context. There is some magic you don't need to care about around this field; just let it be (in first position) for now. vf type
Definition: writing_filters.txt:86
ThreadQueue::fifo
AVContainerFifo * fifo
Definition: thread_queue.c:46
avassert.h
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
FINISHED_SEND
@ FINISHED_SEND
Definition: thread_queue.c:36
intreadwrite.h
pthread_mutex_unlock
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:126
receive_locked
static int receive_locked(ThreadQueue *tq, int *stream_idx, void *data)
Definition: thread_queue.c:155
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:40
nb_streams
static int nb_streams
Definition: ffprobe.c:374
av_container_fifo_read
int av_container_fifo_read(AVContainerFifo *cf, void *obj, unsigned flags)
Read the next available object from the FIFO into obj.
Definition: container_fifo.c:122
AVContainerFifo
AVContainerFifo is a FIFO for "containers" - dynamically allocated reusable structs (e....
Definition: container_fifo.c:27
pthread_cond_broadcast
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:162
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:53
NULL
#define NULL
Definition: coverity.c:32
ThreadQueue::finished
int * finished
Definition: thread_queue.c:41
ThreadQueueType
ThreadQueueType
Definition: thread_queue.h:24
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:238
ThreadQueue::fifo_stream_index
AVFifo * fifo_stream_index
Definition: thread_queue.c:47
error.h
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:116
AVFifo
Definition: fifo.c:35
frame.h
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
packet.h
THREAD_QUEUE_FRAMES
@ THREAD_QUEUE_FRAMES
Definition: thread_queue.h:25
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:623
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
pthread_cond_t
Definition: os2threads.h:58
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:193
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
ret
ret
Definition: filter_design.txt:187
av_container_fifo_free
void av_container_fifo_free(AVContainerFifo **pcf)
Free a AVContainerFifo and everything in it.
Definition: container_fifo.c:101
ThreadQueue
Definition: thread_queue.c:40
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
thread_queue.h
av_container_fifo_can_read
size_t av_container_fifo_can_read(const AVContainerFifo *cf)
Definition: container_fifo.c:185
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
mem.h
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
av_container_fifo_alloc_avpacket
AVContainerFifo * av_container_fifo_alloc_avpacket(unsigned flags)
Allocate an AVContainerFifo instance for AVPacket.
Definition: packet.c:782
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:223
ThreadQueue::nb_streams
unsigned int nb_streams
Definition: thread_queue.c:42