Update linux codes

This commit is contained in:
Xie Han
2023-11-07 20:20:35 +08:00
parent ec8062a110
commit 96e49a141c
8 changed files with 342 additions and 227 deletions

View File

@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.6)
project(kernel)
if (CMAKE_SYSTEM_NAME STREQUAL "Linux")
if (CMAKE_SYSTEM_NAME STREQUAL "Linux" OR CMAKE_SYSTEM_NAME STREQUAL "Android")
set(IOSERVICE_FILE IOService_linux.cc)
elseif (UNIX)
set(IOSERVICE_FILE IOService_thread.cc)

View File

@@ -144,6 +144,11 @@ public:
return this->comm.reply(session);
}
int shutdown(CommSession *session)
{
return this->comm.shutdown(session);
}
int push(const void *buf, size_t size, CommSession *session)
{
return this->comm.push(buf, size, session);
@@ -165,6 +170,12 @@ public:
return this->comm.sleep(session);
}
/* Call 'unsleep' only before 'handle()' returns. */
int unsleep(SleepSession *session)
{
return this->comm.unsleep(session);
}
/* for file aio services. */
int io_bind(IOService *service)
{

View File

@@ -340,8 +340,11 @@ CommSession::~CommSession()
{
pos = target->idle_list.next;
entry = list_entry(pos, struct CommConnEntry, list);
list_del(pos);
errno_bak = errno;
mpoller_del(entry->sockfd, entry->mpoller);
entry->state = CONN_STATE_CLOSING;
errno = errno_bak;
}
@@ -1095,12 +1098,22 @@ void Communicator::handle_sleep_result(struct poller_result *res)
SleepSession *session = (SleepSession *)res->data.context;
int state;
if (res->state == PR_ST_STOPPED)
state = SS_STATE_DISRUPTED;
else
switch (res->state)
{
case PR_ST_FINISHED:
state = SS_STATE_COMPLETE;
break;
case PR_ST_DELETED:
res->error = ECANCELED;
case PR_ST_ERROR:
state = SS_STATE_ERROR;
break;
case PR_ST_STOPPED:
state = SS_STATE_DISRUPTED;
break;
}
session->handle(state, 0);
session->handle(state, res->error);
}
void Communicator::handle_aio_result(struct poller_result *res)
@@ -1799,19 +1812,56 @@ int Communicator::push(const void *buf, size_t size, CommSession *session)
return ret;
}
int Communicator::shutdown(CommSession *session)
{
CommTarget *target = session->target;
struct CommConnEntry *entry;
int ret;
if (session->passive != 1)
{
errno = session->passive ? ENOENT : EPERM;
return -1;
}
session->passive = 2;
pthread_mutex_lock(&target->mutex);
if (!list_empty(&target->idle_list))
{
entry = list_entry(target->idle_list.next, struct CommConnEntry, list);
list_del(&entry->list);
ret = mpoller_del(entry->sockfd, entry->mpoller);
entry->state = CONN_STATE_CLOSING;
}
else
{
errno = ENOENT;
ret = -1;
}
pthread_mutex_unlock(&target->mutex);
return ret;
}
int Communicator::sleep(SleepSession *session)
{
struct timespec value;
if (session->duration(&value) >= 0)
{
if (mpoller_add_timer(&value, session, this->mpoller) >= 0)
if (mpoller_add_timer(&value, session, &session->timer, &session->index,
this->mpoller) >= 0)
return 0;
}
return -1;
}
int Communicator::unsleep(SleepSession *session)
{
return mpoller_del_timer(session->timer, session->index, this->mpoller);
}
int Communicator::is_handler_thread() const
{
return thrdpool_in_pool(this->thrdpool);

View File

@@ -246,6 +246,10 @@ private:
virtual int duration(struct timespec *value) = 0;
virtual void handle(int state, int error) = 0;
private:
void *timer;
int index;
public:
virtual ~SleepSession() { }
friend class Communicator;
@@ -268,10 +272,13 @@ public:
int push(const void *buf, size_t size, CommSession *session);
int shutdown(CommSession *session);
int bind(CommService *service);
void unbind(CommService *service);
int sleep(SleepSession *session);
int unsleep(SleepSession *session);
int io_bind(IOService *service);
void io_unbind(IOService *service);

View File

@@ -39,6 +39,12 @@ public:
this->handle(SS_STATE_ERROR, errno);
}
protected:
int cancel()
{
return this->scheduler->unsleep(this);
}
protected:
int state;
int error;

View File

@@ -48,35 +48,41 @@ struct __mpoller
static inline int mpoller_add(const struct poller_data *data, int timeout,
mpoller_t *mpoller)
{
unsigned int index = (unsigned int)data->fd % mpoller->nthreads;
int index = (unsigned int)data->fd % mpoller->nthreads;
return poller_add(data, timeout, mpoller->poller[index]);
}
static inline int mpoller_del(int fd, mpoller_t *mpoller)
{
unsigned int index = (unsigned int)fd % mpoller->nthreads;
int index = (unsigned int)fd % mpoller->nthreads;
return poller_del(fd, mpoller->poller[index]);
}
static inline int mpoller_mod(const struct poller_data *data, int timeout,
mpoller_t *mpoller)
{
unsigned int index = (unsigned int)data->fd % mpoller->nthreads;
int index = (unsigned int)data->fd % mpoller->nthreads;
return poller_mod(data, timeout, mpoller->poller[index]);
}
static inline int mpoller_set_timeout(int fd, int timeout, mpoller_t *mpoller)
{
unsigned int index = (unsigned int)fd % mpoller->nthreads;
int index = (unsigned int)fd % mpoller->nthreads;
return poller_set_timeout(fd, timeout, mpoller->poller[index]);
}
static inline int mpoller_add_timer(const struct timespec *value, void *context,
void **timer, int *index,
mpoller_t *mpoller)
{
static unsigned int n = 0;
unsigned int index = n++ % mpoller->nthreads;
return poller_add_timer(value, context, mpoller->poller[index]);
*index = n++ % mpoller->nthreads;
return poller_add_timer(value, context, timer, mpoller->poller[*index]);
}
static inline int mpoller_del_timer(void *timer, int index, mpoller_t *mpoller)
{
return poller_del_timer(timer, mpoller->poller[index]);
}
#endif

View File

@@ -30,13 +30,11 @@
#include <errno.h>
#include <limits.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include "list.h"
#include "rbtree.h"
#include "poller.h"
@@ -66,8 +64,8 @@ struct __poller_node
struct __poller
{
size_t max_open_files;
void (*cb)(struct poller_result *, void *);
void *ctx;
void (*callback)(struct poller_result *, void *);
void *context;
pthread_t tid;
int pfd;
@@ -127,6 +125,11 @@ static inline int __poller_create_timerfd()
return timerfd_create(CLOCK_MONOTONIC, 0);
}
static inline int __poller_close_timerfd(int fd)
{
return close(fd);
}
static inline int __poller_add_timerfd(int fd, poller_t *poller)
{
struct epoll_event ev = {
@@ -195,7 +198,12 @@ static inline int __poller_mod_fd(int fd, int old_event,
static inline int __poller_create_timerfd()
{
return dup(0);
return 0;
}
static inline int __poller_close_timerfd(int fd)
{
return 0;
}
static inline int __poller_add_timerfd(int fd, poller_t *poller)
@@ -365,7 +373,7 @@ static int __poller_append_message(const void *buf, size_t *n,
res->data = node->data;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
poller->callback((struct poller_result *)res, poller->context);
node->data.message = NULL;
node->res = NULL;
@@ -474,7 +482,7 @@ static void __poller_handle_read(struct __poller_node *node,
}
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
#ifndef IOV_MAX
@@ -565,7 +573,7 @@ static void __poller_handle_write(struct __poller_node *node,
node->state = PR_ST_ERROR;
}
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
static void __poller_handle_listen(struct __poller_node *node,
@@ -600,7 +608,7 @@ static void __poller_handle_listen(struct __poller_node *node,
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
poller->callback((struct poller_result *)res, poller->context);
res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
node->res = res;
@@ -614,7 +622,7 @@ static void __poller_handle_listen(struct __poller_node *node,
node->error = errno;
node->state = PR_ST_ERROR;
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
static void __poller_handle_connect(struct __poller_node *node,
@@ -640,7 +648,7 @@ static void __poller_handle_connect(struct __poller_node *node,
node->state = PR_ST_ERROR;
}
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
static void __poller_handle_ssl_accept(struct __poller_node *node,
@@ -668,7 +676,7 @@ static void __poller_handle_ssl_accept(struct __poller_node *node,
node->state = PR_ST_ERROR;
}
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
static void __poller_handle_ssl_connect(struct __poller_node *node,
@@ -696,7 +704,7 @@ static void __poller_handle_ssl_connect(struct __poller_node *node,
node->state = PR_ST_ERROR;
}
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
static void __poller_handle_ssl_shutdown(struct __poller_node *node,
@@ -724,7 +732,7 @@ static void __poller_handle_ssl_shutdown(struct __poller_node *node,
node->state = PR_ST_ERROR;
}
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
static void __poller_handle_event(struct __poller_node *node,
@@ -765,7 +773,7 @@ static void __poller_handle_event(struct __poller_node *node,
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
poller->callback((struct poller_result *)res, poller->context);
res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
node->res = res;
@@ -783,7 +791,7 @@ static void __poller_handle_event(struct __poller_node *node,
node->error = errno;
node->state = PR_ST_ERROR;
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
static void __poller_handle_notify(struct __poller_node *node,
@@ -806,7 +814,7 @@ static void __poller_handle_notify(struct __poller_node *node,
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
poller->callback((struct poller_result *)res, poller->context);
res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
node->res = res;
@@ -838,7 +846,7 @@ static void __poller_handle_notify(struct __poller_node *node,
}
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
static void __poller_handle_recvfrom(struct __poller_node *node,
@@ -873,7 +881,7 @@ static void __poller_handle_recvfrom(struct __poller_node *node,
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
poller->callback((struct poller_result *)res, poller->context);
res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
node->res = res;
@@ -887,7 +895,7 @@ static void __poller_handle_recvfrom(struct __poller_node *node,
node->error = errno;
node->state = PR_ST_ERROR;
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
static int __poller_handle_pipe(poller_t *poller)
@@ -903,7 +911,7 @@ static int __poller_handle_pipe(poller_t *poller)
if (node[i])
{
free(node[i]->res);
poller->cb((struct poller_result *)node[i], poller->ctx);
poller->callback((struct poller_result *)node[i], poller->context);
}
else
stop = 1;
@@ -923,56 +931,58 @@ static void __poller_handle_timeout(const struct __poller_node *time_node,
list_for_each_safe(pos, tmp, &poller->timeo_list)
{
node = list_entry(pos, struct __poller_node, list);
if (__timeout_cmp(node, time_node) <= 0)
{
if (node->data.fd >= 0)
{
poller->nodes[node->data.fd] = NULL;
__poller_del_fd(node->data.fd, node->event, poller);
}
if (__timeout_cmp(node, time_node) > 0)
break;
list_move_tail(pos, &timeo_list);
if (node->data.fd >= 0)
{
poller->nodes[node->data.fd] = NULL;
__poller_del_fd(node->data.fd, node->event, poller);
}
else
break;
node->removed = 1;
list_move_tail(pos, &timeo_list);
}
if (poller->tree_first)
while (poller->tree_first)
{
while (1)
{
node = rb_entry(poller->tree_first, struct __poller_node, rb);
if (__timeout_cmp(node, time_node) < 0)
{
if (node->data.fd >= 0)
{
poller->nodes[node->data.fd] = NULL;
__poller_del_fd(node->data.fd, node->event, poller);
}
poller->tree_first = rb_next(poller->tree_first);
rb_erase(&node->rb, &poller->timeo_tree);
list_add_tail(&node->list, &timeo_list);
if (poller->tree_first)
continue;
poller->tree_last = NULL;
}
node = rb_entry(poller->tree_first, struct __poller_node, rb);
if (__timeout_cmp(node, time_node) > 0)
break;
if (node->data.fd >= 0)
{
poller->nodes[node->data.fd] = NULL;
__poller_del_fd(node->data.fd, node->event, poller);
}
else
node->removed = 1;
poller->tree_first = rb_next(poller->tree_first);
rb_erase(&node->rb, &poller->timeo_tree);
list_add_tail(&node->list, &timeo_list);
if (!poller->tree_first)
poller->tree_last = NULL;
}
pthread_mutex_unlock(&poller->mutex);
while (!list_empty(&timeo_list))
list_for_each_safe(pos, tmp, &timeo_list)
{
node = list_entry(timeo_list.next, struct __poller_node, list);
list_del(&node->list);
node = list_entry(pos, struct __poller_node, list);
if (node->data.fd >= 0)
{
node->error = ETIMEDOUT;
node->state = PR_ST_ERROR;
}
else
{
node->error = 0;
node->state = PR_ST_FINISHED;
}
node->error = ETIMEDOUT;
node->state = PR_ST_ERROR;
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
poller->callback((struct poller_result *)node, poller->context);
}
}
@@ -1024,44 +1034,46 @@ static void *__poller_thread_routine(void *arg)
for (i = 0; i < nevents; i++)
{
node = (struct __poller_node *)__poller_event_data(&events[i]);
if (node > (struct __poller_node *)1)
if (node <= (struct __poller_node *)1)
{
switch (node->data.operation)
{
case PD_OP_READ:
__poller_handle_read(node, poller);
break;
case PD_OP_WRITE:
__poller_handle_write(node, poller);
break;
case PD_OP_LISTEN:
__poller_handle_listen(node, poller);
break;
case PD_OP_CONNECT:
__poller_handle_connect(node, poller);
break;
case PD_OP_SSL_ACCEPT:
__poller_handle_ssl_accept(node, poller);
break;
case PD_OP_SSL_CONNECT:
__poller_handle_ssl_connect(node, poller);
break;
case PD_OP_SSL_SHUTDOWN:
__poller_handle_ssl_shutdown(node, poller);
break;
case PD_OP_EVENT:
__poller_handle_event(node, poller);
break;
case PD_OP_NOTIFY:
__poller_handle_notify(node, poller);
break;
case PD_OP_RECVFROM:
__poller_handle_recvfrom(node, poller);
break;
}
if (node == (struct __poller_node *)1)
has_pipe_event = 1;
continue;
}
switch (node->data.operation)
{
case PD_OP_READ:
__poller_handle_read(node, poller);
break;
case PD_OP_WRITE:
__poller_handle_write(node, poller);
break;
case PD_OP_LISTEN:
__poller_handle_listen(node, poller);
break;
case PD_OP_CONNECT:
__poller_handle_connect(node, poller);
break;
case PD_OP_SSL_ACCEPT:
__poller_handle_ssl_accept(node, poller);
break;
case PD_OP_SSL_CONNECT:
__poller_handle_ssl_connect(node, poller);
break;
case PD_OP_SSL_SHUTDOWN:
__poller_handle_ssl_shutdown(node, poller);
break;
case PD_OP_EVENT:
__poller_handle_event(node, poller);
break;
case PD_OP_NOTIFY:
__poller_handle_notify(node, poller);
break;
case PD_OP_RECVFROM:
__poller_handle_recvfrom(node, poller);
break;
}
else if (node == (struct __poller_node *)1)
has_pipe_event = 1;
}
if (has_pipe_event)
@@ -1073,13 +1085,6 @@ static void *__poller_thread_routine(void *arg)
__poller_handle_timeout(&time_node, poller);
}
#if OPENSSL_VERSION_NUMBER < 0x10100000L
# ifdef CRYPTO_LOCK_ECDH
ERR_remove_thread_state(NULL);
# else
ERR_remove_state(0);
# endif
#endif
return NULL;
}
@@ -1115,7 +1120,7 @@ static int __poller_create_timer(poller_t *poller)
return 0;
}
close(timerfd);
__poller_close_timerfd(timerfd);
}
return -1;
@@ -1139,8 +1144,8 @@ poller_t *__poller_create(void **nodes_buf, const struct poller_params *params)
{
poller->nodes = (struct __poller_node **)nodes_buf;
poller->max_open_files = params->max_open_files;
poller->cb = params->callback;
poller->ctx = params->context;
poller->callback = params->callback;
poller->context = params->context;
poller->timeo_tree.rb_node = NULL;
poller->tree_first = NULL;
@@ -1183,7 +1188,7 @@ poller_t *poller_create(const struct poller_params *params)
void __poller_destroy(poller_t *poller)
{
pthread_mutex_destroy(&poller->mutex);
close(poller->timerfd);
__poller_close_timerfd(poller->timerfd);
close(poller->pfd);
free(poller);
}
@@ -1301,7 +1306,8 @@ static int __poller_data_get_event(int *event, const struct poller_data *data)
}
}
int poller_add(const struct poller_data *data, int timeout, poller_t *poller)
static struct __poller_node *__poller_new_node(const struct poller_data *data,
int timeout, poller_t *poller)
{
struct __poller_node *res = NULL;
struct __poller_node *node;
@@ -1311,18 +1317,18 @@ int poller_add(const struct poller_data *data, int timeout, poller_t *poller)
if ((size_t)data->fd >= poller->max_open_files)
{
errno = data->fd < 0 ? EBADF : EMFILE;
return -1;
return NULL;
}
need_res = __poller_data_get_event(&event, data);
if (need_res < 0)
return -1;
return NULL;
if (need_res)
{
res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
if (!res)
return -1;
return NULL;
}
node = (struct __poller_node *)malloc(sizeof (struct __poller_node));
@@ -1335,36 +1341,49 @@ int poller_add(const struct poller_data *data, int timeout, poller_t *poller)
node->res = res;
if (timeout >= 0)
__poller_node_set_timeout(timeout, node);
pthread_mutex_lock(&poller->mutex);
if (!poller->nodes[data->fd])
{
if (__poller_add_fd(data->fd, event, node, poller) >= 0)
{
if (timeout >= 0)
__poller_insert_node(node, poller);
else
list_add_tail(&node->list, &poller->no_timeo_list);
poller->nodes[data->fd] = node;
node = NULL;
}
}
pthread_mutex_unlock(&poller->mutex);
if (node == NULL)
return 0;
free(node);
}
free(res);
return node;
}
int poller_add(const struct poller_data *data, int timeout, poller_t *poller)
{
struct __poller_node *node;
node = __poller_new_node(data, timeout, poller);
if (!node)
return -1;
pthread_mutex_lock(&poller->mutex);
if (!poller->nodes[data->fd])
{
if (__poller_add_fd(data->fd, node->event, node, poller) >= 0)
{
if (timeout >= 0)
__poller_insert_node(node, poller);
else
list_add_tail(&node->list, &poller->no_timeo_list);
poller->nodes[data->fd] = node;
node = NULL;
}
}
else
errno = EEXIST;
pthread_mutex_unlock(&poller->mutex);
if (node == NULL)
return 0;
free(node->res);
free(node);
return -1;
}
int poller_del(int fd, poller_t *poller)
{
struct __poller_node *node;
int stopped = 0;
if ((size_t)fd >= poller->max_open_files)
{
@@ -1387,12 +1406,8 @@ int poller_del(int fd, poller_t *poller)
node->error = 0;
node->state = PR_ST_DELETED;
if (poller->stopped)
{
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
}
else
stopped = poller->stopped;
if (!stopped)
{
node->removed = 1;
write(poller->pipe_wr, &node, sizeof (void *));
@@ -1402,89 +1417,69 @@ int poller_del(int fd, poller_t *poller)
errno = ENOENT;
pthread_mutex_unlock(&poller->mutex);
if (stopped)
{
free(node->res);
poller->callback((struct poller_result *)node, poller->context);
}
return -!node;
}
int poller_mod(const struct poller_data *data, int timeout, poller_t *poller)
{
struct __poller_node *res = NULL;
struct __poller_node *node;
struct __poller_node *old;
int need_res;
int event;
struct __poller_node *orig;
int stopped = 0;
if ((size_t)data->fd >= poller->max_open_files)
{
errno = data->fd < 0 ? EBADF : EMFILE;
return -1;
}
need_res = __poller_data_get_event(&event, data);
if (need_res < 0)
node = __poller_new_node(data, timeout, poller);
if (!node)
return -1;
if (need_res)
pthread_mutex_lock(&poller->mutex);
orig = poller->nodes[data->fd];
if (orig)
{
res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
if (!res)
return -1;
}
node = (struct __poller_node *)malloc(sizeof (struct __poller_node));
if (node)
{
node->data = *data;
node->event = event;
node->in_rbtree = 0;
node->removed = 0;
node->res = res;
if (timeout >= 0)
__poller_node_set_timeout(timeout, node);
pthread_mutex_lock(&poller->mutex);
old = poller->nodes[data->fd];
if (old)
if (__poller_mod_fd(data->fd, orig->event, node->event, node, poller) >= 0)
{
if (__poller_mod_fd(data->fd, old->event, event, node, poller) >= 0)
if (orig->in_rbtree)
__poller_tree_erase(orig, poller);
else
list_del(&orig->list);
orig->error = 0;
orig->state = PR_ST_MODIFIED;
stopped = poller->stopped;
if (!stopped)
{
if (old->in_rbtree)
__poller_tree_erase(old, poller);
else
list_del(&old->list);
old->error = 0;
old->state = PR_ST_MODIFIED;
if (poller->stopped)
{
free(old->res);
poller->cb((struct poller_result *)old, poller->ctx);
}
else
{
old->removed = 1;
write(poller->pipe_wr, &old, sizeof (void *));
}
if (timeout >= 0)
__poller_insert_node(node, poller);
else
list_add_tail(&node->list, &poller->no_timeo_list);
poller->nodes[data->fd] = node;
node = NULL;
orig->removed = 1;
write(poller->pipe_wr, &orig, sizeof (void *));
}
if (timeout >= 0)
__poller_insert_node(node, poller);
else
list_add_tail(&node->list, &poller->no_timeo_list);
poller->nodes[data->fd] = node;
node = NULL;
}
else
errno = ENOENT;
}
else
errno = ENOENT;
pthread_mutex_unlock(&poller->mutex);
if (node == NULL)
return 0;
free(node);
pthread_mutex_unlock(&poller->mutex);
if (stopped)
{
free(orig->res);
poller->callback((struct poller_result *)orig, poller->context);
}
free(res);
if (node == NULL)
return 0;
free(node->res);
free(node);
return -1;
}
@@ -1526,7 +1521,7 @@ int poller_set_timeout(int fd, int timeout, poller_t *poller)
return -!node;
}
int poller_add_timer(const struct timespec *value, void *context,
int poller_add_timer(const struct timespec *value, void *context, void **timer,
poller_t *poller)
{
struct __poller_node *node;
@@ -1551,6 +1546,7 @@ int poller_add_timer(const struct timespec *value, void *context,
node->timeout.tv_sec++;
}
*timer = node;
pthread_mutex_lock(&poller->mutex);
__poller_insert_node(node, poller);
pthread_mutex_unlock(&poller->mutex);
@@ -1560,10 +1556,45 @@ int poller_add_timer(const struct timespec *value, void *context,
return -1;
}
int poller_del_timer(void *timer, poller_t *poller)
{
struct __poller_node *node = (struct __poller_node *)timer;
int stopped = 0;
pthread_mutex_lock(&poller->mutex);
if (!node->removed)
{
node->removed = 1;
if (node->in_rbtree)
__poller_tree_erase(node, poller);
else
list_del(&node->list);
node->error = 0;
node->state = PR_ST_DELETED;
stopped = poller->stopped;
if (!stopped)
write(poller->pipe_wr, &node, sizeof (void *));
}
else
{
errno = ENOENT;
node = NULL;
}
pthread_mutex_unlock(&poller->mutex);
if (stopped)
poller->callback((struct poller_result *)node, poller->context);
return -!node;
}
void poller_stop(poller_t *poller)
{
struct __poller_node *node;
struct list_head *pos, *tmp;
LIST_HEAD(node_list);
void *p = NULL;
write(poller->pipe_wr, &p, sizeof (void *));
@@ -1571,8 +1602,6 @@ void poller_stop(poller_t *poller)
poller->stopped = 1;
pthread_mutex_lock(&poller->mutex);
poller->nodes[poller->pipe_rd] = NULL;
poller->nodes[poller->pipe_wr] = NULL;
close(poller->pipe_wr);
__poller_handle_pipe(poller);
close(poller->pipe_rd);
@@ -1583,26 +1612,31 @@ void poller_stop(poller_t *poller)
{
node = rb_entry(poller->timeo_tree.rb_node, struct __poller_node, rb);
rb_erase(&node->rb, &poller->timeo_tree);
list_add(&node->list, &poller->timeo_list);
list_add(&node->list, &node_list);
}
list_splice_init(&poller->no_timeo_list, &poller->timeo_list);
list_for_each_safe(pos, tmp, &poller->timeo_list)
list_splice_init(&poller->timeo_list, &node_list);
list_splice_init(&poller->no_timeo_list, &node_list);
list_for_each(pos, &node_list)
{
node = list_entry(pos, struct __poller_node, list);
list_del(&node->list);
if (node->data.fd >= 0)
{
poller->nodes[node->data.fd] = NULL;
__poller_del_fd(node->data.fd, node->event, poller);
}
node->error = 0;
node->state = PR_ST_STOPPED;
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
else
node->removed = 1;
}
pthread_mutex_unlock(&poller->mutex);
list_for_each_safe(pos, tmp, &node_list)
{
node = list_entry(pos, struct __poller_node, list);
node->error = 0;
node->state = PR_ST_STOPPED;
free(node->res);
poller->callback((struct poller_result *)node, poller->context);
}
}

View File

@@ -103,8 +103,9 @@ int poller_add(const struct poller_data *data, int timeout, poller_t *poller);
int poller_del(int fd, poller_t *poller);
int poller_mod(const struct poller_data *data, int timeout, poller_t *poller);
int poller_set_timeout(int fd, int timeout, poller_t *poller);
int poller_add_timer(const struct timespec *value, void *context,
int poller_add_timer(const struct timespec *value, void *context, void **timer,
poller_t *poller);
int poller_del_timer(void *timer, poller_t *poller);
void poller_stop(poller_t *poller);
void poller_destroy(poller_t *poller);