2 * Copyright (c) 2013 Emil Mikulic <emikulic@gmail.com>
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16 * [ http://www.openbsd.org/cgi-bin/cvsweb/src/share/misc/license.template ]
19 // buftee: like tee(1) but buffers in memory.
21 // Read from stdin into memory all the time, so that the writer doesn't
22 // block. Our memory usage is unbounded.
24 // Write to a number of file descriptors whenever we're able to.
25 // Because write() can take a long time even with O_NONBLOCK set on the
26 // fd, we have to do this from a separate kernel scheduling entity (we use
29 // We use a thread per output filedescriptor so that one slow output (e.g. disk)
30 // doesn't block another (e.g. stdout to a terminal) (or vice-versa if you use
31 // flow control on the terminal!)
33 #define _GNU_SOURCE // for clock_gettime()
35 #include <sys/queue.h>
36 #include <sys/select.h>
37 #include <sys/syscall.h>
51 #define READ_BUF_SIZE 4096
52 #define SLOW_NSEC 4000
54 // *** GLOBALS *****************************************************************
56 // All queues are locked through one global mutex.
57 static pthread_mutex_t shared_queue_mutex
= PTHREAD_MUTEX_INITIALIZER
;
59 // When a writer runs out of work to do, it sleeps on this global cond.
60 static pthread_cond_t shared_wakeup_cond
= PTHREAD_COND_INITIALIZER
;
62 // Asserted on receipt of SIGTERM, SIGINT.
63 static volatile int stopping
= 0;
65 // *** (end globals) ***********************************************************
67 static void signal_handler(int _ignored_
__attribute__((__unused__
))) {
71 // Reference-counted buffer, contains data that was read in the main thread.
79 struct buf_queue_elem
{
81 STAILQ_ENTRY(buf_queue_elem
) entries
;
83 STAILQ_HEAD(buf_queue
, buf_queue_elem
); // struct buf_queue
85 // Context for a writer thread.
86 struct writer_thread
{
90 // Each writer has its own queue.
91 struct buf_queue queue
;
93 STAILQ_ENTRY(writer_thread
) entries
;
96 // A list of writer threads.
97 STAILQ_HEAD(writer_thread_list
, writer_thread
); // struct writer_thread_list
99 static struct buf
* alloc_buf(const char* const data
, const int len
) {
101 struct buf
* buf
= malloc(sizeof(*buf
));
102 buf
->data
= malloc((size_t)len
);
103 memcpy(buf
->data
, data
, (size_t)len
);
109 static void unref_buf(struct buf
* buf
) {
110 assert(buf
->refcount
> 0);
111 if ((--buf
->refcount
) == 0) {
117 static void enqueue(struct buf_queue
* restrict queue
,
118 struct buf
* restrict buf
) {
119 struct buf_queue_elem
* elem
= malloc(sizeof(*elem
));
122 STAILQ_INSERT_TAIL(queue
, elem
, entries
);
125 static struct buf
* dequeue(struct buf_queue
* const queue
) {
126 assert(!STAILQ_EMPTY(queue
));
127 struct buf_queue_elem
* head
;
128 head
= STAILQ_FIRST(queue
);
129 STAILQ_REMOVE_HEAD(queue
, entries
);
130 struct buf
* buf
= head
->buf
;
135 static void xpthread_create(pthread_t
* thread
,
136 void* (*start_routine
)(void*),
138 int ret
= pthread_create(thread
, NULL
, start_routine
, arg
);
139 if (ret
== 0) return;
141 err(1, "pthread_create(%p) failed", thread
);
144 static void set_nonblocking(const int fd
) {
146 if ((flags
= fcntl(fd
, F_GETFL
)) == -1)
147 err(1, "fcntl(fd = %d, F_GETFL) failed", fd
);
148 if (fcntl(fd
, F_SETFL
, flags
| O_NONBLOCK
) == -1)
149 err(1, "fcntl(fd = %d, F_SETFL, O_NONBLOCK) failed", fd
);
152 static int make_file(const char* filename
) {
153 int fd
= open(filename
, O_CREAT
| O_EXCL
| O_NONBLOCK
| O_WRONLY
, 0666);
155 err(1, "failed to open(\"%s\")", filename
);
159 static void get_mono_time(struct timespec
* t
) {
160 if (clock_gettime(CLOCK_MONOTONIC
, t
) == -1)
161 err(1, "clock_gettime(CLOCK_MONOTONIC) failed");
164 static void time_diff(const struct timespec
* restrict start
,
165 const struct timespec
* restrict end
,
166 struct timespec
* restrict out
) {
167 out
->tv_sec
= end
->tv_sec
- start
->tv_sec
;
168 out
->tv_nsec
= end
->tv_nsec
- start
->tv_nsec
;
169 if (out
->tv_nsec
< 0) {
171 out
->tv_nsec
+= 1000000000;
173 assert(out
->tv_sec
>= 0);
174 assert(out
->tv_nsec
>= 0);
175 assert(out
->tv_nsec
< 1000000000);
178 static void warn_time(const char* desc
,
179 const struct timespec
* restrict start
,
180 const struct timespec
* restrict end
) {
181 struct timespec diff
;
182 time_diff(start
, end
, &diff
);
183 if (diff
.tv_sec
> 0 || diff
.tv_nsec
> SLOW_NSEC
) {
185 extern char *__progname
; // This is where glibc stashes argv[0].
186 snprintf(buf
, sizeof(buf
), "%d: %s: %s took %d.%09d secs\n",
187 (int)getpid(), __progname
, desc
,
188 (int)diff
.tv_sec
, (int)diff
.tv_nsec
);
189 // Best effort write to a non-blocking stderr.
190 (void)write(STDERR_FILENO
, buf
, strlen(buf
));
194 static int xread(const int fd
, char* const restrict buf
, const int count
) {
201 read_ret
= read(fd
, buf
, (size_t)count
);
204 warn_time("read()", &t0
, &t1
);
208 err(1, "read(fd = %d, count = %d) failed", fd
, count
);
211 assert(read_ret
>= 0);
212 return (int)read_ret
;
215 static int xwrite(const int fd
, struct buf
* const buf
) {
222 write_ret
= write(fd
, buf
->data
, (size_t)buf
->len
);
225 warn_time("write()", &t0
, &t1
);
229 err(1, "write(fd = %d, count = %d) failed", fd
, buf
->len
);
233 assert(write_ret
>= 0);
234 if (write_ret
< buf
->len
)
235 err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
236 fd
, buf
->len
, (int)write_ret
);
237 // FIXME: handle this
238 assert(write_ret
== buf
->len
);
239 return (int)write_ret
;
242 static int max(const int a
, const int b
) { return (a
> b
) ? a
: b
; }
244 static void wait_until_readable(const int fd
) {
248 FD_SET(fd
, &read_fds
);
249 if ((select_ret
= select(fd
+ 1, &read_fds
, NULL
, NULL
, NULL
)) == -1)
250 err(1, "select() failed");
251 if (!FD_ISSET(fd
, &read_fds
))
252 errx(1, "select() did not return readable fd = %d", fd
);
255 static void lock(pthread_mutex_t
* mutex
) {
256 int ret
= pthread_mutex_lock(mutex
);
257 if (ret
== 0) return;
259 err(1, "pthread_mutex_lock(%p) failed", mutex
);
262 static void unlock(pthread_mutex_t
* mutex
) {
263 int ret
= pthread_mutex_unlock(mutex
);
264 if (ret
== 0) return;
266 err(1, "pthread_mutex_unlock(%p) failed", mutex
);
269 static void* writer_routine(void *arg
) {
270 struct writer_thread
* my
= arg
;
272 // FIXME: less locking
273 struct buf
* buf
= NULL
;
275 lock(&shared_queue_mutex
);
276 while (!stopping
&& STAILQ_EMPTY(&my
->queue
)) {
278 pthread_cond_wait(&shared_wakeup_cond
, &shared_queue_mutex
);
280 if (!STAILQ_EMPTY(&my
->queue
))
281 buf
= dequeue(&my
->queue
);
282 unlock(&shared_queue_mutex
);
288 int write_ret
= xwrite(my
->fd
, buf
);
289 if (write_ret
== 0) {
290 errx(1, "fd %d hit EOF", my
->fd
);
292 assert(write_ret
== buf
->len
);
294 // Unreference buffer, freeing it if we have to.
295 lock(&shared_queue_mutex
);
297 unlock(&shared_queue_mutex
);
299 warnx("thread exiting cleanly");
303 static void add_writer_thread(struct writer_thread_list
* list
, const int fd
) {
305 struct writer_thread
* writer
= malloc(sizeof(*writer
));
307 STAILQ_INIT(&(writer
->queue
));
308 STAILQ_INSERT_TAIL(list
, writer
, entries
);
309 xpthread_create(&(writer
->thread
), writer_routine
, writer
);
312 static void xpthread_cond_broadcast(pthread_cond_t
* cond
) {
313 int ret
= pthread_cond_broadcast(cond
);
314 if (ret
== 0) return;
316 err(1, "pthread_cond_broadcast(%p) failed", cond
);
319 static void xpthread_join(pthread_t thread
) {
320 int ret
= pthread_join(thread
, NULL
);
321 if (ret
== 0) return;
323 err(1, "pthread_join(%lu) failed", thread
);
326 int main(int argc
, char **argv
) {
327 struct writer_thread_list writers
;
328 STAILQ_INIT(&writers
);
330 if (signal(SIGINT
, signal_handler
) == SIG_ERR
) err(1, "signal() failed");
331 if (signal(SIGTERM
, signal_handler
) == SIG_ERR
) err(1, "signal() failed");
332 //if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
333 set_nonblocking(STDERR_FILENO
);
335 // On Linux, making STDOUT non-blocking has the side-effect of
336 // also making STDIN nonblocking.
337 add_writer_thread(&writers
, STDOUT_FILENO
);
339 // Process cmdline args.
340 for (int i
= 1; i
< argc
; i
++) {
341 add_writer_thread(&writers
, make_file(argv
[i
]));
346 wait_until_readable(STDIN_FILENO
);
348 warnx("stopping after select()");
353 char data
[READ_BUF_SIZE
];
354 int read_ret
= xread(STDIN_FILENO
, data
, sizeof(data
));
356 warnx("stdin hit EOF");
359 struct buf
* buf
= alloc_buf(data
, read_ret
);
362 lock(&shared_queue_mutex
);
363 struct writer_thread
* writer
;
364 STAILQ_FOREACH(writer
, &writers
, entries
) enqueue(&(writer
->queue
), buf
);
365 xpthread_cond_broadcast(&shared_wakeup_cond
);
366 unlock(&shared_queue_mutex
);
369 // Wake and join threads.
371 lock(&shared_queue_mutex
);
372 xpthread_cond_broadcast(&shared_wakeup_cond
);
373 unlock(&shared_queue_mutex
);
375 struct writer_thread
* writer
;
376 STAILQ_FOREACH(writer
, &writers
, entries
) xpthread_join(writer
->thread
);
380 while (!STAILQ_EMPTY(&writers
)) {
381 struct writer_thread
* writer
= STAILQ_FIRST(&writers
);
382 STAILQ_REMOVE_HEAD(&writers
, entries
);
383 // FIXME: free its queue?
387 warnx("exiting cleanly");
390 // vim:set ts=2 sw=2 tw=80 et: