Original link: https://xuanwo.io/reports/2022-22/
Iteration 14 runs from 5/21 to 6/4, two weeks in total. The main goal of this cycle is to add compression support to Databend, specifically the following features:
Support reading compressed files from a Stage/Location:
```sql
copy into ontime200 from '@s1'
  FILES = ('ontime_200.csv.gz')
  FILE_FORMAT = (
    type = 'CSV'
    field_delimiter = ','
    compression = 'gzip'
    record_delimiter = '\n'
    skip_header = 1
  );
```
Support streaming upload of compressed files:
```shell
curl -H "insert_sql:insert into ontime_streaming_load format Csv" \
     -H "skip_header:1" \
     -H "compression:zstd" \
     -F "upload=@/tmp/ontime_200.csv.zst" \
     -u root: \
     -XPUT \
     "http://localhost:8000/v1/streaming_load"
```
Rust already has a fairly complete ecosystem of compression libraries, so the main work is integrating the decompression logic into Databend's existing pipeline.
Decompression Workflow
Most decompression algorithms can be abstracted into a state machine with four states: reading input, decoding, flushing, and done.
The most complicated part is the Decode step:

- when the input has been fully consumed, more data needs to be fetched;
- when the input has not been fully consumed, Decode needs to be called again.
Some compression algorithms support multiple members/frames, so it is also possible to call reinit in the Flush state to start a new round of decompression; that case is not covered further here.
This abstraction is used in OpenDAL:
```rust
pub enum DecompressState {
    Reading,
    Decoding,
    Flushing,
    Done,
}
```
A DecompressDecoder is exposed externally:
```rust
impl DecompressDecoder {
    /// Get decompress state
    pub fn state(&self) -> DecompressState {}

    /// Fetch more data from underlying reader.
    pub fn fill(&mut self, bs: &[u8]) -> usize {}

    /// Decode data into output.
    pub fn decode(&mut self, output: &mut [u8]) -> Result<usize> {}

    /// Finish a decompress process, flushing remaining data into output.
    pub fn finish(&mut self, output: &mut [u8]) -> Result<usize> {}
}
```
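To make the state machine concrete, here is a rough sketch of how a caller might drive these four methods in a loop. It is a simplified illustration, not OpenDAL's actual code: it assumes the whole compressed payload is already in memory and skips careful buffer management.

```rust
// Rough sketch of driving DecompressDecoder's state machine.
// Assumes `input` already holds the complete compressed payload.
fn decompress_all(decoder: &mut DecompressDecoder, mut input: &[u8]) -> Result<Vec<u8>> {
    let mut buf = vec![0u8; 16 * 1024];
    let mut output = Vec::new();
    loop {
        match decoder.state() {
            // Feed more compressed bytes into the decoder.
            DecompressState::Reading => {
                let consumed = decoder.fill(input);
                input = &input[consumed..];
            }
            // Decode as much as possible into the output buffer.
            DecompressState::Decoding => {
                let n = decoder.decode(&mut buf)?;
                output.extend_from_slice(&buf[..n]);
            }
            // Flush whatever is still held inside the decoder.
            DecompressState::Flushing => {
                let n = decoder.finish(&mut buf)?;
                output.extend_from_slice(&buf[..n]);
            }
            DecompressState::Done => return Ok(output),
        }
    }
}
```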
For users' convenience, OpenDAL also implements a DecompressReader on top of DecompressDecoder, which exposes a plain futures::io::AsyncRead interface:

```rust
impl<R: BytesRead> futures::io::AsyncRead for DecompressReader<R> {}
```
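That means a caller can simply wrap a source and read decompressed bytes with ordinary AsyncRead calls. A minimal sketch, assuming a `DecompressReader::new(reader, algorithm)` constructor and a `CompressAlgorithm` enum; the actual names and signatures in OpenDAL may differ:

```rust
use futures::io::AsyncReadExt;

// Sketch only: `DecompressReader::new` and `CompressAlgorithm` are assumed
// names here, not necessarily OpenDAL's exact public API.
async fn read_all_decompressed<R: BytesRead>(source: R) -> std::io::Result<Vec<u8>> {
    let mut reader = DecompressReader::new(source, CompressAlgorithm::Gzip);
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    Ok(buf)
}
```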
Integrating with async-compression
async-compression is an asynchronous compression library developed by @Nemo157 that supports most commonly used compression algorithms. OpenDAL's decompression is implemented on top of async-compression, which uses the Decode trait internally:
```rust
pub trait Decode {
    /// Reinitializes this decoder ready to decode a new member/frame of data.
    fn reinit(&mut self) -> Result<()>;

    /// Returns whether the end of the stream has been read
    fn decode(
        &mut self,
        input: &mut PartialBuffer<impl AsRef<[u8]>>,
        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
    ) -> Result<bool>;

    /// Returns whether the internal buffers are flushed
    fn flush(
        &mut self,
        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
    ) -> Result<bool>;

    /// Returns whether the internal buffers are flushed
    fn finish(
        &mut self,
        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
    ) -> Result<bool>;
}
```
To avoid exposing its internal implementation details, async-compression keeps the codec-related modules private; users can only reach them through bufread and the other public wrapper modules.
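For example, with the futures-io feature enabled, the usual way to consume the library looks roughly like this:

```rust
use async_compression::futures::bufread::GzipDecoder;
use futures::io::{AsyncBufRead, AsyncReadExt};

// Decompress a gzip stream through the public bufread wrapper;
// the codec doing the actual work stays hidden inside the library.
async fn gunzip(input: impl AsyncBufRead + Unpin) -> std::io::Result<Vec<u8>> {
    let mut decoder = GzipDecoder::new(input);
    let mut out = Vec::new();
    decoder.read_to_end(&mut out).await?;
    Ok(out)
}
```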
But this approach does not work for Databend. To make full use of machine resources, Databend strictly separates asynchronous and synchronous tasks:
- CPU-bound tasks must run in a synchronous runtime
- IO-bound tasks must run in an asynchronous runtime
If decompression runs in the asynchronous runtime, it may block the runtime and reduce overall throughput.
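The idea, in simplified form, is to keep IO on the async runtime and push the decompression itself onto a synchronous/blocking pool. The sketch below uses tokio's spawn_blocking rather than Databend's own runtime abstraction, and `decompress_chunk` is a hypothetical stand-in for the synchronous decompression routine:

```rust
use futures::io::{AsyncRead, AsyncReadExt};

// Hypothetical stand-in: in practice this would drive the synchronous
// decompression state machine (e.g. OpenDAL's DecompressDecoder).
fn decompress_chunk(_compressed: &[u8]) -> anyhow::Result<Vec<u8>> {
    todo!()
}

// Simplified illustration: IO-bound work stays on the async runtime,
// CPU-bound decompression is moved onto a blocking pool.
async fn load_compressed(mut reader: impl AsyncRead + Unpin) -> anyhow::Result<Vec<u8>> {
    // IO-bound: read the compressed bytes asynchronously.
    let mut compressed = Vec::new();
    reader.read_to_end(&mut compressed).await?;

    // CPU-bound: decompress off the async runtime.
    let decompressed =
        tokio::task::spawn_blocking(move || decompress_chunk(&compressed)).await??;
    Ok(decompressed)
}
```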
If we want to control which runtime the decompression happens on, we must be able to manipulate the underlying Decode directly. For this, I submitted a proposal: Export codec and Encode/Decode trait. The author happens to be doing related work recently and also wants to expose part of the underlying codec, so in the proposal I described Databend's use case in detail and shared my temporary workaround. In fact, the async-compression code is well organized; the workaround simply makes the internal Decode trait and its implementations public.
Since a crate published to crates.io must depend on released (tagged) versions rather than a git branch, I published this workaround as its own crate: async-compression-issue-150-workaround.
Here is a small Cargo.toml trick: specify a package alias so that a different package can be pulled in without changing the crate name used in code.
```toml
# Temp workaround, should come back to tagged version after
# https://github.com/Nemo157/async-compression/issues/150 resolved.
async-compression = { package = "async-compression-issue-150-workaround", version = "0.3.15-issue-150", features = [
    "futures-io",
    "all-algorithms",
], optional = true }
```
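With the alias in place, downstream code keeps importing `async_compression` exactly as before; only Cargo.toml changes. Assuming the workaround exposes the codec module as the proposal asks for, the imports would look like:

```rust
// The crate name in code stays `async_compression` even though Cargo
// actually pulls in `async-compression-issue-150-workaround`.
// (The `codec` path assumes the workaround exports it as proposed.)
use async_compression::codec::Decode;
```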
Integrating with Databend
After the OpenDAL side was developed and tested, the remaining work was integration and testing inside Databend. During integration, I unexpectedly discovered that Databend currently handles streaming load and COPY FROM STAGE through two completely different code paths, so the decompression logic had to be implemented twice; we hope to unify them in the future.
- feat: Add decompress support for COPY INTO and streaming loading
- docs: Add compression related docs
- feat(query): Add support for compression auto and raw_deflate, which refines the handling of the AUTO and Raw Deflate algorithms (see the sketch after this list)
- fix(query): Fix compressed buf not consumed correctly
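As referenced in the compression auto PR above, AUTO presumably infers the compression algorithm from the file extension. A hypothetical sketch of such a mapping; the names are illustrative, not Databend's actual code:

```rust
// Hypothetical sketch of how COMPRESSION = 'AUTO' could infer the algorithm
// from a file extension; not Databend's actual implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompressAlgorithm {
    Gzip,
    Bz2,
    Brotli,
    Zstd,
    Deflate,
}

fn detect_from_path(path: &str) -> Option<CompressAlgorithm> {
    // Take the last extension and match it against known suffixes.
    let ext = path.rsplit('.').next()?;
    match ext.to_ascii_lowercase().as_str() {
        "gz" => Some(CompressAlgorithm::Gzip),
        "bz2" => Some(CompressAlgorithm::Bz2),
        "br" => Some(CompressAlgorithm::Brotli),
        "zst" => Some(CompressAlgorithm::Zstd),
        "deflate" => Some(CompressAlgorithm::Deflate),
        _ => None,
    }
}
```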
So far, Databend supports GZIP, BZ2, BROTLI, ZSTD, DEFLATE, RAW_DEFLATE and other compression algorithms, and more can be added as needed in the future. You are welcome to try out the decompression feature!
Next Steps
- Refactor the streaming load and load-from-STAGE logic so they reuse the same code as much as possible
- Benchmark and optimize decompression performance
- Support LZO, SNAPPY and other compression algorithms
- Support ZIP, TAR and other archive formats