So you've trained a model that hits 94% accuracy on your validation set. Your Jupyter notebook is clean (well, clean-ish), the loss curves look great, and your manager is excited. Now comes the question nobody warned you about in that Coursera course:
"When can we put it in production?"
This is where MLOps enters the picture — and where most ML projects quietly die. In this guide, we'll walk through the entire journey: from a notebook-bound model to a versioned, containerized, CI/CD-deployed, and monitored production API. No hand-waving. Real code. Real decisions.
What Is MLOps (And Why Should You Care)?
MLOps — short for Machine Learning Operations — is the practice of applying DevOps principles to the ML lifecycle. It sits at the intersection of:
- ML Engineering (model training, evaluation, feature engineering)
- Software Engineering (versioning, testing, packaging)
- DevOps (CI/CD, containerization, infrastructure)
The goal is simple: make models reproducible, deployable, and maintainable — just like any other piece of software.
💡 An oft-cited Gartner estimate holds that over 85% of ML projects never reach production. The bottleneck is rarely the model itself — it's the surrounding infrastructure.
The MLOps Maturity Model
Before diving in, understand where you are:
| Level | Description |
|---|---|
| 0 | Manual, notebook-driven process. No reproducibility. |
| 1 | ML pipeline automation. Models trained and evaluated consistently. |
| 2 | CI/CD for ML pipelines. Automated retraining and deployment. |
| 3 | Full MLOps. Automated monitoring, drift detection, and feedback loops. |
This tutorial takes you from Level 0 → Level 2, with a clear path toward Level 3.
Project Overview: What We're Building
We'll use a concrete example: a customer churn prediction API.
Our starting point:
- A trained `scikit-learn` pipeline (a `RandomForestClassifier` with preprocessing)
- A messy Jupyter notebook called `churn_model_final_v3_REAL.ipynb`
Our destination:
- A versioned model artifact stored in MLflow
- A FastAPI REST endpoint serving predictions
- A Docker container for reproducible deployment
- A GitHub Actions CI/CD pipeline
- Basic monitoring with drift detection
Let's build.
Step 1: Refactor the Notebook into a Training Script
The first sin of ML engineering is treating the notebook as the artifact. Notebooks are great for exploration — terrible for production.
Extract a `train.py`
# train.py
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, classification_report
import joblib
import yaml
import logging
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_config(path: str = "config.yaml") -> dict:
with open(path) as f:
return yaml.safe_load(f)
def build_preprocessor(numeric_features: list, categorical_features: list):
return ColumnTransformer(transformers=[
("num", StandardScaler(), numeric_features),
("cat", OneHotEncoder(handle_unknown="ignore"), categorical_features),
])
def train(config: dict):
    # Point at the tracking server from config (otherwise runs log locally)
    mlflow.set_tracking_uri(config["mlflow"]["tracking_uri"])
    mlflow.set_experiment(config["mlflow"]["experiment_name"])
df = pd.read_csv(config["data"]["train_path"])
X = df.drop(columns=[config["data"]["target_column"]])
y = df[config["data"]["target_column"]]
X_train, X_val, y_train, y_val = train_test_split(
X, y,
test_size=config["training"]["val_size"],
random_state=config["training"]["random_state"],
stratify=y,
)
preprocessor = build_preprocessor(
config["features"]["numeric"],
config["features"]["categorical"],
)
model = Pipeline(steps=[
("preprocessor", preprocessor),
("classifier", RandomForestClassifier(
n_estimators=config["model"]["n_estimators"],
max_depth=config["model"]["max_depth"],
random_state=config["training"]["random_state"],
)),
])
with mlflow.start_run() as run:
logger.info(f"Starting MLflow run: {run.info.run_id}")
model.fit(X_train, y_train)
y_pred_proba = model.predict_proba(X_val)[:, 1]
auc = roc_auc_score(y_val, y_pred_proba)
# Log params and metrics
mlflow.log_params(config["model"])
mlflow.log_metric("val_auc", auc)
mlflow.log_artifact("config.yaml")
# Log the model
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name=config["mlflow"]["model_name"],
)
logger.info(f"Validation AUC: {auc:.4f}")
logger.info(f"Model registered as: {config['mlflow']['model_name']}")
        # Save locally too (for Docker packaging); create the directory first
        Path("artifacts").mkdir(exist_ok=True)
        joblib.dump(model, "artifacts/model.pkl")
return run.info.run_id
if __name__ == "__main__":
config = load_config()
train(config)
The Config File
Never hardcode hyperparameters. Use a `config.yaml`:
# config.yaml
mlflow:
experiment_name: "churn-prediction"
model_name: "churn-classifier"
tracking_uri: "http://mlflow-server:5000"
data:
train_path: "data/churn_train.csv"
target_column: "churned"
features:
numeric: ["tenure", "monthly_charges", "total_charges", "num_products"]
categorical: ["contract_type", "payment_method", "internet_service"]
training:
val_size: 0.2
random_state: 42
model:
n_estimators: 200
max_depth: 8
Step 2: Model Versioning with MLflow
MLflow is the de facto standard for ML experiment tracking. Here's what you get:
- Experiment tracking: Every training run is logged with params, metrics, and artifacts
- Model Registry: Versioned model storage with staging/production transitions
- Artifact storage: Models, configs, plots — all versioned
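Once a run is registered, any consumer can pull the model back out of the registry by name and stage. A minimal sketch, assuming the `churn-classifier` name and tracking server from our config:
# load_model.py — minimal registry consumer sketch
import mlflow
import mlflow.sklearn
mlflow.set_tracking_uri("http://mlflow-server:5000")  # matches config.yaml
# "models:/<name>/<stage>" resolves to the latest version in that stage
model = mlflow.sklearn.load_model("models:/churn-classifier/Production")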
Transitioning a Model to Production
# promote_model.py
import mlflow
from mlflow.tracking import MlflowClient
def promote_to_production(model_name: str, version: int):
client = MlflowClient()
# Archive any currently deployed production model
for mv in client.get_latest_versions(model_name, stages=["Production"]):
client.transition_model_version_stage(
name=model_name,
version=mv.version,
stage="Archived",
)
# Promote new version
client.transition_model_version_stage(
name=model_name,
version=version,
stage="Production",
archive_existing_versions=False,
)
print(f"✅ Model '{model_name}' v{version} promoted to Production")
if __name__ == "__main__":
promote_to_production("churn-classifier", version=3)
Model Card
Every production model should have a Model Card — a brief document describing:
# Model Card: Churn Classifier v3
## Model Details
- **Type**: RandomForestClassifier (scikit-learn 1.5)
- **Task**: Binary classification (churn / no-churn)
- **Owner**: ML Platform Team
## Performance
| Metric | Value |
|--------|-------|
| Val AUC | 0.912 |
| Precision (churn) | 0.84 |
| Recall (churn) | 0.79 |
## Training Data
- 50,000 customers, Jan 2024 – Dec 2025
- Balanced sampling applied
## Limitations
- Not validated for customers with < 1 month tenure
- Assumes stable feature distributions
## Last Updated
2026-04-14
Step 3: Building the Production API with FastAPI
# app/main.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import joblib
import pandas as pd
import numpy as np
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import time
import logging
import os
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(
title="Churn Prediction API",
description="Predicts customer churn probability",
version="1.0.0",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# --- Prometheus Metrics ---
PREDICTION_COUNT = Counter(
"predictions_total",
"Total prediction requests",
["outcome"],
)
PREDICTION_LATENCY = Histogram(
"prediction_latency_seconds",
"Prediction latency",
)
HIGH_RISK_COUNT = Counter(  # a Counter, not a Gauge — name it accordingly
    "high_risk_predictions_total",
    "Predictions flagged as high churn risk",
)
# --- Load Model ---
MODEL_PATH = os.getenv("MODEL_PATH", "artifacts/model.pkl")
model = None  # populated at startup; keeps /health (and test patching) safe before load
@app.on_event("startup")
async def load_model():
global model
logger.info(f"Loading model from {MODEL_PATH}")
model = joblib.load(MODEL_PATH)
logger.info("Model loaded successfully")
# --- Request/Response Schemas ---
class CustomerFeatures(BaseModel):
tenure: float = Field(..., ge=0, description="Months as customer")
monthly_charges: float = Field(..., ge=0)
total_charges: float = Field(..., ge=0)
num_products: int = Field(..., ge=1, le=10)
contract_type: str = Field(..., pattern="^(Month-to-month|One year|Two year)$")
payment_method: str
internet_service: str
class Config:
json_schema_extra = {
"example": {
"tenure": 12,
"monthly_charges": 65.50,
"total_charges": 786.00,
"num_products": 2,
"contract_type": "Month-to-month",
"payment_method": "Electronic check",
"internet_service": "Fiber optic",
}
}
class PredictionResponse(BaseModel):
churn_probability: float
churn_prediction: bool
risk_tier: str
model_version: str
def get_risk_tier(probability: float) -> str:
if probability >= 0.75:
return "HIGH"
elif probability >= 0.45:
return "MEDIUM"
return "LOW"
# --- Endpoints ---
@app.get("/health")
async def health_check():
return {"status": "healthy", "model_loaded": model is not None}
@app.post("/predict", response_model=PredictionResponse)
async def predict(features: CustomerFeatures):
start = time.time()
try:
input_df = pd.DataFrame([features.model_dump()])
proba = model.predict_proba(input_df)[0, 1]
prediction = bool(proba >= 0.5)
risk = get_risk_tier(proba)
PREDICTION_COUNT.labels(outcome="success").inc()
PREDICTION_LATENCY.observe(time.time() - start)
if risk == "HIGH":
HIGH_RISK_GAUGE.inc()
return PredictionResponse(
churn_probability=round(float(proba), 4),
churn_prediction=prediction,
risk_tier=risk,
model_version=os.getenv("MODEL_VERSION", "unknown"),
)
except Exception as e:
PREDICTION_COUNT.labels(outcome="error").inc()
logger.exception("Prediction failed")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
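With the API running locally (via uvicorn or the container from the next step), a quick client-side sanity check might look like this; a sketch assuming the default port 8000:
# client_check.py — smoke test sketch (assumes the API on localhost:8000)
import requests
payload = {
    "tenure": 12,
    "monthly_charges": 65.50,
    "total_charges": 786.00,
    "num_products": 2,
    "contract_type": "Month-to-month",
    "payment_method": "Electronic check",
    "internet_service": "Fiber optic",
}
resp = requests.post("http://localhost:8000/predict", json=payload, timeout=5)
resp.raise_for_status()
print(resp.json())  # {'churn_probability': ..., 'churn_prediction': ..., 'risk_tier': ...}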
Step 4: Containerization with Docker
Dockerfile
# Dockerfile
FROM python:3.12-slim AS base
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1
WORKDIR /app
# System dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Python dependencies
COPY requirements.txt .
RUN pip install --upgrade pip && pip install -r requirements.txt
# App source
COPY app/ ./app/
COPY artifacts/ ./artifacts/
# Non-root user for security
RUN useradd -m -u 1000 mlservice && chown -R mlservice:mlservice /app
USER mlservice
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.9.2
scikit-learn==1.5.2
pandas==2.2.3
numpy==2.1.2
joblib==1.4.2
prometheus-client==0.21.0
mlflow==2.18.0
Docker Compose (for local development)
# docker-compose.yml
services:
api:
build: .
ports:
- "8000:8000"
environment:
- MODEL_PATH=artifacts/model.pkl
- MODEL_VERSION=3.0.0
volumes:
- ./artifacts:/app/artifacts:ro
    healthcheck:
      # curl is not installed in python:3.12-slim, so probe with Python's stdlib
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s
      timeout: 10s
      retries: 3
mlflow:
image: ghcr.io/mlflow/mlflow:v2.18.0
ports:
- "5000:5000"
command: mlflow server --host 0.0.0.0 --port 5000
volumes:
- mlflow_data:/mlflow
volumes:
mlflow_data:
Step 5: CI/CD Pipeline with GitHub Actions
# .github/workflows/mlops-pipeline.yml
name: MLOps CI/CD Pipeline
on:
push:
branches: [main]
pull_request:
branches: [main]
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/churn-api
jobs:
# ── 1. Test & Lint ────────────────────────────────────────────────────────
test:
name: Test & Lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: "pip"
- name: Install dependencies
run: pip install -r requirements.txt -r requirements-dev.txt
- name: Lint
run: |
ruff check .
mypy app/
- name: Run unit tests
run: pytest tests/ -v --cov=app --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v4
# ── 2. Train & Evaluate ───────────────────────────────────────────────────
train:
name: Train Model
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: "pip"
- name: Install dependencies
run: pip install -r requirements.txt
- name: Train model
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
run: python train.py
      - name: Evaluate model gate
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
        run: |
          python - <<'EOF'
          import mlflow
          # search_runs takes experiment_names; a bare name as the first
          # positional argument would be treated as an experiment ID
          runs = mlflow.search_runs(experiment_names=["churn-prediction"],
                                    order_by=["start_time DESC"], max_results=1)
          auc = runs.iloc[0]["metrics.val_auc"]
          assert auc >= 0.88, f"AUC {auc:.4f} below threshold 0.88"
          print(f"✅ AUC gate passed: {auc:.4f}")
          EOF
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: model-artifact
path: artifacts/
# ── 3. Build & Push Docker Image ──────────────────────────────────────────
build:
name: Build & Push Image
needs: train
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: model-artifact
path: artifacts/
- name: Log in to registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=sha,prefix=sha-
type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' }}
- name: Build and push
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ── 4. Deploy to Staging ──────────────────────────────────────────────────
deploy-staging:
name: Deploy to Staging
needs: build
runs-on: ubuntu-latest
environment: staging
steps:
- name: Deploy to staging cluster
run: |
echo "Deploying ${{ env.IMAGE_NAME }}:sha-${{ github.sha }} to staging"
# kubectl set image deployment/churn-api churn-api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:sha-${{ github.sha }} -n staging
- name: Run smoke tests
run: |
sleep 10
curl -f https://staging.api.yourcompany.com/health
# ── 5. Deploy to Production ───────────────────────────────────────────────
deploy-production:
name: Deploy to Production
needs: deploy-staging
runs-on: ubuntu-latest
environment: production # Requires manual approval
steps:
- name: Deploy to production
run: |
echo "Deploying to production..."
# kubectl set image deployment/churn-api ...
Step 6: Writing Good Tests for ML Code
Testing ML code is different from testing regular software. Here's a practical test suite:
# tests/test_api.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
import joblib
import numpy as np
from unittest.mock import patch, MagicMock
client = TestClient(app)
VALID_PAYLOAD = {
"tenure": 24,
"monthly_charges": 70.0,
"total_charges": 1680.0,
"num_products": 2,
"contract_type": "Month-to-month",
"payment_method": "Electronic check",
"internet_service": "Fiber optic",
}
@pytest.fixture(autouse=True)
def mock_model():
"""Mock the model for API tests."""
mock = MagicMock()
mock.predict_proba.return_value = np.array([[0.3, 0.7]])
with patch("app.main.model", mock):
yield mock
def test_health_check():
response = client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
def test_predict_valid_input():
response = client.post("/predict", json=VALID_PAYLOAD)
assert response.status_code == 200
data = response.json()
assert 0.0 <= data["churn_probability"] <= 1.0
assert isinstance(data["churn_prediction"], bool)
assert data["risk_tier"] in ["LOW", "MEDIUM", "HIGH"]
def test_predict_high_risk():
"""Verify risk tier classification."""
with patch("app.main.model") as mock:
mock.predict_proba.return_value = np.array([[0.1, 0.9]])
response = client.post("/predict", json=VALID_PAYLOAD)
assert response.json()["risk_tier"] == "HIGH"
def test_predict_invalid_contract_type():
payload = {**VALID_PAYLOAD, "contract_type": "Weekly"}
response = client.post("/predict", json=payload)
assert response.status_code == 422
def test_predict_negative_tenure():
payload = {**VALID_PAYLOAD, "tenure": -5}
response = client.post("/predict", json=payload)
assert response.status_code == 422
# tests/test_model.py
import pytest
import pandas as pd
import numpy as np
import joblib
from sklearn.pipeline import Pipeline
@pytest.fixture
def model():
return joblib.load("artifacts/model.pkl")
@pytest.fixture
def sample_input():
return pd.DataFrame([{
"tenure": 12, "monthly_charges": 65.5, "total_charges": 786.0,
"num_products": 2, "contract_type": "Month-to-month",
"payment_method": "Electronic check", "internet_service": "Fiber optic",
}])
def test_model_is_pipeline(model):
assert isinstance(model, Pipeline)
def test_model_output_shape(model, sample_input):
proba = model.predict_proba(sample_input)
assert proba.shape == (1, 2)
def test_model_output_valid_probabilities(model, sample_input):
proba = model.predict_proba(sample_input)
assert np.allclose(proba.sum(axis=1), 1.0)
assert (proba >= 0).all() and (proba <= 1).all()
def test_model_handles_unknown_categories(model):
"""Model must not crash on unseen categorical values."""
df = pd.DataFrame([{
"tenure": 5, "monthly_charges": 45.0, "total_charges": 225.0,
"num_products": 1, "contract_type": "Month-to-month",
"payment_method": "COMPLETELY_NEW_METHOD",
"internet_service": "Fiber optic",
}])
proba = model.predict_proba(df)
assert proba.shape == (1, 2)
Step 7: Monitoring and Data Drift Detection
Deploying a model is not the end — it's the beginning. Models degrade silently. You need monitoring.
Two Layers of Monitoring
1. Operational Monitoring (is the API healthy?)
- Request latency (p50, p95, p99)
- Error rates
- Throughput
2. ML Monitoring (is the model still accurate?)
- Prediction distribution shift
- Feature distribution drift (statistical tests)
- Business metric correlation (e.g., actual churn rate vs predicted)
Drift Detection Script
# monitoring/drift_detector.py
import pandas as pd
import numpy as np
from scipy import stats
from dataclasses import dataclass
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class DriftReport:
feature: str
statistic: float
p_value: float
drifted: bool
test_used: str
def detect_feature_drift(
reference: pd.DataFrame,
current: pd.DataFrame,
numeric_features: list[str],
categorical_features: list[str],
alpha: float = 0.05,
) -> list[DriftReport]:
"""
Detects feature drift using:
- Kolmogorov-Smirnov test for numeric features
- Chi-squared test for categorical features
"""
reports = []
for feat in numeric_features:
if feat not in reference.columns or feat not in current.columns:
continue
stat, p_value = stats.ks_2samp(
reference[feat].dropna(),
current[feat].dropna(),
)
reports.append(DriftReport(
feature=feat,
statistic=stat,
p_value=p_value,
drifted=p_value < alpha,
test_used="KS",
))
    for feat in categorical_features:
        if feat not in reference.columns or feat not in current.columns:
            continue
        # reindex needs an ordered sequence, so sort the union of categories
        all_cats = sorted(set(reference[feat].dropna()).union(current[feat].dropna()))
        ref_counts = reference[feat].value_counts().reindex(all_cats, fill_value=0)
        cur_counts = current[feat].value_counts().reindex(all_cats, fill_value=0)
        # chisquare requires f_obs and f_exp to have equal totals; add-one
        # smoothing also avoids zero expected counts for unseen categories
        expected = (ref_counts + 1) / (ref_counts + 1).sum() * cur_counts.sum()
        stat, p_value = stats.chisquare(cur_counts, f_exp=expected)
        reports.append(DriftReport(
            feature=feat,
            statistic=stat,
            p_value=p_value,
            drifted=p_value < alpha,
            test_used="Chi-squared",
        ))
drifted = [r for r in reports if r.drifted]
if drifted:
logger.warning(f"⚠️ Drift detected in {len(drifted)} feature(s): "
f"{[r.feature for r in drifted]}")
else:
logger.info("✅ No significant feature drift detected.")
return reports
def detect_prediction_drift(
reference_predictions: np.ndarray,
current_predictions: np.ndarray,
alpha: float = 0.05,
) -> DriftReport:
"""Detect drift in model output distribution."""
stat, p_value = stats.ks_2samp(reference_predictions, current_predictions)
drifted = p_value < alpha
if drifted:
logger.warning(f"⚠️ Prediction distribution drift detected (p={p_value:.4f})")
return DriftReport(
feature="prediction_probability",
statistic=stat,
p_value=p_value,
drifted=drifted,
test_used="KS",
)
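Wiring the detector into a scheduled job is straightforward. A minimal sketch, assuming `monitoring/` is importable and that you export recent request logs to a CSV (the `logs/recent_requests.csv` path is hypothetical):
# monitoring/run_drift_check.py — scheduled-job sketch (log path is hypothetical)
import pandas as pd
from monitoring.drift_detector import detect_feature_drift
reference = pd.read_csv("data/churn_train.csv")    # training-time sample
current = pd.read_csv("logs/recent_requests.csv")  # recent production inputs
reports = detect_feature_drift(
    reference,
    current,
    numeric_features=["tenure", "monthly_charges", "total_charges", "num_products"],
    categorical_features=["contract_type", "payment_method", "internet_service"],
)
for r in reports:
    print(f"{r.feature:20s} {r.test_used:12s} p={r.p_value:.4f} drifted={r.drifted}")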
Prometheus + Grafana Dashboard
In docker-compose.yml, add:
prometheus:
image: prom/prometheus:v2.55.0
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
grafana:
image: grafana/grafana:11.3.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
# monitoring/prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: "churn-api"
static_configs:
- targets: ["api:8000"]
Set up Grafana panels for:
- `rate(predictions_total[5m])` — prediction throughput
- `histogram_quantile(0.95, rate(prediction_latency_seconds_bucket[5m]))` — p95 latency
- `rate(high_risk_predictions_total[1h])` — high-risk rate over time
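To close the loop, alert on the same series. A sketch of a Prometheus alerting rule; the threshold is illustrative and should be tuned to your traffic:
# monitoring/alerts.yml — illustrative threshold, tune to your traffic
groups:
  - name: churn-api
    rules:
      - alert: HighPredictionErrorRate
        expr: rate(predictions_total{outcome="error"}[5m]) / rate(predictions_total[5m]) > 0.05
        for: 10m
        annotations:
          summary: "Churn API error rate above 5% for 10 minutes"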
Best Practices
- Separate concerns: Training code, serving code, and monitoring code should live in separate modules — not one giant script.
- Version everything: Models, datasets, configs, and Docker images. If you can't reproduce a result, it doesn't exist.
- Fail fast with gates: An AUC gate in CI prevents degraded models from reaching production silently.
- Use structured logging: JSON logs are queryable. `print()` statements are archaeology.
- Schema validation on ingress: `pydantic` is your first line of defense against malformed inputs crashing the model.
- Canary deployments: For high-stakes models, route 5-10% of traffic to the new model before full rollout.
- Store ground truth: Log inputs and (eventually) labels so you can retrain and evaluate retrospectively (a minimal logging sketch follows this list).
- Document your model: A model without a model card is a liability, not an asset.
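A minimal sketch of that ground-truth logging, assuming a local JSONL sink (swap in your warehouse or event bus):
# app/prediction_log.py — JSONL sink sketch (path is an assumption)
import json
import time
def log_prediction(features: dict, probability: float,
                   path: str = "logs/predictions.jsonl") -> None:
    """Append one prediction record; labels can be joined later by timestamp or ID."""
    record = {"ts": time.time(), "features": features, "churn_probability": probability}
    with open(path, "a") as f:
        f.write(json.dumps(record) + "\n")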
Common Mistakes
❌ Pickling the entire notebook namespace
# BAD — pickles everything including data, random state, etc.
import pickle
with open("model.pkl", "wb") as f:
pickle.dump(locals(), f)
# GOOD — serialize only the pipeline
import joblib
joblib.dump(pipeline, "artifacts/model.pkl")
❌ Skipping input validation
# BAD — model receives raw dict, no validation
@app.post("/predict")
def predict(data: dict):
return model.predict([list(data.values())])
Always use Pydantic models with constraints. Garbage in, garbage out — and garbage out in production means a PagerDuty alert at 3am.
❌ No model versioning
Deploying model.pkl with no version tag means you can never trace which experiment produced the model that just misbehaved.
❌ Monitoring only infrastructure, not model behavior
CPU and memory metrics won't tell you that your model started systematically under-predicting churn for a new customer segment. Track prediction distributions.
❌ Retraining without evaluation gates
Automated retraining pipelines must include a model comparison step — new model must beat (or at minimum match) current production model before promotion.
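A sketch of that comparison step, assuming both models live in the MLflow registry (the Staging stage for the candidate) and that you keep a held-out set at `data/churn_holdout.csv` (a hypothetical path):
# compare_models.py — candidate must match or beat production on held-out AUC
import mlflow.sklearn
import pandas as pd
from sklearn.metrics import roc_auc_score
holdout = pd.read_csv("data/churn_holdout.csv")  # hypothetical held-out set
X, y = holdout.drop(columns=["churned"]), holdout["churned"]
prod = mlflow.sklearn.load_model("models:/churn-classifier/Production")
candidate = mlflow.sklearn.load_model("models:/churn-classifier/Staging")
prod_auc = roc_auc_score(y, prod.predict_proba(X)[:, 1])
cand_auc = roc_auc_score(y, candidate.predict_proba(X)[:, 1])
assert cand_auc >= prod_auc, f"Candidate AUC {cand_auc:.4f} < production {prod_auc:.4f}"
print(f"Candidate passes: {cand_auc:.4f} vs {prod_auc:.4f}")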
🚀 Pro Tips
- Use `mlflow.autolog()` for scikit-learn — it automatically logs all params and metrics with one line.
- Shadow mode deployment: Run the new model in parallel, log predictions, but don't serve them. Compare to production before cutting over.
- Feature stores (Feast, Tecton, Hopsworks) eliminate training/serving skew at scale — the same feature computation runs both at training time and inference time.
- Model compression: For latency-sensitive APIs, profile inference time. ONNX can cut scikit-learn inference by 3-5×.
- Async endpoints: For I/O-bound prediction services (e.g., models that call external feature APIs), use FastAPI's `async def` endpoints and `asyncio`.
- Request batching: If your use case allows it, batch multiple predictions in one model call — `predict_proba` on 100 rows is usually much faster than 100 individual calls (a sketch follows this list).
- Blue/green deployments: Maintain two identical environments. Switch traffic after a successful smoke test. Instant rollback if something goes wrong.
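Here is the batching tip as an illustrative extra endpoint for `app/main.py`, reusing the `CustomerFeatures` schema from Step 3:
# an illustrative addition to app/main.py (not part of the original service)
@app.post("/predict/batch")
async def predict_batch(batch: list[CustomerFeatures]):
    df = pd.DataFrame([f.model_dump() for f in batch])
    probs = model.predict_proba(df)[:, 1]  # one vectorized call for all rows
    return [
        {"churn_probability": round(float(p), 4), "risk_tier": get_risk_tier(p)}
        for p in probs
    ]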
📌 Key Takeaways
- Notebooks are for exploration; scripts are for production. The first refactor you make after model validation is extracting a clean `train.py`.
- MLflow (or an equivalent) should be your model registry from day one. Managing model versions by hand does not scale.
- FastAPI + Pydantic is the gold standard for ML serving in Python in 2026 — type-safe, fast, and self-documenting.
- Docker makes your serving environment reproducible. If it works in the container locally, it works in production.
- GitHub Actions can orchestrate the full ML pipeline: test → train → evaluate → build → deploy.
- Monitoring is not optional. Statistical drift detection should run on a schedule against a production data sample.
- Fail-safe gates in CI prevent degraded models from silently reaching users.
- MLOps is a journey, not a destination. Start at Level 1, instrument your way to Level 2, and build toward Level 3 iteratively.
Conclusion
The gap between a working Jupyter notebook and a production ML system is enormous — but it's entirely bridgeable with the right tooling and habits. What we've covered in this guide is the skeleton of a modern MLOps workflow:
- Clean, reproducible training scripts with config files
- MLflow for experiment tracking and model versioning
- FastAPI for serving with schema validation and observability hooks
- Docker for reproducible, portable deployment
- GitHub Actions for automated test → train → evaluate → deploy pipelines
- Prometheus/Grafana for operational and ML monitoring
- Statistical drift detection to catch model degradation before users do
The companies winning with ML in 2026 aren't necessarily those with the best models — they're the ones who can iterate fastest and trust their deployment pipelines.
Build the infrastructure once. Deploy with confidence. Monitor relentlessly.
References
- MLflow Documentation
- FastAPI Documentation
- scikit-learn Pipelines Guide
- Google: Continuous Delivery for Machine Learning (CD4ML)
- Evidently AI — Open Source ML Monitoring
- Prometheus Python Client
- GitHub Actions Documentation
- Sculley, D. et al. (2015). Hidden Technical Debt in Machine Learning Systems. NeurIPS.
- The MLOps Maturity Model — Microsoft Azure