Original link: https://jasonkayzk.github.io/2022/12/07/mini-redis%E9%A1%B9%E7%9B%AE-6-%E6%B5%8B%E8%AF%95%E4%B8%8E%E7%A4%BA%E4%BE%8B/
This is the final article in the series. It tests the functionality implemented in the earlier parts.
Rust and Cargo make it convenient to write both tests and examples.
source code:
Series of articles:
- “mini-redis project-1-introduction”
- “mini-redis project-2-storage layer”
- “mini-redis project-3-connection layer”
- “mini-redis project-4-server”
- “mini-redis project-5-client”
- “mini-redis project-6-test and example”
mini-redis project-6-test and example
Writing test code
By default, Cargo looks for integration tests in the tests directory, which sits at the same level as the src directory. Each file in tests is compiled as its own crate.
Here is the server-side test code:
tests/server.rs
// Imports added for completeness. The `mini_redis::server` path is an
// assumption, mirroring the `mini_redis::client` path used in the examples.
use mini_redis::server;

use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::{self, Duration};

/// A basic "hello world" style test. A server instance is started in a
/// background task. A client TCP connection is then established and raw redis
/// commands are sent to the server. The response is evaluated at the byte
/// level.
#[tokio::test]
async fn key_value_get_set() {
    let addr = start_server().await;

    // Establish a connection to the server
    let mut stream = TcpStream::connect(addr).await.unwrap();

    // Get a key, data is missing
    stream
        .write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n")
        .await
        .unwrap();

    // Read nil response
    let mut response = [0; 5];
    stream.read_exact(&mut response).await.unwrap();
    assert_eq!(b"$-1\r\n", &response);

    // Set a key
    stream
        .write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n")
        .await
        .unwrap();

    // Read OK
    let mut response = [0; 5];
    stream.read_exact(&mut response).await.unwrap();
    assert_eq!(b"+OK\r\n", &response);

    // Get the key, data is present
    stream
        .write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n")
        .await
        .unwrap();

    // Shutdown the write half
    stream.shutdown().await.unwrap();

    // Read "world" response
    let mut response = [0; 11];
    stream.read_exact(&mut response).await.unwrap();
    assert_eq!(b"$5\r\nworld\r\n", &response);

    // Receive `None`
    assert_eq!(0, stream.read(&mut response).await.unwrap());
}

/// Similar to the basic key-value test, however, this time timeouts will be
/// tested. This test demonstrates how to test time related behavior.
///
/// When writing tests, it is useful to remove sources of non-determinism. Time
/// is a source of non-determinism. Here, we "pause" time using the
/// `time::pause()` function. This function is available with the `test-util`
/// feature flag. This allows us to deterministically control how time appears
/// to advance to the application.
#[tokio::test]
async fn key_value_timeout() {
    let addr = start_server().await;

    // Establish a connection to the server
    let mut stream = TcpStream::connect(addr).await.unwrap();

    // Set a key
    stream
        .write_all(
            b"*5\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n\
              +EX\r\n:1\r\n",
        )
        .await
        .unwrap();

    let mut response = [0; 5];

    // Read OK
    stream.read_exact(&mut response).await.unwrap();
    assert_eq!(b"+OK\r\n", &response);

    // Get the key, data is present
    stream
        .write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n")
        .await
        .unwrap();

    // Read "world" response
    let mut response = [0; 11];
    stream.read_exact(&mut response).await.unwrap();
    assert_eq!(b"$5\r\nworld\r\n", &response);

    // Wait for the key to expire.
    // Note: `time::pause()` is not actually called in this test as written,
    // so this sleeps for one real second.
    time::sleep(Duration::from_secs(1)).await;

    // Get a key, data is missing
    stream
        .write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n")
        .await
        .unwrap();

    // Read nil response
    let mut response = [0; 5];
    stream.read_exact(&mut response).await.unwrap();
    assert_eq!(b"$-1\r\n", &response);
}

#[tokio::test]
async fn pub_sub() {
    let addr = start_server().await;

    let mut publisher = TcpStream::connect(addr).await.unwrap();

    // Publish a message, there are no subscribers yet so the server will
    // return `0`.
    publisher
        .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n")
        .await
        .unwrap();

    let mut response = [0; 4];
    publisher.read_exact(&mut response).await.unwrap();
    assert_eq!(b":0\r\n", &response);

    // Create a subscriber. This subscriber will only subscribe to the `hello`
    // channel.
    let mut sub1 = TcpStream::connect(addr).await.unwrap();
    sub1.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n")
        .await
        .unwrap();

    // Read the subscribe response
    let mut response = [0; 34];
    sub1.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..],
        &response[..]
    );

    // Publish a message, there now is a subscriber
    publisher
        .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n")
        .await
        .unwrap();

    let mut response = [0; 4];
    publisher.read_exact(&mut response).await.unwrap();
    assert_eq!(b":1\r\n", &response);

    // The first subscriber received the message
    let mut response = [0; 39];
    sub1.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\nworld\r\n"[..],
        &response[..]
    );

    // Create a second subscriber
    //
    // This subscriber will be subscribed to both `hello` and `foo`
    let mut sub2 = TcpStream::connect(addr).await.unwrap();
    sub2.write_all(b"*3\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n$3\r\nfoo\r\n")
        .await
        .unwrap();

    // Read the subscribe response
    let mut response = [0; 34];
    sub2.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..],
        &response[..]
    );
    let mut response = [0; 32];
    sub2.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:2\r\n"[..],
        &response[..]
    );

    // Publish another message on `hello`, there are two subscribers
    publisher
        .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\njazzy\r\n")
        .await
        .unwrap();

    let mut response = [0; 4];
    publisher.read_exact(&mut response).await.unwrap();
    assert_eq!(b":2\r\n", &response);

    // Publish a message on `foo`, there is only one subscriber
    publisher
        .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")
        .await
        .unwrap();

    let mut response = [0; 4];
    publisher.read_exact(&mut response).await.unwrap();
    assert_eq!(b":1\r\n", &response);

    // The first subscriber received the message
    let mut response = [0; 39];
    sub1.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\njazzy\r\n"[..],
        &response[..]
    );

    // The second subscriber received the message
    let mut response = [0; 39];
    sub2.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\njazzy\r\n"[..],
        &response[..]
    );

    // The first subscriber **did not** receive the second message
    let mut response = [0; 1];
    time::timeout(Duration::from_millis(100), sub1.read(&mut response))
        .await
        .unwrap_err();

    // The second subscriber **did** receive the message
    let mut response = [0; 35];
    sub2.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..],
        &response[..]
    );
}

#[tokio::test]
async fn manage_subscription() {
    let addr = start_server().await;

    let mut publisher = TcpStream::connect(addr).await.unwrap();

    // Create a subscriber
    let mut sub = TcpStream::connect(addr).await.unwrap();
    sub.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n")
        .await
        .unwrap();

    // Read the subscribe response
    let mut response = [0; 34];
    sub.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..],
        &response[..]
    );

    // Update subscription to add `foo`
    sub.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$3\r\nfoo\r\n")
        .await
        .unwrap();

    let mut response = [0; 32];
    sub.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:2\r\n"[..],
        &response[..]
    );

    // Update subscription to remove `hello`
    sub.write_all(b"*2\r\n$11\r\nUNSUBSCRIBE\r\n$5\r\nhello\r\n")
        .await
        .unwrap();

    let mut response = [0; 37];
    sub.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$11\r\nunsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..],
        &response[..]
    );

    // Publish a message to `hello` and then a message to `foo`
    publisher
        .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n")
        .await
        .unwrap();
    let mut response = [0; 4];
    publisher.read_exact(&mut response).await.unwrap();
    assert_eq!(b":0\r\n", &response);

    publisher
        .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")
        .await
        .unwrap();
    let mut response = [0; 4];
    publisher.read_exact(&mut response).await.unwrap();
    assert_eq!(b":1\r\n", &response);

    // Receive the message
    // The second subscriber **did** receive the message
    let mut response = [0; 35];
    sub.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..],
        &response[..]
    );

    // No more messages
    let mut response = [0; 1];
    time::timeout(Duration::from_millis(100), sub.read(&mut response))
        .await
        .unwrap_err();

    // Unsubscribe from all channels
    sub.write_all(b"*1\r\n$11\r\nunsubscribe\r\n")
        .await
        .unwrap();

    let mut response = [0; 35];
    sub.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n:0\r\n"[..],
        &response[..]
    );
}

// In this case we test that the server responds with an error message if a
// client sends an unknown command
#[tokio::test]
async fn send_error_unknown_command() {
    let addr = start_server().await;

    // Establish a connection to the server
    let mut stream = TcpStream::connect(addr).await.unwrap();

    // Send a command the server does not know
    stream
        .write_all(b"*2\r\n$3\r\nFOO\r\n$5\r\nhello\r\n")
        .await
        .unwrap();

    let mut response = [0; 28];
    stream.read_exact(&mut response).await.unwrap();
    assert_eq!(b"-err unknown command \'foo\'\r\n", &response);
}

// In this case we test that the server responds with an error message if a
// client sends a GET or SET command after a SUBSCRIBE
#[tokio::test]
async fn send_error_get_set_after_subscribe() {
    let addr = start_server().await;

    let mut stream = TcpStream::connect(addr).await.unwrap();

    // send SUBSCRIBE command
    stream
        .write_all(b"*2\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n")
        .await
        .unwrap();

    let mut response = [0; 34];
    stream.read_exact(&mut response).await.unwrap();
    assert_eq!(
        &b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..],
        &response[..]
    );

    stream
        .write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n")
        .await
        .unwrap();

    let mut response = [0; 28];
    stream.read_exact(&mut response).await.unwrap();
    assert_eq!(b"-err unknown command \'set\'\r\n", &response);

    stream
        .write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n")
        .await
        .unwrap();

    let mut response = [0; 28];
    stream.read_exact(&mut response).await.unwrap();
    assert_eq!(b"-err unknown command \'get\'\r\n", &response);
}

// Bind to port 0 so the OS picks a free port, run the server in a background
// task, and return the address that tests should connect to.
async fn start_server() -> SocketAddr {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    tokio::spawn(async move { server::run(listener, tokio::signal::ctrl_c()).await });

    addr
}
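Each function marked with the #[tokio::test] macro is a separate test case. Every test begins by calling the start_server helper, which binds a listener to 127.0.0.1:0 (so the OS picks a free port), spawns the server in a background task, and returns the address to connect to.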
Client test code:
tests/client.rs
// Imports added for completeness. The `mini_redis::server` path is an
// assumption, mirroring the `mini_redis::client` path used in the examples.
use mini_redis::{client, server};

use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;

/// A PING PONG test without message provided.
/// It should return "PONG".
#[tokio::test]
async fn ping_pong_without_message() {
    let (addr, _) = start_server().await;
    let mut client = client::connect(addr).await.unwrap();

    let pong = client.ping(None).await.unwrap();
    assert_eq!(b"PONG", &pong[..]);
}

/// A PING PONG test with message provided.
/// It should return the message.
#[tokio::test]
async fn ping_pong_with_message() {
    let (addr, _) = start_server().await;
    let mut client = client::connect(addr).await.unwrap();

    let pong = client.ping(Some("你好世界".to_string())).await.unwrap();
    assert_eq!("你好世界".as_bytes(), &pong[..]);
}

/// A basic "hello world" style test. A server instance is started in a
/// background task. A client instance is then established and set and get
/// commands are sent to the server. The response is then evaluated
#[tokio::test]
async fn key_value_get_set() {
    let (addr, _) = start_server().await;

    let mut client = client::connect(addr).await.unwrap();
    client.set("hello", "world".into()).await.unwrap();

    let value = client.get("hello").await.unwrap().unwrap();
    assert_eq!(b"world", &value[..])
}

/// Similar to the "hello world" style test, but this time
/// a single channel subscription will be tested instead
#[tokio::test]
async fn receive_message_subscribed_channel() {
    let (addr, _) = start_server().await;

    let client = client::connect(addr).await.unwrap();
    let mut subscriber = client.subscribe(vec!["hello".into()]).await.unwrap();

    tokio::spawn(async move {
        let mut client = client::connect(addr).await.unwrap();
        client.publish("hello", "world".into()).await.unwrap()
    });

    let message = subscriber.next_message().await.unwrap().unwrap();
    assert_eq!("hello", &message.channel);
    assert_eq!(b"world", &message.content[..])
}

/// Test that a client gets messages from multiple subscribed channels
#[tokio::test]
async fn receive_message_multiple_subscribed_channels() {
    let (addr, _) = start_server().await;

    let client = client::connect(addr).await.unwrap();
    let mut subscriber = client
        .subscribe(vec!["hello".into(), "world".into()])
        .await
        .unwrap();

    tokio::spawn(async move {
        let mut client = client::connect(addr).await.unwrap();
        client.publish("hello", "world".into()).await.unwrap()
    });

    let message1 = subscriber.next_message().await.unwrap().unwrap();
    assert_eq!("hello", &message1.channel);
    assert_eq!(b"world", &message1.content[..]);

    tokio::spawn(async move {
        let mut client = client::connect(addr).await.unwrap();
        client.publish("world", "howdy?".into()).await.unwrap()
    });

    let message2 = subscriber.next_message().await.unwrap().unwrap();
    assert_eq!("world", &message2.channel);
    assert_eq!(b"howdy?", &message2.content[..])
}

/// Test that a client accurately clears its own subscribed channel list
/// when unsubscribing from all subscribed channels by submitting an empty vec
#[tokio::test]
async fn unsubscribes_from_channels() {
    let (addr, _) = start_server().await;

    let client = client::connect(addr).await.unwrap();
    let mut subscriber = client
        .subscribe(vec!["hello".into(), "world".into()])
        .await
        .unwrap();

    subscriber.unsubscribe(&[]).await.unwrap();
    assert_eq!(subscriber.get_subscribed().len(), 0);
}

// Same helper as in tests/server.rs, but it also returns the handle of the
// spawned server task.
async fn start_server() -> (SocketAddr, JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    let handle = tokio::spawn(async move { server::run(listener, tokio::signal::ctrl_c()).await });

    (addr, handle)
}
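Both test files rely on the same trick in start_server: binding to port 0 lets the operating system choose a free port, so tests that run in parallel never collide on a fixed port such as 6379. The sketch below shows the idea with blocking standard-library sockets and a thread instead of tokio. The `start_echo_server` and `round_trip` functions are hypothetical stand-ins written for this illustration, and the "server" simply echoes four bytes rather than speaking Redis.

```rust
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;

/// Bind to port 0, report the address the OS picked, and echo one 4-byte
/// message back on a background thread.
/// Hypothetical stand-in for `start_server`; not part of mini-redis.
fn start_echo_server() -> SocketAddr {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    // `local_addr` reveals the concrete port that replaced the 0.
    let addr = listener.local_addr().unwrap();

    thread::spawn(move || {
        let (mut socket, _) = listener.accept().unwrap();
        let mut buf = [0u8; 4];
        socket.read_exact(&mut buf).unwrap();
        socket.write_all(&buf).unwrap();
    });

    addr
}

/// Connect to a freshly started echo server and return what it sends back.
fn round_trip() -> [u8; 4] {
    let addr = start_echo_server();
    let mut stream = TcpStream::connect(addr).unwrap();
    stream.write_all(b"PING").unwrap();

    let mut reply = [0u8; 4];
    stream.read_exact(&mut reply).unwrap();
    reply
}
```

Because the listener is bound before the helper returns, a client can connect immediately; the connection simply waits in the accept queue until the background thread picks it up.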
Once the tests are written, run them with Cargo:
$ cargo test --all

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running unittests src/bin/cli.rs (target/debug/deps/mini_redis_cli-01da23b4d263898c)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running unittests src/bin/server.rs (target/debug/deps/mini_redis_server-9faa14d080da4ec7)

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/client.rs (target/debug/deps/client-614c81c1e6844848)

running 6 tests
test ping_pong_without_message ... ok
test ping_pong_with_message ... ok
test key_value_get_set ... ok
test unsubscribes_from_channels ... ok
test receive_message_subscribed_channel ... ok
test receive_message_multiple_subscribed_channels ... ok

test result: ok. 6 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

     Running tests/server.rs (target/debug/deps/server-e059dba3cac184ec)

running 6 tests
test send_error_unknown_command ... ok
test key_value_get_set ... ok
test send_error_get_set_after_subscribe ... ok
test pub_sub ... ok
test manage_subscription ... ok
test key_value_timeout ... ok

test result: ok. 6 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 1.00s

   Doc-tests mini-redis

running 6 tests
test src/client/cli.rs - client::cli::Client::ping (line 49) - compile ... ok
test src/client/cli.rs - client::cli::Client::get (line 80) - compile ... ok
test src/client/cli.rs - client::cli::Client::publish (line 222) - compile ... ok
test src/client/mod.rs - client::connect (line 19) - compile ... ok
test src/client/cli.rs - client::cli::Client::set (line 123) - compile ... ok
test src/client/cli.rs - client::cli::Client::set_expires (line 159) - compile ... ok

test result: ok. 6 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.05s
Configure GitHub Actions
We can also configure GitHub Actions so that every push and pull request is tested automatically (changes that touch only Markdown files are skipped).
The configuration is as follows:
.github/workflows/ci.yaml
name: CI # Continuous Integration

on:
  workflow_dispatch:
  push:
    paths-ignore:
      - '**.md'
  pull_request:
    paths-ignore:
      - '**.md'

env:
  RUST_TOOLCHAIN: stable
  TOOLCHAIN_PROFILE: minimal

jobs:
  lints:
    name: Run cargo fmt and cargo clippy
    runs-on: ubuntu-latest
    steps:
      - name: Checkout sources
        uses: actions/checkout@v3
      - name: Install toolchain
        uses: actions-rs/toolchain@v1
        with:
          profile: ${{ env.TOOLCHAIN_PROFILE }}
          toolchain: ${{ env.RUST_TOOLCHAIN }}
          override: true
          components: rustfmt, clippy
      - name: Cache
        uses: Swatinem/rust-cache@v2
      - name: Run cargo fmt
        uses: actions-rs/cargo@v1
        with:
          command: fmt
          args: --all -- --check
      - name: Run cargo clippy
        uses: actions-rs/cargo@v1
        with:
          command: clippy
          args: -- -D warnings

  test:
    name: Run cargo test
    runs-on: ubuntu-latest
    steps:
      - name: Checkout sources
        uses: actions/checkout@v3
      - name: Install toolchain
        uses: actions-rs/toolchain@v1
        with:
          profile: ${{ env.TOOLCHAIN_PROFILE }}
          toolchain: ${{ env.RUST_TOOLCHAIN }}
          override: true
      - name: Cache
        uses: Swatinem/rust-cache@v2
      - name: Run cargo test
        uses: actions-rs/cargo@v1
        env:
          RUST_TEST_THREADS: 8
        with:
          command: test
          args: --all-features
Writing examples
Cargo also supports example programs, which live in the examples directory by default.
Here are a few simple examples that show how to use the client:
examples/hello.rs
//! Hello world client.
//!
//! A simple client that connects to a mini-redis server, sets key "hello" with value "world",
//! and gets it from the server after
//!
//! You can test this out by running:
//!
//!     cargo run --bin mini-redis-server
//!
//! And then in another terminal run:
//!
//!     cargo run --example hello_world

use mini_redis::client;
use mini_redis::error::MiniRedisClientError;

#[tokio::main]
pub async fn main() -> Result<(), MiniRedisClientError> {
    // Open a connection to the mini-redis address.
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Set the key "hello" with value "world"
    let result = client.set("hello", "world".into()).await?;
    println!("set value to the server success, result: {:?}", result);

    // Get key "hello"
    let result = client.get("hello").await?;
    println!("got value from the server success, result: {:?}", result);

    Ok(())
}
examples/ping.rs
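// Imports added for completeness. The location of `MiniRedisConnectionError`
// is an assumption: it is taken to live next to `MiniRedisClientError`.
use mini_redis::client;
use mini_redis::error::MiniRedisConnectionError;

#[tokio::main]
pub async fn main() -> Result<(), MiniRedisConnectionError> {
    // Open a connection to the mini-redis address.
    let mut client = client::connect("127.0.0.1:6379").await?;

    // PING without a payload
    let result = client.ping(None).await?;
    println!("empty ping response: {:?}", result);

    // PING with a payload, which the server echoes back
    let result = client.ping(Some("hello".into())).await?;
    println!("bytes ping response: {:?}", result);

    Ok(())
}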
examples/pub.rs
// Imports added for completeness, following examples/hello.rs.
use mini_redis::client;
use mini_redis::error::MiniRedisClientError;

#[tokio::main]
async fn main() -> Result<(), MiniRedisClientError> {
    // Open a connection to the mini-redis address.
    let mut client = client::connect("127.0.0.1:6379").await?;

    // publish message `bar` on channel foo
    let res = client.publish("foo", "bar".into()).await?;
    println!("pushed message success, res: {:?}", res);

    Ok(())
}
examples/sub.rs
// Imports added for completeness, following examples/hello.rs.
use mini_redis::client;
use mini_redis::error::MiniRedisClientError;

#[tokio::main]
pub async fn main() -> Result<(), MiniRedisClientError> {
    // Open a connection to the mini-redis address.
    let client = client::connect("127.0.0.1:6379").await?;

    // subscribe to channel foo
    let mut subscriber = client.subscribe(vec!["foo".into()]).await?;

    // await messages on channel foo
    if let Some(msg) = subscriber.next_message().await? {
        println!(
            "got message from the channel: {}; message = {:?}",
            msg.channel, msg.content
        );
    }

    Ok(())
}
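On the wire, the message that next_message hands back arrives as the three-element array seen in tests/server.rs: the literal `message`, then the channel name, then the payload. To make that mapping concrete, here is a minimal standard-library sketch that decodes such a frame. The `parse_bulk_array` function is hypothetical and written only for this illustration; it is not the frame parser from the connection layer, and it handles nothing but arrays of bulk strings.

```rust
/// Parse a RESP array of bulk strings into its elements.
/// Returns `None` if the input is malformed or incomplete.
/// Hypothetical helper for illustration only; not mini-redis's own parser.
fn parse_bulk_array(input: &[u8]) -> Option<Vec<Vec<u8>>> {
    // Split off one `\r\n`-terminated line, returning (line, rest).
    fn read_line(buf: &[u8]) -> Option<(&[u8], &[u8])> {
        let end = buf.windows(2).position(|w| w == b"\r\n")?;
        Some((&buf[..end], &buf[end + 2..]))
    }

    // Parse a header such as `*3` or `$5`, checking the prefix byte.
    fn read_len(line: &[u8], prefix: u8) -> Option<usize> {
        let (first, digits) = line.split_first()?;
        if *first != prefix {
            return None;
        }
        std::str::from_utf8(digits).ok()?.parse().ok()
    }

    let (header, mut rest) = read_line(input)?;
    let count = read_len(header, b'*')?;

    let mut items = Vec::new();
    for _ in 0..count {
        let (len_line, after) = read_line(rest)?;
        let len = read_len(len_line, b'$')?;
        // The payload must be followed by a trailing `\r\n`.
        let end = len.checked_add(2)?;
        if after.len() < end || &after[len..end] != b"\r\n" {
            return None;
        }
        items.push(after[..len].to_vec());
        rest = &after[end..];
    }
    Some(items)
}
```

Feeding it `b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"` yields the three elements `message`, `foo`, and `bar`; the last two correspond to `msg.channel` and `msg.content` in the subscriber example.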
Summary
Working through mini-redis should teach you:
- How to use the tokio framework and write asynchronous code;
- A simple implementation of Redis and of its communication protocol;
- How the server and the client are implemented;
- Connection management and graceful shutdown of the server;
- …