MiniDB Development Notes 2 – Network Communication: PostgreSQL Server Implementation

Original link: https://blog.lss233.com/minidb-kai-fa-shou-zha-2-wang-luo-tong-xin-postgresql-fu-wu-duan-shi-xian/

To build a protocol for network communication, we need a client and a server, we need to define the packet formats and how the two sides interact, and then we need to think about security, efficiency, and more… It's too much trouble! So, instead of designing a communication protocol from scratch, why not study the protocols of existing database systems first?

If we implement the protocol of an existing database directly, that database's clients can connect to ours as-is, so we don't even need to write a client. Hehe…

With this idea in mind, I did some research on the Internet and finally decided to use the communication protocol of the PostgreSQL database.

PostgreSQL was chosen mainly because…

  1. The community is active and well documented (whatever you don't know, you can Google)
  2. The official website has fairly detailed protocol documentation (and I can actually understand it)
  3. The copyright belongs to the community, so it won't be affected by some commercial decision (which would mean all that work was for nothing)
  4. The open source license is permissive and allows code derived from it to be commercialized (what if my project becomes famous one day?)

Another thing worth mentioning: there are not many projects on GitHub that implement the PostgreSQL protocol in Kotlin/Java, and the ones I found are all clients. So, as far as I can tell, our MiniDB is the first to do this on the server side (no suspicion of plagiarism!).

PostgreSQL’s communication protocol documentation can be found here: https://www.postgresql.org/docs/current/protocol.html

0x01 PostgreSQL Communication Protocol Overview

Turning to the chapter directory, we can see that it consists of the following parts:

  • Message Flow – defines when each type of packet is sent and what response is expected
  • Message Data Types – defines the data types that can appear in a packet
  • Message Formats – defines the structure and meaning of each packet
  • A changelog of the protocol
  • Some other advanced topics that we don't need to consider for now

To let an ordinary PostgreSQL client connect to our server, we only need to understand the handshake phase and the query phase. Other details can wait until later.

Start-up Phase: handshake

The simplest handshake process is shown in the following figure:

According to the introduction in the document, after a client with SSL support establishes a connection, it will first send an SSLRequest packet to ask the server whether to perform SSL encryption.

The server either replies with S, and then both parties enter into SSL encryption, or replies with N, and only communicates in plaintext.

Next, the client sends a StartupMessage containing some basic settings, such as the user name and the database name. This marks the real start of the handshake.
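
To make the StartupMessage layout concrete, here is a minimal sketch of parsing one. It uses java.nio.ByteBuffer instead of Netty's ByteBuf so that it runs on its own, and StartupInfo, readCString, parseStartupMessage and buildStartupMessage are names invented for this illustration, not MiniDB's actual code. The layout follows the protocol documentation: an Int32 length, an Int32 protocol version (196608 for protocol 3.0), then zero-terminated name/value pairs, closed by one extra zero byte.

```kotlin
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical holder for a decoded StartupMessage (not MiniDB's actual class).
data class StartupInfo(val protocolVersion: Int, val parameters: Map<String, String>)

// Reads one zero-terminated UTF-8 string starting at the buffer's position.
fun readCString(buf: ByteBuffer): String {
    val start = buf.position()
    while (buf.get() != 0.toByte()) { /* advance to the terminator */ }
    val bytes = ByteArray(buf.position() - start - 1)
    buf.position(start)
    buf.get(bytes)
    buf.get() // skip the terminating zero byte
    return String(bytes, StandardCharsets.UTF_8)
}

// Parses: Int32 length, Int32 protocol version, name/value pairs, final zero byte.
fun parseStartupMessage(buf: ByteBuffer): StartupInfo {
    val start = buf.position()
    val length = buf.getInt()            // total length, including these 4 bytes
    val end = start + length
    val version = buf.getInt()           // 196608 (0x00030000) means protocol 3.0
    val params = LinkedHashMap<String, String>()
    while (buf.position() < end - 1) {   // the last byte is the list terminator
        val name = readCString(buf)
        params[name] = readCString(buf)
    }
    buf.get()                            // consume the list terminator
    return StartupInfo(version, params)
}

// Builds a StartupMessage the way a client would, only to exercise the parser.
fun buildStartupMessage(params: Map<String, String>): ByteBuffer {
    val body = ByteArrayOutputStream()
    for ((name, value) in params) {
        body.write(name.toByteArray(StandardCharsets.UTF_8)); body.write(0)
        body.write(value.toByteArray(StandardCharsets.UTF_8)); body.write(0)
    }
    body.write(0)
    val bytes = body.toByteArray()
    val buf = ByteBuffer.allocate(8 + bytes.size) // ByteBuffer is big-endian by default
    buf.putInt(8 + bytes.size).putInt(196608).put(bytes)
    buf.flip()
    return buf
}
```

Feeding the output of buildStartupMessage(mapOf("user" to "alice", "database" to "minidb")) into parseStartupMessage should give back the same two parameters.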

The server then sends a packet from the Authentication family, asking the client to authenticate in a particular way. If authentication succeeds, the server sends AuthenticationOk; if it fails, the server sends ErrorResponse.

After authentication succeeds, the client waits for the server to send ReadyForQuery, which means the server is ready and the query phase can begin. Before that, the server may send some ParameterStatus packets, which carry some of the server's settings (such as encoding and time zone).

Query Phase: simple query

PostgreSQL has several query modes. Here we start with the simplest one.

First, the client sends a Query packet containing the SQL we want to execute (there may be multiple statements!).

After receiving it, the server executes the query and returns the result.

If the result is a table (for example, after a SELECT or EXPLAIN), the server first sends a RowDescription packet describing each column, then one DataRow packet per row, and then a CommandComplete packet to finish. Finally, it sends ReadyForQuery and waits for the next query.
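
The three result messages can be sketched as plain byte arrays. This is only an illustration of the layouts in the protocol documentation, assuming every column is reported as text (type OID 25) in text format; frame, writeCString, rowDescription, dataRow and commandComplete are hypothetical helper names, and MiniDB's own packet classes may look different.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.DataOutputStream
import java.nio.charset.StandardCharsets

// Wraps a body in the usual frame: type byte, Int32 length (counting itself
// but not the type byte), then the body. DataOutputStream writes big-endian.
fun frame(type: Char, body: ByteArray): ByteArray {
    val out = ByteArrayOutputStream()
    val data = DataOutputStream(out)
    data.writeByte(type.code)
    data.writeInt(body.size + 4)
    data.write(body)
    return out.toByteArray()
}

// Writes a zero-terminated UTF-8 string.
fun DataOutputStream.writeCString(s: String) {
    write(s.toByteArray(StandardCharsets.UTF_8))
    writeByte(0)
}

// RowDescription ('T'): one entry per column; all columns declared as text here.
fun rowDescription(columns: List<String>): ByteArray {
    val out = ByteArrayOutputStream()
    val data = DataOutputStream(out)
    data.writeShort(columns.size)
    for (name in columns) {
        data.writeCString(name)
        data.writeInt(0)     // table OID (0: not a column of a real table)
        data.writeShort(0)   // column attribute number
        data.writeInt(25)    // type OID 25 = text
        data.writeShort(-1)  // type size (-1: variable length)
        data.writeInt(-1)    // type modifier
        data.writeShort(0)   // format code 0 = text
    }
    return frame('T', out.toByteArray())
}

// DataRow ('D'): one row; a null value is sent as length -1 with no bytes.
fun dataRow(values: List<String?>): ByteArray {
    val out = ByteArrayOutputStream()
    val data = DataOutputStream(out)
    data.writeShort(values.size)
    for (value in values) {
        if (value == null) {
            data.writeInt(-1)
        } else {
            val bytes = value.toByteArray(StandardCharsets.UTF_8)
            data.writeInt(bytes.size)
            data.write(bytes)
        }
    }
    return frame('D', out.toByteArray())
}

// CommandComplete ('C'): the command tag, for example "SELECT 1".
fun commandComplete(tag: String): ByteArray {
    val out = ByteArrayOutputStream()
    DataOutputStream(out).writeCString(tag)
    return frame('C', out.toByteArray())
}
```

A one-row answer would then be rowDescription(listOf("id")), dataRow(listOf("1")) and commandComplete("SELECT 1"), followed by ReadyForQuery.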

Message Formats: Packet formats

Generally speaking, packets in a communication protocol carry the following information:

  • Packet Identifier – used to distinguish types between packets
  • Packet length – used to tell the other end how much memory space to prepare to receive data
  • Body – the actual content to be sent

PostgreSQL’s packet format consists of these elements.
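
As a rough sketch of how these three parts are read back from a byte stream, the function below tries to take one complete packet out of a buffer and waits if the data has not fully arrived yet. It assumes a packet with a leading identifier (the startup packets are an exception) and uses java.nio.ByteBuffer rather than Netty's ByteBuf; Frame and readFrame are names made up for this example.

```kotlin
import java.nio.ByteBuffer

// One decoded packet: the identifier and the body (without identifier and length).
class Frame(val type: Char, val body: ByteArray)

// Tries to read one complete packet. Returns null and leaves the buffer
// position untouched when more bytes are still needed.
fun readFrame(buf: ByteBuffer): Frame? {
    if (buf.remaining() < 5) return null           // identifier + length not here yet
    val start = buf.position()
    val type = (buf.get().toInt() and 0xFF).toChar()
    val length = buf.getInt()                      // includes its own 4 bytes
    require(length >= 4) { "invalid packet length $length" }
    val bodySize = length - 4
    if (buf.remaining() < bodySize) {
        buf.position(start)                        // rewind and wait for the rest
        return null
    }
    val body = ByteArray(bodySize)
    buf.get(body)
    return Frame(type, body)
}
```

For example, the six bytes 'Z', 0, 0, 0, 5, 'I' (a ReadyForQuery in the idle state) decode to a Frame with type 'Z' and a one-byte body, while the first five bytes alone yield null.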

0x02 Simple implementation under Netty

Having said that, it’s time to start writing code.

The code structure of the network part of MiniDB is as follows:

    MiniDB
    │
    ├─com
    │  └─lss233
    │      └─minidb
    │          └─networking
    │              │  MessageType.kt
    │              │  NettyServer.kt
    │              │  NettyServerInitializer.kt
    │              │  Session.kt
    │              │
    │              │  ├─query
    │              │  │      QueryHandler.kt
    │              │  │
    │              │  └─startup
    │              │          SSLRequestRejectHandler.kt
    │              │          StartupMessageHandler.kt
    │              │
    │              └─packets
    │                      AuthenticationOk.kt
    │                      AuthenticationSASL.kt
    │                      CommandComplete.kt
    │                      EmptyQueryResponse.kt
    │                      ErrorResponse.kt
    │                      NotificationResponse.kt
    │                      ParameterStatus.kt
    │                      PostgresSQLPacket.kt
    │                      Query.kt
    │                      ReadyForQuery.kt
    │                      RowDescription.kt
    │                      SSLRequest.kt
    │                      StartupMessage.kt
    │                      Terminate.kt
    │
    └─Main.kt

NettyServerInitializer

This class defines the order in which packets are processed.

    @Throws(Exception::class)
    public override fun initChannel(ch: SocketChannel) {
        val session = Session()
        val pipeline = ch.pipeline()
        pipeline.addLast(PostgreSQLDecoder(session), PostgreSQLEncoder(session))
        pipeline.addLast(SSLRequestRejectHandler(session), StartupMessageHandler(session))
        pipeline.addLast(QueryHandler(session))
        pipeline.addLast(TerminateHandler(session))
    }

In Netty, every time the server receives a new connection, a new SocketChannel is generated to represent this connection.

The packets sent and received on this connection are processed stage by stage, like an assembly line in a factory. In Netty, this assembly line is the channel's pipeline.

PostgreSQL's packets are structured, so at the front of this pipeline we add the decoder PostgreSQLDecoder and the encoder PostgreSQLEncoder, which convert between raw bytes and our packet objects.

Session

The vast majority of PostgreSQL packets begin with a 1-byte identifier (usually a meaningful character), followed by a 4-byte packet length, followed by packet-specific data. According to the documentation, for historical reasons the SSLRequest, StartupMessage and CancelRequest packets have no identifier and begin directly with the packet length.

This means the decoder cannot tell what kind of packet it is looking at from the first byte alone; it has to know which phase the connection is in. Therefore, we need a separate object to hold the state of the connection, which is created when the connection is established and discarded when the connection is closed.

    class Session {
        var state = State.Startup
        var user: String? = null
        var database: String? = null
        val properties = HashMap<String, String>()

        enum class State {
            Startup, Authenticating, Query, Terminated
        }
    }

Here, I define four states: Startup (handshake), Authenticating (authentication), Query (querying) and Terminated (connection closed).

After the client establishes a connection with MiniDB, it first enters the handshake state. The data sent at this time does not have a packet identifier at the beginning.

After the client sends the StartupMessage, it enters the authentication state. The packets that are sent next all have an identifier at the beginning.

After the authentication is successful, the query state is entered, and then the client is allowed to send query commands.

When the connection is disconnected, it enters the termination state and releases resources.

In the subsequent process, more states may be added.
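
The transitions described above can be written down as a small function. This is a sketch only: the State enum repeats the one inside Session so that the example stands alone, while Event, next and expectsIdentifier are names invented here and do not exist in MiniDB.

```kotlin
// Same four states as Session.State, repeated so the sketch is self-contained.
enum class State { Startup, Authenticating, Query, Terminated }

// Hypothetical events that move a connection from one state to the next.
enum class Event { StartupReceived, AuthSucceeded, Disconnected }

// Returns the state that follows `state` when `event` happens.
fun next(state: State, event: Event): State = when {
    event == Event.Disconnected -> State.Terminated
    state == State.Startup && event == Event.StartupReceived -> State.Authenticating
    state == State.Authenticating && event == Event.AuthSucceeded -> State.Query
    else -> throw IllegalStateException("unexpected $event in state $state")
}

// Only packets received in the Startup state lack the leading identifier byte.
fun expectsIdentifier(state: State): Boolean = state != State.Startup
```

Writing the transitions in one place makes it easier to notice an illegal one, for example a StartupMessage arriving in the Query state.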

PostgreSQLDecoder

In the decode method, we first judge the state, and then select the packet type according to the state.

    val mType = if (session.state == Session.State.Startup) {
        // Peek at the first two Int32 fields, then rewind so that the packet
        // object can parse the whole message itself.
        val position = `in`.readerIndex()
        val length = `in`.readInt()
        val magicNumber = `in`.readInt()
        `in`.readerIndex(position)
        if (length == 8 && magicNumber == 80877103) { // the value fixed by the protocol
            MessageType.SSLRequest
        } else {
            MessageType.StartupMessage
        }
    } else {
        MessageType.getType(`in`.readByte())
    }

Based on the packet type, we construct the corresponding packet object and hand the remaining data to it for parsing.

    fun parse(type: MessageType?, payload: ByteBuf): IncomingPacket? {
        return when (type) {
            MessageType.SSLRequest -> SSLRequest().parse(payload)
            MessageType.StartupMessage -> StartupMessage().parse(payload)
            MessageType.Query -> Query().parse(payload)
            MessageType.Terminate -> Terminate().parse(payload)
            else -> null
        }
    }

Packets

According to the data transmission direction, MiniDB divides the data into two types: IncomingPacket and OutgoingPacket, which respectively represent the data packets that the server will receive and the data packets that the server will send.

    interface PostgreSQLPacket {
    }

    interface OutgoingPacket : PostgreSQLPacket {
        fun write(buf: ByteBuf): OutgoingPacket
    }

    interface IncomingPacket : PostgreSQLPacket {
        fun parse(buf: ByteBuf): IncomingPacket
    }

The IncomingPacket#parse method will read data from the data stream and copy it to its own members;

OutgoingPacket#write will write data to the data stream according to the value of its own member variable.

The return value of these two methods is the object itself, which is convenient for chaining calls (completely personal preference).

A concrete class looks like this:

    class ParameterStatus(private val key: String, private val value: String) : OutgoingPacket {
        override fun write(buf: ByteBuf): OutgoingPacket {
            buf.writeCharSequence(key, StandardCharsets.UTF_8)
            buf.writeByte(0)
            buf.writeCharSequence(value, StandardCharsets.UTF_8)
            buf.writeByte(0)
            return this
        }
    }
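
For comparison, an incoming packet could look roughly like the sketch below. The real Query class is not shown in this article, so QuerySketch is a hypothetical stand-in; it also reads from java.nio.ByteBuffer instead of Netty's ByteBuf, and it assumes the buffer holds exactly the message body, which for a Query packet is the SQL text followed by one zero byte.

```kotlin
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for MiniDB's Query packet (the real class is not shown).
class QuerySketch {
    var queryString: String = ""
        private set

    // Copies the SQL text out of the body and drops the zero terminator.
    fun parse(body: ByteBuffer): QuerySketch {
        val bytes = ByteArray(body.remaining())
        body.get(bytes)
        val terminator = bytes.indexOf(0.toByte())
        val end = if (terminator < 0) bytes.size else terminator
        queryString = String(bytes, 0, end, StandardCharsets.UTF_8)
        return this // returning the object itself allows chained calls
    }
}
```

With chaining, reading a statement becomes a one-liner such as QuerySketch().parse(body).queryString.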

PostgreSQLEncoder

When encoding, we first write the packet body into a temporary buffer (just to find out how long it is).

Then we write the identifier, the length and the body to the output in the order described in the official documentation.

    override fun encode(ctx: ChannelHandlerContext?, msg: OutgoingPacket?, out: ByteBuf?) {
        // The MessageType entry is looked up by the packet's class name
        val mType = msg?.javaClass?.simpleName?.let { MessageType.valueOf(it) }
        // Write the body into a temporary buffer first so that its length is known
        val buf = Unpooled.buffer()
        msg?.write(buf)
        val type = mType?.type?.toInt() ?: '?'.code // '?' stands for an unknown identifier
        val len = buf.writerIndex() + 4 // the length field counts its own 4 bytes
        println("<- ${msg?.javaClass?.simpleName}(${type.toChar()}) len $len")
        out?.writeByte(type)
        out?.writeInt(len)
        out?.writeBytes(buf, buf.writerIndex())
    }

Handlers

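With packets and codecs covered, only the packet handlers remain.

Netty is smart enough to deliver each packet only to the handlers that care about it: a SimpleChannelInboundHandler<T> only handles messages of type T and passes everything else on to the next handler in the pipeline.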

As long as we respond correctly according to the type of the packet, the client can successfully connect to us.

To give a few examples.

At the beginning of the handshake, the client will ask us if we want SSL encryption.

This kind of thing is still too early for us, so we firmly answer N (no)!

    class SSLRequestRejectHandler(private val session: Session) : SimpleChannelInboundHandler<SSLRequest>() {
        override fun channelRead0(ctx: ChannelHandlerContext?, msg: SSLRequest?) {
            // Our little MiniDB can't handle something like SSL yet
            ctx?.writeAndFlush(Unpooled.copiedBuffer("N", StandardCharsets.UTF_8))?.sync()
        }
    }

Next, the client will send us a StartupMessage packet, which will contain the username and database name.

In theory, we should now make the client prove that it is allowed to access the database: switch to the authentication phase, send it an authentication method, and wait for its reply.

However, that process is fairly complicated, and I don't want tedious authentication to spoil the fun of tricking the client. So I decided to tell it perfunctorily: okay, you've logged in successfully. Then we go straight to the query phase.

    class StartupMessageHandler(private val session: Session) : SimpleChannelInboundHandler<StartupMessage>() {
        override fun channelRead0(ctx: ChannelHandlerContext?, msg: StartupMessage?) {
            // session.state = Session.State.Authenticating
            // ctx?.writeAndFlush(AuthenticationSASL(listOf("SCRAM-SHA-256")))?.sync()

            // Perfunctorily tell the client: you have logged in successfully
            ctx?.writeAndFlush(AuthenticationOk())?.sync()
            ctx?.writeAndFlush(ParameterStatus("client_encoding", "UTF8"))?.sync()
            // PostgreSQL's parameter is named DateStyle
            ctx?.writeAndFlush(ParameterStatus("DateStyle", "ISO, YMD"))?.sync()
            ctx?.writeAndFlush(ParameterStatus("TimeZone", "Asia/Shanghai"))?.sync()
            ctx?.writeAndFlush(ParameterStatus("server_encoding", "UTF8"))?.sync()
            ctx?.writeAndFlush(ParameterStatus("server_version", "14.5"))?.sync()
            session.state = Session.State.Query
            ctx?.writeAndFlush(ReadyForQuery())?.sync()
        }
    }

QueryHandler

After entering the query stage, our MiniDB will process the SQL statement sent by the client.

The SQL parsing part was done by my teammate. All I have to do is feed the SQL statement to the parser he wrote and read the parse result.

According to the official documentation, the server must respond after receiving a SQL statement from the client; otherwise the client will just keep waiting.

But since we haven't written any of the engine yet, for now we can only pretend and reply with a few canned responses.

    class QueryHandler(private val session: Session) : SimpleChannelInboundHandler<Query>() {
        private val REGEX_STMT_SET = Regex("set (.+) to (.+)")

        override fun channelRead0(ctx: ChannelHandlerContext?, msg: Query?) {
            try {
                var queryString = msg?.queryString
                // First convert the statement to MySQL style
                if (REGEX_STMT_SET.matches(queryString!!)) {
                    queryString = queryString.replace(REGEX_STMT_SET, "SET $1=$2")
                }
                // Hand it to the SQL parser
                val ast = SQLParserDelegate.parse(queryString)
                println(" Q(${ast.javaClass.simpleName}): $queryString")
                // Inspect the parsed SQL statement and react accordingly
                when (ast) {
                    is DMLSelectStatement -> {
                        // This is a query statement
                        ctx?.writeAndFlush(RowDescription())?.sync()
                        // Finding 0 rows still counts as a query
                        ctx?.writeAndFlush(CommandComplete("SELECT 0"))?.sync()
                    }
                    is DALSetStatement -> {
                        // This is a SET statement
                        for (pair in ast.assignmentList) {
                            // Update the setting
                            session.properties[(pair.key as SysVarPrimary).varText] = pair.value.evaluation(emptyMap()).toString()
                            // Tell the client that the setting was applied
                            ctx?.writeAndFlush(CommandComplete("SET"))?.sync()
                            ctx?.writeAndFlush(ParameterStatus(
                                (pair.key as SysVarPrimary).varText,
                                session.properties[(pair.key as SysVarPrimary).varText]!!
                            ))?.sync()
                        }
                    }
                }
            } catch (e: SQLSyntaxErrorException) {
                System.err.println(" Q(Error): ${msg?.queryString}")
                e.printStackTrace()
                // Tell the client that something is wrong with what it sent
                val err = ErrorResponse()
                err.message = e.message!!
                ctx?.writeAndFlush(err)?.sync()
            }
            // Wait for the next statement
            ctx?.writeAndFlush(ReadyForQuery())?.sync()
        }
    }
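
The ErrorResponse class used above is not shown in this article. As a hedged sketch of what its body could contain, the function below encodes the three fields that the protocol documentation lists as always present: severity ('S'), SQLSTATE code ('C') and message ('M'). errorResponseBody is a name invented for this example; the result still has to be framed with the 'E' identifier and the length, as the encoder does for every outgoing packet.

```kotlin
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

// Encodes an ErrorResponse body: (field code, zero-terminated string) pairs,
// followed by one final zero byte that ends the field list.
fun errorResponseBody(severity: String, sqlState: String, message: String): ByteArray {
    val out = ByteArrayOutputStream()
    fun field(code: Char, value: String) {
        out.write(code.code)
        out.write(value.toByteArray(StandardCharsets.UTF_8))
        out.write(0)
    }
    field('S', severity)  // for example "ERROR"
    field('C', sqlState)  // for example "42601", the SQLSTATE for a syntax error
    field('M', message)   // human-readable message
    out.write(0)          // terminator of the field list
    return out.toByteArray()
}
```

For a parse failure, errorResponseBody("ERROR", "42601", e.message ?: "syntax error") would be a reasonable body to send before ReadyForQuery.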

0x03 Testing

Connecting Navicat to our MiniDB succeeds without any error!

And on the server side, we can see MiniDB and Navicat chatting happily!

Although Navicat does not show any database, according to their chat records, Navicat actually reads the database list through a SELECT statement.

This suggests that, when implementing the database engine, we should store database information in a table, so that no extra work is needed to answer such queries.

0x04 Trivia

While actually writing the code, I found that real clients and servers send some messages that the documentation does not mention (or that I overlooked), so clients could not connect properly to the server I wrote.

In this case, we need to set up a real server and use a packet capture tool to find the packets that differ.

Here, I am using sokit’s relay mode. The picture above shows the data of both parties when Navicat connects to the PostgreSQL database, and the picture below shows the data of both parties when Navicat connects to MiniDB.
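
When two captures are long, a few lines of code can help to spot where they diverge. The helpers below are only a convenience sketch and have nothing to do with sokit itself; hex and firstDifference are names made up here, and the byte arrays are assumed to have been copied out of the capture tool by hand.

```kotlin
// Formats bytes as space-separated hex, two digits per byte.
fun hex(bytes: ByteArray): String =
    bytes.joinToString(" ") { "%02x".format(it.toInt() and 0xFF) }

// Returns the index of the first differing byte, or -1 if both arrays are equal.
// If one array is a prefix of the other, the shorter length is returned.
fun firstDifference(a: ByteArray, b: ByteArray): Int {
    val common = minOf(a.size, b.size)
    for (i in 0 until common) {
        if (a[i] != b[i]) return i
    }
    return if (a.size == b.size) -1 else common
}
```

Comparing what the real PostgreSQL server sent with what MiniDB sent, packet by packet, quickly narrows down which field is wrong.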

So far, the network communication part of MiniDB has come to an end. The next step is to enter the development of the database engine, and then implement the real database.
