From 455e9225d8e7cdc70d521ca7aa37e76bfe082f2f Mon Sep 17 00:00:00 2001 From: Emil Mikulic Date: Mon, 25 Mar 2013 04:32:28 +1100 Subject: [PATCH] Drain writers after SIGTERM. --- buftee.c | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/buftee.c b/buftee.c index b69b318..7ab731b 100644 --- a/buftee.c +++ b/buftee.c @@ -87,6 +87,9 @@ struct writer_thread { // When a writer runs out of work to do, it sleeps on a shared condition. pthread_cond_t* cond; + // The writer thread exits when it's not running and its queue is empty. + volatile int running; + STAILQ_ENTRY(writer_thread) entries; }; @@ -305,18 +308,23 @@ static void unlock(pthread_mutex_t* mutex) { static void* writer_routine(void *arg) { struct writer_thread* my = arg; struct buf* buf = NULL; + + nblogx("writer thread for fd %d starting", my->fd); lock(my->mutex); for (;;) { - while (!stopping && STAILQ_EMPTY(&my->queue)) { + while (my->running && STAILQ_EMPTY(&my->queue)) { // Sleep. pthread_cond_wait(my->cond, my->mutex); } - if (!STAILQ_EMPTY(&my->queue)) + if (!STAILQ_EMPTY(&my->queue)) { buf = dequeue(&my->queue); + } unlock(my->mutex); - if (stopping) break; - assert(buf != NULL); + if (buf == NULL) { + assert(!my->running); + break; + } // Write. int write_ret = xwrite(my->fd, buf); @@ -328,6 +336,7 @@ static void* writer_routine(void *arg) { // Unreference buffer, freeing it if we have to. lock(my->mutex); unref_buf(buf); + buf = NULL; } nblogx("thread exiting cleanly"); return NULL; @@ -342,6 +351,7 @@ static void add_writer_thread(struct writer_thread_list* list, writer->fd = fd; writer->mutex = shared_queue_mutex; writer->cond = shared_wakeup_cond; + writer->running = 1; STAILQ_INIT(&writer->queue); STAILQ_INSERT_TAIL(list, writer, entries); xpthread_create(&writer->thread, writer_routine, writer); @@ -363,6 +373,7 @@ static void init_logger_thread(void) { pthread_mutex_init(logger->mutex, NULL); logger->cond = malloc(sizeof(*logger->cond)); pthread_cond_init(logger->cond, NULL); + logger->running = 1; STAILQ_INIT(&logger->queue); xpthread_create(&logger->thread, writer_routine, logger); } @@ -480,9 +491,14 @@ int main(int argc, char **argv) { unlock(&shared_queue_mutex); } - // Wake and join threads. + nblogx("stopping: draining writer threads"); lock(&shared_queue_mutex); - stopping = 1; + { + struct writer_thread* writer; + STAILQ_FOREACH(writer, &writers, entries) { + writer->running = 0; + } + } xpthread_cond_broadcast(&shared_wakeup_cond); unlock(&shared_queue_mutex); { @@ -501,8 +517,9 @@ int main(int argc, char **argv) { } // Clean up logger thread. - nblogx("writer threads stopped"); + nblogx("writer threads stopped, stopping logger"); lock(logger->mutex); + logger->running = 0; xpthread_cond_broadcast(logger->cond); unlock(logger->mutex); xpthread_join(logger->thread); -- 2.17.1