Merge pull request #1247 from Barenboim/master

Add PD_OP_RECV_MESSAGE to support UDP server.
This commit is contained in:
xiehan
2023-04-13 21:22:48 +08:00
committed by GitHub
2 changed files with 84 additions and 27 deletions

View File

@@ -573,14 +573,15 @@ static void __poller_handle_listen(struct __poller_node *node,
{
struct __poller_node *res = node->res;
struct sockaddr_storage ss;
socklen_t len;
struct sockaddr *addr = (struct sockaddr *)&ss;
socklen_t addrlen;
void *result;
int sockfd;
void *p;
while (1)
{
len = sizeof (struct sockaddr_storage);
sockfd = accept(node->data.fd, (struct sockaddr *)&ss, &len);
addrlen = sizeof (struct sockaddr_storage);
sockfd = accept(node->data.fd, addr, &addrlen);
if (sockfd < 0)
{
if (errno == EAGAIN || errno == EMFILE || errno == ENFILE)
@@ -591,13 +592,12 @@ static void __poller_handle_listen(struct __poller_node *node,
break;
}
p = node->data.accept((const struct sockaddr *)&ss, len,
sockfd, node->data.context);
if (!p)
result = node->data.accept(addr, addrlen, sockfd, node->data.context);
if (!result)
break;
res->data = node->data;
res->data.result = p;
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
@@ -733,17 +733,17 @@ static void __poller_handle_event(struct __poller_node *node,
struct __poller_node *res = node->res;
unsigned long long cnt = 0;
unsigned long long value;
ssize_t ret;
void *p;
void *result;
ssize_t n;
while (1)
{
ret = read(node->data.fd, &value, sizeof (unsigned long long));
if (ret == sizeof (unsigned long long))
n = read(node->data.fd, &value, sizeof (unsigned long long));
if (n == sizeof (unsigned long long))
cnt += value;
else
{
if (ret >= 0)
if (n >= 0)
errno = EINVAL;
break;
}
@@ -757,12 +757,12 @@ static void __poller_handle_event(struct __poller_node *node,
return;
cnt--;
p = node->data.event(node->data.context);
if (!p)
result = node->data.event(node->data.context);
if (!result)
break;
res->data = node->data;
res->data.result = p;
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
@@ -790,20 +790,20 @@ static void __poller_handle_notify(struct __poller_node *node,
poller_t *poller)
{
struct __poller_node *res = node->res;
ssize_t ret;
void *p;
void *result;
ssize_t n;
while (1)
{
ret = read(node->data.fd, &p, sizeof (void *));
if (ret == sizeof (void *))
n = read(node->data.fd, &result, sizeof (void *));
if (n == sizeof (void *))
{
p = node->data.notify(p, node->data.context);
if (!p)
result = node->data.notify(result, node->data.context);
if (!result)
break;
res->data = node->data;
res->data.result = p;
res->data.result = result;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
@@ -813,11 +813,11 @@ static void __poller_handle_notify(struct __poller_node *node,
if (!res)
break;
}
else if (ret < 0 && errno == EAGAIN)
else if (n < 0 && errno == EAGAIN)
return;
else
{
if (ret > 0)
if (n > 0)
errno = EINVAL;
break;
}
@@ -826,7 +826,7 @@ static void __poller_handle_notify(struct __poller_node *node,
if (__poller_remove_node(node, poller))
return;
if (ret == 0)
if (n == 0)
{
node->error = 0;
node->state = PR_ST_FINISHED;
@@ -841,6 +841,54 @@ static void __poller_handle_notify(struct __poller_node *node,
poller->cb((struct poller_result *)node, poller->ctx);
}
static void __poller_handle_recv_message(struct __poller_node *node,
poller_t *poller)
{
struct __poller_node *res = node->res;
struct sockaddr_storage ss;
struct sockaddr *addr = (struct sockaddr *)&ss;
socklen_t addrlen;
poller_message_t *msg;
void *p = poller->buf;
ssize_t n;
while (1)
{
addrlen = sizeof (struct sockaddr_storage);
n = recvfrom(node->data.fd, p, POLLER_BUFSIZE, 0, addr, &addrlen);
if (n < 0)
{
if (errno == EAGAIN)
return;
else
break;
}
msg = node->data.recv_message(addr, addrlen, p, n, node->data.context);
if (!msg)
break;
res->data = node->data;
res->data.message = msg;
res->error = 0;
res->state = PR_ST_SUCCESS;
poller->cb((struct poller_result *)res, poller->ctx);
res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
node->res = res;
if (!res)
break;
}
if (__poller_remove_node(node, poller))
return;
node->error = errno;
node->state = PR_ST_ERROR;
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
}
static int __poller_handle_pipe(poller_t *poller)
{
struct __poller_node **node = (struct __poller_node **)poller->buf;
@@ -1006,6 +1054,9 @@ static void *__poller_thread_routine(void *arg)
case PD_OP_NOTIFY:
__poller_handle_notify(node, poller);
break;
case PD_OP_RECV_MESSAGE:
__poller_handle_recv_message(node, poller);
break;
}
}
else if (node == (struct __poller_node *)1)
@@ -1240,6 +1291,9 @@ static int __poller_data_get_event(int *event, const struct poller_data *data)
case PD_OP_NOTIFY:
*event = EPOLLIN | EPOLLET;
return 1;
case PD_OP_RECV_MESSAGE:
*event = EPOLLIN | EPOLLET;
return 1;
default:
errno = EINVAL;
return -1;

View File

@@ -35,6 +35,7 @@ struct __poller_message
struct poller_data
{
#define PD_OP_TIMER 0
#define PD_OP_READ 1
#define PD_OP_WRITE 2
#define PD_OP_LISTEN 3
@@ -46,7 +47,7 @@ struct poller_data
#define PD_OP_SSL_SHUTDOWN 7
#define PD_OP_EVENT 8
#define PD_OP_NOTIFY 9
#define PD_OP_TIMER 10
#define PD_OP_RECV_MESSAGE 10
short operation;
unsigned short iovcnt;
int fd;
@@ -58,6 +59,8 @@ struct poller_data
void *(*accept)(const struct sockaddr *, socklen_t, int, void *);
void *(*event)(void *);
void *(*notify)(void *, void *);
poller_message_t *(*recv_message)(const struct sockaddr *, socklen_t,
const void *, size_t, void *);
};
void *context;
union