2 * Copyright (c) 2013 Emil Mikulic <emikulic@gmail.com>
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16 * [ http://www.openbsd.org/cgi-bin/cvsweb/src/share/misc/license.template ]
19 // buftee: like tee(1) but buffers in memory.
21 // Read from stdin into memory all the time, so that the writer doesn't
22 // block. Our memory usage is unbounded.
24 // Write to a number of file descriptors whenever we're able to.
25 // Because write() can take a long time even with O_NONBLOCK set on the
26 // fd, we have to do this from a separate kernel scheduling entity (a pthread).
29 // We use a thread per output filedescriptor so that one slow output (e.g. disk)
30 // doesn't block another (e.g. stdout to a terminal) (or vice-versa if you use
31 // flow control on the terminal!)
33 #define _GNU_SOURCE // for clock_gettime()
35 #include <sys/queue.h>
36 #include <sys/select.h>
37 #include <sys/syscall.h>
52 #define READ_BUF_SIZE 4096
53 #define SLOW_NSEC 4000
// Set on receipt of SIGTERM or SIGINT (see sig_stopping, installed in main).
// The flag is written from a signal handler, so it uses the one integer type
// the C standard guarantees can be accessed atomically with respect to
// signal delivery: volatile sig_atomic_t.
static volatile sig_atomic_t stopping = 0;  // Global.
// Signal handler; main() installs it for SIGINT and SIGTERM. The signal
// number argument is deliberately unused.
// NOTE(review): the handler body is not visible in this view -- presumably it
// sets `stopping`; confirm.
58 static void sig_stopping(int _ignored_
__attribute__((__unused__
))) {
62 // Reference-counted buffer, contains data that was read in the main thread.
// One node of a <sys/queue.h> singly-linked tail queue of buffers.
// `entries` is the linkage field named in the STAILQ_INSERT_TAIL /
// STAILQ_REMOVE_HEAD calls made by enqueue() and dequeue().
// dequeue() also reads a `buf` member from each node.
70 struct buf_queue_elem
{
72 STAILQ_ENTRY(buf_queue_elem
) entries
;
// Declares `struct buf_queue`, the head type for a queue of buf_queue_elem.
74 STAILQ_HEAD(buf_queue
, buf_queue_elem
); // struct buf_queue
76 // Context for a writer thread.
// Besides the members shown below, code elsewhere in this file accesses
// `thread`, `fd`, `cond` and `running` members of this struct.
77 struct writer_thread
{
81 // Each writer has its own queue.
82 struct buf_queue queue
;
84 // Pointer to a shared mutex which protects all queues.
85 pthread_mutex_t
* mutex
;
87 // When a writer runs out of work to do, it sleeps on a shared condition.
90 // The writer thread exits when it's not running and its queue is empty.
// Linkage for membership in a struct writer_thread_list.
93 STAILQ_ENTRY(writer_thread
) entries
;
96 // A list of writer threads.
// Declares `struct writer_thread_list`, linked through writer_thread.entries.
97 STAILQ_HEAD(writer_thread_list
, writer_thread
); // struct writer_thread_list
// The logger's writer context. NULL until init_logger_thread() allocates it;
// log lines built by _nblog_helper() are enqueued on logger->queue.
99 static struct writer_thread
* logger
= NULL
; // Global.
// Monotonic timestamp captured by init_logger_thread(); _nblog_helper() uses
// it as the start point for the elapsed time printed in each log line.
100 static struct timespec logger_start
; // Global.
// Allocates a struct buf and gives it a private heap copy of `len` bytes
// taken from `data`.
// NOTE(review): neither malloc() result is checked in the lines visible here;
// the remaining initialisation and the return statement are not visible.
102 static struct buf
* alloc_buf(const char* const data
, const int len
) {
104 struct buf
* buf
= malloc(sizeof(*buf
));
105 buf
->data
= malloc((size_t)len
);
// Copy the caller's bytes so the caller can reuse its own buffer.
106 memcpy(buf
->data
, data
, (size_t)len
);
// Drops one reference to `buf`. The count must be positive on entry
// (asserted). The branch taken when the count reaches zero is not visible
// here -- presumably it frees the buffer; confirm.
112 static void unref_buf(struct buf
* buf
) {
113 assert(buf
->refcount
> 0);
114 if ((--buf
->refcount
) == 0) {
// Appends `buf` to the tail of `queue` using a freshly malloc'ed list node.
// NOTE(review): the malloc() result is not checked in the lines visible here,
// and the statements that fill in the node are not visible.
// Both call sites visible in this file (main, _nblog_helper) release a mutex
// after calling this.
120 static void enqueue(struct buf_queue
* restrict queue
,
121 struct buf
* restrict buf
) {
122 struct buf_queue_elem
* elem
= malloc(sizeof(*elem
));
125 STAILQ_INSERT_TAIL(queue
, elem
, entries
);
// Unlinks the head node of `queue` and takes the buffer it carries.
// The queue must be non-empty (asserted). What happens to the node and the
// return statement are not visible here.
128 static struct buf
* dequeue(struct buf_queue
* const queue
) {
129 assert(!STAILQ_EMPTY(queue
));
130 struct buf_queue_elem
* head
;
131 head
= STAILQ_FIRST(queue
);
132 STAILQ_REMOVE_HEAD(queue
, entries
);
133 struct buf
* buf
= head
->buf
;
// pthread_create() wrapper using default attributes (NULL): returns on
// success, otherwise terminates the process through err().
// NOTE(review): pthread_create() reports failure through its return value
// rather than errno, while err() prints errno -- confirm errno is assigned
// from `ret` before err() (not visible here). "%p" also formally expects a
// void* argument.
138 static void xpthread_create(pthread_t
* thread
,
139 void* (*start_routine
)(void*),
141 int ret
= pthread_create(thread
, NULL
, start_routine
, arg
);
142 if (ret
== 0) return;
144 err(1, "pthread_create(%p) failed", thread
);
// Adds O_NONBLOCK to the file status flags of `fd`, keeping the flags that
// are already set. Either fcntl() failing is fatal via err().
147 static void set_nonblocking(const int fd
) {
// Read the current flags first so they can be OR-ed back in below.
149 if ((flags
= fcntl(fd
, F_GETFL
)) == -1)
150 err(1, "fcntl(fd = %d, F_GETFL) failed", fd
);
151 if (fcntl(fd
, F_SETFL
, flags
| O_NONBLOCK
) == -1)
152 err(1, "fcntl(fd = %d, F_SETFL, O_NONBLOCK) failed", fd
);
// Opens `filename` write-only and non-blocking, creating it with mode 0666.
// O_EXCL makes the open fail if the file already exists.
// The condition guarding err() and the return statement are not visible
// here; presumably err() runs when open() fails and fd is returned otherwise.
155 static int make_file(const char* filename
) {
156 int fd
= open(filename
, O_CREAT
| O_EXCL
| O_NONBLOCK
| O_WRONLY
, 0666);
158 err(1, "failed to open(\"%s\")", filename
);
// Stores the current CLOCK_MONOTONIC time in *t; a clock_gettime() failure
// is fatal via err().
162 static void get_mono_time(struct timespec
* t
) {
163 if (clock_gettime(CLOCK_MONOTONIC
, t
) == -1)
164 err(1, "clock_gettime(CLOCK_MONOTONIC) failed");
// Computes *out = *end - *start as a normalised timespec.
//
// start: earlier timestamp.
// end:   later timestamp; must not precede `start`.
// out:   receives the difference, with 0 <= tv_nsec < 1000000000.
//
// The three pointers must not alias one another (restrict).
// Asserts that the result is non-negative and normalised.
static void time_diff(const struct timespec* restrict start,
                      const struct timespec* restrict end,
                      struct timespec* restrict out) {
  out->tv_sec = end->tv_sec - start->tv_sec;
  out->tv_nsec = end->tv_nsec - start->tv_nsec;
  if (out->tv_nsec < 0) {
    // Borrow one second so the nanosecond field lands back in range.
    out->tv_sec -= 1;
    out->tv_nsec += 1000000000;
  }
  assert(out->tv_sec >= 0);
  assert(out->tv_nsec >= 0);
  assert(out->tv_nsec < 1000000000);
}
// Returns the calling thread's kernel thread id via the raw SYS_gettid
// system call, narrowed to int.
// NOTE(review): glibc 2.30 and later declare gettid() in <unistd.h> when
// _GNU_SOURCE is defined (as it is in this file); that non-static declaration
// can conflict with this static definition -- confirm on the target toolchain.
181 static int gettid(void) {
182 return (int)syscall(SYS_gettid
);
// Forward declaration of the log helper defined further down (the rest of
// the parameter list is not visible here).
185 static void _nblog_helper(int want_errno
,
191 //#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt)
// nblogx(fmt, ...): queue a printf-style log message tagged with the calling
// line and function, passing want_errno = 0. Uses the GNU named-variadic
// macro syntax (`fmt...`).
192 #define nblogx(fmt...) _nblog_helper(0, __LINE__, __FUNCTION__, fmt)
// Logs "<desc> took S.NNNNNNNNN secs" through nblogx() when the interval
// from *start to *end is at least one second or longer than SLOW_NSEC
// nanoseconds.
// The body of the first branch (caller is the logger thread) is not visible
// here -- presumably it returns early so the logger does not log about its
// own writes; confirm.
// NOTE(review): pthread_t values are compared with `==`; POSIX provides
// pthread_equal() for this.
194 static void warn_time(const char* desc
,
195 const struct timespec
* restrict start
,
196 const struct timespec
* restrict end
) {
197 if (pthread_self() == logger
->thread
) {
200 struct timespec diff
;
201 time_diff(start
, end
, &diff
);
202 if (diff
.tv_sec
> 0 || diff
.tv_nsec
> SLOW_NSEC
) {
203 nblogx("%s took %d.%09d secs", desc
, (int)diff
.tv_sec
, (int)diff
.tv_nsec
);
// Reads up to `count` bytes from `fd` into `buf` and returns read()'s result
// as an int (asserted non-negative before returning).
// The call is timed and reported through warn_time("read()", ...).
// err() is called for a failed read; the exact condition guarding it is not
// visible here.
207 static int xread(const int fd
, char* const restrict buf
, const int count
) {
214 read_ret
= read(fd
, buf
, (size_t)count
);
217 warn_time("read()", &t0
, &t1
);
221 err(1, "read(fd = %d, count = %d) failed", fd
, count
);
224 assert(read_ret
>= 0);
225 return (int)read_ret
;
// Blocks in select() with no timeout until `fd` is reported writable.
// An EINTR result is expected only while `stopping` is set (asserted); what
// follows the assert is not visible here. select() failures are reported
// with err(), and a return that does not mark `fd` writable is fatal via
// errx().
228 static void wait_until_writable(const int fd
) {
231 FD_SET(fd
, &write_fds
);
// Only the write set is passed; the read and except sets and the timeout
// are NULL.
232 int select_ret
= select(fd
+ 1, NULL
, &write_fds
, NULL
, NULL
);
233 if (select_ret
== -1) {
234 if (errno
== EINTR
) {
235 assert(stopping
); // that should have been SIGTERM
238 err(1, "select(write fd = %d) failed", fd
);
240 if (!FD_ISSET(fd
, &write_fds
))
241 errx(1, "select() did not return writable fd = %d", fd
);
244 // Returns zero on EOF, dies on error, retries on short write.
// Writes buf->len bytes starting at buf->data to `fd`. Each write() is timed
// and reported through warn_time("write()", ...). On EAGAIN it logs and
// calls wait_until_writable(fd) (the retry itself is not visible here).
// Other write failures are fatal via err().
// NOTE(review): errno is captured in saved_errno right after write(), but the
// EAGAIN test below reads errno again after warn_time() has run, and
// warn_time() may log. Confirm errno is restored from saved_errno in between
// (not visible here).
245 static int write_buf(const int fd
, struct buf
* const buf
) {
246 char* data
= buf
->data
;
247 size_t left
= (size_t)buf
->len
;
253 ssize_t write_ret
= write(fd
, data
, left
);
254 int saved_errno
= errno
;
256 warn_time("write()", &t0
, &t1
);
259 if (write_ret
== -1) {
260 if (errno
== EAGAIN
) {
261 nblogx("write(fd = %d) got EAGAIN, sleeping and retrying", fd
);
262 wait_until_writable(fd
);
265 err(1, "write(fd = %d, count = %d) failed", fd
, (int)left
);
// A zero-byte write result is what the header comment calls EOF.
267 if (write_ret
== 0) {
270 assert(write_ret
>= 0);
// Short write: fewer bytes were accepted than requested.
271 if (write_ret
< (ssize_t
)left
) {
272 nblogx("short write(fd = %d, count = %d): wrote %d",
273 fd
, (int)left
, (int)write_ret
);
278 assert(write_ret
== (ssize_t
)left
);
// Blocks in select() with no timeout until `fd` is reported readable.
// An EINTR result is expected only while `stopping` is set (asserted); what
// follows the assert is not visible here. select() failures are reported
// with err(), and a return that does not mark `fd` readable is fatal via
// errx().
283 static void wait_until_readable(const int fd
) {
286 FD_SET(fd
, &read_fds
);
// Only the read set is passed; the write and except sets and the timeout
// are NULL.
287 int select_ret
= select(fd
+ 1, &read_fds
, NULL
, NULL
, NULL
);
288 if (select_ret
== -1) {
289 if (errno
== EINTR
) {
290 assert(stopping
); // that should have been SIGTERM
293 err(1, "select(read fd = %d) failed", fd
);
295 if (!FD_ISSET(fd
, &read_fds
))
296 errx(1, "select() did not return readable fd = %d", fd
);
// Locks `mutex`, terminating the process via err() if that fails.
//
// pthread_mutex_lock() reports failure through its return value and does not
// set errno, so the code is copied into errno for err() to print the actual
// reason.
static void lock(pthread_mutex_t* mutex) {
  const int ret = pthread_mutex_lock(mutex);
  if (ret == 0) return;
  errno = ret;
  // "%p" requires a void* argument.
  err(1, "pthread_mutex_lock(%p) failed", (void*)mutex);
}
// Unlocks `mutex`, terminating the process via err() if that fails.
//
// pthread_mutex_unlock() reports failure through its return value and does
// not set errno, so the code is copied into errno for err() to print the
// actual reason.
static void unlock(pthread_mutex_t* mutex) {
  const int ret = pthread_mutex_unlock(mutex);
  if (ret == 0) return;
  errno = ret;
  // "%p" requires a void* argument.
  err(1, "pthread_mutex_unlock(%p) failed", (void*)mutex);
}
// Thread entry point shared by the output writers and the logger.
// `arg` is the thread's own struct writer_thread. The routine sleeps on
// my->cond (with my->mutex) while it is still running and has nothing
// queued, takes the next buffer from my->queue, and writes it to my->fd with
// write_buf(). write_buf() returning 0 (EOF) is fatal via errx().
// The loop structure, locking calls and return are not visible here.
313 static void* writer_routine(void *arg
) {
314 struct writer_thread
* my
= arg
;
315 struct buf
* buf
= NULL
;
317 nblogx("writer thread for fd %d starting", my
->fd
);
// Re-check the predicate in a loop around pthread_cond_wait().
320 while (my
->running
&& STAILQ_EMPTY(&my
->queue
)) {
322 pthread_cond_wait(my
->cond
, my
->mutex
);
324 if (!STAILQ_EMPTY(&my
->queue
)) {
325 buf
= dequeue(&my
->queue
);
// Presumably reached only with an empty queue, i.e. the wait loop ended
// because the thread was told to stop -- confirm against the missing lines.
330 assert(!my
->running
);
335 if (write_buf(my
->fd
, buf
) == 0) {
336 errx(1, "fd %d hit EOF", my
->fd
);
339 // Unreference buffer, freeing it if we have to.
345 nblogx("thread exiting cleanly");
// Allocates a writer context, points it at the caller's shared mutex and
// condition variable, gives it an empty queue, appends it to `list`, and
// starts its thread running writer_routine().
// NOTE(review): one parameter line and some field initialisation are not
// visible here, and the malloc() result is not checked in the visible lines.
350 static void add_writer_thread(struct writer_thread_list
* list
,
352 pthread_mutex_t
* shared_queue_mutex
,
353 pthread_cond_t
* shared_wakeup_cond
) {
354 struct writer_thread
* writer
= malloc(sizeof(*writer
));
357 writer
->mutex
= shared_queue_mutex
;
358 writer
->cond
= shared_wakeup_cond
;
360 STAILQ_INIT(&writer
->queue
);
361 STAILQ_INSERT_TAIL(list
, writer
, entries
);
// The thread is created after the context has been set up and linked in.
362 xpthread_create(&writer
->thread
, writer_routine
, writer
);
// Wakes every thread waiting on `cond`, terminating the process via err() if
// that fails.
//
// pthread_cond_broadcast() reports failure through its return value and does
// not set errno, so the code is copied into errno for err() to print the
// actual reason.
static void xpthread_cond_broadcast(pthread_cond_t* cond) {
  const int ret = pthread_cond_broadcast(cond);
  if (ret == 0) return;
  errno = ret;
  // "%p" requires a void* argument.
  err(1, "pthread_cond_broadcast(%p) failed", (void*)cond);
}
// Creates the global logger: records the start time used for log timestamps,
// allocates the logger context writing to stderr with its own heap-allocated
// mutex and condition variable, and starts its thread running
// writer_routine(). Must be called once, while `logger` is still NULL
// (asserted).
// NOTE(review): the malloc(), pthread_mutex_init() and pthread_cond_init()
// results are not checked in the lines visible here.
372 static void init_logger_thread(void) {
373 assert(logger
== NULL
);
374 get_mono_time(&logger_start
);
375 logger
= malloc(sizeof(*logger
));
376 logger
->fd
= STDERR_FILENO
;
// The logger gets its own mutex and condition variable rather than the
// pair main() hands to add_writer_thread().
377 logger
->mutex
= malloc(sizeof(*logger
->mutex
));
378 pthread_mutex_init(logger
->mutex
, NULL
);
379 logger
->cond
= malloc(sizeof(*logger
->cond
));
380 pthread_cond_init(logger
->cond
, NULL
);
382 STAILQ_INIT(&logger
->queue
);
383 xpthread_create(&logger
->thread
, writer_routine
, logger
);
// Formats one log line and queues it for the logger thread.
// The line is "<progname>:<line>:<func>(): tid <tid> at <secs>.<nsecs>: "
// followed by the formatted message, an optional ": <strerror> (errno = N)"
// suffix, and a newline. It is copied into a struct buf, appended to
// logger->queue, and the logger's condition variable is broadcast.
// The remaining parameters, the condition guarding the errno suffix and the
// matching lock() call are not visible here.
// NOTE(review): snprintf()/vsnprintf() return the length that would have been
// written, so after truncation `len` can exceed sizeof(buf) and
// `sizeof(buf) - len` would wrap -- confirm the bounds handling.
386 static void _nblog_helper(int want_errno
,
// Capture errno before any call below can overwrite it.
391 int saved_errno
= errno
;
393 va_start(va
, format
);
397 struct timespec diff
;
// Elapsed time since init_logger_thread() recorded logger_start.
399 time_diff(&logger_start
, &now
, &diff
);
404 extern char *__progname
; // This is where glibc stashes argv[0].
405 len
= snprintf(buf
, sizeof(buf
), "%s:%d:%s(): tid %d at %d.%09d: ",
406 __progname
, line
, func
, gettid(),
407 (int)diff
.tv_sec
, (int)diff
.tv_nsec
);
410 len
+= vsnprintf(buf
+ len
, sizeof(buf
) - len
, format
, va
);
413 len
+= snprintf(buf
+ len
, sizeof(buf
) - len
, ": %s (errno = %d)",
414 strerror(saved_errno
), saved_errno
);
417 len
+= snprintf(buf
+ len
, sizeof(buf
) - len
, "\n");
// Copy the stack buffer to the heap so it outlives this call.
418 struct buf
* b
= alloc_buf(buf
, len
);
422 enqueue(&logger
->queue
, b
);
423 xpthread_cond_broadcast(logger
->cond
);
424 unlock(logger
->mutex
);
// Waits for `thread` to finish, discarding its return value, and terminates
// the process via err() if the join fails.
//
// pthread_join() reports failure through its return value and does not set
// errno, so the code is copied into errno for err() to print the actual
// reason.
static void xpthread_join(pthread_t thread) {
  const int ret = pthread_join(thread, NULL);
  if (ret == 0) return;
  errno = ret;
  // pthread_t is an opaque type; cast explicitly so the argument matches %lu.
  err(1, "pthread_join(%lu) failed", (unsigned long)thread);
}
// Signal handler; main() installs it for SIGCONT. It puts stderr back into
// non-blocking mode. The signal number argument is deliberately unused.
// NOTE(review): set_nonblocking() calls err() on failure, which is not
// async-signal-safe; confirm this is acceptable inside a handler.
434 static void sig_continue(int _ignored_
__attribute__((__unused__
))) {
435 set_nonblocking(STDERR_FILENO
);
// Entry point. Starts the logger thread, installs the signal handlers, and
// creates writer threads (one call before the argument loop, then one per
// command-line argument). It then waits for stdin, reads it in chunks of up
// to READ_BUF_SIZE bytes, and enqueues each chunk on every writer's queue.
// When stopping it joins the writers, then wakes and joins the logger.
// Loop structure and several statements are not visible in this view.
438 int main(int argc
, char **argv
) {
439 init_logger_thread();
// One mutex / condition pair is shared by all writer threads.
442 pthread_mutex_t shared_queue_mutex
= PTHREAD_MUTEX_INITIALIZER
;
443 pthread_cond_t shared_wakeup_cond
= PTHREAD_COND_INITIALIZER
;
445 struct writer_thread_list writers
;
446 STAILQ_INIT(&writers
);
448 if (signal(SIGINT
, sig_stopping
) == SIG_ERR
) err(1, "signal() failed");
449 if (signal(SIGTERM
, sig_stopping
) == SIG_ERR
) err(1, "signal() failed");
450 if (signal(SIGCONT
, sig_continue
) == SIG_ERR
) err(1, "signal() failed");
451 //if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
454 // On Linux, making STDOUT non-blocking has the side-effect of
455 // also making STDIN nonblocking.
456 add_writer_thread(&writers
,
459 &shared_wakeup_cond
);
461 // Process cmdline args.
462 for (int i
= 1; i
< argc
; i
++) {
463 add_writer_thread(&writers
,
466 &shared_wakeup_cond
);
471 wait_until_readable(STDIN_FILENO
);
473 nblogx("stopping after select()");
478 char data
[READ_BUF_SIZE
];
// NOTE(review): sizeof(data) is a size_t passed to xread()'s int `count`.
479 int read_ret
= xread(STDIN_FILENO
, data
, sizeof(data
));
481 nblogx("stdin hit EOF");
484 struct buf
* buf
= alloc_buf(data
, read_ret
);
// Fan out: the same struct buf pointer is appended to every writer's queue
// while holding the shared mutex, then all writers are woken.
487 lock(&shared_queue_mutex
);
488 struct writer_thread
* writer
;
489 STAILQ_FOREACH(writer
, &writers
, entries
) {
490 enqueue(&(writer
->queue
), buf
);
492 xpthread_cond_broadcast(&shared_wakeup_cond
);
493 unlock(&shared_queue_mutex
);
496 nblogx("stopping: draining writer threads");
497 lock(&shared_queue_mutex
);
499 struct writer_thread
* writer
;
500 STAILQ_FOREACH(writer
, &writers
, entries
) {
// Wake any writer still sleeping on the shared condition.
504 xpthread_cond_broadcast(&shared_wakeup_cond
);
505 unlock(&shared_queue_mutex
);
507 struct writer_thread
* writer
;
508 STAILQ_FOREACH(writer
, &writers
, entries
) {
509 xpthread_join(writer
->thread
);
// Pop every writer off the list, logging any whose queue is not empty.
514 while (!STAILQ_EMPTY(&writers
)) {
515 struct writer_thread
* writer
= STAILQ_FIRST(&writers
);
516 STAILQ_REMOVE_HEAD(&writers
, entries
);
517 if (!STAILQ_EMPTY(&writer
->queue
)) {
518 nblogx("queue for fd %d is not empty", writer
->fd
);
523 // Clean up logger thread.
524 nblogx("writer threads stopped, stopping logger");
527 xpthread_cond_broadcast(logger
->cond
);
528 unlock(logger
->mutex
);
529 xpthread_join(logger
->thread
);
530 pthread_mutex_destroy(logger
->mutex
);
532 pthread_cond_destroy(logger
->cond
);
// The logger thread has been joined by now, so this final message goes
// through warnx() rather than nblogx().
536 warnx("exiting cleanly");
539 // vim:set ts=2 sw=2 tw=80 et: