// When a writer runs out of work to do, it sleeps on a shared condition.
pthread_cond_t* cond;
+ // The writer thread exits when it's not running and its queue is empty.
+ volatile int running;
+
STAILQ_ENTRY(writer_thread) entries;
};
return (int)syscall(SYS_gettid);
}
-static void nblogx(const char* format, ...); // Forward.
+static void _nblog_helper(int want_errno,
+ int line,
+ const char* func,
+ const char* format,
+ ...); // Forward.
+
+#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt)
+#define nblogx(fmt...) _nblog_helper(0, __LINE__, __FUNCTION__, fmt)
static void warn_time(const char* desc,
const struct timespec* restrict start,
static void* writer_routine(void *arg) {
struct writer_thread* my = arg;
struct buf* buf = NULL;
+
+ nblogx("writer thread for fd %d starting", my->fd);
lock(my->mutex);
for (;;) {
- while (!stopping && STAILQ_EMPTY(&my->queue)) {
+ while (my->running && STAILQ_EMPTY(&my->queue)) {
// Sleep.
pthread_cond_wait(my->cond, my->mutex);
}
- if (!STAILQ_EMPTY(&my->queue))
+ if (!STAILQ_EMPTY(&my->queue)) {
buf = dequeue(&my->queue);
+ }
unlock(my->mutex);
- if (stopping) break;
- assert(buf != NULL);
+ if (buf == NULL) {
+ assert(!my->running);
+ break;
+ }
// Write.
int write_ret = xwrite(my->fd, buf);
// Unreference buffer, freeing it if we have to.
lock(my->mutex);
unref_buf(buf);
+ buf = NULL;
}
nblogx("thread exiting cleanly");
return NULL;
writer->fd = fd;
writer->mutex = shared_queue_mutex;
writer->cond = shared_wakeup_cond;
+ writer->running = 1;
STAILQ_INIT(&writer->queue);
STAILQ_INSERT_TAIL(list, writer, entries);
xpthread_create(&writer->thread, writer_routine, writer);
pthread_mutex_init(logger->mutex, NULL);
logger->cond = malloc(sizeof(*logger->cond));
pthread_cond_init(logger->cond, NULL);
+ logger->running = 1;
STAILQ_INIT(&logger->queue);
xpthread_create(&logger->thread, writer_routine, logger);
}
static void _nblog_helper(int want_errno,
- int saved_errno,
+ int line,
+ const char* func,
const char* format,
- va_list va) {
+ ...) {
+ int saved_errno;
+ if (want_errno)
+ saved_errno = errno;
+
+ va_list va;
+ va_start(va, format);
+
// Timing.
struct timespec now;
struct timespec diff;
char buf[512];
size_t len;
extern char *__progname; // This is where glibc stashes argv[0].
- len = snprintf(buf, sizeof(buf), "%s: tid %d at %d.%09d: ",
- __progname, gettid(), (int)diff.tv_sec, (int)diff.tv_nsec);
+ len = snprintf(buf, sizeof(buf), "%s:%d:%s(): tid %d at %d.%09d: ",
+ __progname, line, func, gettid(),
+ (int)diff.tv_sec, (int)diff.tv_nsec);
// Format message.
len += vsnprintf(buf + len, sizeof(buf) - len, format, va);
unlock(logger->mutex);
}
-// nblog() is like warn() but non-blocking.
-static void nblog(const char* format, ...) {
- int saved_errno = errno;
- va_list va;
- va_start(va, format);
- _nblog_helper(1, saved_errno, format, va);
- va_end(va);
-}
-
-static void nblogx(const char* format, ...) {
- va_list va;
- va_start(va, format);
- _nblog_helper(0, 0, format, va);
- va_end(va);
-}
-
static void xpthread_join(pthread_t thread) {
int ret = pthread_join(thread, NULL);
if (ret == 0) return;
unlock(&shared_queue_mutex);
}
- // Wake and join threads.
+ nblogx("stopping: draining writer threads");
lock(&shared_queue_mutex);
- stopping = 1;
+ {
+ struct writer_thread* writer;
+ STAILQ_FOREACH(writer, &writers, entries) {
+ writer->running = 0;
+ }
+ }
xpthread_cond_broadcast(&shared_wakeup_cond);
unlock(&shared_queue_mutex);
{
}
// Clean up logger thread.
- nblogx("writer threads stopped");
+ nblogx("writer threads stopped, stopping logger");
lock(logger->mutex);
+ logger->running = 0;
xpthread_cond_broadcast(logger->cond);
unlock(logger->mutex);
xpthread_join(logger->thread);