Fix Communicator::shutdown() for UDP service session.

This commit is contained in:
Xie Han
2024-03-03 21:54:13 +08:00
parent 851a6b4d4d
commit 797e2d1014

View File

@@ -348,43 +348,34 @@ CommSession::~CommSession()
target = this->target;
if (this->passive == 1)
{
if (this->reliable)
pthread_mutex_lock(&target->mutex);
if (!list_empty(&target->idle_list))
{
pthread_mutex_lock(&target->mutex);
if (!list_empty(&target->idle_list))
{
pos = target->idle_list.next;
entry = list_entry(pos, struct CommConnEntry, list);
list_del(pos);
pos = target->idle_list.next;
entry = list_entry(pos, struct CommConnEntry, list);
list_del(pos);
if (this->reliable)
{
errno_bak = errno;
mpoller_del(entry->sockfd, entry->mpoller);
entry->state = CONN_STATE_CLOSING;
errno = errno_bak;
}
pthread_mutex_unlock(&target->mutex);
}
else
{
if (!list_empty(&target->idle_list))
else
{
pos = target->idle_list.next;
entry = list_entry(pos, struct CommConnEntry, list);
list_del(pos);
if (__sync_sub_and_fetch(&entry->ref, 1) == 0)
{
__release_conn(entry);
((CommServiceTarget *)target)->decref();
}
__release_conn(entry);
((CommServiceTarget *)target)->decref();
}
}
pthread_mutex_unlock(&target->mutex);
}
((CommServiceTarget *)target)->decref();
}
inline int Communicator::first_timeout(CommSession *session)
{
int timeout = session->target->response_timeout;
@@ -1099,7 +1090,6 @@ void Communicator::handle_recvfrom_result(struct poller_result *res)
{
state = CS_STATE_TOREPLY;
error = 0;
__sync_add_and_fetch(&entry->ref, 1);
entry->state = CONN_STATE_IDLE;
list_add(&entry->list, &target->idle_list);
}
@@ -1113,7 +1103,7 @@ void Communicator::handle_recvfrom_result(struct poller_result *res)
}
entry->session->handle(state, error);
if (__sync_sub_and_fetch(&entry->ref, 1) == 0)
if (state == CS_STATE_ERROR)
{
__release_conn(entry);
((CommServiceTarget *)target)->decref();
@@ -1513,25 +1503,25 @@ void *Communicator::recvfrom(const struct sockaddr *addr, socklen_t addrlen,
int sockfd;
sockfd = dup(service->listen_fd);
if (sockfd < 0)
return NULL;
result = Communicator::accept(addr, addrlen, sockfd, context);
if (result)
if (sockfd >= 0)
{
target = (CommServiceTarget *)result;
entry = Communicator::accept_conn(target, service);
if (entry)
result = Communicator::accept(addr, addrlen, sockfd, context);
if (result)
{
if (Communicator::recv_request(buf, size, entry) >= 0)
return entry;
target = (CommServiceTarget *)result;
entry = Communicator::accept_conn(target, service);
if (entry)
{
if (Communicator::recv_request(buf, size, entry) >= 0)
return entry;
__release_conn(entry);
__release_conn(entry);
}
else
close(sockfd);
target->decref();
}
else
close(sockfd);
target->decref();
}
return NULL;
@@ -1760,7 +1750,7 @@ int Communicator::request_new_conn(CommSession *session, CommTarget *target)
entry = Communicator::launch_conn(session, target);
if (entry)
{
entry->mpoller = mpoller;
entry->mpoller = this->mpoller;
session->conn = entry->conn;
session->seq = entry->seq++;
data.operation = PD_OP_CONNECT;
@@ -1961,12 +1951,11 @@ int Communicator::reply_unreliable(CommSession *session, CommTarget *target)
return 0;
}
if (__sync_sub_and_fetch(&entry->ref, 1) == 0)
{
__release_conn(entry);
((CommServiceTarget *)target)->decref();
}
__release_conn(entry);
((CommServiceTarget *)target)->decref();
}
else
errno = ENOENT;
return -1;
}
@@ -2056,6 +2045,7 @@ int Communicator::shutdown(CommSession *session)
{
CommTarget *target = session->target;
struct CommConnEntry *entry;
struct list_head *pos;
int ret;
if (session->passive != 1)
@@ -2068,10 +2058,21 @@ int Communicator::shutdown(CommSession *session)
pthread_mutex_lock(&target->mutex);
if (!list_empty(&target->idle_list))
{
entry = list_entry(target->idle_list.next, struct CommConnEntry, list);
list_del(&entry->list);
ret = mpoller_del(entry->sockfd, entry->mpoller);
entry->state = CONN_STATE_CLOSING;
pos = target->idle_list.next;
entry = list_entry(pos, struct CommConnEntry, list);
list_del(pos);
if (session->reliable)
{
ret = mpoller_del(entry->sockfd, entry->mpoller);
entry->state = CONN_STATE_CLOSING;
}
else
{
ret = 0;
__release_conn(entry);
((CommServiceTarget *)target)->decref();
}
}
else
{