// When a writer runs out of work to do, it sleeps on a shared condition.
pthread_cond_t* cond;
+ // Shutdown flag: once `running` is cleared, the writer thread drains its queue and exits when the queue is empty.
+ volatile int running;
+
STAILQ_ENTRY(writer_thread) entries;
};
const char* format,
...); // Forward.
-#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt)
+//#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt)
#define nblogx(fmt...) _nblog_helper(0, __LINE__, __FUNCTION__, fmt)
static void warn_time(const char* desc,
static void* writer_routine(void *arg) {
struct writer_thread* my = arg;
struct buf* buf = NULL;
+
+ nblogx("writer thread for fd %d starting", my->fd);
lock(my->mutex);
for (;;) {
- while (!stopping && STAILQ_EMPTY(&my->queue)) {
+ while (my->running && STAILQ_EMPTY(&my->queue)) {
// Sleep.
pthread_cond_wait(my->cond, my->mutex);
}
- if (!STAILQ_EMPTY(&my->queue))
+ if (!STAILQ_EMPTY(&my->queue)) {
buf = dequeue(&my->queue);
+ }
unlock(my->mutex);
- if (stopping) break;
- assert(buf != NULL);
+ if (buf == NULL) {
+ assert(!my->running);
+ break;
+ }
// Write.
int write_ret = xwrite(my->fd, buf);
// Unreference buffer, freeing it if we have to.
lock(my->mutex);
unref_buf(buf);
+ buf = NULL;
}
nblogx("thread exiting cleanly");
return NULL;
writer->fd = fd;
writer->mutex = shared_queue_mutex;
writer->cond = shared_wakeup_cond;
+ writer->running = 1;
STAILQ_INIT(&writer->queue);
STAILQ_INSERT_TAIL(list, writer, entries);
xpthread_create(&writer->thread, writer_routine, writer);
pthread_mutex_init(logger->mutex, NULL);
logger->cond = malloc(sizeof(*logger->cond));
pthread_cond_init(logger->cond, NULL);
+ logger->running = 1;
STAILQ_INIT(&logger->queue);
xpthread_create(&logger->thread, writer_routine, logger);
}
const char* func,
const char* format,
...) {
- int saved_errno;
- if (want_errno)
- saved_errno = errno;
-
+ int saved_errno = errno;
va_list va;
va_start(va, format);
// Prefix.
char buf[512];
- size_t len;
+ int len;
extern char *__progname; // This is where glibc stashes argv[0].
len = snprintf(buf, sizeof(buf), "%s:%d:%s(): tid %d at %d.%09d: ",
__progname, line, func, gettid(),
unlock(&shared_queue_mutex);
}
- // Wake and join threads.
+ nblogx("stopping: draining writer threads");
lock(&shared_queue_mutex);
- stopping = 1;
+ {
+ struct writer_thread* writer;
+ STAILQ_FOREACH(writer, &writers, entries) {
+ writer->running = 0;
+ }
+ }
xpthread_cond_broadcast(&shared_wakeup_cond);
unlock(&shared_queue_mutex);
{
}
// Clean up logger thread.
- nblogx("writer threads stopped");
+ nblogx("writer threads stopped, stopping logger");
lock(logger->mutex);
+ logger->running = 0;
xpthread_cond_broadcast(logger->cond);
unlock(logger->mutex);
xpthread_join(logger->thread);