Refactor thrdpool using msgqueue.

This commit is contained in:
XieHan
2022-06-21 22:53:54 +08:00
parent d4097d1fcf
commit f13f42bda6

View File

@@ -20,12 +20,12 @@
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include "list.h"
#include "msgqueue.h"
#include "thrdpool.h"
struct __thrdpool
{
struct list_head task_queue;
msgqueue_t *msgqueue;
size_t nthreads;
size_t stacksize;
pthread_t tid;
@@ -37,7 +37,7 @@ struct __thrdpool
struct __thrdpool_task_entry
{
struct list_head list;
void *link;
struct thrdpool_task task;
};
@@ -46,26 +46,18 @@ static pthread_t __zero_tid;
static void *__thrdpool_routine(void *arg)
{
thrdpool_t *pool = (thrdpool_t *)arg;
struct list_head **pos = &pool->task_queue.next;
struct __thrdpool_task_entry *entry;
void (*task_routine)(void *);
void *task_context;
pthread_t tid;
pthread_setspecific(pool->key, pool);
while (1)
while (!pool->terminate)
{
pthread_mutex_lock(&pool->mutex);
while (!pool->terminate && list_empty(&pool->task_queue))
pthread_cond_wait(&pool->cond, &pool->mutex);
if (pool->terminate)
entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue);
if (!entry)
break;
entry = list_entry(*pos, struct __thrdpool_task_entry, list);
list_del(*pos);
pthread_mutex_unlock(&pool->mutex);
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
@@ -80,6 +72,7 @@ static void *__thrdpool_routine(void *arg)
}
/* One thread joins another. Don't need to keep all thread IDs. */
pthread_mutex_lock(&pool->mutex);
tid = pool->tid;
pool->tid = pthread_self();
if (--pool->nthreads == 0)
@@ -121,8 +114,8 @@ static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
pthread_mutex_lock(&pool->mutex);
msgqueue_set_nonblock(pool->msgqueue);
pool->terminate = &term;
pthread_cond_broadcast(&pool->cond);
if (in_pool)
{
@@ -177,14 +170,17 @@ thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
int ret;
pool = (thrdpool_t *)malloc(sizeof (thrdpool_t));
if (pool)
if (!pool)
return NULL;
if (__thrdpool_init_locks(pool) >= 0)
{
if (__thrdpool_init_locks(pool) >= 0)
ret = pthread_key_create(&pool->key, NULL);
if (ret == 0)
{
ret = pthread_key_create(&pool->key, NULL);
if (ret == 0)
pool->msgqueue = msgqueue_create((size_t)-1, 0);
if (pool->msgqueue)
{
INIT_LIST_HEAD(&pool->task_queue);
pool->stacksize = stacksize;
pool->nthreads = 0;
memset(&pool->tid, 0, sizeof (pthread_t));
@@ -192,17 +188,18 @@ thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
if (__thrdpool_create_threads(nthreads, pool) >= 0)
return pool;
pthread_key_delete(pool->key);
msgqueue_destroy(pool->msgqueue);
}
else
errno = ret;
__thrdpool_destroy_locks(pool);
pthread_key_delete(pool->key);
}
else
errno = ret;
free(pool);
__thrdpool_destroy_locks(pool);
}
free(pool);
return NULL;
}
@@ -212,13 +209,8 @@ inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
thrdpool_t *pool)
{
struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;
entry->task = *task;
pthread_mutex_lock(&pool->mutex);
list_add_tail(&entry->list, &pool->task_queue);
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
((struct __thrdpool_task_entry *)buf)->task = *task;
msgqueue_put(buf, pool->msgqueue);
}
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool)
@@ -271,19 +263,22 @@ void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
{
int in_pool = thrdpool_in_pool(pool);
struct __thrdpool_task_entry *entry;
struct list_head *pos, *tmp;
__thrdpool_terminate(in_pool, pool);
list_for_each_safe(pos, tmp, &pool->task_queue)
while (1)
{
entry = list_entry(pos, struct __thrdpool_task_entry, list);
list_del(pos);
entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue);
if (!entry)
break;
if (pending)
pending(&entry->task);
free(entry);
}
msgqueue_destroy(pool->msgqueue);
pthread_key_delete(pool->key);
__thrdpool_destroy_locks(pool);
if (!in_pool)