diff --git a/compat/w32pthreads.h b/compat/w32pthreads.h index 7b51c25843..cfb8f6471f 100644 --- a/compat/w32pthreads.h +++ b/compat/w32pthreads.h @@ -134,28 +134,29 @@ typedef struct win32_cond_t { volatile int is_broadcast; } win32_cond_t; -static void pthread_cond_init(pthread_cond_t *cond, const void *unused_attr) +static int pthread_cond_init(pthread_cond_t *cond, const void *unused_attr) { win32_cond_t *win32_cond = NULL; if (cond_init) { cond_init(cond); - return; + return 0; } /* non native condition variables */ win32_cond = av_mallocz(sizeof(win32_cond_t)); if (!win32_cond) - return; + return ENOMEM; cond->ptr = win32_cond; win32_cond->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); if (!win32_cond->semaphore) - return; + return ENOMEM; win32_cond->waiters_done = CreateEvent(NULL, TRUE, FALSE, NULL); if (!win32_cond->waiters_done) - return; + return ENOMEM; pthread_mutex_init(&win32_cond->mtx_waiter_count, NULL); pthread_mutex_init(&win32_cond->mtx_broadcast, NULL); + return 0; } static void pthread_cond_destroy(pthread_cond_t *cond) diff --git a/doc/APIchanges b/doc/APIchanges index 4a47291917..395083c091 100644 --- a/doc/APIchanges +++ b/doc/APIchanges @@ -15,6 +15,9 @@ libavutil: 2012-10-22 API changes, most recent first: +2014-05-26 - xxxxxxx - lavu 52.87.100 - threadmessage.h + Add thread message queue API. + 2014-05-26 - c37d179 - lavf 55.41.100 - avformat.h Add format_probesize to AVFormatContext. diff --git a/ffmpeg.c b/ffmpeg.c index 5299f0ef0f..a0e2be294f 100644 --- a/ffmpeg.c +++ b/ffmpeg.c @@ -59,6 +59,7 @@ #include "libavutil/timestamp.h" #include "libavutil/bprint.h" #include "libavutil/time.h" +#include "libavutil/threadmessage.h" #include "libavformat/os_support.h" #include "libavformat/ffm.h" // not public API @@ -132,11 +133,6 @@ AVIOContext *progress_avio = NULL; static uint8_t *subtitle_out; -#if HAVE_PTHREADS -/* signal to input threads that they should exit; set by the main thread */ -static int transcoding_finished; -#endif - #define DEFAULT_PASS_LOGFILENAME_PREFIX "ffmpeg2pass" InputStream **input_streams = NULL; @@ -3105,32 +3101,31 @@ static void *input_thread(void *arg) InputFile *f = arg; int ret = 0; - while (!transcoding_finished && ret >= 0) { + while (1) { AVPacket pkt; ret = av_read_frame(f->ctx, &pkt); if (ret == AVERROR(EAGAIN)) { av_usleep(10000); - ret = 0; continue; - } else if (ret < 0) + } + if (ret < 0) { + av_thread_message_queue_set_err_recv(f->in_thread_queue, ret); break; - - pthread_mutex_lock(&f->fifo_lock); - while (!av_fifo_space(f->fifo)) - pthread_cond_wait(&f->fifo_cond, &f->fifo_lock); - + } av_dup_packet(&pkt); - av_fifo_generic_write(f->fifo, &pkt, sizeof(pkt), NULL); - pthread_cond_signal(&f->fifo_cond); - - pthread_mutex_unlock(&f->fifo_lock); + ret = av_thread_message_queue_send(f->in_thread_queue, &pkt, 0); + if (ret < 0) { + if (ret != AVERROR_EOF) + av_log(f->ctx, AV_LOG_ERROR, + "Unable to send packet to main thread: %s\n", + av_err2str(ret)); + av_free_packet(&pkt); + av_thread_message_queue_set_err_recv(f->in_thread_queue, ret); + break; + } } - pthread_mutex_lock(&f->fifo_lock); - f->finished = 1; - pthread_cond_signal(&f->fifo_cond); - pthread_mutex_unlock(&f->fifo_lock); return NULL; } @@ -3138,34 +3133,19 @@ static void free_input_threads(void) { int i; - if (nb_input_files == 1) - return; - - transcoding_finished = 1; - for (i = 0; i < nb_input_files; i++) { InputFile *f = input_files[i]; AVPacket pkt; - if (!f->fifo || f->joined) + if (!f->in_thread_queue) continue; - - pthread_mutex_lock(&f->fifo_lock); - while (av_fifo_size(f->fifo)) { - av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); + av_thread_message_queue_set_err_send(f->in_thread_queue, AVERROR_EOF); + while (av_thread_message_queue_recv(f->in_thread_queue, &pkt, 0) >= 0) av_free_packet(&pkt); - } - pthread_cond_signal(&f->fifo_cond); - pthread_mutex_unlock(&f->fifo_lock); pthread_join(f->thread, NULL); f->joined = 1; - - while (av_fifo_size(f->fifo)) { - av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); - av_free_packet(&pkt); - } - av_fifo_freep(&f->fifo); + av_thread_message_queue_free(&f->in_thread_queue); } } @@ -3179,15 +3159,13 @@ static int init_input_threads(void) for (i = 0; i < nb_input_files; i++) { InputFile *f = input_files[i]; - if (!(f->fifo = av_fifo_alloc_array(8, sizeof(AVPacket)))) - return AVERROR(ENOMEM); - if (f->ctx->pb ? !f->ctx->pb->seekable : strcmp(f->ctx->iformat->name, "lavfi")) f->non_blocking = 1; - - pthread_mutex_init(&f->fifo_lock, NULL); - pthread_cond_init (&f->fifo_cond, NULL); + ret = av_thread_message_queue_alloc(&f->in_thread_queue, + 8, sizeof(AVPacket)); + if (ret < 0) + return ret; if ((ret = pthread_create(&f->thread, NULL, input_thread, f))) return AVERROR(ret); @@ -3197,31 +3175,9 @@ static int init_input_threads(void) static int get_input_packet_mt(InputFile *f, AVPacket *pkt) { - int ret = 0; - - pthread_mutex_lock(&f->fifo_lock); - - while (1) { - if (av_fifo_size(f->fifo)) { - av_fifo_generic_read(f->fifo, pkt, sizeof(*pkt), NULL); - pthread_cond_signal(&f->fifo_cond); - break; - } else { - if (f->finished) { - ret = AVERROR_EOF; - break; - } - if (f->non_blocking) { - ret = AVERROR(EAGAIN); - break; - } - pthread_cond_wait(&f->fifo_cond, &f->fifo_lock); - } - } - - pthread_mutex_unlock(&f->fifo_lock); - - return ret; + return av_thread_message_queue_recv(f->in_thread_queue, pkt, + f->non_blocking ? + AV_THREAD_MESSAGE_NONBLOCK : 0); } #endif diff --git a/ffmpeg.h b/ffmpeg.h index 41d666340d..3783e2cc50 100644 --- a/ffmpeg.h +++ b/ffmpeg.h @@ -44,6 +44,7 @@ #include "libavutil/fifo.h" #include "libavutil/pixfmt.h" #include "libavutil/rational.h" +#include "libavutil/threadmessage.h" #include "libswresample/swresample.h" @@ -336,13 +337,10 @@ typedef struct InputFile { int accurate_seek; #if HAVE_PTHREADS + AVThreadMessageQueue *in_thread_queue; pthread_t thread; /* thread reading from this file */ int non_blocking; /* reading packets from the thread should not block */ - int finished; /* the thread has exited */ int joined; /* the thread has been joined */ - pthread_mutex_t fifo_lock; /* lock for access to fifo */ - pthread_cond_t fifo_cond; /* the main thread will signal on this cond after reading from fifo */ - AVFifoBuffer *fifo; /* demuxed packets are stored here; freed by the main thread */ #endif } InputFile; diff --git a/libavutil/Makefile b/libavutil/Makefile index 8832715909..5cc978b021 100644 --- a/libavutil/Makefile +++ b/libavutil/Makefile @@ -52,6 +52,7 @@ HEADERS = adler32.h \ sha.h \ sha512.h \ stereo3d.h \ + threadmessage.h \ time.h \ timecode.h \ timestamp.h \ @@ -119,6 +120,7 @@ OBJS = adler32.o \ sha.o \ sha512.o \ stereo3d.o \ + threadmessage.o \ time.o \ timecode.o \ tree.o \ diff --git a/libavutil/threadmessage.c b/libavutil/threadmessage.c new file mode 100644 index 0000000000..b7fcbe28c0 --- /dev/null +++ b/libavutil/threadmessage.c @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2014 Nicolas George + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with FFmpeg; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include "fifo.h" +#include "threadmessage.h" +#if HAVE_THREADS +#if HAVE_PTHREADS +#include +#elif HAVE_W32THREADS +#include "compat/w32pthreads.h" +#elif HAVE_OS2THREADS +#include "compat/os2threads.h" +#else +#error "Unknown threads implementation" +#endif +#endif + +struct AVThreadMessageQueue { +#if HAVE_THREADS + AVFifoBuffer *fifo; + pthread_mutex_t lock; + pthread_cond_t cond; + int err_send; + int err_recv; + unsigned elsize; +#else + int dummy; +#endif +}; + +int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, + unsigned nelem, + unsigned elsize) +{ +#if HAVE_THREADS + AVThreadMessageQueue *rmq; + int ret = 0; + + if (nelem > INT_MAX / elsize) + return AVERROR(EINVAL); + if (!(rmq = av_mallocz(sizeof(*rmq)))) + return AVERROR(ENOMEM); + if ((ret = pthread_mutex_init(&rmq->lock, NULL))) { + av_free(rmq); + return AVERROR(ret); + } + if ((ret = pthread_cond_init(&rmq->cond, NULL))) { + pthread_mutex_destroy(&rmq->lock); + av_free(rmq); + return AVERROR(ret); + } + if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) { + pthread_cond_destroy(&rmq->cond); + pthread_mutex_destroy(&rmq->lock); + av_free(rmq); + return AVERROR(ret); + } + rmq->elsize = elsize; + *mq = rmq; + return 0; +#else + *mq = NULL; + return AVERROR(ENOSYS); +#endif /* HAVE_THREADS */ +} + +void av_thread_message_queue_free(AVThreadMessageQueue **mq) +{ +#if HAVE_THREADS + if (*mq) { + av_fifo_freep(&(*mq)->fifo); + pthread_cond_destroy(&(*mq)->cond); + pthread_mutex_destroy(&(*mq)->lock); + av_freep(mq); + } +#endif +} + +#if HAVE_THREADS + +static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq, + void *msg, + unsigned flags) +{ + while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) { + if ((flags & AV_THREAD_MESSAGE_NONBLOCK)) + return AVERROR(EAGAIN); + pthread_cond_wait(&mq->cond, &mq->lock); + } + if (mq->err_send) + return mq->err_send; + av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL); + pthread_cond_signal(&mq->cond); + return 0; +} + +static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq, + void *msg, + unsigned flags) +{ + while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) { + if ((flags & AV_THREAD_MESSAGE_NONBLOCK)) + return AVERROR(EAGAIN); + pthread_cond_wait(&mq->cond, &mq->lock); + } + if (av_fifo_size(mq->fifo) < mq->elsize) + return mq->err_recv; + av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL); + pthread_cond_signal(&mq->cond); + return 0; +} + +#endif /* HAVE_THREADS */ + +int av_thread_message_queue_send(AVThreadMessageQueue *mq, + void *msg, + unsigned flags) +{ +#if HAVE_THREADS + int ret; + + pthread_mutex_lock(&mq->lock); + ret = av_thread_message_queue_send_locked(mq, msg, flags); + pthread_mutex_unlock(&mq->lock); + return ret; +#else + return AVERROR(ENOSYS); +#endif /* HAVE_THREADS */ +} + +int av_thread_message_queue_recv(AVThreadMessageQueue *mq, + void *msg, + unsigned flags) +{ +#if HAVE_THREADS + int ret; + + pthread_mutex_lock(&mq->lock); + ret = av_thread_message_queue_recv_locked(mq, msg, flags); + pthread_mutex_unlock(&mq->lock); + return ret; +#else + return AVERROR(ENOSYS); +#endif /* HAVE_THREADS */ +} + +void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, + int err) +{ +#if HAVE_THREADS + pthread_mutex_lock(&mq->lock); + mq->err_send = err; + pthread_cond_broadcast(&mq->cond); + pthread_mutex_unlock(&mq->lock); +#endif /* HAVE_THREADS */ +} + +void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, + int err) +{ +#if HAVE_THREADS + pthread_mutex_lock(&mq->lock); + mq->err_recv = err; + pthread_cond_broadcast(&mq->cond); + pthread_mutex_unlock(&mq->lock); +#endif /* HAVE_THREADS */ +} diff --git a/libavutil/threadmessage.h b/libavutil/threadmessage.h new file mode 100644 index 0000000000..a8481d8ec3 --- /dev/null +++ b/libavutil/threadmessage.h @@ -0,0 +1,91 @@ +/* + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with FFmpeg; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef AVUTIL_THREADMESSAGE_H +#define AVUTIL_THREADMESSAGE_H + +typedef struct AVThreadMessageQueue AVThreadMessageQueue; + +typedef enum AVThreadMessageFlags { + + /** + * Perform non-blocking operation. + * If this flag is set, send and recv operations are non-blocking and + * return AVERROR(EAGAIN) immediately if they can not proceed. + */ + AV_THREAD_MESSAGE_NONBLOCK = 1, + +} AVThreadMessageFlags; + +/** + * Allocate a new message queue. + * + * @param mq pointer to the message queue + * @param nelem maximum number of elements in the queue + * @param elsize size of each element in the queue + * @return >=0 for success; <0 for error, in particular AVERROR(ENOSYS) if + * lavu was built without thread support + */ +int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, + unsigned nelem, + unsigned elsize); + +/** + * Free a message queue. + * + * The message queue must no longer be in use by another thread. + */ +void av_thread_message_queue_free(AVThreadMessageQueue **mq); + +/** + * Send a message on the queue. + */ +int av_thread_message_queue_send(AVThreadMessageQueue *mq, + void *msg, + unsigned flags); + +/** + * Receive a message from the queue. + */ +int av_thread_message_queue_recv(AVThreadMessageQueue *mq, + void *msg, + unsigned flags); + +/** + * Set the sending error code. + * + * If the error code is set to non-zero, av_thread_message_queue_recv() will + * return it immediately when there are no longer available messages. + * Conventional values, such as AVERROR_EOF or AVERROR(EAGAIN), can be used + * to cause the receiving thread to stop or suspend its operation. + */ +void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, + int err); + +/** + * Set the receiving error code. + * + * If the error code is set to non-zero, av_thread_message_queue_send() will + * return it immediately. Conventional values, such as AVERROR_EOF or + * AVERROR(EAGAIN), can be used to cause the sending thread to stop or + * suspend its operation. + */ +void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, + int err); + +#endif /* AVUTIL_THREADMESSAGE_H */ diff --git a/libavutil/version.h b/libavutil/version.h index b134615a5b..4dad89a2fb 100644 --- a/libavutil/version.h +++ b/libavutil/version.h @@ -56,7 +56,7 @@ */ #define LIBAVUTIL_VERSION_MAJOR 52 -#define LIBAVUTIL_VERSION_MINOR 86 +#define LIBAVUTIL_VERSION_MINOR 87 #define LIBAVUTIL_VERSION_MICRO 100 #define LIBAVUTIL_VERSION_INT AV_VERSION_INT(LIBAVUTIL_VERSION_MAJOR, \