Original link: https://jasonkayzk.github.io/2022/12/07/mini-redis%E9%A1%B9%E7%9B%AE-5-%E5%AE%A2%E6%88%B7% E7%AB%AF/
In the previous article “mini-redis project-4-server” , we implemented the server of mini-redis, and this article implements the client;
source code:
Series of articles:
- “mini-redis project-1-introduction”
- “mini-redis project-2-storage layer”
- “mini-redis project-3-connection layer”
- “mini-redis project-4-server”
- “mini-redis project-5-client”
- “mini-redis project-6-test and example”
mini-redis project-5-client
client entrance
Same as the server, the entry of the client is also an executable file in the bin directory, the specific implementation is as follows:
First look at the definition of the client command line:
src/bin/cli.rs
use mini_redis::client::cmd::Command;#[derive(Parser, Debug)]#[clap( name = "mini-redis-cli", version, author, about = "Issue Redis commands")]struct Cli { #[clap(subcommand)] command: Command, #[clap(name = "hostname", long, default_value = "127.0.0.1")] host: String, #[clap(long, default_value_t = DEFAULT_PORT)] port: u16,}
We define the client’s command line parameters Cli, including:
- command: subcommand, marked with
#[clap(subcommand)]
at the same time; note: the Command here is the client command line Command, not the command in mini-redis we described in the previous section! - host: server address, the default is
127.0.0.1
; - port: server port address, the default is
DEFAULT_PORT=6379
;
The main function is defined as follows:
/// Entry point for CLI tool.////// `flavor = "current_thread"` is used here to avoid spawning background/// threads. The CLI tool use case benefits more by being lighter instead of/// multi-threaded.#[tokio::main(flavor = "current_thread")]async fn main() -> Result<(), MiniRedisClientError> { dotenv().ok(); logger::init(); // Parse command line arguments let cli = Cli::parse(); debug!("get cli: {:?}", cli); // Get the remote address to connect to let addr = format!("{}:{}", cli.host, cli.port); // Establish a connection let mut client = client::connect(&addr).await?; // Process the requested command match cli.command { Command::Ping { msg } => { let value = client.ping(msg).await?; if let Ok(string) = std::str::from_utf8(&value) { println!("\"{}\"", string); } else { println!("{:?}", value); } } Command::Get { key } => { if let Some(value) = client.get(&key).await? { if let Ok(string) = std::str::from_utf8(&value) { println!("\"{}\"", string); } else { println!("{:?}", value); } } else { println!("(nil)"); } } Command::Set { key, value, expires: None, } => { client.set(&key, value).await?; println!("OK"); } Command::Set { key, value, expires: Some(expires), } => { client.set_expires(&key, value, expires).await?; println!("OK"); } Command::Publish { channel, message } => { client.publish(&channel, message).await?; println!("Publish OK"); } Command::Subscribe { channels } => { if channels.is_empty() { return Err(MiniRedisConnectionError::InvalidArgument( "channel(s) must be provided".into(), ) .into()); } let mut subscriber = client.subscribe(channels).await?; // await messages on channels while let Some(msg) = subscriber.next_message().await? { println!( "got message from the channel: {}; message = {:?}", msg.channel, msg.content ); } } } Ok(())}
Similar to the server side, also:
- First initialize env, logger, cli;
- Then establish a TCP connection with the server through
client::connect(&addr).await
; - Finally, execute the corresponding command specified by the command line through match;
The above Command and Client are defined in the client module, let’s look at it in detail below;
Client module overview
The directory structure of the client module is as follows:
$ tree ./src/client ./src/client├── cli.rs├── cmd.rs├── mod.rs└── subscriber.rs
Contents of each file:
- mod: The connect function is exposed to the outside world to obtain a TCP connection and create a Client;
- cmd: Define the Command command of the client command line tool;
- cli: define Client implementation;
- subscriber: client channel subscriber;
Let’s look at it separately;
Command line subcommands
In cmd.rs
, the subcommand command field marked with the #[clap(subcommand)]
macro above is defined;
The following is the specific implementation:
src/client/cmd.rs
#[derive(Subcommand, Debug)]pub enum Command { Ping { /// Message to ping msg: Option<String>, }, /// Get the value of key. Get { /// Name of key to get key: String, }, /// Set key to hold the string value. Set { /// Name of key to set key: String, /// Value to set. #[clap(parse(from_str = bytes_from_str))] value: Bytes, /// Expire the value after specified amount of time #[clap(parse(try_from_str = duration_from_ms_str))] expires: Option<Duration>, }, /// Publisher to send a message to a specific channel. Publish { /// Name of channel channel: String, #[clap(parse(from_str = bytes_from_str))] /// Message to publish message: Bytes, }, /// Subscribe a client to a specific channel or channels. Subscribe { /// Specific channel or channels channels: Vec<String>, },}fn duration_from_ms_str(src: &str) -> Result<Duration, ParseIntError> { let ms = src.parse::<u64>()?; Ok(Duration::from_millis(ms))}fn bytes_from_str(src: &str) -> Bytes { Bytes::from(src.to_string())}
It can be seen that each subcommand basically corresponds to the commands in mini-redis one by one;
At the same time, several commands specify type conversion through from_str
, and complete the parsing of the corresponding format;
Here you can learn the usage of the clap command line library;
create client
The function connect to create a client is defined in mod.rs
:
src/client/mod.rs
/// Establish a connection with the Redis server located at `addr`.////// `addr` may be any type that can be asynchronously converted to a/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`/// trait is the Tokio version and not the `std` version.pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client, MiniRedisConnectionError> { // The `addr` argument is passed directly to `TcpStream::connect`. This // performs any asynchronous DNS lookup and attempts to establish the TCP // connection. An error at either step returns an error, which is then // bubbled up to the caller of `mini_redis` connect. let socket = TcpStream::connect(addr).await?; // Initialize the connection state. This allocates read/write buffers to // perform redis protocol frame parsing. let connection = Connection::new(socket); Ok(Client { connection })}
The function creates the Connection (send a whole Frame) defined in the third article by obtaining the socket, and returns;
The logic is very simple, so I won’t repeat it;
The client implements Client
First look at the definition of Client:
src/client/cli.rs
pub struct Client { pub(crate) connection: Connection,}
Very simple, it wraps the Connection we defined earlier;
Let’s look at the method in Client:
impl Client { pub async fn ping(&mut self, msg: Option<String>) -> Result<Bytes, MiniRedisConnectionError> { let frame = Ping::new(msg).into_frame()?; self.connection.write_frame(&frame).await?; match self.read_response().await? { Frame::Simple(value) => Ok(value.into()), Frame::Bulk(value) => Ok(value), frame => Err(MiniRedisConnectionError::CommandExecute(frame.to_string())), } } pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>, MiniRedisConnectionError> { let frame = Get::new(key).into_frame()?; self.connection.write_frame(&frame).await?; match self.read_response().await? { Frame::Simple(value) => Ok(Some(value.into())), Frame::Bulk(value) => Ok(Some(value)), Frame::Null => Ok(None), frame => Err(MiniRedisConnectionError::CommandExecute(frame.to_string())), } } pub async fn set(&mut self, key: &str, value: Bytes) -> Result<(), MiniRedisConnectionError> { self.set_cmd(Set::new(key, value, None)).await } pub async fn set_expires( &mut self, key: &str, value: Bytes, expiration: Duration, ) -> Result<(), MiniRedisConnectionError> { self.set_cmd(Set::new(key, value, Some(expiration))).await } async fn set_cmd(&mut self, cmd: Set) -> Result<(), MiniRedisConnectionError> { let frame = cmd.into_frame()?; self.connection.write_frame(&frame).await?; match self.read_response().await? { Frame::Simple(response) if response == "OK" => Ok(()), frame => Err(MiniRedisConnectionError::CommandExecute(frame.to_string())), } } pub async fn publish( &mut self, channel: &str, message: Bytes, ) -> Result<u64, MiniRedisConnectionError> { let frame = Publish::new(channel, message).into_frame()?; self.connection.write_frame(&frame).await?; match self.read_response().await? { Frame::Integer(response) => Ok(response), frame => Err(MiniRedisConnectionError::CommandExecute(frame.to_string())), } } pub async fn subscribe( mut self, channels: Vec<String>, ) -> Result<Subscriber, MiniRedisConnectionError> { self.subscribe_cmd(&channels).await?; Ok(Subscriber { client: self, subscribed_channels: channels, }) } pub(crate) async fn subscribe_cmd( &mut self, channels: &[String], ) -> Result<(), MiniRedisConnectionError> { let frame = Subscribe::new(channels).into_frame()?; self.connection.write_frame(&frame).await?; for channel in channels { let response = self.read_response().await?; // Verify it is confirmation of subscription. match response { Frame::Array(ref frame) => match frame.as_slice() { [subscribe, schannel, ..] if *subscribe == "subscribe" && *schannel == channel => { debug!("subscribe channel: {} success", channel); } _ => { error!("subscribe frame failed, response: {}", response); return Err(MiniRedisConnectionError::CommandExecute( response.to_string(), )); } }, frame => { error!( "subscribe frame failed, response frame type not match: {}", frame ); return Err(MiniRedisConnectionError::InvalidFrameType); } }; } Ok(()) } pub(crate) async fn read_response(&mut self) -> Result<Frame, MiniRedisConnectionError> { let response = self.connection.read_frame().await?; match response { Some(Frame::Error(msg)) => Err(MiniRedisConnectionError::CommandExecute(msg)), Some(frame) => Ok(frame), None => { // Receiving `None` here indicates the server has closed the // connection without sending a frame. This is unexpected and is // represented as a "connection reset by peer" error. Err(MiniRedisConnectionError::Disconnect) } } }}
The logic is very similar, both are:
- First create a corresponding command, and then call
into_frame
to convert to a specific frame; - Then send the data to the Server through the write_frame method encapsulated in the connection;
- Finally, call the internal read_response method, parse the response and output it;
ChannelSubscriber
When the subscribe command is executed, a corresponding Subscriber will be created to subscribe to each channel;
The specific implementation is as follows:
/// A client that has entered pub/sub mode.////// Once clients subscribe to a channel, they may only perform pub/sub related/// commands. The `Client` type is transitioned to a `Subscriber` type in order/// to prevent non-pub/sub methods from being called.pub struct Subscriber { /// The subscribed client. pub(crate) client: Client, /// The set of channels to which the `Subscriber` is currently subscribed. pub(crate) subscribed_channels: Vec<String>,}/// A message received on a subscribed channel.#[derive(Debug, Clone)]pub struct Message { pub channel: String, pub content: Bytes,}impl Subscriber { /// Subscribe to a list of new channels pub async fn subscribe(&mut self, channels: &[String]) -> Result<(), MiniRedisConnectionError> { // Issue the subscribe command self.client.subscribe_cmd(channels).await?; // Update the set of subscribed channels. self.subscribed_channels .extend(channels.iter().map(Clone::clone)); Ok(()) } /// Returns the set of channels currently subscribed to. pub fn get_subscribed(&self) -> &[String] { &self.subscribed_channels } /// Receive the next message published on a subscribed channel, waiting if /// necessary. /// /// `None` indicates the subscription has been terminated. pub async fn next_message(&mut self) -> Result<Option<Message>, MiniRedisConnectionError> { match self.client.connection.read_frame().await? { Some(frame) => { debug!("subscribe received next message: {:?}", frame); match frame { Frame::Array(ref frame) => match frame.as_slice() { [message, channel, content] if *message == "message" => Ok(Some(Message { channel: channel.to_string(), content: Bytes::from(content.to_string()), })), _ => { error!("invalid message, frame: {:?}", frame); Err(MiniRedisConnectionError::InvalidFrameType) } }, frame => Err(MiniRedisConnectionError::CommandExecute(frame.to_string())), } } None => Ok(None), } } /// Convert the subscriber into a `Stream` yielding new messages published /// on subscribed channels. /// /// `Subscriber` does not implement stream itself as doing so with safe code /// is non trivial. The usage of async/await would require a manual Stream /// implementation to use `unsafe` code. Instead, a conversion function is /// provided and the returned stream is implemented with the help of the /// `async-stream` crate. pub fn into_stream(mut self) -> impl Stream<Item = Result<Message, MiniRedisConnectionError>> { // Uses the `try_stream` macro from the `async-stream` crate. Generators // are not stable in Rust. The crate uses a macro to simulate generators // on top of async/await. There are limitations, so read the // documentation there. try_stream! { while let Some(message) = self.next_message().await? { yield message; } } } /// Unsubscribe to a list of new channels pub async fn unsubscribe( &mut self, channels: &[String], ) -> Result<(), MiniRedisConnectionError> { let frame = Unsubscribe::new(channels).into_frame()?; debug!("unsubscribe command: {:?}", frame); // Write the frame to the socket self.client.connection.write_frame(&frame).await?; // if the input channel list is empty, server acknowledges as unsubscribing // from all subscribed channels, so we assert that the unsubscribe list received // matches the client subscribed one let num = if channels.is_empty() { self.subscribed_channels.len() } else { channels.len() }; // Read the response for _ in 0..num { let response = self.client.read_response().await?; match response { Frame::Array(ref frame) => match frame.as_slice() { [unsubscribe, channel, ..] if *unsubscribe == "unsubscribe" => { let len = self.subscribed_channels.len(); if len == 0 { // There must be at least one channel return Err(MiniRedisConnectionError::InvalidArgument( response.to_string(), )); } // unsubscribed channel should exist in the subscribed list at this point self.subscribed_channels.retain(|c| *channel != &c[..]); // Only a single channel should be removed from the // list of subscribed channels. if self.subscribed_channels.len() != len - 1 { return Err(MiniRedisConnectionError::CommandExecute( response.to_string(), )); } } _ => { return Err(MiniRedisConnectionError::InvalidFrameType); } }, frame => return Err(MiniRedisConnectionError::CommandExecute(frame.to_string())), }; } Ok(()) }}
The implementation logic is also very clear: the main thing is to encapsulate the Client in a layer, and then read the message through the stream provided by the connection;
At the same time, it can also receive sub/unsub commands, and the processing logic is very similar to that of the server;
summary
This article explains the implementation of the client. Due to the encapsulation of the previous parts, the implementation of the client becomes very simple;
The next article will also be the last in this series, mainly to test our implementation;
appendix
source code:
Series of articles:
- “mini-redis project-1-introduction”
- “mini-redis project-2-storage layer”
- “mini-redis project-3-connection layer”
- “mini-redis project-4-server”
- “mini-redis project-5-client”
- “mini-redis project-6-test and example”
This article is reproduced from: https://jasonkayzk.github.io/2022/12/07/mini-redis%E9%A1%B9%E7%9B%AE-5-%E5%AE%A2%E6%88%B7% E7%AB%AF/
This site is only for collection, and the copyright belongs to the original author.