Chapter 09

MLOps and Production Pipelines

Training a model is 20% of the work. Getting it to score new data reliably, on schedule, with monitoring and an audit trail that survives a program manager change — that's the other 80%. This chapter covers MLflow experiment tracking, model registry lifecycle, drift detection, CI/CD for ML, and the Palantir palantir_models deployment pattern.


MLflow Experiment Tracking

MLflow is the standard experiment tracking layer on all Databricks deployments (Advana, Jupiter, standalone). Every training run that touches a production dataset should log to MLflow. The audit trail isn't optional — it's the answer to "why did the model predict this in February 2025 when the data at that time was X?"

python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import train_test_split

mlflow.set_experiment("/Users/your_email/maintenance-anomaly-v2")

with mlflow.start_run(run_name="rf_baseline") as run:
    # Log parameters
    mlflow.log_param("n_estimators", 200)
    mlflow.log_param("max_depth", 8)
    mlflow.log_param("training_data_table", "jupiter_catalog.silver.maintenance_features")
    mlflow.log_param("training_cutoff_date", "2024-09-30")

    # Train model
    clf = RandomForestClassifier(n_estimators=200, max_depth=8, random_state=42)
    clf.fit(X_train, y_train)

    # Log metrics (clf.score() is accuracy, not AUC; use roc_auc_score on probabilities)
    mlflow.log_metric("test_auc", roc_auc_score(y_test, clf.predict_proba(X_test)[:, 1]))
    mlflow.log_metric("test_f1", f1_score(y_test, clf.predict(X_test)))

    # Log artifacts
    mlflow.log_dict({"feature_names": feature_cols}, "feature_schema.json")

    # Register model — always use stage aliases, never hardcode version numbers
    mlflow.sklearn.log_model(
        clf,
        artifact_path="model",
        registered_model_name="maintenance-anomaly-detector",
        input_example=X_test.head(5),
        signature=mlflow.models.infer_signature(X_test, clf.predict_proba(X_test)),
    )

    print(f"Run ID: {run.info.run_id}")

Model Registry Lifecycle

The MLflow Model Registry provides stage management: a model version moves from None → Staging → Production → Archived. In a federal program, stage transitions map to review gates: Staging requires a technical peer review; Production requires the program manager's sign-off and the monitoring plan to be in place.

python
import mlflow
from mlflow import MlflowClient

client = MlflowClient()
model_name = "maintenance-anomaly-detector"

# Get the latest version in Staging
staging_versions = client.get_latest_versions(model_name, stages=["Staging"])
if staging_versions:
    version = staging_versions[0]
    print(f"Staging version: {version.version}  Run ID: {version.run_id}")

    # Promote to Production (requires human approval in your workflow)
    client.transition_model_version_stage(
        name=model_name,
        version=version.version,
        stage="Production",
        archive_existing_versions=True,  # Archive the previous Production version
    )
    print(f"Promoted version {version.version} to Production")

# Always load the Production model by stage alias, never by version number
model = mlflow.pyfunc.load_model(f"models:/{model_name}/Production")

Palantir palantir_models Deployment

On Foundry, model deployment uses the palantir_models library (not the deprecated foundry_ml). Models are registered as versioned Foundry datasets that connect to the Ontology, allowing them to be called from Workshop dashboards and AIP Logic functions.

python
from transforms.api import transform, Input
from palantir_models import Model
from palantir_models.transforms import ModelOutput
import pandas as pd

# Model adapter — defines the interface between Foundry Ontology and your model
class MaintenanceRiskAdapter:
    def __init__(self, sklearn_pipeline):
        self.pipeline = sklearn_pipeline

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Called by AIP Logic and Workshop batch scoring.
        Input: DataFrame with work order features.
        Output: DataFrame with overrun_probability and overrun_flag columns.
        """
        proba = self.pipeline.predict_proba(df)[:, 1]
        df["overrun_probability"] = proba
        df["overrun_flag"]        = (proba >= 0.65).astype(int)
        return df[["work_order_id", "overrun_probability", "overrun_flag"]]


@transform(
    model_output=ModelOutput("/models/maintenance_overrun_v2"),
    training_data=Input("/analytics/silver/maintenance_features"),
)
def train_and_register(training_data, model_output):
    df = training_data.dataframe().toPandas()
    # ... training code ...
    adapter = MaintenanceRiskAdapter(trained_pipeline)
    model_output.write_model(Model(adapter))

CI/CD for ML Pipelines

ML code is code. It goes through the same review, testing, and deployment pipeline as application code. On Databricks, GitHub Actions (or equivalent CI/CD on GovCloud) triggers training jobs and validation gates before any model moves to Production.

yaml
# .github/workflows/ml-pipeline.yml (simplified)
# Triggered on PR to main — runs validation before production deployment

name: ML Pipeline Validation

on:
  pull_request:
    branches: [main]
    paths: ['src/models/**', 'notebooks/**']

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run data quality checks
        run: python scripts/validate_data_quality.py

      - name: Run model unit tests
        run: pytest tests/test_model_logic.py -v

      - name: Trigger Databricks training job
        run: |
          databricks jobs run-now \
            --job-id ${{ secrets.TRAINING_JOB_ID }} \
            --python-params '["--mode=validation", "--holdout=2024"]'

      - name: Check model performance gate
        run: python scripts/check_performance_gate.py --min-auc=0.78
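The gate script referenced in the last step can be small. A minimal sketch (the metrics.json handoff file and function names here are assumptions for illustration, not a fixed convention; you could equally query MLflow for the validation run's metrics):

```python
import json


def check_gate(metrics: dict, min_auc: float) -> bool:
    """Return True when the candidate model clears the promotion gate.

    A missing metric counts as a failure: if the validation job didn't
    log test_auc, the model doesn't get promoted.
    """
    return metrics.get("test_auc", 0.0) >= min_auc


def run_gate(metrics_path: str, min_auc: float) -> int:
    """CLI body: read the metrics JSON written by the validation job and
    return a non-zero exit code on failure so the CI step fails."""
    with open(metrics_path) as f:
        metrics = json.load(f)
    if not check_gate(metrics, min_auc):
        print(f"GATE FAILED: test_auc={metrics.get('test_auc')} < {min_auc}")
        return 1
    print(f"Gate passed: test_auc={metrics['test_auc']} >= {min_auc}")
    return 0
```

The design point is that the threshold lives in the workflow file, in version control, where changing it requires a reviewed PR rather than an edit to a notebook.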

Feature Stores

A feature store is a shared registry of computed features used across models. Without one, every team recomputes the same features independently — "ship age in years," "vendor prior award count," "fiscal month" — each with slightly different logic. The inconsistency first surfaces in production as a discrepancy between the score shown in the dashboard and the score the model actually produces.

python
from databricks.feature_store import FeatureStoreClient

fs = FeatureStoreClient()

# Create feature table from precomputed features
fs.create_table(
    name="procurement_catalog.feature_store.vendor_risk_features",
    primary_keys=["recipient_uei"],
    timestamp_keys=["as_of_date"],
    description="Vendor-level risk features for ML models — updated daily",
    schema=vendor_risk_df.schema,
)

# Write features to the store
fs.write_table(
    name="procurement_catalog.feature_store.vendor_risk_features",
    df=vendor_risk_df,
    mode="merge",
)

# Training with Feature Store — automatic training/serving consistency
from databricks.feature_store import FeatureLookup

feature_lookups = [
    FeatureLookup(
        table_name="procurement_catalog.feature_store.vendor_risk_features",
        feature_names=["prior_award_count", "on_time_rate", "cost_growth_avg"],
        lookup_key="recipient_uei",
        timestamp_lookup_key="action_date",
    )
]

training_set = fs.create_training_set(
    df=df_awards,
    feature_lookups=feature_lookups,
    label="exceeded_cost",
    exclude_columns=["recipient_uei", "action_date"],
)

training_df = training_set.load_df().toPandas()
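The timestamp_lookup_key is what makes this leakage-safe: for each training row, the lookup joins the most recent feature snapshot whose as_of_date is on or before the row's action_date. A plain-Python sketch of that as-of semantics (simplified; the real lookup is a Spark join, and the sample rows here are made up):

```python
from datetime import date


def point_in_time_lookup(feature_rows, lookup_key, as_of):
    """Return the latest feature snapshot for `lookup_key` with
    as_of_date <= as_of, or None if no snapshot predates the row."""
    candidates = [
        r for r in feature_rows
        if r["recipient_uei"] == lookup_key and r["as_of_date"] <= as_of
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda r: r["as_of_date"])


snapshots = [
    {"recipient_uei": "UEI123", "as_of_date": date(2024, 1, 31), "on_time_rate": 0.90},
    {"recipient_uei": "UEI123", "as_of_date": date(2024, 3, 31), "on_time_rate": 0.82},
]

# An award actioned in February sees only the January snapshot; the March
# snapshot is in that row's future and must not leak into training.
row = point_in_time_lookup(snapshots, "UEI123", date(2024, 2, 15))
print(row["on_time_rate"])  # → 0.9
```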

Drift Detection

Models decay. The operational environment changes — new equipment, doctrine updates, fiscal policy shifts — and the model trained 18 months ago no longer reflects reality. Drift detection is the monitoring discipline that tells you when this happens before the program manager notices it in the dashboard.

python
import mlflow
import pandas as pd

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

def generate_drift_report(
    reference_df: pd.DataFrame,
    current_df: pd.DataFrame,
    report_path: str = "/tmp/drift_report.html",
) -> dict:
    """
    Generate a data drift report comparing training distribution to current scoring distribution.
    Log report artifact to MLflow for audit trail.
    """
    report = Report(metrics=[DataDriftPreset(), TargetDriftPreset()])
    report.run(reference_data=reference_df, current_data=current_df)
    report.save_html(report_path)

    result = report.as_dict()
    drift_detected = result["metrics"][0]["result"]["dataset_drift"]

    summary = {
        "dataset_drift_detected": drift_detected,
        "n_drifted_features":     result["metrics"][0]["result"]["number_of_drifted_columns"],
        "total_features":         result["metrics"][0]["result"]["number_of_columns"],
    }

    with mlflow.start_run(run_name="drift_check"):
        mlflow.log_metrics(summary)
        mlflow.log_artifact(report_path)

        if drift_detected:
            mlflow.set_tag("action_required", "retraining_triggered")
            print("DRIFT DETECTED — triggering retraining workflow")

    return summary
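In environments where evidently can't be installed, a Population Stability Index check covers the basic univariate case. A stdlib-only sketch (the 0.2 threshold is a common rule of thumb, not a standard, and production code would vectorize this):

```python
import math


def psi(reference, current, bins=10):
    """Population Stability Index between two samples of one feature.
    Bin edges come from quantiles of the reference distribution."""
    ref_sorted = sorted(reference)
    edges = [ref_sorted[int(len(ref_sorted) * i / bins)] for i in range(1, bins)]

    def bin_fractions(values):
        counts = [0] * bins
        for v in values:
            idx = sum(1 for e in edges if v > e)  # which bin v falls in
            counts[idx] += 1
        # Floor at a small epsilon so empty bins don't produce log(0)
        return [max(c / len(values), 1e-6) for c in counts]

    ref_frac = bin_fractions(reference)
    cur_frac = bin_fractions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_frac, cur_frac))


# Identical distributions give PSI near 0; a large shift gives a large PSI
same = psi(list(range(1000)), list(range(1000)))
shifted = psi(list(range(1000)), [x + 500 for x in range(1000)])
```

Run per feature on a schedule and alert when any PSI crosses the agreed threshold; it catches gross distribution shift, though not the multivariate drift evidently's presets also test for.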

Where This Goes Wrong

Failure Mode 1: No Monitoring After Deployment

The model is deployed. You roll off the contract. Nobody checks whether it's still working. Six months later a new analyst notices the predictions are wrong and discovers the training data is two years stale and the operational context changed. Fix: define the monitoring owner, the monitoring frequency, and the retraining trigger in writing before you deploy. Not after. Before.
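One way to make the plan enforceable rather than aspirational is to check it into the repo as data and validate it in CI, so a deployment can't merge without it. A sketch with hypothetical field names and contacts:

```python
# monitoring_plan.py — hypothetical contract, checked in next to the model code.
# A CI step imports it and fails the build if required fields are missing.
MONITORING_PLAN = {
    "model_name": "maintenance-anomaly-detector",
    "owner": "data-science-lead@program.mil",      # hypothetical address
    "check_frequency_days": 7,
    "retraining_trigger": "PSI > 0.2 on any top-10 feature",
}

REQUIRED_FIELDS = ["model_name", "owner", "check_frequency_days", "retraining_trigger"]


def validate_plan(plan: dict) -> list:
    """Return the list of missing or empty required fields (empty list = valid)."""
    return [f for f in REQUIRED_FIELDS if not plan.get(f)]
```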

Failure Mode 2: Hardcoding Version Numbers

mlflow.pyfunc.load_model("models:/my-model/3") works until version 4 is promoted while the scoring pipeline still points at version 3. Fix: always load by stage alias: "models:/my-model/Production". Stage transitions are managed by the registry; version numbers are an implementation detail.

Failure Mode 3: Feature Inconsistency Between Training and Scoring

The training pipeline computes "vendor on-time rate" as contracts completed on time divided by all contracts. The scoring pipeline computes it as a rolling 12-month average. The model trains on one, scores on another, and produces systematically biased predictions that look fine in aggregate but are wrong for specific vendors. Fix: use a feature store. The same computation runs at training time and scoring time, guaranteed.
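Even without a feature store, the minimum fix is a single shared definition imported by both pipelines. A toy illustration with a hypothetical on_time_rate helper:

```python
# features.py — the single source of truth, imported by BOTH the training
# pipeline and the scoring pipeline so the definition cannot diverge.


def on_time_rate(completed_on_time: int, total_contracts: int) -> float:
    """Vendor on-time rate: on-time completions over ALL contracts.
    One definition; if the business rule changes, it changes in one place."""
    if total_contracts == 0:
        return 0.0
    return completed_on_time / total_contracts


# training_pipeline.py and scoring_pipeline.py both do:
#     from features import on_time_rate
# rather than re-deriving the ratio with their own (subtly different) logic.
```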

Platform Comparison

| MLOps Capability | Databricks (Advana/Jupiter) | Palantir Foundry |
| --- | --- | --- |
| Experiment tracking | MLflow (native) | Foundry versioned artifacts |
| Model registry | MLflow Model Registry | Foundry datasets + Ontology |
| Feature store | Databricks Feature Store | Foundry Pipeline Builder |
| Drift detection | evidently + scheduled Workflow | Monitoring Transforms |
| CI/CD integration | GitHub Actions + Databricks CLI | Foundry CI/CD (built-in) |
| Audit trail | MLflow runs + Unity Catalog logs | Foundry Transform history + Ontology |
| Batch scoring orchestration | Databricks Workflows (DAG) | Foundry Pipeline Builder (scheduled) |

Exercises

This chapter includes 6 hands-on exercises with full solutions — coding challenges, analysis tasks, and scenario-based problems.

View Exercises on GitHub →