Add msgqueue_put_head().

This commit is contained in:
Xie Han
2024-05-26 03:18:19 +08:00
committed by xiehan
parent fa85db076a
commit fec3f6ced8
2 changed files with 53 additions and 22 deletions

View File

@@ -62,27 +62,6 @@ void msgqueue_set_block(msgqueue_t *queue)
queue->nonblock = 0;
}
static size_t __msgqueue_swap(msgqueue_t *queue)
{
void **get_head = queue->get_head;
size_t cnt;
queue->get_head = queue->put_head;
pthread_mutex_lock(&queue->put_mutex);
while (queue->msg_cnt == 0 && !queue->nonblock)
pthread_cond_wait(&queue->get_cond, &queue->put_mutex);
cnt = queue->msg_cnt;
if (cnt > queue->msg_max - 1)
pthread_cond_broadcast(&queue->put_cond);
queue->put_head = get_head;
queue->put_tail = get_head;
queue->msg_cnt = 0;
pthread_mutex_unlock(&queue->put_mutex);
return cnt;
}
void msgqueue_put(void *msg, msgqueue_t *queue)
{
void **link = (void **)((char *)msg + queue->linkoff);
@@ -99,6 +78,57 @@ void msgqueue_put(void *msg, msgqueue_t *queue)
pthread_cond_signal(&queue->get_cond);
}
void msgqueue_put_head(void *msg, msgqueue_t *queue)
{
void **link = (void **)((char *)msg + queue->linkoff);
pthread_mutex_lock(&queue->put_mutex);
while (*queue->get_head)
{
if (pthread_mutex_trylock(&queue->get_mutex) == 0)
{
pthread_mutex_unlock(&queue->put_mutex);
*link = *queue->get_head;
*queue->get_head = link;
pthread_mutex_unlock(&queue->get_mutex);
return;
}
}
while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock)
pthread_cond_wait(&queue->put_cond, &queue->put_mutex);
*link = *queue->put_head;
if (*link == NULL)
queue->put_tail = link;
*queue->put_head = link;
queue->msg_cnt++;
pthread_mutex_unlock(&queue->put_mutex);
pthread_cond_signal(&queue->get_cond);
}
static size_t __msgqueue_swap(msgqueue_t *queue)
{
void **get_head = queue->get_head;
size_t cnt;
pthread_mutex_lock(&queue->put_mutex);
while (queue->msg_cnt == 0 && !queue->nonblock)
pthread_cond_wait(&queue->get_cond, &queue->put_mutex);
cnt = queue->msg_cnt;
if (cnt > queue->msg_max - 1)
pthread_cond_broadcast(&queue->put_cond);
queue->get_head = queue->put_head;
queue->put_head = get_head;
queue->put_tail = get_head;
queue->msg_cnt = 0;
pthread_mutex_unlock(&queue->put_mutex);
return cnt;
}
void *msgqueue_get(msgqueue_t *queue)
{
void *msg;

View File

@@ -35,8 +35,9 @@ extern "C"
* 'linkoff' can be positive or negative or zero. */
msgqueue_t *msgqueue_create(size_t maxlen, int linkoff);
void msgqueue_put(void *msg, msgqueue_t *queue);
void *msgqueue_get(msgqueue_t *queue);
void msgqueue_put(void *msg, msgqueue_t *queue);
void msgqueue_put_head(void *msg, msgqueue_t *queue);
void msgqueue_set_nonblock(msgqueue_t *queue);
void msgqueue_set_block(msgqueue_t *queue);
void msgqueue_destroy(msgqueue_t *queue);