Python Attestation API¶
This guide covers the Python API for OxideShield's attestation layer, enabling cryptographically signed audit logs for compliance and security.
License Requirement
Attestation features require a Professional or Enterprise license.
Installation¶
Attestation is included in the main OxideShield package:
Quick Start¶
from oxideshield import (
AttestationSigner,
MemoryAuditStorage,
FileAuditStorage,
AuditFilter,
pattern_guard,
generate_report,
)
# 1. Create a signer (generates Ed25519 key pair)
signer = AttestationSigner.generate()
# 2. Create storage backend
storage = MemoryAuditStorage() # For testing
# storage = FileAuditStorage("/var/log/audit.jsonl") # For production
# 3. Wrap a guard with auditing
guard = pattern_guard()
audited_guard = guard.audited(storage, signer)
# 4. Run checks (automatically recorded)
result = audited_guard.check("ignore previous instructions")
print(f"Blocked: {not result.passed}")
# 5. Generate report
report = generate_report(storage, AuditFilter(), signer)
print(f"Total checks: {report.total_checks}")
print(f"Blocked: {report.blocked}")
Core Classes¶
AttestationSigner¶
Manages Ed25519 key pairs for signing and verification.
from oxideshield import AttestationSigner
# Generate new key pair
signer = AttestationSigner.generate()
# Create from seed (deterministic)
signer = AttestationSigner.from_seed(bytes(32))
# Get public key (safe to share)
public_key = signer.public_key() # bytes
public_key_hex = signer.public_key_hex() # hex string
public_key_pem = signer.public_key_pem() # PEM format
# Sign data
signature = signer.sign(b"data to sign") # Returns 64 bytes
# Verify signature
is_valid = signer.verify(b"data to sign", signature)
AuditEntry¶
Represents a single audit record.
from oxideshield import AuditEntry, AuditResult
# Create entry manually (usually done automatically by AuditedGuard)
entry = AuditEntry(
guard_name="PatternGuard",
guard_version="1.0.0",
input_content="user input here",
result=AuditResult.blocked("Prompt injection detected"),
duration_ns=1234567,
)
# Access properties
print(entry.id) # UUID
print(entry.timestamp) # datetime
print(entry.guard_name) # str
print(entry.input_hash) # bytes (SHA-256)
print(entry.result) # AuditResult
# Add metadata
entry = entry.with_metadata("session_id", "user-123")
entry = entry.with_metadata("request_id", "req-456")
AuditResult¶
Enum representing check outcomes.
from oxideshield import AuditResult
# Create results
passed = AuditResult.passed()
blocked = AuditResult.blocked("Reason for blocking")
sanitized = AuditResult.sanitized(redactions=3)
error = AuditResult.error("Error message")
# Check result type
if result.is_passed():
print("Check passed")
elif result.is_blocked():
print(f"Blocked: {result.reason}")
elif result.is_sanitized():
print(f"Sanitized with {result.redactions} redactions")
elif result.is_error():
print(f"Error: {result.message}")
SignedAuditEntry¶
An audit entry with cryptographic signature.
from oxideshield import SignedAuditEntry
# Sign an entry
signed = SignedAuditEntry.sign(entry, signer)
# Access components
print(signed.entry) # AuditEntry
print(signed.signature) # bytes (64)
print(signed.signer_public_key) # bytes (32)
# Verify signature
is_valid = signed.verify()
# Verify with specific public key
is_valid = signed.verify_with_key(known_public_key)
# Serialize
json_str = signed.to_json()
signed2 = SignedAuditEntry.from_json(json_str)
Storage Backends¶
MemoryAuditStorage¶
In-memory storage for testing:
from oxideshield import MemoryAuditStorage
storage = MemoryAuditStorage()
# Append entry
await storage.append(signed_entry)
# Get by ID
entry = await storage.get(uuid)
# List with filter
entries = await storage.list(AuditFilter())
# Count entries
count = await storage.count(AuditFilter())
# Get total count
total = await storage.total_count()
# Clear all entries
await storage.clear()
FileAuditStorage¶
Append-only JSONL file storage for production:
from oxideshield import FileAuditStorage
# Create with path
storage = FileAuditStorage("/var/log/oxideshield/audit.jsonl")
# Use default location
storage = await FileAuditStorage.default_location()
# Same API as MemoryAuditStorage
await storage.append(signed_entry)
entries = await storage.list(AuditFilter())
AuditFilter¶
Build queries for filtering audit entries:
from oxideshield import AuditFilter
from datetime import datetime, timedelta
# Empty filter (matches all)
filter = AuditFilter()
# Filter by guard name
filter = AuditFilter().by_guard("PatternGuard")
# Filter by result type
filter = AuditFilter().passed_only()
filter = AuditFilter().blocked_only()
filter = AuditFilter().sanitized_only()
# Filter by time range
filter = AuditFilter().since(datetime.now() - timedelta(hours=24))
filter = AuditFilter().until(datetime.now())
filter = AuditFilter().time_range(start, end)
# Filter by session ID
filter = AuditFilter().by_session("user-session-123")
# Pagination
filter = AuditFilter().limit(100).offset(0)
# Sort order
filter = AuditFilter().newest_first()
filter = AuditFilter().oldest_first()
# Chain multiple filters
filter = (AuditFilter()
.by_guard("PIIGuard")
.blocked_only()
.since(datetime.now() - timedelta(days=7))
.limit(50))
# Apply filter
entries = await storage.list(filter)
count = await storage.count(filter)
AttestationReport¶
Aggregated report with summary and signature:
from oxideshield import AttestationReport, generate_report
# Generate report from storage
report = await generate_report(storage, filter, signer)
# Or use storage method
report = await storage.generate_report(filter, signer)
# Access summary
print(report.report_id) # UUID
print(report.timestamp) # datetime
print(report.total_checks) # int
print(report.passed) # int
print(report.blocked) # int
print(report.sanitized) # int
print(report.errors) # int
print(report.guards_used) # List[str]
# Access entries
for entry in report.entries:
print(entry.guard_name, entry.result)
# Verify report signature
is_valid = report.verify()
# Verify with specific public key
is_valid = report.verify_with_key(known_public_key)
# Deep verification (all entries)
is_valid = report.verify_deep()
# Export formats
json_str = report.to_json()
html_str = report.to_html()
# Load from JSON
report2 = AttestationReport.from_json(json_str)
Usage Patterns¶
Wrapping Guards¶
Any guard can be wrapped with auditing:
from oxideshield import (
pattern_guard,
pii_guard,
toxicity_guard,
multi_layer_defense,
AttestationSigner,
FileAuditStorage,
)
# Setup
storage = FileAuditStorage("/var/log/audit.jsonl")
signer = AttestationSigner.generate()
# Wrap individual guards
audited_pattern = pattern_guard().audited(storage, signer)
audited_pii = pii_guard().audited(storage, signer)
audited_toxic = toxicity_guard().audited(storage, signer)
# Add session ID to all entries
audited_pattern = audited_pattern.with_session_id("user-123")
# Add custom version
audited_pattern = audited_pattern.with_version("2.0.0")
With Multi-Layer Defense¶
from oxideshield import multi_layer_defense
# Create defense
defense = multi_layer_defense(
enable_length=True,
enable_pii=True,
enable_toxicity=True,
)
# Wrap entire defense
audited_defense = defense.audited(storage, signer)
# All layer checks are recorded
result = audited_defense.check(user_input)
FastAPI Integration¶
from fastapi import FastAPI, Depends, Request
from oxideshield import (
AttestationSigner,
FileAuditStorage,
AuditFilter,
pattern_guard,
)
app = FastAPI()
# Global audit infrastructure
storage = FileAuditStorage("/var/log/oxideshield/audit.jsonl")
signer = AttestationSigner.generate()
def get_audited_guard(request: Request):
"""Create audited guard with request context."""
guard = pattern_guard().audited(storage, signer)
# Add session ID from header
session_id = request.headers.get("X-Session-ID", "anonymous")
guard = guard.with_session_id(session_id)
# Add request ID
request_id = request.headers.get("X-Request-ID")
if request_id:
guard = guard.with_metadata("request_id", request_id)
return guard
@app.post("/chat")
async def chat(
message: str,
guard = Depends(get_audited_guard)
):
result = guard.check(message)
if not result.passed:
return {"error": result.reason, "blocked": True}
# Safe to proceed
response = await llm.generate(message)
return {"response": response}
@app.get("/audit/report")
async def audit_report(days: int = 7):
"""Generate audit report for last N days."""
from datetime import datetime, timedelta
filter = AuditFilter().since(
datetime.now() - timedelta(days=days)
)
report = await storage.generate_report(filter, signer)
return report.to_json()
LangChain Integration¶
from langchain.callbacks import BaseCallbackHandler
from oxideshield import (
AttestationSigner,
FileAuditStorage,
multi_layer_defense,
)
class AuditedSecurityCallback(BaseCallbackHandler):
def __init__(self, storage, signer):
self.defense = multi_layer_defense(
enable_length=True,
enable_pii=True,
).audited(storage, signer)
def on_llm_start(self, serialized, prompts, **kwargs):
for prompt in prompts:
result = self.defense.check(prompt)
if not result.passed:
raise SecurityException(f"Blocked: {result.reason}")
def on_llm_end(self, response, **kwargs):
for gen in response.generations:
for output in gen:
result = self.defense.check(output.text)
if not result.passed:
raise SecurityException(f"Output blocked: {result.reason}")
# Usage
storage = FileAuditStorage("/var/log/langchain-audit.jsonl")
signer = AttestationSigner.generate()
callback = AuditedSecurityCallback(storage, signer)
llm = ChatOpenAI(callbacks=[callback])
Scheduled Report Generation¶
import asyncio
from datetime import datetime, timedelta
from oxideshield import AuditFilter, FileAuditStorage, AttestationSigner
async def generate_daily_report():
storage = FileAuditStorage("/var/log/audit.jsonl")
signer = AttestationSigner.from_pem(os.environ["SIGNING_KEY"])
# Last 24 hours
filter = AuditFilter().since(
datetime.now() - timedelta(hours=24)
)
report = await storage.generate_report(filter, signer)
# Save report
with open(f"reports/daily-{datetime.now().date()}.html", "w") as f:
f.write(report.to_html())
# Send to monitoring
if report.blocked > 100:
await send_alert(f"High block rate: {report.blocked} in 24h")
return report
# Run as cron job
if __name__ == "__main__":
asyncio.run(generate_daily_report())
Error Handling¶
from oxideshield import (
AttestationError,
LicenseError,
StorageError,
SignatureError,
)
try:
report = await storage.generate_report(filter, signer)
except LicenseError as e:
print(f"License required: {e}")
except StorageError as e:
print(f"Storage error: {e}")
except SignatureError as e:
print(f"Signing failed: {e}")
except AttestationError as e:
print(f"Attestation error: {e}")
Best Practices¶
Key Management¶
import os
# Load from environment (recommended)
key_pem = os.environ["OXIDESHIELD_SIGNING_KEY"]
signer = AttestationSigner.from_pem(key_pem)
# Never hardcode keys!
# signer = AttestationSigner.from_pem("-----BEGIN... # BAD!
Session Tracking¶
# Always include session ID for filtering
audited = guard.audited(storage, signer).with_session_id(session_id)
# Add request context
audited = audited.with_metadata("user_id", user.id)
audited = audited.with_metadata("ip_address", request.client.host)
Log Rotation¶
from datetime import date
# Daily log files
storage = FileAuditStorage(f"/var/log/audit-{date.today()}.jsonl")
# Or use logrotate with single file
storage = FileAuditStorage("/var/log/audit.jsonl")
Periodic Verification¶
async def verify_audit_integrity():
"""Run periodically to detect tampering."""
filter = AuditFilter().since(datetime.now() - timedelta(days=1))
entries = await storage.list(filter)
for entry in entries:
if not entry.verify():
await send_alert(f"Tampered entry detected: {entry.id}")
return False
return True
API Reference¶
Functions¶
| Function | Description |
|---|---|
generate_report(storage, filter, signer) |
Generate signed attestation report |
audit_entry(guard_name, result, duration_ns) |
Create audit entry manually |
Classes¶
| Class | Description |
|---|---|
AttestationSigner |
Ed25519 key management |
AuditEntry |
Single audit record |
AuditResult |
Check outcome enum |
SignedAuditEntry |
Entry with signature |
AuditFilter |
Query builder |
MemoryAuditStorage |
In-memory storage |
FileAuditStorage |
JSONL file storage |
AttestationReport |
Aggregated report |
Exceptions¶
| Exception | When Raised |
|---|---|
LicenseError |
Professional license required |
StorageError |
Storage read/write failed |
SignatureError |
Signing or verification failed |
AttestationError |
General attestation error |
Next Steps¶
- Attestation Overview - Architecture and concepts
- CLI Audit Command - Command-line usage
- Compliance - Regulatory framework mapping