如何用Rust快速实现MySQL代理服务(组图)

一、前言

背景

在Database Mesh中,Pisanix是一个以数据库为中心的治理框架,为用户提供了很多治理能力,比如数据库流量管理、SQL防火墙、负载均衡和审计。在 Pisanix 中,Pisa-Proxy 是整个 Database Mesh 实现中数据平面的核心组件。 Pisa-Proxy 服务本身需要具备 MySQL 协议意识,理解 SQL 语句,并且能够对后端代理的数据库、SQL 并发控制和断路功能做一些特定的策略。在这些特性中,能够理解 MySQL 协议尤为重要。本文将主要介绍 MySQL 协议以及 MySQL 协议在 Pisa-Proxy 中的 Rust 实现。

为什么要生锈

为什么选择 Rust 语言?我们的考虑有以下必要条件。

二、整体架构,模块化设计

整体架构

如图1所示,代理服务包括服务器和客户端之间的协议解析、SQL解析、访问控制、连接池等模块。

工作流程

在图1中,我们可以看到整个Proxy服务可以概括为以下几个阶段。

首先,Pisa-Proxy 支持 MySQL 协议,并伪装成数据库服务器。应用连接配置只需要修改访问地址即可建立连接。 Pisa-Proxy 读取应用程序发送的握手请求和数据包;

得到应用程序发送的SQL语句后,解析该SQL,得到该SQL的AST;

得到对应的AST后,基于AST实现高级访问控制和SQL防火墙能力;

访问控制和防火墙通过后,提交SQL执行。 SQL执行阶段的指标会被收集为Prometheus Metrics,最后根据负载均衡策略获取执行语句的后端数据库连接;

如果连接池为空,则根据配置建立与后端数据库的连接,并从该连接向后端数据库发送SQL;

最后读取SQL执行结果,组装返回给客户端。

三、如何用 Rust 快速实现 MySQL 代理服务

如图2所示,整个代理服务一般分为服务端和客户端两部分,即代理服务充当服务器,处理来自客户端的请求。而服务器,则需要在服务器上发起身份验证,并将客户端命令发送到 MySQL 数据库。在这两部分中,我们需要创建两组 Socket 来完成对网络数据包的处理。

技术选择

简介

对于网络报告处理和运行时处理,我们使用 Rust 实现的 Tokio() 框架。 Tokio 框架是用 Rust 编写的可靠、异步和精益应用程序的运行时。 Tokio 是一个事件驱动的非阻塞 I/O 平台,用于使用 Rust 编程语言编写异步应用程序。概括地说,它提供了几个主要组件:

同时,Tokio还提供了丰富的工具链,比如编解码工具包、成帧包等,可以让我们在MySQL中处理各种数据包更加方便快捷。

项目地址:

优势

一个。快速:Tokio 旨在使应用程序尽可能快。

b.零成本抽象:Tokio 基于 Future。虽然 Future 不是 Rust 独有的,但与其他语言的 Futures 不同,Tokio Futures 被编译成状态机,使用 Futures 实现通用同步,无需额外的开销成本。 Tokio 的非阻塞 IO 可以充分利用系统的优势,比如像 Linux Epoll 一样实现多路复用技术,在单线程上多路复用允许套接字和批处理接收操作系统消息,从而减少系统调用,所有这些都可以减少系统的开销。应用。

c。可靠:Rust 的所有权模型和类型系统使系统级应用程序无需担心内存不安全。

d。轻量级:没有垃圾收集器,而且由于 Tokio 是基于 Rust 构建的,编译后的可执行文件包含最少的语言运行时。这意味着,没有垃圾收集器,没有虚拟机,没有 JIT 编译,也没有堆栈操作。这样在编写多线程并发系统时可以有效避免阻塞。

e。模块化:每个组件都在一个单独的库中。如果需要,应用程序可以挑选它需要的组件,避免依赖于它不需要的组件。

代码实现

Rust 中的数据包处理

#[derive(Debug)]
pub struct Packet {
    pub sequence: u8,
    pub conn: BufStream,
    pub header: [u8; 4],
}

以上是Proxy数据包处理逻辑的核心结构。该结构包含三个字段:

以下功能主要定义在报文处理逻辑中,整个代理服务中的网络数据交换通过以下方法完成。

Pisa-Proxy 作为服务器

pub struct Connection {
    salt: Vec,
    status: u16,
    collation: CollationId,
    capability: u32,
    connection_id: u32,
    _charset: String,
    user: String,
    password: String,
    auth_data: BytesMut,
    pub auth_plugin_name: String,
    pub db: String,
    pub affected_rows: i64,
    pub pkt: Packet,
}

上述结构描述了当 Pisa-Proxy 作为服务器处理来自客户端的请求时包含的字段。例如,它包含与 MySQL 客户端的认证信息,以及包含数据包处理逻辑的 Packet。

Pisa-Proxy 作为客户端

#[derive(Debug, Default)]
pub struct ClientConn {
    pub framed: Option<Box>,
    pub auth_info: Option,
    user: String,
    password: String,
    endpoint: String,
}

Tokio 提供的编解码器

在 Pisa-Proxy 中,大量使用了 Tokio 工具包中的编解码器。使用编解码器 Rust 会自动帮助开发人员将原始字节转换为 Rust 数据类型,方便开发人员处理数据。使用codec,只需要对代码中定义的类型实现Decoder和Encoder Traits,就可以通过stream和sink读写数据。让我们通过一个简单的例子来看看使用 Tokio 编解码器的步骤。

使用 Tokio 编解码器分为三个步骤:

首先,我们需要自定义一个错误类型。这里我们定义了一个ProtocolError,并为它实现了一个from方法,这样它就可以接收错误处理了。

pub enum ProtocolError {
    Io(io::Error),
}
impl From for ProtocolError {
    fn from(err: io::Error) -> Self {
        ProtocolError::Io(err)
    }
}

定义一个数据类型,这里我们声明一个Message为String,然后定义一个结构体,也就是我们要解析的是原来的字节流实际转换成的结构体,也就是framed(概念Tokio 中的框架)),这里定义了一个空结构 PisaProxy;

type Message = String;
struct PisaProxy;

下一步是分别为 PisaProxy 实现 Encoder 和 Decoder Traits。在这里的示例中,实现的功能是将数据转换为字节数组并附加到 buf。在编码器中,我们首先需要指定Item类型为Message和错误类型。这里的编码处理逻辑将字符串拼接起来返回给客户端。

这里的Encoder编码是指将用户定义的类型转换为BytesMut类型并写入TcpStream,Decoder解码是指将读取的字节数据序列化成Rust结构。

impl Encoder for PisaProxy {
    type Error = ProtocolError;
    fn encode(&mut self, item: Message, dst: &mut BytesMut) ->Result {
        dst.extend(item.as_bytes());
        Ok(())
    }
}
impl Decoder for PisaProxy {
    type Item = Message;
    type Error = ProtocolError;
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }
        let data = src.split();
        let mut buf = BytesMut::from(&b"hello:"[..]);
        buf.extend_from_slice(&data[..]);
        let data = String::from_utf8_lossy(&buf[..]).to_string();
        Ok(Some(data))
    }
}

当PisaProxy结构实例化后,就可以调用framed方法了。 codec的framed方法(codec.framed(socket))将TcpStream转换为Framed。这个Framed在tokio中实现了Stream和Sink这两个trait,实现的两个Trait实例具有接收(通过Stream)和发送(通过Sink)数据的功能,这样我们就可以调用send方法发送数据了。

#[tokio::main]
async fn main() -> Result<(), Box> {
    let addr = "127.0.0.1:9088";
    let listener = TcpListener::bind(addr).await?;
    println!("listen on: {:?}", addr);
    loop {
        let (socket, addr) = listener.accept().await?;
        println!("accepted connect from: {}", addr);
        tokio::spawn(async move {
            let codec = PisaProxy {};
            let mut conn = codec.framed(socket);
            loop {
                match conn.next().await {
                    Some(Ok(None)) => println("waiting for data..."),
                    Some(Ok(data)) => {
                        println!("data {:?}", data);
                        conn.send(data).await;
                    },
                    Some(Err(e)) => {
                    },
                    None => {},
                }
            }
        });
    }
}

四、Pisa-Proxy中MySQL协议的实现

MySQL 协议简介

MySQL数据库本身是一个典型的C/S结构服务,客户端和服务器通信可以通过Tcp和Unix Socket进行交互。本文主要讲解通过网络Tcp实现MySQL代理。

MySQL客户端和服务端的交互主要包括两个重要的过程:1.握手认证,2.发送命令。本文将主要围绕这两个流程来介绍相关的实现。客户端与服务器的交互主要包括以下几种报文:数据报文、数据结束报文、成功报告报文和错误消息报文。这些类型的数据包将在后面的章节中详细介绍。 .

交互过程

MySQL客户端与服务端的交互主要包括两个过程,即握手认证阶段和命令执行阶段。当然,在这两个过程之前,必须先体验一下TCP的三次握手过程。三次握手完成后,首先进入握手认证阶段。信息交换完毕,客户端正确登录服务器后,进入命令执行阶段。图 3 完整描述了整个交互过程。

代码链接:

握手认证

握手认证阶段是MySQL客户端与服务器连接的一个非常重要的阶段,发生在TCP三次握手之后。首先,服务器会向客户端发送服务器信息,包括协议版本号、服务版本信息、挑战随机数、能力标志等。客户端收到服务器的响应后,发起认证请求。认证请求将在服务器响应中携带客户端用户名、数据库名和由挑战随机数加密的客户端密码,然后发送给服务器进行验证。在验证过程中,除了验证用户名和密码外,还会匹配客户端使用的认证插件。如果不匹配,插件会自动切换,判断客户端是否使用加密链接。当以上所有阶段都正常完成后,客户端登录成功,服务器返回客户端OK数据报文。

上述过程分别在runtime和protocol的server中实现。运行时中的启动函数等待请求进入。 TCP三次握手完成后,握手功能从图3所示的握手功能开始,开始握手阶段。握手有三个过程。首先write_initial_handshake将服务器信息发送给客户端,然后read_handshake_response客户端用服务器信息启动认证请求。

pub async fn handshake(&mut self) -> Result {
        match self.write_initial_handshake().await {
            Err(err) => return Err(err::ProtocolError::Io(err)),
            Ok(_) => debug!("it is ok"),
        }
        match self.read_handshake_response().await {
            Err(err) => {
                return Err(err);
            }
            Ok(_) => {
                self.pkt.write_ok().await?;
                debug!("handshake response ok")
            }
        }
        self.pkt.sequence = 0;
        Ok(())
    }

执行命令

当握手和认证阶段完成后,客户端只能与服务器建立真正意义上的连接。然后此时进入执行命令阶段。在MySQL中,可以发送命令的指令有很多种,下面我们为大家介绍一下。

代码链接:

如下代码所示,Pisa-Proxy 会在这里对不同类型的指令进行不同的逻辑处理。比如初始化db的处理逻辑是handle_init_db,查询的处理逻辑是handle_query。

match cmd {
          COM_INIT_DB => self.handle_init_db(&payload, true).await,
          COM_QUERY => self.handle_query(&payload).await,
          COM_FIELD_LIST => self.handle_field_list(&payload).await,
          COM_QUIT => self.handle_quit().await,
          COM_PING => self.handle_ok().await,
          COM_STMT_PREPARE => self.handle_prepare(&payload).await,
          COM_STMT_EXECUTE => self.handle_execute(&payload).await,
          COM_STMT_CLOSE => self.handle_stmt_close(&payload).await,
          COM_STMT_RESET => self.handle_ok().await,
          _ => self.handle_err(format!("command {} not support", cmd)).await,
}

MySQL 协议基本数据类型

MySQL协议中有以下数据类型:

参考链接:

消息结构

MySQL客户端和服务器之间交换的数据最大长度为16MByte,基本数据包结构类型如下:

图 4

图 5

MySQL 消息的基本结构如图 4 和图 5 所示。消息由两部分组成,消息头和消息体。在消息头中,3个字节代表数据消息的长度,1个字节存放序号,消息体存放实际消息数据。

消息头

用于标记当前请求报文的实际数据长度值,以字节为单位,占用3个字节,最大值为0xFFFFFF,即2^24-1。

序列号

序列 ID 从 0 开始,随着每个数据包递增,并在输入新命令时重置为 0。序列号 ID 可能会环绕。发生回卷时,需要将序号ID重置为0,重新开始计数和递增。

消息数据

消息体用于存储请求的内容和响应的数据。长度由消息头中的长度值决定。

客户端请求命令消息

该指令用于标识客户端要执行的命令的类型,以字节为单位占用1个字节。常用的请求命令消息有Text协议和Binary二进制协议。这里可以参考【执行命令】中的代码,描述了不同指令的处理逻辑。

文本协议中常用的命令如下:

值指令函数

0x01

COM_QUIT

关闭连接

0x02

COM_INIT_DB

切换数据库

0x03

COM_QUERY

SQL查询请求

0x04

COM_FIELD_LIST

获取数据表字段信息

0x05

COM_CREATE_DB

创建数据库

0x06

COM_DROP_DB

删除数据库

0x08

COM_SHUTDOWN

停止服务器

0x0A

COM_PROCESS_INFO

获取当前连接列表

0x0B

COM_CONNECT

(内线程状态)

0x0E

COM_PING

测试连通性

更多信息请参考:

以下是二进制协议中常用的命令,即Prepare Statement:

值指令函数

0x16

COM_STMT_PREPARE

预处理的 SQL 语句

0x17

COM_STMT_EXECUTE

执行准备好的语句

0x18

CCOM_STMT_SEND_LONG_DATA

发送BLOB类型数据

0x19

COM_STMT_CLOSE

销毁准备好的语句

0x1A

COM_STMT_RESET

清除prepared statement参数缓存

更多信息请参考:

响应消息

……

例如,下面的代码展示了如何向客户端写入 OK 和 EOF 消息。

#[inline]
pub async fn write_ok(&mut self) -> Result {
  let mut data = [7, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0];
  self.set_seq_id(&mut data);
  self.write_buf(&data).await
}
  
#[inline]
pub async fn write_eof(&mut self) -> Result {
  let mut eof = [5, 0, 0, 0, 0xfe, 0, 0, 2, 0];
  self.set_seq_id(&mut eof);
  self.write_buf(&eof).await
}

com_query 请求流程

com_query的请求流程如图6所示,com_query指令可能返回如下结果集:

在()文件中,主要处理ResultSet结果集的处理。可以看到这里定义了ResultSet结构,同时也为ResultSetCodec实现了Encoder和Decoder Traits,这样就可以通过codec来处理ResultSet了。的消息。

#[derive(Debug, Default)]
pub struct ResultsetCodec {
  pub next_state: DecodeResultsetState,
  pub col: u64,
  pub is_binary: bool,
  pub seq: u8,
  pub auth_info: Option,
}

图 6

五、总结

以上就是本文的全部内容。在本文中,我们介绍了使用 Rust 实现 MySQL 代理的动机,介绍了 MySQL 协议中的一些常用概念以及 MySQL 中的数据包如何在网络中交换数据;最后介绍了如何使用 Rust 快速实现一个 MySQL 代理服务。更多实现细节,请关注 Pisanix 代码库。

欢迎点击链接查看相关教学视频:

六、相关链接

皮萨尼克斯

项目地址:

官网地址:

数据库网格:

Mini-Proxy:最小的 MySQL Rust 代理实现

项目地址:

社区

目前,Pisanix 社区每两周组织一次在线讨论。详细情况如下。欢迎所有朋友参与。

A 列,B 列

邮件列表

两周一次的英语社区会议(自 2022 年 2 月 27 日起),太平洋标准时间周三上午 9:00

华人社区双周会(2022 年 4 月 27 日起),格林威治标准时间+8 周三晚上 9:00

微信助手

比萨尼克斯

松弛

会议纪要

关于作者

SphereEx MeshLab研发工程师王博,目前专注于Database Mesh、Cloud Native研发。 Linux、llvm、yacc、ebpf 用户、Gopher & Rustacean 和 c 漏洞猎人。

GitHub:

© 版权声明
THE END
喜欢就支持一下吧
点赞271 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片