From fec3f6ced807fc7d2eb23cfa490842c8082ae90f Mon Sep 17 00:00:00 2001 From: Xie Han <63350856@qq.com> Date: Sun, 26 May 2024 03:18:19 +0800 Subject: [PATCH] Add msgqueue_put_head(). --- src/kernel/msgqueue.c | 72 ++++++++++++++++++++++++++++++------------- src/kernel/msgqueue.h | 3 +- 2 files changed, 53 insertions(+), 22 deletions(-) diff --git a/src/kernel/msgqueue.c b/src/kernel/msgqueue.c index cededb3a..1d7e94be 100644 --- a/src/kernel/msgqueue.c +++ b/src/kernel/msgqueue.c @@ -62,27 +62,6 @@ void msgqueue_set_block(msgqueue_t *queue) queue->nonblock = 0; } -static size_t __msgqueue_swap(msgqueue_t *queue) -{ - void **get_head = queue->get_head; - size_t cnt; - - queue->get_head = queue->put_head; - pthread_mutex_lock(&queue->put_mutex); - while (queue->msg_cnt == 0 && !queue->nonblock) - pthread_cond_wait(&queue->get_cond, &queue->put_mutex); - - cnt = queue->msg_cnt; - if (cnt > queue->msg_max - 1) - pthread_cond_broadcast(&queue->put_cond); - - queue->put_head = get_head; - queue->put_tail = get_head; - queue->msg_cnt = 0; - pthread_mutex_unlock(&queue->put_mutex); - return cnt; -} - void msgqueue_put(void *msg, msgqueue_t *queue) { void **link = (void **)((char *)msg + queue->linkoff); @@ -99,6 +78,57 @@ void msgqueue_put(void *msg, msgqueue_t *queue) pthread_cond_signal(&queue->get_cond); } +void msgqueue_put_head(void *msg, msgqueue_t *queue) +{ + void **link = (void **)((char *)msg + queue->linkoff); + + pthread_mutex_lock(&queue->put_mutex); + while (*queue->get_head) + { + if (pthread_mutex_trylock(&queue->get_mutex) == 0) + { + pthread_mutex_unlock(&queue->put_mutex); + *link = *queue->get_head; + *queue->get_head = link; + pthread_mutex_unlock(&queue->get_mutex); + return; + } + } + + while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock) + pthread_cond_wait(&queue->put_cond, &queue->put_mutex); + + *link = *queue->put_head; + if (*link == NULL) + queue->put_tail = link; + + *queue->put_head = link; + queue->msg_cnt++; + pthread_mutex_unlock(&queue->put_mutex); + pthread_cond_signal(&queue->get_cond); +} + +static size_t __msgqueue_swap(msgqueue_t *queue) +{ + void **get_head = queue->get_head; + size_t cnt; + + pthread_mutex_lock(&queue->put_mutex); + while (queue->msg_cnt == 0 && !queue->nonblock) + pthread_cond_wait(&queue->get_cond, &queue->put_mutex); + + cnt = queue->msg_cnt; + if (cnt > queue->msg_max - 1) + pthread_cond_broadcast(&queue->put_cond); + + queue->get_head = queue->put_head; + queue->put_head = get_head; + queue->put_tail = get_head; + queue->msg_cnt = 0; + pthread_mutex_unlock(&queue->put_mutex); + return cnt; +} + void *msgqueue_get(msgqueue_t *queue) { void *msg; diff --git a/src/kernel/msgqueue.h b/src/kernel/msgqueue.h index d1105abe..3e5c1b07 100644 --- a/src/kernel/msgqueue.h +++ b/src/kernel/msgqueue.h @@ -35,8 +35,9 @@ extern "C" * 'linkoff' can be positive or negative or zero. */ msgqueue_t *msgqueue_create(size_t maxlen, int linkoff); -void msgqueue_put(void *msg, msgqueue_t *queue); void *msgqueue_get(msgqueue_t *queue); +void msgqueue_put(void *msg, msgqueue_t *queue); +void msgqueue_put_head(void *msg, msgqueue_t *queue); void msgqueue_set_nonblock(msgqueue_t *queue); void msgqueue_set_block(msgqueue_t *queue); void msgqueue_destroy(msgqueue_t *queue);