Publishing messages
audience: users
Everything about getting a typed payload into a finalized broadcast round.
The whole surface
```rust
impl<D: ShuffleDatum> Zipnet<D> {
    pub async fn submit(
        network: &Arc<mosaik::Network>,
        config: &zipnet::Config,
    ) -> Result<Submitter<D>>;
}

impl<D: ShuffleDatum> Submitter<D> {
    pub async fn send(&self, datum: D) -> Result<SubmissionId>;
}

pub struct SubmissionId(pub u64);

pub struct Receipt {
    pub submission_id: SubmissionId,
    pub round: zipnet::RoundId,
    pub slot: usize,
    pub outcome: Outcome,
}

pub enum Outcome { Landed, Collided, Dropped }
```
Submitter::send returns immediately with a SubmissionId once the
payload has been enqueued. The landing outcome arrives asynchronously
on the receipts stream, correlated by
the same SubmissionId. Open both handles from the same Config; they bond
to the same deployment.
Fire-and-forget
```rust
use std::sync::Arc;

use mosaik::Network;
use zipnet::{Config, ShuffleWindow, UNIVERSE, Zipnet};
use zipnet::{DecodeError, ShuffleDatum, UniqueId, unique_id};

pub struct Note(pub [u8; 240]);

impl ShuffleDatum for Note {
    const TYPE_TAG: UniqueId = unique_id!("acme.note-v1");
    const WIRE_SIZE: usize = 240;

    fn encode(&self) -> Vec<u8> {
        self.0.to_vec()
    }

    fn decode(b: &[u8]) -> Result<Self, DecodeError> {
        <[u8; 240]>::try_from(b)
            .map(Self)
            .map_err(|e| DecodeError(e.to_string()))
    }
}

const ACME_MAINNET: Config = Config::new("acme.mainnet")
    .with_window(ShuffleWindow::interactive())
    .with_init([0u8; 32]);

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let network = Arc::new(Network::new(UNIVERSE).await?);
    let tx = Zipnet::<Note>::submit(&network, &ACME_MAINNET).await?;
    let _ = tx.send(Note([0u8; 240])).await?;
    Ok(())
}
```
If you don’t care whether the message landed, collided, or was
dropped, discard the SubmissionId and never open a Receipts
stream. Many applications do exactly this — the encryption or
ordering layer built on top replays lost messages at the application
level.
Inspecting the outcome
Open a receipts stream alongside the submitter and correlate by
SubmissionId:
```rust
use futures::StreamExt;
use zipnet::{Outcome, Zipnet};

let tx = Zipnet::<Note>::submit(&network, &ACME_MAINNET).await?;
let mut rx = Zipnet::<Note>::receipts(&network, &ACME_MAINNET).await?;

let sent = tx.send(Note([0u8; 240])).await?;

while let Some(receipt) = rx.next().await {
    if receipt.submission_id != sent {
        continue;
    }
    match receipt.outcome {
        Outcome::Landed => {
            tracing::info!(round = %receipt.round, "published");
            break;
        }
        Outcome::Collided => {
            // Another client hashed to the same slot this round.
            // Both payloads are XOR-corrupted. Retry.
            tracing::warn!(round = %receipt.round, "collision, retrying");
            break;
        }
        Outcome::Dropped => {
            // The aggregator never forwarded the envelope into a
            // committed aggregate. Usually transient.
            tracing::warn!(round = %receipt.round, "dropped, retrying");
            break;
        }
    }
}
```
Landed is the happy path. Under default parameters and a small
active set, most rounds produce Landed for everyone.
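When several sends are in flight at once, re-scanning the receipts stream for each one does not scale; a common pattern is to keep a map of pending submissions and resolve entries as receipts arrive. A minimal sketch of that bookkeeping, independent of the SDK, with plain `u64` ids standing in for `SubmissionId` and a `bool` standing in for whether the outcome was `Landed`:

```rust
use std::collections::HashMap;

/// Pending submissions awaiting a receipt, keyed by submission id.
/// The value is whatever application context you need to act on the
/// outcome (here just a label).
struct Pending {
    in_flight: HashMap<u64, &'static str>,
}

impl Pending {
    fn new() -> Self {
        Self { in_flight: HashMap::new() }
    }

    /// Record a submission right after `send` returns its id.
    fn track(&mut self, id: u64, ctx: &'static str) {
        self.in_flight.insert(id, ctx);
    }

    /// Resolve one receipt. Returns the stored context if the id was
    /// one of ours; receipts for other submitters yield None.
    fn resolve(&mut self, id: u64, landed: bool) -> Option<(&'static str, bool)> {
        self.in_flight.remove(&id).map(|ctx| (ctx, landed))
    }
}
```

With this shape, one task drains the receipts stream and calls `resolve`, while any number of tasks call `send` and `track`.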
Retry policy
Zipnet does not retry for you. If you need at-least-once delivery at
the application layer, wrap send in your own loop that consults the
receipts stream:
```rust
use futures::StreamExt;
use zipnet::{Outcome, Receipts, Submitter, SubmissionId};

async fn send_with_retry<D: zipnet::ShuffleDatum + Clone>(
    tx: &Submitter<D>,
    rx: &mut Receipts<D>,
    datum: D,
    attempts: u32,
) -> zipnet::Result<SubmissionId> {
    for _ in 0..attempts {
        let id = tx.send(datum.clone()).await?;
        while let Some(receipt) = rx.next().await {
            if receipt.submission_id != id {
                continue;
            }
            if matches!(receipt.outcome, Outcome::Landed) {
                return Ok(id);
            }
            break; // non-Landed outcome: try again
        }
    }
    // Budget exhausted: one final fire-and-forget send, outcome unchecked.
    tx.send(datum).await
}
```
Retry latency is bounded by the deployment’s round cadence — at the
interactive() preset’s 1s round period, three attempts cost up to
~3s. Tune attempts to your SLA.
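That arithmetic can be made explicit when sizing `attempts` against an SLA. A tiny hypothetical helper (`retry_budget` is not part of the SDK), assuming each attempt occupies one round and its receipt arrives by the end of that round:

```rust
use std::time::Duration;

/// Worst-case wall-clock cost of a retry budget, assuming one attempt
/// per round and a receipt by round end. At interactive()'s 1s cadence,
/// 3 attempts bound out at ~3s.
fn retry_budget(attempts: u32, round_period: Duration) -> Duration {
    round_period * attempts
}
```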
Payload shape
Submitter::send(D) takes your datum type directly. The SDK calls
D::encode to produce exactly D::WIRE_SIZE bytes and rejects any
encoding of a different length with Error::Protocol — that is a
programmer error in your ShuffleDatum impl, not a runtime
condition.
WIRE_SIZE is set once per datum type and folds into the deployment
identity. Variable application data must be padded at the
application layer before wrapping into D; see
Quickstart — Define your datum type
for why constant size is non-negotiable.
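One common padding scheme is a length prefix plus zero fill. A minimal sketch of that layer, independent of the SDK (`pad_to` and `unpad` are hypothetical helpers you would call inside your `encode`/`decode`), assuming a 2-byte big-endian length prefix:

```rust
/// Pad `data` into a fixed `size`-byte buffer: 2-byte big-endian length
/// prefix, then the data, then zeros. Returns None if it cannot fit.
fn pad_to(data: &[u8], size: usize) -> Option<Vec<u8>> {
    if data.len() > u16::MAX as usize || data.len() + 2 > size {
        return None;
    }
    let mut out = Vec::with_capacity(size);
    out.extend_from_slice(&(data.len() as u16).to_be_bytes());
    out.extend_from_slice(data);
    out.resize(size, 0); // zero padding up to the wire size
    Some(out)
}

/// Recover the original bytes from a padded buffer.
fn unpad(wire: &[u8]) -> Option<&[u8]> {
    let len = u16::from_be_bytes([*wire.first()?, *wire.get(1)?]) as usize;
    wire.get(2..2 + len)
}
```

The prefix costs 2 bytes of each datum's WIRE_SIZE; anything that doesn't fit must be rejected or fragmented at the application layer before it reaches `send`.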
Cover traffic is on by default
An idle Submitter<D> sends a cover envelope each round to widen
the anonymity set. Cover envelopes do not show up as send calls on
your side — they are generated automatically by the handle’s driver
task. Observers cannot distinguish a cover round from a real-payload
round for any given participant.
There is no SDK knob to tune cover-traffic rate today. If you hold a
Submitter handle, you participate; if you drop it, you don’t. For
applications that want to appear only for certain rounds, open the
submitter immediately before you need to send and drop it immediately
after — see Identity.
Parallel sends on one handle
Submitter<D> is Clone and internally Arc-wrapped. Concurrent
send calls on one handle are fine; the driver serializes them
per-round and emits at most one payload per round per submitter. If
you call send twice during the same round window, the second call
waits for the next round rather than sharing the slot.
```rust
let tx = Zipnet::<Note>::submit(&network, &ACME_MAINNET).await?;
let a = tx.clone();
let b = tx.clone();

// Note wraps a fixed [u8; 240]; copy each short message into a
// zero-padded buffer of the full wire size.
let mut msg_a = [0u8; 240];
msg_a[..9].copy_from_slice(b"message A");
let mut msg_b = [0u8; 240];
msg_b[..9].copy_from_slice(b"message B");

let (ra, rb) = tokio::join!(a.send(Note(msg_a)), b.send(Note(msg_b)));
// ra and rb come from different rounds; check their outcomes on
// the receipts stream.
```
If you need higher throughput per wall-clock second, the right lever is operator-side round cadence or slot count. From the SDK, one submitter is one slot per round.
Shutdown
```rust
drop(tx); // close the writer; receipts/reader stay open
```
Dropping the last clone of a Submitter handle closes it; in-flight
send calls that haven’t been picked up by the driver surface as
Error::Shutdown. Other handles (Receipts<D>, Reader<D>) are
unaffected and keep delivering as long as they hold the Network.
Dropping one Submitter handle does not tear down the Network.
Other services or other zipnet deployments sharing the same
Arc<Network> keep running.