Client Interfaces

StreamPoseML’s Integration Options

This guide helps you understand and choose between StreamPoseML’s client interfaces to integrate pose-based classification into your applications.

Choosing the Right Client Interface

StreamPoseML offers two main client interfaces, each designed for different use cases:

  • StreamPoseClient: A lightweight, self-contained client that processes video frames directly and handles the entire pipeline from pose detection to classification. Best for:

      • Desktop applications
      • Standalone Python applications
      • Projects where simplicity is more important than scalability
      • When you need to process raw video frames

  • MLFlowClient: A client that leverages MLflow for model management. Best for:

      • Web applications
      • Production systems requiring scalability
      • Projects needing model versioning and management
      • When you need standardized model deployment

StreamPoseClient: Simple and Direct

The StreamPoseClient provides a straightforward way to perform real-time pose classification directly from video frames or pre-extracted keypoints. It suits applications that stay entirely in Python and need a complete end-to-end solution.

Figure: StreamPoseClient workflow. StreamPoseClient handles the entire pipeline from frame capture to classification.

Key Features:

  • Complete Pipeline: Handles all steps from pose detection through classification

  • Temporal Analysis: Maintains a window of frames for analyzing movements over time

  • Flexible Input: Works with raw video frames or pre-extracted keypoints

  • Real-time Focus: Optimized for low-latency classification

  • Self-contained: No external services or dependencies needed for operation
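The temporal window mentioned above can be pictured as a fixed-size buffer that always holds the most recent frames. The following is a minimal sketch of that idea only; `FrameWindow` is a hypothetical helper written for illustration, and StreamPoseClient's internal buffering may work differently.

```python
from collections import deque


class FrameWindow:
    """Hypothetical helper: keeps only the most recent `size` frames."""

    def __init__(self, size):
        self.size = size
        # A deque with maxlen drops the oldest item automatically when full
        self.frames = deque(maxlen=size)

    def add(self, frame_data):
        self.frames.append(frame_data)

    def is_full(self):
        # A classification over the window only makes sense once it is full
        return len(self.frames) == self.size


window = FrameWindow(size=3)
ready = []
for frame_number in range(5):
    window.add({"frame": frame_number})
    ready.append(window.is_full())

# Frame numbers currently held in the window (oldest first)
latest = [f["frame"] for f in window.frames]
```

In this sketch the window becomes usable on the third frame and then slides forward one frame at a time, which is why no classification is available for the first few frames of a stream.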

Basic Usage

from stream_pose_ml import StreamPoseClient
from stream_pose_ml.blaze_pose.mediapipe_client import MediaPipeClient
from stream_pose_ml.learning.trained_model import TrainedModel
from stream_pose_ml.learning.sequence_transformer import SequenceTransformer
import pickle

# 1. Load your trained model
model = TrainedModel()
with open('path/to/your/model.pkl', 'rb') as f:
    trained_classifier = pickle.load(f)
    model_data = pickle.load(f)

model.set_model(model=trained_classifier, model_data=model_data)

# 2. Initialize components
mpc = MediaPipeClient()  # MediaPipe client for pose detection
transformer = SequenceTransformer()  # Transforms pose data to features

# 3. Create StreamPoseClient
client = StreamPoseClient(
    frame_window=30,  # Number of frames to analyze together (e.g. 1 second at 30 fps)
    mediapipe_client_instance=mpc,
    trained_model=model,
    data_transformer=transformer
)

# 4. Process video frames
import cv2

# Option A: Process a single image
image = cv2.imread('path/to/image.jpg')
client.run_frame_pipeline(image)

# Option B: Process keypoints (if already extracted)
# client.run_keypoint_pipeline(keypoints_data)

# 5. Get the classification result
result = client.current_classification
print(f"Classification result: {result}")

# 6. For continuous video like webcam feed:
'''
cap = cv2.VideoCapture(0)  # Open webcam
while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Process each frame
    client.run_frame_pipeline(frame)

    # Use the classification result when available
    if client.current_classification is not None:
        # Do something with the result
        print(f"Classification: {client.current_classification}")

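    # Show the frame; cv2.waitKey only receives key presses while a window is open
    cv2.imshow("StreamPoseML", frame)

    # Exit when 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break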

cap.release()
cv2.destroyAllWindows()
'''

Parameters Explained:

  • frame_window (int): The number of consecutive frames to analyze together. This matters for movements that unfold over time. For example, if your video runs at 30 fps and you want to analyze 1 second of movement, use 30.

  • mediapipe_client_instance: An instance of MediaPipeClient that handles pose detection.

  • trained_model: Your previously trained model wrapped in a TrainedModel class.

  • data_transformer: Transforms raw pose data into the feature format your model expects.
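The frame_window arithmetic described above (frames per second times the duration you want to analyze) can be sketched as a small helper. `frames_for_duration` is a hypothetical name introduced here for illustration and is not part of StreamPoseML. For a video file, the frame rate could be read with `cap.get(cv2.CAP_PROP_FPS)`.

```python
def frames_for_duration(fps, seconds):
    """Return the number of frames that cover `seconds` of video at `fps`."""
    if fps <= 0 or seconds <= 0:
        raise ValueError("fps and seconds must be positive")
    # Round to the nearest whole frame, but never return fewer than one
    return max(1, round(fps * seconds))


frame_window = frames_for_duration(fps=30, seconds=1.0)        # 30
half_second_at_60 = frames_for_duration(fps=60, seconds=0.5)   # 30
ntsc_window = frames_for_duration(fps=29.97, seconds=1.0)      # 30
```

The resulting integer is the value you would pass as frame_window when creating the client.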

Practical Example: Video Analysis Application

Here’s how to integrate StreamPoseClient into a video analysis application:

import cv2
from stream_pose_ml import StreamPoseClient
from stream_pose_ml.blaze_pose.mediapipe_client import MediaPipeClient
from stream_pose_ml.learning.trained_model import TrainedModel

# Initialize your model and components first (as shown above)
# ...

# Define a function for processing video files
def analyze_video(video_path, output_path=None):
    cap = cv2.VideoCapture(video_path)

    # Set up video writer if saving output
    if output_path:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Classification statistics
    frames = 0
    positive_frames = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Process the frame
        client.run_frame_pipeline(frame)
        frames += 1

        # Visualize classification when available
        if client.current_classification is not None:
            # Type could be "correct form", "incorrect form", etc.
            # depending on what your model predicts
            if client.current_classification:
                label = "Correct Movement"
                color = (0, 255, 0)  # Green
                positive_frames += 1
            else:
                label = "Incorrect Movement"
                color = (0, 0, 255)  # Red

            # Display on frame
            cv2.putText(frame, label, (50, 50),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

        # Write frame if saving output
        if output_path:
            out.write(frame)

    # Clean up
    cap.release()
    if output_path:
        out.release()

    return {
        "total_frames": frames,
        "positive_frames": positive_frames,
        "positive_percentage": (positive_frames / frames * 100) if frames > 0 else 0
    }

# Example usage
results = analyze_video("dance_video.mp4", "analyzed_video.mp4")
print(f"Analysis complete: {results['positive_percentage']:.2f}% correct movements")

MLFlowClient: Complex Model Deployment

The MLFlowClient is designed for systems where model management, versioning, and scalability are critical. It integrates with MLflow for robust model deployment and is particularly well-suited for web applications.

Figure: MLFlowClient architecture. MLFlowClient integrates with MLflow for scalable model serving.

Key Features:

  • MLflow Integration: Connects directly to MLflow model serving endpoints

  • Optimized Data Flow: Designed for web applications receiving keypoint data

  • Frame Overlap: Supports smoother predictions with overlapping frame windows

  • Performance Tracking: Monitors prediction times and model performance metrics

  • Scalability: Works well in distributed architectures and cloud deployments
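To make the performance-tracking idea concrete, here is a minimal sketch of timing a prediction call. It is an illustration only: `timed` and `fake_predict` are hypothetical names, and MLFlowClient records its own timing (exposed as prediction_processing_time) in whatever way the library implements it.

```python
import time
from functools import wraps


def timed(predict_fn):
    """Wrap a prediction function and record how long each call takes."""

    @wraps(predict_fn)
    def wrapper(payload):
        start = time.perf_counter()
        result = predict_fn(payload)
        # Store the elapsed time in milliseconds on the wrapper itself
        wrapper.last_duration_ms = (time.perf_counter() - start) * 1000.0
        return result

    wrapper.last_duration_ms = None  # No call has been timed yet
    return wrapper


@timed
def fake_predict(payload):
    # Stand-in for a request to a model server
    return {"predictions": [1]}


result = fake_predict({"inputs": []})
duration_ms = fake_predict.last_duration_ms
```

A wrapper like this can be handy for comparing end-to-end latency (including network time) against the figure the client reports.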

Basic Usage

import requests
from stream_pose_ml import MLFlowClient
from stream_pose_ml.blaze_pose.mediapipe_client import MediaPipeClient
from stream_pose_ml.learning.sequence_transformer import SequenceTransformer

# 1. Initialize dependencies
mpc = MediaPipeClient(dummy_client=True)  # dummy_client=True if not processing raw frames
transformer = SequenceTransformer()  # Or specialized MLFlowTransformer if needed

# 2. Define a prediction function that interfaces with MLflow
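def mlflow_predict(json_data_payload):
    # Send request to MLflow serving endpoint
    response = requests.post(
        "http://mlflow:5002/invocations",  # Your MLflow server endpoint
        json={"inputs": json_data_payload},
        headers={"Content-Type": "application/json"},
        timeout=5,  # Seconds; avoids hanging if the server is unreachable
    )
    response.raise_for_status()  # Raise on HTTP 4xx/5xx instead of parsing an error body
    return response.json()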

# 3. Create client with frame overlap for smoother predictions
client = MLFlowClient(
    mediapipe_client_instance=mpc,
    data_transformer=transformer,
    predict_fn=mlflow_predict,  # Your custom prediction function
    input_example={"columns": ["angle_left_elbow", "angle_right_knee"]},  # Match your model's expected input
    frame_window=30,  # Total frames to consider
    frame_overlap=5   # Process new predictions every (frame_window - frame_overlap) frames
)

# 4. Process keypoints (typically from a web client)
keypoints_data = {"joint_positions": {...}}  # Received from frontend or other source
client.run_keypoint_pipeline(keypoints_data)

# 5. Get classification result
result = client.current_classification
processing_time = client.prediction_processing_time  # Performance monitoring

print(f"Classification: {result}, Processing time: {processing_time}ms")

Parameters Explained:

  • mediapipe_client_instance: MediaPipeClient instance (often with dummy_client=True for web deployments)

  • data_transformer: Transforms pose data into the format your MLflow model expects

  • predict_fn: Custom function that sends data to MLflow and returns predictions

  • input_example: Example of the input format your model expects

  • frame_window: Number of frames to consider in each analysis window

  • frame_overlap: Number of frames that overlap between consecutive analysis windows
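The relationship between frame_window and frame_overlap can be checked with a few lines of arithmetic. The sketch below assumes, as the usage comment above states, that a new prediction is made every (frame_window - frame_overlap) frames. `window_ranges` is a hypothetical helper for illustration, and the client's actual buffering may differ in detail.

```python
def window_ranges(total_frames, frame_window, frame_overlap):
    """List the (start, end) frame ranges covered by each analysis window."""
    if not 0 <= frame_overlap < frame_window:
        raise ValueError("frame_overlap must be >= 0 and smaller than frame_window")

    step = frame_window - frame_overlap  # Frames between consecutive predictions
    ranges = []
    start = 0
    while start + frame_window <= total_frames:
        ranges.append((start, start + frame_window))  # end is exclusive
        start += step
    return ranges


ranges = window_ranges(total_frames=80, frame_window=30, frame_overlap=5)
# [(0, 30), (25, 55), (50, 80)]
```

With these values each window shares its first 5 frames with the previous one. A larger overlap gives more frequent predictions at the cost of more calls to the model server.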

Web Application Integration

Here’s a complete example of integrating MLFlowClient in a Flask web application with WebSockets for real-time communication:

from flask import Flask, request, jsonify
from flask_socketio import SocketIO, emit
import requests
from stream_pose_ml import MLFlowClient
from stream_pose_ml.blaze_pose.mediapipe_client import MediaPipeClient
from stream_pose_ml.learning.sequence_transformer import SequenceTransformer

# Initialize Flask and SocketIO
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")

# Global client instance
stream_pose_client = None

# MLflow prediction function
def mlflow_predict(json_data):
    """Send prediction requests to MLflow serving endpoint"""
    try:
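        response = requests.post(
            "http://mlflow:5002/invocations",
            json={"inputs": json_data},
            headers={"Content-Type": "application/json"},
            timeout=5,  # Seconds; avoids blocking the socket handler indefinitely
        )
        response.raise_for_status()  # Treat HTTP 4xx/5xx as errors
        return response.json()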
    except Exception as e:
        app.logger.error(f"Prediction error: {str(e)}")
        return None

# API endpoint to set model parameters
@app.route("/api/model/setup", methods=["POST"])
def setup_model():
    """Configure the MLFlowClient with model parameters"""
    global stream_pose_client

    data = request.json
    frame_window = data.get("frame_window", 30)
    frame_overlap = data.get("frame_overlap", 5)
    input_example = data.get("input_example", {"columns": []})

    try:
        # Initialize components
        mpc = MediaPipeClient(dummy_client=True)  # No raw frame processing
        transformer = SequenceTransformer()

        # Create MLFlowClient
        stream_pose_client = MLFlowClient(
            mediapipe_client_instance=mpc,
            data_transformer=transformer,
            predict_fn=mlflow_predict,
            input_example=input_example,
            frame_window=frame_window,
            frame_overlap=frame_overlap,
        )

        return jsonify({"status": "success", "message": "Model configured"})

    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500

# WebSocket endpoint for real-time keypoint processing
@socketio.on("keypoints")
def handle_keypoints(payload):
    """Process incoming keypoint data and return classification"""
    global stream_pose_client

    if stream_pose_client is None:
        emit("frame_result", {"error": "No model configured"})
        return

    try:
        # Process keypoints with MLFlowClient
        results = stream_pose_client.run_keypoint_pipeline(payload)

        # Return classification results if available
        if results and stream_pose_client.current_classification is not None:
            classification = stream_pose_client.current_classification
            predict_speed = stream_pose_client.prediction_processing_time

            # Send results back to client
            emit("frame_result", {
                "classification": classification,
                "prediction_time": predict_speed,
                # Falls back to None if the client has no current_confidence attribute
                "confidence": getattr(stream_pose_client, "current_confidence", None),
            })
        else:
            emit("frame_result", {"status": "processing"})

    except Exception as e:
        app.logger.error(f"Error processing keypoints: {str(e)}")
        emit("frame_result", {"error": str(e)})

if __name__ == "__main__":
    # debug=True is for local development only; disable it in production
    socketio.run(app, host="0.0.0.0", port=5000, debug=True)
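Before keypoints can be processed, the application above expects a POST to /api/model/setup. The following sketch builds that request using only the standard library. The base URL and column names are assumptions for illustration, and `build_setup_request` is a hypothetical helper; the JSON keys mirror those read by setup_model() above.

```python
import json
import urllib.request


def build_setup_request(base_url, columns, frame_window=30, frame_overlap=5):
    """Build the POST request that configures the model on the Flask app."""
    body = {
        "frame_window": frame_window,
        "frame_overlap": frame_overlap,
        "input_example": {"columns": list(columns)},
    }
    return urllib.request.Request(
        url=f"{base_url}/api/model/setup",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_setup_request(
    "http://localhost:5000",  # Assumed address of the Flask app
    ["angle_left_elbow", "angle_right_knee"],
)

# Uncomment once the Flask app is running:
# with urllib.request.urlopen(req, timeout=5) as resp:
#     print(json.loads(resp.read()))
```

After a successful setup call, a Socket.IO client can start emitting "keypoints" events and listening for "frame_result".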

TrainedModel: Your Machine Learning Container

The TrainedModel class is a convenient container for your trained machine learning models. It provides a standard interface regardless of the underlying model type and handles integration with data transformers.

Key Features:

  • Model Encapsulation: Neatly packages your trained model and associated data

  • Consistent Interface: Provides a standardized predict() method regardless of model type

  • Data Transformer Integration: Works seamlessly with StreamPoseML’s transformers

  • Metadata Support: Stores additional information about the model
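The "consistent interface" idea can be illustrated with a generic wrapper pattern: a container holds any object with a predict() method and delegates to it. This is a sketch of the pattern only. `ModelContainer` and `ThresholdModel` are hypothetical classes written for this example and do not reproduce TrainedModel's actual implementation.

```python
class ModelContainer:
    """Illustrative stand-in for a model wrapper; not StreamPoseML's TrainedModel."""

    def __init__(self):
        self.model = None
        self.model_data = None
        self.notes = None

    def set_model(self, model, model_data=None, notes=None):
        # Store the model together with any associated data and free-form notes
        self.model = model
        self.model_data = model_data
        self.notes = notes

    def predict(self, data):
        if self.model is None:
            raise RuntimeError("call set_model() before predict()")
        # Delegate to the wrapped model, whatever its concrete type
        return self.model.predict(data)


class ThresholdModel:
    """Toy model: predicts True when the first feature exceeds 0.5."""

    def predict(self, data):
        return [row[0] > 0.5 for row in data]


container = ModelContainer()
container.set_model(ThresholdModel(), notes="toy example")
predictions = container.predict([[0.9, 0.1], [0.2, 0.8]])
```

Because callers only ever touch the container's predict(), the wrapped model can be swapped for another type without changing the calling code.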

Basic Usage

from stream_pose_ml.learning.trained_model import TrainedModel
import pickle

# Create a TrainedModel instance
model = TrainedModel()

# Load a model from pickle file
with open('dance_classifier.pkl', 'rb') as f:
    trained_classifier = pickle.load(f)  # The actual model (e.g., RandomForest, XGBoost)
    model_data = pickle.load(f)  # Additional data like test features

# Set the model and associated data
model.set_model(
    model=trained_classifier,  # Your trained sklearn/xgboost/etc. model
    model_data={               # Additional data needed for predictions
        "X_test": model_data["X_test"],  # Feature columns from test data
        "feature_names": model_data.get("feature_names", []),
    },
    notes="Dance movement classifier trained on 500 examples" # Optional documentation
)

# Optional: Connect a data transformer for preprocessing
from stream_pose_ml.learning.sequence_transformer import SequenceTransformer
transformer = SequenceTransformer()
model.set_data_transformer(transformer)

# Make predictions directly
import numpy as np
test_features = np.array([[0.5, 0.3, 0.2, ...]])  # Your feature vector
predictions = model.predict(data=test_features)

print(f"Prediction: {predictions[0]}")
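The loading code above calls pickle.load twice on the same file, which only works if two objects were written to that file one after the other. The sketch below shows that round trip under this assumption. It uses an in-memory buffer and placeholder objects instead of a real classifier, and StreamPoseML's own saving utilities may produce a different layout.

```python
import io
import pickle

# Placeholders standing in for a trained estimator and its associated data
classifier = {"kind": "placeholder for a trained estimator"}
model_data = {"X_test": [[0.1, 0.2]], "feature_names": ["a", "b"]}

buffer = io.BytesIO()  # Stands in for open("dance_classifier.pkl", "wb")

# Two consecutive dumps write two independent pickles into the same stream
pickle.dump(classifier, buffer)
pickle.dump(model_data, buffer)

# Reading back: each load consumes exactly one pickled object, in order
buffer.seek(0)
loaded_classifier = pickle.load(buffer)
loaded_model_data = pickle.load(buffer)
```

If your model file contains a single object (for example, one dictionary holding both the classifier and its data), the second pickle.load would raise EOFError, and you would unpack that one object instead. Only unpickle files from sources you trust, since loading a pickle can execute arbitrary code.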

When to use TrainedModel:

  • When integrating with StreamPoseClient or MLFlowClient

  • To standardize prediction interfaces across different model types

  • To package models for deployment in the StreamPoseML ecosystem