Agenda
- Problem framing & data
- Pipeline architecture
- Feature engineering
- Walk-forward ML design
- Position construction
- Backtest & cost model
- Results: full sample, IS/OOS, attribution
- Robustness & honest statistics
- Reflections & next steps
0. Imports & Setup
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, kpss, acf, pacf
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.linear_model import Ridge, Lasso, ElasticNet
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
from scipy.stats import norm, spearmanr
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import warnings; warnings.filterwarnings("ignore")
pd.set_option("display.max_columns", 50); pd.set_option("display.width", 200)
RNG = np.random.default_rng(42)
TRADING_DAYS = 252
SEED = 42
# Global trial counter for honest deflated Sharpe
TRIAL_COUNTER = {"n": 0}
def bump_trials(k=1): TRIAL_COUNTER["n"] += k
Conventions:
- All panels are T × N wide DataFrames; long format is MultiIndex(date, asset).
- Signal at close t → trade at close t+1 (enforced by position.shift(1)).
- Returns are log returns. AnnRet/AnnVol use additive convention; equity uses compounding.
- All cross-validation uses embargo = horizon to prevent target leakage.
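A minimal sketch of the timing convention on made-up numbers (the `signal` and `ret` series are hypothetical, not pipeline data). The position decided at close t only earns the return realised on the following day, which is what the one-day shift enforces.

```python
import pandas as pd

# Hypothetical toy data: five business days, one asset.
idx = pd.date_range("2024-01-01", periods=5, freq="B")
signal = pd.Series([1.0, -1.0, 1.0, 1.0, -1.0], index=idx)   # decided at close t
ret = pd.Series([0.00, 0.01, -0.02, 0.03, 0.01], index=idx)  # log return realised on day t

# Correct: yesterday's signal is held over today's return.
pnl_lagged = signal.shift(1).fillna(0.0) * ret

# Wrong: today's signal multiplies today's return (lookahead).
pnl_leaky = signal * ret

print(pnl_lagged.round(4).tolist())
print(pnl_leaky.round(4).tolist())
```

The first day of the lagged PnL is zero by construction, because no position existed before the first signal.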
1. Load & First Look
def load_panel(path, date_col="date"):
df = pd.read_csv(path, parse_dates=[date_col]).set_index(date_col).sort_index()
df = df[~df.index.duplicated(keep="last")]
return df
def first_look(prices):
print("shape :", prices.shape)
print("range :", prices.index.min(), "→", prices.index.max())
# robust frequency detection
freq = pd.infer_freq(prices.index)
if freq is None:
diffs = prices.index.to_series().diff().dropna().dt.days
freq = f"irregular (median Δ={diffs.median()}d)"
print("freq :", freq)
print("dup_idx:", prices.index.duplicated().sum())
return prices.apply(lambda s: pd.Series({
"first": s.first_valid_index(),
"last": s.last_valid_index(),
"n_obs": s.notna().sum(),
"pct_na": s.isna().mean(),
})).T
2. Missing-Value Treatment
2.1 Diagnostics
def panel_missing_report(panel):
return pd.DataFrame({
"n_missing": panel.isna().sum(),
"pct": panel.isna().mean() * 100,
"first_valid":panel.apply(lambda s: s.first_valid_index()),
"last_valid": panel.apply(lambda s: s.last_valid_index()),
"max_gap": panel.apply(lambda s: s.isna().astype(int)
.groupby((~s.isna()).cumsum())
.sum().max()),
}).sort_values("pct", ascending=False)
2.2 Walk-Forward Safe Cleaners
def ffill_capped_panel(panel, max_gap=5):
"""ffill ONLY the first `max_gap` days of each gap; longer gaps stay NaN."""
out = panel.copy()
for c in panel.columns:
s = panel[c]
is_na = s.isna()
# position within each NaN run (1, 2, 3, ...)
grp = (~is_na).cumsum()
pos_in_run = is_na.groupby(grp).cumsum()
ff = s.ffill()
out[c] = ff.where(pos_in_run <= max_gap, np.nan)
return out
def expanding_winsorize_panel(panel, q=0.005, min_history=252):
"""Expanding-window winsorization — no lookahead."""
out = panel.copy()
for c in panel.columns:
s = panel[c]
lo = s.expanding(min_history).quantile(q)
hi = s.expanding(min_history).quantile(1 - q)
out[c] = s.clip(lower=lo, upper=hi)
return out
def build_active_universe(prices, min_history=252, max_gap=21):
    """
    For each date t, asset is 'active' iff:
    - it has ≥ min_history non-NaN observations up to and including t
    - last valid obs is within max_gap rows (trading days) of t
    Returns T×N boolean panel; use for masking ANY signal/position.
    """
    active = pd.DataFrame(False, index=prices.index, columns=prices.columns)
    row_pos = pd.Series(np.arange(len(prices)), index=prices.index, dtype=float)
    for c in prices.columns:
        s = prices[c]
        n_obs = s.notna().cumsum()
        # row position of the most recent valid observation (NaN before the first one)
        last_pos = row_pos.where(s.notna()).ffill()
        gap = row_pos - last_pos   # rows since last valid obs; NaN compares False
        active[c] = (n_obs >= min_history) & (gap <= max_gap)
    return active
def clean_panel(prices_raw, drop_pct_first_half=0.5, ffill_cap=5):
"""
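Universe selection uses FIRST HALF of sample only (no lookahead into OOS).
Winsorization is expanding (also no lookahead).
Caveat: the clip is applied to price LEVELS, so a new high/low in a trending series
is pulled back towards the previous extreme; winsorizing returns avoids this.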
"""
rep = {}
prices = prices_raw.sort_index().copy()
prices = prices[~prices.index.duplicated(keep="last")]
n_half = len(prices) // 2
pct_na_first_half = prices.iloc[:n_half].isna().mean()
drop_cols = pct_na_first_half[pct_na_first_half > drop_pct_first_half].index.tolist()
prices = prices.drop(columns=drop_cols)
rep["dropped_assets"] = drop_cols
prices = expanding_winsorize_panel(prices, q=0.001, min_history=252)
prices = ffill_capped_panel(prices, max_gap=ffill_cap)
rep["remaining_pct_na"] = prices.isna().mean().to_dict()
return prices, rep
2.3 Strategy table
| Situation | Fix |
|---|---|
| Asset >50% missing in 1st half | Drop column |
| Whole row missing (holiday) | Drop row |
| Sparse price gaps | ffill_capped (first ≤5 days only) |
| Returns | Fill 0.0, never ffill |
| Asset starts late | Leave leading NaN; active_universe masks |
| Cross-section sparse on a date | XS median in-fold, or drop date |
| ML features | Median impute inside each CV fold |
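Why the table says returns get 0.0 and never ffill: a small sketch on a hypothetical return series with a two-day gap. Forward-filling repeats the last observed move on every missing day, while a zero fill leaves the cumulative return unchanged.

```python
import numpy as np
import pandas as pd

# Hypothetical daily log returns with a 2-day gap.
ret = pd.Series([0.02, np.nan, np.nan, -0.01])

filled_zero = ret.fillna(0.0)   # missing return treated as "no observed price change"
filled_ffill = ret.ffill()      # repeats the 2% move on each missing day

print(filled_zero.sum())    # cumulative log return with zero fill
print(filled_ffill.sum())   # cumulative log return with forward fill (inflated)
```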
3. Stationarity
def to_log_returns(panel): return np.log(panel).diff()
def stationarity_diagnosis(s):
s = s.dropna()
try: adf_p = adfuller(s, autolag="AIC")[1]
except: adf_p = np.nan
try: kpss_p = kpss(s, nlags="auto")[1]
except: kpss_p = np.nan
if adf_p < 0.05 and kpss_p > 0.05: v = "stationary"
elif adf_p > 0.05 and kpss_p < 0.05: v = "unit_root -> difference"
elif adf_p < 0.05 and kpss_p < 0.05: v = "trend_stationary -> detrend"
else: v = "inconclusive"
return {"adf_p": adf_p, "kpss_p": kpss_p, "verdict": v}
def panel_stationarity(panel):
return pd.DataFrame({c: stationarity_diagnosis(panel[c]) for c in panel.columns}).T
def half_life(s):
s = s.dropna(); lag = s.shift(1).dropna()
delta = (s - s.shift(1)).dropna()
lag, delta = lag.align(delta, join="inner")
beta = OLS(delta, add_constant(lag)).fit().params.iloc[1]
return -np.log(2)/beta if beta < 0 else np.inf
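Where -ln(2)/beta comes from: a sketch assuming the regression Δs_t = a + beta·s_{t-1} + e_t, so the expected deviation from the long-run mean shrinks by a factor (1 + beta) per step. The value of `beta` below is hypothetical. The exact half-life solves (1 + beta)^k = 1/2; the version used in half_life is its first-order approximation, which is close when |beta| is small.

```python
import numpy as np

beta = -0.1                                  # hypothetical fitted slope
exact = -np.log(2) / np.log(1 + beta)        # solves (1 + beta)**k = 1/2
approx = -np.log(2) / beta                   # first-order version used in half_life

# Noise-free check: the deviation from the mean decays as (1 + beta)**k.
path = (1 + beta) ** np.arange(0, 10)
first_below_half = int(np.argmax(path < 0.5))   # first step where the deviation is under 50%

print(round(exact, 2), round(approx, 2), first_below_half)   # 6.58 6.93 7
```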
4. Commodity-Specific: Roll Handling
def detect_roll_dates(front_raw, threshold_jump=0.03):
"""
Detect roll dates as days where front-contract price has an abnormal jump.
Use NON-back-adjusted prices for detection; use back-adjusted for returns.
Returns T×N boolean mask of suspected roll dates.
"""
r = np.log(front_raw).diff().abs()
roll_dates = r > threshold_jump
return roll_dates
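def mask_roll_returns(returns, roll_dates):
    """Set returns on suspected roll dates to NaN so they don't feed momentum."""
    # reindex_like can introduce NaN (object dtype); cast back to bool before negating
    is_roll = roll_dates.reindex_like(returns).fillna(False).astype(bool)
    return returns.where(~is_roll, np.nan)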
Convention:
- prices (used for returns/features) = back-adjusted continuous front-month.
- front_raw, back_raw (used for carry only) = non-adjusted front and 2nd contract.
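A quick sign check for the carry built from the non-adjusted contracts, as a sketch only: `annualized_carry` is a hypothetical helper and the prices are made up, with dt_years = 1/12 assuming one month between the two contracts. Front above back (backwardation) should give a positive number.

```python
import numpy as np

def annualized_carry(front, back, dt_years=1/12):
    """Log spread between front and second contract, scaled to one year."""
    return (np.log(front) - np.log(back)) / dt_years

backwardated = annualized_carry(front=80.0, back=79.0)   # front > back
contango = annualized_carry(front=80.0, back=81.0)       # front < back

print(round(backwardated, 4), round(contango, 4))
```

Back-adjusted prices would not work here, since the adjustment shifts price levels and so distorts the front/back spread.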
5. Feature Engineering
5.1 Builder — every feature uses info ≤ time t
def build_panel_features(prices, front_raw=None, back_raw=None, dt_years=1/12):
"""
All features at time t use data through t (not t+1).
Target will be ret_{t+h}, so signal_t * ret_{t,t+h} has no leakage when shifted.
"""
r = np.log(prices).diff()
feats = {}
# ---------- MOMENTUM ----------
for w in [5, 21, 63, 126, 252]:
feats[f"mom_{w}"] = r.rolling(w).sum() # ends at t (uses r_t)
feats["mom_sharpe_63"] = feats["mom_63"] / (r.rolling(63).std() * np.sqrt(TRADING_DAYS))
# ---------- VOLATILITY ----------
for w in [21, 63]:
feats[f"vol_{w}"] = r.rolling(w).std() * np.sqrt(TRADING_DAYS)
feats["volvol_63"] = feats["vol_21"].rolling(63).std()
feats["vol_ratio"] = feats["vol_21"] / feats["vol_63"]
# ---------- MEAN REVERSION ----------
for w in [21, 63]:
ma = prices.rolling(w).mean(); sd = prices.rolling(w).std()
feats[f"zscore_{w}"] = (prices - ma) / sd
# ---------- TERM STRUCTURE (carry — backwardation = positive) ----------
# Sign: front > back ⇒ backwardation ⇒ positive carry ⇒ long-favorable
if front_raw is not None and back_raw is not None:
feats["carry"] = (np.log(front_raw) - np.log(back_raw)) / dt_years
feats["d_carry_21"] = feats["carry"].diff(21)
feats["basis"] = (front_raw - back_raw) / back_raw
# ---------- HIGHER MOMENTS / REGIME ----------
feats["skew_63"] = r.rolling(63).skew()
feats["kurt_63"] = r.rolling(63).kurt()
feats["dd"] = prices / prices.cummax() - 1 # uses ≤ t ✅
return feats
5.2 Cross-sectional transforms (per date)
def cs_demean(feat): return feat.sub(feat.mean(axis=1), axis=0)
def cs_zscore(feat):
mu = feat.mean(axis=1); sd = feat.std(axis=1).replace(0, np.nan)
return feat.sub(mu, axis=0).div(sd, axis=0)
def cs_rank(feat):
return feat.rank(axis=1, pct=True) * 2 - 1
def apply_cs_transform(feats, transform=cs_zscore, active=None):
out = {}
for name, df in feats.items():
d = df if active is None else df.where(active)
out[name] = transform(d)
return out
5.3 Stack to long format
def stack_features(feats):
out = pd.concat({k: v.stack() for k, v in feats.items()}, axis=1)
out.index.names = ["date", "asset"]
return out
def stack_target(y_panel):
s = y_panel.stack(); s.index.names = ["date", "asset"]
return s.rename("y")
5.4 PCA
def panel_pca(returns, n=3):
R = returns.dropna(how="any")
Z = StandardScaler().fit_transform(R)
p = PCA(n_components=n, random_state=SEED).fit(Z)
factors = pd.DataFrame(p.transform(Z), index=R.index,
columns=[f"PC{i+1}" for i in range(n)])
loadings = pd.DataFrame(p.components_.T, index=R.columns, columns=factors.columns)
return factors, loadings, p.explained_variance_ratio_
6. Walk-Forward ML
6.1 Target
def make_panel_target(prices, horizon=1, kind="log"):
if kind == "log":
return np.log(prices).diff(horizon).shift(-horizon)
return prices.pct_change(horizon).shift(-horizon)
6.2 Model factories (all seeded)
def ridge_factory(alpha=1.0): return lambda: Ridge(alpha=alpha, random_state=SEED)
def lasso_factory(alpha=1e-4): return lambda: Lasso(alpha=alpha, max_iter=5000, random_state=SEED)
def en_factory(a=1e-3, l1=.5): return lambda: ElasticNet(alpha=a, l1_ratio=l1, max_iter=5000, random_state=SEED)
def gbm_factory(**kw):
kw.setdefault("random_state", SEED)
return lambda: GradientBoostingRegressor(**kw)
6.3 Pooled panel walk-forward (with embargo)
def panel_walk_forward_predict(X_long, y_long, model_factory,
train_window_days=252*3, step_days=63,
embargo_days=1, mode="expanding",
scale=True, impute="median"):
"""
embargo_days should equal forecast horizon to prevent target overlap.
"""
X_long, y_long = X_long.align(y_long, join="inner", axis=0)
dates = X_long.index.get_level_values("date").unique().sort_values()
preds = pd.Series(index=X_long.index, dtype=float)
start = train_window_days
while start < len(dates):
end = min(start + step_days, len(dates))
tr_start = 0 if mode == "expanding" else max(0, start - train_window_days)
# Apply embargo: drop last `embargo_days` of train
tr_end = max(tr_start + 1, start - embargo_days)
tr_dates = dates[tr_start:tr_end]
te_dates = dates[start:end]
tr_mask = X_long.index.get_level_values("date").isin(tr_dates)
te_mask = X_long.index.get_level_values("date").isin(te_dates)
Xtr, ytr = X_long.loc[tr_mask], y_long.loc[tr_mask]
Xte = X_long.loc[te_mask]
# Drop rows where y is NaN (can't train on them)
valid = ytr.notna()
Xtr, ytr = Xtr.loc[valid], ytr.loc[valid]
if len(Xtr) == 0:
start = end; continue
imp = SimpleImputer(strategy=impute).fit(Xtr)
Xtr_i, Xte_i = imp.transform(Xtr), imp.transform(Xte)
if scale:
sc = StandardScaler().fit(Xtr_i)
Xtr_z, Xte_z = sc.transform(Xtr_i), sc.transform(Xte_i)
else:
Xtr_z, Xte_z = Xtr_i, Xte_i
m = model_factory()
m.fit(Xtr_z, ytr.values)
preds.loc[te_mask] = m.predict(Xte_z)
start = end
return preds.unstack("asset")
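To see what the embargo does to the fold boundaries, here is a sketch that reproduces only the index arithmetic of the loop above. `walk_forward_folds` is a hypothetical helper and the sizes are toy values; ranges are half-open positions into the sorted date array.

```python
def walk_forward_folds(n_dates, train_window, step, embargo, mode="expanding"):
    """Return (train_start, train_end, test_start, test_end) half-open index ranges."""
    folds = []
    start = train_window
    while start < n_dates:
        end = min(start + step, n_dates)
        tr_start = 0 if mode == "expanding" else max(0, start - train_window)
        # Drop the last `embargo` dates before the test block from training.
        tr_end = max(tr_start + 1, start - embargo)
        folds.append((tr_start, tr_end, start, end))
        start = end
    return folds

folds = walk_forward_folds(n_dates=20, train_window=10, step=4, embargo=2)
for f in folds:
    print(f)
```

With embargo=2, each training block stops two dates before its test block starts, so a 2-day forward target on the last training date never overlaps the test period.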
6.4 Per-asset walk-forward (same embargo logic)
def walk_forward_predict_single(X, y, model_factory,
train_window=252*3, step=63, embargo=1,
mode="expanding", scale=True, impute="median"):
X, y = X.align(y, join="inner", axis=0)
preds = pd.Series(index=y.index, dtype=float)
n = len(X); start = train_window
while start < n:
end = min(start + step, n)
tr_start = 0 if mode == "expanding" else max(0, start - train_window)
tr_end = max(tr_start + 1, start - embargo)
Xtr, ytr = X.iloc[tr_start:tr_end], y.iloc[tr_start:tr_end]
Xte = X.iloc[start:end]
valid = ytr.notna()
if valid.sum() == 0:
start = end; continue
Xtr, ytr = Xtr.loc[valid], ytr.loc[valid]
imp = SimpleImputer(strategy=impute).fit(Xtr)
Xtr_i, Xte_i = imp.transform(Xtr), imp.transform(Xte)
if scale:
sc = StandardScaler().fit(Xtr_i)
Xtr_z, Xte_z = sc.transform(Xtr_i), sc.transform(Xte_i)
else:
Xtr_z, Xte_z = Xtr_i, Xte_i
m = model_factory()
m.fit(Xtr_z, ytr.values)
preds.iloc[start:end] = m.predict(Xte_z)
start = end
return preds
6.5 IC-based scorer + hyperparameter tuning
def ic_scorer_factory(asset_index=None):
    """Build a sklearn-compatible scorer that computes mean cross-sectional IC.

    The (date, asset) index is read from the X passed to the scorer, because
    GridSearchCV calls it on one validation fold at a time. `asset_index` is only a
    fallback for array inputs and must then match X row for row.
    """
    def _scorer(estimator, X, y):
        pred = estimator.predict(X)
        idx = X.index if hasattr(X, "index") else asset_index
        df = pd.DataFrame({"p": pred, "y": np.asarray(y)}, index=idx)
        ics = []
        for dt, sub in df.groupby(level="date"):
            if len(sub) >= 5 and sub["y"].notna().sum() >= 5:
                ic, _ = spearmanr(sub["p"], sub["y"], nan_policy="omit")
                if np.isfinite(ic): ics.append(ic)
        return np.mean(ics) if ics else -1.0
    return _scorer
def panel_tune_hyperparams(X_long, y_long, param_grid=None, n_splits=5,
                           horizon=1, scoring="ic"):
    """
    Tune with embargo = horizon, custom IC scorer (or MSE).
    Rows with a NaN target are dropped instead of being filled with 0.
    """
    if param_grid is None:
        param_grid = {"model__alpha": [0.01, 0.1, 1, 10, 100]}
    X_long, y_long = X_long.align(y_long, join="inner", axis=0)
    ok = y_long.notna()
    X_long, y_long = X_long.loc[ok], y_long.loc[ok]
    date_level = X_long.index.get_level_values("date")
    dates = date_level.unique().sort_values()
    splits = []
    for tr_idx, te_idx in TimeSeriesSplit(n_splits=n_splits, gap=horizon).split(dates):
        tr_rows = np.where(date_level.isin(dates[tr_idx]))[0]
        te_rows = np.where(date_level.isin(dates[te_idx]))[0]
        splits.append((tr_rows, te_rows))
    pipe = Pipeline([("imputer", SimpleImputer(strategy="median")),
                     ("scaler", StandardScaler()),
                     ("model", Ridge(random_state=SEED))])
    scorer = ic_scorer_factory(X_long.index) if scoring == "ic" else "neg_mean_squared_error"
    gs = GridSearchCV(pipe, param_grid, cv=splits, scoring=scorer, n_jobs=-1)
    gs.fit(X_long, y_long)
    bump_trials(len(gs.cv_results_["params"]))  # honest trial accounting
    return gs.best_params_, gs
6.6 IC helpers + statistical test
def panel_ic_per_date(pred_panel, y_panel, method="spearman", min_n=5):
ics = {}
for dt in pred_panel.index:
p, r = pred_panel.loc[dt], y_panel.loc[dt]
ok = p.notna() & r.notna()
if ok.sum() >= min_n:
if method == "spearman":
ic, _ = spearmanr(p[ok], r[ok])
else:
ic = p[ok].corr(r[ok], method=method)
ics[dt] = ic
return pd.Series(ics).sort_index()
def hac_tstat(series, lags=5):
"""Newey-West t-stat for H0: mean=0."""
x = series.dropna().values
n = len(x); mu = x.mean(); e = x - mu
gamma0 = (e**2).sum() / n
var = gamma0
for L in range(1, lags+1):
w = 1 - L/(lags+1)
gamma = (e[L:] * e[:-L]).sum() / n
var += 2 * w * gamma
se = np.sqrt(var / n)
return mu / se if se > 0 else np.nan
def panel_ic_summary(ic_series, hac_lags=5):
return pd.Series({
"mean_IC": ic_series.mean(),
"std_IC": ic_series.std(),
"IR": ic_series.mean()/ic_series.std() if ic_series.std()>0 else np.nan,
"hit%": (ic_series > 0).mean(),
"t_HAC": hac_tstat(ic_series, lags=hac_lags),
"n": len(ic_series),
})
def ic_decay(pred_panel, prices, horizons=(1,2,5,10,21,63)):
rows = []
for h in horizons:
y_h = make_panel_target(prices, horizon=h, kind="log")
ic = panel_ic_per_date(pred_panel, y_h)
rows.append({"horizon": h, "mean_IC": ic.mean(),
"IR": ic.mean()/ic.std() if ic.std()>0 else np.nan,
"t_HAC": hac_tstat(ic, lags=10)})
return pd.DataFrame(rows)
7. Prediction → Position
7.1 Risk-aware dollar-neutral builder
def panel_pred_to_position(pred_panel, ret_panel,
                           gross_target=2.0,
                           sizing="rank",
                           vol_floor=0.05,
                           vol_window=21,
                           ewm_half_life=None,
                           threshold=None,
                           neutralize="risk",   # "risk" | "dollar" | None
                           max_weight=0.20,
                           min_active_names=5,
                           active=None):
    """
    Order of ops:
    1. EWM smooth
    2. CS normalize (rank/zscore)
    3. Threshold
    4. Mask inactive universe (to NaN, so masked names stay flat)
    5. NEUTRALIZE FIRST (in signal space)
    6. Inverse-vol weight
    7. Re-neutralize (after inverse-vol weighting)
    8. Per-name cap, min-names check, gross renormalize
    """
    s = pred_panel.copy()
    # 1. SMOOTH
    if ewm_half_life is not None:
        s = s.ewm(halflife=ewm_half_life, min_periods=5).mean()
    # 2. CS NORMALIZE
    if sizing == "rank":
        s = s.rank(axis=1, pct=True) * 2 - 1
    elif sizing == "zscore":
        s = s.sub(s.mean(axis=1), axis=0).div(s.std(axis=1).replace(0, np.nan), axis=0)
    # 3. THRESHOLD (missing signals stay NaN rather than becoming 0.0)
    if threshold is not None:
        s = s.where((s.abs() > threshold) | s.isna(), 0.0)
    # 4. UNIVERSE MASK: NaN (not 0.0), so the demeaning below cannot hand
    #    inactive names a non-zero weight; NaN becomes 0.0 at the end
    if active is not None:
        is_active = active.reindex_like(s).fillna(False).astype(bool)
        s = s.where(is_active, np.nan)
    # 5. NEUTRALIZE in signal space
    if neutralize in ("dollar", "risk"):
        s = s.sub(s.mean(axis=1), axis=0)
    # 6. INVERSE-VOL WEIGHTING (use only past info)
    vol = ret_panel.rolling(vol_window, min_periods=vol_window).std() * np.sqrt(TRADING_DAYS)
    vol = vol.shift(1).clip(lower=vol_floor)
    s = s.div(vol)
    # 7. RE-NEUTRALIZE: re-center the inverse-vol weights so each row sums to zero
    if neutralize == "risk":
        s = s.sub(s.mean(axis=1), axis=0)
    # 8a. Per-name cap (relative to gross)
    cap = s.abs().sum(axis=1) * max_weight
    s = s.clip(lower=-cap, upper=cap, axis=0)
    # 8b. Min active names guard
    n_active = (s.abs() > 1e-8).sum(axis=1)
    s = s.where(n_active.ge(min_active_names), 0.0, axis=0)
    # 8c. Gross renormalize
    gross = s.abs().sum(axis=1).replace(0, np.nan)
    s = s.div(gross, axis=0) * gross_target
    return s.fillna(0.0)
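The core of the order of ops on a single date, as a sketch with hypothetical forecasts and vols for four assets. Smoothing, thresholding, the universe mask and the per-name cap are left out to keep it short.

```python
import pandas as pd

# Hypothetical one-date cross-section.
pred = pd.Series({"A": 0.03, "B": 0.01, "C": -0.02, "D": -0.04})   # forecasts
vol = pd.Series({"A": 0.40, "B": 0.20, "C": 0.20, "D": 0.10})      # annualized vols

s = pred.rank(pct=True) * 2 - 1      # step 2: rank-normalize into (-1, 1]
s = s - s.mean()                     # step 5: neutralize in signal space
w = s / vol                          # step 6: inverse-vol weighting
w = w - w.mean()                     # step 7: re-center so weights sum to zero
w = w / w.abs().sum() * 2.0          # step 8c: scale to gross = 2.0

print(w.round(4).to_dict())
print("net:", round(w.sum(), 10), "gross:", round(w.abs().sum(), 10))
```

Note that re-centering after the vol scaling can flip the sign of a small position: C has a below-average forecast but ends up slightly long here. The low-vol name D also takes half of the gross, which is the situation the max_weight cap is meant to limit.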
7.2 Quantile L/S (factor baseline)
def quantile_portfolio(pred_panel, n_buckets=5, long_only=False,
                       equal_weight=True, dollar_neutral_equal_gross=False):
    ranks = pred_panel.rank(axis=1, pct=True)
    long = (ranks > 1 - 1/n_buckets).astype(float)
    # pct ranks start at 1/N, so a strict < can leave the bottom bucket one name short
    short = (ranks <= 1/n_buckets).astype(float)
    if equal_weight:
        long = long.div(long.sum(axis=1).replace(0, np.nan), axis=0)
        short = short.div(short.sum(axis=1).replace(0, np.nan), axis=0)
    if dollar_neutral_equal_gross:
        # +1/N_long for longs, -1/N_short for shorts, gross = 2
        return (long - short).fillna(0.0)
    return (long if long_only else long - short).fillna(0.0)
7.3 Sector neutralization
def sector_neutralize(pred_panel, sector_map):
sectors = pd.Series(sector_map).reindex(pred_panel.columns)
out = pred_panel.copy()
for sec, group in sectors.groupby(sectors):
cols = group.index.tolist()
out[cols] = pred_panel[cols].sub(pred_panel[cols].mean(axis=1), axis=0)
return out
# Default commodity sectors — customize for your universe
COMMODITY_SECTORS = {
"CL":"Energy","HO":"Energy","NG":"Energy","RB":"Energy","BZ":"Energy",
"GC":"Metals","SI":"Metals","HG":"Metals","PL":"Metals","PA":"Metals",
"C":"Ags","W":"Ags","S":"Ags","SM":"Ags","BO":"Ags","KW":"Ags",
"SB":"Softs","KC":"Softs","CC":"Softs","CT":"Softs","OJ":"Softs",
"LE":"Livestock","HE":"Livestock","GF":"Livestock",
}
8. Backtester (Realistic Costs)
def panel_backtest(position, ret_panel,
cost_bps=1.0,
impact_coef=10.0, # bps per unit turnover (sqrt model)
borrow_bps_annual=50.0, # cost on short notional
funding_bps_annual=0.0, # cost on gross beyond 1.0
target_vol=None,
gross_cap=None,
max_scale=3.0,
min_vol_history=63):
"""
Costs: spread (linear) + impact (sqrt) + borrow (on shorts) + funding (on excess gross).
All applied per-asset, then summed.
"""
pos = position.shift(1).fillna(0.0)
# Vol target (uses past PnL only via .shift(1))
if target_vol is not None:
pnl_raw = (pos * ret_panel).sum(axis=1)
rv = pnl_raw.rolling(min_vol_history, min_periods=min_vol_history).std() \
* np.sqrt(TRADING_DAYS)
scale = (target_vol / rv).clip(0, max_scale).shift(1).fillna(1.0)
pos = pos.mul(scale, axis=0)
if gross_cap is not None:
gross = pos.abs().sum(axis=1)
scl = (gross_cap / gross).clip(upper=1.0).fillna(1.0)
pos = pos.mul(scl, axis=0)
# ----- TURNOVER -----
turnover_per_asset = pos.diff().abs().fillna(pos.abs())
# ----- COSTS -----
spread_cost = turnover_per_asset * (cost_bps / 1e4)
impact_cost = (turnover_per_asset ** 1.5) * (impact_coef / 1e4) # sqrt-impact on $-turnover
short_notnl = pos.clip(upper=0).abs()
borrow_cost = short_notnl * (borrow_bps_annual / 1e4 / TRADING_DAYS)
excess_gross = (pos.abs().sum(axis=1) - 1.0).clip(lower=0)
funding_cost = pd.DataFrame(
np.outer(excess_gross.values, np.ones(pos.shape[1])) / pos.shape[1],
index=pos.index, columns=pos.columns
) * (funding_bps_annual / 1e4 / TRADING_DAYS)
costs_per_asset = spread_cost + impact_cost + borrow_cost + funding_cost
gross_pnl_pa = pos * ret_panel
pnl_per_asset = gross_pnl_pa - costs_per_asset
summary = pd.DataFrame({
"pnl": pnl_per_asset.sum(axis=1),
"gross_pnl": gross_pnl_pa.sum(axis=1),
"spread": spread_cost.sum(axis=1),
"impact": impact_cost.sum(axis=1),
"borrow": borrow_cost.sum(axis=1),
"funding": funding_cost.sum(axis=1),
"costs": costs_per_asset.sum(axis=1),
"turnover": turnover_per_asset.sum(axis=1),
"gross_lev": pos.abs().sum(axis=1),
"net_exp": pos.sum(axis=1),
"n_active": (pos.abs() > 1e-8).sum(axis=1),
})
# Compounded equity for proper drawdown/return display
summary["equity_compound"] = (1 + summary["pnl"]).cumprod()
summary["equity_additive"] = summary["pnl"].cumsum()
return summary, pnl_per_asset
def cost_sweep_panel(position, ret_panel, bps_list=(0, 0.5, 1, 2, 5, 10)):
rows = []
for bps in bps_list:
s, _ = panel_backtest(position, ret_panel, cost_bps=bps,
impact_coef=0, borrow_bps_annual=0)
rows.append({"cost_bps": bps, "Sharpe": sharpe(s.pnl),
"AnnRet": s.pnl.mean()*TRADING_DAYS, "Turn": s.turnover.mean()})
return pd.DataFrame(rows)
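Worked arithmetic for the two trading-cost terms, using the default cost_bps=1.0 and impact_coef=10.0 from panel_backtest. `trade_cost` is a hypothetical helper for illustration, and the turnover figures are made up.

```python
def trade_cost(turnover, cost_bps=1.0, impact_coef=10.0):
    """One-day, one-asset cost as a fraction of NAV: linear spread + turnover**1.5 impact."""
    spread = turnover * cost_bps / 1e4
    impact = turnover ** 1.5 * impact_coef / 1e4
    return spread, impact

small = trade_cost(0.10)   # trade 10% of NAV
large = trade_cost(0.20)   # trade 20% of NAV

print(small)
print(large[0] / small[0], large[1] / small[1])   # spread doubles, impact grows by 2**1.5
```

The 1.5 exponent is the total-cost form of a square-root impact model: the per-unit cost scales with sqrt(turnover), and multiplying by turnover gives turnover**1.5.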
9. Performance Analytics
def sharpe(pnl, f=TRADING_DAYS):
pnl = pnl.dropna()
return pnl.mean()/pnl.std()*np.sqrt(f) if pnl.std() > 0 else np.nan
def max_drawdown_compound(pnl):
"""Compounded drawdown in %, correct for reporting."""
eq = (1 + pnl.dropna()).cumprod()
return (eq/eq.cummax() - 1).min()
def max_drawdown_additive(pnl):
eq = pnl.dropna().cumsum()
return (eq - eq.cummax()).min()
def perf_stats(pnl, f=TRADING_DAYS):
    pnl = pnl.dropna()
    if len(pnl) < 2:
        return pd.Series(dtype=float)
    ann_ret = pnl.mean() * f
    ann_vol = pnl.std() * np.sqrt(f)
    sr = ann_ret / ann_vol if ann_vol > 0 else np.nan
    mdd_c = max_drawdown_compound(pnl)
    mdd_a = max_drawdown_additive(pnl)
    calmar = ann_ret / abs(mdd_c) if mdd_c < 0 else np.nan
    # The IID standard error applies to the per-period Sharpe: compute it there, then annualize
    sr_pp = sr / np.sqrt(f)
    sr_se = np.sqrt((1 + 0.5*sr_pp**2) / len(pnl)) * np.sqrt(f)
    return pd.Series({
        "AnnReturn": ann_ret,
        "AnnVol": ann_vol,
        "Sharpe": sr,
        "MaxDD_cmp": mdd_c,
        "MaxDD_add": mdd_a,
        "Calmar": calmar,
        "Hit%": (pnl > 0).mean(),
        "Sharpe_SE": sr_se,
        "t_HAC": hac_tstat(pnl, lags=10),
        "Skew": pnl.skew(),
        "ExcessKurt": pnl.kurt(),
        "N": len(pnl),
    })
def per_asset_perf(pnl_per_asset):
return pd.DataFrame({c: perf_stats(pnl_per_asset[c]) for c in pnl_per_asset.columns}).T
# ----- Block bootstrap for autocorrelated returns -----
def stationary_bootstrap_sharpe(pnl, n=2000, mean_block=None, ci=0.95):
"""Politis-Romano stationary bootstrap. Block size auto = sqrt(T)."""
arr = pnl.dropna().values
T = len(arr)
if mean_block is None:
mean_block = max(2, int(np.sqrt(T)))
p = 1.0 / mean_block
sims = np.empty(n)
for i in range(n):
idx = np.empty(T, dtype=int)
idx[0] = RNG.integers(0, T)
for j in range(1, T):
if RNG.random() < p:
idx[j] = RNG.integers(0, T)
else:
idx[j] = (idx[j-1] + 1) % T
sims[i] = sharpe(pd.Series(arr[idx]))
lo, hi = np.quantile(sims, [(1-ci)/2, 1-(1-ci)/2])
return sharpe(pnl), lo, hi
# ----- Deflated Sharpe (correct formula, honest trial count) -----
def deflated_sharpe(sr, T, n_trials, skew=0.0, excess_kurt=0.0):
    """
    Bailey & Lopez de Prado (2014).
    sr: observed per-period (DAILY) Sharpe; do not pass the annualized value.
    T: number of OBSERVATIONS (days), not years.
    excess_kurt: excess kurtosis (i.e., kurt - 3). pd.Series.kurt() returns excess already.
    """
    if n_trials < 1 or T < 2:
        return np.nan
    emc = 0.5772156649
    # Expected max Sharpe under null over n_trials independent strategies.
    # A single trial has no selection bias, so the benchmark is 0
    # (norm.ppf(0) would otherwise give -inf and a probability of 1).
    if n_trials == 1:
        sr_star = 0.0
    else:
        sr_star = ((1 - emc) * norm.ppf(1 - 1.0/n_trials) +
                   emc * norm.ppf(1 - 1.0/(n_trials * np.e))) / np.sqrt(T)
    num = (sr - sr_star) * np.sqrt(T - 1)
    # (kurt - 1)/4 with raw kurtosis = (excess_kurt + 2)/4; equals 1/2 for normal returns
    den = np.sqrt(1 - skew*sr + ((excess_kurt + 2.0)/4.0) * sr**2)
    if den <= 0 or not np.isfinite(den):
        return np.nan
    return float(norm.cdf(num / den))
def report_deflated_sharpe(pnl, n_trials):
pnl = pnl.dropna()
daily_sr = pnl.mean()/pnl.std() if pnl.std()>0 else np.nan
return {
"daily_sharpe": daily_sr,
"ann_sharpe": daily_sr * np.sqrt(TRADING_DAYS),
"n_trials": n_trials,
"T": len(pnl),
"deflated_prob": deflated_sharpe(daily_sr, len(pnl), n_trials,
skew=pnl.skew(),
excess_kurt=pnl.kurt()),
}
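Why the trial count matters: a minimal sketch, assuming independent trials and a null Sharpe standard deviation of 1/sqrt(n_obs), which is the same simplification sr_star makes. `expected_max_sharpe` is a hypothetical helper, not part of the pipeline; it uses the stdlib `statistics.NormalDist` in place of scipy, and returning 0 for a single trial is this sketch's own choice.

```python
import math
from statistics import NormalDist

EULER_GAMMA = 0.5772156649

def expected_max_sharpe(n_trials, n_obs):
    """Approximate expected maximum per-period Sharpe across n_trials skill-less strategies."""
    if n_trials < 2:
        return 0.0   # assumption: one trial means no selection bias
    z = NormalDist()
    a = z.inv_cdf(1 - 1 / n_trials)
    b = z.inv_cdf(1 - 1 / (n_trials * math.e))
    return ((1 - EULER_GAMMA) * a + EULER_GAMMA * b) / math.sqrt(n_obs)

# Ten years of daily data; annualize for readability.
for n in (1, 10, 100, 1000):
    daily = expected_max_sharpe(n, n_obs=2520)
    print(n, round(daily * math.sqrt(252), 3))
```

The hurdle rises with every extra configuration tried, which is why TRIAL_COUNTER is bumped inside the tuning routine rather than set by hand afterwards.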
10. PnL Attribution & Factor Regression
def pnl_attribution_by_signal(feats_cs, ret_panel, top_signals=None,
gross_target=2.0, cost_bps=1.0):
"""
For each feature, build a single-signal portfolio and report perf.
Reveals which raw signals carry the alpha.
"""
rows = []
names = list(feats_cs.keys()) if top_signals is None else top_signals
for name in names:
pred = feats_cs[name]
pos = panel_pred_to_position(pred, ret_panel, gross_target=gross_target,
sizing="zscore", ewm_half_life=3)
bt, _ = panel_backtest(pos, ret_panel, cost_bps=cost_bps,
impact_coef=0, borrow_bps_annual=0)
st = perf_stats(bt.pnl)
rows.append({"signal": name, **st.to_dict()})
return pd.DataFrame(rows).sort_values("Sharpe", ascending=False)
def pnl_attribution_by_sector(pnl_per_asset, sector_map):
sectors = pd.Series(sector_map).reindex(pnl_per_asset.columns).fillna("Other")
out = {}
for sec in sectors.unique():
cols = sectors[sectors == sec].index
out[sec] = pnl_per_asset[cols].sum(axis=1)
return pd.DataFrame(out)
def pnl_attribution_long_short(pos, ret_panel, cost_bps=1.0):
pos_lag = pos.shift(1).fillna(0)
long_pos = pos_lag.clip(lower=0)
short_pos = pos_lag.clip(upper=0)
long_pnl = (long_pos * ret_panel).sum(axis=1)
short_pnl = (short_pos * ret_panel).sum(axis=1)
return pd.DataFrame({"long_side": long_pnl, "short_side": short_pnl,
"total": long_pnl + short_pnl})
def factor_regression(pnl, factor_returns):
"""
OLS: pnl_t = alpha + sum_k beta_k * factor_k_t
Returns alpha (annualized), t-stats, betas.
"""
df = pd.concat([pnl.rename("pnl"), factor_returns], axis=1).dropna()
if len(df) < 30: return None, None  # keep the (table, result) return shape for callers that unpack
X = add_constant(df.drop(columns="pnl"))
res = OLS(df["pnl"], X).fit(cov_type="HAC", cov_kwds={"maxlags": 10})
out = pd.DataFrame({
"coef": res.params,
"t": res.tvalues,
"p": res.pvalues,
})
out.loc["const", "ann_alpha"] = res.params["const"] * TRADING_DAYS
return out, res
11. Exposure & Stability Analytics
def exposure_diagnostics(position):
pos = position.shift(1).fillna(0)
gross = pos.abs().sum(axis=1)
return pd.Series({
"avg_gross": gross.mean(),
"avg_net": pos.sum(axis=1).mean(),
"avg_n_active": (pos.abs() > 1e-8).sum(axis=1).mean(),
"avg_top_name_pct": (pos.abs().max(axis=1) / gross.replace(0, np.nan)).mean(),
"days_concentrated": ((pos.abs().max(axis=1) / gross.replace(0,np.nan)) > 0.30).sum(),
"avg_holding_days": (1.0 / (pos.diff().abs().sum(axis=1) /
gross.replace(0,np.nan)).mean()),
})
def signal_stability(pred_panel):
"""Day-over-day signal autocorrelation, per asset, averaged."""
autocorrs = {}
for c in pred_panel.columns:
s = pred_panel[c].dropna()
if len(s) > 10:
autocorrs[c] = s.autocorr(lag=1)
return pd.Series(autocorrs)
12. Plots (with HTML export)
# ============================================================
# §12 — Performance Visualization Suite
import os
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
from scipy.stats import norm
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant
from datetime import datetime
TRADING_DAYS = 252
# ------------------------------------------------------------
# Utilities
# ------------------------------------------------------------
def save_or_show(fig, path=None):
if path:
fig.write_html(path)
else:
try:
fig.show()
except Exception:
pass
return fig
def _ann_sharpe(s, f=TRADING_DAYS):
s = s.dropna()
return s.mean() / s.std() * np.sqrt(f) if s.std() > 0 else np.nan
def _to_pydt(x):
"""Convert pd.Timestamp to python datetime for plotly shape compatibility."""
if isinstance(x, pd.Timestamp):
return x.to_pydatetime()
return x
def _add_vline_shape(fig, x, row=None, col=None,
line_dash="dash", line_color="red",
annotation_text=None, line_width=1.5):
"""
Plotly's add_vline does internal arithmetic that breaks with pd.Timestamp.
Use add_shape with paper-referenced y instead. Works in any subplot.
"""
x = _to_pydt(x)
shape_kwargs = dict(
type="line", x0=x, x1=x, y0=0, y1=1,
yref="y domain" if row else "paper",
line=dict(dash=line_dash, color=line_color, width=line_width),
)
if row is not None and col is not None:
fig.add_shape(**shape_kwargs, row=row, col=col)
if annotation_text:
fig.add_annotation(
x=x, y=1, yref="y domain",
text=annotation_text, showarrow=False,
xanchor="left", yanchor="bottom",
font=dict(color=line_color, size=11),
row=row, col=col,
)
else:
fig.add_shape(**shape_kwargs)
if annotation_text:
fig.add_annotation(
x=x, y=1, yref="paper",
text=annotation_text, showarrow=False,
xanchor="left", yanchor="bottom",
font=dict(color=line_color, size=11),
)
return fig
def _add_vrect_shape(fig, x0, x1, row=None, col=None,
fillcolor="steelblue", opacity=0.08):
"""Shape-based vrect for Timestamp x-values."""
x0, x1 = _to_pydt(x0), _to_pydt(x1)
shape_kwargs = dict(
type="rect", x0=x0, x1=x1, y0=0, y1=1,
yref="y domain" if row else "paper",
fillcolor=fillcolor, opacity=opacity, line_width=0,
)
if row is not None and col is not None:
fig.add_shape(**shape_kwargs, row=row, col=col)
else:
fig.add_shape(**shape_kwargs)
return fig
# ------------------------------------------------------------
# 12.1 Equity Curve — Before vs After Costs
# ------------------------------------------------------------
def equity_before_after_costs(bt_summary, title="Equity: Gross vs Net of Costs"):
gross_eq = (1 + bt_summary["gross_pnl"]).cumprod()
net_eq = (1 + bt_summary["pnl"]).cumprod()
cost_cum = bt_summary["costs"].cumsum()
fig = make_subplots(
rows=2, cols=1, shared_xaxes=True,
row_heights=[0.7, 0.3], vertical_spacing=0.05,
subplot_titles=["Compounded Equity (Gross vs Net)", "Cumulative Costs"],
)
fig.add_trace(go.Scatter(x=gross_eq.index, y=gross_eq,
name="Gross (pre-cost)",
line=dict(color="steelblue", width=2)), 1, 1)
fig.add_trace(go.Scatter(x=net_eq.index, y=net_eq,
name="Net (post-cost)",
line=dict(color="darkorange", width=2)), 1, 1)
fig.add_trace(go.Scatter(x=cost_cum.index, y=cost_cum,
name="Cumulative cost",
fill="tozeroy",
line=dict(color="crimson", width=1)), 2, 1)
total_drag = (gross_eq.iloc[-1] - net_eq.iloc[-1]) / gross_eq.iloc[-1] * 100
fig.add_annotation(
text=f"Total cost drag: {total_drag:.1f}% of gross final equity",
xref="paper", yref="paper", x=0.02, y=0.98,
showarrow=False, bgcolor="rgba(255,255,200,0.6)",
)
fig.update_layout(template="plotly_white", height=600, title=title,
hovermode="x unified", legend=dict(orientation="h", y=1.05))
return fig
# ------------------------------------------------------------
# 12.2 Equity + Drawdown + Underwater Duration (FIXED)
# ------------------------------------------------------------
def equity_drawdown_underwater(pnl, title="Equity, Drawdown & Underwater Duration"):
pnl = pnl.dropna()
eq = (1 + pnl).cumprod()
dd = eq / eq.cummax() - 1
under = (eq < eq.cummax()).astype(int)
underwater_days = under.groupby((under == 0).cumsum()).cumsum()
fig = make_subplots(
rows=3, cols=1, shared_xaxes=True,
row_heights=[0.5, 0.25, 0.25], vertical_spacing=0.04,
subplot_titles=["Compounded equity",
"Drawdown (%)",
"Underwater duration (days since prior high)"],
)
fig.add_trace(go.Scatter(x=eq.index, y=eq,
line=dict(color="navy", width=2),
name="Equity"), 1, 1)
fig.add_trace(go.Scatter(x=dd.index, y=dd * 100, fill="tozeroy",
line=dict(color="crimson"),
name="Drawdown"), 2, 1)
fig.add_trace(go.Scatter(x=underwater_days.index, y=underwater_days,
fill="tozeroy",
line=dict(color="gray"),
name="Underwater days"), 3, 1)
# Mark worst drawdown using add_shape (Timestamp-safe)
worst_dd_idx = dd.idxmin()
_add_vline_shape(
fig, worst_dd_idx, row=2, col=1,
line_dash="dash", line_color="red",
annotation_text=f"Worst DD: {dd.min() * 100:.1f}%",
)
fig.update_layout(template="plotly_white", height=750, title=title,
showlegend=False, hovermode="x unified")
return fig
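The underwater counter above relies on a groupby/cumsum idiom that is easy to misread. A minimal sketch on a toy series follows; `underwater_duration` and `toy_pnl` are illustrative names, not part of the pipeline, and the counter counts rows (trading days), not calendar days.

```python
import pandas as pd

def underwater_duration(pnl: pd.Series) -> pd.Series:
    """Count consecutive rows spent below the running equity high."""
    eq = (1 + pnl).cumprod()
    under = (eq < eq.cummax()).astype(int)
    # A new group starts at every row that sits at a high (under == 0),
    # so the within-group cumulative sum restarts the counter there.
    return under.groupby((under == 0).cumsum()).cumsum()

# Hypothetical daily returns: two drawdowns of two rows each.
toy_pnl = pd.Series([0.01, -0.02, 0.005, 0.03, -0.01, -0.01, 0.05])
toy_underwater = underwater_duration(toy_pnl)
print(toy_underwater.tolist())  # [0, 1, 2, 0, 1, 2, 0]
```

The counter returns to zero on the row where equity makes a new high, so the longest run in the series is the longest time to recovery.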
# ------------------------------------------------------------
# 12.3 Long vs Short Decomposition
# ------------------------------------------------------------
def long_short_decomposition(position, ret_panel, title="Long-Side vs Short-Side PnL"):
pos = position.shift(1).fillna(0)
long_pos = pos.clip(lower=0)
short_pos = pos.clip(upper=0)
long_pnl = (long_pos * ret_panel).sum(axis=1)
short_pnl = (short_pos * ret_panel).sum(axis=1)
total_pnl = long_pnl + short_pnl
long_eq = (1 + long_pnl).cumprod()
short_eq = (1 + short_pnl).cumprod()
tot_eq = (1 + total_pnl).cumprod()
yearly = pd.DataFrame({
"Long": long_pnl.resample("YE").sum(),
"Short": short_pnl.resample("YE").sum(),
"Total": total_pnl.resample("YE").sum(),
})
yearly.index = yearly.index.year
fig = make_subplots(
rows=2, cols=2,
row_heights=[0.6, 0.4],
specs=[[{"colspan": 2}, None], [{}, {}]],
subplot_titles=["Compounded equity by side",
"Annual PnL contribution",
"Sharpe by side"],
)
fig.add_trace(go.Scatter(x=tot_eq.index, y=tot_eq,
name="Total", line=dict(color="black", width=2.5)), 1, 1)
fig.add_trace(go.Scatter(x=long_eq.index, y=long_eq,
name="Long side", line=dict(color="green", width=1.8)), 1, 1)
fig.add_trace(go.Scatter(x=short_eq.index, y=short_eq,
name="Short side", line=dict(color="firebrick", width=1.8)), 1, 1)
fig.add_trace(go.Bar(x=yearly.index, y=yearly["Long"],
name="Long", marker_color="green", showlegend=False), 2, 1)
fig.add_trace(go.Bar(x=yearly.index, y=yearly["Short"],
name="Short", marker_color="firebrick", showlegend=False), 2, 1)
sr_data = pd.Series({
"Long": _ann_sharpe(long_pnl),
"Short": _ann_sharpe(short_pnl),
"Total": _ann_sharpe(total_pnl),
})
fig.add_trace(go.Bar(x=sr_data.index, y=sr_data.values,
marker_color=["green", "firebrick", "navy"],
text=[f"{v:.2f}" for v in sr_data.values],
textposition="outside",
showlegend=False), 2, 2)
fig.update_layout(barmode="relative", template="plotly_white",
height=750, title=title, hovermode="x unified")
fig.update_yaxes(title_text="Sharpe", row=2, col=2)
return fig
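A small check that the clip-based split is additive: long-side and short-side PnL should sum to total PnL row by row. The positions and returns below are made-up numbers for illustration only.

```python
import numpy as np
import pandas as pd

dates = pd.date_range("2024-01-01", periods=4, freq="D")
toy_pos = pd.DataFrame({"A": [0.5, 0.5, -0.5, -0.5],
                        "B": [-0.5, -0.5, 0.5, 0.5]}, index=dates)
toy_ret = pd.DataFrame({"A": [0.01, 0.02, -0.01, 0.03],
                        "B": [0.00, -0.01, 0.02, 0.01]}, index=dates)

# The position decided on row t is applied to the return on row t+1.
held = toy_pos.shift(1).fillna(0)
long_pnl = (held.clip(lower=0) * toy_ret).sum(axis=1)
short_pnl = (held.clip(upper=0) * toy_ret).sum(axis=1)
total_pnl = (held * toy_ret).sum(axis=1)

print(pd.DataFrame({"long": long_pnl, "short": short_pnl, "total": total_pnl}))
```

Note that the per-side equity curves compound separately, so they do not multiply back to the total equity curve; only the daily PnL series are additive.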
# ------------------------------------------------------------
# 12.4 Hit Rate & IC by Year
# ------------------------------------------------------------
def hit_rate_by_year(pnl, ic_series=None, title="Hit Rate & IC by Year"):
pnl = pnl.dropna()
annual_hit = pnl.groupby(pnl.index.year).apply(lambda s: (s > 0).mean())
annual_sr = pnl.groupby(pnl.index.year).apply(_ann_sharpe)
annual_ret = pnl.groupby(pnl.index.year).sum()
rows = 2 if ic_series is not None else 1
subplot_titles = ["Daily hit rate (%) by year", "Annual return & Sharpe"]
if ic_series is not None:
subplot_titles += ["IC mean by year", "IC hit rate by year"]
fig = make_subplots(rows=rows, cols=2, subplot_titles=subplot_titles)
colors = ["green" if h > 0.5 else "firebrick" for h in annual_hit]
fig.add_trace(go.Bar(x=annual_hit.index, y=annual_hit * 100,
marker_color=colors,
text=[f"{v * 100:.1f}%" for v in annual_hit],
textposition="outside",
name="Hit rate"), 1, 1)
fig.add_hline(y=50, line_dash="dash", line_color="gray", row=1, col=1)
fig.add_trace(go.Bar(x=annual_ret.index, y=annual_ret * 100,
marker_color=["green" if r > 0 else "firebrick" for r in annual_ret],
name="Ann Return %"), 1, 2)
fig.add_trace(go.Scatter(x=annual_sr.index, y=annual_sr,
mode="lines+markers",
line=dict(color="black", width=2),
name="Sharpe"), 1, 2)
if ic_series is not None:
ic = ic_series.dropna()
annual_ic = ic.groupby(ic.index.year).mean()
annual_ic_hit = ic.groupby(ic.index.year).apply(lambda s: (s > 0).mean())
fig.add_trace(go.Bar(x=annual_ic.index, y=annual_ic,
marker_color=["green" if v > 0 else "firebrick" for v in annual_ic],
text=[f"{v:.3f}" for v in annual_ic],
textposition="outside",
name="Mean IC"), 2, 1)
fig.add_hline(y=0, line_dash="dash", line_color="gray", row=2, col=1)
fig.add_trace(go.Bar(x=annual_ic_hit.index, y=annual_ic_hit * 100,
marker_color=["green" if v > 0.5 else "firebrick" for v in annual_ic_hit],
text=[f"{v * 100:.1f}%" for v in annual_ic_hit],
textposition="outside",
name="IC hit%"), 2, 2)
fig.add_hline(y=50, line_dash="dash", line_color="gray", row=2, col=2)
fig.update_layout(template="plotly_white", height=350 * rows, title=title,
showlegend=False)
return fig
# ------------------------------------------------------------
# 12.5 Monthly PnL Heatmap (+ YTD)
# ------------------------------------------------------------
def monthly_pnl_heatmap(pnl, title="Monthly PnL Heatmap (%)"):
pnl = pnl.dropna()
monthly = pnl.resample("ME").sum() * 100
table = pd.DataFrame({
"year": monthly.index.year,
"month": monthly.index.month,
"ret": monthly.values,
}).pivot(index="year", columns="month", values="ret")
month_labels = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
table.columns = [month_labels[m - 1] for m in table.columns]
table["YTD"] = table.sum(axis=1)
fig = px.imshow(
table.values,
x=table.columns, y=table.index,
text_auto=".1f",
color_continuous_scale="RdYlGn",
color_continuous_midpoint=0,
aspect="auto",
title=title,
)
fig.update_layout(template="plotly_white", height=450,
xaxis_title="Month", yaxis_title="Year")
return fig
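The year-by-month table can also be built with a plain groupby, which avoids the `"ME"` resample alias (older pandas releases only accept `"M"`). This is a sketch of an equivalent construction on synthetic data, not the function used above; the constant 0.1% daily return is an assumption chosen so the cells are easy to verify by counting business days.

```python
import pandas as pd

# Hypothetical PnL: 0.1% on every business day from Nov 2023 to Feb 2024.
idx = pd.date_range("2023-11-01", "2024-02-29", freq="B")
toy_pnl = pd.Series(0.001, index=idx)

# Sum log returns per (year, month), then spread months across columns.
table = (toy_pnl.groupby([toy_pnl.index.year, toy_pnl.index.month]).sum()
         .unstack() * 100)
year_total = table.sum(axis=1)  # NaN months are skipped by default

print(table.round(2))
print(year_total.round(2))
```

Months with no data stay NaN in the table, so a partial first or last year shows up as gaps and is not filled with zeros.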
# ------------------------------------------------------------
# 12.6 Rolling 1-Year Metrics
# ------------------------------------------------------------
def rolling_metrics(pnl, window=252, title="Rolling 1Y Performance Metrics"):
pnl = pnl.dropna()
roll_ret = pnl.rolling(window).mean() * TRADING_DAYS
roll_vol = pnl.rolling(window).std() * np.sqrt(TRADING_DAYS)
roll_sr = roll_ret / roll_vol
roll_hit = (pnl > 0).rolling(window).mean()
roll_skew = pnl.rolling(window).skew()
fig = make_subplots(
rows=2, cols=2,
subplot_titles=["Rolling Sharpe (1Y)",
"Rolling Ann Return & Vol",
"Rolling Hit Rate",
"Rolling Skew"],
)
fig.add_trace(go.Scatter(x=roll_sr.index, y=roll_sr,
line=dict(color="navy", width=2)), 1, 1)
fig.add_hline(y=0, line_dash="dash", line_color="gray", row=1, col=1)
fig.add_hline(y=roll_sr.mean(), line_dash="dot", line_color="red",
annotation_text=f"avg={roll_sr.mean():.2f}", row=1, col=1)
fig.add_trace(go.Scatter(x=roll_ret.index, y=roll_ret * 100,
name="Ann Ret (%)", line=dict(color="green")), 1, 2)
fig.add_trace(go.Scatter(x=roll_vol.index, y=roll_vol * 100,
name="Ann Vol (%)", line=dict(color="firebrick")), 1, 2)
fig.add_trace(go.Scatter(x=roll_hit.index, y=roll_hit * 100,
line=dict(color="darkorange", width=2)), 2, 1)
fig.add_hline(y=50, line_dash="dash", line_color="gray", row=2, col=1)
fig.add_trace(go.Scatter(x=roll_skew.index, y=roll_skew,
line=dict(color="purple", width=2)), 2, 2)
fig.add_hline(y=0, line_dash="dash", line_color="gray", row=2, col=2)
fig.update_layout(template="plotly_white", height=700, title=title,
showlegend=True, hovermode="x unified")
return fig
# ------------------------------------------------------------
# 12.7 Cost Breakdown Stacked Area
# ------------------------------------------------------------
def cost_breakdown_plot(bt_summary, title="Daily Cost Breakdown"):
cost_cols = [c for c in ["spread", "impact", "borrow", "funding"]
if c in bt_summary.columns]
if not cost_cols:
fig = go.Figure(go.Scatter(x=bt_summary.index,
y=bt_summary["costs"].cumsum(),
fill="tozeroy", line=dict(color="crimson")))
fig.update_layout(template="plotly_white", height=400,
title="Cumulative costs")
return fig
cost_df = bt_summary[cost_cols].copy()
cost_smooth = cost_df.rolling(21).mean() * 10_000 # bps
fig = make_subplots(rows=2, cols=1, shared_xaxes=True,
row_heights=[0.6, 0.4], vertical_spacing=0.05,
subplot_titles=["Daily costs (bps, 21d MA)",
"Cumulative cost contribution"])
colors = {"spread": "steelblue", "impact": "darkorange",
"borrow": "firebrick", "funding": "purple"}
for c in cost_cols:
fig.add_trace(go.Scatter(x=cost_smooth.index, y=cost_smooth[c],
stackgroup="one", name=c,
line=dict(color=colors[c], width=0.5)), 1, 1)
cum = cost_df.cumsum()
for c in cost_cols:
fig.add_trace(go.Scatter(x=cum.index, y=cum[c],
name=f"cum {c}",
line=dict(color=colors[c], width=2),
showlegend=False), 2, 1)
fig.update_layout(template="plotly_white", height=600, title=title,
hovermode="x unified", legend=dict(orientation="h"))
return fig
# ------------------------------------------------------------
# 12.8 IS vs OOS Comparison
# ------------------------------------------------------------
def is_oos_comparison(pnl, split_frac=0.7, title="In-Sample vs Out-of-Sample"):
pnl = pnl.dropna()
split = min(int(len(pnl) * split_frac), len(pnl) - 1)  # keeps pnl.index[split] in range if split_frac is 1.0
split_date = pnl.index[split]
is_pnl, oos_pnl = pnl.iloc[:split], pnl.iloc[split:]
eq = (1 + pnl).cumprod()
fig = make_subplots(
rows=2, cols=2,
row_heights=[0.55, 0.45],
specs=[[{"colspan": 2}, None], [{}, {}]],
subplot_titles=["Equity curve (IS shaded blue, OOS shaded orange)",
"IS vs OOS — key metrics",
"IS vs OOS — return distributions"],
)
fig.add_trace(go.Scatter(x=eq.index, y=eq,
line=dict(color="black", width=2)), 1, 1)
# Timestamp-safe shading + split line
_add_vrect_shape(fig, eq.index[0], split_date,
row=1, col=1, fillcolor="steelblue", opacity=0.08)
_add_vrect_shape(fig, split_date, eq.index[-1],
row=1, col=1, fillcolor="darkorange", opacity=0.10)
_add_vline_shape(fig, split_date, row=1, col=1,
line_dash="dash", line_color="red",
annotation_text="IS / OOS")
metrics = pd.DataFrame({
"IS": [is_pnl.mean() * TRADING_DAYS * 100,
is_pnl.std() * np.sqrt(TRADING_DAYS) * 100,
_ann_sharpe(is_pnl),
(is_pnl > 0).mean() * 100],
"OOS": [oos_pnl.mean() * TRADING_DAYS * 100,
oos_pnl.std() * np.sqrt(TRADING_DAYS) * 100,
_ann_sharpe(oos_pnl),
(oos_pnl > 0).mean() * 100],
}, index=["AnnRet %", "AnnVol %", "Sharpe", "Hit %"])
for metric in metrics.index:
fig.add_trace(go.Bar(x=["IS", "OOS"], y=metrics.loc[metric].values,
name=metric,
text=[f"{v:.2f}" for v in metrics.loc[metric]],
textposition="outside"), 2, 1)
fig.add_trace(go.Histogram(x=is_pnl * 100, name="IS", opacity=0.6,
nbinsx=60, marker_color="steelblue"), 2, 2)
fig.add_trace(go.Histogram(x=oos_pnl * 100, name="OOS", opacity=0.6,
nbinsx=60, marker_color="darkorange"), 2, 2)
fig.update_layout(template="plotly_white", height=750, title=title,
barmode="group", hovermode="x unified")
return fig
# ------------------------------------------------------------
# 12.9 Per-Asset Risk/Return Bubble
# ------------------------------------------------------------
def per_asset_bubble(pnl_per_asset, title="Per-Asset Risk/Return"):
rows = []
for c in pnl_per_asset.columns:
s = pnl_per_asset[c].dropna()
if len(s) < 30:
continue
rows.append({
"asset": c,
"AnnRet": s.mean() * TRADING_DAYS * 100,
"AnnVol": s.std() * np.sqrt(TRADING_DAYS) * 100,
"Sharpe": _ann_sharpe(s),
"Total": s.sum() * 100,
})
df = pd.DataFrame(rows)
df["AbsTotal"] = df["Total"].abs()
fig = px.scatter(
df, x="AnnVol", y="AnnRet", size="AbsTotal", color="Sharpe",
hover_name="asset", text="asset",
color_continuous_scale="RdYlGn", color_continuous_midpoint=0,
size_max=40,
title=title,
)
fig.update_traces(textposition="top center")
fig.add_hline(y=0, line_dash="dash", line_color="gray")
fig.add_vline(x=0, line_dash="dash", line_color="gray")
fig.update_layout(template="plotly_white", height=600,
xaxis_title="Annualized Vol (%)",
yaxis_title="Annualized Return (%)")
return fig
# ------------------------------------------------------------
# 12.10 Per-Asset PnL Contribution
# ------------------------------------------------------------
def per_asset_contribution(pnl_per_asset, title="Per-Asset PnL Contribution"):
totals = pnl_per_asset.sum().sort_values()
sharpes = pnl_per_asset.apply(_ann_sharpe)
colors = ["firebrick" if v < 0 else "green" for v in totals.values]
fig = make_subplots(rows=1, cols=2,
subplot_titles=["Total PnL (%)", "Sharpe by asset"],
column_widths=[0.5, 0.5])
fig.add_trace(go.Bar(x=totals.values * 100, y=totals.index,
orientation="h", marker_color=colors,
text=[f"{v * 100:.1f}%" for v in totals.values],
textposition="outside"), 1, 1)
sharpes_sorted = sharpes.loc[totals.index]
sr_colors = ["firebrick" if v < 0 else "green" for v in sharpes_sorted.values]
fig.add_trace(go.Bar(x=sharpes_sorted.values, y=sharpes_sorted.index,
orientation="h", marker_color=sr_colors,
text=[f"{v:.2f}" for v in sharpes_sorted.values],
textposition="outside"), 1, 2)
fig.update_layout(template="plotly_white", height=700,
title=title, showlegend=False)
return fig
# ------------------------------------------------------------
# 12.11 Sector Attribution
# ------------------------------------------------------------
def sector_attribution_plot(pnl_per_asset, sector_map, title="Sector Attribution"):
sectors = pd.Series(sector_map).reindex(pnl_per_asset.columns).fillna("Other")
sector_pnl = {}
for sec in sectors.unique():
cols = sectors[sectors == sec].index
sector_pnl[sec] = pnl_per_asset[cols].sum(axis=1)
sec_df = pd.DataFrame(sector_pnl)
sec_eq = (1 + sec_df).cumprod()
sec_totals = sec_df.sum().sort_values()
sec_sharpes = sec_df.apply(_ann_sharpe)
fig = make_subplots(
rows=2, cols=2,
row_heights=[0.55, 0.45],
specs=[[{"colspan": 2}, None], [{}, {}]],
subplot_titles=["Compounded equity by sector",
"Total PnL by sector (%)",
"Sharpe by sector"],
)
palette = px.colors.qualitative.Set2
for i, c in enumerate(sec_eq.columns):
fig.add_trace(go.Scatter(x=sec_eq.index, y=sec_eq[c],
name=c, line=dict(color=palette[i % len(palette)],
width=2)), 1, 1)
fig.add_trace(go.Bar(x=sec_totals.index, y=sec_totals.values * 100,
marker_color=["firebrick" if v < 0 else "green"
for v in sec_totals.values],
text=[f"{v * 100:.1f}%" for v in sec_totals.values],
textposition="outside", showlegend=False), 2, 1)
fig.add_trace(go.Bar(x=sec_sharpes.index, y=sec_sharpes.values,
marker_color=["firebrick" if v < 0 else "green"
for v in sec_sharpes.values],
text=[f"{v:.2f}" for v in sec_sharpes.values],
textposition="outside", showlegend=False), 2, 2)
fig.update_layout(template="plotly_white", height=800, title=title,
hovermode="x unified", legend=dict(orientation="h", y=1.05))
return fig
# ------------------------------------------------------------
# 12.12 IC Analysis + Decay
# ------------------------------------------------------------
def ic_analysis_plot(ic_series, ic_decay_df=None, title="IC Analysis"):
fig = make_subplots(
rows=2, cols=2,
row_heights=[0.55, 0.45],
specs=[[{"colspan": 2}, None], [{}, {}]],
subplot_titles=["Cross-sectional IC over time",
"IC distribution",
"IC decay across horizons"],
)
fig.add_trace(go.Bar(x=ic_series.index, y=ic_series,
marker_color=["green" if v > 0 else "firebrick"
for v in ic_series],
opacity=0.35, name="daily IC"), 1, 1)
fig.add_trace(go.Scatter(x=ic_series.index,
y=ic_series.rolling(63).mean(),
name="63d MA",
line=dict(color="black", width=2)), 1, 1)
fig.add_hline(y=0, line_dash="dash", row=1, col=1)
fig.add_hline(y=ic_series.mean(), line_dash="dot", line_color="red",
annotation_text=f"mean={ic_series.mean():.3f}", row=1, col=1)
fig.add_trace(go.Histogram(x=ic_series, nbinsx=50,
marker_color="steelblue", opacity=0.7,
showlegend=False), 2, 1)
fig.add_vline(x=0, line_dash="dash", line_color="gray", row=2, col=1)
fig.add_vline(x=ic_series.mean(), line_dash="dot", line_color="red", row=2, col=1)
if ic_decay_df is not None:
fig.add_trace(go.Scatter(x=ic_decay_df["horizon"],
y=ic_decay_df["mean_IC"],
mode="lines+markers",
line=dict(color="navy", width=2),
marker=dict(size=10),
text=[f"t={v:.1f}" for v in ic_decay_df["t_HAC"]],
hovertemplate="h=%{x}<br>IC=%{y:.4f}<br>%{text}",
showlegend=False), 2, 2)
fig.add_hline(y=0, line_dash="dash", row=2, col=2)
fig.update_xaxes(title_text="Horizon (days)", row=2, col=2, type="log")
fig.update_yaxes(title_text="Mean IC", row=2, col=2)
fig.update_layout(template="plotly_white", height=750, title=title,
hovermode="x unified")
return fig
# ------------------------------------------------------------
# 12.13 Quantile Bucket Monotonicity
# ------------------------------------------------------------
def quantile_buckets_plot(pred_panel, ret_panel, n_buckets=5,
title="Forward Return by Predicted-Rank Bucket"):
fwd = ret_panel.shift(-1)
rows = []
for dt in pred_panel.index:
p, r = pred_panel.loc[dt], fwd.loc[dt]
ok = p.notna() & r.notna()
if ok.sum() < n_buckets:
continue
try:
q = pd.qcut(p[ok].rank(method="first"), n_buckets, labels=False)
for b in range(n_buckets):
vals = r[ok][q == b]
rows.append({"date": dt, "bucket": b, "ret": vals.mean()})
except Exception:
continue
df = pd.DataFrame(rows)
avg = df.groupby("bucket")["ret"].agg(["mean", "std", "count"])
avg["ann_ret"] = avg["mean"] * TRADING_DAYS * 100
avg["sharpe"] = avg["mean"] / avg["std"] * np.sqrt(TRADING_DAYS)
fig = make_subplots(rows=1, cols=2,
subplot_titles=["Annualized return by bucket",
"Sharpe by bucket"])
fig.add_trace(go.Bar(x=avg.index, y=avg["ann_ret"],
marker_color=["firebrick" if v < 0 else "green"
for v in avg["ann_ret"]],
text=[f"{v:.2f}%" for v in avg["ann_ret"]],
textposition="outside"), 1, 1)
fig.add_trace(go.Bar(x=avg.index, y=avg["sharpe"],
marker_color=["firebrick" if v < 0 else "green"
for v in avg["sharpe"]],
text=[f"{v:.2f}" for v in avg["sharpe"]],
textposition="outside"), 1, 2)
spread = avg["ann_ret"].iloc[-1] - avg["ann_ret"].iloc[0]
fig.add_annotation(text=f"Q{n_buckets}-Q1 spread: {spread:.2f}%",
xref="paper", yref="paper", x=0.5, y=1.05,
showarrow=False, font=dict(size=14, color="black"))
fig.update_layout(template="plotly_white", height=450,
title=title, showlegend=False,
xaxis_title="Bucket (low → high prediction)")
return fig
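To make the bucket assignment concrete, here is a sketch for a single cross-section. The tickers, predictions and forward returns are invented; three buckets are used so the result can be verified by hand.

```python
import numpy as np
import pandas as pd

pred = pd.Series([0.3, -0.1, 0.3, 0.8, -0.5, 0.0], index=list("ABCDEF"))
fwd_ret = pd.Series([0.01, -0.02, 0.02, 0.03, -0.01, 0.02], index=list("ABCDEF"))

# rank(method="first") breaks ties by order of appearance (A before C),
# so qcut always sees unique values and never fails on duplicate bin edges.
buckets = pd.qcut(pred.rank(method="first"), 3, labels=False)
bucket_ret = fwd_ret.groupby(buckets).mean()

print(buckets.to_dict())   # {'A': 1, 'B': 0, 'C': 2, 'D': 2, 'E': 0, 'F': 1}
print(bucket_ret.round(3).tolist())  # [-0.015, 0.015, 0.025]
```

A and C share the same prediction but land in different buckets; that is the price of ranking with `method="first"` and is usually negligible when the cross-section is much larger than the number of buckets.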
# ------------------------------------------------------------
# 12.14 Exposure, Leverage & Turnover
# ------------------------------------------------------------
def exposure_turnover_plot(position, title="Exposure, Leverage & Turnover"):
pos = position.shift(1).fillna(0)
gross = pos.abs().sum(axis=1)
net = pos.sum(axis=1)
n_active = (pos.abs() > 1e-8).sum(axis=1)
turn = pos.diff().abs().sum(axis=1).rolling(21).mean()
top_concentration = (pos.abs().max(axis=1) / gross.replace(0, np.nan))
fig = make_subplots(
rows=2, cols=2,
subplot_titles=["Gross & Net Exposure",
"# Active names",
"Turnover (21d MA)",
"Top-name concentration (%)"],
)
fig.add_trace(go.Scatter(x=gross.index, y=gross,
name="Gross", line=dict(color="navy")), 1, 1)
fig.add_trace(go.Scatter(x=net.index, y=net,
name="Net", line=dict(color="darkorange")), 1, 1)
fig.add_hline(y=0, line_dash="dash", line_color="gray", row=1, col=1)
fig.add_trace(go.Scatter(x=n_active.index, y=n_active,
line=dict(color="green"), showlegend=False), 1, 2)
fig.add_trace(go.Scatter(x=turn.index, y=turn,
fill="tozeroy",
line=dict(color="purple"), showlegend=False), 2, 1)
fig.add_trace(go.Scatter(x=top_concentration.index, y=top_concentration * 100,
line=dict(color="firebrick"), showlegend=False), 2, 2)
fig.add_hline(y=20, line_dash="dash", line_color="gray",
annotation_text="20% cap", row=2, col=2)
fig.update_layout(template="plotly_white", height=700, title=title,
hovermode="x unified", legend=dict(orientation="h"))
return fig
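The exposure statistics reduce to a few row-wise sums. A toy illustration with made-up weights follows; the one-day shift is omitted for brevity, so this is a simplification of the function above.

```python
import numpy as np
import pandas as pd

dates = pd.date_range("2024-01-01", periods=3, freq="D")
toy_pos = pd.DataFrame({"A": [0.6, 0.2, 0.0],
                        "B": [-0.4, -0.4, 0.5],
                        "C": [0.0, 0.2, -0.5]}, index=dates)

gross = toy_pos.abs().sum(axis=1)            # sum of |weights|
net = toy_pos.sum(axis=1)                    # signed sum of weights
turnover = toy_pos.diff().abs().sum(axis=1)  # one-way traded notional
top_share = toy_pos.abs().max(axis=1) / gross

print(pd.DataFrame({"gross": gross, "net": net,
                    "turnover": turnover, "top_share": top_share}))
```

The first turnover value is 0 because `diff()` yields NaN on the first row and `sum` skips it. In the function above, the shifted and zero-filled positions make the initial entry trade appear on the second row instead.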
# ------------------------------------------------------------
# 12.15 Return Distribution Diagnostics
# ------------------------------------------------------------
def return_distribution_plot(pnl, title="Return Distribution Diagnostics"):
pnl = pnl.dropna() * 100 # to percent
mu, sigma = pnl.mean(), pnl.std()
fig = make_subplots(
rows=1, cols=2,
subplot_titles=["Daily returns vs Normal",
"Q-Q plot vs Normal"],
)
fig.add_trace(go.Histogram(x=pnl, nbinsx=80, histnorm="probability density",
marker_color="steelblue", opacity=0.7,
name="Empirical"), 1, 1)
xs = np.linspace(pnl.min(), pnl.max(), 200)
fig.add_trace(go.Scatter(x=xs, y=norm.pdf(xs, mu, sigma),
line=dict(color="firebrick", width=2),
name="Normal"), 1, 1)
var95 = np.percentile(pnl, 5)
cvar95 = pnl[pnl < var95].mean() if (pnl < var95).any() else np.nan
fig.add_annotation(
text=(f"μ={mu:.3f}% σ={sigma:.3f}%<br>"
f"skew={pnl.skew():.2f} kurt={pnl.kurt():.2f}<br>"
f"VaR95={var95:.2f}% CVaR95={cvar95:.2f}%"),
xref="paper", yref="paper", x=0.02, y=0.98,
showarrow=False, align="left",
bgcolor="rgba(255,255,200,0.7)", font=dict(size=11),
)
sorted_ret = np.sort(pnl.values)
n = len(sorted_ret)
theoretical = norm.ppf((np.arange(1, n + 1) - 0.5) / n, mu, sigma)
fig.add_trace(go.Scatter(x=theoretical, y=sorted_ret,
mode="markers",
marker=dict(color="navy", size=4, opacity=0.5),
showlegend=False), 1, 2)
line_range = [min(theoretical.min(), sorted_ret.min()),
max(theoretical.max(), sorted_ret.max())]
fig.add_trace(go.Scatter(x=line_range, y=line_range,
line=dict(color="firebrick", dash="dash"),
showlegend=False), 1, 2)
fig.update_xaxes(title_text="Theoretical quantile", row=1, col=2)
fig.update_yaxes(title_text="Empirical quantile", row=1, col=2)
fig.update_layout(template="plotly_white", height=450, title=title,
barmode="overlay")
return fig
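The historical VaR and CVaR in the annotation are a percentile and a tail mean. A minimal sketch on an evenly spaced grid of 100 hypothetical daily returns (in percent) makes the arithmetic checkable by hand.

```python
import numpy as np

# 100 hypothetical daily returns in percent: -5.0, -4.9, ..., 4.9
returns_pct = np.arange(-50, 50) / 10.0

# 5th percentile with NumPy's default linear interpolation.
var95 = np.percentile(returns_pct, 5)
# CVaR: mean of the observations strictly below the VaR threshold.
tail = returns_pct[returns_pct < var95]
cvar95 = tail.mean()

print(f"VaR95={var95:.3f}%  CVaR95={cvar95:.3f}%  tail size={tail.size}")
```

Here the threshold falls between the 5th and 6th smallest observations, so the tail holds exactly five points. With short samples the tail can be very small, and the CVaR estimate is correspondingly noisy.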
# ------------------------------------------------------------
# 12.16 Factor Regression
# ------------------------------------------------------------
def factor_regression_plot(pnl, factor_returns, title="Factor Regression"):
df = pd.concat([pnl.rename("pnl"), factor_returns], axis=1).dropna()
if len(df) < 30:
return None
X = add_constant(df.drop(columns="pnl"))
res = OLS(df["pnl"], X).fit(cov_type="HAC", cov_kwds={"maxlags": 10})
coefs = res.params.drop("const")
tvals = res.tvalues.drop("const")
alpha_ann = res.params["const"] * TRADING_DAYS * 100
alpha_t = res.tvalues["const"]
fig = make_subplots(
rows=1, cols=2,
subplot_titles=[f"Factor betas (α_ann = {alpha_ann:.2f}%, t={alpha_t:.2f})",
"Residual (alpha) equity"],
column_widths=[0.45, 0.55],
)
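# Colour code: green = beta not significant at the 5% level (|t| < 1.96),
# firebrick = statistically significant factor exposure.
colors = ["green" if abs(t) < 1.96 else "firebrick" for t in tvals]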
fig.add_trace(go.Bar(x=coefs.index, y=coefs.values,
marker_color=colors,
text=[f"β={b:.2f}<br>t={t:.2f}"
for b, t in zip(coefs, tvals)],
textposition="outside",
showlegend=False), 1, 1)
fig.add_hline(y=0, line_dash="dash", line_color="gray", row=1, col=1)
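# OLS residuals average to zero when a constant is fitted, so the intercept
# is added back: alpha return = const + residual.
resid = res.params["const"] + res.resid
resid_eq = (1 + resid).cumprod()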
raw_eq = (1 + df["pnl"]).cumprod()
fig.add_trace(go.Scatter(x=raw_eq.index, y=raw_eq,
name="Raw PnL", line=dict(color="gray", width=1.5)), 1, 2)
fig.add_trace(go.Scatter(x=resid_eq.index, y=resid_eq,
name="Alpha (residual)",
line=dict(color="navy", width=2)), 1, 2)
fig.update_layout(template="plotly_white", height=500, title=title,
hovermode="x unified", legend=dict(orientation="h"))
return fig
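When plotting an alpha curve from a regression with an intercept, it helps to remember that the residuals alone average to zero in sample; any drift comes from the intercept. A minimal numerical check follows, using NumPy least squares in place of statsmodels and synthetic data. All names and parameter values here are illustrative assumptions.

```python
import numpy as np

rng = np.random.default_rng(0)
n = 500
factor = rng.normal(0.0, 0.01, n)          # synthetic factor returns
true_alpha, true_beta = 0.0005, 0.8        # assumed data-generating values
pnl = true_alpha + true_beta * factor + rng.normal(0.0, 0.002, n)

# Regress pnl on [1, factor]; lstsq returns (coefficients, residuals, rank, sv).
X = np.column_stack([np.ones(n), factor])
(const, beta), *_ = np.linalg.lstsq(X, pnl, rcond=None)

resid = pnl - X @ np.array([const, beta])
alpha_ret = const + resid                  # factor-hedged return including alpha

print(f"beta={beta:.3f}  mean(resid)={resid.mean():.2e}  "
      f"mean(alpha_ret)={alpha_ret.mean():.6f}  const={const:.6f}")
```

The mean of `resid` is zero up to floating-point error, while the mean of `alpha_ret` equals the fitted intercept, which is why the cumulative curve of `const + resid` is the one that shows the alpha drift.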
# ------------------------------------------------------------
# 12.17 Master Dashboard (individual file output)
# ------------------------------------------------------------
def full_performance_dashboard(bt_summary, pnl_per_asset, position, ret_panel,
pred_panel=None, ic_series=None, ic_decay_df=None,
sector_map=None, factor_returns=None,
save_dir=None):
"""Generate every chart; either show or save individual HTML files."""
figs = {}
figs["01_equity_costs"] = equity_before_after_costs(bt_summary)
figs["02_drawdown"] = equity_drawdown_underwater(bt_summary["pnl"])
figs["03_long_short"] = long_short_decomposition(position, ret_panel)
figs["04_hit_rate_year"] = hit_rate_by_year(bt_summary["pnl"], ic_series)
figs["05_monthly_heatmap"] = monthly_pnl_heatmap(bt_summary["pnl"])
figs["06_rolling_metrics"] = rolling_metrics(bt_summary["pnl"])
figs["07_cost_breakdown"] = cost_breakdown_plot(bt_summary)
figs["08_is_oos"] = is_oos_comparison(bt_summary["pnl"])
figs["09_per_asset_bubble"] = per_asset_bubble(pnl_per_asset)
figs["10_per_asset_contrib"] = per_asset_contribution(pnl_per_asset)
if sector_map is not None:
figs["11_sector_attrib"] = sector_attribution_plot(pnl_per_asset, sector_map)
if ic_series is not None:
figs["12_ic_analysis"] = ic_analysis_plot(ic_series, ic_decay_df)
if pred_panel is not None:
figs["13_quantile_buckets"] = quantile_buckets_plot(pred_panel, ret_panel)
figs["14_exposure"] = exposure_turnover_plot(position)
figs["15_distribution"] = return_distribution_plot(bt_summary["pnl"])
if factor_returns is not None:
fig_fac = factor_regression_plot(bt_summary["pnl"], factor_returns)
if fig_fac is not None:
figs["16_factor_reg"] = fig_fac
import os  # local import: os is not among the Section 0 imports
if save_dir:
os.makedirs(save_dir, exist_ok=True)
for name, fig in figs.items():
fig.write_html(os.path.join(save_dir, f"{name}.html"))
print(f"Saved {len(figs)} plots → {save_dir}/")
else:
for fig in figs.values():
try:
fig.show()
except Exception:
pass
return figs
# ------------------------------------------------------------
# 12.18 Bundle ALL Charts into a Single Self-Contained HTML
# ------------------------------------------------------------
def bundle_all_charts_to_html(
bt_summary,
pnl_per_asset,
position,
ret_panel,
pred_panel=None,
ic_series=None,
ic_decay_df=None,
sector_map=None,
factor_returns=None,
output_path="performance_report.html",
title="Systematic Commodity Strategy — Performance Report",
subtitle="Squarepoint Dataset Round",
):
"""
Build every chart and embed in one self-contained HTML with sticky TOC
sidebar, KPI summary cards, and section anchors. Single file, no deps.
"""
sections = []
sections.append(("equity-costs", "1. Equity: Gross vs Net of Costs",
equity_before_after_costs(bt_summary)))
sections.append(("drawdown", "2. Drawdown & Underwater Duration",
equity_drawdown_underwater(bt_summary["pnl"])))
sections.append(("long-short", "3. Long-Side vs Short-Side PnL",
long_short_decomposition(position, ret_panel)))
sections.append(("hit-rate", "4. Hit Rate & IC by Year",
hit_rate_by_year(bt_summary["pnl"], ic_series)))
sections.append(("monthly-heatmap", "5. Monthly PnL Heatmap",
monthly_pnl_heatmap(bt_summary["pnl"])))
sections.append(("rolling-metrics", "6. Rolling 1Y Performance Metrics",
rolling_metrics(bt_summary["pnl"])))
sections.append(("cost-breakdown", "7. Cost Breakdown",
cost_breakdown_plot(bt_summary)))
sections.append(("is-oos", "8. In-Sample vs Out-of-Sample",
is_oos_comparison(bt_summary["pnl"])))
sections.append(("asset-bubble", "9. Per-Asset Risk/Return",
per_asset_bubble(pnl_per_asset)))
sections.append(("asset-contrib", "10. Per-Asset PnL Contribution",
per_asset_contribution(pnl_per_asset)))
if sector_map is not None:
sections.append(("sector-attrib", "11. Sector Attribution",
sector_attribution_plot(pnl_per_asset, sector_map)))
if ic_series is not None:
sections.append(("ic-analysis", "12. IC Analysis & Decay",
ic_analysis_plot(ic_series, ic_decay_df)))
if pred_panel is not None:
sections.append(("quantile", "13. Quantile Bucket Monotonicity",
quantile_buckets_plot(pred_panel, ret_panel)))
sections.append(("exposure", "14. Exposure, Leverage & Turnover",
exposure_turnover_plot(position)))
sections.append(("distribution", "15. Return Distribution Diagnostics",
return_distribution_plot(bt_summary["pnl"])))
if factor_returns is not None:
fig_fac = factor_regression_plot(bt_summary["pnl"], factor_returns)
if fig_fac is not None:
sections.append(("factor", "16. Factor Regression", fig_fac))
# Summary KPIs
pnl = bt_summary["pnl"].dropna()
ann_ret = pnl.mean() * TRADING_DAYS * 100
ann_vol = pnl.std() * np.sqrt(TRADING_DAYS) * 100
sr = _ann_sharpe(pnl)
eq = (1 + pnl).cumprod()
mdd = (eq / eq.cummax() - 1).min() * 100
hit = (pnl > 0).mean() * 100
n_days = len(pnl)
summary_cards = f"""
<div class="cards">
<div class="card"><div class="label">Ann Return</div><div class="value">{ann_ret:.2f}%</div></div>
<div class="card"><div class="label">Ann Vol</div><div class="value">{ann_vol:.2f}%</div></div>
<div class="card highlight"><div class="label">Sharpe</div><div class="value">{sr:.2f}</div></div>
<div class="card"><div class="label">Max DD</div><div class="value">{mdd:.2f}%</div></div>
<div class="card"><div class="label">Hit Rate</div><div class="value">{hit:.1f}%</div></div>
<div class="card"><div class="label">Trading Days</div><div class="value">{n_days:,}</div></div>
</div>
"""
toc_items = "\n".join(
f'<li><a href="#{anchor}">{sec_title}</a></li>'
for anchor, sec_title, _ in sections
)
chart_blocks = []
for i, (anchor, sec_title, fig) in enumerate(sections):
include_js = (i == 0)  # embed plotly.js inline once (not via CDN) so the file opens offline
fig_html = fig.to_html(
full_html=False,
include_plotlyjs=include_js,
div_id=f"chart-{anchor}",
config={"displayModeBar": True, "displaylogo": False,
"toImageButtonOptions": {"format": "png", "scale": 2}},
)
chart_blocks.append(f"""
<section id="{anchor}">
<h2>{sec_title}</h2>
{fig_html}
</section>
""")
charts_html = "\n".join(chart_blocks)
import os  # local imports: neither is among the Section 0 imports
from datetime import datetime
generated_at = datetime.now().strftime("%Y-%m-%d %H:%M")
html = f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>{title}</title>
<style>
* {{ box-sizing: border-box; }}
body {{
margin: 0;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
color: #1f2937;
background: #f9fafb;
}}
header {{
background: linear-gradient(135deg, #1e3a8a 0%, #1e40af 100%);
color: white;
padding: 32px 40px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}}
header h1 {{ margin: 0 0 4px 0; font-size: 28px; font-weight: 600; }}
header .subtitle {{ opacity: 0.85; font-size: 14px; }}
header .timestamp {{ opacity: 0.7; font-size: 12px; margin-top: 8px; }}
.cards {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(140px, 1fr));
gap: 12px;
margin-top: 20px;
}}
.card {{
background: rgba(255,255,255,0.12);
backdrop-filter: blur(8px);
border-radius: 8px;
padding: 14px 18px;
border: 1px solid rgba(255,255,255,0.2);
}}
.card.highlight {{
background: rgba(251, 191, 36, 0.25);
border-color: rgba(251, 191, 36, 0.5);
}}
.card .label {{ font-size: 11px; text-transform: uppercase;
letter-spacing: 0.5px; opacity: 0.8; }}
.card .value {{ font-size: 22px; font-weight: 600; margin-top: 4px; }}
.container {{ display: flex; max-width: 1600px; margin: 0 auto; }}
nav.toc {{
position: sticky; top: 0; align-self: flex-start;
width: 280px; min-width: 280px;
height: 100vh; overflow-y: auto;
padding: 24px 16px;
background: white;
border-right: 1px solid #e5e7eb;
}}
nav.toc h3 {{ margin: 0 0 12px 0; font-size: 13px;
text-transform: uppercase; letter-spacing: 0.5px;
color: #6b7280; }}
nav.toc ol {{ list-style: none; padding: 0; margin: 0; }}
nav.toc li {{ margin: 0; }}
nav.toc a {{
display: block;
padding: 8px 12px;
color: #374151;
text-decoration: none;
border-radius: 6px;
font-size: 13px;
transition: background 0.15s;
}}
nav.toc a:hover {{ background: #f3f4f6; color: #1e40af; }}
main {{ flex: 1; padding: 24px 40px; }}
section {{
background: white;
border-radius: 10px;
padding: 24px;
margin-bottom: 24px;
box-shadow: 0 1px 3px rgba(0,0,0,0.05);
border: 1px solid #e5e7eb;
}}
section h2 {{
margin: 0 0 16px 0;
font-size: 18px;
color: #111827;
border-bottom: 2px solid #e5e7eb;
padding-bottom: 10px;
}}
footer {{
text-align: center;
padding: 20px;
color: #9ca3af;
font-size: 12px;
border-top: 1px solid #e5e7eb;
margin-top: 32px;
}}
@media (max-width: 900px) {{
nav.toc {{ display: none; }}
main {{ padding: 16px; }}
}}
</style>
</head>
<body>
<header>
<h1>{title}</h1>
<div class="subtitle">{subtitle}</div>
<div class="timestamp">Generated {generated_at}</div>
{summary_cards}
</header>
<div class="container">
<nav class="toc">
<h3>Contents</h3>
<ol>
{toc_items}
</ol>
</nav>
<main>
{charts_html}
</main>
</div>
<footer>
Generated by Squarepoint pipeline · {n_days:,} trading days · {len(sections)} charts
</footer>
</body>
</html>"""
output_path = os.path.abspath(output_path)
with open(output_path, "w", encoding="utf-8") as f:
f.write(html)
file_size_kb = os.path.getsize(output_path) / 1024
print(f"✓ Bundled {len(sections)} charts → {output_path}")
print(f" File size: {file_size_kb:.0f} KB · Self-contained · Open in any browser")
return output_path
13. Multiple Testing Correction (data-snooping bias)
def holm_bonferroni(pvals):
"""Holm step-down adjustment."""
p = np.asarray(pvals)
n = len(p); order = np.argsort(p)
adj = np.empty(n)
running_max = 0
for rank, idx in enumerate(order):
v = (n - rank) * p[idx]
running_max = max(running_max, v)
adj[idx] = min(running_max, 1.0)
return adj
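A small worked example may help show what the step-down adjustment does. The four p-values are invented for illustration (say, four strategy variants tested on the same data), and the function is repeated only so the snippet runs on its own.

```python
import numpy as np

def holm_bonferroni(pvals):
    """Holm step-down adjustment (same logic as the function above)."""
    p = np.asarray(pvals, dtype=float)
    n = len(p)
    order = np.argsort(p)
    adj = np.empty(n)
    running_max = 0.0
    for rank, idx in enumerate(order):
        # The k-th smallest p-value (k starting at 0) is multiplied by (n - k).
        running_max = max(running_max, (n - rank) * p[idx])
        adj[idx] = min(running_max, 1.0)
    return adj

raw_p = [0.01, 0.04, 0.03, 0.20]   # hypothetical p-values, one per variant
adj_p = holm_bonferroni(raw_p)
for raw, adj in zip(raw_p, adj_p):
    print(f"raw={raw:.2f}  holm={adj:.2f}  significant at 5%: {adj < 0.05}")
```

Three of the four raw p-values are below 0.05, but only the smallest one survives the adjustment.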
14. End-to-End Pipeline
# ============== 1) LOAD ==============
df = load_panel("data.csv")
front_raw = df.filter(regex="^px_front_"); front_raw.columns = [c.replace("px_front_","") for c in front_raw.columns]
back_raw = df.filter(regex="^px_back_"); back_raw.columns = [c.replace("px_back_","") for c in back_raw.columns]
print(first_look(front_raw))
# ============== 2) UNIVERSE + CLEAN ==============
prices, clean_rep = clean_panel(front_raw, drop_pct_first_half=0.5, ffill_cap=5)
back, _ = clean_panel(back_raw, drop_pct_first_half=0.5, ffill_cap=5)
common = prices.columns.intersection(back.columns)
prices, back = prices[common], back[common]
active_univ = build_active_universe(prices, min_history=252, max_gap=21)
# ============== 3) ROLLS + RETURNS ==============
roll_mask = detect_roll_dates(front_raw[common], threshold_jump=0.03)
returns_raw = to_log_returns(prices)
returns = mask_roll_returns(returns_raw, roll_mask).fillna(0.0)
print(panel_stationarity(returns).head())
# ============== 4) FEATURES (Deliverable #1) ==============
feats = build_panel_features(prices, front_raw=front_raw[common],
back_raw=back_raw[common], dt_years=1/12)
feats_cs = apply_cs_transform(feats, transform=cs_zscore, active=active_univ)
X_long = stack_features(feats_cs).replace([np.inf,-np.inf], np.nan)
HORIZON = 1
y_panel = make_panel_target(prices, horizon=HORIZON, kind="log")
y_long = stack_target(y_panel)
mask = y_long.notna() & X_long.notna().any(axis=1)
X_long, y_long = X_long.loc[mask], y_long.loc[mask]
# ============== 5) HP TUNE (IC-scored) + WALK-FORWARD ML (Deliverable #2) ==============
best_params, gs = panel_tune_hyperparams(
X_long, y_long,
param_grid={"model__alpha": [0.01, 0.1, 1, 10, 100]},
n_splits=5, horizon=HORIZON, scoring="ic",
)
print("Best:", best_params, "IC:", gs.best_score_)
best_alpha = best_params["model__alpha"]
pred_panel = panel_walk_forward_predict(
X_long, y_long,
model_factory=ridge_factory(alpha=best_alpha),
train_window_days=252*3, step_days=63,
embargo_days=HORIZON, mode="expanding",
)
bump_trials(1) # the actual walk-forward run
ic_ts = panel_ic_per_date(pred_panel, y_panel)
print("IC summary:\n", panel_ic_summary(ic_ts))
print("IC decay:\n", ic_decay(pred_panel, prices))
# ============== 6) POSITION ==============
position = panel_pred_to_position(
pred_panel, returns,
gross_target=2.0, sizing="rank",
vol_floor=0.05, ewm_half_life=3, threshold=0.2,
neutralize="risk", max_weight=0.20, min_active_names=5,
active=active_univ,
)
# Optional: also sector-neutralize predictions then re-build
pred_sector_neutral = sector_neutralize(pred_panel, COMMODITY_SECTORS)
position_sn = panel_pred_to_position(
pred_sector_neutral, returns, gross_target=2.0, sizing="rank",
vol_floor=0.05, ewm_half_life=3, threshold=0.2,
neutralize="risk", active=active_univ,
)
# ============== 7) BACKTEST (Deliverable #3) ==============
bt, pnl_per_asset = panel_backtest(
position, returns,
cost_bps=1.0, impact_coef=10.0,
borrow_bps_annual=50.0, funding_bps_annual=25.0,
target_vol=0.10, gross_cap=1.5,
)
# ============== 8) PERFORMANCE (Deliverable #4) ==============
print("\nFull-sample:\n", perf_stats(bt.pnl))
split = int(len(bt) * 0.7)
print("\nIS :\n", perf_stats(bt.pnl.iloc[:split]))
print("\nOOS:\n", perf_stats(bt.pnl.iloc[split:]))
print("\nPer-asset:\n",
per_asset_perf(pnl_per_asset).sort_values("Sharpe", ascending=False).head(10))
print("\nCost sweep:\n", cost_sweep_panel(position, returns))
sr, lo, hi = stationary_bootstrap_sharpe(bt.pnl, n=2000)
print(f"\nSharpe (stationary bootstrap CI95): {sr:.2f} [{lo:.2f}, {hi:.2f}]")
print("\nDeflated Sharpe (n_trials accounted):",
report_deflated_sharpe(bt.pnl, TRIAL_COUNTER["n"]))
# ============== 9) ATTRIBUTION ==============
print("\nPer-signal attribution:\n",
pnl_attribution_by_signal(feats_cs, returns).head(10))
print("\nPer-sector PnL totals:\n",
pnl_attribution_by_sector(pnl_per_asset, COMMODITY_SECTORS).sum().sort_values())
ls = pnl_attribution_long_short(position, returns)
print("\nLong vs Short:\n", ls.sum())
# Factor regression (load benchmark factors externally — example placeholders)
# bench = pd.DataFrame({"BCOM": bcom_ret, "DXY": dxy_ret, "OIL": oil_ret})
# print(factor_regression(bt.pnl, bench)[0])
# ============== 10) EXPOSURE & STABILITY ==============
print("\nExposure:\n", exposure_diagnostics(position))
print("\nSignal autocorr (mean):", signal_stability(pred_panel).mean())
After running your full pipeline:
bundle_all_charts_to_html(
    bt_summary=bt,
    pnl_per_asset=pnl_per_asset,
    position=position,
    ret_panel=returns,
    pred_panel=pred_panel,
    ic_series=ic_ts,
    ic_decay_df=ic_decay(pred_panel, prices),
    sector_map=COMMODITY_SECTORS,
    factor_returns=None,  # or your factor DataFrame
    output_path="performance_report.html",
    title="Commodity Multi-Asset Strategy",
    subtitle=" — [Your Name]",
)
15. Robustness Sweep (honest about n_trials)
def panel_parameter_sweep(pred_panel, ret_panel,
                          half_lives=(None, 3, 7, 14),
                          thresholds=(0, 0.2, 0.4),
                          target_vols=(0.05, 0.10, 0.15),
                          active=None):
    rows = []
    for hl in half_lives:
        for t in thresholds:
            for tv in target_vols:
                pos = panel_pred_to_position(
                    pred_panel, ret_panel,
                    sizing="rank", neutralize="risk",
                    ewm_half_life=hl, threshold=t, active=active,
                )
                s, _ = panel_backtest(pos, ret_panel, cost_bps=1.0,
                                      target_vol=tv, gross_cap=1.5)
                rows.append({"hl": hl, "t": t, "tv": tv,
                             "Sharpe": sharpe(s.pnl),
                             "Turn": s.turnover.mean(),
                             "MaxDD": max_drawdown_compound(s.pnl)})
                bump_trials(1)  # one trial per configuration tried
    df = pd.DataFrame(rows).sort_values("Sharpe", ascending=False)
    # Multiple-testing aware: report the deflated Sharpe of the headline result
    return df
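The sweep above adds 4 × 3 × 3 = 36 trials to the counter. As a rough illustration of why that count matters, here is a sketch of a deflated Sharpe calculation in the commonly cited Bailey and López de Prado form. It is not the pipeline's `report_deflated_sharpe`; the function names, the headline Sharpe of 1.5 and the cross-trial dispersion of 0.5 are all assumptions made for the example.

```python
import math
from statistics import NormalDist

EULER_GAMMA = 0.5772156649015329

def expected_max_sharpe(n_trials, sharpe_std):
    """Approximate expected best Sharpe among n_trials zero-skill trials (needs n_trials >= 2)."""
    z = NormalDist()
    return sharpe_std * (
        (1 - EULER_GAMMA) * z.inv_cdf(1 - 1 / n_trials)
        + EULER_GAMMA * z.inv_cdf(1 - 1 / (n_trials * math.e))
    )

def deflated_sharpe_sketch(sr, n_obs, n_trials, sharpe_std, skew=0.0, kurt=3.0):
    """Probability that the observed Sharpe beats the multiple-testing hurdle.

    sr and sharpe_std are per-period (daily) values, not annualized.
    kurt is raw kurtosis (3.0 for a normal distribution).
    """
    sr0 = expected_max_sharpe(n_trials, sharpe_std)
    denom = math.sqrt(1 - skew * sr + (kurt - 1) / 4 * sr ** 2)
    return NormalDist().cdf((sr - sr0) * math.sqrt(n_obs - 1) / denom)

n_trials = 4 * 3 * 3 + 1          # 36 sweep configurations + 1 walk-forward run
daily_sr = 1.5 / math.sqrt(252)   # hypothetical headline Sharpe of 1.5 annualized
trial_sd = 0.5 / math.sqrt(252)   # hypothetical dispersion of Sharpe across trials

for n in (2, n_trials):
    dsr = deflated_sharpe_sketch(daily_sr, n_obs=2520, n_trials=n, sharpe_std=trial_sd)
    print(f"n_trials={n:>2}  deflated Sharpe probability={dsr:.3f}")
```

The same backtest looks less convincing as the trial count grows, which is the reason every sweep configuration bumps the counter.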
Pitfalls Audited — Deep Dive
Each item below explains what the pitfall is, why it’s dangerous, how it’s implemented in the pipeline, and what the interviewer is likely to probe.
1. position.shift(1) enforced everywhere
What the pitfall is
If you compute today’s signal from today’s close, then multiply by today’s return, you’ve assumed you could trade at the same close you used to compute the signal. That’s a 1-day lookahead — the most common and most damaging backtest bug in quant finance.
Why it matters
A signal that uses r_t (today’s return) to “predict” r_t produces a perfect-looking equity curve that collapses in production. Sharpe inflation of 5-10x is typical for this single bug.
How it’s implemented
Inside panel_backtest:
pos = position.shift(1).fillna(0.0) # ← critical line
pnl_per_asset = pos * ret_panel - costs
The convention enforced everywhere:
- Signal computed using info through close of day t → position[t]
- That position is then shifted forward by one day → effective position is position[t-1] on day t
- PnL on day t = position[t-1] × return[t]
This mirrors reality: you compute your signal after Monday’s close, place orders overnight, and earn Tuesday’s return.
What the interviewer asks
“Walk me through a single day. Signal computed when, traded when, PnL accrued when?”
Answer:
Signal at close of t-1, orders submitted overnight, position established at open of t, PnL = pos × ret_close_to_close[t]. The .shift(1) enforces that the signal at index t only earns the return at index t+1.
How to verify (sanity check)
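# Compare PnL with and without the one-day lag
pnl_same_day = (position * returns).sum(axis=1)          # lookahead: position[t] × return[t]
pnl_lagged = (position.shift(1) * returns).sum(axis=1)   # tradable: position[t-1] × return[t]
# The two series should differ. A same-day Sharpe far above the lagged Sharpe means
# the signal contains same-day information, and only the shift keeps it out of the PnL.
print(sharpe(pnl_same_day), sharpe(pnl_lagged))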
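To see how much damage a missing shift can do, here is a toy sketch on synthetic data. The returns are pure noise and the "signal" deliberately peeks at today's return; the tickers, seed and `ann_sharpe` helper are invented for the example.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
dates = pd.bdate_range("2020-01-01", periods=500)
returns = pd.DataFrame(rng.normal(0, 0.01, size=(500, 4)),
                       index=dates, columns=list("ABCD"))

# A "signal" that peeks: long if today was up, short if today was down.
position = np.sign(returns) / returns.shape[1]

pnl_lookahead = (position * returns).sum(axis=1)                      # position[t] x return[t]
pnl_tradable = (position.shift(1).fillna(0.0) * returns).sum(axis=1)  # position[t-1] x return[t]

def ann_sharpe(pnl, periods=252):
    """Annualized Sharpe ratio of a daily PnL series."""
    return pnl.mean() / pnl.std() * np.sqrt(periods)

print(f"lookahead Sharpe: {ann_sharpe(pnl_lookahead):.1f}")
print(f"tradable  Sharpe: {ann_sharpe(pnl_tradable):.1f}")
```

Without the lag the strategy never has a losing day, even though the returns are random; with the lag the Sharpe falls back to noise.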
2. Embargo = horizon in all train/test splits
What the pitfall is
When you train on data ending at day t and test starting at day t+1, but your target is a forward return over h days, your training set’s last row has a target that overlaps the first h test days. The model gets to “see” the test period’s information through the overlapping target window.
Why it matters
For h=1 it’s a 1-day contamination — small. For h=21 (monthly target) it’s huge: a 3-year training window’s last month is essentially the first month of the test set. Sharpe inflation of 1.5-3x is typical.
How it’s implemented
In panel_walk_forward_predict:
tr_end = max(tr_start + 1, start - embargo_days)
tr_dates = dates[tr_start:tr_end] # drop last `embargo_days` of training
te_dates = dates[start:end]
In panel_tune_hyperparams:
for tr_idx, te_idx in TimeSeriesSplit(n_splits=n_splits, gap=horizon).split(dates):
# gap = horizon enforces embargo
The rule: embargo_days = forecast_horizon. If you predict 1-day returns, drop the last 1 day of training. If you predict 21-day returns, drop the last 21.
Visual
TRAIN ────────────────────][embargo][──── TEST ────
                            ^^^^^^^
                        = horizon days
What the interviewer asks
“Your target is log(P_{t+h}) - log(P_t). What happens at the boundary between train and test?”
Answer:
The last training row at date t_last_train has target spanning (t_last_train, t_last_train + h]. Without an embargo, that target overlaps the first h days of the test window, so the model is implicitly trained on the test period. I enforce embargo = h to drop those rows from training entirely.
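The boundary arithmetic can be checked with plain row indices. This is a sketch with hypothetical helper names (`split_rows`, `target_overlaps_test`) and an invented split at row 756; it mirrors the `start - embargo_days` rule but is not the pipeline's own function.

```python
import numpy as np

def split_rows(test_start, test_end, embargo):
    """Expanding split: train on rows [0, test_start - embargo), test on [test_start, test_end)."""
    train = np.arange(0, max(1, test_start - embargo))
    test = np.arange(test_start, test_end)
    return train, test

def target_overlaps_test(train, test, horizon):
    """The target of row i covers days (i, i + horizon]; check the last training row."""
    return bool(train[-1] + horizon >= test[0])

h = 21  # forecast horizon in days
train_leaky, test = split_rows(756, 819, embargo=0)
train_clean, _ = split_rows(756, 819, embargo=h)

print("no embargo  -> target overlaps test:", target_overlaps_test(train_leaky, test, h))
print("embargo = h -> target overlaps test:", target_overlaps_test(train_clean, test, h))
```

With no embargo the last training target reaches 21 days into the test window; with embargo = h it ends on the day before the test window starts.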
3. Universe selection from first half only
What the pitfall is
“Drop any asset with >50% missing values” sounds reasonable. But if you compute that missingness over the whole sample, you’re using information from the future (OOS period) to decide which assets to include today. This is called selection bias or lookahead universe construction.
Why it matters
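Suppose Coffee futures stopped trading at the end of 2022. Computed over 2010-2024, Coffee is about 13% missing (the last 2 of 15 years are NaN), so it passes the 50% filter. Had it stopped at the end of 2016 instead, it would be about 53% missing and dropped from the entire backtest, including the years it traded normally. Either way, whether Coffee is in the 2010 universe depends on what happens years later, which you could not have known in 2010. That is universe selection on future data, a form of survivorship bias.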
How it’s implemented
In clean_panel:
n_half = len(prices) // 2
pct_na_first_half = prices.iloc[:n_half].isna().mean()
drop_cols = pct_na_first_half[pct_na_first_half > drop_pct_first_half].index.tolist()
prices = prices.drop(columns=drop_cols)
Universe rule: Only the first 50% of the sample (≈in-sample period) determines which columns survive. The OOS period inherits whatever first-half decision was made — no peeking forward.
Stricter version implemented in build_active_universe:
# At each date t, active(t) = (history ≥ 252 days) AND (last_obs within 21 days)
This is fully point-in-time — universe membership at date t uses only data through t.
What the interviewer asks
“How do you handle commodities that delisted mid-sample? Or contracts that didn’t exist before 2015?”
Answer:
Two-layer defense. First-pass column drop uses only the first half of the sample, so no asset’s OOS performance influences its inclusion. Then a point-in-time active_universe(t) mask requires 252 days of history and recent data, and is applied everywhere features and positions are computed. Late-starting assets are silently zeroed until they qualify; ones that die are masked out once their last observation is more than 21 days old.
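A toy panel makes the difference between the two filters concrete. The sketch below uses made-up constant prices and a hypothetical helper `columns_to_drop`; only the 50% threshold comes from the text above.

```python
import numpy as np
import pandas as pd

dates = pd.bdate_range("2015-01-01", periods=1000)
prices = pd.DataFrame({"OLD": 100.0, "LATE": 50.0}, index=dates)
prices.iloc[:400, prices.columns.get_loc("LATE")] = np.nan   # LATE has no data for 400 rows

def columns_to_drop(panel, max_missing=0.5):
    """Columns whose share of missing values exceeds max_missing."""
    pct_na = panel.isna().mean()
    return pct_na[pct_na > max_missing].index.tolist()

drop_full_sample = columns_to_drop(prices)                        # sees the future
drop_first_half = columns_to_drop(prices.iloc[: len(prices) // 2])  # first half only

print("full-sample filter drops:", drop_full_sample)
print("first-half filter drops :", drop_first_half)
```

LATE is 40% missing over the full sample but 80% missing in the first half, so the two rules disagree. The first-half rule makes its decision without looking at the later period.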
4. Winsorization is expanding (not full-sample)
What the pitfall is
Winsorization caps extreme values at chosen quantiles, for example 0.5% / 99.5%. If those quantiles are computed over the entire sample, you’re using future tail events to define what counts as “extreme” today. A 2020 oil-price collapse changes the full-sample 0.5th percentile, retroactively moving the caps applied to 2015 data.
Why it matters
Subtle, but real. A model trained on winsorized 2015 data has implicitly seen 2020’s tails through the quantile boundaries. For tail-sensitive strategies (e.g., short-vol, mean-reversion), this can inflate Sharpe meaningfully.
How it’s implemented
expanding_winsorize_panel:
def expanding_winsorize_panel(panel, q=0.005, min_history=252):
    out = panel.copy()
    for c in panel.columns:
        s = panel[c]
        lo = s.expanding(min_history).quantile(q)  # ← key: expanding
        hi = s.expanding(min_history).quantile(1 - q)
        out[c] = s.clip(lower=lo, upper=hi)
    return out
The quantile at date t uses only observations up to and including t. Before min_history=252 observations exist, the bounds are NaN and no winsorization is applied (NaN bounds → no clip).
What the interviewer asks
“Did you winsorize? Show me the line. Was that on the full sample?”
Answer:
Expanding quantile, minimum 252-day history. The cap at date t is computed only from data through t, so a 2020 outlier cannot influence the 2015 cap. Trade-off: the early sample (< 1 year) is unwinsorized, but those dates only ever appear in training windows, never in a test window, because the walk-forward needs 3 years of training data before its first prediction.
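One way to check the point-in-time property is to append a late shock and confirm that nothing before it changes. The sketch below is a single-series variant with synthetic data; it fills NaN bounds with ±inf explicitly so the "no clip before min_history" behavior does not depend on the pandas version.

```python
import numpy as np
import pandas as pd

def expanding_winsorize(s, q=0.005, min_history=252):
    """Clip each value at quantiles estimated from data up to and including that date."""
    lo = s.expanding(min_history).quantile(q).fillna(-np.inf)
    hi = s.expanding(min_history).quantile(1 - q).fillna(np.inf)
    return s.clip(lower=lo, upper=hi)

rng = np.random.default_rng(1)
calm = pd.Series(rng.normal(0, 0.01, 600))                     # 600 ordinary days
shock = pd.Series(np.full(10, -0.30), index=range(600, 610))   # 10 crash days appended later
with_shock = pd.concat([calm, shock])

# Expanding caps: values before the shock are identical with or without it.
before = expanding_winsorize(calm).to_numpy()
after = expanding_winsorize(with_shock).iloc[:600].to_numpy()
print("pre-shock values unchanged:", np.allclose(before, after))

# Full-sample caps: the lower bound applied to every date moves once the shock is included.
print("full-sample lower cap:", calm.quantile(0.005), "->", with_shock.quantile(0.005))
```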
5. Cross-sectional features computed per date
What the pitfall is
If you z-score a momentum feature using the full-sample mean and std, you’ve leaked the entire distribution into every point. Worse, you’ve made the feature non-stationary in a different way: a “high momentum” reading in 2015 is judged against 2024 statistics.
The correct way is cross-sectional: at each date t, rank or z-score across assets (not across time).
Why it matters
Cross-sectional normalization makes signals comparable across heterogeneous assets at any given moment. It also automatically adjusts for regime — in a high-vol month, “extreme” is relative to that month’s spread.
How it’s implemented
def cs_zscore(feat):
    mu = feat.mean(axis=1)  # axis=1 ⇒ across columns (assets), per row (date)
    sd = feat.std(axis=1).replace(0, np.nan)
    return feat.sub(mu, axis=0).div(sd, axis=0)
The key is axis=1: mean and std are computed across assets at each date. No time-series statistics involved → no lookahead possible.
Applied via:
feats_cs = apply_cs_transform(feats, transform=cs_zscore, active=active_univ)
The active argument masks inactive assets before computing the cross-sectional statistics so dead names don’t drag the mean.
What the interviewer asks
“Why z-score across assets and not across time?”
Answer:
Three reasons. (1) No lookahead — same-date statistics never involve future data. (2) Comparability — a 21d momentum of 5% means different things for Gold vs Natural Gas; the cross-section normalizes the asset-vol heterogeneity. (3) Regime-adjusted — in a high-dispersion month, the threshold for “high momentum” naturally rises with the cross-section.
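A three-date toy panel shows the per-date normalization in action. The momentum numbers are invented, and masking with `mom.where(active)` is only an assumption about what `apply_cs_transform` does before calling the transform.

```python
import numpy as np
import pandas as pd

def cs_zscore(feat):
    mu = feat.mean(axis=1)                     # across assets, per date
    sd = feat.std(axis=1).replace(0, np.nan)
    return feat.sub(mu, axis=0).div(sd, axis=0)

dates = pd.bdate_range("2024-01-01", periods=3)
# Hypothetical 21-day momentum readings; the third date is a high-dispersion day.
mom = pd.DataFrame(
    {"GOLD": [0.01, 0.02, 0.10], "NATGAS": [0.05, -0.04, 0.40],
     "WTI": [0.03, 0.00, -0.20], "CORN": [-0.01, 0.02, -0.30]},
    index=dates,
)
active = pd.DataFrame(True, index=dates, columns=mom.columns)
active.loc[dates[0], "CORN"] = False           # CORN not yet in the universe on day one

z = cs_zscore(mom.where(active))               # inactive names become NaN before the statistics
print(z.round(2))
```

Every row of `z` has mean 0 and standard deviation 1 across the active names, so a reading of +1 means the same thing on a quiet day and on a high-dispersion day.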
6. Inverse-vol uses .shift(1), vol-floored
What the pitfall is
Two pitfalls bundled here:
(a) Lookahead vol: Computing today’s position size using vol[t] (which includes today’s return) and earning ret[t] is a 1-day leak — the position is sized using information from the day it’s traded.
(b) Vol explosion: When realized vol approaches zero (a quiet asset in a quiet period), 1/vol blows up to infinity, producing absurd position sizes. A single calm week can give you 100x leverage to one asset.
Why it matters
(a) Inflates Sharpe modestly (~10-20%) and is hard to spot. (b) Causes catastrophic drawdowns the moment vol normalizes — you’re already at 100x leverage when the next-day return is large.
How it’s implemented
In panel_pred_to_position:
vol = ret_panel.rolling(vol_window, min_periods=vol_window).std() * np.sqrt(TRADING_DAYS)
vol = vol.shift(1).clip(lower=vol_floor)  # 1-day lag, then floor at e.g. 5%
s = s.div(vol)
shift(1): position at date t uses vol estimated through date t-1.
clip(lower=vol_floor): any vol below 5% is treated as 5%, preventing leverage explosion.
min_periods=vol_window: don’t produce a vol estimate at all until enough history exists.
What the interviewer asks
“What happens to your position when an asset has had zero realized vol for a week?”
Answer:
The vol floor keeps the effective vol at or above 5% annualized. Without it, 1/vol → ∞ and the position would be unboundedly large. With the floor, the worst case is 1/0.05 = 20, then the per-name cap (max_weight=20% × gross) and the gross renormalization step further bound it. Also note the .shift(1): the vol estimate uses only data through yesterday.
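The effect of the floor is easy to reproduce on synthetic returns. In this sketch one asset goes completely flat for 30 days; the asset names, seed and 21-day window are assumptions for illustration.

```python
import numpy as np
import pandas as pd

TRADING_DAYS = 252
vol_window, vol_floor = 21, 0.05

rng = np.random.default_rng(2)
dates = pd.bdate_range("2023-01-02", periods=120)
rets = pd.DataFrame({"NORMAL": rng.normal(0, 0.01, 120),
                     "QUIET": rng.normal(0, 0.01, 120)}, index=dates)
rets.iloc[60:90, rets.columns.get_loc("QUIET")] = 0.0   # 30 flat days: realized vol collapses

raw_vol = rets.rolling(vol_window, min_periods=vol_window).std() * np.sqrt(TRADING_DAYS)
lagged_vol = raw_vol.shift(1)                  # size today's position with yesterday's estimate
floored_vol = lagged_vol.clip(lower=vol_floor)

with np.errstate(divide="ignore"):
    inv_unfloored = 1.0 / lagged_vol           # infinite or astronomically large in the flat window
inv_floored = 1.0 / floored_vol

print("max 1/vol without floor:", inv_unfloored["QUIET"].max())
print("max 1/vol with floor   :", inv_floored["QUIET"].max())
```

The floored weight never exceeds 1 / 0.05 = 20, while the unfloored weight blows up as soon as the rolling window contains only flat days.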
7. Vol-target scale shifted, capped at 3x
What the pitfall is
Vol-targeting means: if realized portfolio vol is below target, scale up; if above, scale down. The pitfall is using today’s realized vol (which includes today’s PnL) to scale today’s position. That’s the same 1-day leak as inverse-vol weighting.
A second pitfall: when realized vol is very small, the scale factor target_vol / realized_vol can become enormous. A pipeline that scales 30x in a quiet period will produce a catastrophic loss the moment vol returns.
Why it matters
(a) Lookahead inflates Sharpe. (b) The 30x leverage scenario can wipe out a year of PnL in one bad day.
How it’s implemented
In panel_backtest:
pnl_raw = (pos * ret_panel).sum(axis=1)
rv = pnl_raw.rolling(min_vol_history, min_periods=min_vol_history).std() \
* np.sqrt(TRADING_DAYS)
scale = (target_vol / rv).clip(0, max_scale).shift(1).fillna(1.0)  # cap at 3x, then lag by one day
pos = pos.mul(scale, axis=0)
Three protections:
- .shift(1): scale at day t uses realized vol through t-1
- clip(0, max_scale=3.0): never scale up by more than 3x, and never below 0
- min_periods=63: no scaling at all until 3 months of PnL history exist; default to 1.0
What the interviewer asks
“Walk me through your vol targeting. What’s the maximum leverage your strategy ever uses?”
Answer:
Target is 10% annualized portfolio vol. The scale factor is 0.10 / realized_vol(63d), clipped between 0 and 3, then shifted by one day so we trade on yesterday’s vol estimate. Combined with gross_cap=1.5 applied after the vol-target step, the worst-case gross leverage is 1.5x. Without the cap, you’d see ≈4-5x leverage during low-vol regimes, typical for vol-targeted strategies that blew up in March 2020.
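All three protections can be verified on a synthetic PnL series. The sketch wraps the scale formula in a hypothetical helper `vol_target_scale` and uses a deliberately quiet book, so the uncapped scale would be well above 3.

```python
import numpy as np
import pandas as pd

TRADING_DAYS = 252

def vol_target_scale(pnl_raw, target_vol=0.10, min_vol_history=63, max_scale=3.0):
    """Lagged, capped leverage multiplier from rolling realized portfolio vol."""
    rv = pnl_raw.rolling(min_vol_history, min_periods=min_vol_history).std() * np.sqrt(TRADING_DAYS)
    return (target_vol / rv).clip(0, max_scale).shift(1).fillna(1.0)

rng = np.random.default_rng(3)
dates = pd.bdate_range("2022-01-03", periods=300)
pnl_raw = pd.Series(rng.normal(0, 0.001, 300), index=dates)   # quiet book, roughly 1.6% annual vol

scale = vol_target_scale(pnl_raw)
uncapped = 0.10 / (pnl_raw.std() * np.sqrt(TRADING_DAYS))

# Change only the final day's PnL: no scale value may move, including the final day's.
bumped = pnl_raw.copy()
bumped.iloc[-1] = -0.05
scale_bumped = vol_target_scale(bumped)

print(f"uncapped scale would be about {uncapped:.1f}x, applied scale peaks at {scale.max():.1f}x")
print("scale unaffected by same-day PnL:", np.allclose(scale, scale_bumped))
```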
8. Per-asset cost on |Δpos|, not portfolio aggregate
What the pitfall is
If you compute costs as cost_bps × |portfolio_turnover|, where portfolio_turnover = |Σ Δpos_i|, you understate costs, because offsetting trades cancel in the sum but each still costs spread/impact in reality.
Example: long Gold +0.5 → +0.3 (sells 0.2), short Silver -0.5 → -0.3 (buys 0.2). Portfolio net turnover = 0. Real turnover = 0.4 (both trades happen). Wrong: zero cost. Right: cost on 0.4.
Why it matters
For market-neutral / dollar-neutral strategies (which this is), most trades are offsetting. The aggregation error can hide 30-70% of true costs. A strategy that survives at “2 bps” may actually need to survive at 5-7 bps.
How it’s implemented
In panel_backtest:
turnover_per_asset = pos.diff().abs().fillna(pos.abs()) # |Δpos_i| for each asset
spread_cost = turnover_per_asset * (cost_bps / 1e4) # per-asset cost
impact_cost = (turnover_per_asset ** 1.5) * (impact_coef / 1e4)
# ... then SUM across assets to get daily total
costs_per_asset = spread_cost + impact_cost + borrow_cost + funding_cost
total_cost = costs_per_asset.sum(axis=1) # ← sum AFTER per-asset cost
The critical detail: costs are computed per asset first, then summed. Never sum(Δpos).abs() × bps.
What the interviewer asks
“Show me your turnover calculation. How do you handle offsetting trades in long-short pairs?”
Answer:
Turnover is computed per asset as |pos[t] - pos[t-1]|, costs applied per asset, then summed across the universe. Offsetting trades each cost their own spread and impact; they don’t net out. This matters for dollar-neutral strategies because most days every trade has an offsetting counter-trade somewhere. Aggregate turnover understates costs by ~50% in my testing.
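The Gold/Silver example from earlier in this section can be reproduced in a few lines. The dates and the 2 bps spread are assumptions for illustration; the positions are the ones quoted in the example.

```python
import math
import pandas as pd

pos = pd.DataFrame({"GOLD": [0.5, 0.3], "SILVER": [-0.5, -0.3]},
                   index=pd.to_datetime(["2024-03-01", "2024-03-04"]))
cost_bps = 2.0

delta = pos.diff().iloc[1]                   # trades on the second day
turnover_aggregate = abs(delta.sum())        # |sum of trades|: offsetting trades cancel
turnover_per_asset = delta.abs().sum()       # sum of |trades|: every trade counts

cost_wrong = turnover_aggregate * cost_bps / 1e4
cost_right = turnover_per_asset * cost_bps / 1e4
print(f"aggregate turnover {turnover_aggregate:.2f} -> cost {cost_wrong * 1e4:.2f} bps of capital")
print(f"per-asset turnover {turnover_per_asset:.2f} -> cost {cost_right * 1e4:.2f} bps of capital")
```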
9. Survivorship: late-starting asset properly masked by active_universe
What the pitfall is
Survivorship bias comes in two flavors:
(a) Backward: Including only assets that survived to today (already handled by §3 universe selection from first half).
(b) Forward / late-listing: Including an asset’s “history” before it actually traded. Your data file may have NaN for Lithium futures pre-2018, but if your pipeline silently fills those NaNs with zeros or carries forward, you’re trading an instrument that didn’t exist.
Why it matters
Late-listing bias is especially nasty because newly-listed assets often have high initial volatility and momentum. Including them retroactively makes the strategy look great in those years, but you couldn’t have actually traded them.
How it’s implemented
Layer 1: build_active_universe produces a T × N boolean mask:
def build_active_universe(prices, min_history=252, max_gap=21):
    # For each date t, asset is active iff:
    # - n_obs ≥ 252 non-NaN values up to t
    # - last valid obs is within 21 days of t
    ...
    return active  # T × N bool
Layer 2: Applied at every signal-construction step:
feats_cs = apply_cs_transform(feats, transform=cs_zscore, active=active_univ)
# ...
position = panel_pred_to_position(pred_panel, returns, active=active_univ, ...)
Inside panel_pred_to_position:
if active is not None:
    s = s.where(active.reindex_like(s).fillna(False), 0.0)
A new asset gets:
- NaN features for the first 252 days (no history to compute)
- active=False until day 252
- Position forced to zero until day 252
- Naturally enters the universe on day 253 once it has enough history
What the interviewer asks
“An asset starts trading in 2018. Your sample begins in 2010. What does your strategy hold in that asset on Jan 1, 2015?”
Answer:
Zero. The active_universe mask requires 252 days of history; the asset has none before 2018, so active(t) = False for all t < 2018-Jan-1 + 252 days ≈ 2019-Jan-1. The signal builder produces NaN features (no rolling window can fill), and the position builder explicitly forces weight to zero where active is False. The asset enters the universe organically once it has a year of history, mirroring what a live trader could actually do.
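Since the body of `build_active_universe` is elided above, here is one possible way to build such a mask. It is a sketch, not the original implementation: the name `active_universe_sketch` is hypothetical, and it measures the gap in rows (trading days) rather than calendar days.

```python
import numpy as np
import pandas as pd

def active_universe_sketch(prices, min_history=252, max_gap=21):
    """Point-in-time mask: enough past observations AND a recent one (gap counted in rows)."""
    obs = prices.notna()
    enough_history = obs.cumsum() >= min_history
    row_no = pd.DataFrame(
        np.tile(np.arange(len(prices))[:, None], (1, prices.shape[1])),
        index=prices.index, columns=prices.columns,
    )
    last_obs_row = row_no.where(obs).ffill()       # row number of the latest valid price so far
    recent = (row_no - last_obs_row) <= max_gap    # NaN (never observed) compares as False
    return enough_history & recent

dates = pd.bdate_range("2010-01-01", periods=1000)
prices = pd.DataFrame(100.0, index=dates, columns=["OLD", "LATE", "DEAD"])
prices.iloc[:300, 1] = np.nan     # LATE starts trading at row 300
prices.iloc[700:, 2] = np.nan     # DEAD stops reporting after row 699

active = active_universe_sketch(prices)
first_late = active.index[active["LATE"].to_numpy()][0]
last_dead = active.index[active["DEAD"].to_numpy()][-1]
print("LATE first active on:", first_late.date())
print("DEAD last active on :", last_dead.date())
```

Both `cumsum` and `ffill` only look backward, so the mask at date t cannot depend on anything after t.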
10. Roll dates masked from return series
What the pitfall is
Commodity futures contracts expire. A “continuous” front-month price series stitches together different contracts, and on roll dates the price can jump 1-3% purely from the contract change — not a real return. If you feed these synthetic jumps into your momentum signal, you get spurious predictability.
There are two conventions for continuous series:
- Back-adjusted (ratio or difference): smooths over rolls so returns are “clean,” but the price level is fictional
- Non-adjusted: real prices, but returns spike on every roll
Using back-adjusted prices for returns is correct. But if your data is non-adjusted, or if the adjustment is imperfect, those roll-day jumps leak into features.
Why it matters
A 21-day momentum signal that gets a +2% spike from a roll will mistakenly identify the asset as “trending up”, and the strategy will buy a non-existent rally. Roll jumps are ~uncorrelated with future returns, so they add pure noise to the signal and degrade IC.
How it’s implemented
Detection:
def detect_roll_dates(front_raw, threshold_jump=0.03):
    """Detect days with abnormal jumps in NON-adjusted front prices."""
    r = np.log(front_raw).diff().abs()
    return r > threshold_jump
Uses the non-adjusted price series (front_raw, not prices) so the jump is actually visible.
Masking:
roll_mask = detect_roll_dates(front_raw[common], threshold_jump=0.03)
returns_raw = to_log_returns(prices) # log returns from back-adjusted
returns = mask_roll_returns(returns_raw, roll_mask).fillna(0.0)
Where mask_roll_returns sets returns to NaN on suspected roll dates:
def mask_roll_returns(returns, roll_dates):
    return returns.where(~roll_dates.reindex_like(returns).fillna(False), np.nan)
The NaN propagates safely through rolling-window features (pandas rolling skips NaN observations, as long as min_periods is below the window length), and the .fillna(0.0) only applies to the position-PnL multiplication, meaning a roll-day return contributes 0 PnL but doesn’t corrupt the momentum signal.
What the interviewer asks
“Your dataset has front-month continuous futures. How do you handle the roll?”
Answer:
Two-track. The back-adjusted series feeds returns and momentum-style features. The non-adjusted series feeds (a) carry computation (front − back is meaningful only on non-adjusted) and (b) roll-date detection: any absolute log return >3% in the non-adjusted front signals a likely roll, and I mask that date’s return from the back-adjusted series to NaN. This keeps roll noise out of the momentum signal while preserving the legitimate carry signal. Contracts most affected: WTI Crude (monthly roll), Natural Gas (monthly roll, large contango shifts).
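The two-track idea can be exercised on a synthetic contract. In this sketch the adjusted series carries the genuine moves (clipped to 1% so none looks like a roll), and the raw series gets a hypothetical 5% level jump at row 30 to stand in for a roll; the ticker and numbers are invented.

```python
import numpy as np
import pandas as pd

def detect_roll_dates(front_raw, threshold_jump=0.03):
    """Flag days with abnormal jumps in the non-adjusted front price."""
    return np.log(front_raw).diff().abs() > threshold_jump

def mask_roll_returns(returns, roll_dates):
    return returns.where(~roll_dates.reindex_like(returns).fillna(False), np.nan)

rng = np.random.default_rng(4)
dates = pd.bdate_range("2024-01-01", periods=60)
true_ret = np.clip(rng.normal(0, 0.005, 60), -0.01, 0.01)   # genuine daily moves, all below 3%

adjusted = pd.DataFrame({"WTI": 80 * np.exp(np.cumsum(true_ret))}, index=dates)
raw = adjusted.copy()
raw.iloc[30:] = raw.iloc[30:] * 1.05     # hypothetical roll: new contract trades 5% higher

roll_mask = detect_roll_dates(raw)
returns = mask_roll_returns(np.log(adjusted).diff(), roll_mask)

print("flagged roll dates:", list(roll_mask.index[roll_mask["WTI"].to_numpy()].date))
print("return on roll date after masking:", returns["WTI"].iloc[30])
```

A fixed 3% threshold will also flag genuine large moves in volatile contracts, so on real data the flagged dates are better treated as suspected rolls rather than confirmed ones.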
Summary Table
| # | Pitfall | One-Line Defense |
|---|---|---|
| 1 | Same-day signal-and-return | .shift(1) on position before multiplying by returns |
| 2 | Train-test target overlap | embargo_days = horizon in every CV split |
| 3 | Future-aware universe filtering | Universe drop uses first half only + active_universe mask |
| 4 | Lookahead winsor caps | expanding(min_history=252).quantile(q) per asset |
| 5 | Time-series feature normalization | mean(axis=1), std(axis=1) per date |
| 6 | Same-day vol sizing + vol explosion | vol.shift(1).clip(lower=vol_floor) |
| 7 | Same-day vol target + leverage explosion | scale.clip(0, 3).shift(1) + min_periods=63 |
| 8 | Aggregated turnover hides offsetting trades | Costs per asset on \|Δpos\|, then summed across assets |
| 9 | Holding instruments before they existed | active_universe(t) requires 252d history + recent obs |
| 10 | Roll-day price jumps in momentum | Detect on non-adjusted; mask in back-adjusted |
Talking-point synthesis (for the slide)
If asked “summarize how you defended against lookahead”:
Every line of code answers two questions: what did I know at time t, and what am I predicting at t+h? Three principles enforce the answer: (1) any time-shifted operation uses .shift(1) to lag by at least one day; (2) any sliding-window estimate uses expanding or rolling with min_periods, never full-sample statistics; (3) cross-sectional operations use axis=1 (across assets, same date), not axis=0 (across time). Embargoes equal to the forecast horizon, point-in-time universe masks, and roll-date filtering close the remaining gaps specific to commodities.