From: Emil Mikulic Date: Sun, 24 Mar 2013 08:26:55 +0000 (+1100) Subject: Move shared mutex and cond from global to per-writer. X-Git-Url: https://unix4lyfe.org/gitweb/buftee/commitdiff_plain/c97833ec6583e70cdeb67257667302683adc8ee3 Move shared mutex and cond from global to per-writer. --- diff --git a/buftee.c b/buftee.c index 8e430b6..1f22a6a 100644 --- a/buftee.c +++ b/buftee.c @@ -51,19 +51,9 @@ #define READ_BUF_SIZE 4096 #define SLOW_NSEC 4000 -// *** GLOBALS ***************************************************************** - -// All queues are locked through one global mutex. -static pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER; - -// When a writer runs out of work to do, it sleeps on this global cond. -static pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER; - // Asserted on receipt of SIGTERM, SIGINT. static volatile int stopping = 0; -// *** (end globals) *********************************************************** - static void sig_stopping(int _ignored_ __attribute__((__unused__))) { stopping = 1; } @@ -90,6 +80,12 @@ struct writer_thread { // Each writer has its own queue. struct buf_queue queue; + // Pointer to a shared mutex which protects all queues. + pthread_mutex_t* mutex; + + // When a writer runs out of work to do, it sleeps on a shared condition. + pthread_cond_t* cond; + STAILQ_ENTRY(writer_thread) entries; }; @@ -298,15 +294,15 @@ static void unlock(pthread_mutex_t* mutex) { static void* writer_routine(void *arg) { struct writer_thread* my = arg; struct buf* buf = NULL; - lock(&shared_queue_mutex); + lock(my->mutex); for (;;) { while (!stopping && STAILQ_EMPTY(&my->queue)) { // Sleep. - pthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex); + pthread_cond_wait(my->cond, my->mutex); } if (!STAILQ_EMPTY(&my->queue)) buf = dequeue(&my->queue); - unlock(&shared_queue_mutex); + unlock(my->mutex); if (stopping) break; assert(buf != NULL); @@ -319,20 +315,25 @@ static void* writer_routine(void *arg) { assert(write_ret == buf->len); // Unreference buffer, freeing it if we have to. - lock(&shared_queue_mutex); + lock(my->mutex); unref_buf(buf); } warnx("thread exiting cleanly"); return NULL; } -static void add_writer_thread(struct writer_thread_list* list, const int fd) { - set_nonblocking(fd); +static void add_writer_thread(struct writer_thread_list* list, + int fd, + pthread_mutex_t* shared_queue_mutex, + pthread_cond_t* shared_wakeup_cond) { struct writer_thread* writer = malloc(sizeof(*writer)); + set_nonblocking(fd); writer->fd = fd; - STAILQ_INIT(&(writer->queue)); + writer->mutex = shared_queue_mutex; + writer->cond = shared_wakeup_cond; + STAILQ_INIT(&writer->queue); STAILQ_INSERT_TAIL(list, writer, entries); - xpthread_create(&(writer->thread), writer_routine, writer); + xpthread_create(&writer->thread, writer_routine, writer); } static void xpthread_cond_broadcast(pthread_cond_t* cond) { @@ -354,6 +355,9 @@ static void sig_continue(int _ignored_ __attribute__((__unused__))) { } int main(int argc, char **argv) { + pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER; + struct writer_thread_list writers; STAILQ_INIT(&writers); @@ -365,11 +369,17 @@ int main(int argc, char **argv) { // On Linux, making STDOUT non-blocking has the side-effect of // also making STDIN nonblocking. - add_writer_thread(&writers, STDOUT_FILENO); + add_writer_thread(&writers, + STDOUT_FILENO, + &shared_queue_mutex, + &shared_wakeup_cond); // Process cmdline args. for (int i = 1; i < argc; i++) { - add_writer_thread(&writers, make_file(argv[i])); + add_writer_thread(&writers, + make_file(argv[i]), + &shared_queue_mutex, + &shared_wakeup_cond); } // Reader loop. @@ -392,19 +402,23 @@ int main(int argc, char **argv) { // Enqueue. lock(&shared_queue_mutex); struct writer_thread* writer; - STAILQ_FOREACH(writer, &writers, entries) enqueue(&(writer->queue), buf); + STAILQ_FOREACH(writer, &writers, entries) { + enqueue(&(writer->queue), buf); + } xpthread_cond_broadcast(&shared_wakeup_cond); unlock(&shared_queue_mutex); } // Wake and join threads. - stopping = 1; lock(&shared_queue_mutex); + stopping = 1; xpthread_cond_broadcast(&shared_wakeup_cond); unlock(&shared_queue_mutex); { struct writer_thread* writer; - STAILQ_FOREACH(writer, &writers, entries) xpthread_join(writer->thread); + STAILQ_FOREACH(writer, &writers, entries) { + xpthread_join(writer->thread); + } } // Free writer list.