Update Unix code.

This commit is contained in:
Xie Han
2025-03-13 15:06:52 +08:00
parent 9c74ed258f
commit 026ea2947b
3 changed files with 37 additions and 44 deletions

View File

@@ -224,7 +224,7 @@ int CommService::init(const struct sockaddr *bind_addr, socklen_t addrlen,
this->addrlen = addrlen;
this->listen_timeout = listen_timeout;
this->response_timeout = response_timeout;
INIT_LIST_HEAD(&this->alive_list);
INIT_LIST_HEAD(&this->keep_alive_list);
this->ssl_ctx = NULL;
this->ssl_accept_timeout = 0;
@@ -253,9 +253,9 @@ int CommService::drain(int max)
errno_bak = errno;
pthread_mutex_lock(&this->mutex);
while (cnt != max && !list_empty(&this->alive_list))
while (cnt != max && !list_empty(&this->keep_alive_list))
{
pos = this->alive_list.next;
pos = this->keep_alive_list.prev;
entry = list_entry(pos, struct CommConnEntry, list);
list_del(pos);
cnt++;
@@ -310,12 +310,7 @@ private:
CommService *service;
private:
virtual int create_connect_fd()
{
errno = EPERM;
return -1;
}
virtual ~CommServiceTarget() { }
friend class Communicator;
};
@@ -359,7 +354,7 @@ CommSession::~CommSession()
return;
target = (CommServiceTarget *)this->target;
if (this->passive == 2)
if (!this->out && target->has_idle_conn())
target->shutdown();
target->decref();
@@ -485,7 +480,7 @@ int Communicator::send_message_sync(struct iovec vectors[], int cnt,
if (service->listen_fd >= 0)
{
entry->state = CONN_STATE_KEEPALIVE;
list_add_tail(&entry->list, &service->alive_list);
list_add(&entry->list, &service->keep_alive_list);
entry = NULL;
}
@@ -616,7 +611,6 @@ void Communicator::handle_incoming_request(struct poller_result *res)
list_add(&entry->list, &target->idle_list);
}
session->passive = 2;
pthread_mutex_unlock(&target->mutex);
break;
@@ -794,7 +788,7 @@ void Communicator::handle_reply_result(struct poller_result *res)
if (!this->stop_flag && service->listen_fd >= 0)
{
entry->state = CONN_STATE_KEEPALIVE;
list_add_tail(&entry->list, &service->alive_list);
list_add(&entry->list, &service->keep_alive_list);
}
else
{
@@ -1070,6 +1064,7 @@ void Communicator::handle_recvfrom_result(struct poller_result *res)
{
CommService *service = (CommService *)res->data.context;
struct CommConnEntry *entry;
CommSession *session;
CommTarget *target;
int state, error;
@@ -1077,6 +1072,7 @@ void Communicator::handle_recvfrom_result(struct poller_result *res)
{
case PR_ST_SUCCESS:
entry = (struct CommConnEntry *)res->data.result;
session = entry->session;
target = entry->target;
if (entry->state == CONN_STATE_SUCCESS)
{
@@ -1094,7 +1090,7 @@ void Communicator::handle_recvfrom_result(struct poller_result *res)
error = EBADMSG;
}
entry->session->handle(state, error);
session->handle(state, error);
if (state == CS_STATE_ERROR)
{
__release_conn(entry);
@@ -1246,19 +1242,12 @@ void Communicator::handler_thread_routine(void *context)
break;
default:
free(res);
if (comm->thrdpool)
thrdpool_exit(comm->thrdpool);
continue;
thrdpool_exit(comm->thrdpool);
return;
}
free(res);
}
if (!comm->thrdpool)
{
mpoller_destroy(comm->mpoller);
msgqueue_destroy(comm->msgqueue);
}
}
int Communicator::append_message(const void *buf, size_t *size,
@@ -1612,15 +1601,12 @@ int Communicator::init(size_t poller_threads, size_t handler_threads)
void Communicator::deinit()
{
int in_handler = this->is_handler_thread();
this->stop_flag = 1;
mpoller_stop(this->mpoller);
msgqueue_set_nonblock(this->msgqueue);
thrdpool_destroy(NULL, this->thrdpool);
this->thrdpool = NULL;
if (!in_handler)
Communicator::handler_thread_routine(this);
mpoller_destroy(this->mpoller);
msgqueue_destroy(this->msgqueue);
}
int Communicator::nonblock_connect(CommTarget *target)
@@ -1969,14 +1955,19 @@ int Communicator::reply(CommSession *session)
int errno_bak;
int ret;
if (session->passive != 2)
if (!session->passive)
{
errno = EINVAL;
return -1;
}
if (session->out)
{
errno = ENOENT;
return -1;
}
errno_bak = errno;
session->passive = 3;
target = (CommServiceTarget *)session->target;
if (target->service->reliable)
ret = this->reply_reliable(session, target);
@@ -2018,9 +2009,8 @@ int Communicator::push(const void *buf, size_t size, CommSession *session)
mutex = &in->entry->mutex;
pthread_mutex_lock(mutex);
if ((session->passive == 2 && !list_empty(&session->target->idle_list)) ||
(!session->passive && in->entry->session == session) ||
session->passive == 1)
if ((!session->passive || session->target->has_idle_conn()) &&
in->entry->session == session)
{
ret = in->inner()->feedback(buf, size);
}
@@ -2038,15 +2028,14 @@ int Communicator::shutdown(CommSession *session)
{
CommServiceTarget *target;
if (session->passive != 2)
if (!session->passive)
{
errno = EINVAL;
return -1;
}
session->passive = 3;
target = (CommServiceTarget *)session->target;
if (!target->shutdown())
if (session->out || !target->shutdown())
{
errno = ENOENT;
return -1;
@@ -2063,7 +2052,9 @@ int Communicator::sleep(SleepSession *session)
{
if (mpoller_add_timer(&value, session, &session->timer, &session->index,
this->mpoller) >= 0)
{
return 0;
}
}
return -1;

View File

@@ -230,7 +230,7 @@ private:
int ref;
private:
struct list_head alive_list;
struct list_head keep_alive_list;
pthread_mutex_t mutex;
public:

View File

@@ -1600,6 +1600,7 @@ int poller_add_timer(const struct timespec *value, void *context, void **timer,
int poller_del_timer(void *timer, poller_t *poller)
{
struct __poller_node *node = (struct __poller_node *)timer;
int stopped = 0;
pthread_mutex_lock(&poller->mutex);
if (!node->removed)
@@ -1610,6 +1611,12 @@ int poller_del_timer(void *timer, poller_t *poller)
__poller_tree_erase(node, poller);
else
list_del(&node->list);
node->error = 0;
node->state = PR_ST_DELETED;
stopped = poller->stopped;
if (!stopped)
write(poller->pipe_wr, &node, sizeof (void *));
}
else
{
@@ -1618,15 +1625,10 @@ int poller_del_timer(void *timer, poller_t *poller)
}
pthread_mutex_unlock(&poller->mutex);
if (node)
{
node->error = 0;
node->state = PR_ST_DELETED;
if (stopped)
poller->callback((struct poller_result *)node, poller->context);
return 0;
}
return -1;
return -!node;
}
void poller_stop(poller_t *poller)