mirror of
https://github.com/sogou/workflow.git
synced 2026-02-08 01:33:17 +08:00
Move 'reliable' attribute to CommService.
This commit is contained in:
@@ -119,6 +119,35 @@ static int __create_ssl(SSL_CTX *ssl_ctx, struct CommConnEntry *entry)
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int __send_to_conn(const void *buf, size_t size,
|
||||
struct CommConnEntry *entry)
|
||||
{
|
||||
const struct sockaddr *addr;
|
||||
socklen_t addrlen;
|
||||
int ret;
|
||||
|
||||
if (!entry->ssl)
|
||||
{
|
||||
entry->target->get_addr(&addr, &addrlen);
|
||||
return sendto(entry->sockfd, buf, size, 0, addr, addrlen);
|
||||
}
|
||||
|
||||
if (size == 0)
|
||||
return 0;
|
||||
|
||||
ret = SSL_write(entry->ssl, buf, size);
|
||||
if (ret <= 0)
|
||||
{
|
||||
ret = SSL_get_error(entry->ssl, ret);
|
||||
if (ret != SSL_ERROR_SYSCALL)
|
||||
errno = -ret;
|
||||
|
||||
ret = -1;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void __release_conn(struct CommConnEntry *entry)
|
||||
{
|
||||
delete entry->conn;
|
||||
@@ -199,7 +228,7 @@ void CommTarget::deinit()
|
||||
|
||||
int CommMessageIn::feedback(const void *buf, size_t size)
|
||||
{
|
||||
return this->entry->session->push(buf, size, this->entry);
|
||||
return __send_to_conn(buf, size, this->entry);
|
||||
}
|
||||
|
||||
void CommMessageIn::renew()
|
||||
@@ -300,7 +329,7 @@ public:
|
||||
}
|
||||
|
||||
public:
|
||||
int shutdown(int reliable);
|
||||
int shutdown();
|
||||
|
||||
private:
|
||||
int sockfd;
|
||||
@@ -319,7 +348,7 @@ private:
|
||||
friend class Communicator;
|
||||
};
|
||||
|
||||
int CommServiceTarget::shutdown(int reliable)
|
||||
int CommServiceTarget::shutdown()
|
||||
{
|
||||
struct CommConnEntry *entry;
|
||||
int errno_bak;
|
||||
@@ -331,7 +360,7 @@ int CommServiceTarget::shutdown(int reliable)
|
||||
entry = list_entry(this->idle_list.next, struct CommConnEntry, list);
|
||||
list_del(&entry->list);
|
||||
|
||||
if (reliable)
|
||||
if (this->service->reliable)
|
||||
{
|
||||
errno_bak = errno;
|
||||
mpoller_del(entry->sockfd, entry->mpoller);
|
||||
@@ -351,37 +380,6 @@ int CommServiceTarget::shutdown(int reliable)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int CommSession::push(const void *buf, size_t size, struct CommConnEntry *entry)
|
||||
{
|
||||
const struct sockaddr *addr;
|
||||
socklen_t addrlen;
|
||||
int ret;
|
||||
|
||||
if (this->passive && !this->reliable)
|
||||
{
|
||||
entry->target->get_addr(&addr, &addrlen);
|
||||
return sendto(entry->sockfd, buf, size, 0, addr, addrlen);
|
||||
}
|
||||
|
||||
if (!entry->ssl)
|
||||
return write(entry->sockfd, buf, size);
|
||||
|
||||
if (size == 0)
|
||||
return 0;
|
||||
|
||||
ret = SSL_write(entry->ssl, buf, size);
|
||||
if (ret <= 0)
|
||||
{
|
||||
ret = SSL_get_error(entry->ssl, ret);
|
||||
if (ret != SSL_ERROR_SYSCALL)
|
||||
errno = -ret;
|
||||
|
||||
ret = -1;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
CommSession::~CommSession()
|
||||
{
|
||||
CommServiceTarget *target;
|
||||
@@ -391,7 +389,7 @@ CommSession::~CommSession()
|
||||
|
||||
target = (CommServiceTarget *)this->target;
|
||||
if (this->passive == 1)
|
||||
target->shutdown(this->reliable);
|
||||
target->shutdown();
|
||||
|
||||
target->decref();
|
||||
}
|
||||
@@ -1372,7 +1370,6 @@ poller_message_t *Communicator::create_request(void *context)
|
||||
return NULL;
|
||||
|
||||
session->passive = 1;
|
||||
session->reliable = 1;
|
||||
entry->session = session;
|
||||
session->target = target;
|
||||
session->conn = entry->conn;
|
||||
@@ -1439,7 +1436,6 @@ int Communicator::recv_request(const void *buf, size_t size,
|
||||
return -1;
|
||||
|
||||
session->passive = 1;
|
||||
session->reliable = 0;
|
||||
entry->session = session;
|
||||
session->target = target;
|
||||
session->conn = entry->conn;
|
||||
@@ -1815,7 +1811,7 @@ int Communicator::request(CommSession *session, CommTarget *target)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int Communicator::nonblock_listen(CommService *service, int *reliable)
|
||||
int Communicator::nonblock_listen(CommService *service)
|
||||
{
|
||||
int sockfd = service->create_listen_fd();
|
||||
int ret;
|
||||
@@ -1830,7 +1826,7 @@ int Communicator::nonblock_listen(CommService *service, int *reliable)
|
||||
ret = listen(sockfd, SOMAXCONN);
|
||||
if (ret >= 0 || errno == EOPNOTSUPP)
|
||||
{
|
||||
*reliable = (ret >= 0);
|
||||
service->reliable = (ret >= 0);
|
||||
return sockfd;
|
||||
}
|
||||
}
|
||||
@@ -1846,10 +1842,9 @@ int Communicator::bind(CommService *service)
|
||||
{
|
||||
struct poller_data data;
|
||||
int errno_bak = errno;
|
||||
int reliable;
|
||||
int sockfd;
|
||||
|
||||
sockfd = this->nonblock_listen(service, &reliable);
|
||||
sockfd = this->nonblock_listen(service);
|
||||
if (sockfd >= 0)
|
||||
{
|
||||
service->listen_fd = sockfd;
|
||||
@@ -1857,7 +1852,7 @@ int Communicator::bind(CommService *service)
|
||||
data.fd = sockfd;
|
||||
data.context = service;
|
||||
data.result = NULL;
|
||||
if (reliable)
|
||||
if (service->reliable)
|
||||
{
|
||||
data.operation = PD_OP_LISTEN;
|
||||
data.accept = Communicator::accept;
|
||||
@@ -1987,7 +1982,7 @@ int Communicator::reply_unreliable(CommSession *session, CommTarget *target)
|
||||
int Communicator::reply(CommSession *session)
|
||||
{
|
||||
struct CommConnEntry *entry;
|
||||
CommTarget *target;
|
||||
CommServiceTarget *target;
|
||||
int errno_bak;
|
||||
int ret;
|
||||
|
||||
@@ -1999,8 +1994,8 @@ int Communicator::reply(CommSession *session)
|
||||
|
||||
errno_bak = errno;
|
||||
session->passive = 2;
|
||||
target = session->target;
|
||||
if (session->reliable)
|
||||
target = (CommServiceTarget *)session->target;
|
||||
if (target->service->reliable)
|
||||
ret = this->reply_idle_conn(session, target);
|
||||
else
|
||||
ret = this->reply_unreliable(session, target);
|
||||
@@ -2038,7 +2033,7 @@ int Communicator::push(const void *buf, size_t size, CommSession *session)
|
||||
if (!list_empty(&target->idle_list))
|
||||
{
|
||||
entry = list_entry(target->idle_list.next, struct CommConnEntry, list);
|
||||
ret = session->push(buf, size, entry);
|
||||
ret = __send_to_conn(buf, size, entry);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -2062,7 +2057,7 @@ int Communicator::shutdown(CommSession *session)
|
||||
|
||||
session->passive = 2;
|
||||
target = (CommServiceTarget *)session->target;
|
||||
if (!target->shutdown(session->reliable))
|
||||
if (!target->shutdown())
|
||||
{
|
||||
errno = ENOENT;
|
||||
return -1;
|
||||
|
||||
@@ -157,11 +157,7 @@ private:
|
||||
private:
|
||||
struct timespec begin_time;
|
||||
int timeout;
|
||||
char passive;
|
||||
char reliable;
|
||||
|
||||
private:
|
||||
int push(const void *buf, size_t size, struct CommConnEntry *entry);
|
||||
int passive;
|
||||
|
||||
public:
|
||||
CommSession() { this->passive = 0; }
|
||||
@@ -226,6 +222,7 @@ private:
|
||||
void decref();
|
||||
|
||||
private:
|
||||
int reliable;
|
||||
int listen_fd;
|
||||
int ref;
|
||||
|
||||
@@ -344,7 +341,7 @@ private:
|
||||
static void handler_thread_routine(void *context);
|
||||
|
||||
static int nonblock_connect(CommTarget *target);
|
||||
static int nonblock_listen(CommService *service, int *reliable);
|
||||
static int nonblock_listen(CommService *service);
|
||||
|
||||
static struct CommConnEntry *launch_conn(CommSession *session,
|
||||
CommTarget *target);
|
||||
|
||||
Reference in New Issue
Block a user