Chapter 06

Supervised Machine Learning on Federal Platforms

Building classifiers that work for the subgroups that matter operationally, that you can explain to a contracting officer, and that survive the gap between a slide deck number and fleet reality.

~55 min read · Databricks · Advana · Navy Jupiter · Palantir Foundry · Code on GitHub

The Readiness Prediction Problem

The canonical federal ML problem: predict which maintenance work orders will exceed their estimated completion time. The data is in Navy Jupiter. The output goes into a command dashboard. The contracting officer wants to know why the model flagged a specific work order. The admiral wants to know why the accuracy number in the slide doesn't match what she sees in the fleet.

This chapter walks from raw features through a production-ready scikit-learn Pipeline, MLflow logging, Hyperopt tuning, SHAP explanations, batch scoring, and the failure modes that sink federal ML projects after the demo succeeds.

scikit-learn Pipeline Pattern

Always build a scikit-learn Pipeline — not separate preprocessing steps and a model. A Pipeline serializes as a single artifact: the same preprocessing that ran on training data runs automatically on scoring data, with no possibility of train/test leakage or preprocessing drift.

python
import mlflow, mlflow.sklearn
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, average_precision_score

# Feature definition — explicit, documented, version-controlled
numeric_features = [
    "labor_hours_estimated",
    "estimated_completion_days",
    "ship_age_years",
    "prior_work_order_count",
    "start_month",           # fiscal month, 1-12
    "data_quality_score",
]
categorical_features = ["hull_class", "maintenance_category"]
target = "exceeded_estimate"   # binary: 1 if work order ran over, 0 if not

df = spark.table("jupiter_catalog.silver.maintenance_work_orders").toPandas()
X = df[numeric_features + categorical_features]
y = df[target].values

# Time-based split — never random split for time-series-dependent data.
# Train on work orders that completed by the cutoff; test on those that
# started after it, so no record can appear in both sets.
cutoff_date = "2023-09-30"
train_mask = df["completion_date"] <= cutoff_date
test_mask  = df["start_date"] > cutoff_date
X_train, y_train = X[train_mask], y[train_mask]
X_test,  y_test  = X[test_mask],  y[test_mask]

print(f"Train: {len(X_train):,} rows | Test: {len(X_test):,} rows")
print(f"Train positive rate: {y_train.mean():.3f} | Test: {y_test.mean():.3f}")

# Pipeline: preprocessing + model as single artifact
preprocessor = ColumnTransformer([
    ("num", StandardScaler(), numeric_features),
    ("cat", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1),
     categorical_features),
])

pipeline = Pipeline([
    ("prep", preprocessor),
    ("clf", GradientBoostingClassifier(
        n_estimators=200, max_depth=4, learning_rate=0.05,
        subsample=0.8, min_samples_leaf=20, random_state=42
    ))
])

mlflow.set_experiment("/models/maintenance_overrun")

with mlflow.start_run(run_name="gbt_baseline") as run:
    pipeline.fit(X_train, y_train)
    y_proba = pipeline.predict_proba(X_test)[:, 1]

    test_auc = roc_auc_score(y_test, y_proba)
    test_ap  = average_precision_score(y_test, y_proba)

    mlflow.log_params({
        "n_estimators": 200, "max_depth": 4,
        "learning_rate": 0.05, "train_size": len(X_train),
    })
    mlflow.log_metrics({"test_auc": test_auc, "test_avg_precision": test_ap})

    mlflow.sklearn.log_model(
        pipeline,
        artifact_path="model",
        registered_model_name="maintenance_overrun_classifier",
        input_example=X_test.head(5),
        signature=mlflow.models.infer_signature(X_test, y_proba),
    )

    print(f"Test AUC: {test_auc:.4f}  |  Avg Precision: {test_ap:.4f}")
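The single-artifact claim is worth demonstrating end to end: a fitted Pipeline can be serialized in the training job, reloaded in a scoring job, and applied to raw columns with no separate preprocessing step. A self-contained sketch with synthetic data (the column names mirror the feature list above; the numbers are made up):

```python
import pickle
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OrdinalEncoder, StandardScaler

# Synthetic training data standing in for the Jupiter table
rng = np.random.default_rng(42)
train = pd.DataFrame({
    "labor_hours_estimated": rng.uniform(10, 500, 200),
    "hull_class": rng.choice(["DDG", "CG"], 200),
    "exceeded_estimate": rng.integers(0, 2, 200),
})

pipe = Pipeline([
    ("prep", ColumnTransformer([
        ("num", StandardScaler(), ["labor_hours_estimated"]),
        ("cat", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1),
         ["hull_class"]),
    ])),
    ("clf", GradientBoostingClassifier(n_estimators=20, random_state=42)),
])
pipe.fit(train[["labor_hours_estimated", "hull_class"]], train["exceeded_estimate"])

# Round-trip: the scoring job sees one artifact; raw columns in, probabilities out
blob = pickle.dumps(pipe)
scoring_model = pickle.loads(blob)
new_rows = pd.DataFrame({"labor_hours_estimated": [120.0],
                         "hull_class": ["LCS"]})  # unseen category, handled by the encoder
proba = scoring_model.predict_proba(new_rows)[0, 1]
print(f"Overrun probability: {proba:.3f}")
```

In production the round trip runs through the MLflow registry rather than a raw pickle, but the property being tested is the same: one artifact, no preprocessing drift between training and scoring.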

Hyperparameter Tuning with Hyperopt

Manual grid search is slow. On Databricks, Hyperopt with SparkTrials runs Bayesian optimization — learning which parameter regions are promising — in parallel across your cluster. Every trial is automatically logged to MLflow.

python
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
from sklearn.model_selection import cross_val_score

def objective(params):
    model = Pipeline([
        ("prep", preprocessor),
        ("clf", GradientBoostingClassifier(
            n_estimators=int(params["n_estimators"]),
            max_depth=int(params["max_depth"]),
            learning_rate=params["learning_rate"],
            subsample=params["subsample"],
            random_state=42,
        ))
    ])
    # Time-aware CV, consistent with the time-based split above
    # (assumes X_train rows are ordered by date)
    from sklearn.model_selection import TimeSeriesSplit
    auc = cross_val_score(model, X_train, y_train,
                          cv=TimeSeriesSplit(n_splits=3),
                          scoring="roc_auc", n_jobs=-1).mean()
    return {"loss": -auc, "status": STATUS_OK}

search_space = {
    "n_estimators":  hp.quniform("n_estimators", 100, 500, 50),
    "max_depth":     hp.quniform("max_depth", 3, 8, 1),
    "learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.2)),
    "subsample":     hp.uniform("subsample", 0.6, 1.0),
}

spark_trials = SparkTrials(parallelism=4)

with mlflow.start_run(run_name="hyperopt_search"):
    best_params = fmin(
        fn=objective, space=search_space,
        algo=tpe.suggest, max_evals=50,
        trials=spark_trials,
    )

print(f"Best parameters: {best_params}")
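One caveat: fmin returns the raw sampled values, so quniform parameters come back as floats (e.g. "max_depth": 5.0) and must be cast before refitting the final model. A small helper, assuming the keys defined in search_space above:

```python
def coerce_best_params(best_params: dict) -> dict:
    """Cast Hyperopt's raw float outputs to the types the estimator expects."""
    int_params = {"n_estimators", "max_depth"}  # quniform returns floats
    return {k: int(v) if k in int_params else float(v)
            for k, v in best_params.items()}

# Example with values shaped like an fmin result
raw = {"n_estimators": 350.0, "max_depth": 5.0,
       "learning_rate": 0.07, "subsample": 0.85}
final_params = coerce_best_params(raw)
print(final_params)
```

The coerced dict can then be passed straight to GradientBoostingClassifier(**final_params, random_state=42) and logged exactly as in the baseline run.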

Stratified Performance Evaluation

The worst-performing slice is what you present to the decision-maker first. If your model performs at AUC 0.91 overall but 0.61 on the hull class that makes up 40% of the fleet, you do not have a deployable model.

python
def stratified_eval_report(model, X_test: pd.DataFrame, y_test: np.ndarray,
                            stratify_col: str, min_slice_size: int = 50) -> pd.DataFrame:
    """
    Compute performance metrics for each value of a stratification column.
    Returns results sorted by AUC ascending — worst slices first.
    """
    y_proba = model.predict_proba(X_test)[:, 1]
    results = []

    for value in X_test[stratify_col].unique():
        mask = X_test[stratify_col] == value
        n    = int(mask.sum())
        # Skip slices too small to evaluate or containing only one class
        # (roc_auc_score raises a ValueError on single-class y_true)
        if n < min_slice_size or len(np.unique(y_test[mask])) < 2:
            continue

        results.append({
            "slice":         value,
            "n":             n,
            "positive_rate": round(float(y_test[mask].mean()), 3),
            "auc":           round(roc_auc_score(y_test[mask], y_proba[mask]), 3),
            "avg_precision": round(average_precision_score(y_test[mask], y_proba[mask]), 3),
        })

    results_df = pd.DataFrame(results).sort_values("auc", ascending=True)
    print(f"\nStratified report by '{stratify_col}' (worst first):")
    print(results_df.to_string(index=False))
    return results_df


# Run before any briefing — not after
hull_report = stratified_eval_report(pipeline, X_test, y_test, "hull_class")

Threshold Selection for Asymmetric Costs

The default 0.5 classification threshold is almost always wrong for government use cases. The cost of a false positive (expediting an unnecessary supply item: $2,000) is not the same as a false negative (missing a Priority 01 item: $50,000). Set the threshold based on the actual cost ratio.

python
from sklearn.metrics import confusion_matrix

def find_operational_threshold(y_true: np.ndarray, y_proba: np.ndarray,
                                cost_fp: float, cost_fn: float) -> float:
    """
    Find the classification threshold that minimizes total operational cost.
    """
    thresholds  = np.linspace(0.05, 0.95, 91)
    total_costs = []

    for t in thresholds:
        y_pred = (y_proba >= t).astype(int)
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
        total_costs.append(fp * cost_fp + fn * cost_fn)

    optimal_idx       = int(np.argmin(total_costs))
    optimal_threshold = float(thresholds[optimal_idx])

    print(f"Cost ratio FP:FN = {cost_fp}:{cost_fn}")
    print(f"Optimal threshold: {optimal_threshold:.2f}")
    print(f"  Default 0.50 total cost : {total_costs[45]:,.0f}")
    print(f"  Optimal {optimal_threshold:.2f} total cost: {total_costs[optimal_idx]:,.0f}")

    return optimal_threshold


# Example: expediting a supply item costs $2,000; missing a Priority 01 costs $50,000
operational_threshold = find_operational_threshold(
    y_test, y_proba, cost_fp=2_000, cost_fn=50_000
)

SHAP Explanations

On every federal platform that deploys a model affecting operational decisions, someone will ask "why did it predict this?" SHAP (SHapley Additive exPlanations) is the standard answer. It assigns each feature a contribution value for each individual prediction.

"This requisition was flagged because the vendor's historical on-time rate is 62% (pushes toward late), it's Priority 01 (pushes toward scrutiny), and the item's demand frequency is low (harder to source quickly)." That explanation lands. "The model said so" doesn't.

python
import shap
import matplotlib.pyplot as plt

preprocessor_fitted = pipeline.named_steps["prep"]
classifier          = pipeline.named_steps["clf"]
X_transformed       = preprocessor_fitted.transform(X_test.head(500))

explainer   = shap.TreeExplainer(classifier)
shap_values = explainer.shap_values(X_transformed)

# For binary classification, take class 1 SHAP values
if isinstance(shap_values, list):
    shap_values = shap_values[1]

feature_names = (
    numeric_features +
    list(preprocessor_fitted.named_transformers_["cat"]
         .get_feature_names_out(categorical_features))
)

plt.figure(figsize=(10, 6))
shap.summary_plot(
    shap_values, X_transformed,
    feature_names=feature_names,
    show=False, max_display=10,
)
plt.title("Feature Impact on Maintenance Overrun Prediction")
plt.tight_layout()
plt.savefig("/tmp/shap_summary.png", dpi=150, bbox_inches="tight")
mlflow.log_artifact("/tmp/shap_summary.png")
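The per-prediction narrative quoted above can be generated directly from a SHAP row: rank features by absolute contribution and report the direction of each. A minimal sketch (the example row and feature names here are illustrative, not model output):

```python
import numpy as np

def top_contributions(shap_row, feature_names, k=3):
    """Return the k features pushing this prediction hardest, with direction."""
    order = np.argsort(np.abs(shap_row))[::-1][:k]
    return [(feature_names[i],
             round(float(shap_row[i]), 3),
             "toward overrun" if shap_row[i] > 0 else "away from overrun")
            for i in order]

# Illustrative SHAP values for a single work order
row = np.array([0.42, -0.05, 0.18, -0.31])
names = ["labor_hours_estimated", "start_month",
         "ship_age_years", "prior_work_order_count"]
for name, value, direction in top_contributions(row, names):
    print(f"{name}: {value:+.3f} ({direction})")
```

Rendered as a sentence per feature, this is the explanation that lands with a contracting officer; in a dashboard it can be computed on demand from shap_values[i] for the flagged row.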

Palantir Foundry: Training as a Transform

On Foundry, model training runs as a scheduled Transform that reads from the Ontology and writes a versioned model artifact. The model connects to the Ontology, allowing it to be called from Workshop dashboards or AIP Logic functions.

python
from transforms.api import transform, Input, Output
from palantir_models import Model
# A Transform runs as a standalone module, so it needs its own sklearn imports
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OrdinalEncoder, StandardScaler

@transform(
    model_output=Output("/models/maintenance_overrun_classifier"),
    training_data=Input("/analytics/silver/maintenance_features"),
)
def train_maintenance_classifier(training_data, model_output):
    df = training_data.dataframe().toPandas()

    numeric_features     = ["labor_hours_estimated", "estimated_completion_days",
                             "ship_age_years", "prior_work_order_count", "start_month"]
    categorical_features = ["hull_class", "maintenance_category"]
    target               = "exceeded_estimate"

    X = df[numeric_features + categorical_features]
    y = df[target].values

    preprocessor = ColumnTransformer([
        ("num", StandardScaler(), numeric_features),
        ("cat", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1),
         categorical_features),
    ])
    pipeline = Pipeline([
        ("prep", preprocessor),
        ("clf", GradientBoostingClassifier(
            n_estimators=200, max_depth=4, learning_rate=0.05, random_state=42
        ))
    ])
    pipeline.fit(X, y)

    # Foundry wraps models in its own versioned container
    model_output.write_model(Model(pipeline))

Where This Goes Wrong

Failure Mode 1: The Data Cutoff Problem

Training on all available years without accounting for incomplete labels in recent periods. For a contract cost growth model, records from the last 12 months have unusually low positive rates because their outcomes haven't resolved yet. Fix: establish a label cutoff and exclude records whose outcome has not yet resolved, e.g. only include contracts that completed at least 12 months ago.
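The label cutoff is a one-line filter applied before the train/test split. A pandas sketch (the column name and 12-month window are illustrative):

```python
import pandas as pd

def apply_label_cutoff(df: pd.DataFrame, date_col: str, months: int = 12) -> pd.DataFrame:
    """Keep only records old enough for the label to have resolved."""
    cutoff = pd.Timestamp.today() - pd.DateOffset(months=months)
    return df[pd.to_datetime(df[date_col]) <= cutoff]

# Illustrative data: one resolved record, one too recent to label reliably
records = pd.DataFrame({
    "contract_id": ["A1", "B2"],
    "completion_date": ["2020-01-15", pd.Timestamp.today().strftime("%Y-%m-%d")],
})
labeled = apply_label_cutoff(records, "completion_date")
print(labeled["contract_id"].tolist())  # only the resolved record remains
```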

Failure Mode 2: Leaking the Future Into the Past

Including features computed from the full dataset that use information only available after the event you're predicting — e.g., "this vendor's average cost growth across all their contracts" includes future contracts in the average. Fix: use temporal cross-validation (train on years 1–3, test on year 4) and always compute aggregated features using only data available at prediction time.
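The leakage-safe version of a vendor history feature is an expanding aggregate shifted by one row, so each record sees only outcomes that resolved before it. A pandas sketch with illustrative column names:

```python
import pandas as pd

def as_of_vendor_rate(df, vendor_col, outcome_col, date_col):
    """Each row's vendor rate uses only contracts dated strictly before it."""
    df = df.sort_values(date_col).copy()
    df["vendor_rate_asof"] = (
        df.groupby(vendor_col)[outcome_col]
          .transform(lambda s: s.expanding().mean().shift(1))
    )
    return df

# Illustrative data: one vendor, three contracts in time order
contracts = pd.DataFrame({
    "vendor": ["V1", "V1", "V1"],
    "cost_growth": [1, 0, 1],
    "award_date": ["2021-01-01", "2022-01-01", "2023-01-01"],
})
out = as_of_vendor_rate(contracts, "vendor", "cost_growth", "award_date")
print(out["vendor_rate_asof"].tolist())
# first contract has no history (NaN); later rows average only prior outcomes
```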

Failure Mode 3: Deploying Without a Monitoring Plan

Getting model approval, deploying to production, and moving on. Fix: at deployment time, define three things in writing: a drift detection metric, a performance monitoring query, and a retraining trigger. On Databricks, this is a scheduled Workflow job. Schedule it before you close the project.
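For the drift detection metric, a common choice is the Population Stability Index (PSI) between the training score distribution and the live one. A minimal numpy sketch that a scheduled job could run (the 0.2 retraining trigger is a widely used rule of thumb, not a standard):

```python
import numpy as np

def psi(expected, actual, bins=10):
    """Population Stability Index between a reference and a live distribution."""
    edges = np.histogram_bin_edges(expected, bins=bins)
    e_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    a_pct = np.histogram(actual, bins=edges)[0] / len(actual)
    e_pct = np.clip(e_pct, 1e-6, None)  # avoid log(0) on empty bins
    a_pct = np.clip(a_pct, 1e-6, None)
    return float(np.sum((a_pct - e_pct) * np.log(a_pct / e_pct)))

# Illustrative distributions: live scores have drifted upward
rng = np.random.default_rng(0)
train_scores = rng.normal(0.30, 0.1, 5000)
live_scores  = rng.normal(0.45, 0.1, 5000)
print(f"PSI: {psi(train_scores, live_scores):.3f}")  # PSI > 0.2 would trigger retraining
```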

Platform Comparison

Dimension             | Advana (Databricks)           | Palantir Foundry                 | Navy Jupiter
Primary ML framework  | scikit-learn, XGBoost, MLflow | scikit-learn via Code Workspaces | scikit-learn, XGBoost, MLflow
Experiment tracking   | MLflow (native)               | Foundry versioned artifacts      | MLflow (native)
Hyperparameter tuning | Hyperopt + SparkTrials        | Custom loops                     | Hyperopt + SparkTrials
Batch scoring         | Databricks Workflows          | Foundry Transforms (scheduled)   | Databricks Workflows
SHAP interpretability | Pre-installed (DBR ML)        | Available via conda              | Pre-installed (DBR ML)
IL5 support           | Yes                           | Yes                              | Yes (SIPR)

Exercises

This chapter includes 5 hands-on exercises with full solutions — coding challenges, analysis tasks, and scenario-based problems.

View Exercises on GitHub →