异步 IO 机制 io_uring

一、io_uring 原理

  • 如何解决频繁 copy 的问题 → mmap 内存映射解决。
    • submit queue 中的节点和 complete queue 中的节点共用一块内存,而不是把 submit queue 中的节点 copy 到 complete queue 中
  • 如何做到线程安全 → 无锁环形队列解决。

在这里插入图片描述


二、io_uring 使用

  • 内核为 io_uring 提供了三个系统调用:
    • io_uring_setup
    • io_uring_enter
    • io_uring_register
  • 封装好的库 liburing

三、liburing 安装

git clone https://github.com/axboe/liburing.git
cd liburing
./configure
make
sudo make install

四、io_uring 实现 tcp server

  • io_uring 的 EVENT_READ 表示数据已经读出来了
  • epoll 的 EPOLLIN 表示数据可以读了
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>

// gcc main.c -o main -luring

#define EVENT_ACCEPT   	0
#define EVENT_READ		1
#define EVENT_WRITE		2

struct conn_info {
	int fd;
	int event;
};


// 初始化 sockfd
int init_server(unsigned short port) {	
	int sockfd = socket(AF_INET, SOCK_STREAM, 0);	
	struct sockaddr_in serveraddr;	
	memset(&serveraddr, 0, sizeof(struct sockaddr_in));	
	serveraddr.sin_family = AF_INET;	
	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);	
	serveraddr.sin_port = htons(port);	

	if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {		
		perror("bind");		
		return -1;	
	}	

	listen(sockfd, 10);
	
	return sockfd;
}


#define ENTRIES_LENGTH		1024
#define BUFFER_LENGTH		1024

int set_event_recv(struct io_uring *ring, int connfd, void *buf, size_t len, int flags) {
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

	struct conn_info accept_info = {
		.fd = connfd,
		.event = EVENT_READ,
	};
	
	io_uring_prep_recv(sqe, connfd, buf, len, flags);
	memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

int set_event_send(struct io_uring *ring, int connfd, void *buf, size_t len, int flags) {
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

	struct conn_info accept_info = {
		.fd = connfd,
		.event = EVENT_WRITE,
	};
	
	io_uring_prep_send(sqe, connfd, buf, len, flags);
	memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}


int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
	// 拿到 sq 中可用位置的指针
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

	struct conn_info accept_info = {
		.fd = sockfd,
		.event = EVENT_ACCEPT,
	};
	
	// 提交一个请求 → 往 sq 中插入一个节点
	io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);
	memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}


int main(int argc, char *argv[]) {
	unsigned short port = 9999;
	int sockfd = init_server(port);

	// 构建 io_uring 
	struct io_uring_params params;
	memset(&params, 0, sizeof(params));
	struct io_uring ring;
	io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);

	struct sockaddr_in clientaddr;	
	socklen_t len = sizeof(clientaddr);
	set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);

	char buffer[BUFFER_LENGTH] = {0};

	while (1) {
		// 将 sq 中的任务提交到 worker,让 worker 进行处理
		io_uring_submit(&ring);

		struct io_uring_cqe *cqe;
		// 拿到 cq 中最早完成事件的指针,如果没有完成的事件则会阻塞
		io_uring_wait_cqe(&ring, &cqe);

		struct io_uring_cqe *cqes[128];
		// 一次性从 cq 中拿 128 个完成事件的指针,如果没有完成的事件则会阻塞
		int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // 相当于 epoll_wait
		for (int i = 0;i < nready;i++) {
			struct io_uring_cqe *entries = cqes[i];
			struct conn_info result;
			memcpy(&result, &entries->user_data, sizeof(struct conn_info));

			if (result.event == EVENT_ACCEPT) {
				// 支持断开连接后还可以建立连接以及建立多个连接
				// 往 sq 中放一个读请求 → 处理新的连接
				set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
				
				printf("set_event_accept\n"); 

				int connfd = entries->res;

				set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
			} 
			// io_uring 的 EVENT_READ 表示数据已经读出来了
			// epoll 的 EPOLLIN 表示数据可以读了
			else if (result.event == EVENT_READ) {  
				int ret = entries->res;
				
				printf("set_event_recv ret: %d, %s\n", ret, buffer); 

				if (ret == 0) {
					close(result.fd);
				} else if (ret > 0) {
					set_event_send(&ring, result.fd, buffer, ret, 0);
				}
			}  else if (result.event == EVENT_WRITE) {
				int ret = entries->res;
				
				printf("set_event_send ret: %d, %s\n", ret, buffer);

				set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
			}
		}
		// 删除 cq 中处理过的任务
		io_uring_cq_advance(&ring, nready);
	}
}

五、QPS 测试

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>

#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>

// gcc test_qps_tcp.c -o test_qps_tcp -lpthread
// ./test_qps_tcp -s 127.0.0.1 -p 9999 -t 50 -c 100 -n 100000
// ./test_qps_tcp -s 127.0.0.1 -p 2048 -t 50 -c 100 -n 100000

typedef struct test_context_s {
	char serverip[16];
	int port;
	int threadnum;
	int connection;
	int requestion;

	int failed;	
} test_context_t;

int connect_tcpserver(const char *ip, unsigned short port) {
	int connfd = socket(AF_INET, SOCK_STREAM, 0);

	struct sockaddr_in tcpserver_addr;
	memset(&tcpserver_addr, 0, sizeof(struct sockaddr_in));

	tcpserver_addr.sin_family = AF_INET;
	tcpserver_addr.sin_addr.s_addr = inet_addr(ip);
	tcpserver_addr.sin_port = htons(port);

	int ret = connect(connfd, (struct sockaddr*)&tcpserver_addr, sizeof(struct sockaddr_in));
	if (ret) {
		perror("connect");
		return -1;
	}

	return connfd;
}


#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

#define TEST_MESSAGE   "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048

int send_recv_tcppkt(int fd) {
	char wbuffer[WBUFFER_LENGTH] = {0};

	for (int i = 0;i < 4;i ++) {
		strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);
	}

	int res = send(fd, wbuffer, strlen(wbuffer), 0);
	if (res < 0) {
		exit(1);
	}
	
	char rbuffer[RBUFFER_LENGTH] = {0};
	res = recv(fd, rbuffer, RBUFFER_LENGTH, 0);
	if (res <= 0) {
		exit(1);
	}

	if (strcmp(rbuffer, wbuffer) != 0) {
		printf("failed: '%s' != '%s'\n", rbuffer, wbuffer); 
		return -1;
	}

	return 0;
}

static void *test_qps_entry(void *arg) {
	test_context_t *pctx = (test_context_t*)arg;

	int connfd = connect_tcpserver(pctx->serverip, pctx->port);
	if (connfd < 0) {
		printf("connect_tcp_server failed\n");
		return NULL;
	}

	// 每个线程的发送次数
	int count = pctx->requestion / pctx->threadnum;
	int i = 0;
	int res;

	while (i++ < count) {
		res = send_recv_tcppkt(connfd);
		if (res != 0) {
			printf("send_recv_tcppkt failed\n");
			pctx->failed++; // 可以加锁、也可以不加锁
			continue;
		}
	}

	return NULL;
}


int main(int argc, char *argv[]) {
	int ret = 0;
	test_context_t ctx = {0};
	
	int opt;
	while ((opt = getopt(argc, argv, "s:p:t:c:n:?")) != -1) {
		switch (opt) {
			case 's':
				printf("-s: %s\n", optarg);
				strcpy(ctx.serverip, optarg);
				break;

			case 'p':
				printf("-p: %s\n", optarg);

				ctx.port = atoi(optarg);
				break;

			case 't':
				printf("-t: %s\n", optarg);
				ctx.threadnum = atoi(optarg);
				break;

			case 'c':
				printf("-c: %s\n", optarg);
				ctx.connection = atoi(optarg);
				break;

			case 'n':
				printf("-n: %s\n", optarg);
				ctx.requestion = atoi(optarg);
				break;

			default:
				return -1;
		}
	}

	pthread_t *ptid = malloc(ctx.threadnum * sizeof(pthread_t));

	// 记录开始时间
	struct timeval tv_begin;
	gettimeofday(&tv_begin, NULL);

	for (int i = 0;i < ctx.threadnum;i ++) {
		pthread_create(&ptid[i], NULL, test_qps_entry, &ctx);
	}
	
	for (int i = 0;i < ctx.threadnum;i ++) {
		pthread_join(ptid[i], NULL);
	}

	// 记录结束时间
	struct timeval tv_end;
	gettimeofday(&tv_end, NULL);

	int time_used = TIME_SUB_MS(tv_end, tv_begin);

	printf("success: %d, failed: %d, time_used: %d, qps: %d\n", ctx.requestion - ctx.failed, ctx.failed, time_used, ctx.requestion * 1000 / time_used);

	free(ptid);

	return ret;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/558825.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

了解 Python 底层的解释器 CPython 和 Python 的对象模型

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 一、CPython CPython 是 Python 编程语言的官方和最广泛使用的实现。它是用 C 语言编写的&#xff0c;因此得名 “CPython”。作为 Python 生态系统的核心&#xff0c;了解 CPython 的工作原理、主要特…

【新版】系统架构设计师 - 知识点 - 结构化开发方法

个人总结&#xff0c;仅供参考&#xff0c;欢迎加好友一起讨论 文章目录 架构 - 知识点 - 结构化开发方法结构化开发方法结构化分析结构化设计 数据流图和数据字典模块内聚类型与耦合类型 架构 - 知识点 - 结构化开发方法 结构化开发方法 分析阶段 工具&#xff1a;数据流图、…

如何实现文件上传到阿里云OSS!!!(结合上传pdf使用)

一、开通阿里云OSS对象存储服务 对象存储 OSS_云存储服务_企业数据管理_存储-阿里云阿里云对象存储 OSS 是一款海量、安全、低成本、高可靠的云存储服务&#xff0c;提供 99.995 % 的服务可用性和多种存储类型&#xff0c;适用于数据湖存储&#xff0c;数据迁移&#xff0c;企…

股票战法课程之主力的痕迹

文章目录 1. 主力的操作痕迹2. 主力的建仓2.1 建仓的三种方式2.2 建仓的五个特点2.3 建仓的迹象2.4 建仓的成交量特征 1. 主力的操作痕迹 序号痕迹原因1不跟随大盘节奏筹码都在主力手中2突发利空消息&#xff0c;股价不跌反涨主力被套&#xff0c;不希望散户抛盘3很小的成交量…

智己汽车数据驱动中心PMO高级经理张晶女士受邀为第十三届中国PMO大会演讲嘉宾

全国PMO专业人士年度盛会 智己汽车科技有限公司数据驱动中心PMO高级经理张晶女士受邀为PMO评论主办的2024第十三届中国PMO大会演讲嘉宾&#xff0c;演讲议题为“规模化敏捷落地实践”。大会将于5月25-26日在北京举办&#xff0c;敬请关注&#xff01; 议题简要&#xff1a; 2…

CSS基础:table的4个标签的样式详解(6000字长文!附案例)

你好&#xff0c;我是云桃桃。 一个希望帮助更多朋友快速入门 WEB 前端的程序媛。 云桃桃-大专生&#xff0c;一枚程序媛&#xff0c;感谢关注。回复 “前端基础题”&#xff0c;可免费获得前端基础 100 题汇总&#xff0c;回复 “前端工具”&#xff0c;可获取 Web 开发工具合…

【Java】Comparable和Comparator的区别

文章目录 区别Comparable示例Comparator示例参考资料 都可以用于排序。都是接口。 区别 Comparable示例 让被排序的类实现 Comparable 接口&#xff0c;重写 compareTo() 方法。 import java.util.*;public class Main {public static void main(String[] args){TreeSet<…

端点安全时刻影响着网络安全,我们应该如何保护

端点&#xff08;Endpoint&#xff09;是网络通信中的一个重要概念&#xff0c;指的是网络通信中的发送或接收信息的设备或节点。在一个网络中&#xff0c;端点可以是硬件设备&#xff08;如计算机、服务器、手机、路由器等&#xff09;&#xff0c;也可以是软件应用或服务。端…

PSO-BP和BP多输入多输出回归预测模型 matlab (多输入多输出)

文章目录 效果一览文章概述订阅专栏只能获取一份代码部分源码参考资料效果一览 文章概述 PSO-BP和BP多输入多输出回归预测模型 matlab (多输入多输出) 订阅专栏只能获取一份代码 部分源码 %------

ceph osd分组

一、前言 使用分组可以更好的管理osd&#xff0c;将不同类型的磁盘&#xff0c;分到不同的组中&#xff0c;例如hhd类型的osd分配到hhd组&#xff0c;ssd类型的osd分配到ssd组&#xff0c;将io要求不高的分配到hhd组做存储&#xff0c;io要求高的分配到ssd组做存储 二、配置 查…

Linux驱动开发笔记(一)字符驱动

文章目录 前言一、字符设备驱动程序框架二、基本原理1. 设备号的申请与归还2. 保存file_operations接口3. 设备节点的创建和销毁4. 创建文件设备4.1 mknod4.2 init_special_incode( )函数 5. 查找file_operation接口函数速查表 三、程序编写1. 模块初始化及关闭2. 文件操作方式…

墨子web3时事周报

蚂蚁集团Web3研发进展与布局 国内Web3赛道的领军企业——蚂蚁集团&#xff0c;凭借其在前沿科技领域的深耕不辍&#xff0c;已在Web3技术研发疆域缔造了卓越战绩。特别是在引领行业革新的关键时刻&#xff0c;集团于今年四月末震撼推出了颠覆性的Web3全套解决方案&#xff0c…

【Godot4自学手册】第三十八节给游戏添加音效

今天&#xff0c;我的主要任务就是给游戏添加音效。在添加音效前&#xff0c;我们需要了解一个东西&#xff1a;音频总线。这个东西或许有些枯燥&#xff0c;如果你只为添加一个音效没必要了解太多&#xff0c;但如果你以后将要经常与音频播放打交道&#xff0c;还是要了解一下…

ARM学习(26)链接库的依赖查看

笔者今天来聊一下查看链接库的依赖。 通常情况下&#xff0c;运行一个可执行文件的时候&#xff0c;可能会出现找不到依赖库的情况&#xff0c;比如图下这种情况&#xff0c;可以看到是缺少了license.dll或者libtest.so&#xff0c;所以无法运行。怎么知道它到底缺少什么dll呢&…

论婚恋相亲交友软件的市场前景和开发方案H5小程序APP源码

随着移动互联网的快速发展和社交需求的日益增长&#xff0c;婚恋相亲交友软件小程序成为了越来越多单身人士的选择。本文将从市场前景、使用人群、盈利模式以及竞品分析等多个角度&#xff0c;综合论述这一领域的现状与发展趋势。 一、市场前景 在快节奏的现代生活中&#xf…

矩阵混乱度(熵值)代码计算

1、先回顾下熵值的数据公式&#xff1a; 2、jax.numpy代码 注意的点&#xff1a;熵值计算的输入的必须是归一化的正值 import jax.numpy as jnp import jax def _entroy(probs):log_probs jnp.log2(jnp.maximum(1.0e-30, probs))mean_sum_plogp jnp.mean(- jnp.sum(log_pro…

面试题:Redis如何防止缓存穿透 + 布隆过滤器原理

题目来源 招银网络-技术-1面 题目描述 缓存穿透是什么&#xff1f;如何防止缓存穿透布隆过滤器的原理是什么&#xff1f; 我的回答 缓存穿透是什么&#xff1f; 攻击者大量请求缓存和数据库中都不存在的key。如何防止缓存穿透 可以使用布隆过滤器布隆过滤器的原理是什么&a…

mysql数据库连接工具(mysql数据库连接工具怎么备份数据不备份表结构)

MySQLWorkbench连接,导入和导出数据库? 1、导出&#xff1a;使用MySQL Workbench连接到MySQL服务器&#xff0c;选择要导出的数据库&#xff0c;右键单击数据库并选择“导出”。选择要导出的表和数据&#xff0c;将导出文件保存为.sql文件。 2、打开MySQL Workbench&#xf…

【GlobalMapper精品教程】074:从Lidar点云创建3D地形模型

本文基于地形点云数据,基于泊松方法、贪婪三角形测量方法和阿尔法形状创建3d地形模型。 文章目录 一、加载地形点云数据二、创建三维地形模型1. 泊松方法2. 贪婪三角形测量方法3. 阿尔法形状注意事项一、加载地形点云数据 加载配套案例数据包中的data074.rar中的地形点云数据…

分类分析模型

目录 1.目的 2.内容 2.1决策树分类模型 2.2K近邻分类模型 3.代码实现 3.1分类分析模型 3.2K近邻分类模型 1.目的 掌握利用Python语言及相关库编写决策树分类分析模型的方法&#xff0c;所构建的决策树能够对给定的数据集进行分类。掌握利用Python语言及相关库编写K近邻分…