Streaming Protection

OxideShield can evaluate streaming LLM responses in real time, detecting threats before the full response completes.

License Tier

Streaming protection requires an Enterprise license.

Why Streaming Protection?

| Without Streaming | With Streaming |
|---|---|
| Wait for full response | Evaluate in real time |
| Harmful content reaches user | Early termination |
| Higher latency perception | Immediate feedback |
| All-or-nothing blocking | Partial delivery possible |

Quick Start

Proxy Configuration

# oxideshield.yaml
streaming:
  strategy: periodic
  eval_interval_chars: 500
  early_termination: true
  max_buffer_chars: 10000

CLI

oxideshield proxy \
  --listen 0.0.0.0:8080 \
  --upstream openai=https://api.openai.com \
  --streaming-strategy periodic

Evaluation Strategies

| Strategy | Description | Latency | Use Case |
|---|---|---|---|
| end_only | Evaluate only when complete | Lowest | Batch processing |
| periodic | Every N characters (default) | Low | General use |
| sentence_boundary | At sentence ends | Medium | Content moderation |
| continuous | Every chunk | Highest | Maximum security |

Periodic Strategy

Evaluates at regular intervals:

streaming:
  strategy: periodic
  eval_interval_chars: 500    # Every 500 characters
  eval_interval_tokens: 100   # Or every ~100 tokens
  max_eval_interval_ms: 2000  # At least every 2 seconds
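
The trigger logic implied by these three settings can be sketched as follows. This is a hypothetical illustration of how the thresholds interact, not the actual OxideShield implementation: an evaluation fires when any one of the character, token, or elapsed-time limits is crossed.

```python
import time

class PeriodicTrigger:
    """Decides when to evaluate an accumulated stream, mirroring the
    periodic-strategy settings above (illustrative sketch only)."""

    def __init__(self, interval_chars=500, interval_tokens=100,
                 max_interval_ms=2000):
        self.interval_chars = interval_chars
        self.interval_tokens = interval_tokens
        self.max_interval_ms = max_interval_ms
        self.chars_since_eval = 0
        self.tokens_since_eval = 0
        self.last_eval = time.monotonic()

    def on_chunk(self, text, tokens=0):
        """Return True if any threshold is crossed; counters reset on fire."""
        self.chars_since_eval += len(text)
        self.tokens_since_eval += tokens
        elapsed_ms = (time.monotonic() - self.last_eval) * 1000
        if (self.chars_since_eval >= self.interval_chars
                or self.tokens_since_eval >= self.interval_tokens
                or elapsed_ms >= self.max_interval_ms):
            self.chars_since_eval = 0
            self.tokens_since_eval = 0
            self.last_eval = time.monotonic()
            return True
        return False
```

Because the conditions are OR-ed together, a slow stream still gets evaluated at least once per max_eval_interval_ms even if it never accumulates 500 characters.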

Sentence Boundary Strategy

Evaluates at natural sentence boundaries:

streaming:
  strategy: sentence_boundary
  max_buffer_chars: 5000  # Force eval if no sentence boundary
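
A minimal sketch of this decision, assuming sentence boundaries are detected with a simple end-of-buffer punctuation check (the real detector is not documented here): evaluate at a boundary, or force an evaluation once max_buffer_chars is exceeded without seeing one.

```python
import re

# Ends with ., !, or ?, optionally followed by a closing quote/bracket.
SENTENCE_END = re.compile(r'[.!?]["\')\]]?\s*$')

def should_evaluate(buffer: str, max_buffer_chars: int = 5000) -> bool:
    """Hypothetical sentence-boundary trigger for an accumulated buffer."""
    if len(buffer) >= max_buffer_chars:
        return True  # forced evaluation: no boundary seen in time
    return bool(SENTENCE_END.search(buffer))
```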

Continuous Strategy

Evaluates every chunk (maximum security, highest overhead):

streaming:
  strategy: continuous

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| strategy | enum | periodic | Evaluation strategy |
| eval_interval_chars | int | 500 | Characters between evaluations |
| eval_interval_tokens | int | 100 | Tokens between evaluations |
| max_eval_interval_ms | int | 2000 | Maximum time between evaluations |
| early_termination | bool | true | Terminate stream on detection |
| max_buffer_chars | int | 10000 | Force evaluation threshold |

Early Termination

When a threat is detected mid-stream:

  1. Stream is immediately terminated
  2. Client receives error event
  3. Threat details logged
  4. Metrics updated

// SSE error event sent to client
{
  "error": {
    "type": "content_policy_violation",
    "message": "Response blocked by OxideShield",
    "code": "streaming_terminated"
  }
}
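
Serializing that error as an SSE frame can be sketched as below. The field names mirror the event shown above; the function name is illustrative, not part of the OxideShield API.

```python
import json

def sse_error_event(code="streaming_terminated",
                    message="Response blocked by OxideShield"):
    """Format the termination error as a single SSE `data:` frame."""
    payload = {
        "error": {
            "type": "content_policy_violation",
            "message": message,
            "code": code,
        }
    }
    # SSE frames are "data: <payload>" terminated by a blank line.
    return f"data: {json.dumps(payload)}\n\n"
```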

Graceful Termination

For less severe detections, allow partial content:

streaming:
  early_termination: false  # Log but don't terminate
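
The resulting control flow can be sketched like this (a hypothetical helper, not OxideShield code): with early_termination disabled, a detection is logged but the stream keeps flowing.

```python
import logging

log = logging.getLogger("oxideshield.streaming")

def handle_detection(passed: bool, early_termination: bool) -> bool:
    """Return True if the stream should be terminated."""
    if passed:
        return False
    if early_termination:
        log.error("threat detected; terminating stream")
        return True
    # Graceful mode: record the detection, deliver partial content.
    log.warning("threat detected; early_termination disabled, continuing")
    return False
```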

Rust Integration

use oxide_proxy::streaming::{StreamingHandler, StreamingConfig, StreamingEvalStrategy};
use oxide_proxy::interceptor::ApiFormat;

// Create handler with custom config
let config = StreamingConfig {
    strategy: StreamingEvalStrategy::Periodic,
    eval_interval_chars: 500,
    early_termination: true,
    ..Default::default()
};

let handler = StreamingHandler::with_config(ApiFormat::OpenAi, config)?;

// Process SSE chunks
for chunk in sse_stream {
    let result = handler.process_chunk(&chunk)?;

    if result.should_evaluate {
        // `defense` is an already-configured guard pipeline (construction not shown)
        let text = handler.accumulated_text();
        let check_result = defense.check(text);

        if !check_result.passed {
            handler.terminate();
            return Err(StreamError::ContentBlocked);
        }
    }

    // Forward chunk to client
    send_to_client(chunk).await?;
}

// Final evaluation
let final_text = handler.finalize()?;
let final_check = defense.check(&final_text);

Supported API Formats

| Provider | Format | SSE Parsing |
|---|---|---|
| OpenAI | openai | data: {"choices":[{"delta":{"content":"..."}}]} |
| Anthropic | anthropic | event: content_block_delta |
| Generic | generic | data: {"text": "..."} |
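
Extracting the text delta from one SSE data line per format can be sketched as below. This is a simplified illustration; a real SSE parser must also handle event: lines, [DONE] sentinels, multi-line data fields, and malformed chunks.

```python
import json

def extract_text(line: str, fmt: str) -> str:
    """Pull the text delta out of a single SSE `data:` line (sketch)."""
    if not line.startswith("data: "):
        return ""
    payload = line[len("data: "):].strip()
    if payload == "[DONE]":
        return ""  # OpenAI end-of-stream sentinel
    obj = json.loads(payload)
    if fmt == "openai":
        return obj["choices"][0]["delta"].get("content", "")
    if fmt == "anthropic":
        # content_block_delta events carry the text under "delta"
        return obj.get("delta", {}).get("text", "")
    return obj.get("text", "")  # generic format
```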

OpenAI Format

# Client request with streaming
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True  # OxideShield intercepts and evaluates
)

Anthropic Format

# Client request with streaming
with client.messages.stream(
    model="claude-3-opus-20240229",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for text in stream.text_stream:
        print(text)  # Protected by OxideShield

Performance

| Metric | Value | Notes |
|---|---|---|
| Per-chunk overhead | <1ms | SSE parsing |
| Per-evaluation overhead | <10ms | Guard pipeline |
| Memory per stream | <1MB | Accumulated buffer |
| Concurrent streams | 1000+ | Per proxy instance |

Metrics

Streaming-specific metrics:

| Metric | Type | Description |
|---|---|---|
| oxideshield_streams_active | Gauge | Active streams |
| oxideshield_stream_evals_total | Counter | Total evaluations |
| oxideshield_stream_terminations | Counter | Early terminations |
| oxideshield_stream_bytes_processed | Counter | Total bytes |

Best Practices

  1. Start with periodic - Good balance of security and performance
  2. Set reasonable intervals - 500 chars works for most use cases
  3. Enable early termination - Stop harmful content immediately
  4. Monitor termination rate - High rates may indicate an attack or false positives
  5. Test with realistic streams - Validate latency impact

See Also