src/provider.rs
audience: ai
The Compute-module provider loop. Four responsibilities:
- Publish a provider card at boot and refresh it on capacity change. The card now folds the union of capabilities across every enabled backend.
- Subscribe to ComputeGrants addressed to this provider.
- For each grant: resolve the zipnet envelope, cross-check the declared
  image hash, route to the right backend via backends/mod.rs, seal an SSH
  receipt via receipt.rs, reply via zipnet_io.rs.
- Spawn a watcher that emits a ComputeLog when the instance exits or the
  grant's valid_to passes.
The select! in run keeps one action per tick —
either handle a grant or refresh the card — rather than
racing both concurrently. Keeps accounting honest.
//! Compute-module provider loop.
//!
//! Responsibilities:
//!
//! 1. Publish a `ProviderCard` at boot, refresh it on
//! capacity change.
//! 2. Subscribe to `ComputeGrant`s addressed to this
//! provider.
//! 3. For each grant:
//! a. Read the matching zipnet request envelope to
//! obtain the requester's `peer_id` public key
//! and declared image hash.
//! b. Provision a workload on whichever backend —
//! AWS, GCP, Azure, bare-metal — the fleet
//! decides can satisfy it.
//! c. Encrypt and return an SSH access receipt to
//! the requester via zipnet.
//! d. On workload completion or timeout, emit a
//! `ComputeLog`.
use std::sync::Arc;
use anyhow::Context;
use coalition::ConfluenceConfig;
use coalition_compute::{
ComputeGrant, ComputeLog, ProviderCard, ProviderId,
UsageMetrics,
};
use mosaik::tee::tdx::Quote;
use mosaik::Network;
use crate::backends::Fleet;
use crate::config::ProviderBootConfig;
use crate::dashboard::{Dashboard, DashboardEvent, UsageDelta};
use crate::receipt::SshAccessReceipt;
use crate::zipnet_io::ZipnetChannel;
/// One compute provider's long-lived state: shared handles plus boot
/// configuration, consumed by [`Provider::run`].
pub struct Provider<'cfg> {
    /// Coalition network handle; used to subscribe to grants and,
    /// cloned into the watcher task, to append `ComputeLog`s.
    network: Arc<Network>,
    /// Confluence configuration borrowed for the provider's lifetime;
    /// passed to `coalition_compute::grants_for`.
    compute: &'cfg ConfluenceConfig<'cfg>,
    /// TDX attestation quote. Not yet read by the visible code —
    /// intended for the `ProviderCard` fold (see the
    /// `register_provider_card` TODO).
    tdx_quote: Quote,
    /// Channel for zipnet I/O: resolves grant envelopes and returns
    /// sealed SSH receipts.
    zipnet: ZipnetChannel,
    /// Backend fleet: capability reporting, workload provisioning, and
    /// instance-exit watching.
    fleet: Fleet,
    /// Boot-time settings; `capacity_refresh_sec` drives the card
    /// refresh ticker in `run`.
    config: ProviderBootConfig,
    /// Operator dashboard; receives aggregate accept/complete events
    /// only (no requester identity).
    dashboard: Arc<Dashboard>,
}
impl<'cfg> Provider<'cfg> {
    /// Bundle pre-built collaborators into a provider.
    ///
    /// Pure constructor: no network or backend I/O happens until
    /// [`Provider::run`] is called.
    pub fn new(
        network: Arc<Network>,
        compute: &'cfg ConfluenceConfig<'cfg>,
        tdx_quote: Quote,
        zipnet: ZipnetChannel,
        fleet: Fleet,
        config: ProviderBootConfig,
        dashboard: Arc<Dashboard>,
    ) -> Self {
        Self { network, compute, tdx_quote, zipnet, fleet, config, dashboard }
    }

    /// Main loop. Runs forever until the process is
    /// terminated; retirement requires a clean operator
    /// stop which emits a `RetirementMarker` on the
    /// provider's own stream.
    ///
    /// # Errors
    ///
    /// Returns an error only when boot fails (card registration or
    /// grant subscription). Per-grant and per-refresh failures are
    /// logged and swallowed so one bad grant cannot stall the loop.
    pub async fn run(self) -> anyhow::Result<()> {
        let provider_id = self.register_provider_card().await?;
        tracing::info!(provider = %provider_id_hex(&provider_id),
            "provider card registered");
        let mut grants = coalition_compute::grants_for(
            &self.network, self.compute, provider_id,
        ).await?;
        // Capacity-refresh ticker runs alongside grant handling. Using a
        // select! keeps the code honest: this provider does one thing per
        // tick. `.max(1)` guards against a zero-valued config interval,
        // which `tokio::time::interval` would panic on.
        let mut refresh = tokio::time::interval(
            std::time::Duration::from_secs(
                self.config.capacity_refresh_sec.max(1) as u64,
            ),
        );
        loop {
            tokio::select! {
                Some(grant) = grants.next() => {
                    // A failed grant is logged and dropped; subsequent
                    // grants are still served.
                    if let Err(err) = self.handle_grant(&grant).await {
                        tracing::warn!(
                            request_id = ?grant.request_id,
                            error = %err,
                            "grant handling failed",
                        );
                    }
                }
                _ = refresh.tick() => {
                    if let Err(err) = self.refresh_provider_card(provider_id).await {
                        tracing::warn!(error = %err,
                            "provider card refresh failed");
                    }
                }
                // Defensive: the ticker branch never disables, so this
                // arm should be unreachable in practice; it exists so a
                // fully-disabled select cannot hang.
                else => break,
            }
        }
        Ok(())
    }

    /// Assemble and publish the provider card.
    ///
    /// The card folds:
    ///
    /// - the TDX quote (proves the running binary is
    ///   the one whose source is in this crate),
    /// - the per-backend capabilities: enabled backends,
    ///   the union of regions they serve, the union of
    ///   their CPU/RAM maxes, whether any backend is
    ///   `tdx_capable`,
    /// - real-time capacity telemetry (CPU millicores,
    ///   RAM MiB currently provisioned across all
    ///   backends),
    /// - the zipnet return channel the provider will
    ///   listen on.
    ///
    /// # Errors
    ///
    /// Prototype: always fails after probing fleet capabilities; the
    /// fold/sign/commit path is not yet implemented.
    async fn register_provider_card(&self) -> anyhow::Result<ProviderId> {
        let capabilities = self.fleet.capabilities().await?;
        let _ = capabilities;
        // TODO: construct ProviderCard folding TDX quote
        // + per-backend capabilities + zipnet channel,
        // sign it, commit via coalition_compute::register.
        anyhow::bail!(
            "Provider::register_provider_card is a prototype stub"
        )
    }

    /// Refresh the published card for `id` with current telemetry.
    ///
    /// Prototype: a silent no-op so the `run` ticker has something to
    /// call; the telemetry fetch/publish path is not yet implemented.
    async fn refresh_provider_card(&self, id: ProviderId) -> anyhow::Result<()> {
        // TODO: fetch telemetry from Fleet; publish
        // updated ProviderCard.
        let _ = id;
        Ok(())
    }

    /// Handle one grant end to end: resolve the envelope, verify the
    /// image hash, provision a workload, return a sealed SSH receipt,
    /// and spawn the completion watcher.
    ///
    /// # Errors
    ///
    /// Fails on envelope resolution, hash mismatch, provisioning,
    /// receipt sealing, or the zipnet reply. The spawned watcher's
    /// failures are logged inside the task, not surfaced here.
    async fn handle_grant(&self, grant: &ComputeGrant<'_>) -> anyhow::Result<()> {
        tracing::info!(
            request_id = ?grant.request_id,
            image_hash = ?grant.image_hash,
            valid_to = ?grant.valid_to,
            "accepted grant",
        );
        // 1. Decrypt the zipnet envelope to learn the
        //    requester's peer_id and image payload.
        let envelope = self.zipnet
            .resolve(&grant.bearer_pointer)
            .await
            .context("resolving zipnet envelope")?;
        // 2. Cross-check the envelope's declared image hash against the
        //    grant's committed image_hash. On mismatch we bail out and
        //    the caller logs the failure.
        //    TODO: also commit an empty ComputeLog marking the grant
        //    invalid — the bail alone leaves no on-stream record.
        if envelope.image_hash() != grant.image_hash {
            anyhow::bail!(
                "envelope/grant image hash mismatch (this should be \
                impossible under honest zipnet unseal)"
            );
        }
        // 3. Route the grant to whichever backend can
        //    satisfy it.
        let instance = self.fleet
            .provision_for_grant(grant, &envelope)
            .await
            .context("provisioning workload")?;
        tracing::info!(
            request_id = ?grant.request_id,
            backend = %instance.backend,
            instance = %instance.instance_id,
            "workload running",
        );
        // Dashboard: operator sees the backend and the
        // aggregate count increment. No requester
        // identity, no instance id beyond the backend
        // label.
        self.dashboard.record(DashboardEvent::GrantAccepted {
            backend: instance.backend.to_string(),
        }).await;
        // 4. Build the SSH access receipt. Contains:
        //    - ssh host (the backend's public endpoint
        //      for the instance),
        //    - per-grant private key,
        //    - host key (for known_hosts),
        //    - the grant's valid_to tick so the requester
        //      can plan workload lifetime.
        let receipt = SshAccessReceipt::build(
            &instance,
            grant,
        )?;
        // 5. Encrypt the receipt to the peer_id's
        //    x25519 public key (published in the zipnet
        //    envelope) and return via the zipnet reply
        //    stream.
        let sealed = receipt.seal_to(envelope.peer_x25519_public())?;
        self.zipnet.reply(&grant.request_id, sealed).await?;
        // 6. Spawn a watcher that emits the ComputeLog
        //    when the instance terminates or the grant
        //    deadline passes, whichever first.
        let fleet = self.fleet.clone();
        let network = self.network.clone();
        let request_id = grant.request_id;
        let valid_to = grant.valid_to;
        let instance_for_log = instance.clone();
        let provider_id = instance.provider_id();
        let backend_name = instance.backend.to_string();
        let dashboard = self.dashboard.clone();
        tokio::spawn(async move {
            let usage: UsageMetrics = fleet
                .watch_until_exit(&instance_for_log, valid_to)
                .await
                .unwrap_or_default();
            // Capture the dashboard delta up front so `usage` can move
            // into the log below without a redundant clone.
            let delta = UsageDelta {
                cpu_core_seconds: usage.cpu_core_seconds(),
                ram_mib_seconds: usage.ram_mib_seconds(),
                net_bytes: usage.net_bytes(),
            };
            let log = ComputeLog {
                grant_id: request_id,
                provider: provider_id,
                window: UsageMetrics::window_for(&usage),
                usage,
                evidence: None,
            };
            if let Err(err) = coalition_compute::append_log(&network, &log).await {
                tracing::warn!(error = %err,
                    "failed to append ComputeLog");
            }
            // Dashboard: operator sees totals go up.
            dashboard.record(DashboardEvent::GrantCompleted {
                backend: backend_name,
                usage: delta,
            }).await;
        });
        Ok(())
    }
}
/// Short, stable rendering of a `ProviderId` for log lines.
///
/// Prototype stub: always yields a fixed placeholder. The real
/// implementation will hash the underlying UniqueId into a short hex
/// string.
fn provider_id_hex(id: &ProviderId) -> String {
    let _ = id; // keep the parameter "used" until the hashing lands
    String::from("<provider-id>")
}
Up: compute-bridge.