1use std::{fmt, io, marker::PhantomData};
2
3use futures::Future;
4use reth_primitives::{Receipt, Receipts};
5use tokio::io::AsyncReadExt;
6use tokio_stream::StreamExt;
7use tokio_util::codec::{Decoder, FramedRead};
8use tracing::{trace, warn};
9
10use crate::{DecodedFileChunk, FileClientError};
11
12#[derive(Debug)]
15pub struct ReceiptFileClient<D> {
16 pub receipts: Receipts,
18 pub first_block: u64,
20 pub total_receipts: usize,
22 _marker: PhantomData<D>,
24}
25
26pub trait FromReceiptReader<D> {
28 type Error: From<io::Error>;
30
31 fn decoder() -> D;
33
34 fn from_receipt_reader<B>(
36 reader: B,
37 num_bytes: u64,
38 prev_chunk_highest_block: Option<u64>,
39 ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
40 where
41 Self: Sized,
42 B: AsyncReadExt + Unpin;
43}
44
45impl<D> FromReceiptReader<D> for ReceiptFileClient<D>
46where
47 D: Decoder<Item = Option<ReceiptWithBlockNumber>, Error = FileClientError>
48 + fmt::Debug
49 + Default,
50{
51 type Error = D::Error;
52
53 fn decoder() -> D {
54 D::default()
55 }
56
57 fn from_receipt_reader<B>(
60 reader: B,
61 num_bytes: u64,
62 prev_chunk_highest_block: Option<u64>,
63 ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
64 where
65 B: AsyncReadExt + Unpin,
66 {
67 let mut receipts = Receipts::default();
68
69 let mut stream = FramedRead::with_capacity(reader, Self::decoder(), num_bytes as usize);
71
72 trace!(target: "downloaders::file",
73 target_num_bytes=num_bytes,
74 capacity=stream.read_buffer().capacity(),
75 codec=?Self::decoder(),
76 "init decode stream"
77 );
78
79 let mut remaining_bytes = vec![];
80
81 let mut log_interval = 0;
82 let mut log_interval_start_block = 0;
83
84 let mut block_number = 0;
85 let mut total_receipts = 0;
86 let mut receipts_for_block = vec![];
87 let mut first_block = None;
88
89 async move {
90 while let Some(receipt_res) = stream.next().await {
91 let receipt = match receipt_res {
92 Ok(receipt) => receipt,
93 Err(FileClientError::Rlp(err, bytes)) => {
94 trace!(target: "downloaders::file",
95 %err,
96 bytes_len=bytes.len(),
97 "partial receipt returned from decoding chunk"
98 );
99
100 remaining_bytes = bytes;
101
102 break
103 }
104 Err(err) => return Err(err),
105 };
106
107 match receipt {
108 Some(ReceiptWithBlockNumber { receipt, number }) => {
109 if block_number > number {
110 warn!(target: "downloaders::file", previous_block_number = block_number, "skipping receipt from a lower block: {number}");
111 continue
112 }
113
114 total_receipts += 1;
115
116 if first_block.is_none() {
117 first_block = Some(number);
118 block_number = number;
119 }
120
121 if block_number == number {
122 receipts_for_block.push(Some(receipt));
123 } else {
124 receipts.push(receipts_for_block);
125
126 block_number = number;
128 receipts_for_block = vec![Some(receipt)];
129 }
130 }
131 None => {
132 match first_block {
133 Some(num) => {
134 receipts.push(receipts_for_block);
137 block_number = num + receipts.len() as u64;
139 }
140 None => {
141 if let Some(highest_block) = prev_chunk_highest_block {
143 block_number = highest_block + 1;
145 } else {
146 block_number = 0;
149 }
150 first_block = Some(block_number);
151 }
152 }
153
154 receipts_for_block = vec![];
155 }
156 }
157
158 if log_interval == 0 {
159 trace!(target: "downloaders::file",
160 block_number,
161 total_receipts,
162 "read first receipt"
163 );
164 log_interval_start_block = block_number;
165 } else if log_interval % 100_000 == 0 {
166 trace!(target: "downloaders::file",
167 blocks=?log_interval_start_block..=block_number,
168 total_receipts,
169 "read receipts from file"
170 );
171 log_interval_start_block = block_number + 1;
172 }
173 log_interval += 1;
174 }
175
176 trace!(target: "downloaders::file",
177 blocks=?log_interval_start_block..=block_number,
178 total_receipts,
179 "read receipts from file"
180 );
181
182 receipts.push(receipts_for_block);
184
185 trace!(target: "downloaders::file",
186 blocks = receipts.len(),
187 total_receipts,
188 "Initialized receipt file client"
189 );
190
191 Ok(DecodedFileChunk {
192 file_client: Self {
193 receipts,
194 first_block: first_block.unwrap_or_default(),
195 total_receipts,
196 _marker: Default::default(),
197 },
198 remaining_bytes,
199 highest_block: Some(block_number),
200 })
201 }
202 }
203}
204
205#[derive(Debug, PartialEq, Eq)]
207pub struct ReceiptWithBlockNumber {
208 pub receipt: Receipt,
210 pub number: u64,
212}
213
214#[cfg(test)]
215mod test {
216 use alloy_primitives::{
217 bytes::{Buf, BytesMut},
218 hex, Address, Bytes, Log, LogData, B256,
219 };
220 use alloy_rlp::{Decodable, RlpDecodable};
221 use reth_primitives::{Receipt, TxType};
222 use reth_tracing::init_test_tracing;
223 use tokio_util::codec::Decoder;
224
225 use super::{FromReceiptReader, ReceiptFileClient, ReceiptWithBlockNumber};
226 use crate::{DecodedFileChunk, FileClientError};
227
228 #[derive(Debug, PartialEq, Eq, RlpDecodable)]
229 struct MockReceipt {
230 tx_type: u8,
231 status: u64,
232 cumulative_gas_used: u64,
233 logs: Vec<Log>,
234 block_number: u64,
235 }
236
237 #[derive(Debug, PartialEq, Eq, RlpDecodable)]
238 #[rlp(trailing)]
239 struct MockReceiptContainer(Option<MockReceipt>);
240
241 impl TryFrom<MockReceipt> for ReceiptWithBlockNumber {
242 type Error = &'static str;
243 fn try_from(exported_receipt: MockReceipt) -> Result<Self, Self::Error> {
244 let MockReceipt { tx_type, status, cumulative_gas_used, logs, block_number: number } =
245 exported_receipt;
246
247 #[allow(clippy::needless_update)]
248 let receipt = Receipt {
249 tx_type: TxType::try_from(tx_type.to_be_bytes()[0])?,
250 success: status != 0,
251 cumulative_gas_used,
252 logs,
253 ..Default::default()
254 };
255
256 Ok(Self { receipt, number })
257 }
258 }
259
260 #[derive(Debug, Default)]
261 struct MockReceiptFileCodec;
262
263 impl Decoder for MockReceiptFileCodec {
264 type Item = Option<ReceiptWithBlockNumber>;
265 type Error = FileClientError;
266
267 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
268 if src.is_empty() {
269 return Ok(None)
270 }
271
272 let buf_slice = &mut src.as_ref();
273 let receipt = MockReceiptContainer::decode(buf_slice)
274 .map_err(|err| Self::Error::Rlp(err, src.to_vec()))?
275 .0;
276 src.advance(src.len() - buf_slice.len());
277
278 Ok(Some(
279 receipt
280 .map(|receipt| receipt.try_into().map_err(FileClientError::from))
281 .transpose()?,
282 ))
283 }
284 }
285
286 const MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS: &[u8] = &hex!("c0");
288
289 const MOCK_RECEIPT_ENCODED_BLOCK_1: &[u8] = &hex!("f901a4f901a1800183031843f90197f89b948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef863a00109fc6f55cf40689f02fbaad7af7fe7bbac8a3d2186600afc7d3e10cac6027ba00000000000000000000000000000000000000000000000000000000000014218a000000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2da000000000000000000000000000000000000000000000000000000000618d8837f89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68ba000000000000000000000000000000000000000000000000000000000d0e3ebf0a00000000000000000000000000000000000000000000000000000000000014218a000000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d80f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234fa000000000000000000000000000000000000000000000007edc6ca0bb683480008001");
290
291 const MOCK_RECEIPT_ENCODED_BLOCK_2: &[u8] = &hex!("f90106f9010380018301c60df8faf89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68da000000000000000000000000000000000000000000000000000000000d0ea0e40a00000000000000000000000000000000000000000000000000000000000014218a0000000000000000000000000e5e7492282fd1e3bfac337a0beccd29b15b7b24080f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234ea000000000000000000000000000000000000000000000007eda7867e0c7d480008002");
292
293 const MOCK_RECEIPT_ENCODED_BLOCK_3: &[u8] = &hex!("f90106f9010380018301c60df8faf89c948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef884a092e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68da000000000000000000000000000000000000000000000000000000000d101e54ba00000000000000000000000000000000000000000000000000000000000014218a0000000000000000000000000fa011d8d6c26f13abe2cefed38226e401b2b8a9980f85a948ce8c13d816fe6daf12d6fd9e4952e1fc88850aef842a0fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234ea000000000000000000000000000000000000000000000007ed8842f06277480008003");
294
295 fn mock_receipt_1() -> MockReceipt {
296 let receipt = receipt_block_1();
297 MockReceipt {
298 tx_type: receipt.receipt.tx_type as u8,
299 status: receipt.receipt.success as u64,
300
301 cumulative_gas_used: receipt.receipt.cumulative_gas_used,
302 logs: receipt.receipt.logs,
303 block_number: 1,
304 }
305 }
306
307 fn mock_receipt_2() -> MockReceipt {
308 let receipt = receipt_block_2();
309 MockReceipt {
310 tx_type: receipt.receipt.tx_type as u8,
311 status: receipt.receipt.success as u64,
312
313 cumulative_gas_used: receipt.receipt.cumulative_gas_used,
314 logs: receipt.receipt.logs,
315 block_number: 2,
316 }
317 }
318
319 fn mock_receipt_3() -> MockReceipt {
320 let receipt = receipt_block_3();
321 MockReceipt {
322 tx_type: receipt.receipt.tx_type as u8,
323 status: receipt.receipt.success as u64,
324
325 cumulative_gas_used: receipt.receipt.cumulative_gas_used,
326 logs: receipt.receipt.logs,
327 block_number: 3,
328 }
329 }
330
331 fn receipt_block_1() -> ReceiptWithBlockNumber {
332 let log_1 = Log {
333 address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850ae")),
334 data: LogData::new(
335 vec![
336 B256::from(hex!(
337 "0109fc6f55cf40689f02fbaad7af7fe7bbac8a3d2186600afc7d3e10cac6027b"
338 )),
339 B256::from(hex!(
340 "0000000000000000000000000000000000000000000000000000000000014218"
341 )),
342 B256::from(hex!(
343 "00000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d"
344 )),
345 ],
346 Bytes::from(hex!(
347 "00000000000000000000000000000000000000000000000000000000618d8837"
348 )),
349 )
350 .unwrap(),
351 };
352
353 let log_2 = Log {
354 address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850ae")),
355 data: LogData::new(
356 vec![
357 B256::from(hex!(
358 "92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68b"
359 )),
360 B256::from(hex!(
361 "00000000000000000000000000000000000000000000000000000000d0e3ebf0"
362 )),
363 B256::from(hex!(
364 "0000000000000000000000000000000000000000000000000000000000014218"
365 )),
366 B256::from(hex!(
367 "00000000000000000000000070b17c0fe982ab4a7ac17a4c25485643151a1f2d"
368 )),
369 ],
370 Bytes::default(),
371 )
372 .unwrap(),
373 };
374
375 let log_3 = Log {
376 address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850ae")),
377 data: LogData::new(
378 vec![
379 B256::from(hex!(
380 "fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234f"
381 )),
382 B256::from(hex!(
383 "00000000000000000000000000000000000000000000007edc6ca0bb68348000"
384 )),
385 ],
386 Bytes::default(),
387 )
388 .unwrap(),
389 };
390
391 let mut receipt = Receipt {
394 tx_type: TxType::Legacy,
395 success: true,
396 cumulative_gas_used: 202819,
397 ..Default::default()
398 };
399 receipt.logs = vec![log_1, log_2, log_3];
400
401 ReceiptWithBlockNumber { receipt, number: 1 }
402 }
403
404 fn receipt_block_2() -> ReceiptWithBlockNumber {
405 let log_1 = Log {
406 address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850ae")),
407 data: LogData::new(
408 vec![
409 B256::from(hex!(
410 "92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68d"
411 )),
412 B256::from(hex!(
413 "00000000000000000000000000000000000000000000000000000000d0ea0e40"
414 )),
415 B256::from(hex!(
416 "0000000000000000000000000000000000000000000000000000000000014218"
417 )),
418 B256::from(hex!(
419 "000000000000000000000000e5e7492282fd1e3bfac337a0beccd29b15b7b240"
420 )),
421 ],
422 Bytes::default(),
423 )
424 .unwrap(),
425 };
426
427 let log_2 = Log {
428 address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850ae")),
429 data: LogData::new(
430 vec![
431 B256::from(hex!(
432 "fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234e"
433 )),
434 B256::from(hex!(
435 "00000000000000000000000000000000000000000000007eda7867e0c7d48000"
436 )),
437 ],
438 Bytes::default(),
439 )
440 .unwrap(),
441 };
442
443 let mut receipt = Receipt {
446 tx_type: TxType::Legacy,
447 success: true,
448 cumulative_gas_used: 116237,
449 ..Default::default()
450 };
451 receipt.logs = vec![log_1, log_2];
452
453 ReceiptWithBlockNumber { receipt, number: 2 }
454 }
455
456 fn receipt_block_3() -> ReceiptWithBlockNumber {
457 let log_1 = Log {
458 address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850ae")),
459 data: LogData::new(
460 vec![
461 B256::from(hex!(
462 "92e98423f8adac6e64d0608e519fd1cefb861498385c6dee70d58fc926ddc68d"
463 )),
464 B256::from(hex!(
465 "00000000000000000000000000000000000000000000000000000000d101e54b"
466 )),
467 B256::from(hex!(
468 "0000000000000000000000000000000000000000000000000000000000014218"
469 )),
470 B256::from(hex!(
471 "000000000000000000000000fa011d8d6c26f13abe2cefed38226e401b2b8a99"
472 )),
473 ],
474 Bytes::default(),
475 )
476 .unwrap(),
477 };
478
479 let log_2 = Log {
480 address: Address::from(hex!("8ce8c13d816fe6daf12d6fd9e4952e1fc88850ae")),
481 data: LogData::new(
482 vec![
483 B256::from(hex!(
484 "fe25c73e3b9089fac37d55c4c7efcba6f04af04cebd2fc4d6d7dbb07e1e5234e"
485 )),
486 B256::from(hex!(
487 "00000000000000000000000000000000000000000000007ed8842f0627748000"
488 )),
489 ],
490 Bytes::default(),
491 )
492 .unwrap(),
493 };
494
495 let mut receipt = Receipt {
498 tx_type: TxType::Legacy,
499 success: true,
500 cumulative_gas_used: 116237,
501 ..Default::default()
502 };
503 receipt.logs = vec![log_1, log_2];
504
505 ReceiptWithBlockNumber { receipt, number: 3 }
506 }
507
508 #[test]
509 fn decode_mock_receipt() {
510 let receipt1 = mock_receipt_1();
511 let decoded1 = MockReceiptContainer::decode(&mut &MOCK_RECEIPT_ENCODED_BLOCK_1[..])
512 .unwrap()
513 .0
514 .unwrap();
515 assert_eq!(receipt1, decoded1);
516
517 let receipt2 = mock_receipt_2();
518 let decoded2 = MockReceiptContainer::decode(&mut &MOCK_RECEIPT_ENCODED_BLOCK_2[..])
519 .unwrap()
520 .0
521 .unwrap();
522 assert_eq!(receipt2, decoded2);
523
524 let receipt3 = mock_receipt_3();
525 let decoded3 = MockReceiptContainer::decode(&mut &MOCK_RECEIPT_ENCODED_BLOCK_3[..])
526 .unwrap()
527 .0
528 .unwrap();
529 assert_eq!(receipt3, decoded3);
530 }
531
532 #[test]
533 #[allow(clippy::needless_update)]
534 fn receipts_codec() {
535 let mut receipt_1_to_3 = MOCK_RECEIPT_ENCODED_BLOCK_1.to_vec();
538 receipt_1_to_3.extend_from_slice(MOCK_RECEIPT_ENCODED_BLOCK_2);
539 receipt_1_to_3.extend_from_slice(MOCK_RECEIPT_ENCODED_BLOCK_3);
540
541 let encoded = &mut BytesMut::from(&receipt_1_to_3[..]);
542
543 let mut codec = MockReceiptFileCodec;
544
545 let first_decoded_receipt = codec.decode(encoded).unwrap().unwrap().unwrap();
548
549 assert_eq!(receipt_block_1(), first_decoded_receipt);
550
551 let second_decoded_receipt = codec.decode(encoded).unwrap().unwrap().unwrap();
552
553 assert_eq!(receipt_block_2(), second_decoded_receipt);
554
555 let third_decoded_receipt = codec.decode(encoded).unwrap().unwrap().unwrap();
556
557 assert_eq!(receipt_block_3(), third_decoded_receipt);
558 }
559
560 #[tokio::test]
561 async fn receipt_file_client_ovm_codec() {
562 init_test_tracing();
563
564 let mut encoded_receipts = MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS.to_vec();
566 encoded_receipts.extend_from_slice(MOCK_RECEIPT_ENCODED_BLOCK_1);
568 encoded_receipts.extend_from_slice(MOCK_RECEIPT_ENCODED_BLOCK_2);
569 encoded_receipts.extend_from_slice(MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS);
571
572 let encoded_byte_len = encoded_receipts.len() as u64;
573 let reader = &mut &encoded_receipts[..];
574
575 let DecodedFileChunk {
576 file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. },
577 ..
578 } = ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
579 reader,
580 encoded_byte_len,
581 None,
582 )
583 .await
584 .unwrap();
585
586 assert_eq!(2, total_receipts);
588 assert_eq!(0, first_block);
589 assert!(receipts[0].is_empty());
590 assert_eq!(receipt_block_1().receipt, receipts[1][0].clone().unwrap());
591 assert_eq!(receipt_block_2().receipt, receipts[2][0].clone().unwrap());
592 assert!(receipts[3].is_empty());
593 }
594
595 #[tokio::test]
596 async fn no_receipts_middle_block() {
597 init_test_tracing();
598
599 let mut encoded_receipts = MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS.to_vec();
601 encoded_receipts.extend_from_slice(MOCK_RECEIPT_ENCODED_BLOCK_1);
603 encoded_receipts.extend_from_slice(MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS);
605 encoded_receipts.extend_from_slice(MOCK_RECEIPT_ENCODED_BLOCK_3);
607
608 let encoded_byte_len = encoded_receipts.len() as u64;
609 let reader = &mut &encoded_receipts[..];
610
611 let DecodedFileChunk {
612 file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. },
613 ..
614 } = ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
615 reader,
616 encoded_byte_len,
617 None,
618 )
619 .await
620 .unwrap();
621
622 assert_eq!(2, total_receipts);
624 assert_eq!(0, first_block);
625 assert!(receipts[0].is_empty());
626 assert_eq!(receipt_block_1().receipt, receipts[1][0].clone().unwrap());
627 assert!(receipts[2].is_empty());
628 assert_eq!(receipt_block_3().receipt, receipts[3][0].clone().unwrap());
629 }
630
631 #[tokio::test]
632 async fn two_receipts_same_block() {
633 init_test_tracing();
634
635 let mut encoded_receipts = MOCK_RECEIPT_BLOCK_NO_TRANSACTIONS.to_vec();
637 encoded_receipts.extend_from_slice(MOCK_RECEIPT_ENCODED_BLOCK_1);
639 encoded_receipts.extend_from_slice(MOCK_RECEIPT_ENCODED_BLOCK_2);
641 encoded_receipts.extend_from_slice(MOCK_RECEIPT_ENCODED_BLOCK_2);
642 encoded_receipts.extend_from_slice(MOCK_RECEIPT_ENCODED_BLOCK_3);
644
645 let encoded_byte_len = encoded_receipts.len() as u64;
646 let reader = &mut &encoded_receipts[..];
647
648 let DecodedFileChunk {
649 file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. },
650 ..
651 } = ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
652 reader,
653 encoded_byte_len,
654 None,
655 )
656 .await
657 .unwrap();
658
659 assert_eq!(4, total_receipts);
661 assert_eq!(0, first_block);
662 assert!(receipts[0].is_empty());
663 assert_eq!(receipt_block_1().receipt, receipts[1][0].clone().unwrap());
664 assert_eq!(receipt_block_2().receipt, receipts[2][0].clone().unwrap());
665 assert_eq!(receipt_block_2().receipt, receipts[2][1].clone().unwrap());
666 assert_eq!(receipt_block_3().receipt, receipts[3][0].clone().unwrap());
667 }
668}