// *** (end globals) ***********************************************************
-static void signal_handler(int _ignored_ __attribute__((__unused__))) {
+static void sig_stopping(int _ignored_ __attribute__((__unused__))) { // Signal handler (the signal number is ignored): sets `stopping` to 1.
stopping = 1;
}
return (int)read_ret;
}
+// Block until select() reports `fd` as writable.
+//
+// Returns early, without `fd` necessarily being writable, only when select()
+// is interrupted by a signal while `stopping` is set. An interruption while
+// `stopping` is clear (for example by the SIGCONT handler, which does not set
+// it) simply restarts the wait instead of tripping an assertion. Any other
+// select() failure is fatal via err().
+static void wait_until_writable(const int fd) {
+  fd_set write_fds;
+  for (;;) {
+    // Re-arm on every attempt: do not rely on the set's contents after a
+    // failed select().
+    FD_ZERO(&write_fds);
+    FD_SET(fd, &write_fds);
+    if (select(fd + 1, NULL, &write_fds, NULL, NULL) != -1)
+      break;
+    if (errno != EINTR)
+      err(1, "select(write fd = %d) failed", fd);
+    if (stopping)
+      return; // shutdown requested; stop waiting
+  }
+  if (!FD_ISSET(fd, &write_fds))
+    errx(1, "select() did not return writable fd = %d", fd);
+}
+
+// Write the whole of `buf` (buf->len bytes starting at buf->data) to `fd`.
+//
+// Each write() call is timed and passed to warn_time(). EAGAIN is handled by
+// waiting for `fd` to become writable and retrying. A short write is not an
+// error: the remaining bytes are written by further write() calls.
+//
+// Returns buf->len once everything has been written, or 0 if write()
+// returned 0. Any other write() failure is fatal via err().
static int xwrite(const int fd, struct buf* const buf) {
ssize_t write_ret;
int saved_errno;
struct timespec t0;
struct timespec t1;
- get_mono_time(&t0);
- write_ret = write(fd, buf->data, (size_t)buf->len);
- saved_errno = errno;
- get_mono_time(&t1);
- warn_time("write()", &t0, &t1);
-
- errno = saved_errno;
- if (write_ret == -1)
- err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
- //FIXME: EAGAIN?
- if (write_ret == 0)
- return 0;
- assert(write_ret >= 0);
- if (write_ret < buf->len)
- err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
- fd, buf->len, (int)write_ret);
- // FIXME: handle this
- assert(write_ret == buf->len);
- return (int)write_ret;
+  // Number of bytes of buf already accepted by write(); a non-blocking fd
+  // may take only part of the buffer per call.
+  int written = 0;
+  for (;;) {
+    get_mono_time(&t0);
+    write_ret = write(fd, (const char*)buf->data + written,
+                      (size_t)(buf->len - written));
+    saved_errno = errno;
+    get_mono_time(&t1);
+    warn_time("write()", &t0, &t1);
+
+    // Restore write()'s errno in case the timing calls above changed it.
+    errno = saved_errno;
+    if (write_ret == -1) {
+      if (errno == EAGAIN) {
+        warn("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
+        wait_until_writable(fd);
+        continue;
+      }
+      err(1, "write(fd = %d, count = %d) failed", fd, buf->len - written);
+    }
+    if (write_ret == 0)
+      return 0; // EOF
+    assert(write_ret > 0);
+    written += (int)write_ret;
+    if (written >= buf->len)
+      return written;
+    // Short write: loop again to send the remaining bytes.
+  }
}
-static int max(const int a, const int b) { return (a > b) ? a : b; }
-
+// Block until select() reports `fd` as readable.
+//
+// Returns early, without `fd` necessarily being readable, only when select()
+// is interrupted by a signal while `stopping` is set. An interruption while
+// `stopping` is clear (for example by the SIGCONT handler, which does not set
+// it) simply restarts the wait instead of tripping an assertion. Any other
+// select() failure is fatal via err().
static void wait_until_readable(const int fd) {
- int select_ret;
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(fd, &read_fds);
- if ((select_ret = select(fd + 1, &read_fds, NULL, NULL, NULL)) == -1)
- err(1, "select() failed");
+  while (select(fd + 1, &read_fds, NULL, NULL, NULL) == -1) {
+    if (errno != EINTR)
+      err(1, "select(read fd = %d) failed", fd);
+    if (stopping)
+      return; // shutdown requested; stop waiting
+    // Interrupted by some other signal: re-arm the set (do not rely on its
+    // contents after a failed select()) and wait again.
+    FD_ZERO(&read_fds);
+    FD_SET(fd, &read_fds);
+  }
if (!FD_ISSET(fd, &read_fds))
errx(1, "select() did not return readable fd = %d", fd);
}
static void* writer_routine(void *arg) {
struct writer_thread* my = arg;
+ struct buf* buf = NULL;
+ lock(&shared_queue_mutex);
for (;;) {
- // FIXME: less locking
- struct buf* buf = NULL;
-
- lock(&shared_queue_mutex);
while (!stopping && STAILQ_EMPTY(&my->queue)) {
// Sleep.
pthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex);
// Unreference buffer, freeing it if we have to.
lock(&shared_queue_mutex);
unref_buf(buf);
- unlock(&shared_queue_mutex);
}
warnx("thread exiting cleanly");
return NULL;
err(1, "pthread_join(%lu) failed", thread);
}
+// Put stderr into non-blocking mode via set_nonblocking(). Installed as the
+// SIGCONT handler in main() and also called directly once at startup; the
+// signal number argument is ignored.
+//
+// errno is saved and restored because this can run as a signal handler in
+// the middle of code that is about to inspect errno after a system call.
+static void sig_continue(int _ignored_ __attribute__((__unused__))) {
+  const int saved_errno = errno;
+  set_nonblocking(STDERR_FILENO);
+  errno = saved_errno;
+}
+
int main(int argc, char **argv) {
struct writer_thread_list writers;
STAILQ_INIT(&writers);
- if (signal(SIGINT, signal_handler) == SIG_ERR) err(1, "signal() failed");
- if (signal(SIGTERM, signal_handler) == SIG_ERR) err(1, "signal() failed");
+ if (signal(SIGINT, sig_stopping) == SIG_ERR) err(1, "signal() failed");
+ if (signal(SIGTERM, sig_stopping) == SIG_ERR) err(1, "signal() failed");
+ if (signal(SIGCONT, sig_continue) == SIG_ERR) err(1, "signal() failed");
//if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
- set_nonblocking(STDERR_FILENO);
+ sig_continue(0);
// On Linux, making STDOUT non-blocking has the side-effect of
// also making STDIN nonblocking.