MLOps and Production Pipelines
Training a model is 20% of the work. Getting it to score new data reliably, on schedule, with monitoring and an audit trail that survives a program manager change — that's the other 80%. This chapter covers MLflow experiment tracking, model registry lifecycle, drift detection, CI/CD for ML, and the Palantir palantir_models deployment pattern.
MLflow Experiment Tracking
MLflow is the standard experiment tracking layer on all Databricks deployments (Advana, Jupiter, standalone). Every training run that touches a production dataset should log to MLflow. The audit trail isn't optional — it's the answer to "why did the model predict this in February 2025 when the data at that time was X?"
```python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import train_test_split

mlflow.set_experiment("/Users/your_email/maintenance-anomaly-v2")

with mlflow.start_run(run_name="rf_baseline") as run:
    # Log parameters
    mlflow.log_param("n_estimators", 200)
    mlflow.log_param("max_depth", 8)
    mlflow.log_param("training_data_table", "jupiter_catalog.silver.maintenance_features")
    mlflow.log_param("training_cutoff_date", "2024-09-30")

    # Train model
    clf = RandomForestClassifier(n_estimators=200, max_depth=8, random_state=42)
    clf.fit(X_train, y_train)

    # Log metrics (clf.score() returns accuracy, so compute AUC from predicted probabilities)
    mlflow.log_metric("test_auc", roc_auc_score(y_test, clf.predict_proba(X_test)[:, 1]))
    mlflow.log_metric("test_f1", f1_score(y_test, clf.predict(X_test)))

    # Log artifacts
    mlflow.log_dict({"feature_names": feature_cols}, "feature_schema.json")

    # Register model — always use stage aliases, never hardcode version numbers
    mlflow.sklearn.log_model(
        clf,
        artifact_path="model",
        registered_model_name="maintenance-anomaly-detector",
        input_example=X_test.head(5),
        signature=mlflow.models.infer_signature(X_test, clf.predict_proba(X_test)),
    )
    print(f"Run ID: {run.info.run_id}")
```
Model Registry Lifecycle
The MLflow Model Registry provides stage management: a model version moves from None → Staging → Production → Archived. In a federal program, stage transitions map to review gates: Staging requires a technical peer review; Production requires the program manager's sign-off and the monitoring plan to be in place.
```python
import mlflow
from mlflow import MlflowClient

client = MlflowClient()
model_name = "maintenance-anomaly-detector"

# Get the latest version in Staging
staging_versions = client.get_latest_versions(model_name, stages=["Staging"])

if staging_versions:
    version = staging_versions[0]
    print(f"Staging version: {version.version}  Run ID: {version.run_id}")

    # Promote to Production (requires human approval in your workflow)
    client.transition_model_version_stage(
        name=model_name,
        version=version.version,
        stage="Production",
        archive_existing_versions=True,  # Archives the previous Production version
    )
    print(f"Promoted version {version.version} to Production")

# Always load the Production model by stage alias, never by version number
model = mlflow.pyfunc.load_model(f"models:/{model_name}/Production")
```
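The stage-alias convention is worth centralizing so no scoring pipeline ever embeds a version number. A tiny hypothetical helper (not part of MLflow) makes the rule mechanical:

```python
def model_uri(name: str, stage_or_version: str = "Production") -> str:
    """Build an MLflow registry URI. Prefer stage aliases ('Production',
    'Staging'); pass a numeric version only for one-off debugging."""
    return f"models:/{name}/{stage_or_version}"

# Every scoring job resolves through the stage alias:
uri = model_uri("maintenance-anomaly-detector")
print(uri)  # models:/maintenance-anomaly-detector/Production
```

Grepping the codebase for `models:/` then becomes a cheap review check: any hit outside this helper is a hardcoded URI waiting to go stale.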
Palantir palantir_models Deployment
On Foundry, model deployment uses the palantir_models library (not the deprecated foundry_ml). Models are registered as versioned Foundry datasets that connect to the Ontology, allowing them to be called from Workshop dashboards and AIP Logic functions.
```python
from transforms.api import transform, Input
from palantir_models import Model
from palantir_models.transforms import ModelOutput
import pandas as pd

# Model adapter — defines the interface between the Foundry Ontology and your model
class MaintenanceRiskAdapter:
    def __init__(self, sklearn_pipeline):
        self.pipeline = sklearn_pipeline

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Called by AIP Logic and Workshop batch scoring.
        Input: DataFrame with work order features.
        Output: DataFrame with overrun_probability and overrun_flag columns.
        """
        proba = self.pipeline.predict_proba(df)[:, 1]
        df["overrun_probability"] = proba
        df["overrun_flag"] = (proba >= 0.65).astype(int)
        return df[["work_order_id", "overrun_probability", "overrun_flag"]]

@transform(
    model_output=ModelOutput("/models/maintenance_overrun_v2"),
    training_data=Input("/analytics/silver/maintenance_features"),
)
def train_and_register(training_data, model_output):
    df = training_data.dataframe().toPandas()
    # ... training code ...
    adapter = MaintenanceRiskAdapter(trained_pipeline)
    model_output.write_model(Model(adapter))
```
CI/CD for ML Pipelines
ML code is code. It goes through the same review, testing, and deployment pipeline as application code. On Databricks, GitHub Actions (or equivalent CI/CD on GovCloud) triggers training jobs and validation gates before any model moves to Production.
```yaml
# .github/workflows/ml-pipeline.yml (simplified)
# Triggered on PR to main — runs validation before production deployment
name: ML Pipeline Validation

on:
  pull_request:
    branches: [main]
    paths: ['src/models/**', 'notebooks/**']

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run data quality checks
        run: python scripts/validate_data_quality.py

      - name: Run model unit tests
        run: pytest tests/test_model_logic.py -v

      - name: Trigger Databricks training job
        run: |
          databricks jobs run-now \
            --job-id ${{ secrets.TRAINING_JOB_ID }} \
            --python-params '["--mode=validation", "--holdout=2024"]'

      - name: Check model performance gate
        run: python scripts/check_performance_gate.py --min-auc=0.78
```
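The workflow calls `check_performance_gate.py` without showing it. A minimal sketch of the gate logic follows; the function name, metric keys, and regression threshold are illustrative assumptions, and in real CI the metrics dict would be fetched from the MLflow validation run rather than hardcoded:

```python
from typing import Optional

def check_gate(metrics: dict, min_auc: float, max_auc_drop: float = 0.02,
               prod_auc: Optional[float] = None) -> tuple[bool, str]:
    """Return (passed, reason). Fail if AUC is below the floor, or if it
    regressed more than max_auc_drop relative to the current Production model."""
    auc = metrics.get("test_auc")
    if auc is None:
        return False, "test_auc missing from validation run"
    if auc < min_auc:
        return False, f"test_auc {auc:.3f} below floor {min_auc:.3f}"
    if prod_auc is not None and (prod_auc - auc) > max_auc_drop:
        return False, f"test_auc regressed {prod_auc - auc:.3f} vs Production"
    return True, f"gate passed (test_auc={auc:.3f})"

# Hardcoded metrics so the gate logic is testable in isolation
passed, reason = check_gate({"test_auc": 0.81}, min_auc=0.78, prod_auc=0.80)
print(reason)
```

Keeping the gate a pure function means it can be unit-tested alongside the model code, which is the point of Failure Mode 1 below: the gate itself is part of the reviewed pipeline, not a manual step.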
Feature Stores
A feature store is the shared registry of computed features used across models. Without one, every team recomputes the same features independently — "ship age in years," "vendor prior award count," "fiscal month" — each with slightly different logic. The inconsistency eventually surfaces in production as a discrepancy between the score shown in the dashboard and the score the model actually produces.
```python
from databricks.feature_store import FeatureLookup, FeatureStoreClient

fs = FeatureStoreClient()

# Create feature table from precomputed features
fs.create_table(
    name="procurement_catalog.feature_store.vendor_risk_features",
    primary_keys=["recipient_uei"],
    timestamp_keys=["as_of_date"],
    description="Vendor-level risk features for ML models — updated daily",
    schema=vendor_risk_df.schema,
)

# Write features to the store
fs.write_table(
    name="procurement_catalog.feature_store.vendor_risk_features",
    df=vendor_risk_df,
    mode="merge",
)

# Training with the Feature Store — automatic training/serving consistency
feature_lookups = [
    FeatureLookup(
        table_name="procurement_catalog.feature_store.vendor_risk_features",
        feature_names=["prior_award_count", "on_time_rate", "cost_growth_avg"],
        lookup_key="recipient_uei",
        timestamp_lookup_key="action_date",
    )
]

training_set = fs.create_training_set(
    df=df_awards,
    feature_lookups=feature_lookups,
    label="exceeded_cost",
    exclude_columns=["recipient_uei", "action_date"],
)
training_df = training_set.load_df().toPandas()
```
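The `timestamp_lookup_key` is what prevents label leakage: each training row receives the latest feature values as of its `action_date`, never values computed afterward. The same point-in-time guarantee can be sketched in plain pandas with `merge_asof` (synthetic data for illustration; this is not the Feature Store implementation):

```python
import pandas as pd

features = pd.DataFrame({
    "recipient_uei": ["ABC123", "ABC123"],
    "as_of_date": pd.to_datetime(["2024-01-01", "2024-06-01"]),
    "on_time_rate": [0.82, 0.91],
})
awards = pd.DataFrame({
    "recipient_uei": ["ABC123", "ABC123"],
    "action_date": pd.to_datetime(["2024-03-15", "2024-07-01"]),
})

# merge_asof picks the most recent feature row at or before each action_date,
# so a March award can never see the June feature refresh
joined = pd.merge_asof(
    awards.sort_values("action_date"),
    features.sort_values("as_of_date"),
    left_on="action_date",
    right_on="as_of_date",
    by="recipient_uei",
)
print(joined[["action_date", "on_time_rate"]])
# The 2024-03-15 award sees on_time_rate=0.82; the 2024-07-01 award sees 0.91
```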
Drift Detection
Models decay. The operational environment changes — new equipment, doctrine updates, fiscal policy shifts — and the model trained 18 months ago no longer reflects reality. Drift detection is the monitoring discipline that tells you when this happens before the program manager notices it in the dashboard.
```python
import mlflow
import pandas as pd
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.report import Report

def generate_drift_report(
    reference_df: pd.DataFrame,
    current_df: pd.DataFrame,
    report_path: str = "/tmp/drift_report.html",
) -> dict:
    """
    Generate a data drift report comparing the training distribution
    to the current scoring distribution.
    Log the report artifact to MLflow for the audit trail.
    """
    report = Report(metrics=[DataDriftPreset(), TargetDriftPreset()])
    report.run(reference_data=reference_df, current_data=current_df)
    report.save_html(report_path)

    result = report.as_dict()
    drift_detected = result["metrics"][0]["result"]["dataset_drift"]
    summary = {
        "dataset_drift_detected": int(drift_detected),
        "n_drifted_features": result["metrics"][0]["result"]["number_of_drifted_columns"],
        "total_features": result["metrics"][0]["result"]["number_of_columns"],
    }

    with mlflow.start_run(run_name="drift_check"):
        mlflow.log_metrics(summary)
        mlflow.log_artifact(report_path)
        if drift_detected:
            mlflow.set_tag("action_required", "retraining_triggered")
            print("DRIFT DETECTED — triggering retraining workflow")

    return summary
```
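When evidently isn't available in a locked-down environment, a per-feature population stability index (PSI) is a common lightweight fallback. A library-free sketch, with the caveat that the conventional alert thresholds (0.1 for "investigate", 0.2 for "act") are rules of thumb rather than a standard:

```python
import numpy as np

def psi(reference: np.ndarray, current: np.ndarray, n_bins: int = 10) -> float:
    """Population Stability Index for one feature, with bins taken
    from the reference distribution's quantiles."""
    edges = np.quantile(reference, np.linspace(0, 1, n_bins + 1))
    # Clip both samples into the reference range so every point lands in a bin
    ref_counts = np.histogram(np.clip(reference, edges[0], edges[-1]), bins=edges)[0]
    cur_counts = np.histogram(np.clip(current, edges[0], edges[-1]), bins=edges)[0]
    ref_frac = np.clip(ref_counts / len(reference), 1e-6, None)
    cur_frac = np.clip(cur_counts / len(current), 1e-6, None)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))

rng = np.random.default_rng(42)
stable = psi(rng.normal(0, 1, 10_000), rng.normal(0, 1, 10_000))
shifted = psi(rng.normal(0, 1, 10_000), rng.normal(0.5, 1, 10_000))
print(f"stable: {stable:.3f}, shifted: {shifted:.3f}")  # shifted is far larger
```

Run per feature on a schedule, the PSI values slot directly into the same `mlflow.log_metrics` audit pattern shown above.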
Where This Goes Wrong
Failure Mode 1: No Monitoring After Deployment
The model is deployed. You roll off the contract. Nobody checks whether it's still working. Six months later a new analyst notices the predictions are wrong and discovers the training data is two years stale and the operational context changed. Fix: define the monitoring owner, the monitoring frequency, and the retraining trigger in writing before you deploy. Not after. Before.
Failure Mode 2: Hardcoding Version Numbers
`mlflow.pyfunc.load_model("models:/my-model/3")` works until version 4 is promoted and the scoring pipeline isn't updated. Fix: always use stage aliases: `"models:/my-model/Production"`. Stage transitions are managed by the registry; version numbers are an implementation detail.
Failure Mode 3: Feature Inconsistency Between Training and Scoring
The training pipeline computes "vendor on-time rate" as contracts completed on time divided by all contracts. The scoring pipeline computes it as a rolling 12-month average. The model trains on one, scores on another, and produces systematically biased predictions that look fine in aggregate but are wrong for specific vendors. Fix: use a feature store. The same computation runs at training time and scoring time, guaranteed.
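The divergence is easy to reproduce. The two definitions agree for vendors with steady performance and split for anyone whose recent record differs from their history (synthetic data; column names are illustrative):

```python
import pandas as pd

contracts = pd.DataFrame({
    "vendor": ["V1"] * 6,
    "completed": pd.to_datetime([
        "2021-05-01", "2021-11-01", "2022-03-01",
        "2024-02-01", "2024-06-01", "2024-10-01",
    ]),
    "on_time": [1, 1, 1, 0, 0, 1],
})

# Training-side definition: lifetime on-time rate
lifetime_rate = contracts["on_time"].mean()

# Scoring-side definition: rolling 12-month on-time rate
cutoff = pd.Timestamp("2024-12-01") - pd.DateOffset(months=12)
rolling_rate = contracts.loc[contracts["completed"] >= cutoff, "on_time"].mean()

print(f"lifetime: {lifetime_rate:.2f}, rolling 12m: {rolling_rate:.2f}")
# Same vendor, same raw data, two materially different feature values
```

For this vendor the lifetime rate is 0.67 while the rolling rate is 0.33; a model trained on one and scored on the other will be systematically wrong for exactly the vendors whose behavior recently changed.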
Platform Comparison
| MLOps Capability | Databricks (Advana/Jupiter) | Palantir Foundry |
|---|---|---|
| Experiment tracking | MLflow (native) | Foundry versioned artifacts |
| Model registry | MLflow Model Registry | Foundry datasets + Ontology |
| Feature store | Databricks Feature Store | Foundry Pipeline Builder |
| Drift detection | evidently + scheduled Workflow | Monitoring Transforms |
| CI/CD integration | GitHub Actions + Databricks CLI | Foundry CI/CD (built-in) |
| Audit trail | MLflow runs + Unity Catalog logs | Foundry Transform history + Ontology |
| Batch scoring orchestration | Databricks Workflows (DAG) | Foundry Pipeline Builder (scheduled) |
Exercises
This chapter includes 6 hands-on exercises with full solutions — coding challenges, analysis tasks, and scenario-based problems.
View Exercises on GitHub →