FFmpeg
udp.c
Go to the documentation of this file.
1 /*
2  * UDP prototype streaming system
3  * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 /**
23  * @file
24  * UDP protocol
25  */
26 
27 #define _DEFAULT_SOURCE
28 #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */
29 
30 #include "avformat.h"
31 #include "libavutil/avassert.h"
32 #include "libavutil/mem.h"
33 #include "libavutil/parseutils.h"
34 #include "libavutil/fifo.h"
35 #include "libavutil/intreadwrite.h"
36 #include "libavutil/opt.h"
37 #include "libavutil/log.h"
38 #include "libavutil/time.h"
39 #include "network.h"
40 #include "os_support.h"
41 #include "url.h"
42 #include "ip.h"
43 
44 #ifdef __APPLE__
45 #include "TargetConditionals.h"
46 #endif
47 
48 #if HAVE_UDPLITE_H
49 #include "udplite.h"
50 #else
51 /* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite.
52  * So, we provide a fallback here.
53  */
54 #define UDPLITE_SEND_CSCOV 10
55 #define UDPLITE_RECV_CSCOV 11
56 #endif
57 
58 #ifndef IPPROTO_UDPLITE
59 #define IPPROTO_UDPLITE 136
60 #endif
61 
62 #if HAVE_W32THREADS
63 #undef HAVE_PTHREAD_CANCEL
64 #define HAVE_PTHREAD_CANCEL 1
65 #endif
66 
67 #if HAVE_PTHREAD_CANCEL
68 #include "libavutil/thread.h"
69 #endif
70 
71 #ifndef IPV6_ADD_MEMBERSHIP
72 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
73 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
74 #endif
75 
76 #define UDP_TX_BUF_SIZE 32768
77 #define UDP_RX_BUF_SIZE 393216
78 #define UDP_MAX_PKT_SIZE 65536
79 #define UDP_HEADER_SIZE 8
80 
81 typedef struct UDPQueuedPacketHeader {
82  int pkt_size;
84  socklen_t addr_len;
86 
87 typedef struct UDPContext {
88  const AVClass *class;
89  int udp_fd;
90  int ttl;
93  int pkt_size;
102 
103  /* Circular Buffer variables for use in UDP receive code */
108  int64_t bitrate; /* number of bits to send per second */
111 #if HAVE_PTHREAD_CANCEL
112  pthread_t circular_buffer_thread;
115  int thread_started;
116 #endif
119  char *localaddr;
120  int timeout;
122  char *sources;
123  char *block;
127 } UDPContext;
128 
129 #define OFFSET(x) offsetof(UDPContext, x)
130 #define D AV_OPT_FLAG_DECODING_PARAM
131 #define E AV_OPT_FLAG_ENCODING_PARAM
132 static const AVOption options[] = {
133  { "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
134  { "bitrate", "Bits to send per second", OFFSET(bitrate), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
135  { "burst_bits", "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
136  { "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E },
137  { "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
138  { "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
139  { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
140  { "pkt_size", "Maximum UDP packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 1472 }, -1, INT_MAX, .flags = D|E },
141  { "reuse", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, D|E },
142  { "reuse_socket", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags = D|E },
143  { "broadcast", "explicitly allow or disallow broadcast destination", OFFSET(is_broadcast), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, E },
144  { "ttl", "Time to live (multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, { .i64 = 16 }, 0, 255, E },
145  { "connect", "set if connect() should be called on socket", OFFSET(is_connected), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, .flags = D|E },
146  { "fifo_size", "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
147  { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, D },
148  { "timeout", "set raise error timeout, in microseconds (only in read mode)",OFFSET(timeout), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D },
149  { "sources", "Source list", OFFSET(sources), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
150  { "block", "Block list", OFFSET(block), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
151  { NULL }
152 };
153 
154 static const AVClass udp_class = {
155  .class_name = "udp",
156  .item_name = av_default_item_name,
157  .option = options,
158  .version = LIBAVUTIL_VERSION_INT,
159 };
160 
162  .class_name = "udplite",
163  .item_name = av_default_item_name,
164  .option = options,
165  .version = LIBAVUTIL_VERSION_INT,
166 };
167 
168 static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
169  struct sockaddr *addr,
170  void *logctx)
171 {
172  int protocol, cmd;
173 
174  /* There is some confusion in the world whether IP_MULTICAST_TTL
175  * takes a byte or an int as an argument.
176  * BSD seems to indicate byte so we are going with that and use
177  * int and fall back to byte to be safe */
178  switch (addr->sa_family) {
179 #ifdef IP_MULTICAST_TTL
180  case AF_INET:
181  protocol = IPPROTO_IP;
182  cmd = IP_MULTICAST_TTL;
183  break;
184 #endif
185 #ifdef IPV6_MULTICAST_HOPS
186  case AF_INET6:
187  protocol = IPPROTO_IPV6;
188  cmd = IPV6_MULTICAST_HOPS;
189  break;
190 #endif
191  default:
192  return 0;
193  }
194 
195  if (setsockopt(sockfd, protocol, cmd, &mcastTTL, sizeof(mcastTTL)) < 0) {
196  /* BSD compatibility */
197  unsigned char ttl = (unsigned char) mcastTTL;
198 
199  ff_log_net_error(logctx, AV_LOG_DEBUG, "setsockopt(IPV4/IPV6 MULTICAST TTL)");
200  if (setsockopt(sockfd, protocol, cmd, &ttl, sizeof(ttl)) < 0) {
201  ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV4/IPV6 MULTICAST TTL)");
202  return ff_neterrno();
203  }
204  }
205 
206  return 0;
207 }
208 
209 static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,
210  struct sockaddr *local_addr, void *logctx)
211 {
212 #ifdef IP_ADD_MEMBERSHIP
213  if (addr->sa_family == AF_INET) {
214  struct ip_mreq mreq;
215 
216  mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
217  if (local_addr)
218  mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
219  else
220  mreq.imr_interface.s_addr = INADDR_ANY;
221  if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
222  ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
223  return ff_neterrno();
224  }
225  }
226 #endif
227 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
228  if (addr->sa_family == AF_INET6) {
229  struct ipv6_mreq mreq6;
230 
231  memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
232  //TODO: Interface index should be looked up from local_addr
233  mreq6.ipv6mr_interface = 0;
234  if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
235  ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
236  return ff_neterrno();
237  }
238  }
239 #endif
240  return 0;
241 }
242 
243 static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,
244  struct sockaddr *local_addr, void *logctx)
245 {
246 #ifdef IP_DROP_MEMBERSHIP
247  if (addr->sa_family == AF_INET) {
248  struct ip_mreq mreq;
249 
250  mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
251  if (local_addr)
252  mreq.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
253  else
254  mreq.imr_interface.s_addr = INADDR_ANY;
255  if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
256  ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
257  return -1;
258  }
259  }
260 #endif
261 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
262  if (addr->sa_family == AF_INET6) {
263  struct ipv6_mreq mreq6;
264 
265  memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
266  //TODO: Interface index should be looked up from local_addr
267  mreq6.ipv6mr_interface = 0;
268  if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
269  ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
270  return -1;
271  }
272  }
273 #endif
274  return 0;
275 }
276 
278  int sockfd, struct sockaddr *addr,
279  int addr_len, struct sockaddr_storage *local_addr,
280  struct sockaddr_storage *sources,
281  int nb_sources, int include)
282 {
283  int i;
284  if (addr->sa_family != AF_INET) {
285 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE)
286  /* For IPv4 prefer the old approach, as that alone works reliably on
287  * Windows and it also supports supplying the interface based on its
288  * address. */
289  int i;
290  for (i = 0; i < nb_sources; i++) {
291  struct group_source_req mreqs;
292  int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
293 
294  //TODO: Interface index should be looked up from local_addr
295  mreqs.gsr_interface = 0;
296  memcpy(&mreqs.gsr_group, addr, addr_len);
297  memcpy(&mreqs.gsr_source, &sources[i], sizeof(*sources));
298 
299  if (setsockopt(sockfd, level,
300  include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
301  (const void *)&mreqs, sizeof(mreqs)) < 0) {
302  if (include)
303  ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
304  else
305  ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
306  return ff_neterrno();
307  }
308  }
309  return 0;
310 #else
312  "Setting multicast sources only supported for IPv4\n");
313  return AVERROR(EINVAL);
314 #endif
315  }
316 #if HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
317  for (i = 0; i < nb_sources; i++) {
318  struct ip_mreq_source mreqs;
319  if (sources[i].ss_family != AF_INET) {
320  av_log(h, AV_LOG_ERROR, "Source/block address %d is of incorrect protocol family\n", i + 1);
321  return AVERROR(EINVAL);
322  }
323 
324  mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
325  if (local_addr)
326  mreqs.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
327  else
328  mreqs.imr_interface.s_addr = INADDR_ANY;
329  mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)&sources[i])->sin_addr.s_addr;
330 
331  if (setsockopt(sockfd, IPPROTO_IP,
332  include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
333  (const void *)&mreqs, sizeof(mreqs)) < 0) {
334  if (include)
335  ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
336  else
337  ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
338  return ff_neterrno();
339  }
340  }
341 #else
342  return AVERROR(ENOSYS);
343 #endif
344  return 0;
345 }
347  struct sockaddr_storage *addr,
348  const char *hostname, int port)
349 {
350  struct addrinfo *res0;
351  int addr_len;
352 
353  res0 = ff_ip_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
354  if (!res0) return AVERROR(EIO);
355  memcpy(addr, res0->ai_addr, res0->ai_addrlen);
356  addr_len = res0->ai_addrlen;
357  freeaddrinfo(res0);
358 
359  return addr_len;
360 }
361 
362 static int udp_socket_create(URLContext *h, struct sockaddr_storage *addr,
363  socklen_t *addr_len, const char *localaddr)
364 {
365  UDPContext *s = h->priv_data;
366  int udp_fd = -1;
367  struct addrinfo *res0, *res;
368  int family = AF_UNSPEC;
369 
370  if (((struct sockaddr *) &s->dest_addr)->sa_family)
371  family = ((struct sockaddr *) &s->dest_addr)->sa_family;
372  res0 = ff_ip_resolve_host(h, (localaddr && localaddr[0]) ? localaddr : NULL,
373  s->local_port,
374  SOCK_DGRAM, family, AI_PASSIVE);
375  if (!res0)
376  goto fail;
377  for (res = res0; res; res=res->ai_next) {
378  if (s->udplite_coverage)
379  udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, IPPROTO_UDPLITE, h);
380  else
381  udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0, h);
382  if (udp_fd != -1) break;
383  ff_log_net_error(h, AV_LOG_ERROR, "socket");
384  }
385 
386  if (udp_fd < 0)
387  goto fail;
388 
389  memcpy(addr, res->ai_addr, res->ai_addrlen);
390  *addr_len = res->ai_addrlen;
391 
392  freeaddrinfo(res0);
393 
394  return udp_fd;
395 
396  fail:
397  if (udp_fd >= 0)
398  closesocket(udp_fd);
399  if(res0)
400  freeaddrinfo(res0);
401  return -1;
402 }
403 
404 static int udp_port(struct sockaddr_storage *addr, int addr_len)
405 {
406  char sbuf[sizeof(int)*3+1];
407  int error;
408 
409  if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0, sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
410  av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
411  return -1;
412  }
413 
414  return strtol(sbuf, NULL, 10);
415 }
416 
417 
418 /**
419  * If no filename is given to av_open_input_file because you want to
420  * get the local port first, then you must call this function to set
421  * the remote server address.
422  *
423  * url syntax: udp://host:port[?option=val...]
424  * option: 'ttl=n' : set the ttl value (for multicast only)
425  * 'localport=n' : set the local port
426  * 'pkt_size=n' : set max packet size
427  * 'reuse=1' : enable reusing the socket
428  * 'overrun_nonfatal=1': survive in case of circular buffer overrun
429  *
430  * @param h media file context
431  * @param uri of the remote server
432  * @return zero if no error.
433  */
434 int ff_udp_set_remote_url(URLContext *h, const char *uri)
435 {
436  UDPContext *s = h->priv_data;
437  char hostname[256], buf[10];
438  int port;
439  const char *p;
440 
441  av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
442 
443  /* set the destination address */
444  s->dest_addr_len = udp_set_url(h, &s->dest_addr, hostname, port);
445  if (s->dest_addr_len < 0) {
446  return AVERROR(EIO);
447  }
448  s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
449  p = strchr(uri, '?');
450  if (p) {
451  if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
452  int was_connected = s->is_connected;
453  s->is_connected = strtol(buf, NULL, 10);
454  if (s->is_connected && !was_connected) {
455  if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
456  s->dest_addr_len)) {
457  s->is_connected = 0;
458  ff_log_net_error(h, AV_LOG_ERROR, "connect");
459  return AVERROR(EIO);
460  }
461  }
462  }
463  }
464 
465  return 0;
466 }
467 
468 /**
469  * This function is identical to ff_udp_set_remote_url, except that it takes a sockaddr directly.
470  */
471 int ff_udp_set_remote_addr(URLContext *h, const struct sockaddr *dest_addr, socklen_t dest_addr_len, int do_connect)
472 {
473  UDPContext *s = h->priv_data;
474 
475  /* set the destination address */
476  if (dest_addr_len < 0 || dest_addr_len > sizeof(s->dest_addr))
477  return AVERROR(EIO);
478  s->dest_addr_len = dest_addr_len;
479  memcpy(&s->dest_addr, dest_addr, dest_addr_len);
480 
481  s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
482  if (do_connect >= 0) {
483  int was_connected = s->is_connected;
484  s->is_connected = do_connect;
485  if (s->is_connected && !was_connected) {
486  if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
487  s->dest_addr_len)) {
488  s->is_connected = 0;
489  ff_log_net_error(h, AV_LOG_ERROR, "connect");
490  return AVERROR(EIO);
491  }
492  }
493  }
494 
495  return 0;
496 }
497 
498 /**
499  * Return the local port used by the UDP connection
500  * @param h media file context
501  * @return the local port number
502  */
504 {
505  UDPContext *s = h->priv_data;
506  return s->local_port;
507 }
508 
509 void ff_udp_get_last_recv_addr(URLContext *h, struct sockaddr_storage *addr, socklen_t *addr_len)
510 {
511  UDPContext *s = h->priv_data;
512  *addr = s->last_recv_addr;
513  *addr_len = s->last_recv_addr_len;
514 }
515 
516 /**
517  * Return the udp file handle for select() usage to wait for several RTP
518  * streams at the same time.
519  * @param h media file context
520  */
522 {
523  UDPContext *s = h->priv_data;
524  return s->udp_fd;
525 }
526 
527 #if HAVE_PTHREAD_CANCEL
528 static void *circular_buffer_task_rx( void *_URLContext)
529 {
530  URLContext *h = _URLContext;
531  UDPContext *s = h->priv_data;
532  int old_cancelstate;
533 
534  ff_thread_setname("udp-rx");
535 
537  pthread_mutex_lock(&s->mutex);
538  if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
539  av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
540  s->circular_buffer_error = AVERROR(EIO);
541  goto end;
542  }
543  while(1) {
544  UDPQueuedPacketHeader pkt_header;
545 
546  pthread_mutex_unlock(&s->mutex);
547  /* Blocking operations are always cancellation points;
548  see "General Information" / "Thread Cancelation Overview"
549  in Single Unix. */
551  pkt_header.pkt_size = recvfrom(s->udp_fd, s->tmp + sizeof(pkt_header), sizeof(s->tmp) - sizeof(pkt_header), 0, (struct sockaddr *)&pkt_header.addr, &pkt_header.addr_len);
553  pthread_mutex_lock(&s->mutex);
554  if (pkt_header.pkt_size < 0) {
555  if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
556  s->circular_buffer_error = ff_neterrno();
557  goto end;
558  }
559  continue;
560  }
561  if (ff_ip_check_source_lists(&pkt_header.addr, &s->filters))
562  continue;
563  memcpy(s->tmp, &pkt_header, sizeof(pkt_header));
564 
565  if (av_fifo_can_write(s->rx_fifo) < pkt_header.pkt_size + sizeof(pkt_header)) {
566  /* No Space left */
567  if (s->overrun_nonfatal) {
568  av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
569  "Surviving due to overrun_nonfatal option\n");
570  continue;
571  } else {
572  av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
573  "To avoid, increase fifo_size URL option. "
574  "To survive in such case, use overrun_nonfatal option\n");
575  s->circular_buffer_error = AVERROR(EIO);
576  goto end;
577  }
578  }
579  av_fifo_write(s->rx_fifo, s->tmp, pkt_header.pkt_size + sizeof(pkt_header));
580  pthread_cond_signal(&s->cond);
581  }
582 
583 end:
584  pthread_cond_signal(&s->cond);
585  pthread_mutex_unlock(&s->mutex);
586  return NULL;
587 }
588 
589 static void *circular_buffer_task_tx( void *_URLContext)
590 {
591  URLContext *h = _URLContext;
592  UDPContext *s = h->priv_data;
593  int64_t target_timestamp = av_gettime_relative();
594  int64_t start_timestamp = av_gettime_relative();
595  int64_t sent_bits = 0;
596  int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
597  int64_t max_delay = s->bitrate ? ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
598 
599  ff_thread_setname("udp-tx");
600 
601  pthread_mutex_lock(&s->mutex);
602 
603  if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
604  av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
605  s->circular_buffer_error = AVERROR(EIO);
606  goto end;
607  }
608 
609  for(;;) {
610  int len;
611  const uint8_t *p;
612  uint8_t tmp[4];
613  int64_t timestamp;
614 
615  len = av_fifo_can_read(s->tx_fifo);
616 
617  while (len<4) {
618  if (s->close_req)
619  goto end;
620  pthread_cond_wait(&s->cond, &s->mutex);
621  len = av_fifo_can_read(s->tx_fifo);
622  }
623 
624  av_fifo_read(s->tx_fifo, tmp, 4);
625  len = AV_RL32(tmp);
626 
627  av_assert0(len >= 0);
628  av_assert0(len <= sizeof(s->tmp));
629 
630  av_fifo_read(s->tx_fifo, s->tmp, len);
631 
632  pthread_mutex_unlock(&s->mutex);
633 
634  if (s->bitrate) {
635  timestamp = av_gettime_relative();
636  if (timestamp < target_timestamp) {
637  int64_t delay = target_timestamp - timestamp;
638  if (delay > max_delay) {
639  delay = max_delay;
640  start_timestamp = timestamp + delay;
641  sent_bits = 0;
642  }
643  av_usleep(delay);
644  } else {
645  if (timestamp - burst_interval > target_timestamp) {
646  start_timestamp = timestamp - burst_interval;
647  sent_bits = 0;
648  }
649  }
650  sent_bits += len * 8;
651  target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
652  }
653 
654  p = s->tmp;
655  while (len) {
656  int ret;
657  av_assert0(len > 0);
658  if (!s->is_connected) {
659  ret = sendto (s->udp_fd, p, len, 0,
660  (struct sockaddr *) &s->dest_addr,
661  s->dest_addr_len);
662  } else
663  ret = send(s->udp_fd, p, len, 0);
664  if (ret >= 0) {
665  len -= ret;
666  p += ret;
667  } else {
668  ret = ff_neterrno();
669  if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
670  pthread_mutex_lock(&s->mutex);
671  s->circular_buffer_error = ret;
672  pthread_mutex_unlock(&s->mutex);
673  return NULL;
674  }
675  }
676  }
677 
678  pthread_mutex_lock(&s->mutex);
679  }
680 
681 end:
682  pthread_mutex_unlock(&s->mutex);
683  return NULL;
684 }
685 
686 
687 #endif
688 
689 /* put it in UDP context */
690 /* return non zero if error */
691 static int udp_open(URLContext *h, const char *uri, int flags)
692 {
693  char hostname[1024];
694  int port, udp_fd = -1, tmp, bind_ret = -1, dscp = -1;
695  UDPContext *s = h->priv_data;
696  int is_output;
697  const char *p;
698  char buf[256];
699  struct sockaddr_storage my_addr;
700  socklen_t len;
701  int ret;
702 
703  h->is_streamed = 1;
704 
705  is_output = !(flags & AVIO_FLAG_READ);
706  if (s->buffer_size < 0)
707  s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_RX_BUF_SIZE;
708 
709  if (s->sources) {
710  if ((ret = ff_ip_parse_sources(h, s->sources, &s->filters)) < 0)
711  goto fail;
712  }
713 
714  if (s->block) {
715  if ((ret = ff_ip_parse_blocks(h, s->block, &s->filters)) < 0)
716  goto fail;
717  }
718 
719  p = strchr(uri, '?');
720  if (p) {
721  if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
722  char *endptr = NULL;
723  s->reuse_socket = strtol(buf, &endptr, 10);
724  /* assume if no digits were found it is a request to enable it */
725  if (buf == endptr)
726  s->reuse_socket = 1;
727  }
728  if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
729  char *endptr = NULL;
730  s->overrun_nonfatal = strtol(buf, &endptr, 10);
731  /* assume if no digits were found it is a request to enable it */
732  if (buf == endptr)
733  s->overrun_nonfatal = 1;
734  if (!HAVE_PTHREAD_CANCEL)
736  "'overrun_nonfatal' option was set but it is not supported "
737  "on this build (pthread support is required)\n");
738  }
739  if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
740  s->ttl = strtol(buf, NULL, 10);
741  if (s->ttl < 0 || s->ttl > 255) {
742  av_log(h, AV_LOG_ERROR, "ttl(%d) should be in range [0,255]\n", s->ttl);
743  ret = AVERROR(EINVAL);
744  goto fail;
745  }
746  }
747  if (av_find_info_tag(buf, sizeof(buf), "udplite_coverage", p)) {
748  s->udplite_coverage = strtol(buf, NULL, 10);
749  }
750  if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
751  s->local_port = strtol(buf, NULL, 10);
752  }
753  if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
754  s->pkt_size = strtol(buf, NULL, 10);
755  }
756  if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
757  s->buffer_size = strtol(buf, NULL, 10);
758  }
759  if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
760  s->is_connected = strtol(buf, NULL, 10);
761  }
762  if (av_find_info_tag(buf, sizeof(buf), "dscp", p)) {
763  dscp = strtol(buf, NULL, 10);
764  }
765  if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
766  s->circular_buffer_size = strtol(buf, NULL, 10);
767  if (!HAVE_PTHREAD_CANCEL)
769  "'circular_buffer_size' option was set but it is not supported "
770  "on this build (pthread support is required)\n");
771  }
772  if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
773  s->bitrate = strtoll(buf, NULL, 10);
774  if (!HAVE_PTHREAD_CANCEL)
776  "'bitrate' option was set but it is not supported "
777  "on this build (pthread support is required)\n");
778  }
779  if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
780  s->burst_bits = strtoll(buf, NULL, 10);
781  }
782  if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
783  av_freep(&s->localaddr);
784  s->localaddr = av_strdup(buf);
785  if (!s->localaddr) {
786  ret = AVERROR(ENOMEM);
787  goto fail;
788  }
789  }
790  if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
791  if ((ret = ff_ip_parse_sources(h, buf, &s->filters)) < 0)
792  goto fail;
793  }
794  if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
795  if ((ret = ff_ip_parse_blocks(h, buf, &s->filters)) < 0)
796  goto fail;
797  }
798  if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
799  s->timeout = strtol(buf, NULL, 10);
800  if (is_output && av_find_info_tag(buf, sizeof(buf), "broadcast", p))
801  s->is_broadcast = strtol(buf, NULL, 10);
802  }
803  /* handling needed to support options picking from both AVOption and URL */
804  s->circular_buffer_size *= 188;
805  if (flags & AVIO_FLAG_WRITE) {
806  h->max_packet_size = s->pkt_size;
807  } else {
808  h->max_packet_size = UDP_MAX_PKT_SIZE;
809  }
810  h->rw_timeout = s->timeout;
811 
812  /* fill the dest addr */
813  av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
814 
815  /* XXX: fix av_url_split */
816  if (hostname[0] == '\0' || hostname[0] == '?') {
817  /* only accepts null hostname if input */
818  if (!(flags & AVIO_FLAG_READ)) {
819  ret = AVERROR(EINVAL);
820  goto fail;
821  }
822  } else {
823  if ((ret = ff_udp_set_remote_url(h, uri)) < 0)
824  goto fail;
825  }
826 
827  if ((s->is_multicast || s->local_port < 0) && (h->flags & AVIO_FLAG_READ))
828  s->local_port = port;
829 
830  udp_fd = udp_socket_create(h, &my_addr, &len, s->localaddr);
831  if (udp_fd < 0) {
832  ret = AVERROR(EIO);
833  goto fail;
834  }
835 
836  s->local_addr_storage=my_addr; //store for future multicast join
837 
838  /* Follow the requested reuse option, unless it's multicast in which
839  * case enable reuse unless explicitly disabled.
840  */
841  if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) {
842  s->reuse_socket = 1;
843  if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0) {
844  ret = ff_neterrno();
845  goto fail;
846  }
847  }
848 
849  if (s->is_broadcast) {
850 #ifdef SO_BROADCAST
851  if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0) {
852  ret = ff_neterrno();
853  goto fail;
854  }
855 #else
856  ret = AVERROR(ENOSYS);
857  goto fail;
858 #endif
859  }
860 
861  /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
862  * The receiver coverage has to be less than or equal to the sender coverage.
863  * Otherwise, the receiver will drop all packets.
864  */
865  if (s->udplite_coverage) {
866  if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_SEND_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
867  av_log(h, AV_LOG_WARNING, "socket option UDPLITE_SEND_CSCOV not available");
868 
869  if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_RECV_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
870  av_log(h, AV_LOG_WARNING, "socket option UDPLITE_RECV_CSCOV not available");
871  }
872 
873  if (dscp >= 0) {
874  dscp <<= 2;
875  if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0) {
876  ret = ff_neterrno();
877  goto fail;
878  }
879  }
880 
881  /* If multicast, try binding the multicast address first, to avoid
882  * receiving UDP packets from other sources aimed at the same UDP
883  * port. This fails on windows. This makes sending to the same address
884  * using sendto() fail, so only do it if we're opened in read-only mode. */
885  if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) {
886  bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
887  }
888  /* bind to the local address if not multicast or if the multicast
889  * bind failed */
890  /* the bind is needed to give a port to the socket now */
891  if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
892  ff_log_net_error(h, AV_LOG_ERROR, "bind failed");
893  ret = ff_neterrno();
894  goto fail;
895  }
896 
897  len = sizeof(my_addr);
898  getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
899  s->local_port = udp_port(&my_addr, len);
900 
901  if (s->is_multicast) {
902  if (h->flags & AVIO_FLAG_WRITE) {
903  /* output */
904  if ((ret = udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr, h)) < 0)
905  goto fail;
906  }
907  if (h->flags & AVIO_FLAG_READ) {
908  /* input */
909  if (s->filters.nb_include_addrs) {
910  if ((ret = udp_set_multicast_sources(h, udp_fd,
911  (struct sockaddr *)&s->dest_addr,
912  s->dest_addr_len, &s->local_addr_storage,
913  s->filters.include_addrs,
914  s->filters.nb_include_addrs, 1)) < 0)
915  goto fail;
916  } else {
917  if ((ret = udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,
918  (struct sockaddr *)&s->local_addr_storage, h)) < 0)
919  goto fail;
920  }
921  if (s->filters.nb_exclude_addrs) {
922  if ((ret = udp_set_multicast_sources(h, udp_fd,
923  (struct sockaddr *)&s->dest_addr,
924  s->dest_addr_len, &s->local_addr_storage,
925  s->filters.exclude_addrs,
926  s->filters.nb_exclude_addrs, 0)) < 0)
927  goto fail;
928  }
929  }
930  }
931 
932  if (is_output) {
933  /* limit the tx buf size to limit latency */
934  tmp = s->buffer_size;
935  if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
936  ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
937  ret = ff_neterrno();
938  goto fail;
939  }
940  } else {
941  /* set udp recv buffer size to the requested value (default UDP_RX_BUF_SIZE) */
942  tmp = s->buffer_size;
943  if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
944  ff_log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
945  }
946  len = sizeof(tmp);
947  if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) {
948  ff_log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
949  } else {
950  av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
951  if(tmp < s->buffer_size)
952  av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d\n", s->buffer_size, tmp);
953  }
954 
955  /* make the socket non-blocking */
956  ff_socket_nonblock(udp_fd, 1);
957  }
958  if (s->is_connected) {
959  if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
960  ff_log_net_error(h, AV_LOG_ERROR, "connect");
961  ret = ff_neterrno();
962  goto fail;
963  }
964  }
965 
966  s->udp_fd = udp_fd;
967 
968 #if HAVE_PTHREAD_CANCEL
969  /*
970  Create thread in case of:
971  1. Input and circular_buffer_size is set
972  2. Output and bitrate and circular_buffer_size is set
973  */
974 
975  if (is_output && s->bitrate && !s->circular_buffer_size) {
976  /* Warn user in case of 'circular_buffer_size' is not set */
977  av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
978  }
979 
980  if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
981  /* start the task going */
982  AVFifo *fifo = av_fifo_alloc2(s->circular_buffer_size, 1, 0);
983  if (!fifo) {
984  ret = AVERROR(ENOMEM);
985  goto fail;
986  }
987  if (is_output)
988  s->tx_fifo = fifo;
989  else
990  s->rx_fifo = fifo;
991  ret = pthread_mutex_init(&s->mutex, NULL);
992  if (ret != 0) {
993  av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
994  ret = AVERROR(ret);
995  goto fail;
996  }
997  ret = pthread_cond_init(&s->cond, NULL);
998  if (ret != 0) {
999  av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
1000  ret = AVERROR(ret);
1001  goto cond_fail;
1002  }
1003  ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
1004  if (ret != 0) {
1005  av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
1006  ret = AVERROR(ret);
1007  goto thread_fail;
1008  }
1009  s->thread_started = 1;
1010  }
1011 #endif
1012 
1013  return 0;
1014 #if HAVE_PTHREAD_CANCEL
1015  thread_fail:
1016  pthread_cond_destroy(&s->cond);
1017  cond_fail:
1018  pthread_mutex_destroy(&s->mutex);
1019 #endif
1020  fail:
1021  if (udp_fd >= 0)
1022  closesocket(udp_fd);
1023  av_fifo_freep2(&s->rx_fifo);
1024  av_fifo_freep2(&s->tx_fifo);
1025  ff_ip_reset_filters(&s->filters);
1026  return ret;
1027 }
1028 
1029 static int udplite_open(URLContext *h, const char *uri, int flags)
1030 {
1031  UDPContext *s = h->priv_data;
1032 
1033  // set default checksum coverage
1034  s->udplite_coverage = UDP_HEADER_SIZE;
1035 
1036  return udp_open(h, uri, flags);
1037 }
1038 
1039 static int udp_read(URLContext *h, uint8_t *buf, int size)
1040 {
1041  UDPContext *s = h->priv_data;
1042  int ret;
1043 #if HAVE_PTHREAD_CANCEL
1044  int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
1045 
1046  if (s->rx_fifo) {
1047  pthread_mutex_lock(&s->mutex);
1048  do {
1049  avail = av_fifo_can_read(s->rx_fifo);
1050  if (avail) { // >=size) {
1052 
1053  av_fifo_read(s->rx_fifo, &header, sizeof(header));
1054 
1055  s->last_recv_addr = header.addr;
1056  s->last_recv_addr_len = header.addr_len;
1057 
1058  avail = header.pkt_size;
1059  if(avail > size){
1060  av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
1061  avail = size;
1062  }
1063 
1064  av_fifo_read(s->rx_fifo, buf, avail);
1065  av_fifo_drain2(s->rx_fifo, header.pkt_size - avail);
1066  pthread_mutex_unlock(&s->mutex);
1067  return avail;
1068  } else if(s->circular_buffer_error){
1069  int err = s->circular_buffer_error;
1070  pthread_mutex_unlock(&s->mutex);
1071  return err;
1072  } else if(nonblock) {
1073  pthread_mutex_unlock(&s->mutex);
1074  return AVERROR(EAGAIN);
1075  } else {
1076  /* FIXME: using the monotonic clock would be better,
1077  but it does not exist on all supported platforms. */
1078  int64_t t = av_gettime() + 100000;
1079  struct timespec tv = { .tv_sec = t / 1000000,
1080  .tv_nsec = (t % 1000000) * 1000 };
1081  int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
1082  if (err) {
1083  pthread_mutex_unlock(&s->mutex);
1084  return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
1085  }
1086  nonblock = 1;
1087  }
1088  } while(1);
1089  }
1090 #endif
1091 
1092  if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
1093  ret = ff_network_wait_fd(s->udp_fd, 0);
1094  if (ret < 0)
1095  return ret;
1096  }
1097  s->last_recv_addr_len = sizeof(s->last_recv_addr);
1098  ret = recvfrom(s->udp_fd, buf, size, 0, (struct sockaddr *)&s->last_recv_addr, &s->last_recv_addr_len);
1099  if (ret < 0)
1100  return ff_neterrno();
1101  if (ff_ip_check_source_lists(&s->last_recv_addr, &s->filters))
1102  return AVERROR(EINTR);
1103  return ret;
1104 }
1105 
1106 static int udp_write(URLContext *h, const uint8_t *buf, int size)
1107 {
1108  UDPContext *s = h->priv_data;
1109  int ret;
1110 
1111 #if HAVE_PTHREAD_CANCEL
1112  if (s->tx_fifo) {
1113  uint8_t tmp[4];
1114 
1115  pthread_mutex_lock(&s->mutex);
1116 
1117  /*
1118  Return error if last tx failed.
1119  Here we can't know on which packet error was, but it needs to know that error exists.
1120  */
1121  if (s->circular_buffer_error<0) {
1122  int err = s->circular_buffer_error;
1123  pthread_mutex_unlock(&s->mutex);
1124  return err;
1125  }
1126 
1127  if (av_fifo_can_write(s->tx_fifo) < size + 4) {
1128  /* What about a partial packet tx ? */
1129  pthread_mutex_unlock(&s->mutex);
1130  return AVERROR(ENOMEM);
1131  }
1132  AV_WL32(tmp, size);
1133  av_fifo_write(s->tx_fifo, tmp, 4); /* size of packet */
1134  av_fifo_write(s->tx_fifo, buf, size); /* the data */
1135  pthread_cond_signal(&s->cond);
1136  pthread_mutex_unlock(&s->mutex);
1137  return size;
1138  }
1139 #endif
1140  if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
1141  ret = ff_network_wait_fd(s->udp_fd, 1);
1142  if (ret < 0)
1143  return ret;
1144  }
1145 
1146  if (!s->is_connected) {
1147  ret = sendto (s->udp_fd, buf, size, 0,
1148  (struct sockaddr *) &s->dest_addr,
1149  s->dest_addr_len);
1150  } else
1151  ret = send(s->udp_fd, buf, size, 0);
1152 
1153  return ret < 0 ? ff_neterrno() : ret;
1154 }
1155 
1156 static int udp_close(URLContext *h)
1157 {
1158  UDPContext *s = h->priv_data;
1159 
1160 #if HAVE_PTHREAD_CANCEL
1161  // Request close once writing is finished
1162  if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
1163  pthread_mutex_lock(&s->mutex);
1164  s->close_req = 1;
1165  pthread_cond_signal(&s->cond);
1166  pthread_mutex_unlock(&s->mutex);
1167  }
1168 #endif
1169 
1170  if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
1171  udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,
1172  (struct sockaddr *)&s->local_addr_storage, h);
1173 #if HAVE_PTHREAD_CANCEL
1174  if (s->thread_started) {
1175  int ret;
1176  // Cancel only read, as write has been signaled as success to the user
1177  if (h->flags & AVIO_FLAG_READ) {
1178 #ifdef _WIN32
1179  /* recvfrom() is not a cancellation point for win32, so we shutdown
1180  * the socket and abort pending IO, subsequent recvfrom() calls
1181  * will fail with WSAESHUTDOWN causing the thread to exit. */
1182  shutdown(s->udp_fd, SD_RECEIVE);
1183  CancelIoEx((HANDLE)(SOCKET)s->udp_fd, NULL);
1184 #else
1185  pthread_cancel(s->circular_buffer_thread);
1186 #endif
1187  }
1188  ret = pthread_join(s->circular_buffer_thread, NULL);
1189  if (ret != 0)
1190  av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
1191  pthread_mutex_destroy(&s->mutex);
1192  pthread_cond_destroy(&s->cond);
1193  }
1194 #endif
1195  closesocket(s->udp_fd);
1196  av_fifo_freep2(&s->rx_fifo);
1197  av_fifo_freep2(&s->tx_fifo);
1198  ff_ip_reset_filters(&s->filters);
1199  return 0;
1200 }
1201 
1203  .name = "udp",
1204  .url_open = udp_open,
1205  .url_read = udp_read,
1206  .url_write = udp_write,
1207  .url_close = udp_close,
1208  .url_get_file_handle = udp_get_file_handle,
1209  .priv_data_size = sizeof(UDPContext),
1210  .priv_data_class = &udp_class,
1212 };
1213 
1215  .name = "udplite",
1216  .url_open = udplite_open,
1217  .url_read = udp_read,
1218  .url_write = udp_write,
1219  .url_close = udp_close,
1220  .url_get_file_handle = udp_get_file_handle,
1221  .priv_data_size = sizeof(UDPContext),
1222  .priv_data_class = &udplite_context_class,
1224 };
error
static void error(const char *err)
Definition: target_bsf_fuzzer.c:32
flags
const SwsFlags flags[]
Definition: swscale.c:61
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
PTHREAD_CANCEL_ENABLE
#define PTHREAD_CANCEL_ENABLE
Definition: w32pthreads.h:65
av_fifo_drain2
void av_fifo_drain2(AVFifo *f, size_t size)
Discard the specified amount of data from an AVFifo.
Definition: fifo.c:266
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
av_gettime_relative
int64_t av_gettime_relative(void)
Get the current time in microseconds since some unspecified starting point.
Definition: time.c:56
ff_udp_get_local_port
int ff_udp_get_local_port(URLContext *h)
Return the local port used by the UDP connection.
Definition: udp.c:503
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:216
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
level
uint8_t level
Definition: svq3.c:208
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
opt.h
av_find_info_tag
int av_find_info_tag(char *arg, int arg_size, const char *tag1, const char *info)
Attempt to find a specific tag in a URL.
Definition: parseutils.c:756
UDPContext::pkt_size
int pkt_size
Definition: udp.c:93
AV_WL32
#define AV_WL32(p, v)
Definition: intreadwrite.h:422
UDPContext::is_connected
int is_connected
Definition: udp.c:101
PTHREAD_CANCEL_DISABLE
#define PTHREAD_CANCEL_DISABLE
Definition: w32pthreads.h:66
URL_PROTOCOL_FLAG_NETWORK
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:33
ff_ip_parse_sources
int ff_ip_parse_sources(void *log_ctx, const char *buf, IPSourceFilters *filters)
Parses the address[,address] source list in buf and adds it to the filters in the IPSourceFilters str...
Definition: ip.c:145
udp_socket_create
static int udp_socket_create(URLContext *h, struct sockaddr_storage *addr, socklen_t *addr_len, const char *localaddr)
Definition: udp.c:362
thread.h
E
#define E
Definition: udp.c:131
UDPContext::tmp
uint8_t tmp[UDP_MAX_PKT_SIZE+sizeof(UDPQueuedPacketHeader)]
Definition: udp.c:117
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
IPPROTO_UDPLITE
#define IPPROTO_UDPLITE
Definition: udp.c:59
int64_t
long long int64_t
Definition: coverity.c:34
udp_set_multicast_ttl
static int udp_set_multicast_ttl(int sockfd, int mcastTTL, struct sockaddr *addr, void *logctx)
Definition: udp.c:168
NI_NUMERICSERV
#define NI_NUMERICSERV
Definition: network.h:203
UDPQueuedPacketHeader
Definition: udp.c:81
OFFSET
#define OFFSET(x)
Definition: udp.c:129
sources
Note except for filters that can have queued frames and sources
Definition: filter_design.txt:286
pthread_mutex_lock
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:119
AVOption
AVOption.
Definition: opt.h:429
ff_log_net_error
void ff_log_net_error(void *ctx, int level, const char *prefix)
Definition: network.c:579
udp_port
static int udp_port(struct sockaddr_storage *addr, int addr_len)
Definition: udp.c:404
ff_udp_set_remote_url
int ff_udp_set_remote_url(URLContext *h, const char *uri)
If no filename is given to av_open_input_file because you want to get the local port first,...
Definition: udp.c:434
UDPLITE_RECV_CSCOV
#define UDPLITE_RECV_CSCOV
Definition: udp.c:55
udplite_context_class
static const AVClass udplite_context_class
Definition: udp.c:161
URLProtocol
Definition: url.h:51
os_support.h
UDPContext::bitrate
int64_t bitrate
Definition: udp.c:108
UDPContext::local_addr_storage
struct sockaddr_storage local_addr_storage
Definition: udp.c:121
UDPContext::is_broadcast
int is_broadcast
Definition: udp.c:95
ff_udp_set_remote_addr
int ff_udp_set_remote_addr(URLContext *h, const struct sockaddr *dest_addr, socklen_t dest_addr_len, int do_connect)
This function is identical to ff_udp_set_remote_url, except that it takes a sockaddr directly.
Definition: udp.c:471
sockaddr_storage
Definition: network.h:111
UDPContext::is_multicast
int is_multicast
Definition: udp.c:94
freeaddrinfo
#define freeaddrinfo
Definition: network.h:218
fifo.h
ff_ip_reset_filters
void ff_ip_reset_filters(IPSourceFilters *filters)
Resets the IP filter list and frees the internal fields of an IPSourceFilters structure.
Definition: ip.c:155
ff_udp_protocol
const URLProtocol ff_udp_protocol
Definition: udp.c:1202
udp_leave_multicast_group
static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr, struct sockaddr *local_addr, void *logctx)
Definition: udp.c:243
fail
#define fail()
Definition: checkasm.h:196
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
UDPContext::filters
IPSourceFilters filters
Definition: udp.c:124
udp_set_url
static int udp_set_url(URLContext *h, struct sockaddr_storage *addr, const char *hostname, int port)
Definition: udp.c:346
mutex
static AVMutex mutex
Definition: resman.c:61
avassert.h
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:210
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
intreadwrite.h
s
#define s(width, name)
Definition: cbs_vp9.c:198
UDPContext::remaining_in_dg
int remaining_in_dg
Definition: udp.c:118
pthread_mutex_unlock
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:126
bitrate
int64_t bitrate
Definition: av1_levels.c:47
UDPContext::close_req
int close_req
Definition: udp.c:110
AV_OPT_TYPE_INT64
@ AV_OPT_TYPE_INT64
Underlying C type is int64_t.
Definition: opt.h:263
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:41
UDPContext::dest_addr_len
int dest_addr_len
Definition: udp.c:100
AVIO_FLAG_WRITE
#define AVIO_FLAG_WRITE
write-only
Definition: avio.h:618
AV_LOG_DEBUG
#define AV_LOG_DEBUG
Stuff which is only useful for libav* developers.
Definition: log.h:231
av_usleep
int av_usleep(unsigned usec)
Sleep for a period of time.
Definition: time.c:84
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
UDPContext::reuse_socket
int reuse_socket
Definition: udp.c:97
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:76
NULL
#define NULL
Definition: coverity.c:32
ff_ip_parse_blocks
int ff_ip_parse_blocks(void *log_ctx, const char *buf, IPSourceFilters *filters)
Parses the address[,address] source block list in buf and adds it to the filters in the IPSourceFilte...
Definition: ip.c:150
tmp
static uint8_t tmp[20]
Definition: aes_ctr.c:47
UDPContext::circular_buffer_size
int circular_buffer_size
Definition: udp.c:104
av_default_item_name
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:240
UDPContext::overrun_nonfatal
int overrun_nonfatal
Definition: udp.c:98
parseutils.h
ff_is_multicast_address
int ff_is_multicast_address(struct sockaddr *addr)
Definition: network.c:142
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
options
Definition: swscale.c:43
UDPContext::localaddr
char * localaddr
Definition: udp.c:119
time.h
ff_neterrno
#define ff_neterrno()
Definition: network.h:68
addrinfo::ai_addr
struct sockaddr * ai_addr
Definition: network.h:143
UDPContext::buffer_size
int buffer_size
Definition: udp.c:92
ff_ip_resolve_host
struct addrinfo * ff_ip_resolve_host(void *log_ctx, const char *hostname, int port, int type, int family, int flags)
Resolves hostname into an addrinfo structure.
Definition: ip.c:65
options
static const AVOption options[]
Definition: udp.c:132
addrinfo::ai_family
int ai_family
Definition: network.h:139
UDPContext::udp_fd
int udp_fd
Definition: udp.c:89
AVFifo
Definition: fifo.c:35
UDPContext::circular_buffer_error
int circular_buffer_error
Definition: udp.c:107
UDPContext
Definition: udp.c:87
size
int size
Definition: twinvq_data.h:10344
UDP_RX_BUF_SIZE
#define UDP_RX_BUF_SIZE
Definition: udp.c:77
URLProtocol::name
const char * name
Definition: url.h:52
UDPContext::burst_bits
int64_t burst_bits
Definition: udp.c:109
UDPContext::tx_fifo
AVFifo * tx_fifo
Definition: udp.c:106
ff_socket_nonblock
int ff_socket_nonblock(int socket, int enable)
header
static const uint8_t header[24]
Definition: sdr2.c:68
UDPContext::block
char * block
Definition: udp.c:123
ff_udp_get_last_recv_addr
void ff_udp_get_last_recv_addr(URLContext *h, struct sockaddr_storage *addr, socklen_t *addr_len)
Definition: udp.c:509
udplite_open
static int udplite_open(URLContext *h, const char *uri, int flags)
Definition: udp.c:1029
gai_strerror
#define gai_strerror
Definition: network.h:225
UDPContext::local_port
int local_port
Definition: udp.c:96
pthread_t
Definition: os2threads.h:44
UDPContext::last_recv_addr
struct sockaddr_storage last_recv_addr
Definition: udp.c:125
UDPContext::ttl
int ttl
Definition: udp.c:90
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
addrinfo::ai_next
struct addrinfo * ai_next
Definition: network.h:145
addrinfo::ai_addrlen
int ai_addrlen
Definition: network.h:142
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
udp_close
static int udp_close(URLContext *h)
Definition: udp.c:1156
UDP_MAX_PKT_SIZE
#define UDP_MAX_PKT_SIZE
Definition: udp.c:78
URLContext
Definition: url.h:35
log.h
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
getnameinfo
#define getnameinfo
Definition: network.h:219
ip.h
UDPContext::dest_addr
struct sockaddr_storage dest_addr
Definition: udp.c:99
av_url_split
void av_url_split(char *proto, int proto_size, char *authorization, int authorization_size, char *hostname, int hostname_size, int *port_ptr, char *path, int path_size, const char *url)
Split a URL string into components.
Definition: utils.c:354
udp_join_multicast_group
static int udp_join_multicast_group(int sockfd, struct sockaddr *addr, struct sockaddr *local_addr, void *logctx)
Definition: udp.c:209
url.h
len
int len
Definition: vorbis_enc_data.h:426
pthread_cond_t
Definition: os2threads.h:58
ff_udplite_protocol
const URLProtocol ff_udplite_protocol
Definition: udp.c:1214
pthread_setcancelstate
static int pthread_setcancelstate(int state, int *oldstate)
Definition: w32pthreads.h:207
udp_read
static int udp_read(URLContext *h, uint8_t *buf, int size)
Definition: udp.c:1039
udp_set_multicast_sources
static int udp_set_multicast_sources(URLContext *h, int sockfd, struct sockaddr *addr, int addr_len, struct sockaddr_storage *local_addr, struct sockaddr_storage *sources, int nb_sources, int include)
Definition: udp.c:277
ret
ret
Definition: filter_design.txt:187
UDP_TX_BUF_SIZE
#define UDP_TX_BUF_SIZE
Definition: udp.c:76
D
#define D
Definition: udp.c:130
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:81
avformat.h
UDPContext::timeout
int timeout
Definition: udp.c:120
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
network.h
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
AV_RL32
uint64_t_TMPL AV_WL64 unsigned int_TMPL AV_RL32
Definition: bytestream.h:92
UDPQueuedPacketHeader::addr
struct sockaddr_storage addr
Definition: udp.c:83
UDPQueuedPacketHeader::pkt_size
int pkt_size
Definition: udp.c:82
IPSourceFilters
Structure for storing IP (UDP) source filters or block lists.
Definition: ip.h:29
AV_OPT_TYPE_INT
@ AV_OPT_TYPE_INT
Underlying C type is int.
Definition: opt.h:259
IPV6_DROP_MEMBERSHIP
#define IPV6_DROP_MEMBERSHIP
Definition: udp.c:73
udp_write
static int udp_write(URLContext *h, const uint8_t *buf, int size)
Definition: udp.c:1106
ff_ip_check_source_lists
int ff_ip_check_source_lists(struct sockaddr_storage *source_addr_ptr, IPSourceFilters *s)
Checks the source address against a given IP source filter.
Definition: ip.c:46
UDPQueuedPacketHeader::addr_len
socklen_t addr_len
Definition: udp.c:84
udp_open
static int udp_open(URLContext *h, const char *uri, int flags)
Definition: udp.c:691
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
UDPContext::rx_fifo
AVFifo * rx_fifo
Definition: udp.c:105
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
AVIO_FLAG_READ
#define AVIO_FLAG_READ
read-only
Definition: avio.h:617
av_strdup
char * av_strdup(const char *s)
Duplicate a string.
Definition: mem.c:272
mem.h
udp_get_file_handle
static int udp_get_file_handle(URLContext *h)
Return the udp file handle for select() usage to wait for several RTP streams at the same time.
Definition: udp.c:521
AV_OPT_TYPE_BOOL
@ AV_OPT_TYPE_BOOL
Underlying C type is int.
Definition: opt.h:327
AVIO_FLAG_NONBLOCK
#define AVIO_FLAG_NONBLOCK
Use non-blocking mode.
Definition: avio.h:636
UDPContext::last_recv_addr_len
socklen_t last_recv_addr_len
Definition: udp.c:126
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
UDPContext::sources
char * sources
Definition: udp.c:122
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
block
The exact code depends on how similar the blocks are and how related they are to the block
Definition: filter_design.txt:207
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
UDPContext::udplite_coverage
int udplite_coverage
Definition: udp.c:91
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
h
h
Definition: vp9dsp_template.c:2070
ff_socket
int ff_socket(int af, int type, int proto, void *logctx)
Definition: network.c:180
AV_OPT_TYPE_STRING
@ AV_OPT_TYPE_STRING
Underlying C type is a uint8_t* that is either NULL or points to a C string allocated with the av_mal...
Definition: opt.h:276
addrinfo
Definition: network.h:137
IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP
Definition: udp.c:72
UDP_HEADER_SIZE
#define UDP_HEADER_SIZE
Definition: udp.c:79
cond
int(* cond)(enum AVPixelFormat pix_fmt)
Definition: pixdesc_query.c:28
udp_class
static const AVClass udp_class
Definition: udp.c:154
AI_PASSIVE
#define AI_PASSIVE
Definition: network.h:179
UDPLITE_SEND_CSCOV
#define UDPLITE_SEND_CSCOV
Definition: udp.c:54
ff_thread_setname
static int ff_thread_setname(const char *name)
Definition: thread.h:216
ff_network_wait_fd
int ff_network_wait_fd(int fd, int write)
Definition: network.c:66