Enable 'push()' on an active session. (#1590)

This commit is contained in:
xiehan
2024-07-22 04:00:47 +08:00
committed by GitHub
parent a8078aeac1
commit e64c02ec16
2 changed files with 76 additions and 61 deletions

View File

@@ -179,8 +179,15 @@ public:
}
/* Push reply data synchronously. */
virtual int push(const void *buf, size_t size)
int push(const void *buf, size_t size)
{
if (this->state != WFT_STATE_TOREPLY &&
this->state != WFT_STATE_NOREPLY)
{
errno = ENOENT;
return -1;
}
return this->scheduler->push(buf, size, this);
}

View File

@@ -119,35 +119,6 @@ static int __create_ssl(SSL_CTX *ssl_ctx, struct CommConnEntry *entry)
return -1;
}
static int __send_to_conn(const void *buf, size_t size,
struct CommConnEntry *entry)
{
const struct sockaddr *addr;
socklen_t addrlen;
int ret;
if (!entry->ssl)
{
entry->target->get_addr(&addr, &addrlen);
return sendto(entry->sockfd, buf, size, 0, addr, addrlen);
}
if (size == 0)
return 0;
ret = SSL_write(entry->ssl, buf, size);
if (ret <= 0)
{
ret = SSL_get_error(entry->ssl, ret);
if (ret != SSL_ERROR_SYSCALL)
errno = -ret;
ret = -1;
}
return ret;
}
static void __release_conn(struct CommConnEntry *entry)
{
delete entry->conn;
@@ -198,7 +169,31 @@ void CommTarget::deinit()
int CommMessageIn::feedback(const void *buf, size_t size)
{
return __send_to_conn(buf, size, this->entry);
struct CommConnEntry *entry = this->entry;
const struct sockaddr *addr;
socklen_t addrlen;
int ret;
if (!entry->ssl)
{
entry->target->get_addr(&addr, &addrlen);
return sendto(entry->sockfd, buf, size, 0, addr, addrlen);
}
if (size == 0)
return 0;
ret = SSL_write(entry->ssl, buf, size);
if (ret <= 0)
{
ret = SSL_get_error(entry->ssl, ret);
if (ret != SSL_ERROR_SYSCALL)
errno = -ret;
ret = -1;
}
return ret;
}
void CommMessageIn::renew()
@@ -358,7 +353,7 @@ CommSession::~CommSession()
return;
target = (CommServiceTarget *)this->target;
if (this->passive == 1)
if (this->passive == 2)
target->shutdown();
target->decref();
@@ -615,6 +610,7 @@ void Communicator::handle_incoming_request(struct poller_result *res)
list_add(&entry->list, &target->idle_list);
}
session->passive = 2;
pthread_mutex_unlock(&target->mutex);
break;
@@ -1304,6 +1300,7 @@ poller_message_t *Communicator::create_request(void *context)
CommService *service = entry->service;
CommTarget *target = entry->target;
CommSession *session;
CommMessageIn *in;
int timeout;
if (entry->state == CONN_STATE_IDLE)
@@ -1344,20 +1341,22 @@ poller_message_t *Communicator::create_request(void *context)
((CommServiceTarget *)target)->incref();
session->in = session->message_in();
if (session->in)
in = session->message_in();
if (in)
{
session->in->poller_message_t::append = Communicator::append_message;
session->in->entry = entry;
in->poller_message_t::append = Communicator::append_message;
in->entry = entry;
session->in = in;
}
return session->in;
return in;
}
poller_message_t *Communicator::create_reply(void *context)
{
struct CommConnEntry *entry = (struct CommConnEntry *)context;
CommSession *session;
CommMessageIn *in;
if (entry->state == CONN_STATE_IDLE)
{
@@ -1373,14 +1372,15 @@ poller_message_t *Communicator::create_reply(void *context)
}
session = entry->session;
session->in = session->message_in();
if (session->in)
in = session->message_in();
if (in)
{
session->in->poller_message_t::append = Communicator::append_message;
session->in->entry = entry;
in->poller_message_t::append = Communicator::append_message;
in->entry = entry;
session->in = in;
}
return session->in;
return in;
}
int Communicator::recv_request(const void *buf, size_t size,
@@ -1389,6 +1389,7 @@ int Communicator::recv_request(const void *buf, size_t size,
CommService *service = entry->service;
CommTarget *target = entry->target;
CommSession *session;
CommMessageIn *in;
size_t n;
int ret;
@@ -1408,14 +1409,15 @@ int Communicator::recv_request(const void *buf, size_t size,
((CommServiceTarget *)target)->incref();
session->in = session->message_in();
if (session->in)
in = session->message_in();
if (in)
{
session->in->entry = entry;
in->entry = entry;
session->in = in;
do
{
n = size;
ret = session->in->append(buf, &n);
ret = in->append(buf, &n);
if (ret == 0)
{
size -= n;
@@ -1949,14 +1951,14 @@ int Communicator::reply(CommSession *session)
int errno_bak;
int ret;
if (session->passive != 1)
if (session->passive != 2)
{
errno = session->passive ? ENOENT : EINVAL;
errno = EINVAL;
return -1;
}
errno_bak = errno;
session->passive = 2;
session->passive = 3;
target = (CommServiceTarget *)session->target;
if (target->service->reliable)
ret = this->reply_reliable(session, target);
@@ -1982,21 +1984,27 @@ int Communicator::reply(CommSession *session)
int Communicator::push(const void *buf, size_t size, CommSession *session)
{
CommTarget *target = session->target;
struct CommConnEntry *entry;
CommMessageIn *in = session->in;
pthread_mutex_t *mutex;
int ret;
if (session->passive != 1)
if (!in)
{
errno = session->passive ? ENOENT : EINVAL;
errno = ENOENT;
return -1;
}
pthread_mutex_lock(&target->mutex);
if (!list_empty(&target->idle_list))
if (session->passive)
mutex = &session->target->mutex;
else
mutex = &in->entry->mutex;
pthread_mutex_lock(mutex);
if ((session->passive == 2 && !list_empty(&session->target->idle_list)) ||
(!session->passive && in->entry->session == session) ||
session->passive == 1)
{
entry = list_entry(target->idle_list.next, struct CommConnEntry, list);
ret = __send_to_conn(buf, size, entry);
ret = in->CommMessageIn::feedback(buf, size);
}
else
{
@@ -2004,7 +2012,7 @@ int Communicator::push(const void *buf, size_t size, CommSession *session)
ret = -1;
}
pthread_mutex_unlock(&target->mutex);
pthread_mutex_unlock(mutex);
return ret;
}
@@ -2012,13 +2020,13 @@ int Communicator::shutdown(CommSession *session)
{
CommServiceTarget *target;
if (session->passive != 1)
if (session->passive != 2)
{
errno = session->passive ? ENOENT : EINVAL;
errno = EINVAL;
return -1;
}
session->passive = 2;
session->passive = 3;
target = (CommServiceTarget *)session->target;
if (!target->shutdown())
{