io_uring 是 Linux 下高性能的异步 IO 框架,网上很多相关资料,我之前也初步分析了一下它的实现,有兴趣的可以查看 https://zhuanlan.zhihu.com/p/387620810。
Libuv 中最近加入了对 io_uring 的支持,那么为什么要把它引入 Libuv 呢?因为 epoll 不支持普通文件的 Poll 能力,所以在 Libuv 中,异步文件 IO 操作需要通过线程池来实现,具体来说就是当用户发起一个异步文件 IO 操作时,Libuv 会把这个操作放到线程池中,当子线程处理这个任务时,会执行一个阻塞式的系统调用,这个系统调用会引起线程阻塞,从而导致这个线程被消耗掉了,当 IO 操作完成后,子线程就会被唤醒,子线程再通过主线程去执行用户的回调。在 Libuv 早期的实现中,如果执行比较慢的任务过多就会把线程池中的线程消耗完,从而导致执行比较快的 IO 操作需要等待很长时间,一个例子就是 DNS 解析会阻塞文件 IO 任务。而 io_uring 可以支持普通文件 IO(当然能力不仅于此),不再需要借助线程池的能力,目前 Libuv 中部分异步文件 IO 操作已经替换成 io_uring(需要通过环境变量开启),下面来看看它的实现。
原生 io_uring 的使用比较复杂,通常需要借助 liburing 库,但是 Libuv 中可能为了减少对第三方库的依赖,实现上使用原生的方式。
io_uring 初始化
在 Libuv 初始化时会进行 io_uring 的初始化。
uv__iou_init(loop->backend_fd,&lfields->iou,64,UV__IORING_SETUP_SQPOLL);
lfields->iou 为 io_uring 核心结构体,UVIORING_SETUP_SQPOLL 设置内核创建线程轮询是否有任务需要处理(用户层设置),接着看看 uviou_init。
staticvoiduv__iou_init(intepollfd,struct uv__iou*iou,uint32_t entries,uint32_t flags){struct uv__io_uring_params params;struct epoll_event e;size_t cqlen;size_t sqlen;size_t maxlen;size_t sqelen;uint32_t i;char*sq;char*sqe;intringfd;memset(¶ms,0,sizeof(params));params.flags=flags;// UV__IORING_SETUP_SQPOLL 模式下,设置多久没有任务提交则内核线程进入 sleep 状态if(flags&UV__IORING_SETUP_SQPOLL)params.sq_thread_idle=10;/* milliseconds / // 调用系统调用初始化 io_uring ringfd = uv__io_uring_setup(entries, ¶ms); // 映射到内核发送 / 完成队列的内存,用户层和内核可以共同操作这个队列 sq = mmap(0, maxlen, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ringfd, 0); / sqe = mmap(0, sqelen, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ringfd, 0x10000000ull); /* IORING_OFF_SQES */memset(&e,0,sizeof(e));e.events=POLLIN;e.data.fd=ringfd;// 注册等待可读事件,io_uring 中有任务完成后就会通过 epollepoll_ctl(epollfd,EPOLL_CTL_ADD,ringfd,&e);// 初始化 io_uring 结构体iou->sqhead=(uint32_t*)(sq+params.sq_off.head);iou->sqtail=(uint32_t*)(sq+params.sq_off.tail);iou->sqmask=(uint32_t)(sq+params.sq_off.ring_mask);iou->sqarray=(uint32_t*)(sq+params.sq_off.array);iou->sqflags=(uint32_t*)(sq+params.sq_off.flags);iou->cqhead=(uint32_t*)(sq+params.cq_off.head);iou->cqtail=(uint32_t*)(sq+params.cq_off.tail);iou->cqmask=(uint32_t)(sq+params.cq_off.ring_mask);iou->sq=sq;iou->cqe=sq+params.cq_off.cqes;iou->sqe=sqe;iou->sqlen=sqlen;iou->cqlen=cqlen;iou->maxlen=maxlen;iou->sqelen=sqelen;iou->ringfd=ringfd;iou->in_flight=0;iou->flags=0;}
uv__iou_init 完成了 io_uring 的初始化,并且把 io_uring 对应的 fd 注册到 epoll,当 io_uring 有任务完成时,就可以通过 epoll 感知到。接着就可以使用 io_uring 了。
下面看一个异步文件 IO 的操作。
intuv_fs_open(uv_loop_t*loop,uv_fs_t*req,constchar*path,intflags,intmode,uv_fs_cb cb){INIT(OPEN);PATH;req->flags=flags;req->mode=mode;if(cb!=NULL)if(uv__iou_fs_open(loop,req))return0;POST;}
uv_fs_open 可以以异步的方式打开一个文件,之前时通过线程池实现的,加入 io_uring 后,就会多了一层拦截,来看看 uv__iou_fs_open。
intuv__iou_fs_open(uv_loop_t*loop,uv_fs_t*req){struct uv__io_uring_sqe*sqe;struct uv__iou*iou;// 获取 io_uring 结构体iou=&uv__get_internal_fields(loop)->iou;// 获取一个任务节点,任务节点会和 req 互相关联,回调时会用到sqe=uv__iou_get_sqe(iou,loop,req);// 设置操作上下文sqe->addr=(uintptr_t)req->path;sqe->fd=AT_FDCWD;sqe->len=req->mode;// 设置操作类型sqe->opcode=UV__IORING_OP_OPENAT;sqe->open_flags=req->flags|O_CLOEXEC;// 提交任务uv__iou_submit(iou);return1;}
uviou_fs_open 中有两个核心逻辑 uviou_get_sqe 和 uviou_submit,首先来看 uviou_get_sqe。
staticstruct uv__io_uring_sqe*uv__iou_get_sqe(struct uv__iou*iou,uv_loop_t*loop,uv_fs_t*req){struct uv__io_uring_sqe*sqe;uint32_t head;uint32_t tail;uint32_t mask;uint32_t slot;if(iou->ringfd==-1)returnNULL;head=atomic_load_explicit((_Atomic uint32_t*)iou->sqhead,memory_order_acquire);tail=*iou->sqtail;mask=iou->sqmask;slot=tail&mask;sqe=iou->sqe;// 从请求队列中获取一个节点sqe=&sqe[slot];memset(sqe,0,sizeof(*sqe));// 任务节点关联到 req,回调时需要使用sqe->user_data=(uintptr_t)req;req->work_req.loop=loop;req->work_req.work=NULL;req->work_req.done=NULL;uv__queue_init(&req->work_req.wq);uv__req_register(loop,req);iou->in_flight++;returnsqe;}
uviou_get_sqe 主要是从任务队列中获取一个空闲节点并关联上请求上下文结构体,uviou_get_sqe 的调用方需要设置操作上下文,比如操作类型,操作的 fd 等。通过 uviou_get_sqe 获取任务节点并设置了操作上下文后,这个任务就会自动被操作系统感知。因为 Libuv 是使用了 UVIORING_SETUP_SQPOLL 模式,所以还需要判断这时候内核轮训线程是否处于睡眠状态,这就是 uv__iou_submit 的逻辑。
staticvoiduv__iou_submit(struct uv__iou*iou){uint32_t flags;atomic_store_explicit((_Atomic uint32_t*)iou->sqtail,*iou->sqtail+1,memory_order_release);flags=atomic_load_explicit((_Atomic uint32_t*)iou->sqflags,memory_order_acquire);// 判断内核线程是否处于睡眠状态if(flags&UV__IORING_SQ_NEED_WAKEUP)// 唤醒内核线程,说明有任务需要处理if(uv__io_uring_enter(iou->ringfd,0,0,UV__IORING_ENTER_SQ_WAKEUP))if(errno!=EOWNERDEAD)/* Kernel bug. Harmless, ignore. / perror("libuv: io_uring_enter(wakeup)"); /
这样就完成了任务的提交。
任务完成后,io_uring 对应的 fd 就会变成可读,从而 epoll 就会感知到,来看看 epoll 的处理。下面是 epoll 处理就绪 fd 时的一段逻辑。
if(fd==iou->ringfd){uv__poll_io_uring(loop,iou);have_iou_events=1;continue;}
如果是 io_uring 的 fd 可读,则执行 uv__poll_io_uring。
staticvoiduv__poll_io_uring(uv_loop_t*loop,struct uv__iou*iou){struct uv__io_uring_cqe*cqe;struct uv__io_uring_cqe*e;uv_fs_t*req;uint32_t head;uint32_t tail;uint32_t mask;uint32_t i;uint32_t flags;intnevents;intrc;// 完成队列头/尾节点head=iou->cqhead;tail=atomic_load_explicit((_Atomic uint32_t)iou->cqtail,memory_order_acquire);mask=iou->cqmask;cqe=iou->cqe;nevents=0;// 遍历完成队列for(i=head;i!=tail;i++){e=&cqe[i&mask];// 拿到操作关联的请求结构体req=(uv_fs_t*)(uintptr_t)e->user_data;uv__req_unregister(loop,req);iou->in_flight--;// 操作返回值,表示操作是否成功req->result=e->res;// 执行回调req->cb(req);}
uv__poll_io_uring 的逻辑很简单,就是遍历完成队列,然后拿到对应的请求上下文结构体,最后执行它的回调。
现代软件中大多数使用的 IO 模型是 epoll,随着 io_uring 的发展和成熟,io_uring 将会出现在更多的软件中,之前我也体验了一下 io_uring,有兴趣的可以体验下 https://github.com/theanarkh/nodejs_io_uring。