Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Reading the broadcast log

audience: users

Zipnet::<D>::read returns a typed stream of shuffled outputs. Every subscriber bonded to the same deployment sees the same decoded values in the same order.

The whole surface

impl<D: ShuffleDatum> Zipnet<D> {
    pub async fn read(
        network: &Arc<mosaik::Network>,
        config: &zipnet::Config,
    ) -> Result<Reader<D>>;
}

impl<D: ShuffleDatum> futures::Stream for Reader<D> {
    type Item = D;
}

Every call to read returns a fresh Reader<D>; handles are cheap and independent. Reader<D>::Item is your datum type directly — tag-verify failures, slot collisions, and D::decode errors are filtered silently inside the SDK.

Tail the log as it grows

use std::sync::Arc;
use futures::StreamExt;
use mosaik::Network;
use zipnet::{Config, ShuffleWindow, UNIVERSE, Zipnet};
use zipnet::{DecodeError, ShuffleDatum, UniqueId, unique_id};
pub struct Note(pub [u8; 240]);
impl ShuffleDatum for Note {
    const TYPE_TAG: UniqueId = unique_id!("acme.note-v1");
    const WIRE_SIZE: usize = 240;
    fn encode(&self) -> Vec<u8> { self.0.to_vec() }
    fn decode(b: &[u8]) -> Result<Self, DecodeError> {
        <[u8; 240]>::try_from(b).map(Self).map_err(|e| DecodeError(e.to_string()))
    }
}

const ACME_MAINNET: Config = Config::new("acme.mainnet")
    .with_window(ShuffleWindow::interactive())
    .with_init([0u8; 32]);

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let network    = Arc::new(Network::new(UNIVERSE).await?);
    let mut reader = Zipnet::<Note>::read(&network, &ACME_MAINNET).await?;

    while let Some(note) = reader.next().await {
        println!("got {} bytes", note.0.len());
    }
    Ok(())
}

Reader<D> yields fully decoded values. There is no Round or Message wrapper to pick apart — round-level metadata is an implementation detail of the protocol, not a consumer concern. If you need round / slot numbers for a specific submission, use the matching Receipts<D> stream instead.

Starting point and catch-up

A fresh Reader<D> begins from whatever the committee finalizes next. Earlier rounds are not replayed. If you need the full history, open the reader before any submissions you care about and buffer yourself.

Handling a slow consumer

If your reader falls behind — usually because your per-item handler is slower than the round cadence — the SDK’s internal broadcast channel lags. Stay ahead of the protocol by draining the stream in a dedicated task and handing work off to a bounded channel you control:

use futures::StreamExt;
use tokio::sync::mpsc;
use zipnet::Zipnet;

let mut reader = Zipnet::<Note>::read(&network, &ACME_MAINNET).await?;
let (tx, mut rx) = mpsc::channel(1024);

// Producer: drain the SDK stream as fast as it delivers.
tokio::spawn(async move {
    while let Some(note) = reader.next().await {
        if tx.send(note).await.is_err() {
            break;
        }
    }
});

// Consumer: heavy per-item work that can tolerate small bursts.
while let Some(note) = rx.recv().await {
    handle(note).await;
}

With this shape, the SDK’s internal buffer drains continuously; the bounded channel between tasks is the one that can fill up, and you control its size.

Multiple readers

One Arc<Network> can host many readers bonded to the same deployment:

let reader_a = Zipnet::<Note>::read(&network, &ACME_MAINNET).await?;
let reader_b = Zipnet::<Note>::read(&network, &ACME_MAINNET).await?;

Both receive the same values in the same order. Independent lag: slowing down reader A does not affect reader B. Readers bonded to different deployments (different Config) see disjoint streams.

Shutdown

Dropping the Reader<D> is enough. The SDK’s driver stays up as long as the Arc<Network> lives; opening a fresh reader later gives you a stream from the then-current point in the log.