
Practical MLOps: Taking a Model from Jupyter Notebook to Production API

Learn how to operationalize machine learning models end-to-end — from messy Jupyter notebooks to versioned, monitored, production-grade REST APIs. Covers model versioning, CI/CD pipelines, containerized deployment, and observability with real-world code examples.

April 23, 2026 · 16 min read · Niraj Kumar

So you've trained a model that hits 94% accuracy on your validation set. Your Jupyter notebook is clean (well, clean-ish), the loss curves look great, and your manager is excited. Now comes the question nobody warned you about in that Coursera course:

"When can we put it in production?"

This is where MLOps enters the picture — and where most ML projects quietly die. In this guide, we'll walk through the entire journey: from a notebook-bound model to a versioned, containerized, CI/CD-deployed, and monitored production API. No hand-waving. Real code. Real decisions.


What Is MLOps (And Why Should You Care)?

MLOps — short for Machine Learning Operations — is the practice of applying DevOps principles to the ML lifecycle. It sits at the intersection of:

  • ML Engineering (model training, evaluation, feature engineering)
  • Software Engineering (versioning, testing, packaging)
  • DevOps (CI/CD, containerization, infrastructure)

The goal is simple: make models reproducible, deployable, and maintainable — just like any other piece of software.

💡 By 2026, over 85% of ML projects still fail to reach production (Gartner). The bottleneck is rarely the model itself — it's the surrounding infrastructure.


The MLOps Maturity Model

Before diving in, understand where you are:

| Level | Description |
|-------|-------------|
| 0 | Manual, notebook-driven process. No reproducibility. |
| 1 | ML pipeline automation. Models trained and evaluated consistently. |
| 2 | CI/CD for ML pipelines. Automated retraining and deployment. |
| 3 | Full MLOps. Automated monitoring, drift detection, and feedback loops. |

This tutorial takes you from Level 0 → Level 2, with a clear path toward Level 3.


Project Overview: What We're Building

We'll use a concrete example: a customer churn prediction API.

Our starting point:

  • A trained scikit-learn pipeline (a RandomForestClassifier with preprocessing)
  • A messy Jupyter notebook called churn_model_final_v3_REAL.ipynb

Our destination:

  • A versioned model artifact stored in MLflow
  • A FastAPI REST endpoint serving predictions
  • A Docker container for reproducible deployment
  • A GitHub Actions CI/CD pipeline
  • Basic monitoring with drift detection

Let's build.


Step 1: Refactor the Notebook into a Training Script

The first sin of ML engineering is treating the notebook as the artifact. Notebooks are great for exploration — terrible for production.

Extract a train.py

# train.py
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, classification_report
import joblib
import yaml
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def load_config(path: str = "config.yaml") -> dict:
    with open(path) as f:
        return yaml.safe_load(f)


def build_preprocessor(numeric_features: list, categorical_features: list):
    return ColumnTransformer(transformers=[
        ("num", StandardScaler(), numeric_features),
        ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_features),
    ])


def train(config: dict):
    # Prefer the env var (set in CI); fall back to the config value for local runs
    mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", config["mlflow"]["tracking_uri"]))
    mlflow.set_experiment(config["mlflow"]["experiment_name"])

    df = pd.read_csv(config["data"]["train_path"])
    X = df.drop(columns=[config["data"]["target_column"]])
    y = df[config["data"]["target_column"]]

    X_train, X_val, y_train, y_val = train_test_split(
        X, y,
        test_size=config["training"]["val_size"],
        random_state=config["training"]["random_state"],
        stratify=y,
    )

    preprocessor = build_preprocessor(
        config["features"]["numeric"],
        config["features"]["categorical"],
    )

    model = Pipeline(steps=[
        ("preprocessor", preprocessor),
        ("classifier", RandomForestClassifier(
            n_estimators=config["model"]["n_estimators"],
            max_depth=config["model"]["max_depth"],
            random_state=config["training"]["random_state"],
        )),
    ])

    with mlflow.start_run() as run:
        logger.info(f"Starting MLflow run: {run.info.run_id}")

        model.fit(X_train, y_train)

        y_pred_proba = model.predict_proba(X_val)[:, 1]
        auc = roc_auc_score(y_val, y_pred_proba)

        # Log params and metrics
        mlflow.log_params(config["model"])
        mlflow.log_metric("val_auc", auc)
        mlflow.log_artifact("config.yaml")

        # Log the model
        mlflow.sklearn.log_model(
            model,
            artifact_path="model",
            registered_model_name=config["mlflow"]["model_name"],
        )

        logger.info(f"Validation AUC: {auc:.4f}")
        logger.info(f"Model registered as: {config['mlflow']['model_name']}")

        # Save locally too (for Docker packaging)
        os.makedirs("artifacts", exist_ok=True)
        joblib.dump(model, "artifacts/model.pkl")

    return run.info.run_id


if __name__ == "__main__":
    config = load_config()
    train(config)

The Config File

Never hardcode hyperparameters. Use a config.yaml:

# config.yaml
mlflow:
  experiment_name: "churn-prediction"
  model_name: "churn-classifier"
  tracking_uri: "http://mlflow-server:5000"

data:
  train_path: "data/churn_train.csv"
  target_column: "churned"

features:
  numeric: ["tenure", "monthly_charges", "total_charges", "num_products"]
  categorical: ["contract_type", "payment_method", "internet_service"]

training:
  val_size: 0.2
  random_state: 42

model:
  n_estimators: 200
  max_depth: 8

Step 2: Model Versioning with MLflow

MLflow is the de facto standard for ML experiment tracking. Here's what you get:

  • Experiment tracking: Every training run is logged with params, metrics, and artifacts
  • Model Registry: Versioned model storage with staging/production transitions (see the loading sketch after this list)
  • Artifact storage: Models, configs, plots — all versioned
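
Once a model is registered, downstream code can load it by name and stage instead of passing pickle paths around. A minimal sketch, assuming the tracking URI and model name from config.yaml above:

# load_production_model.py (illustrative)
import mlflow
import mlflow.sklearn

mlflow.set_tracking_uri("http://mlflow-server:5000")

# "models:/<name>/<stage>" resolves to the latest version in that stage
model = mlflow.sklearn.load_model("models:/churn-classifier/Production")
print(type(model))  # the scikit-learn Pipeline, ready for predict_proba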

Transitioning a Model to Production

# promote_model.py
import mlflow
from mlflow.tracking import MlflowClient

def promote_to_production(model_name: str, version: int):
    client = MlflowClient()

    # Archive any currently deployed production model
    for mv in client.get_latest_versions(model_name, stages=["Production"]):
        client.transition_model_version_stage(
            name=model_name,
            version=mv.version,
            stage="Archived",
        )

    # Promote new version
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=False,
    )

    print(f"✅ Model '{model_name}' v{version} promoted to Production")


if __name__ == "__main__":
    promote_to_production("churn-classifier", version=3)

Model Card

Every production model should have a Model Card — a brief document describing:

# Model Card: Churn Classifier v3

## Model Details
- **Type**: RandomForestClassifier (scikit-learn 1.5)
- **Task**: Binary classification (churn / no-churn)
- **Owner**: ML Platform Team

## Performance
| Metric | Value |
|--------|-------|
| Val AUC | 0.912 |
| Precision (churn) | 0.84 |
| Recall (churn) | 0.79 |

## Training Data
- 50,000 customers, Jan 2024 – Dec 2025
- Balanced sampling applied

## Limitations
- Not validated for customers with < 1 month tenure
- Assumes stable feature distributions

## Last Updated
2026-04-14

Step 3: Building the Production API with FastAPI

# app/main.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import joblib
import pandas as pd
import numpy as np
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import time
import logging
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Churn Prediction API",
    description="Predicts customer churn probability",
    version="1.0.0",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Prometheus Metrics ---
PREDICTION_COUNT = Counter(
    "predictions_total",
    "Total prediction requests",
    ["outcome"],
)
PREDICTION_LATENCY = Histogram(
    "prediction_latency_seconds",
    "Prediction latency",
)
HIGH_RISK_COUNT = Counter(
    "high_risk_predictions_total",
    "Predictions flagged as high churn risk",
)

# --- Load Model ---
MODEL_PATH = os.getenv("MODEL_PATH", "artifacts/model.pkl")
model = None  # populated at startup; also makes the attribute patchable in tests

@app.on_event("startup")
async def load_model():
    global model
    logger.info(f"Loading model from {MODEL_PATH}")
    model = joblib.load(MODEL_PATH)
    logger.info("Model loaded successfully")


# --- Request/Response Schemas ---
class CustomerFeatures(BaseModel):
    tenure: float = Field(..., ge=0, description="Months as customer")
    monthly_charges: float = Field(..., ge=0)
    total_charges: float = Field(..., ge=0)
    num_products: int = Field(..., ge=1, le=10)
    contract_type: str = Field(..., pattern="^(Month-to-month|One year|Two year)$")
    payment_method: str
    internet_service: str

    class Config:
        json_schema_extra = {
            "example": {
                "tenure": 12,
                "monthly_charges": 65.50,
                "total_charges": 786.00,
                "num_products": 2,
                "contract_type": "Month-to-month",
                "payment_method": "Electronic check",
                "internet_service": "Fiber optic",
            }
        }


class PredictionResponse(BaseModel):
    churn_probability: float
    churn_prediction: bool
    risk_tier: str
    model_version: str


def get_risk_tier(probability: float) -> str:
    if probability >= 0.75:
        return "HIGH"
    elif probability >= 0.45:
        return "MEDIUM"
    return "LOW"


# --- Endpoints ---
@app.get("/health")
async def health_check():
    return {"status": "healthy", "model_loaded": model is not None}


@app.post("/predict", response_model=PredictionResponse)
async def predict(features: CustomerFeatures):
    start = time.time()

    try:
        input_df = pd.DataFrame([features.model_dump()])
        proba = model.predict_proba(input_df)[0, 1]
        prediction = bool(proba >= 0.5)
        risk = get_risk_tier(proba)

        PREDICTION_COUNT.labels(outcome="success").inc()
        PREDICTION_LATENCY.observe(time.time() - start)

        if risk == "HIGH":
            HIGH_RISK_COUNT.inc()

        return PredictionResponse(
            churn_probability=round(float(proba), 4),
            churn_prediction=prediction,
            risk_tier=risk,
            model_version=os.getenv("MODEL_VERSION", "unknown"),
        )

    except Exception as e:
        PREDICTION_COUNT.labels(outcome="error").inc()
        logger.exception("Prediction failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
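
With the service running locally (for example, uvicorn app.main:app --port 8000), calling it from Python looks like this. A minimal sketch using requests; the host, port, and example values are assumptions:

# client_example.py (illustrative)
import requests

payload = {
    "tenure": 12,
    "monthly_charges": 65.50,
    "total_charges": 786.00,
    "num_products": 2,
    "contract_type": "Month-to-month",
    "payment_method": "Electronic check",
    "internet_service": "Fiber optic",
}

resp = requests.post("http://localhost:8000/predict", json=payload, timeout=5)
resp.raise_for_status()
print(resp.json())
# e.g. {"churn_probability": 0.7312, "churn_prediction": true, "risk_tier": "MEDIUM", ...}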

Step 4: Containerization with Docker

Dockerfile

# Dockerfile
FROM python:3.12-slim AS base

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1

WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt .
RUN pip install --upgrade pip && pip install -r requirements.txt

# App source
COPY app/ ./app/
COPY artifacts/ ./artifacts/

# Non-root user for security
RUN useradd -m -u 1000 mlservice && chown -R mlservice:mlservice /app
USER mlservice

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]

requirements.txt

fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.9.2
scikit-learn==1.5.2
pandas==2.2.3
numpy==2.1.2
joblib==1.4.2
prometheus-client==0.21.0
mlflow==2.18.0

Docker Compose (for local development)

# docker-compose.yml
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=artifacts/model.pkl
      - MODEL_VERSION=3.0.0
    volumes:
      - ./artifacts:/app/artifacts:ro
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.18.0
    ports:
      - "5000:5000"
    command: mlflow server --host 0.0.0.0 --port 5000
    volumes:
      - mlflow_data:/mlflow

volumes:
  mlflow_data:

Step 5: CI/CD Pipeline with GitHub Actions

# .github/workflows/mlops-pipeline.yml
name: MLOps CI/CD Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}/churn-api

jobs:
  # ── 1. Test & Lint ────────────────────────────────────────────────────────
  test:
    name: Test & Lint
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: "pip"

      - name: Install dependencies
        run: pip install -r requirements.txt -r requirements-dev.txt

      - name: Lint
        run: |
          ruff check .
          mypy app/

      - name: Run unit tests
        run: pytest tests/ -v --cov=app --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v4

  # ── 2. Train & Evaluate ───────────────────────────────────────────────────
  train:
    name: Train Model
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: "pip"

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Train model
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
        run: python train.py

      - name: Evaluate model gate
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
        run: |
          python -c "
          from mlflow.tracking import MlflowClient
          client = MlflowClient()
          exp = client.get_experiment_by_name('churn-prediction')
          runs = client.search_runs([exp.experiment_id], order_by=['start_time DESC'], max_results=1)
          auc = runs[0].data.metrics['val_auc']
          assert auc >= 0.88, f'AUC {auc:.4f} below threshold 0.88'
          print(f'✅ AUC gate passed: {auc:.4f}')
          "

      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model-artifact
          path: artifacts/

  # ── 3. Build & Push Docker Image ──────────────────────────────────────────
  build:
    name: Build & Push Image
    needs: train
    runs-on: ubuntu-latest

    permissions:
      contents: read
      packages: write

    steps:
      - uses: actions/checkout@v4

      - name: Download model artifact
        uses: actions/download-artifact@v4
        with:
          name: model-artifact
          path: artifacts/

      - name: Log in to registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=sha,prefix=sha-
            type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' }}

      - name: Build and push
        uses: docker/build-push-action@v6
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  # ── 4. Deploy to Staging ──────────────────────────────────────────────────
  deploy-staging:
    name: Deploy to Staging
    needs: build
    runs-on: ubuntu-latest
    environment: staging

    steps:
      - name: Deploy to staging cluster
        run: |
          echo "Deploying ${{ env.IMAGE_NAME }}:sha-${{ github.sha }} to staging"
          # kubectl set image deployment/churn-api churn-api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:sha-${{ github.sha }} -n staging

      - name: Run smoke tests
        run: |
          sleep 10
          curl -f https://staging.api.yourcompany.com/health

  # ── 5. Deploy to Production ───────────────────────────────────────────────
  deploy-production:
    name: Deploy to Production
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production  # Requires manual approval

    steps:
      - name: Deploy to production
        run: |
          echo "Deploying to production..."
          # kubectl set image deployment/churn-api ...

Step 6: Writing Good Tests for ML Code

Testing ML code is different from testing regular software. Here's a practical test suite:

# tests/test_api.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
import joblib
import numpy as np
from unittest.mock import patch, MagicMock

client = TestClient(app)

VALID_PAYLOAD = {
    "tenure": 24,
    "monthly_charges": 70.0,
    "total_charges": 1680.0,
    "num_products": 2,
    "contract_type": "Month-to-month",
    "payment_method": "Electronic check",
    "internet_service": "Fiber optic",
}


@pytest.fixture(autouse=True)
def mock_model():
    """Mock the model for API tests."""
    mock = MagicMock()
    mock.predict_proba.return_value = np.array([[0.3, 0.7]])
    with patch("app.main.model", mock):
        yield mock


def test_health_check():
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"


def test_predict_valid_input():
    response = client.post("/predict", json=VALID_PAYLOAD)
    assert response.status_code == 200
    data = response.json()
    assert 0.0 <= data["churn_probability"] <= 1.0
    assert isinstance(data["churn_prediction"], bool)
    assert data["risk_tier"] in ["LOW", "MEDIUM", "HIGH"]


def test_predict_high_risk():
    """Verify risk tier classification."""
    with patch("app.main.model") as mock:
        mock.predict_proba.return_value = np.array([[0.1, 0.9]])
        response = client.post("/predict", json=VALID_PAYLOAD)
    assert response.json()["risk_tier"] == "HIGH"


def test_predict_invalid_contract_type():
    payload = {**VALID_PAYLOAD, "contract_type": "Weekly"}
    response = client.post("/predict", json=payload)
    assert response.status_code == 422


def test_predict_negative_tenure():
    payload = {**VALID_PAYLOAD, "tenure": -5}
    response = client.post("/predict", json=payload)
    assert response.status_code == 422

And a second suite that exercises the serialized model artifact directly:

# tests/test_model.py
import pytest
import pandas as pd
import numpy as np
import joblib
from sklearn.pipeline import Pipeline


@pytest.fixture
def model():
    return joblib.load("artifacts/model.pkl")


@pytest.fixture
def sample_input():
    return pd.DataFrame([{
        "tenure": 12, "monthly_charges": 65.5, "total_charges": 786.0,
        "num_products": 2, "contract_type": "Month-to-month",
        "payment_method": "Electronic check", "internet_service": "Fiber optic",
    }])


def test_model_is_pipeline(model):
    assert isinstance(model, Pipeline)


def test_model_output_shape(model, sample_input):
    proba = model.predict_proba(sample_input)
    assert proba.shape == (1, 2)


def test_model_output_valid_probabilities(model, sample_input):
    proba = model.predict_proba(sample_input)
    assert np.allclose(proba.sum(axis=1), 1.0)
    assert (proba >= 0).all() and (proba <= 1).all()


def test_model_handles_unknown_categories(model):
    """Model must not crash on unseen categorical values."""
    df = pd.DataFrame([{
        "tenure": 5, "monthly_charges": 45.0, "total_charges": 225.0,
        "num_products": 1, "contract_type": "Month-to-month",
        "payment_method": "COMPLETELY_NEW_METHOD",
        "internet_service": "Fiber optic",
    }])
    proba = model.predict_proba(df)
    assert proba.shape == (1, 2)

Step 7: Monitoring and Data Drift Detection

Deploying a model is not the end — it's the beginning. Models degrade silently. You need monitoring.

Two Layers of Monitoring

1. Operational Monitoring (is the API healthy?)

  • Request latency (p50, p95, p99)
  • Error rates
  • Throughput

2. ML Monitoring (is the model still accurate?)

  • Prediction distribution shift
  • Feature distribution drift (statistical tests)
  • Business metric correlation (e.g., actual churn rate vs predicted)

Drift Detection Script

# monitoring/drift_detector.py
import pandas as pd
import numpy as np
from scipy import stats
from dataclasses import dataclass
from typing import Optional
import logging

logger = logging.getLogger(__name__)


@dataclass
class DriftReport:
    feature: str
    statistic: float
    p_value: float
    drifted: bool
    test_used: str


def detect_feature_drift(
    reference: pd.DataFrame,
    current: pd.DataFrame,
    numeric_features: list[str],
    categorical_features: list[str],
    alpha: float = 0.05,
) -> list[DriftReport]:
    """
    Detects feature drift using:
    - Kolmogorov-Smirnov test for numeric features
    - Chi-squared test for categorical features
    """
    reports = []

    for feat in numeric_features:
        if feat not in reference.columns or feat not in current.columns:
            continue
        stat, p_value = stats.ks_2samp(
            reference[feat].dropna(),
            current[feat].dropna(),
        )
        reports.append(DriftReport(
            feature=feat,
            statistic=stat,
            p_value=p_value,
            drifted=p_value < alpha,
            test_used="KS",
        ))

    for feat in categorical_features:
        if feat not in reference.columns or feat not in current.columns:
            continue
        all_cats = sorted(set(reference[feat]).union(set(current[feat])))
        ref_counts = reference[feat].value_counts().reindex(all_cats, fill_value=0)
        cur_counts = current[feat].value_counts().reindex(all_cats, fill_value=0)
        # chisquare expects observed and expected frequencies with matching totals,
        # so rescale the reference counts to the current sample size; clip zeros so
        # a category unseen in the reference data doesn't divide by zero
        expected = (ref_counts / ref_counts.sum() * cur_counts.sum()).clip(lower=1e-6)
        stat, p_value = stats.chisquare(cur_counts, f_exp=expected)
        reports.append(DriftReport(
            feature=feat,
            statistic=stat,
            p_value=p_value,
            drifted=p_value < alpha,
            test_used="Chi-squared",
        ))

    drifted = [r for r in reports if r.drifted]
    if drifted:
        logger.warning(f"⚠️  Drift detected in {len(drifted)} feature(s): "
                       f"{[r.feature for r in drifted]}")
    else:
        logger.info("✅ No significant feature drift detected.")

    return reports


def detect_prediction_drift(
    reference_predictions: np.ndarray,
    current_predictions: np.ndarray,
    alpha: float = 0.05,
) -> DriftReport:
    """Detect drift in model output distribution."""
    stat, p_value = stats.ks_2samp(reference_predictions, current_predictions)
    drifted = p_value < alpha
    if drifted:
        logger.warning(f"⚠️  Prediction distribution drift detected (p={p_value:.4f})")
    return DriftReport(
        feature="prediction_probability",
        statistic=stat,
        p_value=p_value,
        drifted=drifted,
        test_used="KS",
    )

Prometheus + Grafana Dashboard

In docker-compose.yml, add:

  prometheus:
    image: prom/prometheus:v2.55.0
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana:11.3.0
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

And point Prometheus at the API's /metrics endpoint:

# monitoring/prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "churn-api"
    static_configs:
      - targets: ["api:8000"]

Set up Grafana panels for:

  • rate(predictions_total[5m]) — prediction throughput
  • histogram_quantile(0.95, rate(prediction_latency_seconds_bucket[5m])) — p95 latency
  • rate(high_risk_predictions_total[1h]) — high-risk rate over time

Best Practices

  • Separate concerns: Training code, serving code, and monitoring code should live in separate modules — not one giant script.
  • Version everything: Models, datasets, configs, and Docker images. If you can't reproduce a result, it doesn't exist.
  • Fail fast with gates: An AUC gate in CI prevents degraded models from reaching production silently.
  • Use structured logging: JSON logs are queryable. print() statements are archaeology.
  • Schema validation on ingress: pydantic is your first line of defense against malformed inputs crashing the model.
  • Canary deployments: For high-stakes models, route 5-10% of traffic to the new model before full rollout.
  • Store ground truth: Log inputs and (eventually) labels so you can retrain and evaluate retrospectively (a logging sketch follows this list).
  • Document your model: A model without a model card is a liability, not an asset.
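
On the ground-truth point above: structured prediction logging is cheap to add and pays off the first time you retrain. A minimal sketch of a helper the /predict handler could call; the log schema and module name are assumptions, not part of the API shown earlier:

# app/prediction_logger.py (illustrative)
import json
import logging
import uuid
from datetime import datetime, timezone

logger = logging.getLogger("prediction_log")


def log_prediction(features: dict, probability: float, model_version: str) -> str:
    """Emit one JSON line per prediction so inputs can later be joined with true labels."""
    record_id = str(uuid.uuid4())
    logger.info(json.dumps({
        "record_id": record_id,  # join key for when the actual churn label arrives
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "model_version": model_version,
        "features": features,
        "churn_probability": probability,
    }))
    return record_id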

Common Mistakes

❌ Pickle-ing the entire notebook namespace

# BAD — pickles everything including data, random state, etc.
import pickle
with open("model.pkl", "wb") as f:
    pickle.dump(locals(), f)

# GOOD — serialize only the pipeline
import joblib
joblib.dump(pipeline, "artifacts/model.pkl")

❌ Skipping input validation

# BAD — model receives raw dict, no validation
@app.post("/predict")
def predict(data: dict):
    return model.predict([list(data.values())])

Always use Pydantic models with constraints. Garbage in, garbage out — and garbage out in production means a PagerDuty alert at 3am.

❌ No model versioning

Deploying model.pkl with no version tag means you can never trace which experiment produced the model that just misbehaved.

❌ Monitoring only infrastructure, not model behavior

CPU and memory metrics won't tell you that your model started systematically under-predicting churn for a new customer segment. Track prediction distributions.

❌ Retraining without evaluation gates

Automated retraining pipelines must include a model comparison step — new model must beat (or at minimum match) current production model before promotion.
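
A sketch of what that comparison can look like, built on the MLflow registry from Step 2. The metric name matches train.py; everything else is illustrative:

# scripts/compare_with_production.py (illustrative)
from mlflow.tracking import MlflowClient


def candidate_beats_production(model_name: str, candidate_run_id: str, metric: str = "val_auc") -> bool:
    """Return True if the candidate run's metric is at least as good as the current Production model's."""
    client = MlflowClient()

    candidate_score = client.get_run(candidate_run_id).data.metrics[metric]

    prod_versions = client.get_latest_versions(model_name, stages=["Production"])
    if not prod_versions:
        return True  # nothing in production yet, so the candidate wins by default

    prod_run_id = prod_versions[0].run_id
    prod_score = client.get_run(prod_run_id).data.metrics[metric]
    return candidate_score >= prod_score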


🚀 Pro Tips

  • Use mlflow.autolog() for scikit-learn — it automatically logs all params and metrics with one line (a minimal sketch follows this list).
  • Shadow mode deployment: Run the new model in parallel, log predictions, but don't serve them. Compare to production before cutting over.
  • Feature stores (Feast, Tecton, Hopsworks) eliminate training/serving skew at scale — the same feature computation runs both at training time and inference time.
  • Model compression: For latency-sensitive APIs, profile inference time. ONNX can cut scikit-learn inference by 3-5×.
  • Async endpoints: For I/O-bound prediction services (e.g., models that call external feature APIs), use FastAPI's async def endpoints and asyncio.
  • Request batching: If your use case allows it, batch multiple predictions in one model call — predict_proba on 100 rows is usually much faster than 100 individual calls.
  • Blue/green deployments: Maintain two identical environments. Switch traffic after a successful smoke test. Instant rollback if something goes wrong.
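
For the autolog tip above, the whole setup is a couple of lines before fit(); params, metrics, and the fitted model are captured without explicit log_* calls. A minimal, self-contained sketch:

# autolog_example.py (illustrative)
import mlflow
import mlflow.sklearn
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

mlflow.set_experiment("churn-prediction")
mlflow.sklearn.autolog()  # or mlflow.autolog() to cover every supported library

X, y = make_classification(n_samples=500, random_state=42)

with mlflow.start_run():
    RandomForestClassifier(n_estimators=50).fit(X, y)  # logged automatically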

📌 Key Takeaways

  • Notebooks are for exploration; scripts are for production. The first refactor you make after model validation is extracting a clean train.py.
  • MLflow (or an equivalent) should be your model registry from day one. You cannot manage model versions manually.
  • FastAPI + Pydantic is the gold standard for ML serving in Python in 2026 — type-safe, fast, and self-documenting.
  • Docker makes your serving environment reproducible. If it works in the container locally, it works in production.
  • GitHub Actions can orchestrate the full ML pipeline: test → train → evaluate → build → deploy.
  • Monitoring is not optional. Statistical drift detection should run on a schedule against a production data sample.
  • Fail-safe gates in CI prevent degraded models from silently reaching users.
  • MLOps is a journey, not a destination. Start at Level 1, instrument your way to Level 2, and build toward Level 3 iteratively.

Conclusion

The gap between a working Jupyter notebook and a production ML system is enormous — but it's entirely bridgeable with the right tooling and habits. What we've covered in this guide is the skeleton of a modern MLOps workflow:

  1. Clean, reproducible training scripts with config files
  2. MLflow for experiment tracking and model versioning
  3. FastAPI for serving with schema validation and observability hooks
  4. Docker for reproducible, portable deployment
  5. GitHub Actions for automated test → train → evaluate → deploy pipelines
  6. Prometheus/Grafana for operational and ML monitoring
  7. Statistical drift detection to catch model degradation before users do

The companies winning with ML in 2026 aren't necessarily those with the best models — they're the ones who can iterate fastest and trust their deployment pipelines.

Build the infrastructure once. Deploy with confidence. Monitor relentlessly.



Written by

Niraj Kumar

Software Developer — building scalable systems for businesses.