mirror of
https://github.com/sogou/workflow.git
synced 2026-02-08 01:33:17 +08:00
Support preadv/pwritev for multi-thread file IO.
This commit is contained in:
@@ -199,5 +199,4 @@ Linux operating system supports a set of asynchronous IO system calls with high
|
||||
We have implemented a set of posix aio interfaces to support other UNIX systems, and used the sigevent notification method of threads, but it is no longer in use because of its low efficiency.
|
||||
Currently, for non-Linux systems, asynchronous IO is always simulated by multi-threading. When an IO task arrives, a thread is created in real time to execute IO tasks, and then a callback is used to return to the handler thread pool.
|
||||
Multi-threaded IO is also the only choice in macOS, because macOS does not have good sigevent support and posix aio will not work in macOS.
|
||||
Multi-threaded IO does not support preadv and pwritev tasks. When these two tasks are created and run, you will get an ENOSYS error in the callback.
|
||||
Some UNIX systems do not support fdatasync. In this case, an fdsync task is equivalent to an fsync task.
|
||||
|
||||
@@ -195,6 +195,5 @@ Linux操作系统支持一套效率很高,CPU占用非常少的异步IO系统
|
||||
我们曾经实现过一套posix aio接口用于支持其它UNIX系统,并使用线程的sigevent通知方式,但由于其效率太低,已经不再使用了。
|
||||
目前,对于非Linux系统,异步IO一律是用多线程实现,在IO任务到达时,实时创建线程执行IO任务,callback回到handler线程池。
|
||||
多线程IO也是macOS下的唯一选择,因为macOS没有良好的sigevent支持,posix aio行不通。
|
||||
多线程IO不支持preadv和pwritev两种任务,创建并运行这两种任务,会在callback里得到一个ENOSYS错误。
|
||||
某些UNIX系统不支持fdatasync调用,这种情况下,fdsync任务将等价于fsync任务。
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
#include <sys/uio.h>
|
||||
#include <errno.h>
|
||||
#include <dlfcn.h>
|
||||
#include <unistd.h>
|
||||
#include <pthread.h>
|
||||
#include "list.h"
|
||||
@@ -87,6 +88,7 @@ void IOSession::prep_fdsync(int fd)
|
||||
|
||||
int IOService::init(int maxevents)
|
||||
{
|
||||
void *p;
|
||||
int ret;
|
||||
|
||||
if (maxevents <= 0)
|
||||
@@ -96,18 +98,30 @@ int IOService::init(int maxevents)
|
||||
}
|
||||
|
||||
ret = pthread_mutex_init(&this->mutex, NULL);
|
||||
if (ret == 0)
|
||||
if (ret)
|
||||
{
|
||||
this->maxevents = maxevents;
|
||||
this->nevents = 0;
|
||||
INIT_LIST_HEAD(&this->session_list);
|
||||
this->pipe_fd[0] = -1;
|
||||
this->pipe_fd[1] = -1;
|
||||
return 0;
|
||||
errno = ret;
|
||||
return -1;
|
||||
}
|
||||
|
||||
errno = ret;
|
||||
return -1;
|
||||
p = dlsym(RTLD_DEFAULT, "preadv");
|
||||
if (p)
|
||||
this->preadv = (ssize_t (*)(int, const struct iovec *, int, off_t))p;
|
||||
else
|
||||
this->preadv = IOService::preadv_emul;
|
||||
|
||||
p = dlsym(RTLD_DEFAULT, "pwritev");
|
||||
if (p)
|
||||
this->pwritev = (ssize_t (*)(int, const struct iovec *, int, off_t))p;
|
||||
else
|
||||
this->pwritev = IOService::pwritev_emul;
|
||||
|
||||
this->maxevents = maxevents;
|
||||
this->nevents = 0;
|
||||
INIT_LIST_HEAD(&this->session_list);
|
||||
this->pipe_fd[0] = -1;
|
||||
this->pipe_fd[1] = -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void IOService::deinit()
|
||||
@@ -216,9 +230,12 @@ void *IOService::io_routine(void *arg)
|
||||
ret = fdatasync(fd);
|
||||
break;
|
||||
case IO_CMD_PREADV:
|
||||
ret = service->preadv(fd, (const struct iovec *)session->buf,
|
||||
session->count, session->offset);
|
||||
break;
|
||||
case IO_CMD_PWRITEV:
|
||||
errno = ENOSYS;
|
||||
ret = -1;
|
||||
ret = service->pwritev(fd, (const struct iovec *)session->buf,
|
||||
session->count, session->offset);
|
||||
break;
|
||||
default:
|
||||
errno = EINVAL;
|
||||
@@ -249,3 +266,49 @@ void *IOService::aio_finish(void *ptr, void *context)
|
||||
return session;
|
||||
}
|
||||
|
||||
ssize_t IOService::preadv_emul(int fd, const struct iovec *iov, int iovcnt,
|
||||
off_t offset)
|
||||
{
|
||||
size_t total = 0;
|
||||
ssize_t n;
|
||||
int i;
|
||||
|
||||
for (i = 0; i < iovcnt; i++)
|
||||
{
|
||||
n = pread(fd, iov[i].iov_base, iov[i].iov_len, offset);
|
||||
if (n < 0)
|
||||
return total == 0 ? -1 : total;
|
||||
|
||||
total += n;
|
||||
if ((size_t)n < iov[i].iov_len)
|
||||
return total;
|
||||
|
||||
offset += n;
|
||||
}
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
ssize_t IOService::pwritev_emul(int fd, const struct iovec *iov, int iovcnt,
|
||||
off_t offset)
|
||||
{
|
||||
size_t total = 0;
|
||||
ssize_t n;
|
||||
int i;
|
||||
|
||||
for (i = 0; i < iovcnt; i++)
|
||||
{
|
||||
n = pwrite(fd, iov[i].iov_base, iov[i].iov_len, offset);
|
||||
if (n < 0)
|
||||
return total == 0 ? -1 : total;
|
||||
|
||||
total += n;
|
||||
if ((size_t)n < iov[i].iov_len)
|
||||
return total;
|
||||
|
||||
offset += n;
|
||||
}
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
|
||||
@@ -105,6 +105,14 @@ private:
|
||||
static void *io_routine(void *arg);
|
||||
static void *aio_finish(void *ptr, void *context);
|
||||
|
||||
private:
|
||||
static ssize_t preadv_emul(int fd, const struct iovec *iov, int iovcnt,
|
||||
off_t offset);
|
||||
static ssize_t pwritev_emul(int fd, const struct iovec *iov, int iovcnt,
|
||||
off_t offset);
|
||||
ssize_t (*preadv)(int, const struct iovec *, int, off_t);
|
||||
ssize_t (*pwritev)(int, const struct iovec *, int, off_t);
|
||||
|
||||
public:
|
||||
virtual ~IOService() { }
|
||||
friend class Communicator;
|
||||
|
||||
Reference in New Issue
Block a user