4b03ecaad04bc850c6079d5dfe3b7b9e960b7ee9
[buftee] / buftee.c
1 /*-
2 * Copyright (c) 2013 Emil Mikulic <emikulic@gmail.com>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 *
16 * [ http://www.openbsd.org/cgi-bin/cvsweb/src/share/misc/license.template ]
17 */
18
19 // buftee: like tee(1) but buffers in memory.
20 //
// Read from stdin into memory all the time, so that the process writing to
// our stdin doesn't block. Our memory usage is unbounded.
23 //
24 // Write to a number of file descriptors whenever we're able to.
25 // Because write() can take a long time even with O_NONBLOCK set on the
26 // fd, we have to do this from a separate kernel scheduling entity (we use
// pthreads).
//
// We use one thread per output file descriptor so that one slow output (e.g.
// disk) doesn't block another (e.g. stdout to a terminal), or vice versa if you
// use flow control on the terminal!
32
33 #define _GNU_SOURCE // for clock_gettime()
34
35 #include <sys/queue.h>
36 #include <sys/select.h>
37 #include <sys/syscall.h>
38
39 #include <assert.h>
40 #include <err.h>
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <pthread.h>
44 #include <signal.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <string.h>
48 #include <time.h>
49 #include <unistd.h>
50
// Size of each read() from stdin, and therefore the largest buffer we queue.
#define READ_BUF_SIZE 4096
// read()/write() calls taking longer than this many nanoseconds are reported
// on stderr by warn_time().
#define SLOW_NSEC 4000

// *** GLOBALS *****************************************************************

// All queues (and the buffer refcounts) are locked through one global mutex.
static pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;

// When a writer runs out of work to do, it sleeps on this global cond.
static pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;

// Asserted on receipt of SIGTERM, SIGINT, and by main() once the reader loop
// ends. It is volatile sig_atomic_t because that is the object type the C
// standard allows a signal handler to assign to.
static volatile sig_atomic_t stopping = 0;

// *** (end globals) ***********************************************************

// Handler for SIGINT/SIGTERM: only requests a stop, which is async-signal-safe.
static void signal_handler(int _ignored_ __attribute__((__unused__))) {
  stopping = 1;
}
70
// A chunk of data read from stdin by the main thread. It is shared by every
// writer queue it was placed on and freed once the last reference is dropped.
struct buf {
  char *data;    // heap copy of the bytes that were read
  int len;       // number of valid bytes in data
  int refcount;  // references held by queue entries / in-flight writes
};

// One link in a writer's FIFO of pending buffers.
struct buf_queue_elem {
  struct buf *buf;
  STAILQ_ENTRY(buf_queue_elem) entries;
};

// FIFO of pending buffers; this declares struct buf_queue.
STAILQ_HEAD(buf_queue, buf_queue_elem);

// State for one output and the thread that writes to it.
struct writer_thread {
  pthread_t thread;  // thread running writer_routine() for this output
  int fd;            // output file descriptor

  // Buffers waiting to be written to fd. Every writer has its own queue;
  // all queues are guarded by shared_queue_mutex.
  struct buf_queue queue;

  STAILQ_ENTRY(writer_thread) entries;  // link in a writer_thread_list
};

// List of all writer threads; this declares struct writer_thread_list.
STAILQ_HEAD(writer_thread_list, writer_thread);
98
99 static struct buf* alloc_buf(const char* const data, const int len) {
100 assert(len > 0);
101 struct buf* buf = malloc(sizeof(*buf));
102 buf->data = malloc((size_t)len);
103 memcpy(buf->data, data, (size_t)len);
104 buf->len = len;
105 buf->refcount = 0;
106 return buf;
107 }
108
109 static void unref_buf(struct buf* buf) {
110 assert(buf->refcount > 0);
111 if ((--buf->refcount) == 0) {
112 free(buf->data);
113 free(buf);
114 }
115 }
116
117 static void enqueue(struct buf_queue* restrict queue,
118 struct buf* restrict buf) {
119 struct buf_queue_elem* elem = malloc(sizeof(*elem));
120 elem->buf = buf;
121 buf->refcount++;
122 STAILQ_INSERT_TAIL(queue, elem, entries);
123 }
124
125 static struct buf* dequeue(struct buf_queue* const queue) {
126 assert(!STAILQ_EMPTY(queue));
127 struct buf_queue_elem* head;
128 head = STAILQ_FIRST(queue);
129 STAILQ_REMOVE_HEAD(queue, entries);
130 struct buf* buf = head->buf;
131 free(head);
132 return buf;
133 }
134
// pthread_create() with default attributes; exits the process on failure.
// pthread_create() reports errors by return value, so copy that into errno
// for err() to print.
static void xpthread_create(pthread_t* thread,
                            void* (*start_routine)(void*),
                            void* arg) {
  const int rc = pthread_create(thread, NULL, start_routine, arg);
  if (rc != 0) {
    errno = rc;
    err(1, "pthread_create(%p) failed", thread);
  }
}
143
// Add O_NONBLOCK to fd's file status flags, keeping the others.
// Exits the process if either fcntl() call fails.
static void set_nonblocking(const int fd) {
  const int current = fcntl(fd, F_GETFL);
  if (current == -1)
    err(1, "fcntl(fd = %d, F_GETFL) failed", fd);
  const int wanted = current | O_NONBLOCK;
  if (fcntl(fd, F_SETFL, wanted) == -1)
    err(1, "fcntl(fd = %d, F_SETFL, O_NONBLOCK) failed", fd);
}
151
// Create filename as a new, write-only, non-blocking output file (mode 0666
// before umask) and return its descriptor. O_EXCL makes the open fail, and
// the process exit, if the file already exists, so nothing is overwritten.
static int make_file(const char* filename) {
  const int open_flags = O_CREAT | O_EXCL | O_NONBLOCK | O_WRONLY;
  const int fd = open(filename, open_flags, 0666);
  if (fd != -1)
    return fd;
  err(1, "failed to open(\"%s\")", filename);
}
158
159 static void get_mono_time(struct timespec* t) {
160 if (clock_gettime(CLOCK_MONOTONIC, t) == -1)
161 err(1, "clock_gettime(CLOCK_MONOTONIC) failed");
162 }
163
// Compute *out = *end - *start with tv_nsec normalized into [0, 1e9).
// end is expected not to be earlier than start (asserted below).
static void time_diff(const struct timespec* restrict start,
                      const struct timespec* restrict end,
                      struct timespec* restrict out) {
  time_t secs = end->tv_sec - start->tv_sec;
  long nsecs = end->tv_nsec - start->tv_nsec;
  if (nsecs < 0) {
    // Borrow one second into the nanosecond field.
    nsecs += 1000000000;
    secs -= 1;
  }
  out->tv_sec = secs;
  out->tv_nsec = nsecs;
  assert(out->tv_sec >= 0);
  assert(out->tv_nsec >= 0);
  assert(out->tv_nsec < 1000000000);
}
177
178 static void warn_time(const char* desc,
179 const struct timespec* restrict start,
180 const struct timespec* restrict end) {
181 struct timespec diff;
182 time_diff(start, end, &diff);
183 if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
184 char buf[128];
185 extern char *__progname; // This is where glibc stashes argv[0].
186 snprintf(buf, sizeof(buf), "%d: %s: %s took %d.%09d secs\n",
187 (int)getpid(), __progname, desc,
188 (int)diff.tv_sec, (int)diff.tv_nsec);
189 // Best effort write to a non-blocking stderr.
190 (void)write(STDERR_FILENO, buf, strlen(buf));
191 }
192 }
193
// read() up to count bytes from fd into buf and return how many were read
// (0 means EOF). The call is timed, and reported via warn_time() if slow.
// Exits the process if read() fails.
static int xread(const int fd, char* const restrict buf, const int count) {
  struct timespec before;
  struct timespec after;

  get_mono_time(&before);
  const ssize_t got = read(fd, buf, (size_t)count);
  const int read_errno = errno;  // get_mono_time()/warn_time() may clobber it
  get_mono_time(&after);
  warn_time("read()", &before, &after);
  errno = read_errno;

  if (got == -1)
    err(1, "read(fd = %d, count = %d) failed", fd, count);
  // FIXME: EAGAIN?
  assert(got >= 0);
  return (int)got;
}
214
215 static int xwrite(const int fd, struct buf* const buf) {
216 ssize_t write_ret;
217 int saved_errno;
218 struct timespec t0;
219 struct timespec t1;
220
221 get_mono_time(&t0);
222 write_ret = write(fd, buf->data, (size_t)buf->len);
223 saved_errno = errno;
224 get_mono_time(&t1);
225 warn_time("write()", &t0, &t1);
226
227 errno = saved_errno;
228 if (write_ret == -1)
229 err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
230 //FIXME: EAGAIN?
231 if (write_ret == 0)
232 return 0;
233 assert(write_ret >= 0);
234 if (write_ret < buf->len)
235 err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
236 fd, buf->len, (int)write_ret);
237 // FIXME: handle this
238 assert(write_ret == buf->len);
239 return (int)write_ret;
240 }
241
// Return the larger of a and b.
static int max(const int a, const int b) {
  if (a < b)
    return b;
  return a;
}
243
244 static void wait_until_readable(const int fd) {
245 int select_ret;
246 fd_set read_fds;
247 FD_ZERO(&read_fds);
248 FD_SET(fd, &read_fds);
249 if ((select_ret = select(fd + 1, &read_fds, NULL, NULL, NULL)) == -1)
250 err(1, "select() failed");
251 if (!FD_ISSET(fd, &read_fds))
252 errx(1, "select() did not return readable fd = %d", fd);
253 }
254
// pthread_mutex_lock() that exits the process on failure. The error number
// returned by pthread is copied into errno for err().
static void lock(pthread_mutex_t* mutex) {
  const int rc = pthread_mutex_lock(mutex);
  if (rc != 0) {
    errno = rc;
    err(1, "pthread_mutex_lock(%p) failed", mutex);
  }
}
261
// pthread_mutex_unlock() that exits the process on failure. The error number
// returned by pthread is copied into errno for err().
static void unlock(pthread_mutex_t* mutex) {
  const int rc = pthread_mutex_unlock(mutex);
  if (rc != 0) {
    errno = rc;
    err(1, "pthread_mutex_unlock(%p) failed", mutex);
  }
}
268
269 static void* writer_routine(void *arg) {
270 struct writer_thread* my = arg;
271 for (;;) {
272 // FIXME: less locking
273 struct buf* buf = NULL;
274
275 lock(&shared_queue_mutex);
276 while (!stopping && STAILQ_EMPTY(&my->queue)) {
277 // Sleep.
278 pthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex);
279 }
280 if (!STAILQ_EMPTY(&my->queue))
281 buf = dequeue(&my->queue);
282 unlock(&shared_queue_mutex);
283
284 if (stopping) break;
285 assert(buf != NULL);
286
287 // Write.
288 int write_ret = xwrite(my->fd, buf);
289 if (write_ret == 0) {
290 errx(1, "fd %d hit EOF", my->fd);
291 }
292 assert(write_ret == buf->len);
293
294 // Unreference buffer, freeing it if we have to.
295 lock(&shared_queue_mutex);
296 unref_buf(buf);
297 unlock(&shared_queue_mutex);
298 }
299 warnx("thread exiting cleanly");
300 return NULL;
301 }
302
303 static void add_writer_thread(struct writer_thread_list* list, const int fd) {
304 set_nonblocking(fd);
305 struct writer_thread* writer = malloc(sizeof(*writer));
306 writer->fd = fd;
307 STAILQ_INIT(&(writer->queue));
308 STAILQ_INSERT_TAIL(list, writer, entries);
309 xpthread_create(&(writer->thread), writer_routine, writer);
310 }
311
// Wake every thread waiting on cond; exits the process on failure.
// The error number returned by pthread is copied into errno for err().
static void xpthread_cond_broadcast(pthread_cond_t* cond) {
  const int rc = pthread_cond_broadcast(cond);
  if (rc != 0) {
    errno = rc;
    err(1, "pthread_cond_broadcast(%p) failed", cond);
  }
}
318
// Wait for thread to finish, discarding its return value.
// Exits the process if pthread_join() fails.
static void xpthread_join(pthread_t thread) {
  int ret = pthread_join(thread, NULL);
  if (ret == 0) return;
  errno = ret;
  // pthread_t is opaque, so cast it to match the %lu conversion. On glibc it
  // is already an unsigned long, so the printed value is unchanged there.
  err(1, "pthread_join(%lu) failed", (unsigned long)thread);
}
325
326 int main(int argc, char **argv) {
327 struct writer_thread_list writers;
328 STAILQ_INIT(&writers);
329
330 if (signal(SIGINT, signal_handler) == SIG_ERR) err(1, "signal() failed");
331 if (signal(SIGTERM, signal_handler) == SIG_ERR) err(1, "signal() failed");
332 //if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
333 set_nonblocking(STDERR_FILENO);
334
335 // On Linux, making STDOUT non-blocking has the side-effect of
336 // also making STDIN nonblocking.
337 add_writer_thread(&writers, STDOUT_FILENO);
338
339 // Process cmdline args.
340 for (int i = 1; i < argc; i++) {
341 add_writer_thread(&writers, make_file(argv[i]));
342 }
343
344 // Reader loop.
345 while (!stopping) {
346 wait_until_readable(STDIN_FILENO);
347 if (stopping) {
348 warnx("stopping after select()");
349 break;
350 }
351
352 // Read.
353 char data[READ_BUF_SIZE];
354 int read_ret = xread(STDIN_FILENO, data, sizeof(data));
355 if (read_ret == 0) {
356 warnx("stdin hit EOF");
357 break;
358 }
359 struct buf* buf = alloc_buf(data, read_ret);
360
361 // Enqueue.
362 lock(&shared_queue_mutex);
363 struct writer_thread* writer;
364 STAILQ_FOREACH(writer, &writers, entries) enqueue(&(writer->queue), buf);
365 xpthread_cond_broadcast(&shared_wakeup_cond);
366 unlock(&shared_queue_mutex);
367 }
368
369 // Wake and join threads.
370 stopping = 1;
371 lock(&shared_queue_mutex);
372 xpthread_cond_broadcast(&shared_wakeup_cond);
373 unlock(&shared_queue_mutex);
374 {
375 struct writer_thread* writer;
376 STAILQ_FOREACH(writer, &writers, entries) xpthread_join(writer->thread);
377 }
378
379 // Free writer list.
380 while (!STAILQ_EMPTY(&writers)) {
381 struct writer_thread* writer = STAILQ_FIRST(&writers);
382 STAILQ_REMOVE_HEAD(&writers, entries);
383 // FIXME: free its queue?
384 free(writer);
385 }
386
387 warnx("exiting cleanly");
388 return 0;
389 }
390 // vim:set ts=2 sw=2 tw=80 et: