projects
/
buftee
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
288b2e9
)
Drain writers after SIGTERM.
author
Emil Mikulic
<emikulic@gmail.com>
Sun, 24 Mar 2013 17:32:28 +0000
(
04:32
+1100)
committer
Emil Mikulic
<emikulic@gmail.com>
Sun, 24 Mar 2013 17:32:28 +0000
(
04:32
+1100)
buftee.c
patch
|
blob
|
history
diff --git
a/buftee.c
b/buftee.c
index
b69b318
..
7ab731b
100644
(file)
--- a/
buftee.c
+++ b/
buftee.c
@@
-87,6
+87,9
@@
struct writer_thread {
// When a writer runs out of work to do, it sleeps on a shared condition.
pthread_cond_t* cond;
// When a writer runs out of work to do, it sleeps on a shared condition.
pthread_cond_t* cond;
+ // The writer thread exits when it's not running and its queue is empty.
+ volatile int running;
+
STAILQ_ENTRY(writer_thread) entries;
};
STAILQ_ENTRY(writer_thread) entries;
};
@@
-305,18
+308,23
@@
static void unlock(pthread_mutex_t* mutex) {
static void* writer_routine(void *arg) {
struct writer_thread* my = arg;
struct buf* buf = NULL;
static void* writer_routine(void *arg) {
struct writer_thread* my = arg;
struct buf* buf = NULL;
+
+ nblogx("writer thread for fd %d starting", my->fd);
lock(my->mutex);
for (;;) {
lock(my->mutex);
for (;;) {
- while (
!stopp
ing && STAILQ_EMPTY(&my->queue)) {
+ while (
my->runn
ing && STAILQ_EMPTY(&my->queue)) {
// Sleep.
pthread_cond_wait(my->cond, my->mutex);
}
// Sleep.
pthread_cond_wait(my->cond, my->mutex);
}
- if (!STAILQ_EMPTY(&my->queue))
+ if (!STAILQ_EMPTY(&my->queue))
{
buf = dequeue(&my->queue);
buf = dequeue(&my->queue);
+ }
unlock(my->mutex);
unlock(my->mutex);
- if (stopping) break;
- assert(buf != NULL);
+ if (buf == NULL) {
+ assert(!my->running);
+ break;
+ }
// Write.
int write_ret = xwrite(my->fd, buf);
// Write.
int write_ret = xwrite(my->fd, buf);
@@
-328,6
+336,7
@@
static void* writer_routine(void *arg) {
// Unreference buffer, freeing it if we have to.
lock(my->mutex);
unref_buf(buf);
// Unreference buffer, freeing it if we have to.
lock(my->mutex);
unref_buf(buf);
+ buf = NULL;
}
nblogx("thread exiting cleanly");
return NULL;
}
nblogx("thread exiting cleanly");
return NULL;
@@
-342,6
+351,7
@@
static void add_writer_thread(struct writer_thread_list* list,
writer->fd = fd;
writer->mutex = shared_queue_mutex;
writer->cond = shared_wakeup_cond;
writer->fd = fd;
writer->mutex = shared_queue_mutex;
writer->cond = shared_wakeup_cond;
+ writer->running = 1;
STAILQ_INIT(&writer->queue);
STAILQ_INSERT_TAIL(list, writer, entries);
xpthread_create(&writer->thread, writer_routine, writer);
STAILQ_INIT(&writer->queue);
STAILQ_INSERT_TAIL(list, writer, entries);
xpthread_create(&writer->thread, writer_routine, writer);
@@
-363,6
+373,7
@@
static void init_logger_thread(void) {
pthread_mutex_init(logger->mutex, NULL);
logger->cond = malloc(sizeof(*logger->cond));
pthread_cond_init(logger->cond, NULL);
pthread_mutex_init(logger->mutex, NULL);
logger->cond = malloc(sizeof(*logger->cond));
pthread_cond_init(logger->cond, NULL);
+ logger->running = 1;
STAILQ_INIT(&logger->queue);
xpthread_create(&logger->thread, writer_routine, logger);
}
STAILQ_INIT(&logger->queue);
xpthread_create(&logger->thread, writer_routine, logger);
}
@@
-480,9
+491,14
@@
int main(int argc, char **argv) {
unlock(&shared_queue_mutex);
}
unlock(&shared_queue_mutex);
}
- // Wake and join threads.
+ nblogx("stopping: draining writer threads");
lock(&shared_queue_mutex);
lock(&shared_queue_mutex);
- stopping = 1;
+ {
+ struct writer_thread* writer;
+ STAILQ_FOREACH(writer, &writers, entries) {
+ writer->running = 0;
+ }
+ }
xpthread_cond_broadcast(&shared_wakeup_cond);
unlock(&shared_queue_mutex);
{
xpthread_cond_broadcast(&shared_wakeup_cond);
unlock(&shared_queue_mutex);
{
@@
-501,8
+517,9
@@
int main(int argc, char **argv) {
}
// Clean up logger thread.
}
// Clean up logger thread.
- nblogx("writer threads stopped");
+ nblogx("writer threads stopped
, stopping logger
");
lock(logger->mutex);
lock(logger->mutex);
+ logger->running = 0;
xpthread_cond_broadcast(logger->cond);
unlock(logger->mutex);
xpthread_join(logger->thread);
xpthread_cond_broadcast(logger->cond);
unlock(logger->mutex);
xpthread_join(logger->thread);