From e64c02ec1650ab385cfcf7260fcd3cb8c6d4ebdd Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Mon, 22 Jul 2024 04:00:47 +0800 Subject: [PATCH] Enable 'push()' on an active session. (#1590) --- src/factory/WFTask.h | 9 ++- src/kernel/Communicator.cc | 128 ++++++++++++++++++++----------------- 2 files changed, 76 insertions(+), 61 deletions(-) diff --git a/src/factory/WFTask.h b/src/factory/WFTask.h index 78c8c318..6c5722bd 100644 --- a/src/factory/WFTask.h +++ b/src/factory/WFTask.h @@ -179,8 +179,15 @@ public: } /* Push reply data synchronously. */ - virtual int push(const void *buf, size_t size) + int push(const void *buf, size_t size) { + if (this->state != WFT_STATE_TOREPLY && + this->state != WFT_STATE_NOREPLY) + { + errno = ENOENT; + return -1; + } + return this->scheduler->push(buf, size, this); } diff --git a/src/kernel/Communicator.cc b/src/kernel/Communicator.cc index 68885b22..7f96b10a 100644 --- a/src/kernel/Communicator.cc +++ b/src/kernel/Communicator.cc @@ -119,35 +119,6 @@ static int __create_ssl(SSL_CTX *ssl_ctx, struct CommConnEntry *entry) return -1; } -static int __send_to_conn(const void *buf, size_t size, - struct CommConnEntry *entry) -{ - const struct sockaddr *addr; - socklen_t addrlen; - int ret; - - if (!entry->ssl) - { - entry->target->get_addr(&addr, &addrlen); - return sendto(entry->sockfd, buf, size, 0, addr, addrlen); - } - - if (size == 0) - return 0; - - ret = SSL_write(entry->ssl, buf, size); - if (ret <= 0) - { - ret = SSL_get_error(entry->ssl, ret); - if (ret != SSL_ERROR_SYSCALL) - errno = -ret; - - ret = -1; - } - - return ret; -} - static void __release_conn(struct CommConnEntry *entry) { delete entry->conn; @@ -198,7 +169,31 @@ void CommTarget::deinit() int CommMessageIn::feedback(const void *buf, size_t size) { - return __send_to_conn(buf, size, this->entry); + struct CommConnEntry *entry = this->entry; + const struct sockaddr *addr; + socklen_t addrlen; + int ret; + + if (!entry->ssl) + { + entry->target->get_addr(&addr, &addrlen); + return sendto(entry->sockfd, buf, size, 0, addr, addrlen); + } + + if (size == 0) + return 0; + + ret = SSL_write(entry->ssl, buf, size); + if (ret <= 0) + { + ret = SSL_get_error(entry->ssl, ret); + if (ret != SSL_ERROR_SYSCALL) + errno = -ret; + + ret = -1; + } + + return ret; } void CommMessageIn::renew() @@ -358,7 +353,7 @@ CommSession::~CommSession() return; target = (CommServiceTarget *)this->target; - if (this->passive == 1) + if (this->passive == 2) target->shutdown(); target->decref(); @@ -615,6 +610,7 @@ void Communicator::handle_incoming_request(struct poller_result *res) list_add(&entry->list, &target->idle_list); } + session->passive = 2; pthread_mutex_unlock(&target->mutex); break; @@ -1304,6 +1300,7 @@ poller_message_t *Communicator::create_request(void *context) CommService *service = entry->service; CommTarget *target = entry->target; CommSession *session; + CommMessageIn *in; int timeout; if (entry->state == CONN_STATE_IDLE) @@ -1344,20 +1341,22 @@ poller_message_t *Communicator::create_request(void *context) ((CommServiceTarget *)target)->incref(); - session->in = session->message_in(); - if (session->in) + in = session->message_in(); + if (in) { - session->in->poller_message_t::append = Communicator::append_message; - session->in->entry = entry; + in->poller_message_t::append = Communicator::append_message; + in->entry = entry; + session->in = in; } - return session->in; + return in; } poller_message_t *Communicator::create_reply(void *context) { struct CommConnEntry *entry = (struct CommConnEntry *)context; CommSession *session; + CommMessageIn *in; if (entry->state == CONN_STATE_IDLE) { @@ -1373,14 +1372,15 @@ poller_message_t *Communicator::create_reply(void *context) } session = entry->session; - session->in = session->message_in(); - if (session->in) + in = session->message_in(); + if (in) { - session->in->poller_message_t::append = Communicator::append_message; - session->in->entry = entry; + in->poller_message_t::append = Communicator::append_message; + in->entry = entry; + session->in = in; } - return session->in; + return in; } int Communicator::recv_request(const void *buf, size_t size, @@ -1389,6 +1389,7 @@ int Communicator::recv_request(const void *buf, size_t size, CommService *service = entry->service; CommTarget *target = entry->target; CommSession *session; + CommMessageIn *in; size_t n; int ret; @@ -1408,14 +1409,15 @@ int Communicator::recv_request(const void *buf, size_t size, ((CommServiceTarget *)target)->incref(); - session->in = session->message_in(); - if (session->in) + in = session->message_in(); + if (in) { - session->in->entry = entry; + in->entry = entry; + session->in = in; do { n = size; - ret = session->in->append(buf, &n); + ret = in->append(buf, &n); if (ret == 0) { size -= n; @@ -1949,14 +1951,14 @@ int Communicator::reply(CommSession *session) int errno_bak; int ret; - if (session->passive != 1) + if (session->passive != 2) { - errno = session->passive ? ENOENT : EINVAL; + errno = EINVAL; return -1; } errno_bak = errno; - session->passive = 2; + session->passive = 3; target = (CommServiceTarget *)session->target; if (target->service->reliable) ret = this->reply_reliable(session, target); @@ -1982,21 +1984,27 @@ int Communicator::reply(CommSession *session) int Communicator::push(const void *buf, size_t size, CommSession *session) { - CommTarget *target = session->target; - struct CommConnEntry *entry; + CommMessageIn *in = session->in; + pthread_mutex_t *mutex; int ret; - if (session->passive != 1) + if (!in) { - errno = session->passive ? ENOENT : EINVAL; + errno = ENOENT; return -1; } - pthread_mutex_lock(&target->mutex); - if (!list_empty(&target->idle_list)) + if (session->passive) + mutex = &session->target->mutex; + else + mutex = &in->entry->mutex; + + pthread_mutex_lock(mutex); + if ((session->passive == 2 && !list_empty(&session->target->idle_list)) || + (!session->passive && in->entry->session == session) || + session->passive == 1) { - entry = list_entry(target->idle_list.next, struct CommConnEntry, list); - ret = __send_to_conn(buf, size, entry); + ret = in->CommMessageIn::feedback(buf, size); } else { @@ -2004,7 +2012,7 @@ int Communicator::push(const void *buf, size_t size, CommSession *session) ret = -1; } - pthread_mutex_unlock(&target->mutex); + pthread_mutex_unlock(mutex); return ret; } @@ -2012,13 +2020,13 @@ int Communicator::shutdown(CommSession *session) { CommServiceTarget *target; - if (session->passive != 1) + if (session->passive != 2) { - errno = session->passive ? ENOENT : EINVAL; + errno = EINVAL; return -1; } - session->passive = 2; + session->passive = 3; target = (CommServiceTarget *)session->target; if (!target->shutdown()) {