Reading the broadcast log
audience: users
Zipnet::<D>::read returns a typed stream of shuffled outputs.
Every subscriber bonded to the same deployment sees the same
decoded values in the same order.
The whole surface
impl<D: ShuffleDatum> Zipnet<D> {
    pub async fn read(
        network: &Arc<mosaik::Network>,
        config: &zipnet::Config,
    ) -> Result<Reader<D>>;
}

impl<D: ShuffleDatum> futures::Stream for Reader<D> {
    type Item = D;
}
Every call to read returns a fresh Reader<D>; handles are cheap
and independent. Reader<D>::Item is your datum type directly —
tag-verify failures, slot collisions, and D::decode errors are
filtered silently inside the SDK.
Tail the log as it grows
use std::sync::Arc;

use futures::StreamExt;
use mosaik::Network;
use zipnet::{Config, ShuffleWindow, UNIVERSE, Zipnet};
use zipnet::{DecodeError, ShuffleDatum, UniqueId, unique_id};

pub struct Note(pub [u8; 240]);

impl ShuffleDatum for Note {
    const TYPE_TAG: UniqueId = unique_id!("acme.note-v1");
    const WIRE_SIZE: usize = 240;

    fn encode(&self) -> Vec<u8> {
        self.0.to_vec()
    }

    fn decode(b: &[u8]) -> Result<Self, DecodeError> {
        <[u8; 240]>::try_from(b)
            .map(Self)
            .map_err(|e| DecodeError(e.to_string()))
    }
}

const ACME_MAINNET: Config = Config::new("acme.mainnet")
    .with_window(ShuffleWindow::interactive())
    .with_init([0u8; 32]);

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let network = Arc::new(Network::new(UNIVERSE).await?);
    let mut reader = Zipnet::<Note>::read(&network, &ACME_MAINNET).await?;

    while let Some(note) = reader.next().await {
        println!("got {} bytes", note.0.len());
    }
    Ok(())
}
Reader<D> yields fully decoded values. There is no Round or
Message wrapper to pick apart — round-level metadata is an
implementation detail of the protocol, not a consumer concern. If
you need round / slot numbers for a specific submission, use the
matching Receipts<D> stream instead.
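For orientation only, the receipts path might look roughly like the sketch below. This is hypothetical: the receipts constructor and the round/slot accessors on the item are assumptions, not confirmed API — check the Receipts<D> reference for the real shape.

```rust
// HYPOTHETICAL sketch: assumes Receipts<D> is opened like Reader<D>
// and that each item carries round/slot metadata for a submission.
let mut receipts = Zipnet::<Note>::receipts(&network, &ACME_MAINNET).await?;
while let Some(receipt) = receipts.next().await {
    println!("round {} slot {}", receipt.round, receipt.slot);
}
```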
Starting point and catch-up
A fresh Reader<D> begins from whatever the committee finalizes
next. Earlier rounds are not replayed. If you need the full history,
open the reader before any submissions you care about and buffer
the values yourself.
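A minimal sketch of that open-early pattern, reusing the Note type and config from the example above — the key point is that the reader exists before anything you care about is finalized:

```rust
use futures::StreamExt;
use zipnet::Zipnet;

// Open the reader BEFORE submitting anything you care about;
// a fresh Reader<D> only sees rounds finalized after this point.
let mut reader = Zipnet::<Note>::read(&network, &ACME_MAINNET).await?;

// Buffer everything from this point on in a dedicated task.
let history = tokio::spawn(async move {
    let mut buf: Vec<Note> = Vec::new();
    while let Some(note) = reader.next().await {
        buf.push(note);
    }
    buf
});
```

Submissions made after the reader is open (via whatever write path you use — not covered here) will then land in the buffer rather than being lost to the no-replay rule.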
Handling a slow consumer
If your reader falls behind — usually because your per-item handler is slower than the round cadence — the SDK’s internal broadcast channel lags. Stay ahead of the protocol by draining the stream in a dedicated task and handing work off to a bounded channel you control:
use futures::StreamExt;
use tokio::sync::mpsc;
use zipnet::Zipnet;

let mut reader = Zipnet::<Note>::read(&network, &ACME_MAINNET).await?;
let (tx, mut rx) = mpsc::channel(1024);

// Producer: drain the SDK stream as fast as it delivers.
tokio::spawn(async move {
    while let Some(note) = reader.next().await {
        if tx.send(note).await.is_err() {
            break;
        }
    }
});

// Consumer: heavy per-item work that can tolerate small bursts.
while let Some(note) = rx.recv().await {
    handle(note).await;
}
With this shape, the SDK’s internal buffer drains continuously; the bounded channel between tasks is the one that can fill up, and you control its size.
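If dropping items under load is acceptable for your application, a lossy variant of the producer swaps the awaited send for tokio's try_send, so the producer never blocks on your consumer at all (a sketch, same tx/reader setup as above):

```rust
// Lossy producer: never awaits on a full channel.
tokio::spawn(async move {
    while let Some(note) = reader.next().await {
        if let Err(mpsc::error::TrySendError::Closed(_)) = tx.try_send(note) {
            break; // consumer gone; a Full(_) error just drops this item
        }
    }
});
```

The trade-off is explicit: with `send` you get backpressure and completeness up to the channel capacity; with `try_send` you get bounded latency and accept gaps.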
Multiple readers
One Arc<Network> can host many readers bonded to the same
deployment:
let reader_a = Zipnet::<Note>::read(&network, &ACME_MAINNET).await?;
let reader_b = Zipnet::<Note>::read(&network, &ACME_MAINNET).await?;
Both receive the same values in the same order, and lag is
independent: slowing down reader_a does not affect reader_b.
Readers bonded to different deployments (different Config) see
disjoint streams.
Shutdown
Dropping the Reader<D> is enough. The SDK’s driver stays up as
long as the Arc<Network> lives; opening a fresh reader later gives
you a stream from the then-current point in the log.
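Drop-and-reopen can therefore double as a crude "skip ahead" when a reader has fallen hopelessly behind — a sketch, reusing the setup from the examples above:

```rust
// Stop consuming: dropping the handle is the whole shutdown story.
// The SDK driver keeps running as long as `network` is alive.
drop(reader);

// ... later: a fresh reader resumes from the then-current log
// position; the rounds finalized in between are not replayed.
let mut reader = Zipnet::<Note>::read(&network, &ACME_MAINNET).await?;
```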