Move shared mutex and cond from global to per-writer.
authorEmil Mikulic <emikulic@gmail.com>
Sun, 24 Mar 2013 08:26:55 +0000 (19:26 +1100)
committerEmil Mikulic <emikulic@gmail.com>
Sun, 24 Mar 2013 15:49:17 +0000 (02:49 +1100)
buftee.c

index 8e430b6..1f22a6a 100644 (file)
--- a/buftee.c
+++ b/buftee.c
 #define READ_BUF_SIZE 4096
 #define SLOW_NSEC 4000
 
 #define READ_BUF_SIZE 4096
 #define SLOW_NSEC 4000
 
-// *** GLOBALS *****************************************************************
-
-// All queues are locked through one global mutex.
-static pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-// When a writer runs out of work to do, it sleeps on this global cond.
-static pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
-
 // Asserted on receipt of SIGTERM, SIGINT.
 static volatile int stopping = 0;
 
 // Asserted on receipt of SIGTERM, SIGINT.
 static volatile int stopping = 0;
 
-// *** (end globals) ***********************************************************
-
 static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
   stopping = 1;
 }
 static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
   stopping = 1;
 }
@@ -90,6 +80,12 @@ struct writer_thread {
   // Each writer has its own queue.
   struct buf_queue queue;
 
   // Each writer has its own queue.
   struct buf_queue queue;
 
+  // Pointer to a shared mutex which protects all queues.
+  pthread_mutex_t* mutex;
+
+  // When a writer runs out of work to do, it sleeps on a shared condition.
+  pthread_cond_t* cond;
+
   STAILQ_ENTRY(writer_thread) entries;
 };
 
   STAILQ_ENTRY(writer_thread) entries;
 };
 
@@ -298,15 +294,15 @@ static void unlock(pthread_mutex_t* mutex) {
 static void* writer_routine(void *arg) {
   struct writer_thread* my = arg;
   struct buf* buf = NULL;
 static void* writer_routine(void *arg) {
   struct writer_thread* my = arg;
   struct buf* buf = NULL;
-  lock(&shared_queue_mutex);
+  lock(my->mutex);
   for (;;) {
     while (!stopping && STAILQ_EMPTY(&my->queue)) {
       // Sleep.
   for (;;) {
     while (!stopping && STAILQ_EMPTY(&my->queue)) {
       // Sleep.
-      pthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex);
+      pthread_cond_wait(my->cond, my->mutex);
     }
     if (!STAILQ_EMPTY(&my->queue))
       buf = dequeue(&my->queue);
     }
     if (!STAILQ_EMPTY(&my->queue))
       buf = dequeue(&my->queue);
-    unlock(&shared_queue_mutex);
+    unlock(my->mutex);
 
     if (stopping) break;
     assert(buf != NULL);
 
     if (stopping) break;
     assert(buf != NULL);
@@ -319,20 +315,25 @@ static void* writer_routine(void *arg) {
     assert(write_ret == buf->len);
 
     // Unreference buffer, freeing it if we have to.
     assert(write_ret == buf->len);
 
     // Unreference buffer, freeing it if we have to.
-    lock(&shared_queue_mutex);
+    lock(my->mutex);
     unref_buf(buf);
   }
   warnx("thread exiting cleanly");
   return NULL;
 }
 
     unref_buf(buf);
   }
   warnx("thread exiting cleanly");
   return NULL;
 }
 
-static void add_writer_thread(struct writer_thread_list* list, const int fd) {
-  set_nonblocking(fd);
+static void add_writer_thread(struct writer_thread_list* list,
+                              int fd,
+                              pthread_mutex_t* shared_queue_mutex,
+                              pthread_cond_t* shared_wakeup_cond) {
   struct writer_thread* writer = malloc(sizeof(*writer));
   struct writer_thread* writer = malloc(sizeof(*writer));
+  set_nonblocking(fd);
   writer->fd = fd;
   writer->fd = fd;
-  STAILQ_INIT(&(writer->queue));
+  writer->mutex = shared_queue_mutex;
+  writer->cond = shared_wakeup_cond;
+  STAILQ_INIT(&writer->queue);
   STAILQ_INSERT_TAIL(list, writer, entries);
   STAILQ_INSERT_TAIL(list, writer, entries);
-  xpthread_create(&(writer->thread), writer_routine, writer);
+  xpthread_create(&writer->thread, writer_routine, writer);
 }
 
 static void xpthread_cond_broadcast(pthread_cond_t* cond) {
 }
 
 static void xpthread_cond_broadcast(pthread_cond_t* cond) {
@@ -354,6 +355,9 @@ static void sig_continue(int _ignored_ __attribute__((__unused__))) {
 }
 
 int main(int argc, char **argv) {
 }
 
 int main(int argc, char **argv) {
+  pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+  pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
+
   struct writer_thread_list writers;
   STAILQ_INIT(&writers);
 
   struct writer_thread_list writers;
   STAILQ_INIT(&writers);
 
@@ -365,11 +369,17 @@ int main(int argc, char **argv) {
 
   // On Linux, making STDOUT non-blocking has the side-effect of
   // also making STDIN nonblocking.
 
   // On Linux, making STDOUT non-blocking has the side-effect of
   // also making STDIN nonblocking.
-  add_writer_thread(&writers, STDOUT_FILENO);
+  add_writer_thread(&writers,
+                    STDOUT_FILENO,
+                    &shared_queue_mutex,
+                    &shared_wakeup_cond);
 
   // Process cmdline args.
   for (int i = 1; i < argc; i++) {
 
   // Process cmdline args.
   for (int i = 1; i < argc; i++) {
-    add_writer_thread(&writers, make_file(argv[i]));
+    add_writer_thread(&writers,
+                      make_file(argv[i]),
+                      &shared_queue_mutex,
+                      &shared_wakeup_cond);
   }
 
   // Reader loop.
   }
 
   // Reader loop.
@@ -392,19 +402,23 @@ int main(int argc, char **argv) {
     // Enqueue.
     lock(&shared_queue_mutex);
     struct writer_thread* writer;
     // Enqueue.
     lock(&shared_queue_mutex);
     struct writer_thread* writer;
-    STAILQ_FOREACH(writer, &writers, entries) enqueue(&(writer->queue), buf);
+    STAILQ_FOREACH(writer, &writers, entries) {
+      enqueue(&(writer->queue), buf);
+    }
     xpthread_cond_broadcast(&shared_wakeup_cond);
     unlock(&shared_queue_mutex);
   }
 
   // Wake and join threads.
     xpthread_cond_broadcast(&shared_wakeup_cond);
     unlock(&shared_queue_mutex);
   }
 
   // Wake and join threads.
-  stopping = 1;
   lock(&shared_queue_mutex);
   lock(&shared_queue_mutex);
+  stopping = 1;
   xpthread_cond_broadcast(&shared_wakeup_cond);
   unlock(&shared_queue_mutex);
   {
     struct writer_thread* writer;
   xpthread_cond_broadcast(&shared_wakeup_cond);
   unlock(&shared_queue_mutex);
   {
     struct writer_thread* writer;
-    STAILQ_FOREACH(writer, &writers, entries) xpthread_join(writer->thread);
+    STAILQ_FOREACH(writer, &writers, entries) {
+      xpthread_join(writer->thread);
+    }
   }
 
   // Free writer list.
   }
 
   // Free writer list.