const char* format,
...); // Forward.
-#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt)
+//#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt)
#define nblogx(fmt...) _nblog_helper(0, __LINE__, __FUNCTION__, fmt)
static void warn_time(const char* desc,
errx(1, "select() did not return writable fd = %d", fd);
}
-static int xwrite(const int fd, struct buf* const buf) {
- ssize_t write_ret;
- int saved_errno;
- struct timespec t0;
- struct timespec t1;
+// Writes the whole buffer to fd, retrying on short writes. Returns zero on
+// EOF and one once everything has been written; dies on error.
+static int write_buf(const int fd, struct buf* const buf) {
+ char* data = buf->data;
+ size_t left = (size_t)buf->len;
for (;;) {
+ struct timespec t0;
+ struct timespec t1;
get_mono_time(&t0);
- write_ret = write(fd, buf->data, (size_t)buf->len);
- saved_errno = errno;
+ ssize_t write_ret = write(fd, data, left);
+ int saved_errno = errno;
get_mono_time(&t1);
warn_time("write()", &t0, &t1);
wait_until_writable(fd);
continue;
}
- err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
+ err(1, "write(fd = %d, count = %d) failed", fd, (int)left);
}
- if (write_ret == 0)
+ if (write_ret == 0) {
return 0; // EOF
+ }
assert(write_ret >= 0);
- if (write_ret < buf->len)
- err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
- fd, buf->len, (int)write_ret);
- // FIXME: handle this
- assert(write_ret == buf->len);
- return (int)write_ret;
+ if (write_ret < (ssize_t)left) {
+ nblogx("short write(fd = %d, count = %d): wrote %d",
+ fd, (int)left, (int)write_ret);
+ left -= write_ret;
+ data += write_ret;
+ continue;
+ }
+ assert(write_ret == (ssize_t)left);
+ return 1;
}
}
}
// Write.
- int write_ret = xwrite(my->fd, buf);
- if (write_ret == 0) {
+ if (write_buf(my->fd, buf) == 0) {
errx(1, "fd %d hit EOF", my->fd);
}
- assert(write_ret == buf->len);
// Unreference buffer, freeing it if we have to.
lock(my->mutex);
unref_buf(buf);
buf = NULL;
}
- nblogx("thread exiting cleanly");
+ if (arg != logger) {
+ nblogx("thread exiting cleanly");
+ }
return NULL;
}
const char* func,
const char* format,
...) {
- int saved_errno;
- if (want_errno)
- saved_errno = errno;
-
+ int saved_errno = errno;
va_list va;
va_start(va, format);
// Prefix.
char buf[512];
- size_t len;
+ int len;
extern char *__progname; // This is where glibc stashes argv[0].
len = snprintf(buf, sizeof(buf), "%s:%d:%s(): tid %d at %d.%09d: ",
__progname, line, func, gettid(),
while (!STAILQ_EMPTY(&writers)) {
struct writer_thread* writer = STAILQ_FIRST(&writers);
STAILQ_REMOVE_HEAD(&writers, entries);
- // FIXME: free its queue?
+ if (!STAILQ_EMPTY(&writer->queue)) {
+ nblogx("queue for fd %d is not empty", writer->fd);
+ }
free(writer);
}
xpthread_cond_broadcast(logger->cond);
unlock(logger->mutex);
xpthread_join(logger->thread);
+ pthread_mutex_destroy(logger->mutex);
+ free(logger->mutex);
+ pthread_cond_destroy(logger->cond);
+ free(logger->cond);
free(logger);
warnx("exiting cleanly");