Streaming Protection¶
OxideShield can evaluate streaming LLM responses in real-time, detecting threats before the full response completes.
**License Tier:** Streaming protection requires an Enterprise license.
Why Streaming Protection?¶
| Without Streaming | With Streaming |
|---|---|
| Wait for full response | Evaluate in real-time |
| Harmful content reaches user | Early termination |
| Higher latency perception | Immediate feedback |
| All-or-nothing blocking | Partial delivery possible |
Quick Start¶
Proxy Configuration¶
```yaml
# oxideshield.yaml
streaming:
  strategy: periodic
  eval_interval_chars: 500
  early_termination: true
  max_buffer_chars: 10000
```
CLI¶
```bash
oxideshield proxy \
  --listen 0.0.0.0:8080 \
  --upstream openai=https://api.openai.com \
  --streaming-strategy periodic
```
Evaluation Strategies¶
| Strategy | Description | Latency | Use Case |
|---|---|---|---|
| `end_only` | Evaluate only when complete | Lowest | Batch processing |
| `periodic` | Every N characters (default) | Low | General use |
| `sentence_boundary` | At sentence ends | Medium | Content moderation |
| `continuous` | Every chunk | Highest | Maximum security |
Periodic Strategy¶
Evaluates at regular intervals:
```yaml
streaming:
  strategy: periodic
  eval_interval_chars: 500    # Every 500 characters
  eval_interval_tokens: 100   # Or every ~100 tokens
  max_eval_interval_ms: 2000  # At least every 2 seconds
```
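The trigger logic behind these settings can be sketched in Python (illustrative only; the actual implementation lives in the Rust proxy, and the token-interval check is omitted for brevity): an evaluation fires once enough new characters have accumulated or too much time has elapsed.

```python
import time

class PeriodicTrigger:
    """Sketch of the periodic strategy: evaluate when enough new
    characters have accumulated, or too much time has passed."""

    def __init__(self, interval_chars=500, max_interval_ms=2000):
        self.interval_chars = interval_chars
        self.max_interval_ms = max_interval_ms
        self.chars_since_eval = 0
        self.last_eval = time.monotonic()

    def on_chunk(self, text: str) -> bool:
        """Return True if the accumulated stream should be evaluated now."""
        self.chars_since_eval += len(text)
        elapsed_ms = (time.monotonic() - self.last_eval) * 1000
        if (self.chars_since_eval >= self.interval_chars
                or elapsed_ms >= self.max_interval_ms):
            # Reset counters so the next window starts fresh
            self.chars_since_eval = 0
            self.last_eval = time.monotonic()
            return True
        return False
```

The `max_eval_interval_ms` fallback matters for slow streams: even if fewer than 500 characters arrive, the buffer is still evaluated at least every 2 seconds.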
Sentence Boundary Strategy¶
Evaluates at natural sentence boundaries:
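A minimal configuration sketch, combining the strategy name from the table above with the documented timeout option (a timed fallback covers unusually long sentences):

```yaml
streaming:
  strategy: sentence_boundary
  max_eval_interval_ms: 2000  # timed fallback for long sentences
```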
Continuous Strategy¶
Evaluates every chunk (maximum security, highest overhead):
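A minimal configuration sketch using the documented options:

```yaml
streaming:
  strategy: continuous
  early_termination: true  # evaluate after every chunk; stop at first detection
```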
Configuration Options¶
| Option | Type | Default | Description |
|---|---|---|---|
| `strategy` | enum | `periodic` | Evaluation strategy |
| `eval_interval_chars` | int | `500` | Characters between evaluations |
| `eval_interval_tokens` | int | `100` | Tokens between evaluations |
| `max_eval_interval_ms` | int | `2000` | Maximum time between evaluations |
| `early_termination` | bool | `true` | Terminate stream on detection |
| `max_buffer_chars` | int | `10000` | Buffer size that forces an evaluation |
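Putting the defaults together, a fully explicit configuration looks like this (all values are the documented defaults):

```yaml
streaming:
  strategy: periodic          # end_only | periodic | sentence_boundary | continuous
  eval_interval_chars: 500    # characters between evaluations
  eval_interval_tokens: 100   # tokens between evaluations
  max_eval_interval_ms: 2000  # maximum time between evaluations
  early_termination: true     # terminate stream on detection
  max_buffer_chars: 10000     # buffer size that forces an evaluation
```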
Early Termination¶
When a threat is detected mid-stream:
- Stream is immediately terminated
- Client receives error event
- Threat details logged
- Metrics updated
The SSE error event sent to the client:

```json
{
  "error": {
    "type": "content_policy_violation",
    "message": "Response blocked by OxideShield",
    "code": "streaming_terminated"
  }
}
```
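A client can distinguish this termination from a normal end of stream by inspecting the event payload. A minimal Python sketch (the function name is illustrative; the payload shape is the one shown above, and the SSE transport itself is omitted):

```python
import json

def is_oxideshield_termination(sse_data: str) -> bool:
    """Return True if an SSE data payload is an OxideShield
    streaming-termination error event."""
    try:
        payload = json.loads(sse_data)
    except json.JSONDecodeError:
        return False
    error = payload.get("error") or {}
    return error.get("code") == "streaming_terminated"
```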
Graceful Termination¶
For less severe detections, allow partial content:
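A configuration sketch for this mode (note: the `termination_mode` key and its value below are assumptions for illustration, not confirmed option names):

```yaml
streaming:
  early_termination: true
  # Hypothetical knob: deliver content already sent, then end the stream
  termination_mode: graceful  # assumption, not a confirmed option name
```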
Rust Integration¶
```rust
use oxide_proxy::streaming::{StreamingHandler, StreamingConfig, StreamingEvalStrategy};
use oxide_proxy::interceptor::ApiFormat;

// Create handler with custom config
let config = StreamingConfig {
    strategy: StreamingEvalStrategy::Periodic,
    eval_interval_chars: 500,
    early_termination: true,
    ..Default::default()
};
let handler = StreamingHandler::with_config(ApiFormat::OpenAi, config)?;

// Process SSE chunks
for chunk in sse_stream {
    let result = handler.process_chunk(&chunk)?;
    if result.should_evaluate {
        let text = handler.accumulated_text();
        let check_result = defense.check(text);
        if !check_result.passed {
            handler.terminate();
            return Err(StreamError::ContentBlocked);
        }
    }
    // Forward chunk to client
    send_to_client(chunk).await?;
}

// Final evaluation
let final_text = handler.finalize()?;
let final_check = defense.check(&final_text);
```
Supported API Formats¶
| Provider | Format | SSE Parsing |
|---|---|---|
| OpenAI | `openai` | `data: {"choices":[{"delta":{"content":"..."}}]}` |
| Anthropic | `anthropic` | `event: content_block_delta` |
| Generic | `generic` | `data: {"text": "..."}` |
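The per-provider parsing reduces to pulling the incremental text out of each `data:` payload. A Python sketch of that dispatch (illustrative only; the proxy implements this in Rust, and the function name is mine):

```python
import json

def extract_delta(api_format: str, data: str) -> str:
    """Extract the incremental text from one SSE `data:` payload."""
    payload = json.loads(data)
    if api_format == "openai":
        # Deltas arrive under choices[0].delta.content
        return payload["choices"][0]["delta"].get("content", "")
    if api_format == "anthropic":
        # content_block_delta events carry the text in delta.text
        return payload.get("delta", {}).get("text", "")
    # Generic format: top-level "text" field
    return payload.get("text", "")
```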
OpenAI Format¶
```python
# Client request with streaming
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True  # OxideShield intercepts and evaluates
)
```
Anthropic Format¶
```python
# Client request with streaming
with client.messages.stream(
    model="claude-3-opus-20240229",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for text in stream.text_stream:
        print(text)  # Protected by OxideShield
```
Performance¶
| Metric | Value | Notes |
|---|---|---|
| Per-chunk overhead | <1ms | SSE parsing |
| Per-evaluation overhead | <10ms | Guard pipeline |
| Memory per stream | <1MB | Accumulated buffer |
| Concurrent streams | 1000+ | Per proxy instance |
Metrics¶
Streaming-specific metrics:
| Metric | Type | Description |
|---|---|---|
| `oxideshield_streams_active` | Gauge | Active streams |
| `oxideshield_stream_evals_total` | Counter | Total evaluations |
| `oxideshield_stream_terminations` | Counter | Early terminations |
| `oxideshield_stream_bytes_processed` | Counter | Total bytes |
Best Practices¶
- Start with `periodic` - Good balance of security and performance
- Set reasonable intervals - 500 chars works for most use cases
- Enable early termination - Stop harmful content immediately
- Monitor termination rate - High rates may indicate attack or false positives
- Test with realistic streams - Validate latency impact
See Also¶
- Proxy Gateway - Proxy deployment guide
- Proxy Advanced Features - Advanced proxy configuration
- Monitoring - Metrics and alerting