Publishing messages

audience: developers

Everything about getting a typed payload into a finalized broadcast round.

The whole surface

impl<D: ShuffleDatum> Zipnet<D> {
    pub async fn submit(
        network: &Arc<mosaik::Network>,
        config: &zipnet::Config,
    ) -> Result<Submitter<D>>;
}

impl<D: ShuffleDatum> Submitter<D> {
    pub async fn send(&self, datum: D) -> Result<SubmissionId>;
}

pub struct SubmissionId(pub u64);

Submitter::send returns immediately with a SubmissionId once the payload has been enqueued. A background driver owned by the handle watches the deployment’s live-round header, seals one envelope per round that includes your client, and pushes it onto the committee’s submit stream. Dropping the last clone of the Submitter tears the driver down cleanly.

v1 note — landing outcomes. The receipts stream described in the roadmap is deferred to v2. Calling Zipnet::<D>::receipts today returns Error::Protocol("deferred to v2"). Applications that need per-submission confirmation poll their own Reader<D> for the round(s) they care about, or rely on an application-level acknowledgement above the shuffler. The sections below describe both patterns.

Fire-and-forget

use std::sync::Arc;
use mosaik::Network;
use zipnet::{Config, ShuffleWindow, UNIVERSE, Zipnet};
use zipnet::{DecodeError, ShuffleDatum, UniqueId, unique_id};
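
// Clone and PartialEq are required by the reader-confirmation examples
// further down, which call `sent.clone()` and compare `got == sent`.
#[derive(Clone, Debug, PartialEq)]
pub struct Note(pub [u8; 240]);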
impl ShuffleDatum for Note {
    const TYPE_TAG: UniqueId = unique_id!("acme.note-v1");
    const WIRE_SIZE: usize = 240;
    fn encode(&self) -> Vec<u8> { self.0.to_vec() }
    fn decode(b: &[u8]) -> Result<Self, DecodeError> {
        <[u8; 240]>::try_from(b).map(Self).map_err(|e| DecodeError(e.to_string()))
    }
}

const ACME_MAINNET: Config = Config::new("acme.mainnet")
    .with_window(ShuffleWindow::interactive())
    .with_init([0u8; 32]);

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let network = Arc::new(Network::new(UNIVERSE).await?);
    let tx      = Zipnet::<Note>::submit(&network, &ACME_MAINNET).await?;

    let _ = tx.send(Note([0u8; 240])).await?;
    Ok(())
}

Many applications do exactly this — the encryption or ordering layer built on top replays lost messages at the application level, so a per-submission outcome from the SDK is not needed.

Confirming delivery via the reader

To confirm that a submission landed, the idiom in v1 is to open a Reader<D> alongside the submitter and compare each broadcast datum against the one you sent (byte-equality on the stream):

use futures::StreamExt;
use zipnet::Zipnet;

let tx         = Zipnet::<Note>::submit(&network, &ACME_MAINNET).await?;
let mut reader = Zipnet::<Note>::read  (&network, &ACME_MAINNET).await?;

let sent = Note([0u8; 240]);
let _    = tx.send(sent.clone()).await?;

// Wait up to a few rounds for `sent` to land in the broadcast.
use tokio::time::{timeout, Duration};
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
loop {
    if tokio::time::Instant::now() >= deadline { break; }
    match timeout(Duration::from_secs(2), reader.next()).await {
        Ok(Some(got)) if got == sent => {
            tracing::info!("landed");
            break;
        }
        Ok(Some(_)) => continue,     // someone else's note
        Ok(None)    => break,        // stream closed
        Err(_)      => continue,     // per-round timeout
    }
}

This is strictly more anonymity-preserving than an ECIES receipts stream would be — no receipts exist on the wire for an observer to count — at the cost that your application compares payload bytes rather than consulting a typed Outcome.

Retry policy

Zipnet does not retry for you. If you need at-least-once delivery at the application layer, wrap send in your own loop that consults the Reader:

use futures::StreamExt;
use std::time::Duration;
use tokio::time::timeout;
use zipnet::{Reader, Submitter};

async fn send_with_retry<D>(
    tx: &Submitter<D>,
    reader: &mut Reader<D>,
    datum: D,
    attempts: u32,
) -> zipnet::Result<()>
where
    D: zipnet::ShuffleDatum + Clone + PartialEq,
{
    for _ in 0..attempts {
        let _ = tx.send(datum.clone()).await?;
        // Give each attempt ~3 rounds before declaring it lost.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
        while tokio::time::Instant::now() < deadline {
            match timeout(Duration::from_secs(1), reader.next()).await {
                Ok(Some(got)) if got == datum => return Ok(()),
                Ok(Some(_)) | Err(_)          => continue,
                Ok(None)                      => return Err(zipnet::Error::Shutdown),
            }
        }
    }
    Err(zipnet::Error::Protocol("not delivered after retries".into()))
}

Retry latency is bounded by the deployment’s round cadence — at the interactive() preset’s 1s round period, three attempts cost ~3–10s depending on collision luck. Tune attempts to your SLA.
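
To budget against an SLA, it can help to compute a ceiling for the loop above rather than rely on the typical range. The sketch below is a rough estimate under stated assumptions: `worst_case_retry_wait` is a hypothetical helper, not part of the SDK, and it assumes `send` itself returns promptly. An attempt can begin its last poll just before its window elapses, so each attempt may overrun the window by up to one poll timeout.

```rust
use std::time::Duration;

/// Rough upper bound on how long a loop shaped like `send_with_retry`
/// can block when the datum never lands (hypothetical helper).
///
/// * `attempts`: number of sends tried
/// * `window`:   per-attempt deadline (3s in the example above)
/// * `poll`:     per-read timeout (1s in the example above)
pub fn worst_case_retry_wait(attempts: u32, window: Duration, poll: Duration) -> Duration {
    // The final poll of an attempt may start just inside the window and
    // then run for a full `poll`, so the attempt lasts at most window + poll.
    (window + poll) * attempts
}
```

With the values used in `send_with_retry` (three attempts, a 3s window, a 1s poll), this ceiling is 12s; a run that lands in the first round returns far sooner.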

Payload shape

Submitter::send(D) takes your datum type directly. The SDK calls D::encode to produce exactly D::WIRE_SIZE bytes and rejects any encoding of a different length with Error::Protocol — that is a programmer error in your ShuffleDatum impl, not a runtime condition.

WIRE_SIZE is set once per datum type and folds into the deployment identity. Variable application data must be padded at the application layer before wrapping into D; see Quickstart — Define your datum type for why constant size is non-negotiable.
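
One way to pad variable-length data to a constant size is a length prefix followed by zero fill. The following is a minimal sketch, not a layout the SDK prescribes: the 2-byte little-endian prefix, the names `pad` and `unpad`, and the local `WIRE_SIZE` constant (chosen to match the 240-byte `Note` on this page) are all assumptions made for illustration.

```rust
/// Fixed payload size; matches the 240-byte `Note` used on this page.
pub const WIRE_SIZE: usize = 240;
/// Bytes reserved for the little-endian length prefix (hypothetical layout).
const LEN_PREFIX: usize = 2;
/// Largest body that still fits after the prefix.
pub const MAX_BODY: usize = WIRE_SIZE - LEN_PREFIX;

/// Pads `body` to exactly `WIRE_SIZE` bytes as [len: u16 LE][body][zeros].
/// Returns `None` if the body does not fit.
pub fn pad(body: &[u8]) -> Option<[u8; WIRE_SIZE]> {
    if body.len() > MAX_BODY {
        return None;
    }
    let mut out = [0u8; WIRE_SIZE];
    out[..LEN_PREFIX].copy_from_slice(&(body.len() as u16).to_le_bytes());
    out[LEN_PREFIX..LEN_PREFIX + body.len()].copy_from_slice(body);
    Some(out)
}

/// Recovers the original body, or `None` if the length prefix is out of range.
pub fn unpad(wire: &[u8; WIRE_SIZE]) -> Option<&[u8]> {
    let len = u16::from_le_bytes([wire[0], wire[1]]) as usize;
    if len > MAX_BODY {
        return None;
    }
    Some(&wire[LEN_PREFIX..LEN_PREFIX + len])
}
```

Under these assumptions a caller would build a datum with something like `Note(pad(b"hello")?)` and call `unpad(&note.0)` on the reading side. Every padded value has the same length on the wire regardless of the body it carries.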

Cover traffic is on by default

An idle Submitter<D> still keeps its driver alive — whenever the deployment opens a round that includes this client, the driver pulls the next queued payload or seals cover traffic (a zero-message envelope) if nothing is queued. Cover envelopes do not surface on your side; observers cannot distinguish a cover round from a real-payload round for any given participant.

There is no SDK knob to tune the cover-traffic rate today. If you hold a Submitter handle, you participate; if you drop it, you don’t. For applications that want to appear only in certain rounds, open the submitter immediately before you need to send and drop it immediately after; see Identity.
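
The per-round behaviour described in this section can be pictured with a toy model. This is a sketch for intuition only, not the SDK's driver: `ToyDriver`, `Envelope`, `enqueue`, and `on_round` are hypothetical names, and the model leaves out sealing and encryption, which is what makes cover and payload envelopes look alike to an observer in the real system.

```rust
use std::collections::VecDeque;

/// What the toy driver emits for one round (the driver's internal view).
#[derive(Debug, PartialEq)]
pub enum Envelope {
    /// A real payload taken from the queue.
    Payload(Vec<u8>),
    /// Nothing was queued: a zero-filled envelope of the same size.
    Cover(Vec<u8>),
}

/// Toy stand-in for the per-handle driver (hypothetical, for illustration).
pub struct ToyDriver {
    wire_size: usize,
    queue: VecDeque<Vec<u8>>,
}

impl ToyDriver {
    pub fn new(wire_size: usize) -> Self {
        Self { wire_size, queue: VecDeque::new() }
    }

    /// Mirrors the shape of `send`: enqueue and return immediately.
    /// Payloads of the wrong length are rejected up front.
    pub fn enqueue(&mut self, payload: Vec<u8>) -> Result<(), String> {
        if payload.len() != self.wire_size {
            return Err(format!(
                "expected {} bytes, got {}",
                self.wire_size,
                payload.len()
            ));
        }
        self.queue.push_back(payload);
        Ok(())
    }

    /// Called once per round that includes this client. At most one payload
    /// leaves per round; an empty queue produces cover traffic instead.
    pub fn on_round(&mut self) -> Envelope {
        match self.queue.pop_front() {
            Some(payload) => Envelope::Payload(payload),
            None => Envelope::Cover(vec![0u8; self.wire_size]),
        }
    }
}
```

In this model, two payloads enqueued back to back leave in two consecutive rounds, and every later round emits a cover envelope of the same length until something new is queued.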

Parallel sends on one handle

Submitter<D> is Clone and internally Arc-wrapped. Concurrent send calls on one handle are fine; the driver serializes them per-round and emits at most one payload per round per submitter. If you call send twice during the same round window, the second call waits for the next round rather than sharing the slot.

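let tx = Zipnet::<Note>::submit(&network, &ACME_MAINNET).await?;
let a  = tx.clone();
let b  = tx.clone();

// Note wraps [u8; 240], so short messages are zero-padded to the wire size.
let mut msg_a = [0u8; 240];
let mut msg_b = [0u8; 240];
msg_a[..9].copy_from_slice(b"message A");
msg_b[..9].copy_from_slice(b"message B");

let (_, _) = tokio::join!(
    a.send(Note(msg_a)),
    b.send(Note(msg_b)),
);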
// The two notes land in different rounds. Match them back via the
// Reader if you need confirmation.

If you need higher throughput per wall-clock second, the right lever is operator-side round cadence or slot count. From the SDK, one submitter is one slot per round.

Shutdown

drop(tx);   // if this is the last clone, the driver is torn down; sends it has not picked up surface Error::Shutdown

Dropping the last clone of a Submitter aborts its driver task cleanly. In-flight send calls that have not been picked up by the driver surface as Error::Shutdown. Any Reader<D> bound to the same deployment is unaffected.

Dropping one Submitter handle does not tear down the Network. Other services or other zipnet deployments sharing the same Arc<Network> keep running.