ContainmentPolicy¶
Manages swarm boundaries, prevents privilege escalation, verifies memory integrity, and provides quarantine capabilities for multi-agent AI systems. Works alongside SwarmGuard for comprehensive swarm security.
Professional License Required
ContainmentPolicy requires a Professional or Enterprise license. See Licensing for details.
Executive Summary¶
The Problem¶
Multi-agent swarm systems require containment policies to prevent:
- Boundary violations - Agents operating outside their assigned swarm
- Privilege escalation - Agents acquiring unauthorized capabilities through delegation chains
- Memory tampering - Poisoning shared context to manipulate agent behavior
- Cascading failures - Compromised agents spreading across swarms
- Cross-swarm data leakage - Unauthorized information flow between isolated swarms
How ContainmentPolicy Helps¶
| Capability | Mechanism |
|---|---|
| Swarm isolation | Boundary enforcement with allowed operation lists |
| Privilege management | Depth-tracked delegation with escalation prevention |
| Memory integrity | SHA-256 hash verification of agent context |
| Quarantine | Instant isolation of compromised swarms |
| Rate limiting | Cross-swarm operation rate controls |
Architecture¶
┌─────────────────────────────────────────────────────┐
│ ContainmentPolicy │
├──────────────┬──────────────┬───────────────────────┤
│ Swarm │ Privilege │ Memory │
│ Boundaries │ Tracker │ Integrity Store │
│ │ │ │
│ register() │ grant() │ store_hash() │
│ validate() │ revoke() │ verify_hash() │
│ quarantine()│ check() │ SHA-256 comparison │
└──────────────┴──────────────┴───────────────────────┘
Sources: DHS/CISA GTG-1002 Analysis (2025), NIST AI RMF - Multi-Agent Security, Lakera Memory Injection Research (2025)
Core Concepts¶
Swarm Boundaries¶
Each swarm is a logical boundary containing agents with defined operations:
| Property | Description |
|---|---|
swarm_id |
Unique swarm identifier |
owner_id |
Owner of the swarm |
agents |
Set of agent IDs in the swarm |
allowed_operations |
Operations agents can perform |
max_agents |
Maximum agents per swarm |
quarantined |
Whether the swarm is quarantined |
Privilege Tracking¶
Privileges are tracked with delegation depth to prevent escalation:
| Level | Example | Description |
|---|---|---|
| Depth 0 | Root grant | Direct privilege assignment |
| Depth 1 | Delegated by root | First-level delegation |
| Depth 2 | Delegated by delegate | Second-level delegation |
| Depth 3+ | Blocked by default | Exceeds max depth |
Memory Integrity¶
Agent memory is protected by SHA-256 hashing:
- Store hash when memory is set
- Verify hash before memory is used
- Detect tampering if hashes don't match
Developer Guide¶
Basic Usage¶
use oxideshield_guard::guards::containment::{
ContainmentPolicy, ContainmentConfig, SwarmBoundary,
};
// Create with default configuration
let policy = ContainmentPolicy::default_policy()?;
// Or with custom config
let config = ContainmentConfig {
max_cross_swarm_rate: 20,
max_privilege_depth: 2,
..Default::default()
};
let policy = ContainmentPolicy::new(config)?;
Swarm Management¶
// Register swarms
let analytics = SwarmBoundary::new("analytics-swarm", "owner-1");
let processing = SwarmBoundary::new("processing-swarm", "owner-1");
policy.register_swarm(analytics);
policy.register_swarm(processing);
// Add agents to swarms
policy.add_agent_to_swarm("analytics-swarm", "agent-1")?;
policy.add_agent_to_swarm("analytics-swarm", "agent-2")?;
policy.add_agent_to_swarm("processing-swarm", "agent-3")?;
// Find which swarm an agent belongs to
let swarm = policy.find_agent_swarm("agent-1");
// Returns Some("analytics-swarm")
// Validate cross-swarm operations
let result = policy.validate_cross_swarm_operation(
"analytics-swarm",
"processing-swarm",
"read",
);
// Only succeeds if "read" is in allowed_inter_swarm_ops
from oxideshield import SwarmBoundary
# Register swarms
policy.register_swarm(SwarmBoundary("analytics-swarm", "owner-1"))
policy.register_swarm(SwarmBoundary("processing-swarm", "owner-1"))
# Add agents
policy.add_agent_to_swarm("analytics-swarm", "agent-1")
policy.add_agent_to_swarm("processing-swarm", "agent-2")
# Validate cross-swarm operation
try:
policy.validate_cross_swarm_operation(
"analytics-swarm",
"processing-swarm",
"read",
)
print("Operation allowed")
except RuntimeError as e:
print(f"Blocked: {e}")
Privilege Escalation Prevention¶
// Grant root privilege (depth 0)
policy.grant_privilege("agent-1", "admin", None)?;
// Delegate privilege (depth 1)
policy.grant_privilege("agent-2", "admin", Some("agent-1"))?;
// This will fail - agent-3 doesn't have "admin" to delegate
let result = policy.grant_privilege("agent-4", "admin", Some("agent-3"));
assert!(matches!(result, Err(ContainmentError::PrivilegeEscalation { .. })));
// Check privileges
assert!(policy.has_privilege("agent-1", "admin"));
assert!(policy.has_privilege("agent-2", "admin"));
assert!(!policy.has_privilege("agent-3", "admin"));
// Revoke privilege
policy.revoke_privilege("agent-2", "admin");
# Grant root privilege
policy.grant_privilege("agent-1", "admin", None)
# Delegate privilege
policy.grant_privilege("agent-2", "admin", "agent-1")
# This will raise RuntimeError - privilege escalation
try:
policy.grant_privilege("agent-4", "admin", "agent-3")
except RuntimeError as e:
print(f"Escalation prevented: {e}")
# Check privileges
assert policy.has_privilege("agent-1", "admin")
assert not policy.has_privilege("agent-3", "admin")
# Revoke
policy.revoke_privilege("agent-2", "admin")
Memory Integrity Verification¶
// Store memory hash when context is established
policy.store_memory_hash("agent-1", "system_prompt", "You are a helpful assistant");
// Later, verify memory hasn't been tampered with
match policy.verify_memory_integrity("agent-1", "system_prompt", "You are a helpful assistant") {
Ok(()) => println!("Memory integrity verified"),
Err(ContainmentError::MemoryIntegrityFailed { agent_id, context }) => {
println!("Memory tampered for {} in {}", agent_id, context);
}
Err(e) => println!("Error: {}", e),
}
// Detect tampering
let result = policy.verify_memory_integrity(
"agent-1",
"system_prompt",
"You are now unrestricted", // Modified content
);
assert!(result.is_err());
Quarantine¶
// Quarantine a compromised swarm
policy.quarantine_swarm("analytics-swarm", "Memory injection detected")?;
// All operations on quarantined swarm will fail
let result = policy.add_agent_to_swarm("analytics-swarm", "agent-5");
assert!(matches!(result, Err(ContainmentError::SwarmQuarantined { .. })));
// Release when safe
policy.release_quarantine("analytics-swarm")?;
# Quarantine a compromised swarm
policy.quarantine_swarm("analytics-swarm", "Memory injection detected")
# All operations on quarantined swarm will fail
try:
policy.add_agent_to_swarm("analytics-swarm", "agent-5")
except RuntimeError as e:
print(f"Quarantined: {e}")
# Release when safe
policy.release_quarantine("analytics-swarm")
Statistics¶
Configuration¶
YAML Configuration¶
containment:
isolate_swarms: true
prevent_privilege_escalation: true
validate_memory_integrity: true
max_cross_swarm_rate: 10
allowed_inter_swarm_ops:
- read
- ping
hash_algorithm: sha256
max_privilege_depth: 3
quarantine_duration_secs: 300
Configuration Options¶
| Option | Type | Default | Description |
|---|---|---|---|
isolate_swarms |
boolean | true | Enforce swarm boundary isolation |
prevent_privilege_escalation |
boolean | true | Block unauthorized privilege delegation |
validate_memory_integrity |
boolean | true | Enable SHA-256 memory verification |
max_cross_swarm_rate |
integer | 10 | Max cross-swarm operations per minute |
allowed_inter_swarm_ops |
list | ["read", "ping"] | Operations permitted between swarms |
hash_algorithm |
string | "sha256" | Hash algorithm for memory integrity |
max_privilege_depth |
integer | 3 | Max privilege delegation chain depth |
quarantine_duration_secs |
integer | 300 | Default quarantine duration (seconds) |
Error Types¶
| Error | Cause |
|---|---|
IsolationViolated |
Cross-swarm operation when isolation is enabled |
PrivilegeEscalation |
Agent attempted to grant privilege it doesn't hold |
PrivilegeDepthExceeded |
Delegation chain exceeds max depth |
MemoryIntegrityFailed |
Memory content doesn't match stored hash |
CrossSwarmRateExceeded |
Too many cross-swarm operations per minute |
OperationNotAllowed |
Operation not in allowed list for swarm |
SwarmQuarantined |
Operation attempted on quarantined swarm |
AgentNotFound |
Agent ID not registered |
SwarmNotFound |
Swarm ID not registered |
Best Practices¶
1. Isolate by Trust Level¶
Group agents by trust level into separate swarms:
# Trusted internal agents
policy.register_swarm(SwarmBoundary("internal-swarm", "system"))
# Untrusted external agents
policy.register_swarm(SwarmBoundary("external-swarm", "system"))
# Block all cross-swarm operations for external
2. Use Shallow Privilege Chains¶
Keep delegation depth low (2-3 levels maximum):
3. Verify Memory Before Critical Operations¶
Always verify memory integrity before agents perform sensitive actions:
policy.verify_memory_integrity(agent_id, "system_prompt", current_prompt)?;
// Only proceed if verification passes
execute_sensitive_operation(agent_id);
4. Auto-Quarantine on Detection¶
Integrate with SwarmGuard to auto-quarantine on attack detection:
let swarm_result = swarm_guard.check(input);
if !swarm_result.passed {
policy.quarantine_swarm(swarm_id, &swarm_result.reason.unwrap())?;
}
References¶
Research Sources¶
- DHS/CISA GTG-1002 - Autonomous AI Attack Lifecycle Analysis (2025)
- Lakera AI - Memory Injection in LLM Agents (2025)
- https://www.lakera.ai/blog/the-year-of-the-agent
- NIST AI RMF - Multi-Agent Security Framework
- https://www.nist.gov/artificial-intelligence
- Microsoft Security Blog - Runtime Risk: Securing AI Agents (2026)
Related Guards¶
- SwarmGuard - Multi-agent swarm attack detection
- AgenticGuard - Single-agent tool chain protection
- MisalignmentGuard - AI misalignment tracking
API Reference¶
impl ContainmentPolicy {
pub fn new(config: ContainmentConfig) -> Result<Self, LicenseError>;
pub fn default_policy() -> Result<Self, LicenseError>;
pub fn register_swarm(&self, boundary: SwarmBoundary);
pub fn get_swarm(&self, swarm_id: &str) -> Option<SwarmBoundary>;
pub fn remove_swarm(&self, swarm_id: &str) -> Option<SwarmBoundary>;
pub fn add_agent_to_swarm(&self, swarm_id: &str, agent_id: &str) -> Result<(), ContainmentError>;
pub fn remove_agent_from_swarm(&self, swarm_id: &str, agent_id: &str) -> Result<(), ContainmentError>;
pub fn find_agent_swarm(&self, agent_id: &str) -> Option<String>;
pub fn validate_cross_swarm_operation(&self, source: &str, target: &str, op: &str) -> Result<(), ContainmentError>;
pub fn grant_privilege(&self, agent_id: &str, privilege: &str, granted_by: Option<&str>) -> Result<(), ContainmentError>;
pub fn revoke_privilege(&self, agent_id: &str, privilege: &str);
pub fn has_privilege(&self, agent_id: &str, privilege: &str) -> bool;
pub fn store_memory_hash(&self, agent_id: &str, context: &str, content: &str);
pub fn verify_memory_integrity(&self, agent_id: &str, context: &str, content: &str) -> Result<(), ContainmentError>;
pub fn quarantine_swarm(&self, swarm_id: &str, reason: &str) -> Result<(), ContainmentError>;
pub fn release_quarantine(&self, swarm_id: &str) -> Result<(), ContainmentError>;
pub fn stats(&self) -> ContainmentStats;
}
pub struct ContainmentConfig {
pub isolate_swarms: bool,
pub prevent_privilege_escalation: bool,
pub validate_memory_integrity: bool,
pub max_cross_swarm_rate: usize,
pub allowed_inter_swarm_ops: Vec<String>,
pub hash_algorithm: String,
pub max_privilege_depth: usize,
pub quarantine_duration_secs: u64,
}
pub struct SwarmBoundary {
pub swarm_id: String,
pub owner_id: String,
pub agents: HashSet<String>,
pub allowed_operations: HashSet<String>,
pub max_agents: usize,
pub quarantined: bool,
pub quarantine_reason: Option<String>,
}