Building an AI-Powered Camera Switcher
Author: Brian Weeks


Have you ever wanted your streaming setup to automatically switch scenes based on where you're looking? That's exactly what I set out to build: an intelligent camera system that detects face orientation and automatically switches OBS Studio scenes when you turn your head left or right. This project combines computer vision, machine learning, and live streaming technology to create something truly interactive.
The Hardware: Luxonis OAK-D Lite Camera
At the heart of this project is the Luxonis OAK-D Lite, an AI-capable camera that connects via USB. This isn't your typical webcam: it's designed for computer vision applications and has built-in neural network acceleration, so it can run AI models on the device in real time. That makes it a good fit for face detection.
The Challenge: Detecting Face Orientation
The core problem was determining not just whether a face is present, but which direction it's facing. I needed to distinguish between three states:
- Left profile (face turned left)
- Right profile (face turned right)
- Center (face looking straight ahead)
This is trickier than it sounds because:
- We need reliable face detection first
- We need to identify facial features (particularly eyes) within the detected face
- We need to calculate the relative position of these features to determine orientation
- We need to handle varying lighting conditions and face angles
The Technical Approach
The solution uses a four-stage pipeline:
Stage 1: Face Detection
I used the pre-trained face-detection-retail-0004 model, loaded through the DepthAI SDK. It runs on the OAK-D Lite's AI accelerator and returns face bounding boxes in real time.
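The script at the end of this post treats the detector's bounding-box corners as normalized (0 to 1) coordinates and scales them to pixels before cropping the face. Here is a minimal sketch of that conversion, assuming normalized input. `bbox_to_pixels` is a hypothetical helper name used for illustration, not part of the DepthAI SDK.

```python
from typing import Optional, Tuple


def bbox_to_pixels(
    top_left: Tuple[float, float],
    bottom_right: Tuple[float, float],
    frame_width: int,
    frame_height: int,
) -> Optional[Tuple[int, int, int, int]]:
    """Convert normalized corners to a clamped (x, y, w, h) pixel box.

    Returns None when the clamped box has no area.
    """
    # Scale to pixels, then clamp to the frame so slicing stays in bounds
    x1 = max(0, int(top_left[0] * frame_width))
    y1 = max(0, int(top_left[1] * frame_height))
    x2 = min(frame_width, int(bottom_right[0] * frame_width))
    y2 = min(frame_height, int(bottom_right[1] * frame_height))
    if x2 <= x1 or y2 <= y1:
        return None
    return x1, y1, x2 - x1, y2 - y1
```

Clamping matters because a detector can report a box that extends slightly past the frame edge when a face is partly out of view.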
Stage 2: Facial Feature Detection
Within each detected face region, I used OpenCV's Haar Cascade Classifiers to detect eyes. I implemented a progressive detection strategy:
- First attempt: Standard parameters (scaleFactor=1.1, minNeighbors=5)
- Second attempt: More sensitive parameters (scaleFactor=1.05, minNeighbors=3) if no eyes are found
- Third attempt: Very sensitive parameters (scaleFactor=1.01, minNeighbors=1) as a last resort
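The three attempts above can be sketched as a simple fallback loop. This is an illustration, not the exact project code: `detect_eyes_progressive` is a hypothetical helper, and `detect` stands for any callable that accepts the same positional arguments as OpenCV's `CascadeClassifier.detectMultiScale(image, scaleFactor, minNeighbors)`.

```python
from typing import Callable, List, Optional, Sequence, Tuple

Box = Tuple[int, int, int, int]  # (x, y, w, h)

# (scaleFactor, minNeighbors) pairs, ordered from strict to very sensitive
ATTEMPTS = [(1.1, 5), (1.05, 3), (1.01, 1)]


def detect_eyes_progressive(
    detect: Callable[..., Sequence[Box]],
    face_gray,
    attempts=ATTEMPTS,
) -> Tuple[List[Box], Optional[Tuple[float, int]]]:
    """Try each parameter pair in turn and stop at the first that finds eyes.

    Returns the detections and the parameter pair that produced them,
    or ([], None) when every attempt comes back empty.
    """
    for scale_factor, min_neighbors in attempts:
        eyes = detect(face_gray, scale_factor, min_neighbors)
        if len(eyes) > 0:
            return list(eyes), (scale_factor, min_neighbors)
    return [], None
```

With OpenCV installed, the call would look like `detect_eyes_progressive(eye_cascade.detectMultiScale, face_gray)`. Stopping at the first attempt that finds anything keeps the strict settings in charge whenever they work, since lower minNeighbors values tend to let more false positives through.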
Stage 3: Orientation Calculation
The key insight was using the relative positions of detected eyes to determine face orientation:
- Sort eyes by x-coordinate (left to right)
- Calculate the midpoint between the eyes
- Compare this midpoint to the center of the face bounding box
- Use a threshold-based system to classify orientation as left, right, or center
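As a sketch of the steps above: the eye midpoint is compared with the center of the face box, and the difference is normalized by half the box width, so the offset runs from -1 to 1. The function name `classify_orientation` is hypothetical. The default threshold of 0.15 and the sign convention (positive offset means "left") follow the script at the end of this post; the convention that fits your setup depends on whether your camera image is mirrored.

```python
from typing import Optional, Sequence, Tuple

Box = Tuple[int, int, int, int]  # (x, y, w, h), relative to the face box


def classify_orientation(
    eyes: Sequence[Box], face_width: int, threshold: float = 0.15
) -> Optional[str]:
    """Classify a face as 'left', 'right', or 'center' from two eye boxes."""
    if len(eyes) < 2:
        return None  # Two eyes are needed to compute a midpoint
    # Sort by x-coordinate and keep the two leftmost detections
    left, right = sorted(eyes, key=lambda e: e[0])[:2]
    left_cx = left[0] + left[2] / 2
    right_cx = right[0] + right[2] / 2
    midpoint = (left_cx + right_cx) / 2
    # Normalize by half the face width so the offset lies in [-1, 1]
    offset = (midpoint - face_width / 2) / (face_width / 2)
    if abs(offset) < threshold:
        return "center"
    return "left" if offset > 0 else "right"
```

For a 200-pixel-wide face box with eye centers at x=110 and x=170, the midpoint is 140 and the offset is (140 - 100) / 100 = 0.4, well past the 0.15 threshold.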
Stage 4: Confidence and Stability
To prevent rapid scene switching, I implemented a confidence system:
- Require the same orientation to be detected for multiple consecutive frames
- Only trigger scene changes when confidence reaches a threshold
- This creates smooth, intentional scene transitions
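A minimal sketch of this consecutive-frame check, written as a small standalone class. `OrientationDebouncer` is a hypothetical name; the default of three frames matches the `min_confidence_frames` value in the script at the end of this post.

```python
from typing import Optional


class OrientationDebouncer:
    """Report an orientation once it has been seen for N consecutive frames."""

    def __init__(self, min_frames: int = 3):
        self.min_frames = min_frames
        self.last: Optional[str] = None
        self.count = 0

    def update(self, orientation: Optional[str]) -> Optional[str]:
        """Feed one frame's result; return the orientation when it is confirmed.

        The orientation is returned exactly once per streak, on the frame
        where the streak length reaches min_frames.
        """
        if orientation is None:
            # No usable detection: reset the streak
            self.last, self.count = None, 0
            return None
        if orientation == self.last:
            self.count += 1
        else:
            self.last, self.count = orientation, 1
        return orientation if self.count == self.min_frames else None
```

Firing only on the frame where the streak reaches the threshold means a held pose triggers one scene change rather than one per frame.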
The OBS Studio Integration
The real magic happens by connecting the face detection system to OBS Studio. I used the OBS WebSocket API to programmatically control scene switching:
- When a face turns left: Switch to the "left" scene
- When a face turns right: Switch to the "right" scene
- When facing center: Stay on current scene
The WebSocket connection runs in a separate thread to ensure smooth operation, and I implemented proper error handling and connection management.
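For reference, here is a sketch of what a scene-switch message looks like in the obs-websocket 5.x protocol, where a request is sent with opcode 6 and its parameters go under `requestData`. `build_scene_request` is a hypothetical helper. Note that the full script at the end of this post takes a different route and triggers a Studio Mode transition instead.

```python
import itertools
import json

# Each request needs an ID so its response can be matched to it
_request_ids = itertools.count(1)


def build_scene_request(scene_name: str) -> str:
    """Build the JSON text for a SetCurrentProgramScene request."""
    return json.dumps({
        "op": 6,  # Request opcode
        "d": {
            "requestType": "SetCurrentProgramScene",
            "requestId": str(next(_request_ids)),
            "requestData": {"sceneName": scene_name},
        },
    })
```

The resulting string can be passed to the WebSocket client's send method once the connection has been identified.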
The Development Journey
This project went through several iterations as I solved various challenges:
Iteration 1: Basic Face Detection
Started with simple camera access and face detection to verify the hardware worked.
Iteration 2: Eye Detection Challenges
The biggest hurdle was getting reliable eye detection. I discovered that:
- Standard OpenCV parameters were too strict
- Lighting conditions significantly affected detection accuracy
- I needed multiple detection attempts with different sensitivity levels
Iteration 3: Orientation Logic
Implemented the mathematical approach for calculating face orientation based on eye positions.
Iteration 4: OBS Integration
Connected the face detection system to OBS Studio using WebSocket communication.
Iteration 5: Headless Operation
Removed the video preview window to create a clean, background-running application.
Key Technical Insights
Neural Network Integration: The DepthAI SDK made it surprisingly easy to integrate pre-trained AI models with real-time video processing.
Cascade Classifier Tuning: OpenCV's Haar cascades are powerful but require careful parameter tuning for reliable detection.
Real-time Processing: Processing video at 30 fps while running AI models requires efficient algorithms and proper threading.
WebSocket Communication: The OBS WebSocket API provides a clean interface for programmatic control, though it requires careful state management.
Error Handling: Robust error handling is crucial when dealing with hardware connections, network communication, and real-time processing.
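One concrete piece of the WebSocket state management is the opening handshake. In the obs-websocket 5.x protocol, the server first sends a Hello message (opcode 0), and the client answers with Identify (opcode 1). If a password is set in OBS, the Hello message carries a salt and a challenge, and the client must send a derived string instead of the password itself. The sketch below shows that derivation as I understand the protocol; the function names are hypothetical.

```python
import base64
import hashlib


def obs_auth_string(password: str, salt: str, challenge: str) -> str:
    """Derive the Identify authentication string from the Hello challenge."""
    # Step 1: base64(sha256(password + salt))
    secret = base64.b64encode(
        hashlib.sha256((password + salt).encode("utf-8")).digest()
    ).decode("ascii")
    # Step 2: base64(sha256(secret + challenge))
    return base64.b64encode(
        hashlib.sha256((secret + challenge).encode("utf-8")).digest()
    ).decode("ascii")


def build_identify(hello: dict, password: str = "") -> dict:
    """Build the Identify message that answers a parsed Hello message."""
    payload = {"rpcVersion": 1}
    auth = hello.get("d", {}).get("authentication")
    if auth:  # Present only when OBS requires a password
        payload["authentication"] = obs_auth_string(
            password, auth["salt"], auth["challenge"]
        )
    return {"op": 1, "d": payload}
```

The connection should only be treated as ready after the server replies with Identified (opcode 2); requests sent before that point are not accepted.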
The Final Result
The completed system provides:
- Real-time face orientation detection using AI
- Automatic OBS Studio scene switching based on head position
- Smooth, confidence-based transitions to prevent rapid switching
- Background operation without visual distractions
- Robust error handling and connection management
This creates an interactive streaming experience where your camera setup responds to your movements in real time, making your streams more dynamic and engaging.
#!/usr/bin/env python3
"""
OBS Studio Face-Controlled Scene Switcher

This script automatically switches between two video camera sources in OBS Studio
based on face orientation detected by the OAK-D Lite camera.
"""
# Standard library
import json
import threading
import time
from typing import List, Optional

# Third-party packages
import cv2
import numpy as np
import websocket
from depthai_sdk import OakCamera
from depthai_sdk.classes.packets import FramePacket


class FaceOrientationDetector:
    def __init__(self):
        # Load OpenCV cascade classifiers for facial feature detection
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')
        # Thresholds for orientation detection
        self.orientation_threshold = 0.15  # Adjust this value to tune sensitivity
        # State tracking
        self.last_orientation = None
        self.orientation_confidence = 0
        self.min_confidence_frames = 3  # Number of consecutive frames needed to confirm orientation
        # Orientation change callback
        self.on_orientation_change = None

    def set_orientation_callback(self, callback):
        """Set callback function to be called when orientation changes"""
        self.on_orientation_change = callback

    def detect_facial_features(self, frame, face_roi):
        """Detect eyes within the face region"""
        x, y, w, h = face_roi
        face_region = frame[y:y+h, x:x+w]
        # Haar cascades operate on grayscale images
        if face_region.ndim == 3:
            face_gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY)
        else:
            face_gray = face_region
        # Detect eyes, retrying with progressively more sensitive
        # (scaleFactor, minNeighbors) settings if none are found
        eyes = ()
        for scale_factor, min_neighbors in ((1.1, 5), (1.05, 3), (1.01, 1)):
            eyes = self.eye_cascade.detectMultiScale(face_gray, scale_factor, min_neighbors)
            if len(eyes) > 0:
                break
        return eyes, (x, y, w, h)

    def calculate_face_orientation(self, eyes, face_roi):
        """Calculate face orientation based on eye positions"""
        if len(eyes) < 2:
            return None, 0.0
        x, y, w, h = face_roi
        # Sort eyes by x-coordinate (left to right)
        eyes_sorted = sorted(eyes, key=lambda e: e[0])
        # Get eye centers: x + width/2, y + height/2
        left_eye_center = (eyes_sorted[0][0] + eyes_sorted[0][2]//2,
                           eyes_sorted[0][1] + eyes_sorted[0][3]//2)
        right_eye_center = (eyes_sorted[1][0] + eyes_sorted[1][2]//2,
                            eyes_sorted[1][1] + eyes_sorted[1][3]//2)
        # Calculate face center (midpoint between eyes)
        face_center_x = (left_eye_center[0] + right_eye_center[0]) / 2
        # Calculate relative face center position within the face ROI
        roi_center_x = w / 2
        face_offset = (face_center_x - roi_center_x) / (w / 2)  # Normalize by half ROI width
        # Determine orientation
        if abs(face_offset) < self.orientation_threshold:
            orientation = "center"
            confidence = 1.0 - abs(face_offset) / self.orientation_threshold
        elif face_offset > 0:
            orientation = "left"  # Face center is to the right of ROI center, face turned left
            confidence = min(abs(face_offset), 1.0)
        else:
            orientation = "right"  # Face center is to the left of ROI center, face turned right
            confidence = min(abs(face_offset), 1.0)
        return orientation, confidence

    def process_frame(self, frame, detections):
        """Process a frame and detect face orientation"""
        frame_copy = frame.copy()
        faces_detected = False
        # Process each detected face
        for detection in detections:
            # Get face bounding box using the correct API
            top_left = detection.top_left
            bottom_right = detection.bottom_right
            # Convert normalized coordinates to pixel coordinates
            x1, y1 = int(top_left[0] * frame.shape[1]), int(top_left[1] * frame.shape[0])
            x2, y2 = int(bottom_right[0] * frame.shape[1]), int(bottom_right[1] * frame.shape[0])
            # Ensure coordinates are within frame bounds
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
            if x2 > x1 and y2 > y1:  # Valid bounding box
                face_roi = (x1, y1, x2 - x1, y2 - y1)
                # Detect facial features
                eyes, (fx, fy, fw, fh) = self.detect_facial_features(frame, face_roi)
                # Calculate orientation
                orientation, confidence = self.calculate_face_orientation(eyes, (fx, fy, fw, fh))
                if orientation:
                    faces_detected = True
                    # Update orientation state with confidence
                    if orientation == self.last_orientation:
                        self.orientation_confidence += 1
                    else:
                        self.orientation_confidence = 1
                        self.last_orientation = orientation
                    # Draw face bounding box
                    cv2.rectangle(frame_copy, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    # Draw orientation text
                    if self.orientation_confidence >= self.min_confidence_frames:
                        text = f"{orientation.upper()}"
                        cv2.putText(frame_copy, text, (x1, y1-10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                        # Fire the callback once, on the frame where the orientation is confirmed
                        if self.orientation_confidence == self.min_confidence_frames and self.on_orientation_change:
                            self.on_orientation_change(orientation)
                # Draw facial features for debugging
                for (ex, ey, ew, eh) in eyes:
                    cv2.rectangle(frame_copy, (fx + ex, fy + ey),
                                  (fx + ex + ew, fy + ey + eh), (255, 0, 0), 2)
        # Print face detection status
        if not faces_detected:
            print("No human face detected")
            self.last_orientation = None
            self.orientation_confidence = 0
        return frame_copy


class OBSController:
    def __init__(self, host: str = "localhost", port: int = 4455, password: str = ""):
        """
        Initialize OBS WebSocket connection

        Args:
            host: OBS Studio host (default: localhost)
            port: WebSocket port (default: 4455)
            password: WebSocket password if set (default: empty)
        """
        self.host = host
        self.port = port
        self.password = password
        self.ws: Optional[websocket.WebSocketApp] = None
        self.connected = False
        self.scenes: List[dict] = []
        self.current_scene = ""
        self.request_id = 0
        # WebSocket URL
        self.ws_url = f"ws://{host}:{port}"

    def connect(self) -> bool:
        """Establish WebSocket connection to OBS Studio"""
        try:
            self.ws = websocket.WebSocketApp(
                self.ws_url,
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close
            )
            # Start WebSocket connection in a separate thread
            self.ws_thread = threading.Thread(target=self.ws.run_forever)
            self.ws_thread.daemon = True
            self.ws_thread.start()
            # Wait for connection
            timeout = 5
            while not self.connected and timeout > 0:
                time.sleep(0.1)
                timeout -= 0.1
            if self.connected:
                print(f"Connected to OBS Studio at {self.ws_url}")
                self._get_scenes()
                return True
            else:
                print("Failed to connect to OBS Studio")
                return False
        except Exception as e:
            print(f"Connection error: {e}")
            return False

    def disconnect(self):
        """Close WebSocket connection"""
        if self.ws:
            self.ws.close()
        self.connected = False
        print("Disconnected from OBS Studio")

    def _on_open(self, ws):
        """WebSocket connection opened"""
        print("WebSocket connection opened")
        # OBS sends a Hello message (op 0) next; we identify in response to it

    def _on_message(self, ws, message):
        """Handle incoming WebSocket messages"""
        try:
            data = json.loads(message)
            op = data.get("op")
            # Hello: the server greets us and includes an auth challenge if a password is set
            if op == 0:
                self._authenticate(data.get("d", {}).get("authentication"))
            # Identified: the server accepted our Identify message
            elif op == 2:
                print("Identified with OBS WebSocket")
                self.connected = True
                # Get scenes after successful connection
                self._get_scenes()
            # Handle request responses
            elif op == 7:  # Request response
                self._handle_request_response(data)
            # Handle events (like scene changes)
            elif op == 5:  # Event
                self._handle_event(data)
        except json.JSONDecodeError:
            print(f"Failed to parse message: {message}")

    def _on_error(self, ws, error):
        """Handle WebSocket errors"""
        print(f"WebSocket error: {error}")

    def _on_close(self, ws, close_status_code, close_msg):
        """Handle WebSocket connection close"""
        self.connected = False
        print(f"WebSocket connection closed: {close_status_code} - {close_msg}")

    def _authenticate(self, auth_info: Optional[dict] = None):
        """Send the Identify message, answering the auth challenge if OBS sent one"""
        identify = {
            "rpcVersion": 1,
            "eventSubscriptions": (1 << 0) | (1 << 2)  # General and Scenes events
        }
        if auth_info and self.password:
            # Imported locally so this method is self-contained
            import base64
            import hashlib
            # authentication = base64(sha256(base64(sha256(password + salt)) + challenge))
            secret = base64.b64encode(
                hashlib.sha256((self.password + auth_info["salt"]).encode()).digest()
            ).decode()
            identify["authentication"] = base64.b64encode(
                hashlib.sha256((secret + auth_info["challenge"]).encode()).digest()
            ).decode()
        self._send({"op": 1, "d": identify})  # op 1 = Identify

    def _send(self, data: dict):
        """Send data through WebSocket"""
        if self.ws and self.ws.sock:
            try:
                self.ws.send(json.dumps(data))
            except Exception as e:
                print(f"Failed to send data: {e}")

    def _send_request(self, request_type: str, **kwargs) -> int:
        """Send a request and return the request ID"""
        self.request_id += 1
        request = {
            "op": 6,  # Request
            "d": {
                "requestType": request_type,
                "requestId": str(self.request_id)
            }
        }
        if kwargs:
            # Request parameters belong under requestData
            request["d"]["requestData"] = kwargs
        self._send(request)
        return self.request_id

    def _handle_request_response(self, data: dict):
        """Handle responses to our requests"""
        response_data = data.get("d", {})
        request_type = response_data.get("requestType")
        if request_type == "GetSceneList":
            self.scenes = response_data.get("responseData", {}).get("scenes", [])
            self.current_scene = response_data.get("responseData", {}).get("currentProgramSceneName", "")
            print(f"📋 Found {len(self.scenes)} scenes")
            for scene in self.scenes:
                print(f" - {scene['sceneName']}")
            print(f"🎬 Current scene: {self.current_scene}")
        elif request_type == "SetCurrentProgramScene":
            # The response carries only a status, not the scene name; the current
            # scene is tracked through CurrentProgramSceneChanged events instead
            print("🎬 Scene change acknowledged")

    def _handle_event(self, data: dict):
        """Handle OBS events"""
        event_data = data.get("d", {})
        event_type = event_data.get("eventType")
        if event_type == "CurrentProgramSceneChanged":
            # Update current scene when the program scene changes
            scene_name = event_data.get("eventData", {}).get("sceneName")
            if scene_name:
                self.current_scene = scene_name
                print(f"Program scene changed to: {scene_name}")

    def _get_scenes(self):
        """Get list of available scenes"""
        self._send_request("GetSceneList")

    def get_scene_names(self) -> List[str]:
        """Get list of scene names"""
        return [scene["sceneName"] for scene in self.scenes]

    def get_current_scene(self) -> str:
        """Get current active scene name"""
        return self.current_scene

    def switch_scene(self, scene_name: str) -> bool:
        """
        Switch to a specific scene using transition workaround

        Args:
            scene_name: Name of the scene to switch to
        Returns:
            True if successful, False otherwise
        """
        if not self.connected:
            print("Not connected to OBS Studio")
            return False
        if scene_name not in self.get_scene_names():
            print(f"Scene '{scene_name}' not found")
            return False
        if scene_name == self.current_scene:
            print(f"ℹ Already on scene '{scene_name}'")
            return True
        print(f"Switching to scene: {scene_name}")
        # Use transition workaround since SetCurrentProgramScene has a bug in OBS WebSocket 5.6.2
        # The transition command cycles between Program and Preview scenes
        self._send_request("TriggerStudioModeTransition")
        # Update internal state - we'll assume it worked since the command succeeds
        self.current_scene = scene_name
        return True

    def toggle_between_scenes(self, scene1: str, scene2: str) -> bool:
        """
        Toggle between two scenes (like the transition button)

        Args:
            scene1: First scene name
            scene2: Second scene name
        Returns:
            True if successful, False otherwise
        """
        if not self.connected:
            print("Not connected to OBS Studio")
            return False
        current = self.get_current_scene()
        if current == scene1:
            return self.switch_scene(scene2)
        elif current == scene2:
            return self.switch_scene(scene1)
        else:
            # If we're on neither scene, switch to scene1
            print(f"Current scene '{current}' not in toggle list, switching to '{scene1}'")
            return self.switch_scene(scene1)


class OBSFaceSwitcher:
    def __init__(self, obs_host: str = "localhost", obs_port: int = 4455, obs_password: str = ""):
        """
        Initialize the OBS Face Switcher

        Args:
            obs_host: OBS Studio host
            obs_port: WebSocket port
            obs_password: WebSocket password if set
        """
        self.obs = OBSController(obs_host, obs_port, obs_password)
        self.face_detector = FaceOrientationDetector()
        # Scene configuration
        self.scene_left = None
        self.scene_right = None
        self.scene_center = None
        # State tracking
        self.last_scene_switch = 0
        self.scene_switch_cooldown = 1.0  # Minimum seconds between scene switches
        # Set up orientation change callback
        self.face_detector.set_orientation_callback(self._on_orientation_change)

    def set_scenes(self, scene_left: str, scene_right: str, scene_center: Optional[str] = None):
        """
        Set the scenes to switch between based on face orientation

        Args:
            scene_left: Scene to show when face is turned left
            scene_right: Scene to show when face is turned right
            scene_center: Scene to show when face is center (optional)
        """
        self.scene_left = scene_left
        self.scene_right = scene_right
        self.scene_center = scene_center
        print("Scene mapping set:")
        print(f" Left: {scene_left}")
        print(f" Right: {scene_right}")
        if scene_center:
            print(f" Center: {scene_center}")

    def _on_orientation_change(self, orientation: str):
        """Handle face orientation changes and switch scenes accordingly"""
        current_time = time.time()
        # Check cooldown to prevent rapid scene switching
        if current_time - self.last_scene_switch < self.scene_switch_cooldown:
            print(f"Cooldown active, skipping scene switch for {orientation}")
            return
        # Determine which scene to switch to
        target_scene = None
        if orientation == "left" and self.scene_left:
            target_scene = self.scene_left
        elif orientation == "right" and self.scene_right:
            target_scene = self.scene_right
        elif orientation == "center" and self.scene_center:
            target_scene = self.scene_center
        # Switch scene if we have a target and it's different from current
        if target_scene and target_scene != self.obs.get_current_scene():
            print(f"Face turned {orientation} → Switching to scene: {target_scene}")
            success = self.obs.switch_scene(target_scene)
            if success:
                self.last_scene_switch = current_time
                print(f"Scene switch initiated to: {target_scene}")
            else:
                print(f"Failed to initiate scene switch to: {target_scene}")
        else:
            print(f"No scene switch needed: target={target_scene}, current={self.obs.get_current_scene()}")

    def start(self):
        """Start the face-controlled scene switching"""
        print("🎥 Starting OBS Face-Controlled Scene Switcher...")
        # Connect to OBS Studio
        if not self.obs.connect():
            print("Failed to connect to OBS Studio")
            return False
        # Wait for scenes to load
        time.sleep(1)
        # Check if our target scenes exist
        available_scenes = self.obs.get_scene_names()
        required_scenes = [self.scene_left, self.scene_right]
        if self.scene_center:
            required_scenes.append(self.scene_center)
        missing_scenes = [scene for scene in required_scenes if scene not in available_scenes]
        if missing_scenes:
            print(f"Required scenes not found: {', '.join(missing_scenes)}")
            print(f"Available scenes: {', '.join(available_scenes)}")
            return False
        print("All required scenes found!")
        print("🎬 Starting face detection and scene switching...")
        print("Turn your head left/right to switch between camera scenes!")
        print("Press 'q' to quit")
        return True

    def stop(self):
        """Stop the face-controlled scene switching"""
        print("Stopping OBS Face Switcher...")
        self.obs.disconnect()


def main():
    """Main function to run the OBS Face Switcher"""
    print("🎥 OBS Studio Face-Controlled Scene Switcher (Fixed Version 2)")
    print("=" * 65)
    # Configuration - UPDATE THESE VALUES FOR YOUR SETUP
    OBS_HOST = "localhost"  # OBS Studio host
    OBS_PORT = 4455  # WebSocket port (default: 4455)
    OBS_PASSWORD = ""  # WebSocket password if set
    # Scene names - UPDATE THESE TO MATCH YOUR SCENES
    SCENE_LEFT = "Scene"  # Scene to show when face turns left
    SCENE_RIGHT = "Scene 2"  # Scene to show when face turns right
    SCENE_CENTER = None  # Scene to show when face is center (optional)
    # Create face switcher
    face_switcher = OBSFaceSwitcher(OBS_HOST, OBS_PORT, OBS_PASSWORD)
    try:
        # Set up scene mapping
        face_switcher.set_scenes(SCENE_LEFT, SCENE_RIGHT, SCENE_CENTER)
        # Start the face switcher
        if not face_switcher.start():
            return
        # Start the camera and face detection
        with OakCamera() as oak:
            # Create color camera
            color = oak.create_camera('color', fps=30)
            # Create face detection neural network
            nn = oak.create_nn('face-detection-retail-0004', color)

            # Create callback to process frames
            def process_frame_callback(packet: FramePacket):
                # Get the frame
                frame = packet.frame
                # Get detections from the neural network (empty if the packet has none)
                try:
                    detections = getattr(packet, 'detections', [])
                except Exception as e:
                    print(f"Could not get detections: {e}")
                    detections = []
                # Process the frame
                processed_frame = face_switcher.face_detector.process_frame(frame, detections)
                # Display the frame
                cv2.imshow('Face Orientation Detection', processed_frame)
                # Check for quit key
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    return False
                return True

            # Set up callback
            oak.callback(nn.out.main, callback=process_frame_callback, main_thread=True)
            # Start the pipeline
            print("🎬 Camera pipeline started. Face detection active!")
            oak.start(blocking=True)
    except Exception as e:
        print(f"Error: {e}")
        print("Please check your camera connection and try again.")
    finally:
        # Clean up
        face_switcher.stop()
        cv2.destroyAllWindows()
        print("\nOBS Face Switcher stopped!")


if __name__ == "__main__":
    main()