reth_nippy_jar/
lib.rs

//! Immutable data store format.
//!
//! *Warning*: The `NippyJar` encoding format and its implementations are
//! designed for storing and retrieving data internally. They are not hardened
//! to safely read potentially malicious data.
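//!
//! A minimal sketch of the write-then-read flow (not compiled as a doctest, and `path` is a
//! placeholder; the exact setup may differ slightly, see the tests at the bottom of this file):
//!
//! ```ignore
//! use reth_nippy_jar::{NippyJar, NippyJarCursor, NippyJarWriter};
//!
//! // Write: a jar with two columns, appending one row column by column.
//! let jar = NippyJar::new_without_header(2, path);
//! let mut writer = NippyJarWriter::new(jar)?;
//! writer.append_column(Some(Ok(&b"foo"[..])))?;
//! writer.append_column(Some(Ok(&b"bar"[..])))?;
//! writer.commit()?;
//!
//! // Read: load the configuration and iterate over rows with a cursor.
//! let jar = NippyJar::load_without_header(path)?;
//! let mut cursor = NippyJarCursor::new(&jar)?;
//! while let Some(row) = cursor.next_row()? {
//!     assert_eq!(row.len(), 2);
//! }
//! ```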

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

/// Compression algorithms supported by `NippyJar`.
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;

/// Empty enum kept for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

/// Empty enum kept for backwards compatibility.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version number of the Nippy Jar format.
const NIPPY_JAR_VERSION: usize = 1;
/// The file extension used for index files.
const INDEX_FILE_EXTENSION: &str = "idx";
/// The file extension used for offsets files.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// The file extension used for configuration files.
pub const CONFIG_FILE_EXTENSION: &str = "conf";

/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
/// memory-mapped file.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias type for a column value wrapped in `Result`.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

/// A trait for the user-defined header of [`NippyJar`].
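///
/// It is implemented automatically (via the blanket impl below) for any type that satisfies the
/// listed bounds, so a simple derive is enough. A sketch, not compiled as a doctest:
///
/// ```ignore
/// use serde::{Deserialize, Serialize};
///
/// // A user header storing the first block number contained in the jar.
/// #[derive(Debug, Serialize, Deserialize)]
/// struct BlockJarHeader {
///     block_start: usize,
/// }
/// // `BlockJarHeader` now implements `NippyJarHeader`.
/// ```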
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

// Blanket implementation for all types that implement the required traits.
impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is organized into a columnar format, enabling column-based compression. Data retrieval
/// entails consulting an offset list and fetching the data from file via `mmap`.
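///
/// A minimal construction sketch (the path is a placeholder; not compiled as a doctest):
///
/// ```ignore
/// use std::path::Path;
///
/// // Two columns, Zstd compression with per-column dictionaries of at most 5000 bytes.
/// let jar = NippyJar::new_without_header(2, Path::new("/tmp/jar")).with_zstd(true, 5000);
/// ```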
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the `NippyJar` format.
    version: usize,
    /// User-defined header data.
    /// Defaults to the zero-sized unit type `()`, i.e. no header data.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    /// Optional field kept for backwards compatibility.
    #[serde(skip)]
    filter: Option<InclusionFilters>,
    /// Optional field kept for backwards compatibility.
    #[serde(skip)]
    phf: Option<Functions>,
    /// Maximum uncompressed row size of the set. This enables decompression without resizing
    /// the output buffer.
    max_row_size: usize,
    /// Path to the data file. Supporting files are named `{path}.{extension}`.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
    /// Creates a new [`NippyJar`] without user-defined header data.
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the file configuration and returns [`Self`] for a jar without user-defined header
    /// data.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
    /// Creates a new [`NippyJar`] with user-defined header data.
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

    /// Adds [`compression::Zstd`] compression.
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Adds [`compression::Lz4`] compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Gets a reference to the user header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Gets the total number of columns in the jar.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Gets the total number of rows in the jar.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Gets a reference to the compressor.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Gets a mutable reference to the compressor.
    pub fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads the file configuration and returns [`Self`].
    ///
    /// **The user must ensure the header type matches the one used during the jar's creation.**
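    ///
    /// A sketch of a typed load (`BlockJarHeader` is a hypothetical header type; not compiled
    /// as a doctest):
    ///
    /// ```ignore
    /// // `H` must be the same header type the jar was created with.
    /// let jar: NippyJar<BlockJarHeader> = NippyJar::load(path)?;
    /// ```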
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        // Read [`Self`] from the config file that lives alongside the data file.
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj: Self = bincode::deserialize_from(&config_file)?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Returns the path for the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path for the index file.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path for the offsets file.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path for the config file.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes this [`NippyJar`] from disk alongside every satellite file.
    pub fn delete(self) -> Result<(), NippyJarError> {
        // TODO(joshie): ensure consistency on unexpected shutdown

        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Returns a [`DataReader`] over the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Writes all necessary configuration to file.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| {
            bincode::serialize_into(file, &self)
        })?)
    }
}

#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// If required, prepares any compression algorithm with an early pass over the data.
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        // Makes any necessary preparations for the compressors
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all data and configuration to a file and the offset index to another.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        // Creates the writer, data and offsets file
        let mut writer = NippyJarWriter::new(self)?;

        // Append rows to file while holding offsets in memory
        writer.append_rows(columns, total_rows)?;

        // Flushes configuration and offsets to disk
        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Safety checks before creating and returning a [`File`] handle to write data to.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        if let Some(compression) = &self.compressor {
            if !compression.is_ready() {
                return Err(NippyJarError::CompressorNotReady)
            }
        }

        Ok(())
    }
}

/// Manages the reading of static file data using memory-mapped files.
///
/// Holds file and mmap descriptors of the data and offsets files of a `static_file`.
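///
/// The offsets file layout, as implied by the reads below: the first byte stores the size `N`
/// of a single offset, followed by consecutive little-endian `N`-byte offsets, where the last
/// offset holds the expected size of the data file.
///
/// ```text
/// | N (1 byte) | offset 0 (N bytes, LE) | ... | expected data size (N bytes, LE) |
/// ```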
#[derive(Debug)]
pub struct DataReader {
    /// Data file descriptor. Needs to be kept alive as long as `data_mmap` handle.
    #[allow(dead_code)]
    data_file: File,
    /// Mmap handle for data.
    data_mmap: Mmap,
    /// Offset file descriptor. Needs to be kept alive as long as `offset_mmap` handle.
    #[allow(dead_code)]
    offset_file: File,
    /// Mmap handle for offsets.
    offset_mmap: Mmap,
    /// Number of bytes that represent one offset.
    offset_size: u8,
}

impl DataReader {
    /// Reads the respective data and offsets file and returns [`DataReader`].
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: File is read-only and its descriptor is kept alive as long as the mmap handle.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // First byte is the size of one offset in bytes
        let offset_size = offset_mmap[0];

        // Ensure that the size of an offset is at most 8 bytes.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

    /// Returns the offset for the requested data index.
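    ///
    /// For example, with an 8-byte offset size, `offset(2)` reads bytes `17..25` of the
    /// offsets file (1 header byte + 2 * 8).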
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // The + 1 skips the `offset_size` u8 at the beginning of the file.
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset for the requested data index starting from the end.
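    ///
    /// In particular, `reverse_offset(0)` returns the last offset in the file, which holds the
    /// expected size of the data file (see the layout sketch on [`DataReader`]).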
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns the total number of offsets in the file.
    /// The size of one offset is determined by the file itself.
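    ///
    /// For example, a 25-byte offsets file with 8-byte offsets holds `(25 - 1) / 8 = 3`
    /// offsets.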
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one offset-sized (as determined by the offsets file) `u64` at the provided byte
    /// position.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns the number of bytes that represent one offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the underlying data as a slice of bytes for the provided range.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the data.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_entropy);

        let mut gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (gen(), gen())
    }

    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // Path is not ser/de
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // Make sure the number of column iterators match the initial set up ones.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // If ZSTD is enabled, do not write to the file unless the column dictionaries have been
        // calculated.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over compressed values and compare
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    /// Tests `NippyJar` with everything enabled.
    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        // Create file
        {
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Iterate over compressed values and compare
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::thread_rng());

                for (row_num, (v0, v1)) in data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        // Create file
        {
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        // Read file
        {
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Shuffled for chaos.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::thread_rng());

                // Imagine `Blocks` static file has two columns: `Block | StoredWithdrawals`
                const BLOCKS_FULL_MASK: usize = 0b11;

                // Read both columns
                for (row_num, (v0, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Read first column only: `Block`
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Read second column only: `StoredWithdrawals`
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    // Simulates `by_number` queries
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // Read nothing
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    // Simulates `by_number` queries
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Appends a third row and prunes two rows, to make sure we prune from both the in-memory
        // and on-disk offset lists
        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // Should be able to append new rows
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown before there's a chance to commit, and see that it
        // unwinds successfully
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown during commit, and see that it unwinds successfully
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        // (missing_offsets, expected number of rows)
        // If a row wasn't fully pruned, then it should be cleaned up as well
        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Makes sure it doesn't write the last offset (which is the expected data file size)
        let _ = writer.offsets_mut().pop();

        // `commit_offsets` is not a pub function. We call it here to simulate a shutdown before
        // it can flush nippy.rows (config) to disk.
        writer.commit_offsets().unwrap();

        // Simulate an unexpected shutdown of the writer, before it can finish commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // It should be + 16 (two column offsets were added), but one is missing (the one we
        // popped)
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // The writer will run a consistency check, verifying first that the offset list on disk
        // doesn't match nippy.rows and pruning it. Then, it will prune the data file accordingly
        // as well.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that should be unwound to
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Appends a third row, so we have an offset list in memory, which is not flushed to disk,
        // while the data has been.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Simulate an unexpected shutdown of the writer, before it can call commit()
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // Data was written successfully
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // Since offsets only get written on commit(), this remains the same
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // The writer will run a consistency check, detect that the data file has more data than
        // it should, and reset it to the last offset of the list (on disk here)
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create and add 1 row
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Adding the last column of a row resets the writer and updates the jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected data file size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Load and add 1 row
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            // Check if it was committed successfully
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Adding the last column of a row resets the writer and updates the jar config
            assert_eq!(writer.column(), 0);

            // One offset per column + 1 offset at the end representing the expected data file size
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Appends a third row, so we have an offset list in memory, which is not flushed to disk
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // This should prune from both the in-memory and on-disk offset lists
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // There are only two valid offsets, so index 2 actually holds the expected data
            // file size.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // This should prune from the on-disk offset list and clear the jar.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Only the byte that indicates how many bytes per offset should remain
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            1
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Each data column is 32 bytes long. By truncating the data file, the consistency check
        // will go through both branches:
        //   1) the offset list wasn't updated after clearing the data (data_len > last offset);
        //   2) fixing the above leads the offset count to no longer match rows * columns from
        //      the configuration file.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Runs the consistency check.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}