#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
+#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define READ_BUF_SIZE 4096
#define SLOW_NSEC 4000
-// *** GLOBALS *****************************************************************
-
-// All queues are locked through one global mutex.
-static pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-// When a writer runs out of work to do, it sleeps on this global cond.
-static pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
-
// Asserted on receipt of SIGTERM, SIGINT.
-static volatile int stopping = 0;
-
-// *** (end globals) ***********************************************************
+static volatile sig_atomic_t stopping = 0; // Global. sig_atomic_t: it is written from a signal handler.
-static void signal_handler(int _ignored_ __attribute__((__unused__))) {
+static void sig_stopping(int _ignored_ __attribute__((__unused__))) { // Handler installed for SIGINT and SIGTERM; it only raises the stop flag.
stopping = 1;
}
// Each writer has its own queue.
struct buf_queue queue;
+ // Pointer to a shared mutex which protects all queues.
+ pthread_mutex_t* mutex;
+
+ // When a writer runs out of work to do, it sleeps on a shared condition.
+ pthread_cond_t* cond;
+
+ // The writer thread exits when it's not running and its queue is empty.
+ volatile int running;
+
STAILQ_ENTRY(writer_thread) entries;
};
// A list of writer threads.
STAILQ_HEAD(writer_thread_list, writer_thread); // struct writer_thread_list
+static struct writer_thread* logger = NULL; // Global: state of the logging thread; allocated by init_logger_thread().
+static struct timespec logger_start; // Global: monotonic time taken in init_logger_thread(); log timestamps are relative to it.
+
static struct buf* alloc_buf(const char* const data, const int len) {
assert(len > 0);
struct buf* buf = malloc(sizeof(*buf));
return (int)syscall(SYS_gettid);
}
+// Formats a printf-style message, prefixed with the program name, source
+// line, function name, thread id and time since logger start, and queues it
+// on the logger thread's queue.  If want_errno is non-zero, a description of
+// errno (as it was on entry) is appended.  Call it through the nblog() and
+// nblogx() macros below.  The format attribute lets the compiler check the
+// arguments against the format string.
+static void _nblog_helper(int want_errno,
+                          int line,
+                          const char* func,
+                          const char* format,
+                          ...) __attribute__((__format__(__printf__, 4, 5))); // Forward.
+
+// nblog() appends the current errno to the message; nblogx() does not.
+// Standard C99 variadic macros: the format string is the first argument.
+#define nblog(...) _nblog_helper(1, __LINE__, __func__, __VA_ARGS__)
+#define nblogx(...) _nblog_helper(0, __LINE__, __func__, __VA_ARGS__)
+
static void warn_time(const char* desc,
const struct timespec* restrict start,
const struct timespec* restrict end) {
+ // Logs a message when the interval start..end exceeds SLOW_NSEC.
+ // The logger thread itself is skipped (presumably so that slow writes of
+ // log messages do not generate further log messages).  pthread_t is an
+ // opaque type, so it is compared with pthread_equal() rather than ==.
+ if (pthread_equal(pthread_self(), logger->thread)) {
+ return;
+ }
struct timespec diff;
time_diff(start, end, &diff);
if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
- char buf[128];
- extern char *__progname; // This is where glibc stashes argv[0].
- snprintf(buf, sizeof(buf), "%s:tid %d: %s took %d.%09d secs\n",
- __progname, gettid(), desc, (int)diff.tv_sec, (int)diff.tv_nsec);
- // Best effort write to a non-blocking stderr.
- (void)write(STDERR_FILENO, buf, strlen(buf));
+ nblogx("%s took %d.%09d secs", desc, (int)diff.tv_sec, (int)diff.tv_nsec);
}
}
return (int)read_ret;
}
+// Blocks in select() until fd is reported writable.
+//
+// Returns early, with no guarantee about fd, if select() is interrupted by a
+// signal (EINTR).  That is not only SIGINT/SIGTERM: a handler is installed
+// for SIGCONT as well, and select() is not restarted after a handler runs,
+// so EINTR must not be taken as proof that `stopping` is set.  The caller
+// retries its write and comes back here if that still yields EAGAIN.
+// Any other select() failure terminates the process via err().
+static void wait_until_writable(const int fd) {
+  fd_set write_fds;
+  FD_ZERO(&write_fds);
+  FD_SET(fd, &write_fds);
+  int select_ret = select(fd + 1, NULL, &write_fds, NULL, NULL);
+  if (select_ret == -1) {
+    if (errno == EINTR) {
+      return;  // Interrupted by a handled signal; let the caller retry.
+    }
+    err(1, "select(write fd = %d) failed", fd);
+  }
+  if (!FD_ISSET(fd, &write_fds))
+    errx(1, "select() did not return writable fd = %d", fd);
+}
+
+// Writes all of buf to the (non-blocking) descriptor fd.
+//
+// Returns buf->len once every byte has been written, or 0 if write()
+// returns 0.  EAGAIN/EWOULDBLOCK is handled by waiting until fd is writable
+// and retrying; a short write is handled by continuing with the remaining
+// bytes.  Any other write() error terminates the process via err().
static int xwrite(const int fd, struct buf* const buf) {
ssize_t write_ret;
int saved_errno;
struct timespec t0;
struct timespec t1;
- get_mono_time(&t0);
- write_ret = write(fd, buf->data, (size_t)buf->len);
- saved_errno = errno;
- get_mono_time(&t1);
- warn_time("write()", &t0, &t1);
-
- errno = saved_errno;
- if (write_ret == -1)
- err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
- //FIXME: EAGAIN?
- if (write_ret == 0)
- return 0;
- assert(write_ret >= 0);
- if (write_ret < buf->len)
- err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
- fd, buf->len, (int)write_ret);
- // FIXME: handle this
- assert(write_ret == buf->len);
- return (int)write_ret;
+  int written = 0;  // Bytes of buf that fd has accepted so far.
+  while (written < buf->len) {
+    const int remaining = buf->len - written;
+    get_mono_time(&t0);
+    write_ret = write(fd, (const char*)buf->data + written, (size_t)remaining);
+    saved_errno = errno;  // The timing/logging calls below may clobber errno.
+    get_mono_time(&t1);
+    warn_time("write()", &t0, &t1);
+
+    errno = saved_errno;
+    if (write_ret == -1) {
+      // POSIX allows either value for "would block" on a non-blocking fd.
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        nblogx("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
+        wait_until_writable(fd);
+        continue;
+      }
+      err(1, "write(fd = %d, count = %d) failed", fd, remaining);
+    }
+    if (write_ret == 0)
+      return 0; // EOF
+    assert(write_ret > 0 && write_ret <= remaining);
+    // A short write is normal on a non-blocking fd: keep going with the rest.
+    written += (int)write_ret;
+  }
+  return written;
}
static void wait_until_readable(const int fd) {
assert(stopping); // that should have been SIGTERM
return;
}
- err(1, "select() failed");
+ err(1, "select(read fd = %d) failed", fd);
}
if (!FD_ISSET(fd, &read_fds))
errx(1, "select() did not return readable fd = %d", fd);
static void* writer_routine(void *arg) {
struct writer_thread* my = arg;
struct buf* buf = NULL;
- lock(&shared_queue_mutex);
+ // Thread body used for both the output writers and the logger: writes buffers from my->queue to my->fd until running is cleared and the queue is empty.
+ nblogx("writer thread for fd %d starting", my->fd);
+ lock(my->mutex); // Held at the top of every loop iteration; released around the write.
for (;;) {
- while (!stopping && STAILQ_EMPTY(&my->queue)) {
+ while (my->running && STAILQ_EMPTY(&my->queue)) { // Idle: nothing queued and not asked to exit.
// Sleep.
- pthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex);
+ pthread_cond_wait(my->cond, my->mutex);
}
- if (!STAILQ_EMPTY(&my->queue))
+ if (!STAILQ_EMPTY(&my->queue)) {
buf = dequeue(&my->queue);
- unlock(&shared_queue_mutex);
+ }
+ unlock(my->mutex);
- if (stopping) break;
- assert(buf != NULL);
+ if (buf == NULL) { // Queue was empty, so the wait loop ended because running was cleared.
+ assert(!my->running);
+ break;
+ }
// Write.
int write_ret = xwrite(my->fd, buf);
assert(write_ret == buf->len);
// Unreference buffer, freeing it if we have to.
- lock(&shared_queue_mutex);
+ lock(my->mutex);
unref_buf(buf);
+ buf = NULL; // Reset so the next iteration can tell an empty queue from a dequeued buffer.
}
- warnx("thread exiting cleanly");
+ nblogx("thread exiting cleanly");
return NULL;
}
-static void add_writer_thread(struct writer_thread_list* list, const int fd) {
- set_nonblocking(fd);
+// Creates a writer for fd: makes fd non-blocking, appends the new writer to
+// list and starts its thread running writer_routine().  The writer uses the
+// caller-supplied mutex and condition variable, which the caller owns.
+// Exits via err() if the writer cannot be allocated.
+static void add_writer_thread(struct writer_thread_list* list,
+ int fd,
+ pthread_mutex_t* shared_queue_mutex,
+ pthread_cond_t* shared_wakeup_cond) {
struct writer_thread* writer = malloc(sizeof(*writer));
+ if (writer == NULL) err(1, "malloc(%zu) failed", sizeof(*writer)); // Fail loudly instead of dereferencing NULL below.
+ set_nonblocking(fd);
writer->fd = fd;
- STAILQ_INIT(&(writer->queue));
+ writer->mutex = shared_queue_mutex;
+ writer->cond = shared_wakeup_cond;
+ writer->running = 1;
+ STAILQ_INIT(&writer->queue);
STAILQ_INSERT_TAIL(list, writer, entries);
- xpthread_create(&(writer->thread), writer_routine, writer);
+ xpthread_create(&writer->thread, writer_routine, writer); // Started last, once every field the thread reads is set.
}
static void xpthread_cond_broadcast(pthread_cond_t* cond) {
err(1, "pthread_cond_broadcast(%p) failed", cond);
}
+// Sets up the global logger: records the start time used for log
+// timestamps, allocates the logger state together with its own mutex and
+// condition variable, and starts a thread running writer_routine() on
+// STDERR_FILENO.  Must be called exactly once, before any nblog()/nblogx().
+// Exits via err() if an allocation or a pthread initialisation fails.
+static void init_logger_thread(void) {
+  assert(logger == NULL);
+  get_mono_time(&logger_start);
+
+  logger = malloc(sizeof(*logger));
+  if (logger == NULL) err(1, "malloc(%zu) failed", sizeof(*logger));
+  logger->fd = STDERR_FILENO;
+
+  logger->mutex = malloc(sizeof(*logger->mutex));
+  if (logger->mutex == NULL)
+    err(1, "malloc(%zu) failed", sizeof(*logger->mutex));
+  // pthread_*_init() return an error number and do not set errno.
+  int ret = pthread_mutex_init(logger->mutex, NULL);
+  if (ret != 0) {
+    errno = ret;
+    err(1, "pthread_mutex_init() failed");
+  }
+
+  logger->cond = malloc(sizeof(*logger->cond));
+  if (logger->cond == NULL)
+    err(1, "malloc(%zu) failed", sizeof(*logger->cond));
+  ret = pthread_cond_init(logger->cond, NULL);
+  if (ret != 0) {
+    errno = ret;
+    err(1, "pthread_cond_init() failed");
+  }
+
+  logger->running = 1;
+  STAILQ_INIT(&logger->queue);
+  // Started last: the new thread logs through the global `logger` at once.
+  xpthread_create(&logger->thread, writer_routine, logger);
+}
+
+// Returns the new length of a buffer of capacity `room` that held `len`
+// bytes (len < room) before an snprintf()-family call wrote at offset len
+// with size room - len and returned n.  An error (n < 0) adds nothing; a
+// truncated write adds only the bytes that actually fit.
+static size_t nblog_advance(size_t len, size_t room, int n) {
+  if (n <= 0) return len;
+  const size_t avail = room - len;  // Includes the byte used for the NUL.
+  return len + ((size_t)n < avail ? (size_t)n : avail - 1);
+}
+
+// Builds one log line and queues it on the logger thread's queue.
+//
+//   want_errno  non-zero to append ": <strerror> (errno = N)" using the
+//               value errno had on entry.
+//   line, func  source location of the caller (supplied by the macros).
+//   format,...  printf-style message.
+//
+// The line is assembled in a fixed 512-byte buffer; anything that does not
+// fit is truncated, but the trailing newline is always kept.
+static void _nblog_helper(int want_errno,
+                          int line,
+                          const char* func,
+                          const char* format,
+                          ...) {
+  // Capture errno first, before any call below can clobber it.
+  const int saved_errno = errno;
+
+  // Timing.
+  struct timespec now;
+  struct timespec diff;
+  get_mono_time(&now);
+  time_diff(&logger_start, &now, &diff);
+
+  char buf[512];
+  // One byte is held back so the newline always fits after truncation.
+  const size_t room = sizeof(buf) - 1;
+  size_t len = 0;
+
+  // Prefix.
+  extern char *__progname; // This is where glibc stashes argv[0].
+  len = nblog_advance(len, room,
+                      snprintf(buf, room, "%s:%d:%s(): tid %d at %d.%09d: ",
+                               __progname, line, func, gettid(),
+                               (int)diff.tv_sec, (int)diff.tv_nsec));
+
+  // Format message.
+  va_list va;
+  va_start(va, format);
+  len = nblog_advance(len, room,
+                      vsnprintf(buf + len, room - len, format, va));
+  va_end(va);
+
+  if (want_errno) {
+    len = nblog_advance(len, room,
+                        snprintf(buf + len, room - len, ": %s (errno = %d)",
+                                 strerror(saved_errno), saved_errno));
+  }
+
+  // len <= room - 1 here, so the newline is within buf.
+  buf[len++] = '\n';
+  struct buf* b = alloc_buf(buf, (int)len);
+
+  // Enqueue.
+  lock(logger->mutex);
+  enqueue(&logger->queue, b);
+  xpthread_cond_broadcast(logger->cond);
+  unlock(logger->mutex);
+}
+
static void xpthread_join(pthread_t thread) {
int ret = pthread_join(thread, NULL);
if (ret == 0) return;
+ errno = ret; // pthread_join() returns the error number and does not set errno, which is what err() reports.
err(1, "pthread_join(%lu) failed", thread);
}
+static void sig_continue(int _ignored_ __attribute__((__unused__))) { // Handler installed for SIGCONT; main() also calls it directly once at startup.
+ set_nonblocking(STDERR_FILENO); // Re-applied on every SIGCONT; presumably the flag can be lost while the process is stopped (TODO confirm).
+}
+
+// Copies stdin to stdout and to every file named on the command line, one
+// writer thread per output, with log messages handled by a separate logger
+// thread.  Runs until stdin hits EOF or SIGINT/SIGTERM sets `stopping`.
int main(int argc, char **argv) {
+ init_logger_thread(); // Must come first: nblogx() dereferences the global logger.
+ nblogx("starting");
+
+ pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+ pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
+
struct writer_thread_list writers;
STAILQ_INIT(&writers);
- if (signal(SIGINT, signal_handler) == SIG_ERR) err(1, "signal() failed");
- if (signal(SIGTERM, signal_handler) == SIG_ERR) err(1, "signal() failed");
+ if (signal(SIGINT, sig_stopping) == SIG_ERR) err(1, "signal() failed");
+ if (signal(SIGTERM, sig_stopping) == SIG_ERR) err(1, "signal() failed");
+ if (signal(SIGCONT, sig_continue) == SIG_ERR) err(1, "signal() failed");
//if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
- set_nonblocking(STDERR_FILENO);
+ sig_continue(0);
// On Linux, making STDOUT non-blocking has the side-effect of
// also making STDIN nonblocking.
- add_writer_thread(&writers, STDOUT_FILENO);
+ add_writer_thread(&writers,
+ STDOUT_FILENO,
+ &shared_queue_mutex,
+ &shared_wakeup_cond);
// Process cmdline args.
for (int i = 1; i < argc; i++) {
- add_writer_thread(&writers, make_file(argv[i]));
+ add_writer_thread(&writers,
+ make_file(argv[i]),
+ &shared_queue_mutex,
+ &shared_wakeup_cond);
}
// Reader loop.
while (!stopping) {
wait_until_readable(STDIN_FILENO);
if (stopping) {
- warnx("stopping after select()");
+ nblogx("stopping after select()");
break;
}
char data[READ_BUF_SIZE];
int read_ret = xread(STDIN_FILENO, data, sizeof(data));
if (read_ret == 0) {
- warnx("stdin hit EOF");
+ nblogx("stdin hit EOF");
break;
}
struct buf* buf = alloc_buf(data, read_ret);
// Enqueue.
lock(&shared_queue_mutex);
struct writer_thread* writer;
- STAILQ_FOREACH(writer, &writers, entries) enqueue(&(writer->queue), buf);
+ STAILQ_FOREACH(writer, &writers, entries) {
+ enqueue(&(writer->queue), buf);
+ }
xpthread_cond_broadcast(&shared_wakeup_cond);
unlock(&shared_queue_mutex);
}
- // Wake and join threads.
- stopping = 1;
+ nblogx("stopping: draining writer threads");
lock(&shared_queue_mutex);
+ {
+ struct writer_thread* writer;
+ STAILQ_FOREACH(writer, &writers, entries) {
+ writer->running = 0; // Each writer exits once its queue is empty.
+ }
+ }
xpthread_cond_broadcast(&shared_wakeup_cond);
unlock(&shared_queue_mutex);
{
struct writer_thread* writer;
- STAILQ_FOREACH(writer, &writers, entries) xpthread_join(writer->thread);
+ STAILQ_FOREACH(writer, &writers, entries) {
+ xpthread_join(writer->thread);
+ }
}
// Free writer list.
free(writer);
}
+ // Clean up logger thread.
+ nblogx("writer threads stopped, stopping logger");
+ lock(logger->mutex);
+ logger->running = 0;
+ xpthread_cond_broadcast(logger->cond);
+ unlock(logger->mutex);
+ xpthread_join(logger->thread);
+ // The logger's mutex and condition variable were heap-allocated by
+ // init_logger_thread(); release them too.  Nothing else uses them once
+ // the logger thread has been joined.
+ pthread_mutex_destroy(logger->mutex);
+ pthread_cond_destroy(logger->cond);
+ free(logger->mutex);
+ free(logger->cond);
+ free(logger);
+ logger = NULL; // No dangling pointer; nblog()/nblogx() must not be used past this point.
+
warnx("exiting cleanly");
return 0;
}