Use a separate thread for logging to stderr.
[buftee] / buftee.c
1 /*-
2 * Copyright (c) 2013 Emil Mikulic <emikulic@gmail.com>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 *
16 * [ http://www.openbsd.org/cgi-bin/cvsweb/src/share/misc/license.template ]
17 */
18
19 // buftee: like tee(1) but buffers in memory.
20 //
21 // Read from stdin into memory all the time, so that the writer doesn't
22 // block. Our memory usage is unbounded.
23 //
24 // Write to a number of file descriptors whenever we're able to.
25 // Because write() can take a long time even with O_NONBLOCK set on the
26 // fd, we have to do this from a separate kernel scheduling entity (we use
27 // pthreads)
28 //
// We use a thread per output file descriptor so that one slow output
// (e.g. disk) doesn't block another (e.g. stdout to a terminal), or vice versa
// if you use flow control on the terminal.
32
33 #define _GNU_SOURCE // for clock_gettime()
34
35 #include <sys/queue.h>
36 #include <sys/select.h>
37 #include <sys/syscall.h>
38
39 #include <assert.h>
40 #include <err.h>
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <pthread.h>
44 #include <signal.h>
45 #include <stdarg.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <string.h>
49 #include <time.h>
50 #include <unistd.h>
51
// Size in bytes of the stack buffer that main() fills with each read from
// stdin; one read therefore yields at most this many bytes per queued buffer.
#define READ_BUF_SIZE 4096
// Threshold in nanoseconds used by warn_time(): a timed call is reported as
// slow when it took a whole second or more, or more than this many nanoseconds.
#define SLOW_NSEC 4000
54
// Stop flag. Set to 1 by the SIGINT/SIGTERM handler below, and by main() once
// its reader loop has ended; the writer threads and the select() wrappers poll
// it. It is a sig_atomic_t because that is the integer type a signal handler
// may portably store to.
static volatile sig_atomic_t stopping = 0;  // Global.

// Signal handler installed for SIGINT and SIGTERM: raises the stop flag and
// does nothing else. The signal number is not needed.
static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
  stopping = 1;
}
61
// Reference-counted buffer, contains data that was read in the main thread
// (or a formatted log line built by _nblog_helper()).
struct buf {
  // Heap copy of the payload, allocated by alloc_buf() and released by
  // unref_buf() together with the struct itself.
  char* data;
  // Number of bytes in `data`; alloc_buf() asserts that it is positive.
  int len;
  // Number of queue entries that still reference this buffer: starts at 0,
  // is incremented by enqueue() and decremented by unref_buf(), which frees
  // the buffer when it reaches 0. It is a plain int; the callers in this file
  // change it while holding the mutex of the queue involved.
  int refcount;
};
68
// Queue of buffers. Each element is a small heap node, allocated by enqueue()
// and freed by dequeue(), that points at a (possibly shared) struct buf.
struct buf_queue_elem {
  // The buffer this entry refers to; the entry accounts for one reference.
  struct buf* buf;
  // Singly-linked tail queue linkage (see <sys/queue.h>).
  STAILQ_ENTRY(buf_queue_elem) entries;
};
STAILQ_HEAD(buf_queue, buf_queue_elem);  // struct buf_queue
75
// Context for a writer thread. One of these is handed to writer_routine() as
// its argument.
struct writer_thread {
  // Handle filled in by pthread_create(); used to join the thread.
  pthread_t thread;
  // File descriptor this thread writes its queued buffers to.
  int fd;

  // Each writer has its own queue.
  struct buf_queue queue;

  // Pointer to the mutex which protects this writer's queue. The output
  // writers created by main() all point at one shared mutex; the logger
  // thread allocates a mutex of its own in init_logger_thread().
  pthread_mutex_t* mutex;

  // When a writer runs out of work to do, it sleeps on this condition
  // (shared between the output writers, separate for the logger).
  pthread_cond_t* cond;

  // Linkage for struct writer_thread_list. Not used by the logger context,
  // which is never put on a list.
  STAILQ_ENTRY(writer_thread) entries;
};

// A list of writer threads.
STAILQ_HEAD(writer_thread_list, writer_thread);  // struct writer_thread_list
95
// Context of the logging thread, which writes queued log lines to stderr.
// NULL until init_logger_thread() allocates it; nblog()/nblogx() and
// warn_time() dereference it, so it must be initialised before they are used.
static struct writer_thread* logger = NULL;  // Global.
// Monotonic time taken in init_logger_thread(); the timestamps in log lines
// are measured relative to it.
static struct timespec logger_start;  // Global.
98
99 static struct buf* alloc_buf(const char* const data, const int len) {
100 assert(len > 0);
101 struct buf* buf = malloc(sizeof(*buf));
102 buf->data = malloc((size_t)len);
103 memcpy(buf->data, data, (size_t)len);
104 buf->len = len;
105 buf->refcount = 0;
106 return buf;
107 }
108
109 static void unref_buf(struct buf* buf) {
110 assert(buf->refcount > 0);
111 if ((--buf->refcount) == 0) {
112 free(buf->data);
113 free(buf);
114 }
115 }
116
117 static void enqueue(struct buf_queue* restrict queue,
118 struct buf* restrict buf) {
119 struct buf_queue_elem* elem = malloc(sizeof(*elem));
120 elem->buf = buf;
121 buf->refcount++;
122 STAILQ_INSERT_TAIL(queue, elem, entries);
123 }
124
125 static struct buf* dequeue(struct buf_queue* const queue) {
126 assert(!STAILQ_EMPTY(queue));
127 struct buf_queue_elem* head;
128 head = STAILQ_FIRST(queue);
129 STAILQ_REMOVE_HEAD(queue, entries);
130 struct buf* buf = head->buf;
131 free(head);
132 return buf;
133 }
134
// pthread_create() with default attributes that never fails: on error the
// process exits with a message. On return `*thread` holds the new thread's
// handle and `start_routine(arg)` is running (or about to run) in it.
static void xpthread_create(pthread_t* thread,
                            void* (*start_routine)(void*),
                            void* arg) {
  const int ret = pthread_create(thread, NULL, start_routine, arg);
  if (ret != 0) {
    // pthread functions return the error number instead of setting errno.
    errno = ret;
    // %p requires a void*, hence the cast.
    err(1, "pthread_create(%p) failed", (void*)thread);
  }
}
143
// Turns on O_NONBLOCK for `fd`, leaving its other file status flags as they
// were. Exits the process with an error message if either fcntl() call fails.
static void set_nonblocking(const int fd) {
  const int old_flags = fcntl(fd, F_GETFL);
  if (old_flags == -1) {
    err(1, "fcntl(fd = %d, F_GETFL) failed", fd);
  }

  const int new_flags = old_flags | O_NONBLOCK;
  if (fcntl(fd, F_SETFL, new_flags) == -1) {
    err(1, "fcntl(fd = %d, F_SETFL, O_NONBLOCK) failed", fd);
  }
}
151
// Creates `filename` and opens it write-only and non-blocking, returning the
// new file descriptor. Because of O_EXCL an existing file is never reused or
// truncated: in that case, as for any other open() failure, the process exits
// with an error message. Requested permissions are 0666 (before the umask).
static int make_file(const char* filename) {
  const int open_flags = O_CREAT | O_EXCL | O_NONBLOCK | O_WRONLY;
  const int fd = open(filename, open_flags, 0666);
  if (fd == -1) {
    err(1, "failed to open(\"%s\")", filename);
  }
  return fd;
}
158
159 static void get_mono_time(struct timespec* t) {
160 if (clock_gettime(CLOCK_MONOTONIC, t) == -1)
161 err(1, "clock_gettime(CLOCK_MONOTONIC) failed");
162 }
163
// Computes `*out = *end - *start` for two timestamps, normalised so that the
// nanosecond field lies in [0, 1e9). `end` must not be earlier than `start`:
// the result is asserted to be non-negative. The three pointers must not
// alias each other.
static void time_diff(const struct timespec* restrict start,
                      const struct timespec* restrict end,
                      struct timespec* restrict out) {
  // Borrow one second when the nanosecond part alone would go negative.
  const int borrow = end->tv_nsec < start->tv_nsec;

  out->tv_sec = end->tv_sec - start->tv_sec - borrow;
  out->tv_nsec = end->tv_nsec - start->tv_nsec + (borrow ? 1000000000 : 0);

  assert(out->tv_sec >= 0);
  assert(out->tv_nsec >= 0);
  assert(out->tv_nsec < 1000000000);
}
177
// Returns the kernel thread id of the calling thread, obtained with the raw
// SYS_gettid system call; used to tag log lines.
// NOTE(review): newer glibc versions appear to declare their own gettid() in
// <unistd.h> when _GNU_SOURCE is defined, which would clash with this static
// definition - confirm against the target libc and rename if necessary.
static int gettid(void) {
  const long tid = syscall(SYS_gettid);
  return (int)tid;
}
181
182 static void nblogx(const char* format, ...); // Forward.
183
184 static void warn_time(const char* desc,
185 const struct timespec* restrict start,
186 const struct timespec* restrict end) {
187 if (pthread_self() == logger->thread) {
188 return;
189 }
190 struct timespec diff;
191 time_diff(start, end, &diff);
192 if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
193 nblogx("%s took %d.%09d secs", desc, (int)diff.tv_sec, (int)diff.tv_nsec);
194 }
195 }
196
// Reads up to `count` bytes from `fd` into `buf` with a single read() call
// and returns the number of bytes read; 0 means end of file. The call is
// timed and reported through warn_time() if it was slow.
//
// Any read() error is fatal: the process exits with an error message.
// FIXME (carried over from the original): that includes EAGAIN, which is not
// treated specially here.
static int xread(const int fd, char* const restrict buf, const int count) {
  struct timespec before;
  struct timespec after;

  get_mono_time(&before);
  const ssize_t nread = read(fd, buf, (size_t)count);
  const int read_errno = errno;  // The calls below may overwrite errno.
  get_mono_time(&after);
  warn_time("read()", &before, &after);
  errno = read_errno;

  if (nread == -1) {
    err(1, "read(fd = %d, count = %d) failed", fd, count);
  }

  assert(nread >= 0);
  return (int)nread;
}
217
218 static void wait_until_writable(const int fd) {
219 fd_set write_fds;
220 FD_ZERO(&write_fds);
221 FD_SET(fd, &write_fds);
222 int select_ret = select(fd + 1, NULL, &write_fds, NULL, NULL);
223 if (select_ret == -1) {
224 if (errno == EINTR) {
225 assert(stopping); // that should have been SIGTERM
226 return;
227 }
228 err(1, "select(write fd = %d) failed", fd);
229 }
230 if (!FD_ISSET(fd, &write_fds))
231 errx(1, "select() did not return writable fd = %d", fd);
232 }
233
234 static int xwrite(const int fd, struct buf* const buf) {
235 ssize_t write_ret;
236 int saved_errno;
237 struct timespec t0;
238 struct timespec t1;
239
240 for (;;) {
241 get_mono_time(&t0);
242 write_ret = write(fd, buf->data, (size_t)buf->len);
243 saved_errno = errno;
244 get_mono_time(&t1);
245 warn_time("write()", &t0, &t1);
246
247 errno = saved_errno;
248 if (write_ret == -1) {
249 if (errno == EAGAIN) {
250 nblogx("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
251 wait_until_writable(fd);
252 continue;
253 }
254 err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
255 }
256 if (write_ret == 0)
257 return 0; // EOF
258 assert(write_ret >= 0);
259 if (write_ret < buf->len)
260 err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
261 fd, buf->len, (int)write_ret);
262 // FIXME: handle this
263 assert(write_ret == buf->len);
264 return (int)write_ret;
265 }
266 }
267
268 static void wait_until_readable(const int fd) {
269 fd_set read_fds;
270 FD_ZERO(&read_fds);
271 FD_SET(fd, &read_fds);
272 int select_ret = select(fd + 1, &read_fds, NULL, NULL, NULL);
273 if (select_ret == -1) {
274 if (errno == EINTR) {
275 assert(stopping); // that should have been SIGTERM
276 return;
277 }
278 err(1, "select(read fd = %d) failed", fd);
279 }
280 if (!FD_ISSET(fd, &read_fds))
281 errx(1, "select() did not return readable fd = %d", fd);
282 }
283
// Locks `mutex`, exiting the process with an error message if that fails.
static void lock(pthread_mutex_t* mutex) {
  const int ret = pthread_mutex_lock(mutex);
  if (ret != 0) {
    // pthread functions return the error number instead of setting errno.
    errno = ret;
    // %p requires a void*, hence the cast.
    err(1, "pthread_mutex_lock(%p) failed", (void*)mutex);
  }
}
290
// Unlocks `mutex`, exiting the process with an error message if that fails.
static void unlock(pthread_mutex_t* mutex) {
  const int ret = pthread_mutex_unlock(mutex);
  if (ret != 0) {
    // pthread functions return the error number instead of setting errno.
    errno = ret;
    // %p requires a void*, hence the cast.
    err(1, "pthread_mutex_unlock(%p) failed", (void*)mutex);
  }
}
297
298 static void* writer_routine(void *arg) {
299 struct writer_thread* my = arg;
300 struct buf* buf = NULL;
301 lock(my->mutex);
302 for (;;) {
303 while (!stopping && STAILQ_EMPTY(&my->queue)) {
304 // Sleep.
305 pthread_cond_wait(my->cond, my->mutex);
306 }
307 if (!STAILQ_EMPTY(&my->queue))
308 buf = dequeue(&my->queue);
309 unlock(my->mutex);
310
311 if (stopping) break;
312 assert(buf != NULL);
313
314 // Write.
315 int write_ret = xwrite(my->fd, buf);
316 if (write_ret == 0) {
317 errx(1, "fd %d hit EOF", my->fd);
318 }
319 assert(write_ret == buf->len);
320
321 // Unreference buffer, freeing it if we have to.
322 lock(my->mutex);
323 unref_buf(buf);
324 }
325 nblogx("thread exiting cleanly");
326 return NULL;
327 }
328
329 static void add_writer_thread(struct writer_thread_list* list,
330 int fd,
331 pthread_mutex_t* shared_queue_mutex,
332 pthread_cond_t* shared_wakeup_cond) {
333 struct writer_thread* writer = malloc(sizeof(*writer));
334 set_nonblocking(fd);
335 writer->fd = fd;
336 writer->mutex = shared_queue_mutex;
337 writer->cond = shared_wakeup_cond;
338 STAILQ_INIT(&writer->queue);
339 STAILQ_INSERT_TAIL(list, writer, entries);
340 xpthread_create(&writer->thread, writer_routine, writer);
341 }
342
// Wakes every thread waiting on `cond`, exiting the process with an error
// message if the broadcast fails.
static void xpthread_cond_broadcast(pthread_cond_t* cond) {
  const int ret = pthread_cond_broadcast(cond);
  if (ret != 0) {
    // pthread functions return the error number instead of setting errno.
    errno = ret;
    // %p requires a void*, hence the cast.
    err(1, "pthread_cond_broadcast(%p) failed", (void*)cond);
  }
}
349
350 static void init_logger_thread(void) {
351 assert(logger == NULL);
352 get_mono_time(&logger_start);
353 logger = malloc(sizeof(*logger));
354 logger->fd = STDERR_FILENO;
355 logger->mutex = malloc(sizeof(*logger->mutex));
356 pthread_mutex_init(logger->mutex, NULL);
357 logger->cond = malloc(sizeof(*logger->cond));
358 pthread_cond_init(logger->cond, NULL);
359 STAILQ_INIT(&logger->queue);
360 xpthread_create(&logger->thread, writer_routine, logger);
361 }
362
363 static void _nblog_helper(int want_errno,
364 int saved_errno,
365 const char* format,
366 va_list va) {
367 // Timing.
368 struct timespec now;
369 struct timespec diff;
370 get_mono_time(&now);
371 time_diff(&logger_start, &now, &diff);
372
373 // Prefix.
374 char buf[512];
375 size_t len;
376 extern char *__progname; // This is where glibc stashes argv[0].
377 len = snprintf(buf, sizeof(buf), "%s: tid %d at %d.%09d: ",
378 __progname, gettid(), (int)diff.tv_sec, (int)diff.tv_nsec);
379
380 // Format message.
381 len += vsnprintf(buf + len, sizeof(buf) - len, format, va);
382
383 if (want_errno) {
384 len += snprintf(buf + len, sizeof(buf) - len, ": %s (errno = %d)",
385 strerror(saved_errno), saved_errno);
386 }
387
388 len += snprintf(buf + len, sizeof(buf) - len, "\n");
389 struct buf* b = alloc_buf(buf, len);
390
391 // Enqueue.
392 lock(logger->mutex);
393 enqueue(&logger->queue, b);
394 xpthread_cond_broadcast(logger->cond);
395 unlock(logger->mutex);
396 }
397
// nblog() is like warn() but non-blocking: the printf-style message, followed
// by the description of the errno value in effect when nblog() was called, is
// queued for the logger thread instead of being written directly.
static void nblog(const char* format, ...) {
  // Capture errno first, before any call below can change it.
  const int errno_at_entry = errno;

  va_list args;
  va_start(args, format);
  _nblog_helper(1, errno_at_entry, format, args);
  va_end(args);
}
406
// Queues a printf-style message for the logger thread without appending any
// errno description (the errno-less counterpart of nblog()).
static void nblogx(const char* format, ...) {
  va_list args;

  va_start(args, format);
  _nblog_helper(0, 0, format, args);  // want_errno = 0, so errno is unused.
  va_end(args);
}
413
// Waits for `thread` to finish, discarding its return value. Exits the
// process with an error message if the join fails.
static void xpthread_join(pthread_t thread) {
  const int ret = pthread_join(thread, NULL);
  if (ret != 0) {
    // pthread functions return the error number instead of setting errno.
    errno = ret;
    // pthread_t is not guaranteed to be an unsigned long; convert explicitly
    // so that the argument matches %lu.
    err(1, "pthread_join(%lu) failed", (unsigned long)thread);
  }
}
420
// Puts stderr (back) into non-blocking mode. Installed by main() as the
// SIGCONT handler and also called directly once at startup; the signal number
// is not used.
// NOTE(review): presumably needed because the flag can be changed on the
// shared terminal while this process is stopped - confirm.
static void sig_continue(int _ignored_ __attribute__((__unused__))) {
  const int log_fd = STDERR_FILENO;
  set_nonblocking(log_fd);
}
424
425 int main(int argc, char **argv) {
426 init_logger_thread();
427 nblogx("starting");
428
429 pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
430 pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
431
432 struct writer_thread_list writers;
433 STAILQ_INIT(&writers);
434
435 if (signal(SIGINT, sig_stopping) == SIG_ERR) err(1, "signal() failed");
436 if (signal(SIGTERM, sig_stopping) == SIG_ERR) err(1, "signal() failed");
437 if (signal(SIGCONT, sig_continue) == SIG_ERR) err(1, "signal() failed");
438 //if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
439 sig_continue(0);
440
441 // On Linux, making STDOUT non-blocking has the side-effect of
442 // also making STDIN nonblocking.
443 add_writer_thread(&writers,
444 STDOUT_FILENO,
445 &shared_queue_mutex,
446 &shared_wakeup_cond);
447
448 // Process cmdline args.
449 for (int i = 1; i < argc; i++) {
450 add_writer_thread(&writers,
451 make_file(argv[i]),
452 &shared_queue_mutex,
453 &shared_wakeup_cond);
454 }
455
456 // Reader loop.
457 while (!stopping) {
458 wait_until_readable(STDIN_FILENO);
459 if (stopping) {
460 nblogx("stopping after select()");
461 break;
462 }
463
464 // Read.
465 char data[READ_BUF_SIZE];
466 int read_ret = xread(STDIN_FILENO, data, sizeof(data));
467 if (read_ret == 0) {
468 nblogx("stdin hit EOF");
469 break;
470 }
471 struct buf* buf = alloc_buf(data, read_ret);
472
473 // Enqueue.
474 lock(&shared_queue_mutex);
475 struct writer_thread* writer;
476 STAILQ_FOREACH(writer, &writers, entries) {
477 enqueue(&(writer->queue), buf);
478 }
479 xpthread_cond_broadcast(&shared_wakeup_cond);
480 unlock(&shared_queue_mutex);
481 }
482
483 // Wake and join threads.
484 lock(&shared_queue_mutex);
485 stopping = 1;
486 xpthread_cond_broadcast(&shared_wakeup_cond);
487 unlock(&shared_queue_mutex);
488 {
489 struct writer_thread* writer;
490 STAILQ_FOREACH(writer, &writers, entries) {
491 xpthread_join(writer->thread);
492 }
493 }
494
495 // Free writer list.
496 while (!STAILQ_EMPTY(&writers)) {
497 struct writer_thread* writer = STAILQ_FIRST(&writers);
498 STAILQ_REMOVE_HEAD(&writers, entries);
499 // FIXME: free its queue?
500 free(writer);
501 }
502
503 // Clean up logger thread.
504 nblogx("writer threads stopped");
505 lock(logger->mutex);
506 xpthread_cond_broadcast(logger->cond);
507 unlock(logger->mutex);
508 xpthread_join(logger->thread);
509 free(logger);
510
511 warnx("exiting cleanly");
512 return 0;
513 }
514 // vim:set ts=2 sw=2 tw=80 et: