Tracking State
In this chapter, we'll learn how to keep track of some state inside our ExEx.
Let's continue with our Hello World example from the previous chapter.
Turning ExEx into a struct
First, we need to turn our ExEx into a stateful struct.
Before, we had just an async function, but now we'll need to implement
the Future trait manually.
Having a stateful async function is also possible, but it makes testing harder, because you can't access variables inside the function to assert the state of your ExEx.
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use futures_util::{FutureExt, TryStreamExt};
use reth::{api::FullNodeComponents, builder::NodeTypes, primitives::EthPrimitives};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
struct MyExEx<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
}
impl<Node: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>>> Future
for MyExEx<Node>
{
type Output = eyre::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.try_next().poll_unpin(cx))? {
match ¬ification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
if let Some(committed_chain) = notification.committed_chain() {
this.ctx
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
}
Poll::Ready(Ok(()))
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(async move |builder, _| {
let handle = builder
.node(EthereumNode::default())
.install_exex("my-exex", async move |ctx| Ok(MyExEx { ctx }))
.launch()
.await?;
handle.wait_for_node_exit().await
})
}
For those who are not familiar with how async Rust works on a lower level, that may seem scary, but let's unpack what's going on here:
- Our ExEx is now a
structthat contains the context and implements theFuturetrait. It's now pollable (henceawait-able). - We can't use
selfdirectly inside ourpollmethod, and instead need to acquire a mutable reference to the data inside of thePin. Read more about pinning in the book. - We also can't use
awaitdirectly insidepoll, and instead need to poll futures manually. We wrap the call topoll_recv(cx)into aready!macro, so that if the channel of notifications has no value ready, we will instantly returnPoll::Pendingfrom our Future. - We initialize and return the
MyExExstruct directly in theinstall_exexmethod, because it's a Future.
With all that done, we're now free to add more fields to our MyExEx struct, and track some state in them.
Adding state
Our ExEx will count the number of transactions in each block and log it to the console.
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use alloy_primitives::BlockNumber;
use futures_util::{FutureExt, TryStreamExt};
use reth::{
api::{BlockBody, FullNodeComponents},
builder::NodeTypes,
primitives::EthPrimitives,
};
use reth_exex::{ExExContext, ExExEvent};
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
struct MyExEx<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
/// First block that was committed since the start of the ExEx.
first_block: Option<BlockNumber>,
/// Total number of transactions committed.
transactions: u64,
}
impl<Node: FullNodeComponents> MyExEx<Node> {
fn new(ctx: ExExContext<Node>) -> Self {
Self { ctx, first_block: None, transactions: 0 }
}
}
impl<Node: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>>> Future
for MyExEx<Node>
{
type Output = eyre::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.try_next().poll_unpin(cx))? {
if let Some(reverted_chain) = notification.reverted_chain() {
this.transactions = this.transactions.saturating_sub(
reverted_chain.blocks_iter().map(|b| b.body().transaction_count() as u64).sum(),
);
}
if let Some(committed_chain) = notification.committed_chain() {
this.first_block.get_or_insert(committed_chain.first().number);
this.transactions += committed_chain
.blocks_iter()
.map(|b| b.body().transaction_count() as u64)
.sum::<u64>();
this.ctx
.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
}
if let Some(first_block) = this.first_block {
info!(%first_block, transactions = %this.transactions, "Total number of transactions");
}
}
Poll::Ready(Ok(()))
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(async move |builder, _| {
let handle = builder
.node(EthereumNode::default())
.install_exex("my-exex", async move |ctx| Ok(MyExEx::new(ctx)))
.launch()
.await?;
handle.wait_for_node_exit().await
})
}
As you can see, we added two fields to our ExEx struct:
first_blockto keep track of the first block that was committed since the start of the ExEx.transactionsto keep track of the total number of transactions committed, accounting for reorgs and reverts.
We also changed our match block to two if clauses:
- First one checks if there's a reverted chain using
notification.reverted_chain(). If there is:- We subtract the number of transactions in the reverted chain from the total number of transactions.
- It's important to do the
saturating_subhere, because if we just started our node and instantly received a reorg, ourtransactionsfield will still be zero.
- Second one checks if there's a committed chain using
notification.committed_chain(). If there is:- We update the
first_blockfield to the first block of the committed chain. - We add the number of transactions in the committed chain to the total number of transactions.
- We send a
FinishedHeightevent back to the main node.
- We update the
Finally, on every notification, we log the total number of transactions and the first block that was committed since the start of the ExEx.