2022-22: Implement compression support for Databend

Original link: https://xuanwo.io/reports/2022-22/

Iteration 14 runs for two weeks, from 5/21 to 6/4. This cycle mainly focuses on compression support for Databend, specifically the following features:

Support for reading compressed files from a Stage/Location:

    copy into ontime200 from '@s1'
        FILES = ('ontime_200.csv.gz')
        FILE_FORMAT = (
            type = 'CSV'
            field_delimiter = ','
            compression = 'gzip'
            record_delimiter = '\n'
            skip_header = 1
        );

Support streaming upload of compressed files:

    curl -H "insert_sql:insert into ontime_streaming_load format Csv" \
         -H "skip_header:1" \
         -H "compression:zstd" \
         -F "upload=@/tmp/ontime_200.csv.zst" \
         -u root: \
         -XPUT \
         "http://localhost:8000/v1/streaming_load"

Rust already has a fairly complete set of compression libraries, so the main work is integrating the decompression logic with Databend's existing code.


Decompression Workflow

Most decompression algorithms can be abstracted into a state machine that cycles through reading, decoding, and flushing until it is done.

The most complicated part is the Decode step:

  • When the input has been fully consumed, more data needs to be fetched
  • When the input has not been fully consumed, Decode needs to be called again

Some compression formats support multiple members, so it is also possible to reinit from the Flush state and start a new round of decompression; I won't go into that here.

OpenDAL uses this abstraction:

    pub enum DecompressState {
        Reading,
        Decoding,
        Flushing,
        Done,
    }

DecompressDecoder is exposed externally:

    impl DecompressDecoder {
        /// Get decompress state
        pub fn state(&self) -> DecompressState {}

        /// Fetch more data from underlying reader.
        pub fn fill(&mut self, bs: &[u8]) -> usize {}

        /// Decode data into output.
        pub fn decode(&mut self, output: &mut [u8]) -> Result<usize> {}

        /// Finish a decompress process, flushing remaining data into output.
        pub fn finish(&mut self, output: &mut [u8]) -> Result<usize> {}
    }
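To make the state machine concrete, here is a minimal sketch of how a caller might drive this API over a synchronous reader. This is not OpenDAL's actual code: the helper function, buffer sizes, and error handling are assumptions for illustration.

    use std::io::Read;

    // Sketch only: drive the DecompressDecoder state machine shown above.
    // DecompressDecoder/DecompressState come from OpenDAL; everything else
    // here is an illustrative assumption.
    fn decompress_all(mut decoder: DecompressDecoder, mut src: impl Read) -> anyhow::Result<Vec<u8>> {
        let mut input = vec![0u8; 8 * 1024];
        let mut output = vec![0u8; 16 * 1024];
        let mut result = Vec::new();

        loop {
            match decoder.state() {
                // Input fully consumed: fetch more compressed bytes and feed them in.
                DecompressState::Reading => {
                    let n = src.read(&mut input)?;
                    // For simplicity this assumes `fill` accepts the whole slice;
                    // real code should respect the count it returns.
                    decoder.fill(&input[..n]);
                }
                // Input not yet consumed: keep decoding into the output buffer.
                DecompressState::Decoding => {
                    let n = decoder.decode(&mut output)?;
                    result.extend_from_slice(&output[..n]);
                }
                // Decoding finished: flush whatever remains in internal buffers.
                DecompressState::Flushing => {
                    let n = decoder.finish(&mut output)?;
                    result.extend_from_slice(&output[..n]);
                }
                DecompressState::Done => return Ok(result),
            }
        }
    }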

For users' convenience, OpenDAL also provides DecompressReader, built on top of DecompressDecoder, which implements futures::io::AsyncRead:

    impl<R: BytesRead> futures::io::AsyncRead for DecompressReader<R> {}
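A usage sketch for the reader wrapper follows; note that DecompressReader::new and CompressAlgorithm::Gzip below are assumed names for illustration, so check the OpenDAL docs for the exact constructor.

    use futures::io::AsyncReadExt;

    // Sketch only: wrap a compressed async byte stream and read plain bytes out.
    // `DecompressReader::new` and `CompressAlgorithm::Gzip` are assumed names,
    // not necessarily OpenDAL's exact API.
    async fn read_decompressed(r: impl BytesRead + Unpin) -> std::io::Result<Vec<u8>> {
        let mut reader = DecompressReader::new(r, CompressAlgorithm::Gzip);
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await?;
        Ok(buf)
    }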

Integrating with async-compression

async-compression is an asynchronous compression library developed by @Nemo157 that supports most commonly used compression algorithms. OpenDAL's decompression support is built on top of async-compression.

It uses the Decode trait internally:

    pub trait Decode {
        /// Reinitializes this decoder ready to decode a new member/frame of data.
        fn reinit(&mut self) -> Result<()>;

        /// Returns whether the end of the stream has been read
        fn decode(
            &mut self,
            input: &mut PartialBuffer<impl AsRef<[u8]>>,
            output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
        ) -> Result<bool>;

        /// Returns whether the internal buffers are flushed
        fn flush(
            &mut self,
            output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
        ) -> Result<bool>;

        /// Returns whether the internal buffers are flushed
        fn finish(
            &mut self,
            output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
        ) -> Result<bool>;
    }

To avoid exposing internal implementation details, async-compression keeps all codec-related modules private; users can only drive them through the bufread and similar wrapper modules.
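For comparison, this is how the crate is normally consumed through its public bufread wrappers (with the futures-io and gzip features enabled); the codec stays hidden behind the decoder:

    use async_compression::futures::bufread::GzipDecoder;
    use futures::io::{AsyncBufRead, AsyncReadExt};

    // Decompress a gzip stream using only async-compression's public API:
    // the internal codec is driven entirely by the decoder's AsyncRead impl.
    async fn gunzip(src: impl AsyncBufRead + Unpin) -> std::io::Result<Vec<u8>> {
        let mut decoder = GzipDecoder::new(src);
        let mut buf = Vec::new();
        decoder.read_to_end(&mut buf).await?;
        Ok(buf)
    }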

But this approach does not work for Databend. To make full use of machine resources, Databend strictly separates asynchronous and synchronous tasks:

  • CPU-bound tasks must run in the synchronous runtime
  • IO-bound tasks must run in the asynchronous runtime

If decompression runs in the asynchronous runtime, it may block the runtime and reduce overall throughput.
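To illustrate the split we want, here is a hedged sketch (not Databend's actual code): fetch the compressed bytes on the async runtime, then hand the CPU-heavy decoding off to a blocking pool. spawn_blocking here is tokio's, used purely as an example, and decode_all_sync is a placeholder for driving a decoder to completion.

    use futures::io::{AsyncRead, AsyncReadExt};

    // Placeholder for running a synchronous decode loop (e.g. the
    // DecompressDecoder state machine) to completion; not a Databend API.
    fn decode_all_sync(compressed: &[u8]) -> anyhow::Result<Vec<u8>> {
        // ... CPU-bound decode loop goes here ...
        unimplemented!()
    }

    async fn load_compressed(mut src: impl AsyncRead + Unpin) -> anyhow::Result<Vec<u8>> {
        // IO-bound: fetch the compressed bytes on the async runtime.
        let mut compressed = Vec::new();
        src.read_to_end(&mut compressed).await?;

        // CPU-bound: decode on a blocking pool so the async runtime's worker
        // threads are never stalled by decompression.
        let decompressed =
            tokio::task::spawn_blocking(move || decode_all_sync(&compressed)).await??;
        Ok(decompressed)
    }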

If we want to control which runtime decompression happens on, we must be able to drive the underlying Decode directly. For this I submitted a proposal: Export codec and Encode/Decode trait. The author happens to be doing related work and also wants to expose parts of the underlying codec, so in the proposal I described Databend's use case in detail and shared my temporary workaround. The code of async-compression is actually well organized; it is enough to make the internal Decode trait and its implementations public.

Since a crate published to crates.io must depend on released, tagged versions, I published this workaround as its own crate: async-compression-issue-150-workaround.

Here is a little Cargo.toml trick: the package key gives a dependency an alias, so the code can keep using the original crate name while actually pulling in a different package.

    # Temp workaround, should come back to tagged version after
    # https://github.com/Nemo157/async-compression/issues/150 resolved.
    async-compression = { package = "async-compression-issue-150-workaround", version = "0.3.15-issue-150", features = [
        "futures-io",
        "all-algorithms",
    ], optional = true }

Integrating with Databend

With development and testing done on the OpenDAL side, the remaining work in Databend was mainly integration and testing. During integration I unexpectedly found that Databend currently handles Streaming Load and COPY FROM STAGE through two completely different code paths, so the decompression logic had to be implemented twice; we hope to unify them in the future.

With this work, Databend now supports compression algorithms including GZIP, BZ2, BROTLI, ZSTD, DEFLATE and RAW_DEFLATE, and more can be added as needed. You are welcome to try out the decompression feature!

Next Steps

  • Refactor the streaming load and COPY FROM STAGE paths so they reuse the same logic as much as possible
  • Benchmark and optimize decompression performance
  • Support LZO, SNAPPY and other compression algorithms
  • Support ZIP, TAR and other archive formats
