diff --git a/src/kernel/Communicator.cc b/src/kernel/Communicator.cc index ff1e6589..eb8ce4b8 100644 --- a/src/kernel/Communicator.cc +++ b/src/kernel/Communicator.cc @@ -199,35 +199,7 @@ void CommTarget::deinit() int CommMessageIn::feedback(const void *buf, size_t size) { - struct CommConnEntry *entry = this->entry; - CommSession *session = entry->session; - const struct sockaddr *addr; - socklen_t addrlen; - int ret; - - if (session->passive && !session->reliable) - { - entry->target->get_addr(&addr, &addrlen); - return sendto(entry->sockfd, buf, size, 0, addr, addrlen); - } - - if (!entry->ssl) - return write(entry->sockfd, buf, size); - - if (size == 0) - return 0; - - ret = SSL_write(entry->ssl, buf, size); - if (ret <= 0) - { - ret = SSL_get_error(entry->ssl, ret); - if (ret != SSL_ERROR_SYSCALL) - errno = -ret; - - ret = -1; - } - - return ret; + return this->entry->session->push(buf, size, this->entry); } void CommMessageIn::renew() @@ -327,6 +299,9 @@ public: } } +public: + int shutdown(int reliable); + private: int sockfd; int ref; @@ -344,44 +319,81 @@ private: friend class Communicator; }; -CommSession::~CommSession() +int CommServiceTarget::shutdown(int reliable) { struct CommConnEntry *entry; - struct list_head *pos; - CommTarget *target; int errno_bak; + int ret = 0; + + pthread_mutex_lock(&this->mutex); + if (!list_empty(&this->idle_list)) + { + entry = list_entry(this->idle_list.next, struct CommConnEntry, list); + list_del(&entry->list); + + if (reliable) + { + errno_bak = errno; + mpoller_del(entry->sockfd, entry->mpoller); + entry->state = CONN_STATE_CLOSING; + errno = errno_bak; + } + else + { + __release_conn(entry); + this->decref(); + } + + ret = 1; + } + + pthread_mutex_unlock(&this->mutex); + return ret; +} + +int CommSession::push(const void *buf, size_t size, struct CommConnEntry *entry) +{ + const struct sockaddr *addr; + socklen_t addrlen; + int ret; + + if (this->passive && !this->reliable) + { + entry->target->get_addr(&addr, &addrlen); + return sendto(entry->sockfd, buf, size, 0, addr, addrlen); + } + + if (!entry->ssl) + return write(entry->sockfd, buf, size); + + if (size == 0) + return 0; + + ret = SSL_write(entry->ssl, buf, size); + if (ret <= 0) + { + ret = SSL_get_error(entry->ssl, ret); + if (ret != SSL_ERROR_SYSCALL) + errno = -ret; + + ret = -1; + } + + return ret; +} + +CommSession::~CommSession() +{ + CommServiceTarget *target; if (!this->passive) return; - target = this->target; + target = (CommServiceTarget *)this->target; if (this->passive == 1) - { - pthread_mutex_lock(&target->mutex); - if (!list_empty(&target->idle_list)) - { - pos = target->idle_list.next; - entry = list_entry(pos, struct CommConnEntry, list); - list_del(pos); + target->shutdown(this->reliable); - if (this->reliable) - { - errno_bak = errno; - mpoller_del(entry->sockfd, entry->mpoller); - entry->state = CONN_STATE_CLOSING; - errno = errno_bak; - } - else - { - __release_conn(entry); - ((CommServiceTarget *)target)->decref(); - } - } - - pthread_mutex_unlock(&target->mutex); - } - - ((CommServiceTarget *)target)->decref(); + target->decref(); } inline int Communicator::first_timeout(CommSession *session) @@ -1981,7 +1993,7 @@ int Communicator::reply(CommSession *session) if (session->passive != 1) { - errno = session->passive ? ENOENT : EPERM; + errno = session->passive ? ENOENT : EINVAL; return -1; } @@ -2018,7 +2030,7 @@ int Communicator::push(const void *buf, size_t size, CommSession *session) if (session->passive != 1) { - errno = session->passive ? ENOENT : EPERM; + errno = session->passive ? ENOENT : EINVAL; return -1; } @@ -2026,30 +2038,7 @@ int Communicator::push(const void *buf, size_t size, CommSession *session) if (!list_empty(&target->idle_list)) { entry = list_entry(target->idle_list.next, struct CommConnEntry, list); - if (session->reliable) - { - if (!entry->ssl) - ret = write(entry->sockfd, buf, size); - else if (size == 0) - ret = 0; - else - { - ret = SSL_write(entry->ssl, buf, size); - if (ret <= 0) - { - ret = SSL_get_error(entry->ssl, ret); - if (ret != SSL_ERROR_SYSCALL) - errno = -ret; - - ret = -1; - } - } - } - else - { - ret = sendto(entry->sockfd, buf, size, 0, - target->addr, target->addrlen); - } + ret = session->push(buf, size, entry); } else { @@ -2063,45 +2052,23 @@ int Communicator::push(const void *buf, size_t size, CommSession *session) int Communicator::shutdown(CommSession *session) { - CommTarget *target = session->target; - struct CommConnEntry *entry; - struct list_head *pos; - int ret; + CommServiceTarget *target; if (session->passive != 1) { - errno = session->passive ? ENOENT : EPERM; + errno = session->passive ? ENOENT : EINVAL; return -1; } session->passive = 2; - pthread_mutex_lock(&target->mutex); - if (!list_empty(&target->idle_list)) - { - pos = target->idle_list.next; - entry = list_entry(pos, struct CommConnEntry, list); - list_del(pos); - - if (session->reliable) - { - ret = mpoller_del(entry->sockfd, entry->mpoller); - entry->state = CONN_STATE_CLOSING; - } - else - { - ret = 0; - __release_conn(entry); - ((CommServiceTarget *)target)->decref(); - } - } - else + target = (CommServiceTarget *)session->target; + if (!target->shutdown(session->reliable)) { errno = ENOENT; - ret = -1; + return -1; } - pthread_mutex_unlock(&target->mutex); - return ret; + return 0; } int Communicator::sleep(SleepSession *session) diff --git a/src/kernel/Communicator.h b/src/kernel/Communicator.h index 6339197a..dec78ad6 100644 --- a/src/kernel/Communicator.h +++ b/src/kernel/Communicator.h @@ -90,7 +90,7 @@ private: public: virtual ~CommTarget() { } - friend class CommSession; + friend class CommServiceTarget; friend class Communicator; }; @@ -157,8 +157,11 @@ private: private: struct timespec begin_time; int timeout; - short passive; - short reliable; + char passive; + char reliable; + +private: + int push(const void *buf, size_t size, struct CommConnEntry *entry); public: CommSession() { this->passive = 0; }