Drain writers after SIGTERM.
[buftee] / buftee.c
1 /*-
2 * Copyright (c) 2013 Emil Mikulic <emikulic@gmail.com>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 *
16 * [ http://www.openbsd.org/cgi-bin/cvsweb/src/share/misc/license.template ]
17 */
18
// buftee: like tee(1) but buffers in memory.
//
// Read from stdin into memory continuously, so that the process writing to
// our stdin never blocks. Memory usage is therefore unbounded.
//
// Write to a number of file descriptors whenever we're able to.
// Because write() can take a long time even with O_NONBLOCK set on the
// fd, we have to do this from a separate kernel scheduling entity (we use
// pthreads).
//
// We use one thread per output file descriptor so that one slow output
// (e.g. disk) doesn't block another (e.g. stdout to a terminal), or vice
// versa if you use flow control on the terminal!
32
33 #define _GNU_SOURCE // for clock_gettime()
34
35 #include <sys/queue.h>
36 #include <sys/select.h>
37 #include <sys/syscall.h>
38
39 #include <assert.h>
40 #include <err.h>
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <pthread.h>
44 #include <signal.h>
45 #include <stdarg.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <string.h>
49 #include <time.h>
50 #include <unistd.h>
51
#define READ_BUF_SIZE 4096  // Bytes requested per read() of stdin.
#define SLOW_NSEC 4000      // warn_time() reports calls slower than this (ns).

// Set by sig_stopping() on receipt of SIGTERM or SIGINT.
//
// ISO C only defines the behaviour of a signal handler that writes a static
// object if that object is volatile sig_atomic_t (or a lock-free atomic).
static volatile sig_atomic_t stopping = 0;  // Global.

// SIGINT/SIGTERM handler: ask the reader loop in main() to stop reading.
static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
  stopping = 1;
}
61
/*
 * A chunk of bytes to be written out: either input read by the main thread
 * or a formatted log line. One struct buf is shared by every queue it is
 * placed on; refcount counts those references (see enqueue()/unref_buf()).
 */
struct buf {
  char *data;    /* heap-allocated copy of the bytes */
  int len;       /* number of valid bytes in data */
  int refcount;  /* references still held by queues / in-flight writes */
};

/* One entry in a writer's FIFO; points at a (possibly shared) struct buf. */
struct buf_queue_elem {
  struct buf *buf;
  STAILQ_ENTRY(buf_queue_elem) entries;
};

/* Declares `struct buf_queue`, a tail queue of struct buf_queue_elem. */
STAILQ_HEAD(buf_queue, buf_queue_elem);
75
76 // Context for a writer thread.
77 struct writer_thread {
78 pthread_t thread;
79 int fd;
80
81 // Each writer has its own queue.
82 struct buf_queue queue;
83
84 // Pointer to a shared mutex which protects all queues.
85 pthread_mutex_t* mutex;
86
87 // When a writer runs out of work to do, it sleeps on a shared condition.
88 pthread_cond_t* cond;
89
90 // The writer thread exits when it's not running and its queue is empty.
91 volatile int running;
92
93 STAILQ_ENTRY(writer_thread) entries;
94 };
95
96 // A list of writer threads.
97 STAILQ_HEAD(writer_thread_list, writer_thread); // struct writer_thread_list
98
99 static struct writer_thread* logger = NULL; // Global.
100 static struct timespec logger_start; // Global.
101
102 static struct buf* alloc_buf(const char* const data, const int len) {
103 assert(len > 0);
104 struct buf* buf = malloc(sizeof(*buf));
105 buf->data = malloc((size_t)len);
106 memcpy(buf->data, data, (size_t)len);
107 buf->len = len;
108 buf->refcount = 0;
109 return buf;
110 }
111
112 static void unref_buf(struct buf* buf) {
113 assert(buf->refcount > 0);
114 if ((--buf->refcount) == 0) {
115 free(buf->data);
116 free(buf);
117 }
118 }
119
120 static void enqueue(struct buf_queue* restrict queue,
121 struct buf* restrict buf) {
122 struct buf_queue_elem* elem = malloc(sizeof(*elem));
123 elem->buf = buf;
124 buf->refcount++;
125 STAILQ_INSERT_TAIL(queue, elem, entries);
126 }
127
128 static struct buf* dequeue(struct buf_queue* const queue) {
129 assert(!STAILQ_EMPTY(queue));
130 struct buf_queue_elem* head;
131 head = STAILQ_FIRST(queue);
132 STAILQ_REMOVE_HEAD(queue, entries);
133 struct buf* buf = head->buf;
134 free(head);
135 return buf;
136 }
137
// pthread_create() with default attributes; exits with a diagnostic on
// failure (pthread functions return the error code instead of setting errno).
static void xpthread_create(pthread_t* thread,
                            void* (*start_routine)(void*),
                            void* arg) {
  const int rc = pthread_create(thread, NULL, start_routine, arg);
  if (rc != 0) {
    errno = rc;
    err(1, "pthread_create(%p) failed", thread);
  }
}
146
// Add O_NONBLOCK to fd's file status flags, keeping the other flags intact.
// Exits on any fcntl() failure.
static void set_nonblocking(const int fd) {
  const int current = fcntl(fd, F_GETFL);
  if (current == -1)
    err(1, "fcntl(fd = %d, F_GETFL) failed", fd);

  const int wanted = current | O_NONBLOCK;
  if (fcntl(fd, F_SETFL, wanted) == -1)
    err(1, "fcntl(fd = %d, F_SETFL, O_NONBLOCK) failed", fd);
}
154
// Create `filename` for non-blocking writing, refusing to touch an existing
// file (O_EXCL). Returns the new fd; exits if it cannot be created.
static int make_file(const char* filename) {
  const int open_flags = O_CREAT | O_EXCL | O_NONBLOCK | O_WRONLY;
  const int fd = open(filename, open_flags, 0666);
  if (fd == -1) {
    err(1, "failed to open(\"%s\")", filename);
  }
  return fd;
}
161
// Store the current CLOCK_MONOTONIC time in *t; exits if the clock is
// unavailable.
static void get_mono_time(struct timespec* t) {
  const int rc = clock_gettime(CLOCK_MONOTONIC, t);
  if (rc == -1) {
    err(1, "clock_gettime(CLOCK_MONOTONIC) failed");
  }
}
166
// *out = *end - *start, normalized so that 0 <= tv_nsec < 1e9.
// end must not be earlier than start (asserted).
static void time_diff(const struct timespec* restrict start,
                      const struct timespec* restrict end,
                      struct timespec* restrict out) {
  enum { NSEC_PER_SEC = 1000000000 };
  // Borrow one second when the nanosecond part would go negative.
  const int borrow = end->tv_nsec < start->tv_nsec;

  out->tv_sec = end->tv_sec - start->tv_sec - borrow;
  out->tv_nsec = end->tv_nsec - start->tv_nsec + (borrow ? NSEC_PER_SEC : 0);

  assert(out->tv_sec >= 0);
  assert(out->tv_nsec >= 0);
  assert(out->tv_nsec < NSEC_PER_SEC);
}
180
// Return the calling thread's kernel thread ID, via the raw gettid syscall.
//
// glibc >= 2.30 declares its own non-static gettid() in <unistd.h> when
// _GNU_SOURCE is defined, so a `static int gettid(void)` here fails to
// compile ("static declaration follows non-static declaration"). The macro
// routes the existing gettid() call sites to this private copy instead.
#define gettid buftee_gettid
static int buftee_gettid(void) {
  return (int)syscall(SYS_gettid);
}
184
// Format a log line, prefixed with "prog:line:func(): tid N at T: ", and
// enqueue it for the logger thread instead of writing to stderr directly.
// When want_errno is non-zero, ": <strerror(errno)> (errno = N)" is appended,
// using errno as it was on entry.
static void _nblog_helper(int want_errno,
                          int line,
                          const char* func,
                          const char* format,
                          ...)
    __attribute__((__format__(__printf__, 4, 5)));  // Forward.

// nblog() appends errno like warn(3); nblogx() does not, like warnx(3).
// Standard C99 variadic macros and __func__ (instead of the GNU `fmt...`
// and __FUNCTION__ extensions).
#define nblog(...) _nblog_helper(1, __LINE__, __func__, __VA_ARGS__)
#define nblogx(...) _nblog_helper(0, __LINE__, __func__, __VA_ARGS__)
193
194 static void warn_time(const char* desc,
195 const struct timespec* restrict start,
196 const struct timespec* restrict end) {
197 if (pthread_self() == logger->thread) {
198 return;
199 }
200 struct timespec diff;
201 time_diff(start, end, &diff);
202 if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
203 nblogx("%s took %d.%09d secs", desc, (int)diff.tv_sec, (int)diff.tv_nsec);
204 }
205 }
206
// read() up to `count` bytes from fd into buf, reporting slow calls through
// warn_time(). Returns the number of bytes read (0 means EOF). Any read()
// error, including EAGAIN and EINTR, is fatal.
static int xread(const int fd, char* const restrict buf, const int count) {
  struct timespec started;
  struct timespec finished;

  get_mono_time(&started);
  const ssize_t nread = read(fd, buf, (size_t)count);
  const int read_errno = errno;  // The timing/logging below may clobber it.
  get_mono_time(&finished);
  warn_time("read()", &started, &finished);
  errno = read_errno;

  // FIXME: EAGAIN (and EINTR) should probably be retried, not fatal.
  if (nread == -1) {
    err(1, "read(fd = %d, count = %d) failed", fd, count);
  }

  assert(nread >= 0);
  return (int)nread;
}
227
// Block until fd is writable.
//
// If a signal interrupts select(), the wait is simply restarted. Writers are
// meant to keep draining after SIGTERM, and a SIGCONT (e.g. when the job is
// resumed with fg) can arrive at any time with `stopping` still 0. The old
// assert(stopping) aborted the process in that case.
static void wait_until_writable(const int fd) {
  // FD_SET() on an fd outside [0, FD_SETSIZE) is undefined behaviour.
  if (fd < 0 || fd >= FD_SETSIZE)
    errx(1, "fd %d cannot be used with select() (FD_SETSIZE = %d)",
         fd, FD_SETSIZE);

  for (;;) {
    fd_set write_fds;
    FD_ZERO(&write_fds);
    FD_SET(fd, &write_fds);
    const int select_ret = select(fd + 1, NULL, &write_fds, NULL, NULL);
    if (select_ret == -1) {
      if (errno == EINTR)
        continue;  // Interrupted by a signal: wait again.
      err(1, "select(write fd = %d) failed", fd);
    }
    if (!FD_ISSET(fd, &write_fds))
      errx(1, "select() did not return writable fd = %d", fd);
    return;
  }
}
243
244 static int xwrite(const int fd, struct buf* const buf) {
245 ssize_t write_ret;
246 int saved_errno;
247 struct timespec t0;
248 struct timespec t1;
249
250 for (;;) {
251 get_mono_time(&t0);
252 write_ret = write(fd, buf->data, (size_t)buf->len);
253 saved_errno = errno;
254 get_mono_time(&t1);
255 warn_time("write()", &t0, &t1);
256
257 errno = saved_errno;
258 if (write_ret == -1) {
259 if (errno == EAGAIN) {
260 nblogx("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
261 wait_until_writable(fd);
262 continue;
263 }
264 err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
265 }
266 if (write_ret == 0)
267 return 0; // EOF
268 assert(write_ret >= 0);
269 if (write_ret < buf->len)
270 err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
271 fd, buf->len, (int)write_ret);
272 // FIXME: handle this
273 assert(write_ret == buf->len);
274 return (int)write_ret;
275 }
276 }
277
278 static void wait_until_readable(const int fd) {
279 fd_set read_fds;
280 FD_ZERO(&read_fds);
281 FD_SET(fd, &read_fds);
282 int select_ret = select(fd + 1, &read_fds, NULL, NULL, NULL);
283 if (select_ret == -1) {
284 if (errno == EINTR) {
285 assert(stopping); // that should have been SIGTERM
286 return;
287 }
288 err(1, "select(read fd = %d) failed", fd);
289 }
290 if (!FD_ISSET(fd, &read_fds))
291 errx(1, "select() did not return readable fd = %d", fd);
292 }
293
// pthread_mutex_lock() that exits with a diagnostic on failure.
static void lock(pthread_mutex_t* mutex) {
  const int rc = pthread_mutex_lock(mutex);
  if (rc != 0) {
    errno = rc;  // pthread functions return the error instead of setting it.
    err(1, "pthread_mutex_lock(%p) failed", mutex);
  }
}

// pthread_mutex_unlock() that exits with a diagnostic on failure.
static void unlock(pthread_mutex_t* mutex) {
  const int rc = pthread_mutex_unlock(mutex);
  if (rc != 0) {
    errno = rc;
    err(1, "pthread_mutex_unlock(%p) failed", mutex);
  }
}
307
308 static void* writer_routine(void *arg) {
309 struct writer_thread* my = arg;
310 struct buf* buf = NULL;
311
312 nblogx("writer thread for fd %d starting", my->fd);
313 lock(my->mutex);
314 for (;;) {
315 while (my->running && STAILQ_EMPTY(&my->queue)) {
316 // Sleep.
317 pthread_cond_wait(my->cond, my->mutex);
318 }
319 if (!STAILQ_EMPTY(&my->queue)) {
320 buf = dequeue(&my->queue);
321 }
322 unlock(my->mutex);
323
324 if (buf == NULL) {
325 assert(!my->running);
326 break;
327 }
328
329 // Write.
330 int write_ret = xwrite(my->fd, buf);
331 if (write_ret == 0) {
332 errx(1, "fd %d hit EOF", my->fd);
333 }
334 assert(write_ret == buf->len);
335
336 // Unreference buffer, freeing it if we have to.
337 lock(my->mutex);
338 unref_buf(buf);
339 buf = NULL;
340 }
341 nblogx("thread exiting cleanly");
342 return NULL;
343 }
344
345 static void add_writer_thread(struct writer_thread_list* list,
346 int fd,
347 pthread_mutex_t* shared_queue_mutex,
348 pthread_cond_t* shared_wakeup_cond) {
349 struct writer_thread* writer = malloc(sizeof(*writer));
350 set_nonblocking(fd);
351 writer->fd = fd;
352 writer->mutex = shared_queue_mutex;
353 writer->cond = shared_wakeup_cond;
354 writer->running = 1;
355 STAILQ_INIT(&writer->queue);
356 STAILQ_INSERT_TAIL(list, writer, entries);
357 xpthread_create(&writer->thread, writer_routine, writer);
358 }
359
// Wake every thread waiting on cond; exits with a diagnostic on failure.
static void xpthread_cond_broadcast(pthread_cond_t* cond) {
  const int rc = pthread_cond_broadcast(cond);
  if (rc != 0) {
    errno = rc;
    err(1, "pthread_cond_broadcast(%p) failed", cond);
  }
}
366
367 static void init_logger_thread(void) {
368 assert(logger == NULL);
369 get_mono_time(&logger_start);
370 logger = malloc(sizeof(*logger));
371 logger->fd = STDERR_FILENO;
372 logger->mutex = malloc(sizeof(*logger->mutex));
373 pthread_mutex_init(logger->mutex, NULL);
374 logger->cond = malloc(sizeof(*logger->cond));
375 pthread_cond_init(logger->cond, NULL);
376 logger->running = 1;
377 STAILQ_INIT(&logger->queue);
378 xpthread_create(&logger->thread, writer_routine, logger);
379 }
380
381 static void _nblog_helper(int want_errno,
382 int line,
383 const char* func,
384 const char* format,
385 ...) {
386 int saved_errno;
387 if (want_errno)
388 saved_errno = errno;
389
390 va_list va;
391 va_start(va, format);
392
393 // Timing.
394 struct timespec now;
395 struct timespec diff;
396 get_mono_time(&now);
397 time_diff(&logger_start, &now, &diff);
398
399 // Prefix.
400 char buf[512];
401 size_t len;
402 extern char *__progname; // This is where glibc stashes argv[0].
403 len = snprintf(buf, sizeof(buf), "%s:%d:%s(): tid %d at %d.%09d: ",
404 __progname, line, func, gettid(),
405 (int)diff.tv_sec, (int)diff.tv_nsec);
406
407 // Format message.
408 len += vsnprintf(buf + len, sizeof(buf) - len, format, va);
409
410 if (want_errno) {
411 len += snprintf(buf + len, sizeof(buf) - len, ": %s (errno = %d)",
412 strerror(saved_errno), saved_errno);
413 }
414
415 len += snprintf(buf + len, sizeof(buf) - len, "\n");
416 struct buf* b = alloc_buf(buf, len);
417
418 // Enqueue.
419 lock(logger->mutex);
420 enqueue(&logger->queue, b);
421 xpthread_cond_broadcast(logger->cond);
422 unlock(logger->mutex);
423 }
424
// Wait for `thread` to finish, discarding its return value; exits with a
// diagnostic if pthread_join() fails.
static void xpthread_join(pthread_t thread) {
  const int rc = pthread_join(thread, NULL);
  if (rc != 0) {
    errno = rc;
    err(1, "pthread_join(%lu) failed", thread);
  }
}
431
// SIGCONT handler, also called directly from main() at startup: (re)apply
// O_NONBLOCK to stderr. NOTE(review): presumably because the flag can be
// reset while the job is stopped (e.g. by the shell) — confirm.
// set_nonblocking() calls err() on failure, which is not async-signal-safe.
static void sig_continue(int _ignored_ __attribute__((__unused__))) {
  // A signal handler must leave errno as it found it: the interrupted code
  // may be about to inspect it.
  const int saved_errno = errno;
  set_nonblocking(STDERR_FILENO);
  errno = saved_errno;
}
435
436 int main(int argc, char **argv) {
437 init_logger_thread();
438 nblogx("starting");
439
440 pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
441 pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
442
443 struct writer_thread_list writers;
444 STAILQ_INIT(&writers);
445
446 if (signal(SIGINT, sig_stopping) == SIG_ERR) err(1, "signal() failed");
447 if (signal(SIGTERM, sig_stopping) == SIG_ERR) err(1, "signal() failed");
448 if (signal(SIGCONT, sig_continue) == SIG_ERR) err(1, "signal() failed");
449 //if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
450 sig_continue(0);
451
452 // On Linux, making STDOUT non-blocking has the side-effect of
453 // also making STDIN nonblocking.
454 add_writer_thread(&writers,
455 STDOUT_FILENO,
456 &shared_queue_mutex,
457 &shared_wakeup_cond);
458
459 // Process cmdline args.
460 for (int i = 1; i < argc; i++) {
461 add_writer_thread(&writers,
462 make_file(argv[i]),
463 &shared_queue_mutex,
464 &shared_wakeup_cond);
465 }
466
467 // Reader loop.
468 while (!stopping) {
469 wait_until_readable(STDIN_FILENO);
470 if (stopping) {
471 nblogx("stopping after select()");
472 break;
473 }
474
475 // Read.
476 char data[READ_BUF_SIZE];
477 int read_ret = xread(STDIN_FILENO, data, sizeof(data));
478 if (read_ret == 0) {
479 nblogx("stdin hit EOF");
480 break;
481 }
482 struct buf* buf = alloc_buf(data, read_ret);
483
484 // Enqueue.
485 lock(&shared_queue_mutex);
486 struct writer_thread* writer;
487 STAILQ_FOREACH(writer, &writers, entries) {
488 enqueue(&(writer->queue), buf);
489 }
490 xpthread_cond_broadcast(&shared_wakeup_cond);
491 unlock(&shared_queue_mutex);
492 }
493
494 nblogx("stopping: draining writer threads");
495 lock(&shared_queue_mutex);
496 {
497 struct writer_thread* writer;
498 STAILQ_FOREACH(writer, &writers, entries) {
499 writer->running = 0;
500 }
501 }
502 xpthread_cond_broadcast(&shared_wakeup_cond);
503 unlock(&shared_queue_mutex);
504 {
505 struct writer_thread* writer;
506 STAILQ_FOREACH(writer, &writers, entries) {
507 xpthread_join(writer->thread);
508 }
509 }
510
511 // Free writer list.
512 while (!STAILQ_EMPTY(&writers)) {
513 struct writer_thread* writer = STAILQ_FIRST(&writers);
514 STAILQ_REMOVE_HEAD(&writers, entries);
515 // FIXME: free its queue?
516 free(writer);
517 }
518
519 // Clean up logger thread.
520 nblogx("writer threads stopped, stopping logger");
521 lock(logger->mutex);
522 logger->running = 0;
523 xpthread_cond_broadcast(logger->cond);
524 unlock(logger->mutex);
525 xpthread_join(logger->thread);
526 free(logger);
527
528 warnx("exiting cleanly");
529 return 0;
530 }
531 // vim:set ts=2 sw=2 tw=80 et: