//! Immutable data store format.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use std::{
    error::Error as StdError,
    fs::File,
    ops::Range,
    path::{Path, PathBuf},
};
use tracing::*;

/// Compression algorithms supported by `NippyJar`.
pub mod compression;
#[cfg(test)]
use compression::Compression;
use compression::Compressors;

/// Empty enum kept for backwards compatibility of the serialized configuration.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum Functions {}

/// Empty enum kept for backwards compatibility of the serialized configuration.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum InclusionFilters {}

mod error;
pub use error::NippyJarError;

mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

/// The version of the `NippyJar` format.
const NIPPY_JAR_VERSION: usize = 1;
/// The file extension used for index files.
const INDEX_FILE_EXTENSION: &str = "idx";
/// The file extension used for offsets files.
const OFFSETS_FILE_EXTENSION: &str = "off";
/// The file extension used for configuration files.
pub const CONFIG_FILE_EXTENSION: &str = "conf";

/// A row of column values, borrowed from an internal buffer or a memory-mapped file.
type RefRow<'a> = Vec<&'a [u8]>;

/// Alias for a column value wrapped in a `Result`.
pub type ColumnResult<T> = Result<T, Box<dyn StdError + Send + Sync>>;

/// Trait for the user-defined header of a [`NippyJar`], serialized alongside its configuration.
pub trait NippyJarHeader:
    Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

// Blanket implementation for all types that satisfy the trait bounds.
impl<T> NippyJarHeader for T where
    T: Send + Sync + Serialize + for<'b> Deserialize<'b> + std::fmt::Debug + 'static
{
}

/// `NippyJar` is a specialized storage format designed for immutable data.
///
/// Data is arranged in columns, and the configuration, offsets, and column data
/// live in sibling files sharing the same path with different extensions.
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NippyJar<H = ()> {
    /// The version of the `NippyJar` format.
    version: usize,
    /// User-defined header data.
    /// Default: zero-sized unit type, i.e. no header data.
    user_header: H,
    /// Number of data columns in the jar.
    columns: usize,
    /// Number of data rows in the jar.
    rows: usize,
    /// Optional compression algorithm applied to the data.
    compressor: Option<Compressors>,
    #[serde(skip)]
    /// Optional inclusion filter. Unused; kept for compatibility.
    filter: Option<InclusionFilters>,
    #[serde(skip)]
    /// Optional perfect hashing function. Unused; kept for compatibility.
    phf: Option<Functions>,
    /// Maximum uncompressed row size of the set. This enables decompression
    /// without resizing the output buffer.
    max_row_size: usize,
    /// Data path for the file. Supporting files share the format `{path}.{extension}`.
    #[serde(skip)]
    path: PathBuf,
}

impl<H: NippyJarHeader> std::fmt::Debug for NippyJar<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NippyJar")
            .field("version", &self.version)
            .field("user_header", &self.user_header)
            .field("rows", &self.rows)
            .field("columns", &self.columns)
            .field("compressor", &self.compressor)
            .field("filter", &self.filter)
            .field("phf", &self.phf)
            .field("path", &self.path)
            .field("max_row_size", &self.max_row_size)
            .finish_non_exhaustive()
    }
}

impl NippyJar<()> {
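    /// Creates a new [`NippyJar`] with a default `()` header.
    ///
    /// A minimal sketch of the create/load round trip, using an illustrative
    /// `"table"` path (`ignore`d, so it is not run as a doctest):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// // A jar with two columns. Sibling files use the `conf`, `off`, and `idx`
    /// // extensions next to the data file at `table`.
    /// let jar = NippyJar::new_without_header(2, Path::new("table"));
    ///
    /// // Once a configuration has been persisted (e.g. by a writer commit),
    /// // the same jar can be reloaded from disk.
    /// let reloaded = NippyJar::load_without_header(Path::new("table"))?;
    /// ```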
    pub fn new_without_header(columns: usize, path: &Path) -> Self {
        Self::new(columns, path, ())
    }

    /// Loads the file configuration and returns [`Self`] without a user-defined header.
    pub fn load_without_header(path: &Path) -> Result<Self, NippyJarError> {
        Self::load(path)
    }
}

impl<H: NippyJarHeader> NippyJar<H> {
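    /// Creates a new [`NippyJar`] with a user-defined header.
    ///
    /// A sketch with a hypothetical `SegmentHeader` type; any type satisfying
    /// [`NippyJarHeader`] works the same way (`ignore`d, not run as a doctest):
    ///
    /// ```ignore
    /// use serde::{Deserialize, Serialize};
    /// use std::path::Path;
    ///
    /// #[derive(Debug, Serialize, Deserialize)]
    /// struct SegmentHeader { first_block: u64 }
    ///
    /// // Two columns, zstd-compressed with dictionaries of at most 5000 bytes.
    /// let jar = NippyJar::new(2, Path::new("table"), SegmentHeader { first_block: 500 })
    ///     .with_zstd(true, 5000);
    /// ```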
    pub fn new(columns: usize, path: &Path, user_header: H) -> Self {
        Self {
            version: NIPPY_JAR_VERSION,
            user_header,
            columns,
            rows: 0,
            max_row_size: 0,
            compressor: None,
            filter: None,
            phf: None,
            path: path.to_path_buf(),
        }
    }

    /// Adds [`compression::Zstd`] compression.
    pub fn with_zstd(mut self, use_dict: bool, max_dict_size: usize) -> Self {
        self.compressor =
            Some(Compressors::Zstd(compression::Zstd::new(use_dict, max_dict_size, self.columns)));
        self
    }

    /// Adds [`compression::Lz4`] compression.
    pub fn with_lz4(mut self) -> Self {
        self.compressor = Some(Compressors::Lz4(compression::Lz4::default()));
        self
    }

    /// Gets a reference to the user header.
    pub const fn user_header(&self) -> &H {
        &self.user_header
    }

    /// Gets the total number of columns.
    pub const fn columns(&self) -> usize {
        self.columns
    }

    /// Gets the total number of rows.
    pub const fn rows(&self) -> usize {
        self.rows
    }

    /// Gets a reference to the compressor, if it exists.
    pub const fn compressor(&self) -> Option<&Compressors> {
        self.compressor.as_ref()
    }

    /// Gets a mutable reference to the compressor, if it exists.
    pub fn compressor_mut(&mut self) -> Option<&mut Compressors> {
        self.compressor.as_mut()
    }

    /// Loads the file configuration and returns [`Self`].
    ///
    /// The caller must ensure the header type `H` matches the one used when the
    /// configuration was written.
    pub fn load(path: &Path) -> Result<Self, NippyJarError> {
        // Read the configuration from the sibling `{path}.conf` file.
        let config_path = path.with_extension(CONFIG_FILE_EXTENSION);
        let config_file = File::open(&config_path)
            .map_err(|err| reth_fs_util::FsPathError::open(err, config_path))?;

        let mut obj: Self = bincode::deserialize_from(&config_file)?;
        obj.path = path.to_path_buf();
        Ok(obj)
    }

    /// Returns the path to the data file.
    pub fn data_path(&self) -> &Path {
        self.path.as_ref()
    }

    /// Returns the path to the index file.
    pub fn index_path(&self) -> PathBuf {
        self.path.with_extension(INDEX_FILE_EXTENSION)
    }

    /// Returns the path to the offsets file.
    pub fn offsets_path(&self) -> PathBuf {
        self.path.with_extension(OFFSETS_FILE_EXTENSION)
    }

    /// Returns the path to the config file.
    pub fn config_path(&self) -> PathBuf {
        self.path.with_extension(CONFIG_FILE_EXTENSION)
    }

    /// Deletes the data, index, offsets, and config files, if they exist.
    pub fn delete(self) -> Result<(), NippyJarError> {
        for path in
            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
        {
            if path.exists() {
                reth_fs_util::remove_file(path)?;
            }
        }

        Ok(())
    }

    /// Opens a [`DataReader`] over the data and offsets files.
    pub fn open_data_reader(&self) -> Result<DataReader, NippyJarError> {
        DataReader::new(self.data_path())
    }

    /// Writes the current configuration (`self`) to the `{path}.conf` file atomically.
    fn freeze_config(&self) -> Result<(), NippyJarError> {
        Ok(reth_fs_util::atomic_write_file(&self.config_path(), |file| {
            bincode::serialize_into(file, &self)
        })?)
    }
}

#[cfg(test)]
impl<H: NippyJarHeader> NippyJar<H> {
    /// Prepares the compressor (e.g. trains zstd dictionaries) from sample columns.
    pub fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        // If a compressor is set, it may need a preparation step before use.
        if let Some(compression) = &mut self.compressor {
            debug!(target: "nippy-jar", columns=columns.len(), "Preparing compression.");
            compression.prepare_compression(columns)?;
        }
        Ok(())
    }

    /// Writes all column data to the data file, the offsets to the offsets file, and
    /// the configuration to the config file.
    pub fn freeze(
        self,
        columns: Vec<impl IntoIterator<Item = ColumnResult<Vec<u8>>>>,
        total_rows: u64,
    ) -> Result<Self, NippyJarError> {
        self.check_before_freeze(&columns)?;

        debug!(target: "nippy-jar", path=?self.data_path(), "Opening data file.");

        // Creates the writer, which runs its consistency checks.
        let mut writer = NippyJarWriter::new(self)?;

        // Appends rows to the data file while holding the offsets in memory.
        writer.append_rows(columns, total_rows)?;

        // Flushes offsets and configuration to disk.
        writer.commit()?;

        debug!(target: "nippy-jar", ?writer, "Finished writing data.");

        Ok(writer.into_jar())
    }

    /// Safety checks before writing any data.
    fn check_before_freeze(
        &self,
        columns: &[impl IntoIterator<Item = ColumnResult<Vec<u8>>>],
    ) -> Result<(), NippyJarError> {
        if columns.len() != self.columns {
            return Err(NippyJarError::ColumnLenMismatch(self.columns, columns.len()))
        }

        // Ensure `prepare_compression` was called when required.
        if let Some(compression) = &self.compressor {
            if !compression.is_ready() {
                return Err(NippyJarError::CompressorNotReady)
            }
        }

        Ok(())
    }
}

/// Manages the reading of `NippyJar` data using memory-mapped files.
#[derive(Debug)]
pub struct DataReader {
    #[allow(dead_code)]
    /// Data file descriptor. Must be kept alive as long as `data_mmap`.
    data_file: File,
    /// Mmap handle for the data file.
    data_mmap: Mmap,
    #[allow(dead_code)]
    /// Offsets file descriptor. Must be kept alive as long as `offset_mmap`.
    offset_file: File,
    /// Mmap handle for the offsets file.
    offset_mmap: Mmap,
    /// Number of bytes that represent one offset.
    offset_size: u8,
}

impl DataReader {
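    /// Creates a new [`DataReader`] for the data file at `path`, memory-mapping
    /// both the data file and its sibling `{path}.off` offsets file.
    ///
    /// A small sketch of reading committed values back, assuming a jar exists at
    /// an illustrative `"table"` path (`ignore`d, not run as a doctest):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// let reader = DataReader::new(Path::new("table"))?;
    /// // First offset, and the offset marking the end of the data file.
    /// let start = reader.offset(0)?;
    /// let end = reader.reverse_offset(0)?;
    /// let bytes = reader.data(start as usize..end as usize);
    /// ```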
    pub fn new(path: impl AsRef<Path>) -> Result<Self, NippyJarError> {
        let data_file = File::open(path.as_ref())?;
        // SAFETY: the file is opened read-only and its descriptor is kept alive
        // for as long as the mmap handle.
        let data_mmap = unsafe { Mmap::map(&data_file)? };

        let offset_file = File::open(path.as_ref().with_extension(OFFSETS_FILE_EXTENSION))?;
        // SAFETY: same as above.
        let offset_mmap = unsafe { Mmap::map(&offset_file)? };

        // The first byte of the offsets file is the size of one offset in bytes.
        let offset_size = offset_mmap[0];

        // An offset must be between 1 and 8 bytes wide.
        if offset_size > 8 {
            return Err(NippyJarError::OffsetSizeTooBig { offset_size })
        } else if offset_size == 0 {
            return Err(NippyJarError::OffsetSizeTooSmall { offset_size })
        }

        Ok(Self { data_file, data_mmap, offset_file, offset_size, offset_mmap })
    }

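    /// Returns the offset for the requested data index.
    ///
    /// Layout of the offsets file, as implied by the reads below: a single
    /// leading byte holds the offset width `n`, followed by consecutive
    /// little-endian `n`-byte offsets, the last of which marks the expected end
    /// of the data file.
    ///
    /// ```text
    /// [n][offset 0][offset 1]...[offset k][end of data file]
    /// ```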
    pub fn offset(&self, index: usize) -> Result<u64, NippyJarError> {
        // The `+ 1` skips the leading offset-size byte.
        let from = index * self.offset_size as usize + 1;

        self.offset_at(from)
    }

    /// Returns the offset for the requested index, counting from the end of the offsets list.
    pub fn reverse_offset(&self, index: usize) -> Result<u64, NippyJarError> {
        let offsets_file_size = self.offset_file.metadata()?.len() as usize;

        if offsets_file_size > 1 {
            let from = offsets_file_size - self.offset_size as usize * (index + 1);

            self.offset_at(from)
        } else {
            Ok(0)
        }
    }

    /// Returns the total number of offsets in the file, excluding the leading size byte.
    pub fn offsets_count(&self) -> Result<usize, NippyJarError> {
        Ok((self.offset_file.metadata()?.len().saturating_sub(1) / self.offset_size as u64)
            as usize)
    }

    /// Reads one offset-sized (as determined by the offsets file) `u64` at the provided index.
    fn offset_at(&self, index: usize) -> Result<u64, NippyJarError> {
        let mut buffer: [u8; 8] = [0; 8];

        let offset_end = index.saturating_add(self.offset_size as usize);
        if offset_end > self.offset_mmap.len() {
            return Err(NippyJarError::OffsetOutOfBounds { index })
        }

        buffer[..self.offset_size as usize].copy_from_slice(&self.offset_mmap[index..offset_end]);
        Ok(u64::from_le_bytes(buffer))
    }

    /// Returns the number of bytes that represent one offset.
    pub const fn offset_size(&self) -> u8 {
        self.offset_size
    }

    /// Returns the underlying data as a slice of bytes for the provided range.
    pub fn data(&self, range: Range<usize>) -> &[u8] {
        &self.data_mmap[range]
    }

    /// Returns the total size of the data in bytes.
    pub fn size(&self) -> usize {
        self.data_mmap.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use compression::Compression;
    use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng};
    use std::{fs::OpenOptions, io::Read};

    type ColumnResults<T> = Vec<ColumnResult<T>>;
    type ColumnValues = Vec<Vec<u8>>;

    fn test_data(seed: Option<u64>) -> (ColumnValues, ColumnValues) {
        let value_length = 32;
        let num_rows = 100;

        let mut vec: Vec<u8> = vec![0; value_length];
        let mut rng = seed.map(SmallRng::seed_from_u64).unwrap_or_else(SmallRng::from_entropy);

        let mut gen = || {
            (0..num_rows)
                .map(|_| {
                    rng.fill_bytes(&mut vec[..]);
                    vec.clone()
                })
                .collect()
        };

        (gen(), gen())
    }

    fn clone_with_result(col: &ColumnValues) -> ColumnResults<Vec<u8>> {
        col.iter().map(|v| Ok(v.clone())).collect()
    }

    #[test]
    fn test_config_serialization() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let jar = NippyJar::new_without_header(23, file.path()).with_lz4();
        jar.freeze_config().unwrap();

        let mut config_file = OpenOptions::new().read(true).open(jar.config_path()).unwrap();
        let config_file_len = config_file.metadata().unwrap().len();
        assert_eq!(config_file_len, 37);

        let mut buf = Vec::with_capacity(config_file_len as usize);
        config_file.read_to_end(&mut buf).unwrap();

        assert_eq!(
            vec![
                1, 0, 0, 0, 0, 0, 0, 0, 23, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0
            ],
            buf
        );

        let mut read_jar = bincode::deserialize_from::<_, NippyJar>(&buf[..]).unwrap();
        // The path is not serialized, so restore it manually before comparing.
        read_jar.path = file.path().to_path_buf();
        assert_eq!(jar, read_jar);
    }

    #[test]
    fn test_zstd_with_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor_mut() {
            assert!(matches!(zstd.compressors(), Err(NippyJarError::CompressorNotReady)));

            // The number of column iterators must match the configured column count.
            assert!(matches!(
                zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]),
                Err(NippyJarError::ColumnLenMismatch(columns, 3)) if columns == num_columns
            ));
        }

        // Writing must fail while the compressor is not ready.
        assert!(matches!(
            nippy.freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows),
            Err(NippyJarError::CompressorNotReady)
        ));

        let mut nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
        assert!(nippy.compressor().is_some());

        nippy.prepare_compression(vec![col1.clone(), col2.clone()]).unwrap();

        if let Some(Compressors::Zstd(zstd)) = &nippy.compressor() {
            assert!(matches!(
                (&zstd.state, zstd.dictionaries.as_ref().map(|dict| dict.len())),
                (compression::ZstdState::Ready, Some(columns)) if columns == num_columns
            ));
        }

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy.version, loaded_nippy.version);
        assert_eq!(nippy.columns, loaded_nippy.columns);
        assert_eq!(nippy.filter, loaded_nippy.filter);
        assert_eq!(nippy.phf, loaded_nippy.phf);
        assert_eq!(nippy.max_row_size, loaded_nippy.max_row_size);
        assert_eq!(nippy.path, loaded_nippy.path);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(zstd.use_dict);
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over the rows and check that the values match the original data.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    #[test]
    fn test_lz4() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy = NippyJar::new_without_header(num_columns, file_path.path()).with_lz4();
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Lz4(_)) = loaded_nippy.compressor() {
            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over the rows and check that the values match the original data.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Lz4 compressor")
        }
    }

    #[test]
    fn test_zstd_no_dictionaries() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        let nippy = NippyJar::new_without_header(num_columns, file_path.path());
        assert!(nippy.compressor().is_none());

        let nippy =
            NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(false, 5000);
        assert!(nippy.compressor().is_some());

        let nippy = nippy
            .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
            .unwrap();

        let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();
        assert_eq!(nippy, loaded_nippy);

        if let Some(Compressors::Zstd(zstd)) = loaded_nippy.compressor() {
            assert!(!zstd.use_dict);

            let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

            // Iterate over the rows and check that the values match the original data.
            let mut row_index = 0usize;
            while let Some(row) = cursor.next_row().unwrap() {
                assert_eq!(
                    (row[0], row[1]),
                    (col1[row_index].as_slice(), col2[row_index].as_slice())
                );
                row_index += 1;
            }
        } else {
            panic!("Expected Zstd compressor")
        }
    }

    /// Tests a `NippyJar` with a user-defined header.
    #[test]
    fn test_full_nippy_jar() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        let block_start = 500;

        #[derive(Serialize, Deserialize, Debug)]
        struct BlockJarHeader {
            block_start: usize,
        }

        {
            // Create the file.
            let mut nippy =
                NippyJar::new(num_columns, file_path.path(), BlockJarHeader { block_start })
                    .with_zstd(true, 5000);

            nippy.prepare_compression(data.clone()).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        {
            // Read the file back.
            let loaded_nippy = NippyJar::<BlockJarHeader>::load(file_path.path()).unwrap();

            assert!(loaded_nippy.compressor().is_some());
            assert_eq!(loaded_nippy.user_header().block_start, block_start);

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Iterate over the rows in order and check the values.
                let mut row_num = 0usize;
                while let Some(row) = cursor.next_row().unwrap() {
                    assert_eq!(
                        (row[0], row[1]),
                        (data[0][row_num].as_slice(), data[1][row_num].as_slice())
                    );
                    row_num += 1;
                }

                // Read rows in random order and check the values.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::thread_rng());

                for (row_num, (v0, v1)) in data {
                    let row_by_num = cursor.row_by_number(row_num).unwrap().unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (v0, v1));
                }
            }
        }
    }

    #[test]
    fn test_selectable_column_values() {
        let (col1, col2) = test_data(None);
        let num_rows = col1.len() as u64;
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();
        let data = vec![col1.clone(), col2.clone()];

        {
            // Create the file.
            let mut nippy =
                NippyJar::new_without_header(num_columns, file_path.path()).with_zstd(true, 5000);
            nippy.prepare_compression(data).unwrap();
            nippy
                .freeze(vec![clone_with_result(&col1), clone_with_result(&col2)], num_rows)
                .unwrap();
        }

        {
            // Read the file back.
            let loaded_nippy = NippyJar::load_without_header(file_path.path()).unwrap();

            if let Some(Compressors::Zstd(_zstd)) = loaded_nippy.compressor() {
                let mut cursor = NippyJarCursor::new(&loaded_nippy).unwrap();

                // Shuffled row numbers to simulate random access.
                let mut data = col1.iter().zip(col2.iter()).enumerate().collect::<Vec<_>>();
                data.shuffle(&mut rand::thread_rng());

                // Imagine a `Blocks` file with two columns: `Block | Withdrawals`.
                const BLOCKS_FULL_MASK: usize = 0b11;

                // Read both columns.
                for (row_num, (v0, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_FULL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!((&row_by_num[0].to_vec(), &row_by_num[1].to_vec()), (*v0, *v1));
                }

                // Read the first column only: `Block`.
                const BLOCKS_BLOCK_MASK: usize = 0b01;
                for (row_num, (v0, _)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_BLOCK_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v0);
                }

                // Read the second column only: `Withdrawals`.
                const BLOCKS_WITHDRAWAL_MASK: usize = 0b10;
                for (row_num, (_, v1)) in &data {
                    let row_by_num = cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_WITHDRAWAL_MASK)
                        .unwrap()
                        .unwrap();
                    assert_eq!(row_by_num.len(), 1);
                    assert_eq!(&row_by_num[0].to_vec(), *v1);
                }

                // Read no columns at all.
                const BLOCKS_EMPTY_MASK: usize = 0b00;
                for (row_num, _) in &data {
                    assert!(cursor
                        .row_by_number_with_cols(*row_num, BLOCKS_EMPTY_MASK)
                        .unwrap()
                        .unwrap()
                        .is_empty());
                }
            }
        }
    }

    #[test]
    fn test_writer() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let file_path = tempfile::NamedTempFile::new().unwrap();

        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Appends a third row and prunes rows back down.
        prune_rows(num_columns, file_path.path(), &col1, &col2);

        // It should be possible to append new rows again afterwards.
        append_two_rows(num_columns, file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown before commit, and check that it unwinds.
        test_append_consistency_no_commit(file_path.path(), &col1, &col2);

        // Simulate an unexpected shutdown during commit, and check that it unwinds.
        test_append_consistency_partial_commit(file_path.path(), &col1, &col2);
    }

    #[test]
    fn test_pruner() {
        let (col1, col2) = test_data(None);
        let num_columns = 2;
        let num_rows = 2;

        let missing_offsets_scenarios = [(1, 1), (2, 1), (3, 0)];

        // Each scenario is `(missing_offsets, expected_rows)`: the number of values
        // truncated from the data file, and the row count expected after recovery.
        for (missing_offsets, expected_rows) in missing_offsets_scenarios {
            let file_path = tempfile::NamedTempFile::new().unwrap();

            append_two_rows(num_columns, file_path.path(), &col1, &col2);

            simulate_interrupted_prune(num_columns, file_path.path(), num_rows, missing_offsets);

            let nippy = NippyJar::load_without_header(file_path.path()).unwrap();
            assert_eq!(nippy.rows, expected_rows);
        }
    }

    fn test_append_consistency_partial_commit(
        file_path: &Path,
        col1: &[Vec<u8>],
        col2: &[Vec<u8>],
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that the writer should unwind to.
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Append a third row.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Drop the last offset, which marks the expected end of the data file.
        let _ = writer.offsets_mut().pop();

        // Commit the offsets only, simulating a partial commit.
        writer.commit_offsets().unwrap();

        // Simulate an unexpected shutdown of the writer before `commit()` finishes.
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // The appended data reached the data file.
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // A full commit would add two offsets (+16 bytes); only one (+8) landed because
        // of the partial commit.
        assert_eq!(
            initial_offset_size + 8,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Creating a new writer runs the consistency check, pruning the offsets list back
        // to the committed row count and truncating the data file accordingly.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_offset_size,
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn test_append_consistency_no_commit(file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();

        // Set the baseline that the writer should unwind to.
        let initial_rows = nippy.rows;
        let initial_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        let initial_offset_size =
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize;
        assert!(initial_data_size > 0);
        assert!(initial_offset_size > 0);

        // Append a third row without committing.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();

        // Simulate an unexpected shutdown of the writer before any commit.
        drop(writer);

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        assert_eq!(initial_rows, nippy.rows);

        // The appended data reached the data file.
        let new_data_size =
            File::open(nippy.data_path()).unwrap().metadata().unwrap().len() as usize;
        assert_eq!(new_data_size, initial_data_size + col1[2].len() + col2[2].len());

        // No offsets were committed.
        assert_eq!(
            initial_offset_size,
            File::open(nippy.offsets_path()).unwrap().metadata().unwrap().len() as usize
        );

        // Creating a new writer runs the consistency check, truncating the data file back
        // to the last committed offset.
        let writer = NippyJarWriter::new(nippy).unwrap();
        assert_eq!(initial_rows, writer.rows());
        assert_eq!(
            initial_data_size,
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize
        );
    }

    fn append_two_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        // Create a new jar and append the first row.
        {
            let nippy = NippyJar::new_without_header(num_columns, file_path);
            nippy.freeze_config().unwrap();
            assert_eq!(nippy.max_row_size, 0);
            assert_eq!(nippy.rows, 0);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[0]))).unwrap();
            assert_eq!(writer.column(), 1);
            assert!(writer.is_dirty());

            writer.append_column(Some(Ok(&col2[0]))).unwrap();
            assert!(writer.is_dirty());

            // Appending the last column of a row resets the column counter.
            assert_eq!(writer.column(), 0);

            // One offset per column value, plus the expected end of the data file.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();
            assert!(!writer.is_dirty());

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 1);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }

        // Load the existing jar and append the second row.
        {
            let nippy = NippyJar::load_without_header(file_path).unwrap();
            assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
            assert_eq!(nippy.rows, 1);

            let mut writer = NippyJarWriter::new(nippy).unwrap();
            assert_eq!(writer.column(), 0);

            writer.append_column(Some(Ok(&col1[1]))).unwrap();
            assert_eq!(writer.column(), 1);

            writer.append_column(Some(Ok(&col2[1]))).unwrap();

            // Appending the last column of a row resets the column counter.
            assert_eq!(writer.column(), 0);

            // One offset per column value, plus the expected end of the data file.
            assert_eq!(writer.offsets().len(), 3);
            let expected_data_file_size = *writer.offsets().last().unwrap();
            writer.commit().unwrap();

            assert_eq!(writer.max_row_size(), col1[0].len() + col2[0].len());
            assert_eq!(writer.rows(), 2);
            assert_eq!(
                File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
                1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
            );
            assert_eq!(
                File::open(writer.data_path()).unwrap().metadata().unwrap().len(),
                expected_data_file_size
            );
        }
    }

    fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let mut writer = NippyJarWriter::new(nippy).unwrap();

        // Append a third row, so there are 3 rows in total before pruning.
        writer.append_column(Some(Ok(&col1[2]))).unwrap();
        writer.append_column(Some(Ok(&col2[2]))).unwrap();
        assert!(writer.is_dirty());

        // Prune 2 rows, including the uncommitted one.
        writer.prune_rows(2).unwrap();
        assert_eq!(writer.rows(), 1);

        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len(),
            1 + writer.rows() as u64 * num_columns as u64 * 8 + 8
        );

        let expected_data_size = col1[0].len() + col2[0].len();
        assert_eq!(
            File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize,
            expected_data_size
        );

        let nippy = NippyJar::load_without_header(file_path).unwrap();
        {
            let data_reader = nippy.open_data_reader().unwrap();
            // Only one row (two values) remains, so offset index 2 is the expected
            // end of the data file.
            assert_eq!(data_reader.offset(2).unwrap(), expected_data_size as u64);
        }

        // Prune the remaining row.
        let mut writer = NippyJarWriter::new(nippy).unwrap();
        writer.prune_rows(1).unwrap();
        assert!(writer.is_dirty());

        assert_eq!(writer.rows(), 0);
        assert_eq!(writer.max_row_size(), 0);
        assert_eq!(File::open(writer.data_path()).unwrap().metadata().unwrap().len() as usize, 0);
        // Only the byte that tells how many bytes per offset should remain.
        assert_eq!(
            File::open(writer.offsets_path()).unwrap().metadata().unwrap().len() as usize,
            1
        );
        writer.commit().unwrap();
        assert!(!writer.is_dirty());
    }

    fn simulate_interrupted_prune(
        num_columns: usize,
        file_path: &Path,
        num_rows: u64,
        missing_offsets: u64,
    ) {
        let nippy = NippyJar::load_without_header(file_path).unwrap();
        let reader = nippy.open_data_reader().unwrap();
        let offsets_file =
            OpenOptions::new().read(true).write(true).open(nippy.offsets_path()).unwrap();
        let offsets_len = 1 + num_rows * num_columns as u64 * 8 + 8;
        assert_eq!(offsets_len, offsets_file.metadata().unwrap().len());

        let data_file = OpenOptions::new().read(true).write(true).open(nippy.data_path()).unwrap();
        let data_len = reader.reverse_offset(0).unwrap();
        assert_eq!(data_len, data_file.metadata().unwrap().len());

        // Each column value is 32 bytes long. Truncating the data file while leaving the
        // offsets file untouched simulates a prune that was interrupted midway.
        data_file.set_len(data_len - 32 * missing_offsets).unwrap();

        // Creating a writer runs the consistency check and heals the files.
        let _ = NippyJarWriter::new(nippy).unwrap();
    }
}