Simplify Communicator's codes.

This commit is contained in:
Xie Han
2024-03-27 17:23:59 +08:00
parent 29e9d9de98
commit 7249adde3f
2 changed files with 85 additions and 115 deletions

View File

@@ -199,35 +199,7 @@ void CommTarget::deinit()
int CommMessageIn::feedback(const void *buf, size_t size)
{
struct CommConnEntry *entry = this->entry;
CommSession *session = entry->session;
const struct sockaddr *addr;
socklen_t addrlen;
int ret;
if (session->passive && !session->reliable)
{
entry->target->get_addr(&addr, &addrlen);
return sendto(entry->sockfd, buf, size, 0, addr, addrlen);
}
if (!entry->ssl)
return write(entry->sockfd, buf, size);
if (size == 0)
return 0;
ret = SSL_write(entry->ssl, buf, size);
if (ret <= 0)
{
ret = SSL_get_error(entry->ssl, ret);
if (ret != SSL_ERROR_SYSCALL)
errno = -ret;
ret = -1;
}
return ret;
return this->entry->session->push(buf, size, this->entry);
}
void CommMessageIn::renew()
@@ -327,6 +299,9 @@ public:
}
}
public:
int shutdown(int reliable);
private:
int sockfd;
int ref;
@@ -344,44 +319,81 @@ private:
friend class Communicator;
};
CommSession::~CommSession()
int CommServiceTarget::shutdown(int reliable)
{
struct CommConnEntry *entry;
struct list_head *pos;
CommTarget *target;
int errno_bak;
int ret = 0;
pthread_mutex_lock(&this->mutex);
if (!list_empty(&this->idle_list))
{
entry = list_entry(this->idle_list.next, struct CommConnEntry, list);
list_del(&entry->list);
if (reliable)
{
errno_bak = errno;
mpoller_del(entry->sockfd, entry->mpoller);
entry->state = CONN_STATE_CLOSING;
errno = errno_bak;
}
else
{
__release_conn(entry);
this->decref();
}
ret = 1;
}
pthread_mutex_unlock(&this->mutex);
return ret;
}
int CommSession::push(const void *buf, size_t size, struct CommConnEntry *entry)
{
const struct sockaddr *addr;
socklen_t addrlen;
int ret;
if (this->passive && !this->reliable)
{
entry->target->get_addr(&addr, &addrlen);
return sendto(entry->sockfd, buf, size, 0, addr, addrlen);
}
if (!entry->ssl)
return write(entry->sockfd, buf, size);
if (size == 0)
return 0;
ret = SSL_write(entry->ssl, buf, size);
if (ret <= 0)
{
ret = SSL_get_error(entry->ssl, ret);
if (ret != SSL_ERROR_SYSCALL)
errno = -ret;
ret = -1;
}
return ret;
}
CommSession::~CommSession()
{
CommServiceTarget *target;
if (!this->passive)
return;
target = this->target;
target = (CommServiceTarget *)this->target;
if (this->passive == 1)
{
pthread_mutex_lock(&target->mutex);
if (!list_empty(&target->idle_list))
{
pos = target->idle_list.next;
entry = list_entry(pos, struct CommConnEntry, list);
list_del(pos);
target->shutdown(this->reliable);
if (this->reliable)
{
errno_bak = errno;
mpoller_del(entry->sockfd, entry->mpoller);
entry->state = CONN_STATE_CLOSING;
errno = errno_bak;
}
else
{
__release_conn(entry);
((CommServiceTarget *)target)->decref();
}
}
pthread_mutex_unlock(&target->mutex);
}
((CommServiceTarget *)target)->decref();
target->decref();
}
inline int Communicator::first_timeout(CommSession *session)
@@ -1981,7 +1993,7 @@ int Communicator::reply(CommSession *session)
if (session->passive != 1)
{
errno = session->passive ? ENOENT : EPERM;
errno = session->passive ? ENOENT : EINVAL;
return -1;
}
@@ -2018,7 +2030,7 @@ int Communicator::push(const void *buf, size_t size, CommSession *session)
if (session->passive != 1)
{
errno = session->passive ? ENOENT : EPERM;
errno = session->passive ? ENOENT : EINVAL;
return -1;
}
@@ -2026,30 +2038,7 @@ int Communicator::push(const void *buf, size_t size, CommSession *session)
if (!list_empty(&target->idle_list))
{
entry = list_entry(target->idle_list.next, struct CommConnEntry, list);
if (session->reliable)
{
if (!entry->ssl)
ret = write(entry->sockfd, buf, size);
else if (size == 0)
ret = 0;
else
{
ret = SSL_write(entry->ssl, buf, size);
if (ret <= 0)
{
ret = SSL_get_error(entry->ssl, ret);
if (ret != SSL_ERROR_SYSCALL)
errno = -ret;
ret = -1;
}
}
}
else
{
ret = sendto(entry->sockfd, buf, size, 0,
target->addr, target->addrlen);
}
ret = session->push(buf, size, entry);
}
else
{
@@ -2063,45 +2052,23 @@ int Communicator::push(const void *buf, size_t size, CommSession *session)
int Communicator::shutdown(CommSession *session)
{
CommTarget *target = session->target;
struct CommConnEntry *entry;
struct list_head *pos;
int ret;
CommServiceTarget *target;
if (session->passive != 1)
{
errno = session->passive ? ENOENT : EPERM;
errno = session->passive ? ENOENT : EINVAL;
return -1;
}
session->passive = 2;
pthread_mutex_lock(&target->mutex);
if (!list_empty(&target->idle_list))
{
pos = target->idle_list.next;
entry = list_entry(pos, struct CommConnEntry, list);
list_del(pos);
if (session->reliable)
{
ret = mpoller_del(entry->sockfd, entry->mpoller);
entry->state = CONN_STATE_CLOSING;
}
else
{
ret = 0;
__release_conn(entry);
((CommServiceTarget *)target)->decref();
}
}
else
target = (CommServiceTarget *)session->target;
if (!target->shutdown(session->reliable))
{
errno = ENOENT;
ret = -1;
return -1;
}
pthread_mutex_unlock(&target->mutex);
return ret;
return 0;
}
int Communicator::sleep(SleepSession *session)

View File

@@ -90,7 +90,7 @@ private:
public:
virtual ~CommTarget() { }
friend class CommSession;
friend class CommServiceTarget;
friend class Communicator;
};
@@ -157,8 +157,11 @@ private:
private:
struct timespec begin_time;
int timeout;
short passive;
short reliable;
char passive;
char reliable;
private:
int push(const void *buf, size_t size, struct CommConnEntry *entry);
public:
CommSession() { this->passive = 0; }