buftee.c
/*-
 * Copyright (c) 2013 Emil Mikulic <emikulic@gmail.com>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 *
 * [ http://www.openbsd.org/cgi-bin/cvsweb/src/share/misc/license.template ]
 */

// buftee: like tee(1) but buffers in memory.
//
// Keep reading from stdin into memory at all times, so that whatever is
// feeding us never blocks.  Our memory usage is unbounded.
//
// Write to a number of file descriptors whenever we're able to.
// Because write() can take a long time even with O_NONBLOCK set on the
// fd, we have to do this from a separate kernel scheduling entity (we use
// pthreads).
//
// We use a thread per output file descriptor so that one slow output (e.g.
// disk) doesn't block another (e.g. stdout to a terminal) (or vice-versa if
// you use flow control on the terminal!)
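//
// Example invocation (hypothetical command and file names):
//
//   some_producer | buftee capture.log | some_consumer
//
// copies stdin to stdout and to capture.log, buffering in memory so that a
// stalled consumer or a slow disk never back-pressures the producer.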

#define _GNU_SOURCE // for clock_gettime(), syscall()

#include <sys/queue.h>
#include <sys/select.h>
#include <sys/syscall.h>

#include <assert.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define READ_BUF_SIZE 4096
#define SLOW_NSEC 4000
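// READ_BUF_SIZE bounds a single read() from stdin; SLOW_NSEC (4000 ns = 4 us)
// is the threshold above which warn_time() reports a slow read() or write().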

// Asserted on receipt of SIGTERM, SIGINT.
static volatile sig_atomic_t stopping = 0;

static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
  stopping = 1;
}

// Reference-counted buffer, contains data that was read in the main thread.
struct buf {
  char* data;
  int len;
  int refcount;
};
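// Lifecycle: the reader allocates a buf with refcount 0, enqueue() takes one
// reference per writer queue it lands on, and each writer drops its reference
// (under the shared mutex) once the write is done; the last unref frees it.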

// Queue of buffers.
struct buf_queue_elem {
  struct buf* buf;
  STAILQ_ENTRY(buf_queue_elem) entries;
};
STAILQ_HEAD(buf_queue, buf_queue_elem); // struct buf_queue

// Context for a writer thread.
struct writer_thread {
  pthread_t thread;
  int fd;

  // Each writer has its own queue.
  struct buf_queue queue;

  // Pointer to a shared mutex which protects all queues.
  pthread_mutex_t* mutex;

  // When a writer runs out of work to do, it sleeps on a shared condition.
  pthread_cond_t* cond;

  STAILQ_ENTRY(writer_thread) entries;
};

// A list of writer threads.
STAILQ_HEAD(writer_thread_list, writer_thread); // struct writer_thread_list
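// Locking discipline: the single shared mutex protects every writer's queue
// and every buf's refcount; the shared condition variable is broadcast
// whenever new buffers are enqueued or when it's time to stop.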

static struct buf* alloc_buf(const char* const data, const int len) {
  assert(len > 0);
  struct buf* buf = malloc(sizeof(*buf));
  if (buf == NULL)
    err(1, "malloc(%zu) failed", sizeof(*buf));
  buf->data = malloc((size_t)len);
  if (buf->data == NULL)
    err(1, "malloc(%d) failed", len);
  memcpy(buf->data, data, (size_t)len);
  buf->len = len;
  buf->refcount = 0;
  return buf;
}

static void unref_buf(struct buf* buf) {
  assert(buf->refcount > 0);
  if ((--buf->refcount) == 0) {
    free(buf->data);
    free(buf);
  }
}

static void enqueue(struct buf_queue* restrict queue,
                    struct buf* restrict buf) {
  struct buf_queue_elem* elem = malloc(sizeof(*elem));
  if (elem == NULL)
    err(1, "malloc(%zu) failed", sizeof(*elem));
  elem->buf = buf;
  buf->refcount++;
  STAILQ_INSERT_TAIL(queue, elem, entries);
}

static struct buf* dequeue(struct buf_queue* const queue) {
  assert(!STAILQ_EMPTY(queue));
  struct buf_queue_elem* head;
  head = STAILQ_FIRST(queue);
  STAILQ_REMOVE_HEAD(queue, entries);
  struct buf* buf = head->buf;
  free(head);
  return buf;
}

static void xpthread_create(pthread_t* thread,
                            void* (*start_routine)(void*),
                            void* arg) {
  int ret = pthread_create(thread, NULL, start_routine, arg);
  if (ret == 0) return;
  errno = ret;
  err(1, "pthread_create(%p) failed", thread);
}

static void set_nonblocking(const int fd) {
  int flags;
  if ((flags = fcntl(fd, F_GETFL)) == -1)
    err(1, "fcntl(fd = %d, F_GETFL) failed", fd);
  if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
    err(1, "fcntl(fd = %d, F_SETFL, O_NONBLOCK) failed", fd);
}

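// Open a named output file.  O_EXCL means buftee refuses to clobber a file
// that already exists.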
static int make_file(const char* filename) {
  int fd = open(filename, O_CREAT | O_EXCL | O_NONBLOCK | O_WRONLY, 0666);
  if (fd == -1)
    err(1, "failed to open(\"%s\")", filename);
  return fd;
}

static void get_mono_time(struct timespec* t) {
  if (clock_gettime(CLOCK_MONOTONIC, t) == -1)
    err(1, "clock_gettime(CLOCK_MONOTONIC) failed");
}

static void time_diff(const struct timespec* restrict start,
                      const struct timespec* restrict end,
                      struct timespec* restrict out) {
  out->tv_sec = end->tv_sec - start->tv_sec;
  out->tv_nsec = end->tv_nsec - start->tv_nsec;
  if (out->tv_nsec < 0) {
    out->tv_sec -= 1;
    out->tv_nsec += 1000000000;
  }
  assert(out->tv_sec >= 0);
  assert(out->tv_nsec >= 0);
  assert(out->tv_nsec < 1000000000);
}

// Named get_tid() to avoid colliding with the gettid() that glibc 2.30+
// declares in <unistd.h> when _GNU_SOURCE is defined.
static int get_tid(void) {
  return (int)syscall(SYS_gettid);
}

static void warn_time(const char* desc,
                      const struct timespec* restrict start,
                      const struct timespec* restrict end) {
  struct timespec diff;
  time_diff(start, end, &diff);
  if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
    char buf[128];
    extern char *__progname; // This is where glibc stashes argv[0].
    snprintf(buf, sizeof(buf), "%s:tid %d: %s took %d.%09d secs\n",
             __progname, get_tid(), desc, (int)diff.tv_sec, (int)diff.tv_nsec);
    // Best effort write to a non-blocking stderr.
    (void)write(STDERR_FILENO, buf, strlen(buf));
  }
}
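// warn_time() output looks like, e.g. (numbers illustrative):
//   buftee:tid 12345: write() took 0.200000000 secs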

// xread() may need to wait for stdin to become readable again (see below).
static void wait_until_readable(const int fd);

static int xread(const int fd, char* const restrict buf, const int count) {
  ssize_t read_ret;
  int saved_errno;
  struct timespec t0;
  struct timespec t1;

  for (;;) {
    get_mono_time(&t0);
    read_ret = read(fd, buf, (size_t)count);
    saved_errno = errno;
    get_mono_time(&t1);
    warn_time("read()", &t0, &t1);

    errno = saved_errno;
    if (read_ret == -1) {
      if (errno == EAGAIN) {
        // stdin can be non-blocking (see the note in main()), so even after
        // select() says readable we can come up empty; wait and retry.
        warn("read(fd = %d) got EAGAIN, sleeping and retrying", fd);
        wait_until_readable(fd);
        continue;
      }
      err(1, "read(fd = %d, count = %d) failed", fd, count);
    }
    assert(read_ret >= 0);
    return (int)read_ret;
  }
}

static void wait_until_writable(const int fd) {
  fd_set write_fds;
  FD_ZERO(&write_fds);
  FD_SET(fd, &write_fds);
  int select_ret = select(fd + 1, NULL, &write_fds, NULL, NULL);
  if (select_ret == -1) {
    if (errno == EINTR) {
      // Interrupted by a signal (SIGINT/SIGTERM sets stopping; SIGCONT is
      // also possible since select() is never restarted): just return and
      // let the caller re-check and retry.
      return;
    }
    err(1, "select(write fd = %d) failed", fd);
  }
  if (!FD_ISSET(fd, &write_fds))
    errx(1, "select() did not return writable fd = %d", fd);
}

static int xwrite(const int fd, struct buf* const buf) {
  ssize_t write_ret;
  int saved_errno;
  int written = 0;
  struct timespec t0;
  struct timespec t1;

  for (;;) {
    get_mono_time(&t0);
    write_ret = write(fd, buf->data + written, (size_t)(buf->len - written));
    saved_errno = errno;
    get_mono_time(&t1);
    warn_time("write()", &t0, &t1);

    errno = saved_errno;
    if (write_ret == -1) {
      if (errno == EAGAIN) {
        warn("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
        wait_until_writable(fd);
        continue;
      }
      err(1, "write(fd = %d, count = %d) failed", fd, buf->len - written);
    }
    if (write_ret == 0)
      return 0; // EOF
    assert(write_ret > 0);
    written += (int)write_ret;
    if (written < buf->len)
      continue; // Short write (likely a non-blocking tty): write the rest.
    assert(written == buf->len);
    return written;
  }
}

static void wait_until_readable(const int fd) {
  fd_set read_fds;
  FD_ZERO(&read_fds);
  FD_SET(fd, &read_fds);
  int select_ret = select(fd + 1, &read_fds, NULL, NULL, NULL);
  if (select_ret == -1) {
    if (errno == EINTR) {
      // Interrupted by a signal; see wait_until_writable() above.
      return;
    }
    err(1, "select(read fd = %d) failed", fd);
  }
  if (!FD_ISSET(fd, &read_fds))
    errx(1, "select() did not return readable fd = %d", fd);
}

static void lock(pthread_mutex_t* mutex) {
  int ret = pthread_mutex_lock(mutex);
  if (ret == 0) return;
  errno = ret;
  err(1, "pthread_mutex_lock(%p) failed", mutex);
}

static void unlock(pthread_mutex_t* mutex) {
  int ret = pthread_mutex_unlock(mutex);
  if (ret == 0) return;
  errno = ret;
  err(1, "pthread_mutex_unlock(%p) failed", mutex);
}

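// Writer thread main loop: sleep on the shared condition while the queue is
// empty, dequeue under the mutex, do the (possibly slow) write() with the
// mutex released, then retake the mutex to drop the buffer's reference.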
static void* writer_routine(void *arg) {
  struct writer_thread* my = arg;
  struct buf* buf = NULL;
  lock(my->mutex);
  for (;;) {
    while (!stopping && STAILQ_EMPTY(&my->queue)) {
      // Sleep.
      pthread_cond_wait(my->cond, my->mutex);
    }
    if (STAILQ_EMPTY(&my->queue)) {
      // Stopping and nothing left to write.
      unlock(my->mutex);
      break;
    }
    // Even when stopping, drain whatever was already buffered so that a slow
    // output doesn't lose data the reader accepted.
    buf = dequeue(&my->queue);
    unlock(my->mutex);

    // Write.
    int write_ret = xwrite(my->fd, buf);
    if (write_ret == 0) {
      errx(1, "fd %d hit EOF", my->fd);
    }
    assert(write_ret == buf->len);

    // Unreference buffer, freeing it if we have to.
    lock(my->mutex);
    unref_buf(buf);
  }
  warnx("thread exiting cleanly");
  return NULL;
}

static void add_writer_thread(struct writer_thread_list* list,
                              int fd,
                              pthread_mutex_t* shared_queue_mutex,
                              pthread_cond_t* shared_wakeup_cond) {
  struct writer_thread* writer = malloc(sizeof(*writer));
  if (writer == NULL)
    err(1, "malloc(%zu) failed", sizeof(*writer));
  set_nonblocking(fd);
  writer->fd = fd;
  writer->mutex = shared_queue_mutex;
  writer->cond = shared_wakeup_cond;
  STAILQ_INIT(&writer->queue);
  STAILQ_INSERT_TAIL(list, writer, entries);
  xpthread_create(&writer->thread, writer_routine, writer);
}

static void xpthread_cond_broadcast(pthread_cond_t* cond) {
  int ret = pthread_cond_broadcast(cond);
  if (ret == 0) return;
  errno = ret;
  err(1, "pthread_cond_broadcast(%p) failed", cond);
}

static void xpthread_join(pthread_t thread) {
  int ret = pthread_join(thread, NULL);
  if (ret == 0) return;
  errno = ret;
  err(1, "pthread_join(%lu) failed", thread);
}

static void sig_continue(int _ignored_ __attribute__((__unused__))) {
  set_nonblocking(STDERR_FILENO);
}
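// (The SIGCONT handler re-asserts O_NONBLOCK on stderr, presumably because
// the shell or another process sharing the terminal may have cleared the
// flag while buftee was stopped.)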

int main(int argc, char **argv) {
  pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
  pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;

  struct writer_thread_list writers;
  STAILQ_INIT(&writers);

  if (signal(SIGINT, sig_stopping) == SIG_ERR) err(1, "signal() failed");
  if (signal(SIGTERM, sig_stopping) == SIG_ERR) err(1, "signal() failed");
  if (signal(SIGCONT, sig_continue) == SIG_ERR) err(1, "signal() failed");
  //if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
  sig_continue(0);

  // Making STDOUT non-blocking usually also makes STDIN non-blocking: both
  // typically refer to the same open file description (the terminal), and
  // O_NONBLOCK is a property of that description, not of the fd.
  add_writer_thread(&writers,
                    STDOUT_FILENO,
                    &shared_queue_mutex,
                    &shared_wakeup_cond);

  // Process cmdline args.
  for (int i = 1; i < argc; i++) {
    add_writer_thread(&writers,
                      make_file(argv[i]),
                      &shared_queue_mutex,
                      &shared_wakeup_cond);
  }

  // Reader loop.
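  // Each chunk read from stdin becomes one reference-counted buf that is
  // appended to every writer's queue, so the data is copied once no matter
  // how many outputs there are.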
  while (!stopping) {
    wait_until_readable(STDIN_FILENO);
    if (stopping) {
      warnx("stopping after select()");
      break;
    }

    // Read.
    char data[READ_BUF_SIZE];
    int read_ret = xread(STDIN_FILENO, data, sizeof(data));
    if (read_ret == 0) {
      warnx("stdin hit EOF");
      break;
    }
    struct buf* buf = alloc_buf(data, read_ret);

    // Enqueue.
    lock(&shared_queue_mutex);
    struct writer_thread* writer;
    STAILQ_FOREACH(writer, &writers, entries) {
      enqueue(&(writer->queue), buf);
    }
    xpthread_cond_broadcast(&shared_wakeup_cond);
    unlock(&shared_queue_mutex);
  }

  // Wake and join threads.
  lock(&shared_queue_mutex);
  stopping = 1;
  xpthread_cond_broadcast(&shared_wakeup_cond);
  unlock(&shared_queue_mutex);
  {
    struct writer_thread* writer;
    STAILQ_FOREACH(writer, &writers, entries) {
      xpthread_join(writer->thread);
    }
  }

  // Free writer list.
  while (!STAILQ_EMPTY(&writers)) {
    struct writer_thread* writer = STAILQ_FIRST(&writers);
    STAILQ_REMOVE_HEAD(&writers, entries);
    // Its queue was drained (and each element freed) by the writer thread
    // before it exited, so only the writer struct itself remains.
    free(writer);
  }

  warnx("exiting cleanly");
  return 0;
}
// vim:set ts=2 sw=2 tw=80 et: