projects
/
buftee
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
a79b289
)
Move shared mutex and cond from global to per-writer.
author
Emil Mikulic
<emikulic@gmail.com>
Sun, 24 Mar 2013 08:26:55 +0000
(19:26 +1100)
committer
Emil Mikulic
<emikulic@gmail.com>
Sun, 24 Mar 2013 15:49:17 +0000
(02:49 +1100)
buftee.c
patch
|
blob
|
history
diff --git a/buftee.c b/buftee.c
index 8e430b6..1f22a6a 100644 (file)
--- a/buftee.c
+++ b/buftee.c
@@ -51,19 +51,9 @@
#define READ_BUF_SIZE 4096
#define SLOW_NSEC 4000
#define READ_BUF_SIZE 4096
#define SLOW_NSEC 4000
-// *** GLOBALS *****************************************************************
-
-// All queues are locked through one global mutex.
-static pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-// When a writer runs out of work to do, it sleeps on this global cond.
-static pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
-
// Asserted on receipt of SIGTERM, SIGINT.
static volatile int stopping = 0;
// Asserted on receipt of SIGTERM, SIGINT.
static volatile int stopping = 0;
-// *** (end globals) ***********************************************************
-
static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
stopping = 1;
}
static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
stopping = 1;
}
@@ -90,6 +80,12 @@ struct writer_thread {
// Each writer has its own queue.
struct buf_queue queue;
// Each writer has its own queue.
struct buf_queue queue;
+ // Pointer to a shared mutex which protects all queues.
+ pthread_mutex_t* mutex;
+
+ // When a writer runs out of work to do, it sleeps on a shared condition.
+ pthread_cond_t* cond;
+
STAILQ_ENTRY(writer_thread) entries;
};
STAILQ_ENTRY(writer_thread) entries;
};
@@ -298,15 +294,15 @@ static void unlock(pthread_mutex_t* mutex) {
static void* writer_routine(void *arg) {
struct writer_thread* my = arg;
struct buf* buf = NULL;
static void* writer_routine(void *arg) {
struct writer_thread* my = arg;
struct buf* buf = NULL;
- lock(&shared_queue_mutex);
+ lock(my->mutex);
for (;;) {
while (!stopping && STAILQ_EMPTY(&my->queue)) {
// Sleep.
for (;;) {
while (!stopping && STAILQ_EMPTY(&my->queue)) {
// Sleep.
- pthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex);
+ pthread_cond_wait(my->cond, my->mutex);
}
if (!STAILQ_EMPTY(&my->queue))
buf = dequeue(&my->queue);
}
if (!STAILQ_EMPTY(&my->queue))
buf = dequeue(&my->queue);
- unlock(&shared_queue_mutex);
+ unlock(my->mutex);
if (stopping) break;
assert(buf != NULL);
if (stopping) break;
assert(buf != NULL);
@@ -319,20 +315,25 @@ static void* writer_routine(void *arg) {
assert(write_ret == buf->len);
// Unreference buffer, freeing it if we have to.
assert(write_ret == buf->len);
// Unreference buffer, freeing it if we have to.
- lock(&shared_queue_mutex);
+ lock(my->mutex);
unref_buf(buf);
}
warnx("thread exiting cleanly");
return NULL;
}
unref_buf(buf);
}
warnx("thread exiting cleanly");
return NULL;
}
-static void add_writer_thread(struct writer_thread_list* list, const int fd) {
- set_nonblocking(fd);
+static void add_writer_thread(struct writer_thread_list* list,
+ int fd,
+ pthread_mutex_t* shared_queue_mutex,
+ pthread_cond_t* shared_wakeup_cond) {
struct writer_thread* writer = malloc(sizeof(*writer));
struct writer_thread* writer = malloc(sizeof(*writer));
+ set_nonblocking(fd);
writer->fd = fd;
writer->fd = fd;
- STAILQ_INIT(&(writer->queue));
+ writer->mutex = shared_queue_mutex;
+ writer->cond = shared_wakeup_cond;
+ STAILQ_INIT(&writer->queue);
STAILQ_INSERT_TAIL(list, writer, entries);
STAILQ_INSERT_TAIL(list, writer, entries);
- xpthread_create(&(writer->thread), writer_routine, writer);
+ xpthread_create(&writer->thread, writer_routine, writer);
}
static void xpthread_cond_broadcast(pthread_cond_t* cond) {
}
static void xpthread_cond_broadcast(pthread_cond_t* cond) {
@@ -354,6 +355,9 @@ static void sig_continue(int _ignored_ __attribute__((__unused__))) {
}
int main(int argc, char **argv) {
}
int main(int argc, char **argv) {
+ pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+ pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
+
struct writer_thread_list writers;
STAILQ_INIT(&writers);
struct writer_thread_list writers;
STAILQ_INIT(&writers);
@@ -365,11 +369,17 @@ int main(int argc, char **argv) {
// On Linux, making STDOUT non-blocking has the side-effect of
// also making STDIN nonblocking.
// On Linux, making STDOUT non-blocking has the side-effect of
// also making STDIN nonblocking.
- add_writer_thread(&writers, STDOUT_FILENO);
+ add_writer_thread(&writers,
+ STDOUT_FILENO,
+ &shared_queue_mutex,
+ &shared_wakeup_cond);
// Process cmdline args.
for (int i = 1; i < argc; i++) {
// Process cmdline args.
for (int i = 1; i < argc; i++) {
- add_writer_thread(&writers, make_file(argv[i]));
+ add_writer_thread(&writers,
+ make_file(argv[i]),
+ &shared_queue_mutex,
+ &shared_wakeup_cond);
}
// Reader loop.
}
// Reader loop.
@@ -392,19 +402,23 @@ int main(int argc, char **argv) {
// Enqueue.
lock(&shared_queue_mutex);
struct writer_thread* writer;
// Enqueue.
lock(&shared_queue_mutex);
struct writer_thread* writer;
- STAILQ_FOREACH(writer, &writers, entries) enqueue(&(writer->queue), buf);
+ STAILQ_FOREACH(writer, &writers, entries) {
+ enqueue(&(writer->queue), buf);
+ }
xpthread_cond_broadcast(&shared_wakeup_cond);
unlock(&shared_queue_mutex);
}
// Wake and join threads.
xpthread_cond_broadcast(&shared_wakeup_cond);
unlock(&shared_queue_mutex);
}
// Wake and join threads.
- stopping = 1;
lock(&shared_queue_mutex);
lock(&shared_queue_mutex);
+ stopping = 1;
xpthread_cond_broadcast(&shared_wakeup_cond);
unlock(&shared_queue_mutex);
{
struct writer_thread* writer;
xpthread_cond_broadcast(&shared_wakeup_cond);
unlock(&shared_queue_mutex);
{
struct writer_thread* writer;
- STAILQ_FOREACH(writer, &writers, entries) xpthread_join(writer->thread);
+ STAILQ_FOREACH(writer, &writers, entries) {
+ xpthread_join(writer->thread);
+ }
}
// Free writer list.
}
// Free writer list.