Merge branch 'master' of https://github.com/sogou/workflow into nossl

This commit is contained in:
Xie Han
2024-07-22 04:25:19 +08:00
2 changed files with 57 additions and 42 deletions

View File

@@ -177,8 +177,15 @@ public:
}
/* Push reply data synchronously. */
virtual int push(const void *buf, size_t size)
int push(const void *buf, size_t size)
{
if (this->state != WFT_STATE_TOREPLY &&
this->state != WFT_STATE_NOREPLY)
{
errno = ENOENT;
return -1;
}
return this->scheduler->push(buf, size, this);
}

View File

@@ -97,16 +97,6 @@ static int __bind_sockaddr(int sockfd, const struct sockaddr *addr,
return 0;
}
static int __send_to_conn(const void *buf, size_t size,
struct CommConnEntry *entry)
{
const struct sockaddr *addr;
socklen_t addrlen;
entry->target->get_addr(&addr, &addrlen);
return sendto(entry->sockfd, buf, size, 0, addr, addrlen);
}
static void __release_conn(struct CommConnEntry *entry)
{
delete entry->conn;
@@ -151,7 +141,12 @@ void CommTarget::deinit()
int CommMessageIn::feedback(const void *buf, size_t size)
{
return __send_to_conn(buf, size, this->entry);
struct CommConnEntry *entry = this->entry;
const struct sockaddr *addr;
socklen_t addrlen;
entry->target->get_addr(&addr, &addrlen);
return sendto(entry->sockfd, buf, size, 0, addr, addrlen);
}
void CommMessageIn::renew()
@@ -308,7 +303,7 @@ CommSession::~CommSession()
return;
target = (CommServiceTarget *)this->target;
if (this->passive == 1)
if (this->passive == 2)
target->shutdown();
target->decref();
@@ -553,6 +548,7 @@ void Communicator::handle_incoming_request(struct poller_result *res)
list_add(&entry->list, &target->idle_list);
}
session->passive = 2;
pthread_mutex_unlock(&target->mutex);
break;
@@ -1174,6 +1170,7 @@ poller_message_t *Communicator::create_request(void *context)
CommService *service = entry->service;
CommTarget *target = entry->target;
CommSession *session;
CommMessageIn *in;
int timeout;
if (entry->state == CONN_STATE_IDLE)
@@ -1214,20 +1211,22 @@ poller_message_t *Communicator::create_request(void *context)
((CommServiceTarget *)target)->incref();
session->in = session->message_in();
if (session->in)
in = session->message_in();
if (in)
{
session->in->poller_message_t::append = Communicator::append_message;
session->in->entry = entry;
in->poller_message_t::append = Communicator::append_message;
in->entry = entry;
session->in = in;
}
return session->in;
return in;
}
poller_message_t *Communicator::create_reply(void *context)
{
struct CommConnEntry *entry = (struct CommConnEntry *)context;
CommSession *session;
CommMessageIn *in;
if (entry->state == CONN_STATE_IDLE)
{
@@ -1243,14 +1242,15 @@ poller_message_t *Communicator::create_reply(void *context)
}
session = entry->session;
session->in = session->message_in();
if (session->in)
in = session->message_in();
if (in)
{
session->in->poller_message_t::append = Communicator::append_message;
session->in->entry = entry;
in->poller_message_t::append = Communicator::append_message;
in->entry = entry;
session->in = in;
}
return session->in;
return in;
}
int Communicator::recv_request(const void *buf, size_t size,
@@ -1259,6 +1259,7 @@ int Communicator::recv_request(const void *buf, size_t size,
CommService *service = entry->service;
CommTarget *target = entry->target;
CommSession *session;
CommMessageIn *in;
size_t n;
int ret;
@@ -1278,14 +1279,15 @@ int Communicator::recv_request(const void *buf, size_t size,
((CommServiceTarget *)target)->incref();
session->in = session->message_in();
if (session->in)
in = session->message_in();
if (in)
{
session->in->entry = entry;
in->entry = entry;
session->in = in;
do
{
n = size;
ret = session->in->append(buf, &n);
ret = in->append(buf, &n);
if (ret == 0)
{
size -= n;
@@ -1817,14 +1819,14 @@ int Communicator::reply(CommSession *session)
int errno_bak;
int ret;
if (session->passive != 1)
if (session->passive != 2)
{
errno = session->passive ? ENOENT : EINVAL;
errno = EINVAL;
return -1;
}
errno_bak = errno;
session->passive = 2;
session->passive = 3;
target = (CommServiceTarget *)session->target;
if (target->service->reliable)
ret = this->reply_reliable(session, target);
@@ -1850,21 +1852,27 @@ int Communicator::reply(CommSession *session)
int Communicator::push(const void *buf, size_t size, CommSession *session)
{
CommTarget *target = session->target;
struct CommConnEntry *entry;
CommMessageIn *in = session->in;
pthread_mutex_t *mutex;
int ret;
if (session->passive != 1)
if (!in)
{
errno = session->passive ? ENOENT : EINVAL;
errno = ENOENT;
return -1;
}
pthread_mutex_lock(&target->mutex);
if (!list_empty(&target->idle_list))
if (session->passive)
mutex = &session->target->mutex;
else
mutex = &in->entry->mutex;
pthread_mutex_lock(mutex);
if ((session->passive == 2 && !list_empty(&session->target->idle_list)) ||
(!session->passive && in->entry->session == session) ||
session->passive == 1)
{
entry = list_entry(target->idle_list.next, struct CommConnEntry, list);
ret = __send_to_conn(buf, size, entry);
ret = in->CommMessageIn::feedback(buf, size);
}
else
{
@@ -1872,7 +1880,7 @@ int Communicator::push(const void *buf, size_t size, CommSession *session)
ret = -1;
}
pthread_mutex_unlock(&target->mutex);
pthread_mutex_unlock(mutex);
return ret;
}
@@ -1880,13 +1888,13 @@ int Communicator::shutdown(CommSession *session)
{
CommServiceTarget *target;
if (session->passive != 1)
if (session->passive != 2)
{
errno = session->passive ? ENOENT : EINVAL;
errno = EINVAL;
return -1;
}
session->passive = 2;
session->passive = 3;
target = (CommServiceTarget *)session->target;
if (!target->shutdown())
{