Hyperparameter optimisation with strategy backtesting
Optuna + CV + pybroker = optimal model of choice!
Introduction
In the world of algorithmic trading, the quest for the perfect trading strategy is an ongoing pursuit. One crucial aspect of this journey is the optimization of hyperparameters, which can significantly impact the performance of your trading model. In this Substack post, we'll dive into the process of hyperparameter optimization using Optuna, coupled with cross-validation (CV) and the powerful pybroker library for backtesting.
Let's begin by breaking down the code and understanding the key steps involved in this process. Note that the code blocks and descriptions below only indicate how to put hyperparameter optimisation and strategy backtesting together; the details of the strategies are not the focus of this article. For more information on using pybroker with your data, head here:
Integrating your ML model with PyBroker
Steps to follow
The optimize_model function is the heart of our optimization process. It takes the training data, labels, cross-validation strategy, and other configuration settings as parameters. It also defines helper functions, such as mcc_eval and fbeta_eval, which evaluate the machine learning performance of the model on the metric of choice. Note that both helpers handle binary as well as multiclass classification.
import math
import multiprocessing as mp

from lightgbm import LGBMClassifier
import numpy as np
import optuna
import pandas as pd
from pybroker import StrategyConfig, Strategy
from pybroker.common import FeeMode, PriceType
from pybroker.scope import register_columns
from sklearn.utils.class_weight import compute_sample_weight
from sklearn.metrics import matthews_corrcoef, fbeta_score
from tscv import GapRollForward

# CV_TYPE, STRAT_TYPE, INIT_CASH, COSTS and LABEL are module-level settings
# defined elsewhere in the project.


def optimize_model(
    train_data,
    train_label,
    cv,
    cv_type=CV_TYPE,
    n_trials=50,
    strat_type=STRAT_TYPE,
    init_cash=INIT_CASH,
    costs=COSTS,
    prediction_type=LABEL,
    eval_score="mcc",
    use_sample_weights=False,
    bootstrap=False,
):
    # ... <other functions>

    data = train_data.copy()
    label = train_label.copy()
    # Rolling splits need no groups; other CV types group rows by index value.
    if cv_type == "roll":
        groups = None
    else:
        groups = train_data.index.values
    direction = "maximize"
    study = optuna.create_study(direction=direction)
    study.optimize(objective, n_trials=n_trials, timeout=11000, n_jobs=1)
    return study.best_params
def mcc_eval(y_true, y_pred, eval_sample_weight):
    if LABEL == "multiclass":
        labels = np.argmax(y_pred, axis=1)
        labels[y_pred[:, 0] > 0.5] = 0.0
        labels[y_pred[:, 1] > 0.5] = 1.0
        labels[y_pred[:, 2] > 0.5] = 2.0
    else:
        labels = np.round(y_pred)
    mcc = matthews_corrcoef(y_true, labels, sample_weight=eval_sample_weight)
    return "mcc", mcc, True


def fbeta_eval(y_true, y_pred, eval_sample_weight):
    if LABEL == "multiclass":
        labels = np.argmax(y_pred, axis=1)
        labels[y_pred[:, 0] > 0.5] = 0
        labels[y_pred[:, 1] > 0.5] = 1
        labels[y_pred[:, 2] > 0.5] = 2
        average = "micro"
    else:
        labels = np.round(y_pred)
        average = "binary"
    fbeta = fbeta_score(
        y_true,
        labels,
        beta=0.5,
        sample_weight=eval_sample_weight,
        average=average,
    )
    return "fbeta", fbeta, True
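To make the two metrics concrete, here is a minimal standard-library sketch of the binary case: probabilities are thresholded at 0.5 and MCC and F-beta (beta=0.5) are computed from the confusion counts. The article itself relies on scikit-learn's matthews_corrcoef and fbeta_score; the helper names and the toy data below are made up for illustration.

```python
import math


def binarize(probs, threshold=0.5):
    """Turn positive-class probabilities into 0/1 labels."""
    return [1 if p > threshold else 0 for p in probs]


def confusion_counts(y_true, y_pred):
    """Return (tp, tn, fp, fn) for binary labels."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    tn = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 0)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    return tp, tn, fp, fn


def mcc(y_true, y_pred):
    """Matthews correlation coefficient; 0.0 when it is undefined."""
    tp, tn, fp, fn = confusion_counts(y_true, y_pred)
    denom = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    return (tp * tn - fp * fn) / denom if denom else 0.0


def fbeta(y_true, y_pred, beta=0.5):
    """F-beta score; beta < 1 weights precision more than recall."""
    tp, _, fp, fn = confusion_counts(y_true, y_pred)
    b2 = beta ** 2
    denom = (1 + b2) * tp + b2 * fn + fp
    return (1 + b2) * tp / denom if denom else 0.0


# Toy data (hypothetical).
y_true = [0, 1, 1, 0, 1, 0]
probs = [0.2, 0.8, 0.4, 0.1, 0.9, 0.6]
y_pred = binarize(probs)

print("labels:", y_pred)
print("mcc:", mcc(y_true, y_pred))
print("fbeta:", fbeta(y_true, y_pred))
```

A beta of 0.5 penalises false positives more than false negatives, which suits a long-only strategy where a wrong buy signal costs money and a missed one only costs opportunity.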
Note that the optimize_model function takes cv and cv_type arguments. Here the type is roll, and the splitter can be created with the GapRollForward class from the tscv package:
import math

# data_length, n_splits and gap_size are chosen by the caller.
max_train_size = math.floor(data_length / n_splits) - (gap_size * n_splits)
min_train_size = max_train_size
max_test_size = math.floor(max_train_size / n_splits)
min_test_size = max_test_size
cv = GapRollForward(
    min_train_size=min_train_size,
    max_train_size=max_train_size,
    gap_size=gap_size,
    min_test_size=min_test_size,
    max_test_size=max_test_size,
    roll_size=max_test_size,
)
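To see what these sizes imply, the sketch below generates fixed-size rolling windows with a gap between train and test using only the standard library. It illustrates the idea and is not tscv's implementation, so the exact windows GapRollForward yields may differ; the function name and the toy numbers are assumptions.

```python
import math


def roll_forward_splits(n_samples, train_size, test_size, gap_size, roll_size):
    """Yield (train_indices, test_indices) for fixed-size rolling windows."""
    start = 0
    window = train_size + gap_size + test_size
    while start + window <= n_samples:
        train_end = start + train_size
        test_start = train_end + gap_size  # skip gap_size rows to limit leakage
        yield range(start, train_end), range(test_start, test_start + test_size)
        start += roll_size  # slide the whole window forward


# Toy configuration (hypothetical values).
data_length, n_splits, gap_size = 100, 4, 2
max_train_size = math.floor(data_length / n_splits) - (gap_size * n_splits)
max_test_size = math.floor(max_train_size / n_splits)

splits = list(
    roll_forward_splits(
        data_length, max_train_size, max_test_size, gap_size, max_test_size
    )
)
first_train, first_test = splits[0]
print("train size:", max_train_size, "test size:", max_test_size)
print("first train rows:", first_train[0], "to", first_train[-1])
print("first test rows:", first_test[0], "to", first_test[-1])
print("number of windows:", len(splits))
```

Rolling by the test size means consecutive test windows sit next to each other without overlapping, so each row is scored out of sample at most once.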
The objective function is where the magic happens. This function is passed to Optuna, which will explore the hyperparameter space and find the optimal combination. Inside the objective function, we define the hyperparameters to be tuned, including the number of estimators, number of leaves, maximum depth, learning rate, regularization parameters, and more. We also define the strategy-specific parameters, such as the buy threshold, number of stocks to hold, and allocation weights.
params = {
    "n_estimators": trial.suggest_int("n_estimators", 10, 1000, step=10),
    "num_leaves": trial.suggest_int("num_leaves", 2, 256, step=4),
    "max_depth": trial.suggest_int("max_depth", 3, 25, step=1),
    # Optuna does not allow step together with log=True, so the learning
    # rate is sampled on a log scale without a step.
    "learning_rate": trial.suggest_float("learning_rate", 0.001, 1.0, log=True),
    "reg_lambda": trial.suggest_float("reg_lambda", 0.01, 10.0, step=0.05),
    "reg_alpha": trial.suggest_float("reg_alpha", 0.01, 10.0, step=0.05),
    "subsample": trial.suggest_float("subsample", 0.3, 0.9, step=0.05),
    "subsample_freq": trial.suggest_int("subsample_freq", 1, 7),
    "colsample_bytree": trial.suggest_float(
        "colsample_bytree", 0.2, 0.9, step=0.05
    ),
    "min_child_samples": trial.suggest_int("min_child_samples", 5, 100, step=2),
    "boosting_type": trial.suggest_categorical(
        "boosting_type", ["gbdt", "rf", "dart"]
    ),
}
strat_params = {
    "buy_threshold": trial.suggest_float("buy_threshold", 0.15, 0.5, step=0.01),
    "number_stocks_to_hold": trial.suggest_int("number_stocks_to_hold", 2, 10),
    "allocation_weights": trial.suggest_categorical(
        "allocation_weights", ["mean", "softmax"]
    ),
}
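The search space mixes three kinds of parameters: stepped values, log-scaled values, and categorical choices. The sketch below draws candidates from a cut-down version of that space using plain random search from the standard library. This is a stand-in to show what each kind of range means, not Optuna's sampler (which by default proposes new values based on earlier trials); the function name is hypothetical.

```python
import math
import random


def sample_params(rng):
    """Draw one candidate configuration by plain random search."""
    return {
        # Stepped integer: a multiple of 10 between 10 and 1000.
        "n_estimators": rng.randrange(10, 1001, 10),
        # Log-uniform float: uniform in log space, so 0.001-0.01 is explored
        # as often as 0.1-1.0.
        "learning_rate": math.exp(rng.uniform(math.log(0.001), math.log(1.0))),
        # Stepped float: 0.30, 0.35, ..., 0.90.
        "subsample": round(0.3 + 0.05 * rng.randrange(0, 13), 2),
        # Categorical choice.
        "boosting_type": rng.choice(["gbdt", "rf", "dart"]),
    }


rng = random.Random(0)  # seeded so the draws are reproducible
candidates = [sample_params(rng) for _ in range(5)]
for candidate in candidates:
    print(candidate)
```

A log scale and a fixed step are two different ways of discretising a range, which is why a single parameter uses one or the other.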
The function then runs the cross-validation loop, splitting the data into training and testing sets. For each split, the code trains the LightGBM model and evaluates it with the chosen metric (MCC or F-beta score). If the score falls below a threshold (0.1 for MCC, 0.4 for F-beta), the fold is assigned a Sharpe ratio of -1 without running a backtest, effectively penalizing the trial.
scores = []
sample_weights = compute_sample_weight(
    class_weight="balanced",
    y=label.values,
)
sample_weights = pd.Series(sample_weights, index=label.index)
if eval_score == "mcc":
    eval_metric = mcc_eval
    thresh_score = 0.1
else:
    eval_metric = fbeta_eval
    thresh_score = 0.4
for train_index, test_index in cv.split(data, groups=groups):
    train_split = data.iloc[train_index]
    label_train_split = label.iloc[train_index]
    test_split = data.iloc[test_index]
    label_test_split = label.iloc[test_index]
    # Keep the price columns aside for the backtest. The copy avoids writing
    # the "pred" column into a view of test_split later on.
    data_backtest = test_split[
        ["date", "close", "open", "high", "low", "volume", "symbol"]
    ].copy()
    train_split = train_split.drop(data_backtest.columns, axis=1)
    test_split = test_split.drop(data_backtest.columns, axis=1)
    if use_sample_weights:
        sample_weight_train = sample_weights.iloc[train_index]
        sample_weight_eval = sample_weights.iloc[test_index]
    else:
        sample_weight_train = None
        sample_weight_eval = None
    if prediction_type == "multiclass":
        model = LGBMClassifier(
            objective=LABEL,
            random_state=42,
            early_stopping_rounds=30,
            n_jobs=mp.cpu_count(),
            verbosity=-1,
            num_class=3,
            **params,
        )
        model.fit(
            train_split.values,
            label_train_split.values,
            sample_weight=sample_weight_train,
            eval_set=[(test_split.values, label_test_split.values)],
            eval_metric=eval_metric,
            eval_sample_weight=[sample_weight_eval],
        )
        # Predict on the same array type the model was fitted on.
        pred = model.predict_proba(test_split.values)
    else:
        model = LGBMClassifier(
            objective=LABEL,
            random_state=42,
            early_stopping_rounds=30,
            n_jobs=mp.cpu_count(),
            verbosity=-1,
            **params,
        )
        model.fit(
            train_split.values,
            label_train_split.values,
            sample_weight=sample_weight_train,
            eval_set=[(test_split.values, label_test_split.values)],
            eval_metric=eval_metric,
            eval_sample_weight=[sample_weight_eval],
        )
        pred = model.predict_proba(test_split.values)
    # calc_mcc and calc_fbeta are scoring helpers not shown in this article.
    if eval_score == "mcc":
        score = calc_mcc(
            label_test_split, pred, eval_sample_weight=sample_weight_eval
        )
    else:
        score = calc_fbeta(
            label_test_split, pred, eval_sample_weight=sample_weight_eval
        )
    if score < thresh_score:
        # Weak classifier: penalise the fold and skip the backtest.
        sharpe = -1
        scores.append(sharpe)
    else:
        # Use the class-1 probability as the trading signal.
        pred = pred[:, 1]
        data_backtest["pred"] = pred
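The gating step can be isolated in a few lines. The sketch below, with hypothetical names and numbers, shows the pattern: when the classification score is under the threshold the fold gets the penalty immediately and the expensive backtest is never called.

```python
def fold_objective(ml_score, thresh_score, run_backtest, penalty=-1.0):
    """Return the fold's Sharpe, or a penalty when the classifier is too weak."""
    if ml_score < thresh_score:
        return penalty  # skip the backtest entirely
    return run_backtest()


calls = []


def fake_backtest():
    """Stand-in for a real backtest; records that it was called."""
    calls.append("backtest")
    return 1.3  # hypothetical Sharpe ratio


weak = fold_objective(0.05, 0.1, fake_backtest)  # MCC below the 0.1 threshold
good = fold_objective(0.25, 0.1, fake_backtest)  # MCC above the threshold

print("weak fold:", weak)
print("good fold:", good)
print("backtests run:", len(calls))
```

Besides saving compute, the gate keeps Optuna from rewarding configurations whose trading result is not backed by any predictive skill.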
If the performance score meets the threshold, the function proceeds to the backtesting phase. It creates a StrategyConfig object, which defines the trading parameters, such as the maximum number of long positions, fees, and initial cash. The function then creates a Strategy object using the pybroker library, adds the trading strategy, and sets the position size handler. Finally, it runs the backtest and reads off the Sharpe ratio, which feeds the objective value that Optuna optimizes.
# This block runs inside the else branch of the per-fold loop.
# strat, warmup and positioning_picker come from code not shown in this snippet.
register_columns("pred")
config = StrategyConfig(
    max_long_positions=strat_params["number_stocks_to_hold"],
    fee_amount=costs,
    fee_mode=FeeMode.ORDER_PERCENT,
    buy_delay=1,
    sell_delay=1,
    exit_on_last_bar=True,
    initial_cash=init_cash,
    enable_fractional_shares=True,
    exit_sell_fill_price=PriceType.AVERAGE,
)
start_date = data_backtest["date"].iloc[0].strftime("%Y-%m-%d")
end_date = data_backtest["date"].iloc[-1].strftime("%Y-%m-%d")
symbols = data_backtest["symbol"].unique().tolist()
strategy = Strategy(data_backtest, start_date, end_date, config=config)
strategy.add_execution(strat, symbols)
pos_size_handler = positioning_picker(
    allocation_weights=strat_params["allocation_weights"]
)
strategy.set_pos_size_handler(pos_size_handler)
result = strategy.backtest(calc_bootstrap=bootstrap, warmup=warmup)
if bootstrap:
    # Average the lower and upper bounds of the 95% confidence interval.
    conf_intervals = result.bootstrap.conf_intervals
    sharpe = conf_intervals.loc[("Sharpe Ratio", "95%")].mean()
else:
    results_df = result.metrics_df
    sharpe = results_df[results_df["name"] == "sharpe"]["value"].values[0]
# A Sharpe of exactly zero is penalised like a failed fold.
if sharpe == 0.0:
    sharpe = -1
scores.append(sharpe)
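For reference, the textbook Sharpe ratio is the mean excess return divided by its standard deviation. The sketch below computes it per bar from a list of returns using the standard library. pybroker's exact calculation (for example any annualisation) may differ, and the return series are hypothetical. A flat series of returns yields 0.0 here, which is the case the code above converts to -1.

```python
import statistics


def sharpe_ratio(returns, risk_free=0.0):
    """Mean excess return divided by its sample standard deviation."""
    excess = [r - risk_free for r in returns]
    if len(excess) < 2:
        return 0.0
    std = statistics.stdev(excess)
    return statistics.mean(excess) / std if std > 0 else 0.0


active = [0.01, -0.005, 0.02, 0.0, 0.015]  # hypothetical per-bar returns
flat = [0.0] * 5                            # no change in equity at all

active_sharpe = sharpe_ratio(active)
flat_sharpe = sharpe_ratio(flat)

# Mirror the penalty rule: a zero Sharpe is scored as -1.
flat_score = -1 if flat_sharpe == 0.0 else flat_sharpe

print("active:", active_sharpe)
print("flat:", flat_sharpe, "->", flat_score)
```

Without the penalty, a configuration whose equity never moves would score 0, which can beat a strategy with a slightly negative Sharpe and steer the search toward doing nothing.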
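The objective function then returns the median Sharpe ratio across the cross-validation folds, which Optuna uses to guide the search for the optimal hyperparameters. Once the study finishes, optimize_model returns the best parameters (study.best_params), which can then be saved to a file for future reference.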
final_score = np.median(scores)
print(f"Final robust Sharpe: {final_score}")
return final_score
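Taking the median rather than the mean makes the trial score less sensitive to a single lucky fold. The toy numbers below are hypothetical and only illustrate the effect.

```python
import statistics

# One outstanding fold among otherwise ordinary ones, with one penalised fold.
fold_sharpes = [1.2, 0.9, -1, 1.1, 4.5]
median_score = statistics.median(fold_sharpes)
mean_score = statistics.mean(fold_sharpes)

# A configuration that fails the gate in most folds.
mostly_failed = [-1, -1, -1, 2.0, 3.0]
failed_median = statistics.median(mostly_failed)
failed_mean = statistics.mean(mostly_failed)

print("median:", median_score, "mean:", mean_score)
print("mostly failed, median:", failed_median, "mean:", failed_mean)
```

In the second case the mean is still positive, while the median reports the penalty, so a trial has to perform in the majority of folds to score well.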
Conclusions
In conclusion, this code demonstrates a powerful approach to optimizing trading strategies by combining hyperparameter tuning, cross-validation, and backtesting. By leveraging Optuna, LightGBM, and the pybroker library, you can efficiently explore the hyperparameter space and look for the model configuration that maximizes the Sharpe ratio of your trading strategy. Because every candidate is scored on out-of-sample backtests rather than on classification metrics alone, the workflow yields a model that is both tuned and validated, which gives you more confidence before deploying it in real markets.
See the full code below:
def optimize_model(
    train_data,
    train_label,
    cv,
    cv_type=CV_TYPE,
    n_trials=50,
    strat_type=STRAT_TYPE,
    init_cash=INIT_CASH,
    costs=COSTS,
    prediction_type=LABEL,
    eval_score="mcc",
    use_sample_weights=False,
    bootstrap=False,
):
    # strategy_picker, positioning_picker, f1_eval, calc_mcc and calc_fbeta
    # are project helpers that are not shown in this article.
    def objective(trial):
        def mcc_eval(y_true, y_pred, eval_sample_weight):
            if LABEL == "multiclass":
                labels = np.argmax(y_pred, axis=1)
                labels[y_pred[:, 0] > 0.5] = 0.0
                labels[y_pred[:, 1] > 0.5] = 1.0
                labels[y_pred[:, 2] > 0.5] = 2.0
            else:
                labels = np.round(y_pred)
            mcc = matthews_corrcoef(
                y_true, labels, sample_weight=eval_sample_weight
            )
            return "mcc", mcc, True

        def fbeta_eval(y_true, y_pred, eval_sample_weight):
            if LABEL == "multiclass":
                labels = np.argmax(y_pred, axis=1)
                labels[y_pred[:, 0] > 0.5] = 0
                labels[y_pred[:, 1] > 0.5] = 1
                labels[y_pred[:, 2] > 0.5] = 2
                average = "micro"
            else:
                labels = np.round(y_pred)
                average = "binary"
            fbeta = fbeta_score(
                y_true,
                labels,
                beta=0.5,
                sample_weight=eval_sample_weight,
                average=average,
            )
            return "fbeta", fbeta, True

        params = {
            "n_estimators": trial.suggest_int("n_estimators", 10, 1000, step=10),
            "num_leaves": trial.suggest_int("num_leaves", 2, 256, step=4),
            "max_depth": trial.suggest_int("max_depth", 3, 25, step=1),
            # Optuna does not allow step together with log=True, so the
            # learning rate is sampled on a log scale without a step.
            "learning_rate": trial.suggest_float(
                "learning_rate", 0.001, 1.0, log=True
            ),
            "reg_lambda": trial.suggest_float("reg_lambda", 0.01, 10.0, step=0.05),
            "reg_alpha": trial.suggest_float("reg_alpha", 0.01, 10.0, step=0.05),
            "subsample": trial.suggest_float("subsample", 0.3, 0.9, step=0.05),
            "subsample_freq": trial.suggest_int("subsample_freq", 1, 7),
            "colsample_bytree": trial.suggest_float(
                "colsample_bytree", 0.2, 0.9, step=0.05
            ),
            "min_child_samples": trial.suggest_int(
                "min_child_samples", 5, 100, step=2
            ),
            "boosting_type": trial.suggest_categorical(
                "boosting_type", ["gbdt", "rf", "dart"]
            ),
        }
        strat_params = {
            "buy_threshold": trial.suggest_float(
                "buy_threshold", 0.15, 0.5, step=0.01
            ),
            "number_stocks_to_hold": trial.suggest_int(
                "number_stocks_to_hold", 2, 10
            ),
            "allocation_weights": trial.suggest_categorical(
                "allocation_weights", ["mean", "softmax"]
            ),
        }
        if strat_type == "long_wait":
            strat_params.update(
                {"confirm_period": trial.suggest_int("confirm_period", 2, 5, step=1)}
            )
            strat = strategy_picker(
                strat_type=strat_type,
                buy_threshold=float(strat_params["buy_threshold"]),
                confirm_period=int(strat_params["confirm_period"]),
            )
            warmup = int(strat_params["confirm_period"]) + 1
        else:
            strat = strategy_picker(
                strat_type=strat_type,
                buy_threshold=float(strat_params["buy_threshold"]),
            )
            warmup = None

        scores = []
        sample_weights = compute_sample_weight(
            class_weight="balanced",
            y=label.values,
        )
        sample_weights = pd.Series(sample_weights, index=label.index)
        if eval_score == "mcc":
            eval_metric = mcc_eval
            thresh_score = 0.1
        elif eval_score == "f1":
            eval_metric = f1_eval
            thresh_score = 0.4
        else:
            eval_metric = fbeta_eval
            thresh_score = 0.4

        for train_index, test_index in cv.split(data, groups=groups):
            train_split = data.iloc[train_index]
            label_train_split = label.iloc[train_index]
            test_split = data.iloc[test_index]
            label_test_split = label.iloc[test_index]
            # Keep the price columns aside for the backtest. The copy avoids
            # writing the "pred" column into a view of test_split later on.
            data_backtest = test_split[
                ["date", "close", "open", "high", "low", "volume", "symbol"]
            ].copy()
            train_split = train_split.drop(data_backtest.columns, axis=1)
            test_split = test_split.drop(data_backtest.columns, axis=1)
            if use_sample_weights:
                sample_weight_train = sample_weights.iloc[train_index]
                sample_weight_eval = sample_weights.iloc[test_index]
            else:
                sample_weight_train = None
                sample_weight_eval = None
            if prediction_type == "multiclass":
                model = LGBMClassifier(
                    objective=LABEL,
                    random_state=42,
                    early_stopping_rounds=30,
                    n_jobs=mp.cpu_count(),
                    verbosity=-1,
                    num_class=3,
                    **params,
                )
                model.fit(
                    train_split.values,
                    label_train_split.values,
                    sample_weight=sample_weight_train,
                    eval_set=[(test_split.values, label_test_split.values)],
                    eval_metric=eval_metric,
                    eval_sample_weight=[sample_weight_eval],
                )
                # Predict on the same array type the model was fitted on.
                pred = model.predict_proba(test_split.values)
            else:
                model = LGBMClassifier(
                    objective=LABEL,
                    random_state=42,
                    early_stopping_rounds=30,
                    n_jobs=mp.cpu_count(),
                    verbosity=-1,
                    **params,
                )
                model.fit(
                    train_split.values,
                    label_train_split.values,
                    sample_weight=sample_weight_train,
                    eval_set=[(test_split.values, label_test_split.values)],
                    eval_metric=eval_metric,
                    eval_sample_weight=[sample_weight_eval],
                )
                pred = model.predict_proba(test_split.values)
            if eval_score == "mcc":
                score = calc_mcc(
                    label_test_split, pred, eval_sample_weight=sample_weight_eval
                )
            else:
                score = calc_fbeta(
                    label_test_split, pred, eval_sample_weight=sample_weight_eval
                )
            if score < thresh_score:
                # Weak classifier: penalise the fold and skip the backtest.
                sharpe = -1
                scores.append(sharpe)
            else:
                # Use the class-1 probability as the trading signal.
                pred = pred[:, 1]
                data_backtest["pred"] = pred
                register_columns("pred")
                config = StrategyConfig(
                    max_long_positions=strat_params["number_stocks_to_hold"],
                    fee_amount=costs,
                    fee_mode=FeeMode.ORDER_PERCENT,
                    buy_delay=1,
                    sell_delay=1,
                    exit_on_last_bar=True,
                    initial_cash=init_cash,
                    enable_fractional_shares=True,
                    exit_sell_fill_price=PriceType.AVERAGE,
                )
                start_date = data_backtest["date"].iloc[0].strftime("%Y-%m-%d")
                end_date = data_backtest["date"].iloc[-1].strftime("%Y-%m-%d")
                symbols = data_backtest["symbol"].unique().tolist()
                strategy = Strategy(
                    data_backtest, start_date, end_date, config=config
                )
                strategy.add_execution(strat, symbols)
                pos_size_handler = positioning_picker(
                    allocation_weights=strat_params["allocation_weights"]
                )
                strategy.set_pos_size_handler(pos_size_handler)
                result = strategy.backtest(calc_bootstrap=bootstrap, warmup=warmup)
                if bootstrap:
                    # Average the bounds of the 95% confidence interval.
                    conf_intervals = result.bootstrap.conf_intervals
                    sharpe = conf_intervals.loc[("Sharpe Ratio", "95%")].mean()
                else:
                    results_df = result.metrics_df
                    sharpe = results_df[results_df["name"] == "sharpe"][
                        "value"
                    ].values[0]
                # A Sharpe of exactly zero is penalised like a failed fold.
                if sharpe == 0.0:
                    sharpe = -1
                scores.append(sharpe)

        # The median across folds is the value Optuna maximises.
        final_score = np.median(scores)
        print(f"Final robust Sharpe: {final_score}")
        return final_score

    data = train_data.copy()
    label = train_label.copy()
    # Rolling splits need no groups; other CV types group rows by index value.
    if cv_type == "roll":
        groups = None
    else:
        groups = train_data.index.values
    direction = "maximize"
    study = optuna.create_study(direction=direction)
    study.optimize(objective, n_trials=n_trials, timeout=11000, n_jobs=1)
    return study.best_params