Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

src/provider.rs

audience: ai

The Compute-module provider loop. Four responsibilities:

  1. Publish a provider card at boot and refresh it on capacity change. The card now folds the union of capabilities across every enabled backend.
  2. Subscribe to ComputeGrants addressed to this provider.
  3. For each grant: resolve the zipnet envelope, cross-check the declared image hash, route to the right backend via backends/mod.rs, seal an SSH receipt via receipt.rs, reply via zipnet_io.rs.
  4. Spawn a watcher that emits a ComputeLog when the instance exits or the grant’s valid_to passes.

The select! in run keeps one action per tick — either handle a grant or refresh the card — rather than racing both concurrently. Keeps accounting honest.

//! Compute-module provider loop.
//!
//! Responsibilities:
//!
//! 1. Publish a `ProviderCard` at boot, refresh it on
//!    capacity change.
//! 2. Subscribe to `ComputeGrant`s addressed to this
//!    provider.
//! 3. For each grant:
//!      a. Read the matching zipnet request envelope to
//!         obtain the requester's `peer_id` public key
//!         and declared image hash.
//!      b. Provision a workload on whichever backend —
//!         AWS, GCP, Azure, bare-metal — the fleet
//!         decides can satisfy it.
//!      c. Encrypt and return an SSH access receipt to
//!         the requester via zipnet.
//!      d. On workload completion or timeout, emit a
//!         `ComputeLog`.

use std::sync::Arc;

use anyhow::Context;
use coalition::ConfluenceConfig;
use coalition_compute::{
    ComputeGrant, ComputeLog, ProviderCard, ProviderId,
    UsageMetrics,
};
use mosaik::tee::tdx::Quote;
use mosaik::Network;

use crate::backends::Fleet;
use crate::config::ProviderBootConfig;
use crate::dashboard::{Dashboard, DashboardEvent, UsageDelta};
use crate::receipt::SshAccessReceipt;
use crate::zipnet_io::ZipnetChannel;

/// Long-lived state for the compute-module provider loop.
///
/// Owns every handle `run` needs; constructed once via [`Provider::new`]
/// and consumed by [`Provider::run`], which loops until process exit.
pub struct Provider<'cfg> {
    // Coalition network handle; shared (Arc) because the per-grant
    // watcher task spawned in `handle_grant` clones it to append logs.
    network:   Arc<Network>,
    // Borrowed compute-stream configuration; ties the provider's
    // lifetime to the config that outlives it ('cfg).
    compute:   &'cfg ConfluenceConfig<'cfg>,
    // TDX attestation quote captured at boot; folded into the
    // provider card (see `register_provider_card`).
    tdx_quote: Quote,
    // Channel for resolving request envelopes and sending sealed
    // SSH receipts back to requesters.
    zipnet:    ZipnetChannel,
    // Multi-backend workload pool (AWS/GCP/Azure/bare-metal);
    // cloned into watcher tasks, so presumably cheap to clone —
    // TODO confirm Fleet is Arc-backed.
    fleet:     Fleet,
    // Boot-time settings, e.g. `capacity_refresh_sec` used by `run`.
    config:    ProviderBootConfig,
    // Operator-facing dashboard; receives aggregate, anonymized
    // events only (backend label + usage totals, no requester ids).
    dashboard: Arc<Dashboard>,
}

impl<'cfg> Provider<'cfg> {
    /// Assemble a provider from pre-built components.
    ///
    /// Pure constructor: performs no I/O and no registration —
    /// nothing happens until [`Provider::run`] is awaited.
    pub fn new(
        network: Arc<Network>,
        compute: &'cfg ConfluenceConfig<'cfg>,
        tdx_quote: Quote,
        zipnet: ZipnetChannel,
        fleet: Fleet,
        config: ProviderBootConfig,
        dashboard: Arc<Dashboard>,
    ) -> Self {
        Self { network, compute, tdx_quote, zipnet, fleet, config, dashboard }
    }

    /// Main loop. Runs forever until the process is
    /// terminated; retirement requires a clean operator
    /// stop which emits a `RetirementMarker` on the
    /// provider's own stream.
    ///
    /// Registers the provider card once, then alternates between
    /// handling incoming grants and refreshing the card on a timer.
    /// Returns `Err` only if boot-time registration or grant
    /// subscription fails; per-grant errors are logged and swallowed.
    pub async fn run(self) -> anyhow::Result<()> {
        let provider_id = self.register_provider_card().await?;

        tracing::info!(provider = %provider_id_hex(&provider_id),
            "provider card registered");

        // NOTE(review): `grants.next()` below suggests this returns a
        // `Stream`; presumably `StreamExt` is in scope via a glob or
        // prelude not visible here — TODO confirm the import.
        let mut grants = coalition_compute::grants_for(
            &self.network, self.compute, provider_id,
        ).await?;

        // Capacity-refresh ticker runs alongside grant
        // handling. Using a select! keeps the code honest:
        // this provider does one thing per tick.
        // `.max(1)` guards against a zero interval, which would
        // panic in `tokio::time::interval`.
        let mut refresh = tokio::time::interval(
            std::time::Duration::from_secs(
                self.config.capacity_refresh_sec.max(1) as u64,
            ),
        );

        loop {
            tokio::select! {
                // Grant handling failures are logged, not fatal:
                // one bad grant must not take down the provider.
                Some(grant) = grants.next() => {
                    if let Err(err) = self.handle_grant(&grant).await {
                        tracing::warn!(
                            request_id = ?grant.request_id,
                            error = %err,
                            "grant handling failed",
                        );
                    }
                }
                _ = refresh.tick() => {
                    if let Err(err) = self.refresh_provider_card(provider_id).await {
                        tracing::warn!(error = %err,
                            "provider card refresh failed");
                    }
                }
                // All branches disabled (grant stream ended) => exit.
                else => break,
            }
        }

        Ok(())
    }

    /// Assemble and publish the provider card.
    ///
    /// The card folds:
    ///
    /// - the TDX quote (proves the running binary is
    ///   the one whose source is in this crate),
    /// - the per-backend capabilities: enabled backends,
    ///   the union of regions they serve, the union of
    ///   their CPU/RAM maxes, whether any backend is
    ///   `tdx_capable`,
    /// - real-time capacity telemetry (CPU millicores,
    ///   RAM MiB currently provisioned across all
    ///   backends),
    /// - the zipnet return channel the provider will
    ///   listen on.
    ///
    /// Prototype stub: currently queries capabilities then bails,
    /// so `run` cannot get past boot yet.
    async fn register_provider_card(&self) -> anyhow::Result<ProviderId> {
        let capabilities = self.fleet.capabilities().await?;
        let _ = capabilities;
        // TODO: construct ProviderCard folding TDX quote
        // + per-backend capabilities + zipnet channel,
        // sign it, commit via coalition_compute::register.
        anyhow::bail!(
            "Provider::register_provider_card is a prototype stub"
        )
    }

    /// Periodic card refresh, driven by the ticker in `run`.
    ///
    /// Prototype stub: silently succeeds without publishing
    /// anything, so stale capacity is the current behavior.
    async fn refresh_provider_card(&self, id: ProviderId) -> anyhow::Result<()> {
        // TODO: fetch telemetry from Fleet; publish
        // updated ProviderCard.
        let _ = id;
        Ok(())
    }

    /// Handle one grant end to end.
    ///
    /// Steps (numbered inline): resolve envelope, cross-check image
    /// hash, provision on a backend, notify dashboard, seal + reply
    /// with an SSH receipt, then spawn a detached watcher that emits
    /// the `ComputeLog` at exit/deadline. Any `Err` returned here is
    /// logged by `run` and does not stop the loop. Note the spawned
    /// watcher is detached (handle dropped), so its failures surface
    /// only via the `tracing::warn!` inside it.
    async fn handle_grant(&self, grant: &ComputeGrant<'_>) -> anyhow::Result<()> {
        tracing::info!(
            request_id = ?grant.request_id,
            image_hash = ?grant.image_hash,
            valid_to   = ?grant.valid_to,
            "accepted grant",
        );

        // 1. Decrypt the zipnet envelope to learn the
        //    requester's peer_id and image payload.
        let envelope = self.zipnet
            .resolve(&grant.bearer_pointer)
            .await
            .context("resolving zipnet envelope")?;

        // 2. Cross-check the envelope's declared image
        //    post-hash against the grant's committed
        //    image_hash. If the requester tried to smuggle
        //    a different image, reject the grant and commit
        //    an empty ComputeLog marking the grant invalid.
        //    NOTE(review): the "commit an empty ComputeLog" part is
        //    not implemented yet — this currently just bails.
        if envelope.image_hash() != grant.image_hash {
            anyhow::bail!(
                "envelope/grant image hash mismatch (this should be \
                 impossible under honest zipnet unseal)"
            );
        }

        // 3. Route the grant to whichever backend can
        //    satisfy it.
        let instance = self.fleet
            .provision_for_grant(grant, &envelope)
            .await
            .context("provisioning workload")?;

        tracing::info!(
            request_id = ?grant.request_id,
            backend    = %instance.backend,
            instance   = %instance.instance_id,
            "workload running",
        );

        // Dashboard: operator sees the backend and the
        // aggregate count increment. No requester
        // identity, no instance id beyond the backend
        // label.
        self.dashboard.record(DashboardEvent::GrantAccepted {
            backend: instance.backend.to_string(),
        }).await;

        // 4. Build the SSH access receipt. Contains:
        //    - ssh host (the backend's public endpoint
        //      for the instance),
        //    - per-grant private key,
        //    - host key (for known_hosts),
        //    - the grant's valid_to tick so the requester
        //      can plan workload lifetime.
        let receipt = SshAccessReceipt::build(
            &instance,
            grant,
        )?;

        // 5. Encrypt the receipt to the peer_id's
        //    x25519 public key (published in the zipnet
        //    envelope) and return via the zipnet reply
        //    stream.
        let sealed = receipt.seal_to(envelope.peer_x25519_public())?;
        self.zipnet.reply(&grant.request_id, sealed).await?;

        // 6. Spawn a watcher that emits the ComputeLog
        //    when the instance terminates or the grant
        //    deadline passes, whichever first.
        //    Everything the task needs is cloned/copied up front so
        //    the 'static spawn doesn't borrow from `self` or `grant`.
        let fleet = self.fleet.clone();
        let network = self.network.clone();
        let request_id = grant.request_id;
        let valid_to = grant.valid_to;
        let instance_for_log = instance.clone();
        let provider_id = instance.provider_id();
        let backend_name = instance.backend.to_string();
        let dashboard = self.dashboard.clone();
        tokio::spawn(async move {
            // Watcher errors collapse to zeroed usage rather than
            // dropping the log entirely — presumably deliberate
            // best-effort accounting; TODO confirm.
            let usage: UsageMetrics = fleet
                .watch_until_exit(&instance_for_log, valid_to)
                .await
                .unwrap_or_default();

            let log = ComputeLog {
                grant_id: request_id,
                provider: provider_id,
                window:   UsageMetrics::window_for(&usage),
                usage:    usage.clone(),
                evidence: None,
            };
            if let Err(err) = coalition_compute::append_log(&network, &log).await {
                tracing::warn!(error = %err,
                    "failed to append ComputeLog");
            }

            // Dashboard: operator sees totals go up.
            dashboard.record(DashboardEvent::GrantCompleted {
                backend: backend_name,
                usage:   UsageDelta {
                    cpu_core_seconds: usage.cpu_core_seconds(),
                    ram_mib_seconds:  usage.ram_mib_seconds(),
                    net_bytes:        usage.net_bytes(),
                },
            }).await;
        });

        Ok(())
    }
}

/// Short, stable rendering of a provider id for log lines.
///
/// Prototype placeholder: the real implementation will hash the
/// underlying `UniqueId`; until then every id renders as the same
/// fixed marker so log formats stay stable.
fn provider_id_hex(id: &ProviderId) -> String {
    let _ = id;
    String::from("<provider-id>")
}

Up: compute-bridge.