FFmpeg
executor.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2023 Nuo Mi
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with FFmpeg; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include "config.h"
22 
23 #include "mem.h"
24 #include "thread.h"
25 
26 #include "executor.h"
27 
28 #if !HAVE_THREADS
29 
30 #define ExecutorThread char
31 
32 #define executor_thread_create(t, a, s, ar) 0
33 #define executor_thread_join(t, r) do {} while(0)
34 
35 #else
36 
37 #define ExecutorThread pthread_t
38 
39 #define executor_thread_create(t, a, s, ar) pthread_create(t, a, s, ar)
40 #define executor_thread_join(t, r) pthread_join(t, r)
41 
42 #endif //!HAVE_THREADS
43 
44 typedef struct ThreadInfo {
47 } ThreadInfo;
48 
49 struct AVExecutor {
52 
54  uint8_t *local_contexts;
55 
58  int die;
59 
61 };
62 
63 static AVTask* remove_task(AVTask **prev, AVTask *t)
64 {
65  *prev = t->next;
66  t->next = NULL;
67  return t;
68 }
69 
70 static void add_task(AVTask **prev, AVTask *t)
71 {
72  t->next = *prev;
73  *prev = t;
74 }
75 
76 static int run_one_task(AVExecutor *e, void *lc)
77 {
78  AVTaskCallbacks *cb = &e->cb;
79  AVTask **prev;
80 
81  for (prev = &e->tasks; *prev && !cb->ready(*prev, cb->user_data); prev = &(*prev)->next)
82  /* nothing */;
83  if (*prev) {
84  AVTask *t = remove_task(prev, *prev);
85  ff_mutex_unlock(&e->lock);
86  cb->run(t, lc, cb->user_data);
87  ff_mutex_lock(&e->lock);
88  return 1;
89  }
90  return 0;
91 }
92 
93 #if HAVE_THREADS
94 static void *executor_worker_task(void *data)
95 {
96  ThreadInfo *ti = (ThreadInfo*)data;
97  AVExecutor *e = ti->e;
98  void *lc = e->local_contexts + (ti - e->threads) * e->cb.local_context_size;
99 
100  ff_mutex_lock(&e->lock);
101  while (1) {
102  if (e->die) break;
103 
104  if (!run_one_task(e, lc)) {
105  //no task in one loop
106  ff_cond_wait(&e->cond, &e->lock);
107  }
108  }
109  ff_mutex_unlock(&e->lock);
110  return NULL;
111 }
112 #endif
113 
114 static void executor_free(AVExecutor *e, const int has_lock, const int has_cond)
115 {
116  if (e->thread_count) {
117  //signal die
118  ff_mutex_lock(&e->lock);
119  e->die = 1;
120  ff_cond_broadcast(&e->cond);
121  ff_mutex_unlock(&e->lock);
122 
123  for (int i = 0; i < e->thread_count; i++)
125  }
126  if (has_cond)
127  ff_cond_destroy(&e->cond);
128  if (has_lock)
129  ff_mutex_destroy(&e->lock);
130 
131  av_free(e->threads);
133 
134  av_free(e);
135 }
136 
137 AVExecutor* av_executor_alloc(const AVTaskCallbacks *cb, int thread_count)
138 {
139  AVExecutor *e;
140  int has_lock = 0, has_cond = 0;
141  if (!cb || !cb->user_data || !cb->ready || !cb->run || !cb->priority_higher)
142  return NULL;
143 
144  e = av_mallocz(sizeof(*e));
145  if (!e)
146  return NULL;
147  e->cb = *cb;
148 
149  e->local_contexts = av_calloc(thread_count, e->cb.local_context_size);
150  if (!e->local_contexts)
151  goto free_executor;
152 
153  e->threads = av_calloc(thread_count, sizeof(*e->threads));
154  if (!e->threads)
155  goto free_executor;
156 
157  has_lock = !ff_mutex_init(&e->lock, NULL);
158  has_cond = !ff_cond_init(&e->cond, NULL);
159 
160  if (!has_lock || !has_cond)
161  goto free_executor;
162 
163  for (/* nothing */; e->thread_count < thread_count; e->thread_count++) {
164  ThreadInfo *ti = e->threads + e->thread_count;
165  ti->e = e;
166  if (executor_thread_create(&ti->thread, NULL, executor_worker_task, ti))
167  goto free_executor;
168  }
169  return e;
170 
171 free_executor:
172  executor_free(e, has_lock, has_cond);
173  return NULL;
174 }
175 
176 void av_executor_free(AVExecutor **executor)
177 {
178  if (!executor || !*executor)
179  return;
180  executor_free(*executor, 1, 1);
181  *executor = NULL;
182 }
183 
185 {
186  AVTaskCallbacks *cb = &e->cb;
187  AVTask **prev;
188 
189  ff_mutex_lock(&e->lock);
190  if (t) {
191  for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev = &(*prev)->next)
192  /* nothing */;
193  add_task(prev, t);
194  }
195  ff_cond_signal(&e->cond);
196  ff_mutex_unlock(&e->lock);
197 
198 #if !HAVE_THREADS
199  // We are running in a single-threaded environment, so we must handle all tasks ourselves
200  while (run_one_task(e, e->local_contexts))
201  /* nothing */;
202 #endif
203 }
add_task
static void add_task(AVTask **prev, AVTask *t)
Definition: executor.c:70
ThreadInfo::thread
ExecutorThread thread
Definition: executor.c:46
AVExecutor::threads
ThreadInfo * threads
Definition: executor.c:53
executor_free
static void executor_free(AVExecutor *e, const int has_lock, const int has_cond)
Definition: executor.c:114
ff_mutex_init
static int ff_mutex_init(AVMutex *mutex, const void *attr)
Definition: thread.h:187
cb
static double cb(void *priv, double x, double y)
Definition: vf_geq.c:242
thread.h
AVTask::next
AVTask * next
Definition: executor.h:28
data
const char data[16]
Definition: mxf.c:148
ff_cond_broadcast
static int ff_cond_broadcast(AVCond *cond)
Definition: thread.h:197
executor_thread_join
#define executor_thread_join(t, r)
Definition: executor.c:33
ff_mutex_unlock
static int ff_mutex_unlock(AVMutex *mutex)
Definition: thread.h:189
AVTaskCallbacks
Definition: executor.h:31
AVTaskCallbacks::local_context_size
int local_context_size
Definition: executor.h:34
AVExecutor::die
int die
Definition: executor.c:58
av_executor_alloc
AVExecutor * av_executor_alloc(const AVTaskCallbacks *cb, int thread_count)
Alloc executor.
Definition: executor.c:137
AVMutex
#define AVMutex
Definition: thread.h:184
ff_cond_wait
static int ff_cond_wait(AVCond *cond, AVMutex *mutex)
Definition: thread.h:198
AVCond
#define AVCond
Definition: thread.h:192
AVTask
Definition: executor.h:27
NULL
#define NULL
Definition: coverity.c:32
AVExecutor
Definition: executor.c:49
AVExecutor::cond
AVCond cond
Definition: executor.c:57
ff_mutex_destroy
static int ff_mutex_destroy(AVMutex *mutex)
Definition: thread.h:190
AVExecutor::thread_count
int thread_count
Definition: executor.c:51
AVExecutor::cb
AVTaskCallbacks cb
Definition: executor.c:50
ThreadInfo::e
AVExecutor * e
Definition: executor.c:45
executor.h
run_one_task
static int run_one_task(AVExecutor *e, void *lc)
Definition: executor.c:76
executor_thread_create
#define executor_thread_create(t, a, s, ar)
Definition: executor.c:32
ThreadInfo
HAVE_THREADS.
Definition: executor.c:44
ff_mutex_lock
static int ff_mutex_lock(AVMutex *mutex)
Definition: thread.h:188
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
AVExecutor::tasks
AVTask * tasks
Definition: executor.c:60
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
remove_task
static AVTask * remove_task(AVTask **prev, AVTask *t)
Definition: executor.c:63
av_executor_free
void av_executor_free(AVExecutor **executor)
Free executor.
Definition: executor.c:176
av_executor_execute
void av_executor_execute(AVExecutor *e, AVTask *t)
Add task to executor.
Definition: executor.c:184
ff_cond_signal
static int ff_cond_signal(AVCond *cond)
Definition: thread.h:196
mem.h
av_free
#define av_free(p)
Definition: tableprint_vlc.h:33
ff_cond_destroy
static int ff_cond_destroy(AVCond *cond)
Definition: thread.h:195
ExecutorThread
#define ExecutorThread
Definition: executor.c:30
ff_cond_init
static int ff_cond_init(AVCond *cond, const void *attr)
Definition: thread.h:194
AVExecutor::lock
AVMutex lock
Definition: executor.c:56
AVExecutor::local_contexts
uint8_t * local_contexts
Definition: executor.c:54