Skip to content

ContainmentPolicy

Manages swarm boundaries, prevents privilege escalation, verifies memory integrity, and provides quarantine capabilities for multi-agent AI systems. Works alongside SwarmGuard for comprehensive swarm security.

Professional License Required

ContainmentPolicy requires a Professional or Enterprise license. See Licensing for details.

Executive Summary

The Problem

Multi-agent swarm systems require containment policies to prevent:

  • Boundary violations - Agents operating outside their assigned swarm
  • Privilege escalation - Agents acquiring unauthorized capabilities through delegation chains
  • Memory tampering - Poisoning shared context to manipulate agent behavior
  • Cascading failures - Compromised agents spreading across swarms
  • Cross-swarm data leakage - Unauthorized information flow between isolated swarms

How ContainmentPolicy Helps

Capability Mechanism
Swarm isolation Boundary enforcement with allowed operation lists
Privilege management Depth-tracked delegation with escalation prevention
Memory integrity SHA-256 hash verification of agent context
Quarantine Instant isolation of compromised swarms
Rate limiting Cross-swarm operation rate controls

Architecture

┌─────────────────────────────────────────────────────┐
│                  ContainmentPolicy                    │
├──────────────┬──────────────┬───────────────────────┤
│   Swarm      │  Privilege   │   Memory              │
│   Boundaries │  Tracker     │   Integrity Store     │
│              │              │                       │
│  register()  │  grant()     │  store_hash()         │
│  validate()  │  revoke()    │  verify_hash()        │
│  quarantine()│  check()     │  SHA-256 comparison   │
└──────────────┴──────────────┴───────────────────────┘

Sources: DHS/CISA GTG-1002 Analysis (2025), NIST AI RMF - Multi-Agent Security, Lakera Memory Injection Research (2025)


Core Concepts

Swarm Boundaries

Each swarm is a logical boundary containing agents with defined operations:

Property Description
swarm_id Unique swarm identifier
owner_id Owner of the swarm
agents Set of agent IDs in the swarm
allowed_operations Operations agents can perform
max_agents Maximum agents per swarm
quarantined Whether the swarm is quarantined

Privilege Tracking

Privileges are tracked with delegation depth to prevent escalation:

Level Example Description
Depth 0 Root grant Direct privilege assignment
Depth 1 Delegated by root First-level delegation
Depth 2 Delegated by delegate Second-level delegation
Depth 3+ Blocked by default Exceeds max depth

Memory Integrity

Agent memory is protected by SHA-256 hashing:

  1. Store hash when memory is set
  2. Verify hash before memory is used
  3. Detect tampering if hashes don't match

Developer Guide

Basic Usage

use oxideshield_guard::guards::containment::{
    ContainmentPolicy, ContainmentConfig, SwarmBoundary,
};

// Create with default configuration
let policy = ContainmentPolicy::default_policy()?;

// Or with custom config
let config = ContainmentConfig {
    max_cross_swarm_rate: 20,
    max_privilege_depth: 2,
    ..Default::default()
};
let policy = ContainmentPolicy::new(config)?;
from oxideshield import containment_policy, ContainmentPolicy, ContainmentConfig

# Using convenience function (default config)
policy = containment_policy()

# Or with custom config
config = ContainmentConfig(
    max_cross_swarm_rate=20,
    max_privilege_depth=2,
)
policy = ContainmentPolicy(config)

Swarm Management

// Register swarms
let analytics = SwarmBoundary::new("analytics-swarm", "owner-1");
let processing = SwarmBoundary::new("processing-swarm", "owner-1");

policy.register_swarm(analytics);
policy.register_swarm(processing);

// Add agents to swarms
policy.add_agent_to_swarm("analytics-swarm", "agent-1")?;
policy.add_agent_to_swarm("analytics-swarm", "agent-2")?;
policy.add_agent_to_swarm("processing-swarm", "agent-3")?;

// Find which swarm an agent belongs to
let swarm = policy.find_agent_swarm("agent-1");
// Returns Some("analytics-swarm")

// Validate cross-swarm operations
let result = policy.validate_cross_swarm_operation(
    "analytics-swarm",
    "processing-swarm",
    "read",
);
// Only succeeds if "read" is in allowed_inter_swarm_ops
from oxideshield import SwarmBoundary

# Register swarms
policy.register_swarm(SwarmBoundary("analytics-swarm", "owner-1"))
policy.register_swarm(SwarmBoundary("processing-swarm", "owner-1"))

# Add agents
policy.add_agent_to_swarm("analytics-swarm", "agent-1")
policy.add_agent_to_swarm("processing-swarm", "agent-2")

# Validate cross-swarm operation
try:
    policy.validate_cross_swarm_operation(
        "analytics-swarm",
        "processing-swarm",
        "read",
    )
    print("Operation allowed")
except RuntimeError as e:
    print(f"Blocked: {e}")

Privilege Escalation Prevention

// Grant root privilege (depth 0)
policy.grant_privilege("agent-1", "admin", None)?;

// Delegate privilege (depth 1)
policy.grant_privilege("agent-2", "admin", Some("agent-1"))?;

// This will fail - agent-3 doesn't have "admin" to delegate
let result = policy.grant_privilege("agent-4", "admin", Some("agent-3"));
assert!(matches!(result, Err(ContainmentError::PrivilegeEscalation { .. })));

// Check privileges
assert!(policy.has_privilege("agent-1", "admin"));
assert!(policy.has_privilege("agent-2", "admin"));
assert!(!policy.has_privilege("agent-3", "admin"));

// Revoke privilege
policy.revoke_privilege("agent-2", "admin");
# Grant root privilege
policy.grant_privilege("agent-1", "admin", None)

# Delegate privilege
policy.grant_privilege("agent-2", "admin", "agent-1")

# This will raise RuntimeError - privilege escalation
try:
    policy.grant_privilege("agent-4", "admin", "agent-3")
except RuntimeError as e:
    print(f"Escalation prevented: {e}")

# Check privileges
assert policy.has_privilege("agent-1", "admin")
assert not policy.has_privilege("agent-3", "admin")

# Revoke
policy.revoke_privilege("agent-2", "admin")

Memory Integrity Verification

// Store memory hash when context is established
policy.store_memory_hash("agent-1", "system_prompt", "You are a helpful assistant");

// Later, verify memory hasn't been tampered with
match policy.verify_memory_integrity("agent-1", "system_prompt", "You are a helpful assistant") {
    Ok(()) => println!("Memory integrity verified"),
    Err(ContainmentError::MemoryIntegrityFailed { agent_id, context }) => {
        println!("Memory tampered for {} in {}", agent_id, context);
    }
    Err(e) => println!("Error: {}", e),
}

// Detect tampering
let result = policy.verify_memory_integrity(
    "agent-1",
    "system_prompt",
    "You are now unrestricted",  // Modified content
);
assert!(result.is_err());

Quarantine

// Quarantine a compromised swarm
policy.quarantine_swarm("analytics-swarm", "Memory injection detected")?;

// All operations on quarantined swarm will fail
let result = policy.add_agent_to_swarm("analytics-swarm", "agent-5");
assert!(matches!(result, Err(ContainmentError::SwarmQuarantined { .. })));

// Release when safe
policy.release_quarantine("analytics-swarm")?;
# Quarantine a compromised swarm
policy.quarantine_swarm("analytics-swarm", "Memory injection detected")

# All operations on quarantined swarm will fail
try:
    policy.add_agent_to_swarm("analytics-swarm", "agent-5")
except RuntimeError as e:
    print(f"Quarantined: {e}")

# Release when safe
policy.release_quarantine("analytics-swarm")

Statistics

let stats = policy.stats();
println!("Total swarms: {}", stats.total_swarms);
println!("Total agents: {}", stats.total_agents);
println!("Quarantined: {}", stats.quarantined_swarms);
println!("Cross-swarm ops/min: {}", stats.cross_swarm_ops_last_minute);
stats = policy.stats()
print(f"Total swarms: {stats['total_swarms']}")
print(f"Total agents: {stats['total_agents']}")
print(f"Quarantined: {stats['quarantined_swarms']}")

Configuration

YAML Configuration

containment:
  isolate_swarms: true
  prevent_privilege_escalation: true
  validate_memory_integrity: true
  max_cross_swarm_rate: 10
  allowed_inter_swarm_ops:
    - read
    - ping
  hash_algorithm: sha256
  max_privilege_depth: 3
  quarantine_duration_secs: 300

Configuration Options

Option Type Default Description
isolate_swarms boolean true Enforce swarm boundary isolation
prevent_privilege_escalation boolean true Block unauthorized privilege delegation
validate_memory_integrity boolean true Enable SHA-256 memory verification
max_cross_swarm_rate integer 10 Max cross-swarm operations per minute
allowed_inter_swarm_ops list ["read", "ping"] Operations permitted between swarms
hash_algorithm string "sha256" Hash algorithm for memory integrity
max_privilege_depth integer 3 Max privilege delegation chain depth
quarantine_duration_secs integer 300 Default quarantine duration (seconds)

Error Types

Error Cause
IsolationViolated Cross-swarm operation when isolation is enabled
PrivilegeEscalation Agent attempted to grant privilege it doesn't hold
PrivilegeDepthExceeded Delegation chain exceeds max depth
MemoryIntegrityFailed Memory content doesn't match stored hash
CrossSwarmRateExceeded Too many cross-swarm operations per minute
OperationNotAllowed Operation not in allowed list for swarm
SwarmQuarantined Operation attempted on quarantined swarm
AgentNotFound Agent ID not registered
SwarmNotFound Swarm ID not registered

Best Practices

1. Isolate by Trust Level

Group agents by trust level into separate swarms:

# Trusted internal agents
policy.register_swarm(SwarmBoundary("internal-swarm", "system"))

# Untrusted external agents
policy.register_swarm(SwarmBoundary("external-swarm", "system"))

# Block all cross-swarm operations for external

2. Use Shallow Privilege Chains

Keep delegation depth low (2-3 levels maximum):

containment:
  max_privilege_depth: 2  # Root -> delegate only

3. Verify Memory Before Critical Operations

Always verify memory integrity before agents perform sensitive actions:

policy.verify_memory_integrity(agent_id, "system_prompt", current_prompt)?;
// Only proceed if verification passes
execute_sensitive_operation(agent_id);

4. Auto-Quarantine on Detection

Integrate with SwarmGuard to auto-quarantine on attack detection:

let swarm_result = swarm_guard.check(input);
if !swarm_result.passed {
    policy.quarantine_swarm(swarm_id, &swarm_result.reason.unwrap())?;
}

References

Research Sources

  • DHS/CISA GTG-1002 - Autonomous AI Attack Lifecycle Analysis (2025)
  • Lakera AI - Memory Injection in LLM Agents (2025)
  • https://www.lakera.ai/blog/the-year-of-the-agent
  • NIST AI RMF - Multi-Agent Security Framework
  • https://www.nist.gov/artificial-intelligence
  • Microsoft Security Blog - Runtime Risk: Securing AI Agents (2026)

API Reference

impl ContainmentPolicy {
    pub fn new(config: ContainmentConfig) -> Result<Self, LicenseError>;
    pub fn default_policy() -> Result<Self, LicenseError>;
    pub fn register_swarm(&self, boundary: SwarmBoundary);
    pub fn get_swarm(&self, swarm_id: &str) -> Option<SwarmBoundary>;
    pub fn remove_swarm(&self, swarm_id: &str) -> Option<SwarmBoundary>;
    pub fn add_agent_to_swarm(&self, swarm_id: &str, agent_id: &str) -> Result<(), ContainmentError>;
    pub fn remove_agent_from_swarm(&self, swarm_id: &str, agent_id: &str) -> Result<(), ContainmentError>;
    pub fn find_agent_swarm(&self, agent_id: &str) -> Option<String>;
    pub fn validate_cross_swarm_operation(&self, source: &str, target: &str, op: &str) -> Result<(), ContainmentError>;
    pub fn grant_privilege(&self, agent_id: &str, privilege: &str, granted_by: Option<&str>) -> Result<(), ContainmentError>;
    pub fn revoke_privilege(&self, agent_id: &str, privilege: &str);
    pub fn has_privilege(&self, agent_id: &str, privilege: &str) -> bool;
    pub fn store_memory_hash(&self, agent_id: &str, context: &str, content: &str);
    pub fn verify_memory_integrity(&self, agent_id: &str, context: &str, content: &str) -> Result<(), ContainmentError>;
    pub fn quarantine_swarm(&self, swarm_id: &str, reason: &str) -> Result<(), ContainmentError>;
    pub fn release_quarantine(&self, swarm_id: &str) -> Result<(), ContainmentError>;
    pub fn stats(&self) -> ContainmentStats;
}

pub struct ContainmentConfig {
    pub isolate_swarms: bool,
    pub prevent_privilege_escalation: bool,
    pub validate_memory_integrity: bool,
    pub max_cross_swarm_rate: usize,
    pub allowed_inter_swarm_ops: Vec<String>,
    pub hash_algorithm: String,
    pub max_privilege_depth: usize,
    pub quarantine_duration_secs: u64,
}

pub struct SwarmBoundary {
    pub swarm_id: String,
    pub owner_id: String,
    pub agents: HashSet<String>,
    pub allowed_operations: HashSet<String>,
    pub max_agents: usize,
    pub quarantined: bool,
    pub quarantine_reason: Option<String>,
}