Merge remote-tracking branch 'cigaes/master'

* cigaes/master:
  ffmpeg: use thread message API.
  lavu: add thread message API.
  compat/w32pthreads: add return value to pthread_cond_init().

Merged-by: Michael Niedermayer <michaelni@gmx.at>
This commit is contained in:
Michael Niedermayer 2014-05-26 16:35:47 +02:00
commit 2db89765f3
8 changed files with 316 additions and 81 deletions

View File

@ -134,28 +134,29 @@ typedef struct win32_cond_t {
volatile int is_broadcast; volatile int is_broadcast;
} win32_cond_t; } win32_cond_t;
static void pthread_cond_init(pthread_cond_t *cond, const void *unused_attr) static int pthread_cond_init(pthread_cond_t *cond, const void *unused_attr)
{ {
win32_cond_t *win32_cond = NULL; win32_cond_t *win32_cond = NULL;
if (cond_init) { if (cond_init) {
cond_init(cond); cond_init(cond);
return; return 0;
} }
/* non native condition variables */ /* non native condition variables */
win32_cond = av_mallocz(sizeof(win32_cond_t)); win32_cond = av_mallocz(sizeof(win32_cond_t));
if (!win32_cond) if (!win32_cond)
return; return ENOMEM;
cond->ptr = win32_cond; cond->ptr = win32_cond;
win32_cond->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL); win32_cond->semaphore = CreateSemaphore(NULL, 0, 0x7fffffff, NULL);
if (!win32_cond->semaphore) if (!win32_cond->semaphore)
return; return ENOMEM;
win32_cond->waiters_done = CreateEvent(NULL, TRUE, FALSE, NULL); win32_cond->waiters_done = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!win32_cond->waiters_done) if (!win32_cond->waiters_done)
return; return ENOMEM;
pthread_mutex_init(&win32_cond->mtx_waiter_count, NULL); pthread_mutex_init(&win32_cond->mtx_waiter_count, NULL);
pthread_mutex_init(&win32_cond->mtx_broadcast, NULL); pthread_mutex_init(&win32_cond->mtx_broadcast, NULL);
return 0;
} }
static void pthread_cond_destroy(pthread_cond_t *cond) static void pthread_cond_destroy(pthread_cond_t *cond)

View File

@ -15,6 +15,9 @@ libavutil: 2012-10-22
API changes, most recent first: API changes, most recent first:
2014-05-26 - xxxxxxx - lavu 52.87.100 - threadmessage.h
Add thread message queue API.
2014-05-26 - c37d179 - lavf 55.41.100 - avformat.h 2014-05-26 - c37d179 - lavf 55.41.100 - avformat.h
Add format_probesize to AVFormatContext. Add format_probesize to AVFormatContext.

View File

@ -59,6 +59,7 @@
#include "libavutil/timestamp.h" #include "libavutil/timestamp.h"
#include "libavutil/bprint.h" #include "libavutil/bprint.h"
#include "libavutil/time.h" #include "libavutil/time.h"
#include "libavutil/threadmessage.h"
#include "libavformat/os_support.h" #include "libavformat/os_support.h"
#include "libavformat/ffm.h" // not public API #include "libavformat/ffm.h" // not public API
@ -132,11 +133,6 @@ AVIOContext *progress_avio = NULL;
static uint8_t *subtitle_out; static uint8_t *subtitle_out;
#if HAVE_PTHREADS
/* signal to input threads that they should exit; set by the main thread */
static int transcoding_finished;
#endif
#define DEFAULT_PASS_LOGFILENAME_PREFIX "ffmpeg2pass" #define DEFAULT_PASS_LOGFILENAME_PREFIX "ffmpeg2pass"
InputStream **input_streams = NULL; InputStream **input_streams = NULL;
@ -3105,32 +3101,31 @@ static void *input_thread(void *arg)
InputFile *f = arg; InputFile *f = arg;
int ret = 0; int ret = 0;
while (!transcoding_finished && ret >= 0) { while (1) {
AVPacket pkt; AVPacket pkt;
ret = av_read_frame(f->ctx, &pkt); ret = av_read_frame(f->ctx, &pkt);
if (ret == AVERROR(EAGAIN)) { if (ret == AVERROR(EAGAIN)) {
av_usleep(10000); av_usleep(10000);
ret = 0;
continue; continue;
} else if (ret < 0) }
if (ret < 0) {
av_thread_message_queue_set_err_recv(f->in_thread_queue, ret);
break; break;
}
pthread_mutex_lock(&f->fifo_lock);
while (!av_fifo_space(f->fifo))
pthread_cond_wait(&f->fifo_cond, &f->fifo_lock);
av_dup_packet(&pkt); av_dup_packet(&pkt);
av_fifo_generic_write(f->fifo, &pkt, sizeof(pkt), NULL); ret = av_thread_message_queue_send(f->in_thread_queue, &pkt, 0);
pthread_cond_signal(&f->fifo_cond); if (ret < 0) {
if (ret != AVERROR_EOF)
pthread_mutex_unlock(&f->fifo_lock); av_log(f->ctx, AV_LOG_ERROR,
"Unable to send packet to main thread: %s\n",
av_err2str(ret));
av_free_packet(&pkt);
av_thread_message_queue_set_err_recv(f->in_thread_queue, ret);
break;
}
} }
pthread_mutex_lock(&f->fifo_lock);
f->finished = 1;
pthread_cond_signal(&f->fifo_cond);
pthread_mutex_unlock(&f->fifo_lock);
return NULL; return NULL;
} }
@ -3138,34 +3133,19 @@ static void free_input_threads(void)
{ {
int i; int i;
if (nb_input_files == 1)
return;
transcoding_finished = 1;
for (i = 0; i < nb_input_files; i++) { for (i = 0; i < nb_input_files; i++) {
InputFile *f = input_files[i]; InputFile *f = input_files[i];
AVPacket pkt; AVPacket pkt;
if (!f->fifo || f->joined) if (!f->in_thread_queue)
continue; continue;
av_thread_message_queue_set_err_send(f->in_thread_queue, AVERROR_EOF);
pthread_mutex_lock(&f->fifo_lock); while (av_thread_message_queue_recv(f->in_thread_queue, &pkt, 0) >= 0)
while (av_fifo_size(f->fifo)) {
av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL);
av_free_packet(&pkt); av_free_packet(&pkt);
}
pthread_cond_signal(&f->fifo_cond);
pthread_mutex_unlock(&f->fifo_lock);
pthread_join(f->thread, NULL); pthread_join(f->thread, NULL);
f->joined = 1; f->joined = 1;
av_thread_message_queue_free(&f->in_thread_queue);
while (av_fifo_size(f->fifo)) {
av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL);
av_free_packet(&pkt);
}
av_fifo_freep(&f->fifo);
} }
} }
@ -3179,15 +3159,13 @@ static int init_input_threads(void)
for (i = 0; i < nb_input_files; i++) { for (i = 0; i < nb_input_files; i++) {
InputFile *f = input_files[i]; InputFile *f = input_files[i];
if (!(f->fifo = av_fifo_alloc_array(8, sizeof(AVPacket))))
return AVERROR(ENOMEM);
if (f->ctx->pb ? !f->ctx->pb->seekable : if (f->ctx->pb ? !f->ctx->pb->seekable :
strcmp(f->ctx->iformat->name, "lavfi")) strcmp(f->ctx->iformat->name, "lavfi"))
f->non_blocking = 1; f->non_blocking = 1;
ret = av_thread_message_queue_alloc(&f->in_thread_queue,
pthread_mutex_init(&f->fifo_lock, NULL); 8, sizeof(AVPacket));
pthread_cond_init (&f->fifo_cond, NULL); if (ret < 0)
return ret;
if ((ret = pthread_create(&f->thread, NULL, input_thread, f))) if ((ret = pthread_create(&f->thread, NULL, input_thread, f)))
return AVERROR(ret); return AVERROR(ret);
@ -3197,31 +3175,9 @@ static int init_input_threads(void)
static int get_input_packet_mt(InputFile *f, AVPacket *pkt) static int get_input_packet_mt(InputFile *f, AVPacket *pkt)
{ {
int ret = 0; return av_thread_message_queue_recv(f->in_thread_queue, pkt,
f->non_blocking ?
pthread_mutex_lock(&f->fifo_lock); AV_THREAD_MESSAGE_NONBLOCK : 0);
while (1) {
if (av_fifo_size(f->fifo)) {
av_fifo_generic_read(f->fifo, pkt, sizeof(*pkt), NULL);
pthread_cond_signal(&f->fifo_cond);
break;
} else {
if (f->finished) {
ret = AVERROR_EOF;
break;
}
if (f->non_blocking) {
ret = AVERROR(EAGAIN);
break;
}
pthread_cond_wait(&f->fifo_cond, &f->fifo_lock);
}
}
pthread_mutex_unlock(&f->fifo_lock);
return ret;
} }
#endif #endif

View File

@ -44,6 +44,7 @@
#include "libavutil/fifo.h" #include "libavutil/fifo.h"
#include "libavutil/pixfmt.h" #include "libavutil/pixfmt.h"
#include "libavutil/rational.h" #include "libavutil/rational.h"
#include "libavutil/threadmessage.h"
#include "libswresample/swresample.h" #include "libswresample/swresample.h"
@ -336,13 +337,10 @@ typedef struct InputFile {
int accurate_seek; int accurate_seek;
#if HAVE_PTHREADS #if HAVE_PTHREADS
AVThreadMessageQueue *in_thread_queue;
pthread_t thread; /* thread reading from this file */ pthread_t thread; /* thread reading from this file */
int non_blocking; /* reading packets from the thread should not block */ int non_blocking; /* reading packets from the thread should not block */
int finished; /* the thread has exited */
int joined; /* the thread has been joined */ int joined; /* the thread has been joined */
pthread_mutex_t fifo_lock; /* lock for access to fifo */
pthread_cond_t fifo_cond; /* the main thread will signal on this cond after reading from fifo */
AVFifoBuffer *fifo; /* demuxed packets are stored here; freed by the main thread */
#endif #endif
} InputFile; } InputFile;

View File

@ -52,6 +52,7 @@ HEADERS = adler32.h \
sha.h \ sha.h \
sha512.h \ sha512.h \
stereo3d.h \ stereo3d.h \
threadmessage.h \
time.h \ time.h \
timecode.h \ timecode.h \
timestamp.h \ timestamp.h \
@ -119,6 +120,7 @@ OBJS = adler32.o \
sha.o \ sha.o \
sha512.o \ sha512.o \
stereo3d.o \ stereo3d.o \
threadmessage.o \
time.o \ time.o \
timecode.o \ timecode.o \
tree.o \ tree.o \

184
libavutil/threadmessage.c Normal file
View File

@ -0,0 +1,184 @@
/*
* Copyright (c) 2014 Nicolas George
*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "fifo.h"
#include "threadmessage.h"
#if HAVE_THREADS
#if HAVE_PTHREADS
#include <pthread.h>
#elif HAVE_W32THREADS
#include "compat/w32pthreads.h"
#elif HAVE_OS2THREADS
#include "compat/os2threads.h"
#else
#error "Unknown threads implementation"
#endif
#endif
struct AVThreadMessageQueue {
#if HAVE_THREADS
AVFifoBuffer *fifo;
pthread_mutex_t lock;
pthread_cond_t cond;
int err_send;
int err_recv;
unsigned elsize;
#else
int dummy;
#endif
};
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
unsigned nelem,
unsigned elsize)
{
#if HAVE_THREADS
AVThreadMessageQueue *rmq;
int ret = 0;
if (nelem > INT_MAX / elsize)
return AVERROR(EINVAL);
if (!(rmq = av_mallocz(sizeof(*rmq))))
return AVERROR(ENOMEM);
if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
av_free(rmq);
return AVERROR(ret);
}
if ((ret = pthread_cond_init(&rmq->cond, NULL))) {
pthread_mutex_destroy(&rmq->lock);
av_free(rmq);
return AVERROR(ret);
}
if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) {
pthread_cond_destroy(&rmq->cond);
pthread_mutex_destroy(&rmq->lock);
av_free(rmq);
return AVERROR(ret);
}
rmq->elsize = elsize;
*mq = rmq;
return 0;
#else
*mq = NULL;
return AVERROR(ENOSYS);
#endif /* HAVE_THREADS */
}
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
{
#if HAVE_THREADS
if (*mq) {
av_fifo_freep(&(*mq)->fifo);
pthread_cond_destroy(&(*mq)->cond);
pthread_mutex_destroy(&(*mq)->lock);
av_freep(mq);
}
#endif
}
#if HAVE_THREADS
static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
void *msg,
unsigned flags)
{
while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) {
if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
return AVERROR(EAGAIN);
pthread_cond_wait(&mq->cond, &mq->lock);
}
if (mq->err_send)
return mq->err_send;
av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL);
pthread_cond_signal(&mq->cond);
return 0;
}
static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
void *msg,
unsigned flags)
{
while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) {
if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
return AVERROR(EAGAIN);
pthread_cond_wait(&mq->cond, &mq->lock);
}
if (av_fifo_size(mq->fifo) < mq->elsize)
return mq->err_recv;
av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL);
pthread_cond_signal(&mq->cond);
return 0;
}
#endif /* HAVE_THREADS */
int av_thread_message_queue_send(AVThreadMessageQueue *mq,
void *msg,
unsigned flags)
{
#if HAVE_THREADS
int ret;
pthread_mutex_lock(&mq->lock);
ret = av_thread_message_queue_send_locked(mq, msg, flags);
pthread_mutex_unlock(&mq->lock);
return ret;
#else
return AVERROR(ENOSYS);
#endif /* HAVE_THREADS */
}
int av_thread_message_queue_recv(AVThreadMessageQueue *mq,
void *msg,
unsigned flags)
{
#if HAVE_THREADS
int ret;
pthread_mutex_lock(&mq->lock);
ret = av_thread_message_queue_recv_locked(mq, msg, flags);
pthread_mutex_unlock(&mq->lock);
return ret;
#else
return AVERROR(ENOSYS);
#endif /* HAVE_THREADS */
}
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq,
int err)
{
#if HAVE_THREADS
pthread_mutex_lock(&mq->lock);
mq->err_send = err;
pthread_cond_broadcast(&mq->cond);
pthread_mutex_unlock(&mq->lock);
#endif /* HAVE_THREADS */
}
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
int err)
{
#if HAVE_THREADS
pthread_mutex_lock(&mq->lock);
mq->err_recv = err;
pthread_cond_broadcast(&mq->cond);
pthread_mutex_unlock(&mq->lock);
#endif /* HAVE_THREADS */
}

91
libavutil/threadmessage.h Normal file
View File

@ -0,0 +1,91 @@
/*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef AVUTIL_THREADMESSAGE_H
#define AVUTIL_THREADMESSAGE_H
typedef struct AVThreadMessageQueue AVThreadMessageQueue;
typedef enum AVThreadMessageFlags {
/**
* Perform non-blocking operation.
* If this flag is set, send and recv operations are non-blocking and
* return AVERROR(EAGAIN) immediately if they can not proceed.
*/
AV_THREAD_MESSAGE_NONBLOCK = 1,
} AVThreadMessageFlags;
/**
* Allocate a new message queue.
*
* @param mq pointer to the message queue
* @param nelem maximum number of elements in the queue
* @param elsize size of each element in the queue
* @return >=0 for success; <0 for error, in particular AVERROR(ENOSYS) if
* lavu was built without thread support
*/
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
unsigned nelem,
unsigned elsize);
/**
* Free a message queue.
*
* The message queue must no longer be in use by another thread.
*/
void av_thread_message_queue_free(AVThreadMessageQueue **mq);
/**
* Send a message on the queue.
*/
int av_thread_message_queue_send(AVThreadMessageQueue *mq,
void *msg,
unsigned flags);
/**
* Receive a message from the queue.
*/
int av_thread_message_queue_recv(AVThreadMessageQueue *mq,
void *msg,
unsigned flags);
/**
* Set the sending error code.
*
* If the error code is set to non-zero, av_thread_message_queue_recv() will
* return it immediately when there are no longer available messages.
* Conventional values, such as AVERROR_EOF or AVERROR(EAGAIN), can be used
* to cause the receiving thread to stop or suspend its operation.
*/
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq,
int err);
/**
* Set the receiving error code.
*
* If the error code is set to non-zero, av_thread_message_queue_send() will
* return it immediately. Conventional values, such as AVERROR_EOF or
* AVERROR(EAGAIN), can be used to cause the sending thread to stop or
* suspend its operation.
*/
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
int err);
#endif /* AVUTIL_THREADMESSAGE_H */

View File

@ -56,7 +56,7 @@
*/ */
#define LIBAVUTIL_VERSION_MAJOR 52 #define LIBAVUTIL_VERSION_MAJOR 52
#define LIBAVUTIL_VERSION_MINOR 86 #define LIBAVUTIL_VERSION_MINOR 87
#define LIBAVUTIL_VERSION_MICRO 100 #define LIBAVUTIL_VERSION_MICRO 100
#define LIBAVUTIL_VERSION_INT AV_VERSION_INT(LIBAVUTIL_VERSION_MAJOR, \ #define LIBAVUTIL_VERSION_INT AV_VERSION_INT(LIBAVUTIL_VERSION_MAJOR, \