Publishing messages
audience: developers
Everything about getting a typed payload into a finalized broadcast round.
The whole surface
impl<D: ShuffleDatum> Zipnet<D> {
pub async fn submit(
network: &Arc<mosaik::Network>,
config: &zipnet::Config,
) -> Result<Submitter<D>>;
}
impl<D: ShuffleDatum> Submitter<D> {
pub async fn send(&self, datum: D) -> Result<SubmissionId>;
}
pub struct SubmissionId(pub u64);
Submitter::send returns immediately with a SubmissionId once the
payload has been enqueued. A background driver owned by the handle
watches the deployment’s live-round header, seals one envelope per
round that includes your client, and pushes it onto the committee’s
submit stream. Dropping the last clone of the Submitter tears the
driver down cleanly.
v1 note — landing outcomes. The receipts stream described in the roadmap is deferred to v2. Calling
Zipnet::<D>::receiptstoday returnsError::Protocol("deferred to v2"). Applications that need per-submission confirmation poll their ownReader<D>for the round(s) they care about, or rely on an application-level acknowledgement above the shuffler. The sections below describe both patterns.
Fire-and-forget
use std::sync::Arc;
use mosaik::Network;
use zipnet::{Config, ShuffleWindow, UNIVERSE, Zipnet};
use zipnet::{DecodeError, ShuffleDatum, UniqueId, unique_id};
pub struct Note(pub [u8; 240]);
impl ShuffleDatum for Note {
const TYPE_TAG: UniqueId = unique_id!("acme.note-v1");
const WIRE_SIZE: usize = 240;
fn encode(&self) -> Vec<u8> { self.0.to_vec() }
fn decode(b: &[u8]) -> Result<Self, DecodeError> {
<[u8; 240]>::try_from(b).map(Self).map_err(|e| DecodeError(e.to_string()))
}
}
const ACME_MAINNET: Config = Config::new("acme.mainnet")
.with_window(ShuffleWindow::interactive())
.with_init([0u8; 32]);
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let network = Arc::new(Network::new(UNIVERSE).await?);
let tx = Zipnet::<Note>::submit(&network, &ACME_MAINNET).await?;
let _ = tx.send(Note([0u8; 240])).await?;
Ok(())
}
Many applications do exactly this — the encryption or ordering layer built on top replays lost messages at the application level, so a per-submission outcome from the SDK is not needed.
Confirming delivery via the reader
For at-least-once delivery, the idiom in v1 is to open a Reader<D>
alongside the submitter and check for byte-equality on the stream:
use futures::StreamExt;
use zipnet::Zipnet;
let tx = Zipnet::<Note>::submit(&network, &ACME_MAINNET).await?;
let mut reader = Zipnet::<Note>::read (&network, &ACME_MAINNET).await?;
let sent = Note([0u8; 240]);
let _ = tx.send(sent.clone()).await?;
// Wait up to a few rounds for `sent` to land in the broadcast.
use tokio::time::{timeout, Duration};
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
loop {
if tokio::time::Instant::now() >= deadline { break; }
match timeout(Duration::from_secs(2), reader.next()).await {
Ok(Some(got)) if got == sent => {
tracing::info!("landed");
break;
}
Ok(Some(_)) => continue, // someone else's note
Ok(None) => break, // stream closed
Err(_) => continue, // per-round timeout
}
}
This is strictly more anonymity-preserving than an ECIES receipts
stream would be — no receipts exist on the wire for an observer to
count — at the cost that your application compares payload bytes
rather than consulting a typed Outcome.
Retry policy
Zipnet does not retry for you. If you need at-least-once delivery at
the application layer, wrap send in your own loop that consults
the Reader:
use futures::StreamExt;
use std::time::Duration;
use tokio::time::timeout;
use zipnet::{Reader, Submitter};
async fn send_with_retry<D>(
tx: &Submitter<D>,
reader: &mut Reader<D>,
datum: D,
attempts: u32,
) -> zipnet::Result<()>
where
D: zipnet::ShuffleDatum + Clone + PartialEq,
{
for _ in 0..attempts {
let _ = tx.send(datum.clone()).await?;
// Give each attempt ~3 rounds before declaring it lost.
let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
while tokio::time::Instant::now() < deadline {
match timeout(Duration::from_secs(1), reader.next()).await {
Ok(Some(got)) if got == datum => return Ok(()),
Ok(Some(_)) | Err(_) => continue,
Ok(None) => return Err(zipnet::Error::Shutdown),
}
}
}
Err(zipnet::Error::Protocol("not delivered after retries".into()))
}
Retry latency is bounded by the deployment’s round cadence — at the
interactive() preset’s 1s round period, three attempts cost
~3–10s depending on collision luck. Tune attempts to your SLA.
Payload shape
Submitter::send(D) takes your datum type directly. The SDK calls
D::encode to produce exactly D::WIRE_SIZE bytes and rejects any
encoding of a different length with Error::Protocol — that is a
programmer error in your ShuffleDatum impl, not a runtime
condition.
WIRE_SIZE is set once per datum type and folds into the deployment
identity. Variable application data must be padded at the
application layer before wrapping into D; see
Quickstart — Define your datum type
for why constant size is non-negotiable.
Cover traffic is on by default
An idle Submitter<D> still keeps its driver alive — whenever the
deployment opens a round that includes this client, the driver
pulls the next queued payload or seals cover traffic (a zero-message
envelope) if nothing is queued. Cover envelopes do not surface on
your side; observers cannot distinguish a cover round from a
real-payload round for any given participant.
There is no SDK knob to tune cover-traffic rate today. If you hold a
Submitter handle, you participate; if you drop it, you don’t. For
applications that want to only appear for certain rounds, open the
submitter immediately before you need to send and drop immediately
after — see Identity.
Parallel sends on one handle
Submitter<D> is Clone and internally Arc-wrapped. Concurrent
send calls on one handle are fine; the driver serializes them
per-round and emits at most one payload per round per submitter. If
you call send twice during the same round window, the second call
waits for the next round rather than sharing the slot.
let tx = Zipnet::<Note>::submit(&network, &ACME_MAINNET).await?;
let a = tx.clone();
let b = tx.clone();
let (_, _) = tokio::join!(
a.send(Note(*b"message A")),
b.send(Note(*b"message B")),
);
// The two notes land in different rounds. Match them back via the
// Reader if you need confirmation.
If you need higher throughput per wall-clock second, the right lever is operator-side round cadence or slot count. From the SDK, one submitter is one slot per round.
Shutdown
drop(tx); // the driver drains pending items, then exits
Dropping the last clone of a Submitter aborts its driver task
cleanly. In-flight send calls that have not been picked up by the
driver surface as Error::Shutdown. Any Reader<D> bound to the
same deployment is unaffected.
Dropping one Submitter handle does not tear down the
Network. Other services or other zipnet deployments sharing the
same Arc<Network> keep running.