Building an AI-Powered Camera Switcher

Author: Brian Weeks
Face Detection Camera Switch

Have you ever wanted your streaming setup to automatically switch scenes based on where you're looking? That's exactly what I set out to build: an intelligent camera system that detects face orientation and automatically switches OBS Studio scenes when you turn your head left or right. This project combines computer vision, machine learning, and live streaming technology to create something truly interactive.

The Hardware: Luxonis OAK-D Lite Camera

At the heart of this project is the Luxonis OAK-D Lite, an AI-capable camera that connects over USB. This isn't your typical webcam: it's designed for computer vision applications and has built-in neural network acceleration. The camera runs AI models on-device in real time, which makes it a good fit for face detection.

The Challenge: Detecting Face Orientation

The core problem was determining not just if a face is present, but which direction it's facing. We needed to distinguish between three states:

  • Left profile (face turned left)
  • Right profile (face turned right)
  • Center (face looking straight ahead)

This is trickier than it sounds because:

  1. We need reliable face detection first
  2. We need to identify facial features (particularly eyes) within the detected face
  3. We need to calculate the relative position of these features to determine orientation
  4. We need to handle varying lighting conditions and face angles

The Technical Approach

My solution uses a four-stage pipeline:

Stage 1: Face Detection

I used the pre-trained face-detection-retail-0004 model, loaded by name through the DepthAI SDK. It runs on the OAK-D Lite's on-board accelerator and returns face bounding boxes in real time.
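
Before eye detection can run, each bounding box has to become a pixel rectangle inside the frame. The sketch below assumes the box corners arrive as normalized (0 to 1) coordinates, which is how the script at the end of this post treats them; the function name `bbox_to_roi` is hypothetical.

```python
from typing import Optional, Tuple


def bbox_to_roi(
    top_left: Tuple[float, float],
    bottom_right: Tuple[float, float],
    frame_width: int,
    frame_height: int,
) -> Optional[Tuple[int, int, int, int]]:
    """Convert normalized corners to a clamped (x, y, width, height) pixel ROI.

    Returns None when the clamped box has no area.
    """
    # Scale to pixels, then clamp to the frame so slicing never goes out of bounds.
    x1 = max(0, int(top_left[0] * frame_width))
    y1 = max(0, int(top_left[1] * frame_height))
    x2 = min(frame_width, int(bottom_right[0] * frame_width))
    y2 = min(frame_height, int(bottom_right[1] * frame_height))

    if x2 <= x1 or y2 <= y1:
        return None
    return (x1, y1, x2 - x1, y2 - y1)
```

Returning None for an empty box lets the caller skip that detection instead of passing a zero-sized crop to the eye detector.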

Stage 2: Facial Feature Detection

Within each detected face region, I used OpenCV's Haar Cascade Classifiers to detect eyes. I implemented a progressive detection strategy:

  • First attempt: Standard parameters (scaleFactor=1.1, minNeighbors=5)
  • Second attempt: More sensitive parameters (scaleFactor=1.05, minNeighbors=3) if no eyes found
  • Third attempt: Very sensitive parameters (scaleFactor=1.01, minNeighbors=1) as a last resort
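
The script at the end of this post calls `detectMultiScale` once with the first parameter pair, so here is a minimal sketch of how the three-pass fallback could look. The function name `detect_eyes_progressively` is hypothetical, and `cascade` is assumed to be any object with an OpenCV-style `detectMultiScale(image, scaleFactor=..., minNeighbors=...)` method, such as a `cv2.CascadeClassifier`.

```python
from typing import Any, Sequence, Tuple

# (scaleFactor, minNeighbors) pairs from the list above, strictest first.
DETECTION_PASSES: Tuple[Tuple[float, int], ...] = ((1.1, 5), (1.05, 3), (1.01, 1))


def detect_eyes_progressively(
    cascade: Any,
    face_gray: Any,
    passes: Sequence[Tuple[float, int]] = DETECTION_PASSES,
):
    """Return the first non-empty detection result, loosening settings on each pass."""
    for scale_factor, min_neighbors in passes:
        eyes = cascade.detectMultiScale(
            face_gray, scaleFactor=scale_factor, minNeighbors=min_neighbors
        )
        if len(eyes) > 0:
            return eyes
    # Nothing found even with the most sensitive settings.
    return []
```

Because the orientation step needs two eyes, one possible variation is to fall through to the next pass whenever fewer than two are found rather than only when none are.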

Stage 3: Orientation Calculation

The key insight was using the relative positions of detected eyes to determine face orientation:

  • Sort eyes by x-coordinate (left to right)
  • Calculate the midpoint between the eyes
  • Compare this midpoint to the center of the face bounding box
  • Use a threshold-based system to classify orientation as left, right, or center
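
These four steps can be sketched as a small pure function. The name `classify_orientation` is hypothetical; the 0.15 threshold and the sign convention (positive offset means "left") follow the script at the end of this post, and the convention may need flipping if your camera image is mirrored.

```python
from typing import List, Optional, Tuple

Box = Tuple[int, int, int, int]  # (x, y, width, height), relative to the face box


def classify_orientation(
    eyes: List[Box], face_width: int, threshold: float = 0.15
) -> Optional[str]:
    """Classify a face as 'left', 'right' or 'center' from two eye boxes."""
    if len(eyes) < 2 or face_width <= 0:
        return None

    # Sort by x so the first two entries are the leftmost eyes in the image.
    left, right = sorted(eyes, key=lambda e: e[0])[:2]
    left_cx = left[0] + left[2] / 2
    right_cx = right[0] + right[2] / 2

    # Offset of the eye midpoint from the face-box center, normalized to [-1, 1].
    midpoint = (left_cx + right_cx) / 2
    offset = (midpoint - face_width / 2) / (face_width / 2)

    if abs(offset) < threshold:
        return "center"
    return "left" if offset > 0 else "right"
```

Normalizing by half the face width keeps the threshold independent of how close the face is to the camera.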

Stage 4: Confidence and Stability

To prevent rapid scene switching, I implemented a confidence system:

  • Require the same orientation to be detected for multiple consecutive frames
  • Only trigger scene changes when confidence reaches a threshold
  • This creates smooth, intentional scene transitions
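
A minimal sketch of this consecutive-frame check, assuming a threshold of three frames as in the script's `min_confidence_frames`. The class name `OrientationDebouncer` is hypothetical.

```python
from typing import Optional


class OrientationDebouncer:
    """Report an orientation only once it has been seen on enough consecutive frames."""

    def __init__(self, min_frames: int = 3):
        self.min_frames = min_frames
        self.last: Optional[str] = None
        self.count = 0

    def update(self, orientation: Optional[str]) -> Optional[str]:
        """Feed one frame's result; return the orientation on the frame it becomes stable."""
        if orientation is None:
            # No face (or no usable eyes): reset the streak.
            self.last, self.count = None, 0
            return None
        if orientation == self.last:
            self.count += 1
        else:
            self.last, self.count = orientation, 1
        # Fire exactly once per streak, on the frame the threshold is reached.
        return orientation if self.count == self.min_frames else None
```

Firing only when the count equals the threshold, rather than on every frame at or above it, means a held head position triggers one scene change instead of a stream of repeated requests.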

The OBS Studio Integration

The real magic happens by connecting the face detection system to OBS Studio. I used the OBS WebSocket API to programmatically control scene switching:

  • When a face turns left: Switch to the "left" scene
  • When a face turns right: Switch to the "right" scene
  • When facing center: Stay on current scene
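
As a rough illustration of this mapping, the sketch below builds the JSON request that the obs-websocket 5.x protocol defines for changing the program scene (`SetCurrentProgramScene`, sent with opcode 6 and its arguments under `requestData`). The function name and the scene names "left" and "right" are placeholders. Note that the script at the end of this post sends `TriggerStudioModeTransition` instead, as a workaround described in its comments.

```python
import json
from typing import Dict, Optional

# Placeholder scene names; replace with the scene names in your OBS profile.
SCENE_FOR_ORIENTATION: Dict[str, str] = {"left": "left", "right": "right"}


def build_scene_request(orientation: str, request_id: str) -> Optional[str]:
    """Return the JSON text of a scene-change request, or None to stay put."""
    scene = SCENE_FOR_ORIENTATION.get(orientation)
    if scene is None:
        # "center" (or anything unmapped): stay on the current scene.
        return None
    return json.dumps({
        "op": 6,  # Request
        "d": {
            "requestType": "SetCurrentProgramScene",
            "requestId": request_id,
            "requestData": {"sceneName": scene},
        },
    })
```

The returned string can be passed to the WebSocket's send method once the connection has been identified.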

The WebSocket connection runs in a separate thread so that network I/O does not block frame processing, and the controller handles connection errors and disconnects.

The Development Journey

This project went through several iterations as I solved various challenges:

Iteration 1: Basic Face Detection

Started with simple camera access and face detection to verify the hardware worked.

Iteration 2: Eye Detection Challenges

The biggest hurdle was getting reliable eye detection. I discovered that:

  • Standard OpenCV parameters were too strict
  • Lighting conditions significantly affected detection accuracy
  • I needed multiple detection attempts with different sensitivity levels

Iteration 3: Orientation Logic

Implemented the mathematical approach for calculating face orientation based on eye positions.

Iteration 4: OBS Integration

Connected the face detection system to OBS Studio using WebSocket communication.

Iteration 5: Headless Operation

Removed the video preview window to create a clean, background-running application.

Key Technical Insights

  1. Neural Network Integration: The DepthAI SDK made it surprisingly easy to integrate pre-trained AI models with real-time video processing.

  2. Cascade Classifier Tuning: OpenCV's Haar cascades are powerful but require careful parameter tuning for reliable detection.

  3. Real-time Processing: Processing video at 30 fps while running AI models requires efficient algorithms and proper threading.

  4. WebSocket Communication: The OBS WebSocket API provides a clean interface for programmatic control, though it requires careful state management.

  5. Error Handling: Robust error handling is crucial when dealing with hardware connections, network communication, and real-time processing.

The Final Result

The completed system provides:

  • Real-time face orientation detection using AI
  • Automatic OBS Studio scene switching based on head position
  • Smooth, confidence-based transitions to prevent rapid switching
  • Background operation without visual distractions
  • Robust error handling and connection management
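
Besides the consecutive-frame check, the script also enforces a minimum interval between scene switches (`scene_switch_cooldown`, 1.0 second). A minimal sketch of that guard follows. The class name `SwitchCooldown` is hypothetical, and it uses `time.monotonic` with an injectable clock rather than the script's `time.time`, purely to make it easy to test.

```python
import time
from typing import Callable


class SwitchCooldown:
    """Allow an action at most once per `seconds` interval."""

    def __init__(self, seconds: float = 1.0, clock: Callable[[], float] = time.monotonic):
        self.seconds = seconds
        self.clock = clock
        self.last_switch = float("-inf")  # so the first attempt always passes

    def try_switch(self) -> bool:
        """Return True and start a new cooldown if enough time has passed."""
        now = self.clock()
        if now - self.last_switch < self.seconds:
            return False
        self.last_switch = now
        return True
```

A caller would wrap each scene change in `if cooldown.try_switch(): ...`, so a burst of orientation changes produces at most one switch per interval.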

The result is a streaming setup whose camera responds to your head movements in real time, making your streams more dynamic and engaging.

#!/usr/bin/env python3
"""
OBS Studio Face-Controlled Scene Switcher
This script automatically switches between two video camera sources in OBS Studio
based on face orientation detected by the Oak D Lite camera.
"""

import json
import time
import websocket
import threading
import cv2
import numpy as np
from depthai_sdk import OakCamera
from depthai_sdk.classes.packets import FramePacket
from typing import Optional, List

class FaceOrientationDetector:
    def __init__(self):
        # Load OpenCV cascade classifiers for facial feature detection
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')
        
        # Thresholds for orientation detection
        self.orientation_threshold = 0.15  # Adjust this value to tune sensitivity
        
        # State tracking
        self.last_orientation = None
        self.orientation_confidence = 0
        self.min_confidence_frames = 3  # Number of consecutive frames needed to confirm orientation
        
        # Orientation change callback
        self.on_orientation_change = None
        
    def set_orientation_callback(self, callback):
        """Set callback function to be called when orientation changes"""
        self.on_orientation_change = callback
        
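    def detect_facial_features(self, frame, face_roi):
        """Detect eyes within the face region"""
        x, y, w, h = face_roi
        face_region = frame[y:y+h, x:x+w]
        
        # Haar cascades operate on grayscale; convert if the crop is a colour (BGR) image
        if face_region.ndim == 3:
            face_gray = cv2.cvtColor(face_region, cv2.COLOR_BGR2GRAY)
        else:
            face_gray = face_region
        
        # Detect eyes (scaleFactor=1.1, minNeighbors=5)
        eyes = self.eye_cascade.detectMultiScale(face_gray, scaleFactor=1.1, minNeighbors=5)
        
        return eyes, (x, y, w, h)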
    
    def calculate_face_orientation(self, eyes, face_roi):
        """Calculate face orientation based on eye positions"""
        if len(eyes) < 2:
            return None, 0.0
        
        x, y, w, h = face_roi
        
        # Sort eyes by x-coordinate (left to right)
        eyes_sorted = sorted(eyes, key=lambda e: e[0])
        
        # Get eye centers (each eye is x, y, width, height relative to the face ROI)
        left_eye_center = (eyes_sorted[0][0] + eyes_sorted[0][2]//2, 
                          eyes_sorted[0][1] + eyes_sorted[0][3]//2)
        right_eye_center = (eyes_sorted[1][0] + eyes_sorted[1][2]//2, 
                           eyes_sorted[1][1] + eyes_sorted[1][3]//2)
        
        # Calculate face center (midpoint between eyes)
        face_center_x = (left_eye_center[0] + right_eye_center[0]) / 2
        
        # Calculate relative face center position within the face ROI
        roi_center_x = w / 2
        face_offset = (face_center_x - roi_center_x) / (w / 2)  # Normalize by half ROI width
        
        # Determine orientation
        if abs(face_offset) < self.orientation_threshold:
            orientation = "center"
            confidence = 1.0 - abs(face_offset) / self.orientation_threshold
        elif face_offset > 0:
            orientation = "left"  # Face center is to the right of ROI center, face turned left
            confidence = min(abs(face_offset), 1.0)
        else:
            orientation = "right"  # Face center is to the left of ROI center, face turned right
            confidence = min(abs(face_offset), 1.0)
        
        return orientation, confidence
    
    def process_frame(self, frame, detections):
        """Process a frame and detect face orientation"""
        frame_copy = frame.copy()
        faces_detected = False
        
        # Process each detected face
        for detection in detections:
            # Get face bounding box using the correct API
            top_left = detection.top_left
            bottom_right = detection.bottom_right
            
            # Convert normalized coordinates to pixel coordinates
            x1, y1 = int(top_left[0] * frame.shape[1]), int(top_left[1] * frame.shape[0])
            x2, y2 = int(bottom_right[0] * frame.shape[1]), int(bottom_right[1] * frame.shape[0])
            
            # Ensure coordinates are within frame bounds
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
            
            if x2 > x1 and y2 > y1:  # Valid bounding box
                face_roi = (x1, y1, x2 - x1, y2 - y1)
                
                # Detect facial features
                eyes, (fx, fy, fw, fh) = self.detect_facial_features(frame, face_roi)
                
                # Calculate orientation
                orientation, confidence = self.calculate_face_orientation(eyes, (fx, fy, fw, fh))
                
                if orientation:
                    faces_detected = True
                    
                    # Update orientation state with confidence
                    if orientation == self.last_orientation:
                        self.orientation_confidence += 1
                    else:
                        self.orientation_confidence = 1
                        self.last_orientation = orientation
                    
                    # Draw face bounding box
                    cv2.rectangle(frame_copy, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    
                    # Draw orientation text
                    if self.orientation_confidence >= self.min_confidence_frames:
                        text = f"{orientation.upper()}"
                        cv2.putText(frame_copy, text, (x1, y1-10), 
                                  cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                        
                        # Call orientation change callback when orientation changes
                        if self.orientation_confidence == self.min_confidence_frames and self.on_orientation_change:
                            self.on_orientation_change(orientation)
                    
                    # Draw facial features for debugging
                    for (ex, ey, ew, eh) in eyes:
                        cv2.rectangle(frame_copy, (fx + ex, fy + ey), 
                                    (fx + ex + ew, fy + ey + eh), (255, 0, 0), 2)
        
        # Print face detection status
        if not faces_detected:
            print("No human face detected")
            self.last_orientation = None
            self.orientation_confidence = 0
        
        return frame_copy

class OBSController:
    def __init__(self, host: str = "localhost", port: int = 4455, password: str = ""):
        """
        Initialize OBS WebSocket connection
        
        Args:
            host: OBS Studio host (default: localhost)
            port: WebSocket port (default: 4455)
            password: WebSocket password if set (default: empty)
        """
        self.host = host
        self.port = port
        self.password = password
        self.ws: Optional[websocket.WebSocketApp] = None
        self.connected = False
        self.scenes: List[dict] = []
        self.current_scene = ""
        self.request_id = 0
        
        # WebSocket URL
        self.ws_url = f"ws://{host}:{port}"
        
    def connect(self) -> bool:
        """Establish WebSocket connection to OBS Studio"""
        try:
            self.ws = websocket.WebSocketApp(
                self.ws_url,
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close
            )
            
            # Start WebSocket connection in a separate thread
            self.ws_thread = threading.Thread(target=self.ws.run_forever)
            self.ws_thread.daemon = True
            self.ws_thread.start()
            
            # Wait for connection
            timeout = 5
            while not self.connected and timeout > 0:
                time.sleep(0.1)
                timeout -= 0.1
                
            if self.connected:
                print(f"Connected to OBS Studio at {self.ws_url}")
                # The scene list is requested in _on_message once OBS confirms the handshake
                return True
            else:
                print("Failed to connect to OBS Studio")
                return False
                
        except Exception as e:
            print(f"Connection error: {e}")
            return False
    
    def disconnect(self):
        """Close WebSocket connection"""
        if self.ws:
            self.ws.close()
        self.connected = False
        print("Disconnected from OBS Studio")
    
    def _on_open(self, ws):
        """WebSocket connection opened"""
        # OBS sends a Hello message (op 0) first; we reply with Identify from _on_message
        print("WebSocket connection opened")
    
    def _on_message(self, ws, message):
        """Handle incoming WebSocket messages"""
        try:
            data = json.loads(message)
        except json.JSONDecodeError:
            print(f"Failed to parse message: {message}")
            return
        
        op = data.get("op")
        
        if op == 0:  # Hello: carries the auth challenge and salt when a password is set
            self._authenticate(data.get("d", {}).get("authentication"))
        
        elif op == 2:  # Identified: handshake complete
            print("Identified with OBS WebSocket")
            self.connected = True
            # Get scenes after successful connection
            self._get_scenes()
        
        elif op == 7:  # Request response
            self._handle_request_response(data)
        
        elif op == 5:  # Event (like scene changes)
            self._handle_event(data)
    
    def _on_error(self, ws, error):
        """Handle WebSocket errors"""
        print(f"WebSocket error: {error}")
    
    def _on_close(self, ws, close_status_code, close_msg):
        """Handle WebSocket connection close"""
        self.connected = False
        print(f"WebSocket connection closed: {close_status_code} - {close_msg}")
    
    def _authenticate(self, auth_info=None):
        """Send the Identify message, answering the Hello challenge if OBS requires a password"""
        import base64
        import hashlib
        
        identify_data = {
            "rpcVersion": 1,
            "eventSubscriptions": (1 << 2) | (1 << 4)  # Scenes | Transitions events
        }
        if auth_info:
            # obs-websocket 5.x: base64(sha256(base64(sha256(password + salt)) + challenge))
            secret = base64.b64encode(
                hashlib.sha256((self.password + auth_info["salt"]).encode()).digest()
            ).decode()
            identify_data["authentication"] = base64.b64encode(
                hashlib.sha256((secret + auth_info["challenge"]).encode()).digest()
            ).decode()
        self._send({"op": 1, "d": identify_data})
    
    def _send(self, data: dict):
        """Send data through WebSocket"""
        if self.ws and self.ws.sock:
            try:
                self.ws.send(json.dumps(data))
            except Exception as e:
                print(f"Failed to send data: {e}")
    
    def _send_request(self, request_type: str, **kwargs) -> int:
        """Send a request and return the request ID"""
        self.request_id += 1
        request = {
            "op": 6,  # Request
            "d": {
                "requestType": request_type,
                "requestId": str(self.request_id)
            }
        }
        # obs-websocket 5.x expects request arguments nested under "requestData"
        if kwargs:
            request["d"]["requestData"] = kwargs
        self._send(request)
        return self.request_id
    
    def _handle_request_response(self, data: dict):
        """Handle responses to our requests"""
        response_data = data.get("d", {})
        request_type = response_data.get("requestType")
        
        if request_type == "GetSceneList":
            self.scenes = response_data.get("responseData", {}).get("scenes", [])
            self.current_scene = response_data.get("responseData", {}).get("currentProgramSceneName", "")
            print(f"📋 Found {len(self.scenes)} scenes")
            for scene in self.scenes:
                print(f"   - {scene['sceneName']}")
            print(f"🎬 Current scene: {self.current_scene}")
            
        elif request_type == "SetCurrentProgramScene":
            # Update current scene when we get confirmation of scene change
            scene_name = response_data.get("requestData", {}).get("sceneName")
            if scene_name:
                self.current_scene = scene_name
                print(f"🎬 Switched to scene: {scene_name}")
    
    def _handle_event(self, data: dict):
        """Handle OBS events"""
        event_data = data.get("d", {})
        event_type = event_data.get("eventType")
        
        if event_type == "CurrentProgramSceneChanged":
            # Keep the cached scene in sync with what OBS is actually showing
            # (requires the Scenes event subscription; this event carries sceneName)
            scene_name = event_data.get("eventData", {}).get("sceneName")
            if scene_name:
                self.current_scene = scene_name
                print(f"Program scene is now: {scene_name}")
    
    def _get_scenes(self):
        """Get list of available scenes"""
        self._send_request("GetSceneList")
    
    def get_scene_names(self) -> List[str]:
        """Get list of scene names"""
        return [scene["sceneName"] for scene in self.scenes]
    
    def get_current_scene(self) -> str:
        """Get current active scene name"""
        return self.current_scene
    
    def switch_scene(self, scene_name: str) -> bool:
        """
        Switch to a specific scene using transition workaround
        
        Args:
            scene_name: Name of the scene to switch to
            
        Returns:
            True if successful, False otherwise
        """
        if not self.connected:
            print("Not connected to OBS Studio")
            return False
        
        if scene_name not in self.get_scene_names():
            print(f"Scene '{scene_name}' not found")
            return False
        
        if scene_name == self.current_scene:
            print(f"Already on scene '{scene_name}'")
            return True
        
        print(f"Switching to scene: {scene_name}")
        
        # Use transition workaround since SetCurrentProgramScene has a bug in OBS WebSocket 5.6.2
        # Note: TriggerStudioModeTransition takes no scene name and requires Studio Mode.
        # It moves whatever scene is in Preview to Program, so scene_name must already
        # be the Preview scene for this to land on it.
        self._send_request("TriggerStudioModeTransition")
        
        # Optimistically update internal state; the response is not awaited here
        self.current_scene = scene_name
        
        return True
    
    def toggle_between_scenes(self, scene1: str, scene2: str) -> bool:
        """
        Toggle between two scenes (like the transition button)
        
        Args:
            scene1: First scene name
            scene2: Second scene name
            
        Returns:
            True if successful, False otherwise
        """
        if not self.connected:
            print("Not connected to OBS Studio")
            return False
        
        current = self.get_current_scene()
        
        if current == scene1:
            return self.switch_scene(scene2)
        elif current == scene2:
            return self.switch_scene(scene1)
        else:
            # If we're on neither scene, switch to scene1
            print(f"Current scene '{current}' not in toggle list, switching to '{scene1}'")
            return self.switch_scene(scene1)

class OBSFaceSwitcher:
    def __init__(self, obs_host: str = "localhost", obs_port: int = 4455, obs_password: str = ""):
        """
        Initialize the OBS Face Switcher
        
        Args:
            obs_host: OBS Studio host
            obs_port: WebSocket port
            obs_password: WebSocket password if set
        """
        self.obs = OBSController(obs_host, obs_port, obs_password)
        self.face_detector = FaceOrientationDetector()
        
        # Scene configuration
        self.scene_left = None
        self.scene_right = None
        self.scene_center = None
        
        # State tracking
        self.last_scene_switch = 0
        self.scene_switch_cooldown = 1.0  # Minimum seconds between scene switches
        
        # Set up orientation change callback
        self.face_detector.set_orientation_callback(self._on_orientation_change)
        
    def set_scenes(self, scene_left: str, scene_right: str, scene_center: Optional[str] = None):
        """
        Set the scenes to switch between based on face orientation
        
        Args:
            scene_left: Scene to show when face is turned left
            scene_right: Scene to show when face is turned right
            scene_center: Scene to show when face is center (optional)
        """
        self.scene_left = scene_left
        self.scene_right = scene_right
        self.scene_center = scene_center
        print("Scene mapping set:")
        print(f"   Left: {scene_left}")
        print(f"   Right: {scene_right}")
        if scene_center:
            print(f"   Center: {scene_center}")
    
    def _on_orientation_change(self, orientation: str):
        """Handle face orientation changes and switch scenes accordingly"""
        current_time = time.time()
        
        # Check cooldown to prevent rapid scene switching
        if current_time - self.last_scene_switch < self.scene_switch_cooldown:
            print(f"Cooldown active, skipping scene switch for {orientation}")
            return
        
        # Determine which scene to switch to
        target_scene = None
        if orientation == "left" and self.scene_left:
            target_scene = self.scene_left
        elif orientation == "right" and self.scene_right:
            target_scene = self.scene_right
        elif orientation == "center" and self.scene_center:
            target_scene = self.scene_center
        
        # Switch scene if we have a target and it's different from current
        if target_scene and target_scene != self.obs.get_current_scene():
            print(f"Face turned {orientation} → Switching to scene: {target_scene}")
            success = self.obs.switch_scene(target_scene)
            if success:
                self.last_scene_switch = current_time
                print(f"Scene switch initiated to: {target_scene}")
            else:
                print(f"Failed to initiate scene switch to: {target_scene}")
        else:
            print(f"No scene switch needed: target={target_scene}, current={self.obs.get_current_scene()}")
    
    def start(self):
        """Start the face-controlled scene switching"""
        print("🎥 Starting OBS Face-Controlled Scene Switcher...")
        
        # Connect to OBS Studio
        if not self.obs.connect():
            print("Failed to connect to OBS Studio")
            return False
        
        # Wait for scenes to load
        time.sleep(1)
        
        # Check if our target scenes exist
        available_scenes = self.obs.get_scene_names()
        required_scenes = [self.scene_left, self.scene_right]
        if self.scene_center:
            required_scenes.append(self.scene_center)
        
        missing_scenes = [scene for scene in required_scenes if scene not in available_scenes]
        if missing_scenes:
            print(f"Required scenes not found: {', '.join(missing_scenes)}")
            print(f"Available scenes: {', '.join(available_scenes)}")
            return False
        
        print("All required scenes found!")
        print("🎬 Starting face detection and scene switching...")
        print("Turn your head left/right to switch between camera scenes!")
        print("Press 'q' to quit")
        
        return True
    
    def stop(self):
        """Stop the face-controlled scene switching"""
        print("Stopping OBS Face Switcher...")
        self.obs.disconnect()

def main():
    """Main function to run the OBS Face Switcher"""
    print("🎥 OBS Studio Face-Controlled Scene Switcher (Fixed Version 2)")
    print("=" * 65)
    
    # Configuration - UPDATE THESE VALUES FOR YOUR SETUP
    OBS_HOST = "localhost"      # OBS Studio host
    OBS_PORT = 4455            # WebSocket port (default: 4455)
    OBS_PASSWORD = ""          # WebSocket password if set
    
    # Scene names - UPDATE THESE TO MATCH YOUR SCENES
    SCENE_LEFT = "Scene"        # Scene to show when face turns left
    SCENE_RIGHT = "Scene 2"     # Scene to show when face turns right
    SCENE_CENTER = None         # Scene to show when face is center (optional)
    
    # Create face switcher
    face_switcher = OBSFaceSwitcher(OBS_HOST, OBS_PORT, OBS_PASSWORD)
    
    try:
        # Set up scene mapping
        face_switcher.set_scenes(SCENE_LEFT, SCENE_RIGHT, SCENE_CENTER)
        
        # Start the face switcher
        if not face_switcher.start():
            return
        
        # Start the camera and face detection
        with OakCamera() as oak:
            # Create color camera
            color = oak.create_camera('color', fps=30)
            
            # Create face detection neural network
            nn = oak.create_nn('face-detection-retail-0004', color)
            
            # Create callback to process frames
            def process_frame_callback(packet: FramePacket):
                # Get the frame
                frame = packet.frame
                
                # Get detections from the neural network (empty list if the packet has none)
                detections = getattr(packet, 'detections', [])
                
                # Process the frame
                processed_frame = face_switcher.face_detector.process_frame(frame, detections)
                
                # Display the frame
                cv2.imshow('Face Orientation Detection', processed_frame)
                
                # Check for quit key
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    return False
                
                return True
            
            # Set up callback
            oak.callback(nn.out.main, callback=process_frame_callback, main_thread=True)
            
            # Start the pipeline
            print("🎬 Camera pipeline started. Face detection active!")
            oak.start(blocking=True)
            
    except Exception as e:
        print(f"Error: {e}")
        print("Please check your camera connection and try again.")
    
    finally:
        # Clean up
        face_switcher.stop()
        cv2.destroyAllWindows()
        print("\nOBS Face Switcher stopped!")

if __name__ == "__main__":
    main()