Data pre-processing for equity predictions
A step-by-step guide to fetching, pre-processing, and feature generation for machine learning
Introduction
Accurate predictions start with well-prepared data. In this Substack post, we will walk through the crucial steps of fetching data, pre-processing it, and generating features for a machine learning project that predicts US equities. By following this step-by-step guide, you will gain a solid understanding of how to prepare financial data for building robust predictive models.
Note that this entry does not cover building the machine learning models that predict equities; that step will be covered in a separate post!
Step 1: Data fetching and initial pre-processing
To begin our journey, we need to gather reliable financial data; here, the index of choice is the Russell 1000. In our Python environment, we will use the alpaca_trade_api and yfinance libraries to access asset information and historical stock market data, respectively. alpaca_trade_api can be replaced by the package of any broker of your choice; it is used here to make sure the stocks we are about to fetch can actually be traded there. Let's start by fetching the data using the following code:
from alpaca_trade_api.rest import REST
import yfinance as yf
import pandas as pd
import numpy as np
import datetime
import multiprocessing as mp
import warnings
import os

def clean_asset_data(data):
    # Keep tradable, fractionable assets and drop OTC/ARCA listings
    df = data.copy()
    df = df[df['Exchange'] != 'OTC']
    df = df[df['Exchange'] != 'ARCA']
    df = df[df['Tradable'] == True]
    df = df[df['Fractionable'] == True]
    # Drop funds, trusts, notes and special share classes by name
    df['Name'] = df['Name'].str.lower()
    df = df[df['Name'].str.contains('etf') == False]
    df = df[df['Name'].str.contains('fund') == False]
    df = df[df['Name'].str.contains('depositary') == False]
    df = df[df['Name'].str.contains('subordinated') == False]
    df = df[df['Name'].str.contains('voting') == False]
    df = df[df['Name'].str.contains('trust') == False]
    df = df[df['Name'].str.contains('futures') == False]
    df = df[df['Name'].str.contains('units') == False]
    df = df[df['Name'].str.contains('etn') == False]
    df = df[df['Name'].str.contains('etns') == False]
    return df

warnings.filterwarnings('ignore')
BENCHMARK = '^RUI'
TODAY = datetime.date.today().strftime('%Y-%m-%d')
ALPACA_API_KEY = os.getenv('ALPACA_API_KEY')
ALPACA_SECRET_KEY = os.getenv('ALPACA_SECRET_KEY')
BASE_URL = 'https://paper-api.alpaca.markets'
api = REST(key_id=ALPACA_API_KEY, secret_key=ALPACA_SECRET_KEY, base_url=BASE_URL, api_version='v2')
active_assets = api.list_assets(status='active', asset_class='us_equity')
assets = []
for asset in active_assets:
    assets.append([asset.name, asset.symbol, asset.exchange, asset.fractionable, asset.tradable])
assets_df = pd.DataFrame(assets, columns=['Name', 'Symbol', 'Exchange', 'Fractionable', 'Tradable'])
assets_df = clean_asset_data(data=assets_df)
# Keep only the tradable symbols that are also Russell 1000 constituents
russell_stocks = pd.read_csv('Data/russell1000.csv')['Ticker'].tolist()
symbols = assets_df['Symbol'].tolist()
symbols = [s.strip() for s in symbols if s in russell_stocks]
symbols.append(BENCHMARK)
In this code snippet, we import the necessary libraries and define the parameters for data retrieval. The Alpaca API gives us the list of tradable assets, and Yahoo Finance (coming shortly) will supply the historical prices that serve as the foundation for our machine learning project. The russell_stocks list can be fetched from here. The pre-processing function, clean_asset_data, makes sure we do not include any funds or trusts, or tickers that track the prices of, for instance, voting or depositary stock.
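As a side note, the chain of name filters in clean_asset_data can be collapsed into a single pattern. The sketch below is only an illustration under a few assumptions: the sample rows are made up, and drop_excluded_names is a hypothetical helper, not part of the original code.

```python
import pandas as pd

# Hypothetical sample rows; the real frame comes from the broker's asset list.
sample = pd.DataFrame({
    'Name': ['Apple Inc. Common Stock', 'SPDR S&P 500 ETF Trust',
             'Acme Corp Depositary Shares', 'Microsoft Corporation Common Stock'],
    'Symbol': ['AAPL', 'SPY', 'ACME', 'MSFT'],
})

# Same keywords as in clean_asset_data, joined into one alternation.
# 'etns' is left out because any name containing it also contains 'etn'.
EXCLUDED = ['etf', 'fund', 'depositary', 'subordinated', 'voting',
            'trust', 'futures', 'units', 'etn']
PATTERN = '|'.join(EXCLUDED)

def drop_excluded_names(df):
    # Hypothetical helper: case-insensitive substring match on the asset name
    names = df['Name'].str.lower()
    return df[~names.str.contains(PATTERN, regex=True, na=False)]

kept = drop_excluded_names(sample)
print(kept['Symbol'].tolist())
```

Keep in mind that this is plain substring matching, exactly like the original filters, so a keyword such as 'etn' also matches any company name that happens to contain those letters (for example "aetna"). It is worth eyeballing the dropped rows once.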
With the necessary variables set up, the code snippet below fetches the data through the Yahoo Finance API and does further pre-processing: filling NaN values and dropping stocks with a short history or with buggy prices. We initially fetch daily data (as specified by the interval='1d' parameter inside yf.download), but ultimately we are interested in weekly bars ending on Friday. To keep the daily data, drop the groupby calls altogether; to end the week on another day, change W-FRI accordingly:
if __name__ == "__main__":
    # start and end already define the date range, so period is omitted
    historical_data = yf.download(symbols, start='2009-12-31', end=TODAY, interval='1d', threads=mp.cpu_count(), auto_adjust=True, repair=False, ignore_tz=True)
    historical_data.index.name = 'Date'
    data_symbols = historical_data.columns.levels[1].values.tolist()
    # Weekly (Friday) closes of the benchmark
    benchmark = historical_data.xs(BENCHMARK, level=1, axis=1)['Close'].to_frame('Close')
    benchmark.index = pd.to_datetime(benchmark.index)
    benchmark = benchmark.sort_values(by='Date')
    benchmark = benchmark.groupby(pd.Grouper(freq='W-FRI')).last()
    stock_dataframes = []
    for symbol in data_symbols:
        stock = historical_data.xs(symbol, level=1, axis=1).copy()
        stock.index = pd.to_datetime(stock.index)
        stock = stock.sort_values(by='Date')
        # Aggregate daily bars into weekly bars ending on Friday
        stock = stock.groupby(pd.Grouper(freq='W-FRI')).agg({'Close': 'last', 'Open': 'first', 'High': 'max', 'Low': 'min', 'Volume': 'sum'})
        if symbol != BENCHMARK:
            # Treat zeros as missing, fill short gaps with a rolling median, drop the rest
            stock = stock.replace(0, np.nan)
            stock = stock.fillna(stock.rolling(window=5, min_periods=1).median())
            stock = stock.dropna(axis=0)
            if stock.empty:
                print(f'Some error with {symbol}, dropping it...')
                continue
            # NOTE: TRAIN_DATE is not defined in this snippet; set it beforehand
            # to the latest acceptable first trading date (as a pd.Timestamp)
            if stock.index[0] > TRAIN_DATE:
                print(f'Oops, {symbol} has traded for less than 2 years! Dropping it...')
                continue
            # Count the weeks where the benchmark traded but the stock has no price.
            # The original reassigned `benchmark` itself and counted NaNs in its 'Close'
            # column; a local copy and the 'Symbol' column are used here on the
            # assumption that the check targets gaps in the stock's history.
            aligned = benchmark.loc[stock.index[0]:].copy()
            aligned['Symbol'] = stock['Close']
            if aligned['Symbol'].isna().sum() > 10:
                print(f'Oops, {symbol} has something wrong with prices! Dropping it...')
                continue
        stock['Symbol'] = symbol
        stock_dataframes.append(stock)
        print(f'Done: {symbol}')
    full_stock_dataframe = pd.concat(stock_dataframes, axis=0)
    print(f'Last date in df: {full_stock_dataframe.index[-1]}')
    print('Saving file to pickle...')
    full_stock_dataframe.to_pickle('Data/Russell_asset_historical_data.p')
    print('Done!')
All done and stored in the pickle file! Let’s head to the second step now, which is really the core of this post.
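Before moving on, the weekly aggregation from Step 1 can be checked on a small synthetic frame. This is only a sketch: the prices and dates below are made up, while the Grouper and agg arguments are the ones used in the snippet above.

```python
import numpy as np
import pandas as pd

# Synthetic daily bars over two full business weeks (Mon 2023-01-02 to Fri 2023-01-13)
dates = pd.bdate_range('2023-01-02', '2023-01-13')
daily = pd.DataFrame({
    'Open': np.arange(10.0, 20.0),
    'High': np.arange(11.0, 21.0),
    'Low': np.arange(9.0, 19.0),
    'Close': np.arange(10.5, 20.5),
    'Volume': np.full(10, 100),
}, index=dates)
daily.index.name = 'Date'

# One row per week, labelled with the Friday that closes it
weekly = daily.groupby(pd.Grouper(freq='W-FRI')).agg(
    {'Close': 'last', 'Open': 'first', 'High': 'max', 'Low': 'min', 'Volume': 'sum'}
)
print(weekly)
```

Each weekly bar takes Monday's open, Friday's close, the extreme high and low of the week, and the summed volume, which is what a weekly candle should contain.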
Step 2: Feature generation and secondary pre-processing
With our fetched and initially pre-processed data in hand, we can now generate meaningful features that will enhance the predictive power of our models, as well as perform the second layer of pre-processing. Let's take a look at the code snippet below; make sure you have tsfresh installed:
import pandas as pd
import numpy as np
from tsfresh.feature_extraction.settings import EfficientFCParameters
from numpy.random import seed
import random
import multiprocessing as mp
from tqdm import tqdm
import warnings

warnings.filterwarnings('ignore')
seed(42)
random.seed(42)

def generate_extract_settings(settings):
    # Keep only the tsfresh calculators used in this post
    keys = ['energy_ratio_by_chunks', 'augmented_dickey_fuller', 'mean_change', 'time_reversal_asymmetry_statistic', 'index_mass_quantile']
    filtered_settings = {new_key: settings[new_key] for new_key in keys}
    return filtered_settings

def prepare_panel_dataset(asset_data, symbol):
    # .copy() avoids pandas' SettingWithCopyWarning when sorting the slice in place
    asset_df = asset_data[asset_data['Symbol'] == symbol].copy()
    asset_df.sort_values(by='Date', inplace=True)
    benchmark_srs = asset_data.loc[asset_data['Symbol'] == BENCHMARK, 'Close']
    extract_settings = generate_extract_settings(EfficientFCParameters())
    technical_features = add_tech_indicators(price_df=asset_df, benchmark_srs=benchmark_srs)
    abstract_features = add_abstract_factors(price_df=asset_df, settings=extract_settings)
    close, open_, high, low, volume_ = (asset_df['Close'], asset_df['Open'], asset_df['High'], asset_df['Low'], asset_df['Volume'])
    panel_data = technical_features.join(abstract_features)
    # Clean up infinities and short gaps, then scale with fractional differentiation
    panel_data = panel_data.replace([np.inf, -np.inf], np.nan)
    panel_data = panel_data.fillna(panel_data.rolling(window=5, min_periods=1).median())
    panel_data = panel_data.dropna(axis=0)
    panel_data = scale_fracdiff(panel_data, window=20, frac_weight=0.7, mode='valid')
    # Raw (unscaled) prices and volume, aligned on the remaining dates
    panel_data['unique_id'] = symbol
    panel_data['close'] = close
    panel_data['open'] = open_
    panel_data['high'] = high
    panel_data['low'] = low
    panel_data['volume'] = volume_
    # panel_data['y'] = 'THIS IS ANY LABEL OF YOUR CHOICE'
    return panel_data

def update(*symbol):
    # Callback that advances the progress bar once a symbol is finished
    pbar.update()

BENCHMARK = '^RUI'
historical_data = pd.read_pickle('Data/Russell_asset_historical_data.p')
symbol_list = historical_data['Symbol'].unique().tolist()
symbol_list.remove(BENCHMARK)
pbar = tqdm(total=len(symbol_list))

if __name__ == "__main__":
    pool = mp.Pool(processes=mp.cpu_count())
    print('Getting results...')
    results_raw = [pool.apply_async(prepare_panel_dataset, args=(historical_data, s), callback=update) for s in symbol_list]
    results = [res.get() for res in results_raw]
    print('Closing pool...')
    pbar.close()
    pool.close()
    pool.join()
    print('Putting stuff together...')
    panel_dataset = pd.concat(results, axis=0)
    panel_dataset.index.name = 'ds'
    panel_dataset.sort_values(by=['unique_id', 'ds'], inplace=True)
    print('Saving file to pickle...')
    panel_dataset.to_pickle('Data/preprocessed_data_regimes.p')
    print('Done!')
Before moving on to the helper functions, let’s see what is going on in the code snippet above. The historical_data we fetched in the first step is loaded back and passed to a multiprocessing pool, which asynchronously generates features and pre-processes data for every stock of the Russell 1000 index through the prepare_panel_dataset function. That function takes each stock together with the benchmark (here the Russell 1000 index), generates technical features (add_tech_indicators) and abstract features (add_abstract_factors; that is my own name for them, they are actually useful statistical features from the tsfresh package), and then scales them using fractional differentiation inside scale_fracdiff. The unique_id, close, high, low, open, and volume columns are necessary for strategy backtesting via PyBroker based on the ML predictions, something I already wrote about here. The y variable is for you to decide, i.e., what specifically you want to predict; this topic is planned to be covered in a separate post.
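To make the final assembly step concrete, here is a minimal sketch of how per-symbol frames end up in one long panel indexed by ds and ordered by unique_id. The toy_features helper and its values are hypothetical stand-ins for the output of prepare_panel_dataset.

```python
import pandas as pd

def toy_features(symbol, start):
    # Hypothetical stand-in for prepare_panel_dataset: three weekly rows per symbol
    idx = pd.date_range(start, periods=3, freq='W-FRI')
    frame = pd.DataFrame({'RSI': [50.0, 55.0, 60.0]}, index=idx)
    frame['unique_id'] = symbol
    return frame

results = [toy_features('MSFT', '2023-01-06'), toy_features('AAPL', '2023-01-13')]

# Stack the per-symbol frames, name the date index, and sort by symbol then date
panel = pd.concat(results, axis=0)
panel.index.name = 'ds'
panel = panel.sort_values(by=['unique_id', 'ds'])
print(panel)
```

Note that sort_values accepts a mix of column labels and index level names, which is why naming the index 'ds' before sorting matters.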
Let’s take a look at each of the aforementioned functions now.
Adding technical indicators
The code snippet below shows add_tech_indicators in detail; make sure to install ta, talib, and statsmodels:
from ta import momentum, volume, volatility, trend
import talib
from statsmodels.regression.rolling import RollingOLS
from statsmodels.tools.tools import add_constant

def add_tech_indicators(price_df, benchmark_srs):
    # Window lengths are in weekly bars: 5 ~ 1 month, 13 ~ 1 quarter, 26 ~ 6 months
    def add_momentum(close, high, low, volume_):
        rsi = momentum.RSIIndicator(close=close).rsi()
        ultimate = momentum.UltimateOscillator(high=high, low=low, close=close, window1=5, window2=13, window3=26).ultimate_oscillator()
        awesome = momentum.AwesomeOscillatorIndicator(high=high, low=low, window2=26).awesome_oscillator()
        pct_volume = momentum.PercentageVolumeOscillator(volume=volume_).pvo()
        roc_1w = close.copy().pct_change()
        roc_1m = momentum.ROCIndicator(close=close, window=5).roc()
        roc_3m = momentum.ROCIndicator(close=close, window=13).roc()
        roc_6m = momentum.ROCIndicator(close=close, window=26).roc()
        adxr = talib.ADXR(high, low, close, timeperiod=13)
        cmo = talib.CMO(close, timeperiod=13)
        plus_di = talib.PLUS_DI(high, low, close, timeperiod=26)
        minus_di = talib.MINUS_DI(high, low, close, timeperiod=26)
        trix = talib.TRIX(close, timeperiod=13)
        momentum_df = pd.concat([rsi, ultimate, awesome, pct_volume, roc_1w, roc_1m, roc_3m, roc_6m, adxr, cmo, plus_di, minus_di, trix], axis=1)
        # The 26-week rate of change is labelled ROC_6M (the original label was ROC_12M)
        momentum_df.columns = ['RSI', 'Ultimate', 'Awesome', 'PVO', 'ROC_1W', 'ROC_1M', 'ROC_3M', 'ROC_6M', 'ADXR', 'CMO', 'Plus_DI', 'Minus_DI', 'TRIX']
        return momentum_df

    def add_volatility(close, high, low, open_):
        atr = volatility.AverageTrueRange(high=high, low=low, close=close).average_true_range()
        atr = atr.replace(0, np.nan)
        bb = volatility.BollingerBands(close=close, window=13).bollinger_pband()
        ulc = volatility.UlcerIndex(close=close).ulcer_index()
        typical_px = (close + high + low + open_) / 4
        mean_std_semi = typical_px.ewm(span=26).std()
        mean_std_qtr = typical_px.ewm(span=13).std()
        mean_std_month = typical_px.ewm(span=5).std()
        beta = talib.BETA(high, low, timeperiod=5)
        volatility_df = pd.concat([atr, bb, ulc, mean_std_semi, mean_std_month, mean_std_qtr, beta], axis=1)
        volatility_df.columns = ['ATR', 'BB', 'Ulcer', 'Mean_Std_Semi', 'Mean_Std_Month', 'Mean_Std_Qtr', 'Beta']
        return volatility_df

    def add_volume(close, high, low, volume_):
        # Volume is log-transformed before entering the indicators
        ad = talib.ADOSC(high=high, low=low, close=close, volume=np.log(volume_), fastperiod=3, slowperiod=10)
        obv = volume.OnBalanceVolumeIndicator(close=close, volume=np.log(volume_)).on_balance_volume()
        fi = volume.ForceIndexIndicator(close=close, volume=np.log(volume_)).force_index()
        mfi = volume.MFIIndicator(high=high, low=low, close=close, volume=np.log(volume_)).money_flow_index()
        volume_df = pd.concat([obv, fi, mfi, ad], axis=1)
        volume_df.columns = ['OBV', 'Force_Inx', 'MFI', 'Chaikin']
        return volume_df

    def add_trend(close, high, low):
        macd = trend.MACD(close=close).macd_diff()
        ema_diff = talib.APO(close, fastperiod=5, slowperiod=20, matype=talib.MA_Type.EMA)
        cci = trend.CCIIndicator(high=high, low=low, close=close, window=13).cci()
        kst = trend.KSTIndicator(close=close, roc1=5, roc2=10, roc3=15, roc4=20, window1=5, window2=5, window3=5, window4=10, nsig=7).kst_diff()
        dpo = trend.DPOIndicator(close=close, window=20).dpo()
        psar = trend.PSARIndicator(high=high, low=low, close=close).psar()
        vortex = trend.VortexIndicator(high=high, low=low, close=close).vortex_indicator_diff()
        trend_df = pd.concat([macd, cci, ema_diff, kst, dpo, psar, vortex], axis=1)
        trend_df.columns = ['MACD', 'CCI', 'EMA_diff', 'KST', 'DPO', 'PSAR', 'Vortex']
        return trend_df

    def add_benchmark_stats(close, benchmark_close):
        benchmark_close = benchmark_close.copy().loc[close.index[0]:close.index[-1]]
        close = close.copy()
        if len(close) != len(benchmark_close):
            # Align the stock to the benchmark's dates and forward-fill missing weeks
            benchmark_close = benchmark_close.to_frame('Benchmark')
            benchmark_close['Close'] = close
            benchmark_close['Close'] = benchmark_close['Close'].ffill()
            close = benchmark_close['Close']
            benchmark_close = benchmark_close['Benchmark']
        stock_log_rets = np.log(close).diff()
        benchmark_log_rets = np.log(benchmark_close).diff()
        quarterly_outperform = np.log(close).diff(13) - np.log(benchmark_close).diff(13)
        monthly_outperform = np.log(close).diff(5) - np.log(benchmark_close).diff(5)
        weekly_outperform = stock_log_rets - benchmark_log_rets
        mod_month = RollingOLS(benchmark_log_rets, stock_log_rets, window=5)
        rolling_params = mod_month.fit(params_only=True)
        monthly_beta = rolling_params.params.squeeze()
        mod_q = RollingOLS(benchmark_log_rets, stock_log_rets, window=13)
        rolling_params = mod_q.fit(params_only=True)
        quarterly_beta = rolling_params.params.squeeze()
        monthly_correl = stock_log_rets.rolling(window=5, min_periods=5).corr(benchmark_log_rets)
        quarterly_correl = stock_log_rets.rolling(window=13, min_periods=13).corr(benchmark_log_rets)
        benchmark_stats_df = pd.concat([monthly_outperform, weekly_outperform, quarterly_outperform, monthly_beta, quarterly_beta, monthly_correl, quarterly_correl], axis=1)
        benchmark_stats_df.columns = ['Index_outperf_1M', 'Index_outperf_1W', 'Index_outperf_1Q', 'Beta_1M', 'Beta_1Q', 'Monthly_correl', 'Quarterly_correl']
        return benchmark_stats_df

    def add_liquidity(close, volume_):
        def add_amihoud(px_c, px_v):
            # Amihud-style illiquidity: absolute return per unit of dollar volume
            dollar_volume = px_c * px_v
            returns = px_c.pct_change()[1:]
            amihoud = pd.concat([dollar_volume, returns], axis=1)
            amihoud.columns = ['Dollar Volume', 'Returns']
            amihoud = amihoud.ewm(span=13).mean()
            amihoud = abs(amihoud['Returns']).div(amihoud['Dollar Volume']) * 1000000
            amihoud = amihoud.to_frame('Amihoud')
            return amihoud

        def add_kyle(px_c, px_v):
            # Kyle-style lambda: rolling regression of returns on signed log dollar volume
            returns = px_c.pct_change()[1:]
            sign_rets = returns.apply(lambda x: 1 if x > 0 else -1)
            dollar_volume = sign_rets * np.log(px_c * px_v)
            exog = add_constant(dollar_volume.values, prepend=True)[1:]
            mod = RollingOLS(returns, exog, window=13)
            rolling_params = mod.fit(params_only=True)
            params = rolling_params.params
            return params

        kyle = add_kyle(close, volume_)
        assert isinstance(kyle, pd.DataFrame)
        kyle = kyle.iloc[:, 1]
        amih = add_amihoud(close, volume_)
        data_liq = pd.concat([kyle, amih], axis=1)
        data_liq.columns = ['Kyle_L', 'Amih_L']
        return data_liq

    def add_corwin_schultz(high_series, low_series):
        # Corwin-Schultz bid-ask spread estimate from high and low prices
        def get_beta(high, low, window=5):
            hl = np.log(high.values / low.values) ** 2
            hl = pd.Series(hl, index=high.index)
            beta = hl.rolling(window=2).sum()
            beta = beta.rolling(window=window).mean()
            return beta.dropna()

        def get_gamma(high, low):
            px_h = high.rolling(window=2).max()
            px_l = low.rolling(window=2).min()
            gamma = np.log(px_h.values / px_l.values) ** 2
            gamma = pd.Series(gamma, index=high.index)
            return gamma.dropna()

        def get_alpha(beta, gamma):
            den = 3 - 2 * 2 ** .5
            alpha = (2 ** .5 - 1) * (beta ** .5) / den
            alpha -= (gamma / den) ** .5
            alpha[alpha < 0] = 0
            return alpha.dropna()

        def get_corwin_schultz(high, low, window=5):
            beta = get_beta(high, low, window)
            gamma = get_gamma(high, low)
            alpha = get_alpha(beta, gamma)
            spread = 2 * (np.exp(alpha) - 1) / (1 + np.exp(alpha))
            start_time = pd.Series(high.index[0:spread.shape[0]], index=spread.index)
            spread = pd.concat([spread, start_time], axis=1)
            spread.columns = ['Spread', 'Start_Time']
            return spread['Spread']

        cw_spread = get_corwin_schultz(high_series, low_series, window=5)
        cw_spread = cw_spread.to_frame('Corwin_Schultz')
        return cw_spread

    px_close = price_df['Close']
    px_high = price_df['High']
    px_low = price_df['Low']
    px_volume = price_df['Volume']
    px_open = price_df['Open']
    px_benchmark = benchmark_srs.copy()
    tech_momentum = add_momentum(px_close, px_high, px_low, px_volume)
    tech_vol = add_volatility(px_close, px_high, px_low, px_open)
    tech_volm = add_volume(px_close, px_high, px_low, px_volume)
    tech_trend = add_trend(px_close, px_high, px_low)
    liquidity = add_liquidity(px_close, px_volume)
    corwin_schultz = add_corwin_schultz(px_high, px_low)
    benchmark_stats = add_benchmark_stats(px_close, px_benchmark)
    tech_indicators = pd.concat([tech_momentum, tech_trend, tech_volm, tech_vol, liquidity, corwin_schultz, benchmark_stats], axis=1)
    return tech_indicators
There are a bunch of different technical factors split into several groups: momentum, volatility, volume, trend, liquidity, and benchmark-related. The parameters inside specific functions are based on my experience, although you can change them as you please. The liquidity features (including corwin_schultz) are inspired by Marcos Lopez de Prado’s work; the magic 1000000 in add_amihoud is there simply to enlarge the feature, as it is usually very close to 0.
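To see why the scaling in add_amihoud is handy and how the feature reacts to liquidity, here is a stripped-down sketch on synthetic data. It mirrors the order of operations used above (smooth first, then take the absolute value and divide); the function name, prices and volumes are made up for illustration.

```python
import numpy as np
import pandas as pd

def amihud_illiquidity(close, volume, span=13, scale=1_000_000):
    # Hypothetical helper mirroring add_amihoud: smoothed return over smoothed dollar volume
    dollar_volume = close * volume
    returns = close.pct_change()
    smoothed = pd.concat(
        [dollar_volume, returns], axis=1, keys=['dollar_volume', 'returns']
    ).ewm(span=span).mean()
    return smoothed['returns'].abs() / smoothed['dollar_volume'] * scale

idx = pd.date_range('2023-01-06', periods=6, freq='W-FRI')
close = pd.Series([100.0, 102.0, 101.0, 105.0, 104.0, 108.0], index=idx)

# Same price path, but one stock trades 100x fewer shares per week
liquid = amihud_illiquidity(close, pd.Series(1_000_000.0, index=idx))
illiquid = amihud_illiquidity(close, pd.Series(10_000.0, index=idx))
print(pd.concat([liquid, illiquid], axis=1, keys=['liquid', 'illiquid']))
```

With identical returns, trading a hundredth of the volume makes the measure a hundred times larger, which is the behaviour you want from an illiquidity proxy. Without the scale factor both columns would sit many decimal places below zero.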
Let’s take a look at add_abstract_factors now.
Adding statistical (“abstract”) factors
For the code snippet below, you can optionally have swifter installed (on top of the packages from above). I use swifter to speed up some of the rolling computations, but you don’t need it and can simply remove .swifter from the relevant lines:
from statsmodels.regression.rolling import RollingOLS
from statsmodels.tools.tools import add_constant
from numpy.linalg import LinAlgError
from statsmodels.tsa.stattools import adfuller
from statsmodels.tools.sm_exceptions import MissingDataError
import functools

def add_abstract_factors(price_df, settings):
    def _roll(a, shift):
        # Circular shift of an array by `shift` positions
        if not isinstance(a, np.ndarray):
            a = np.asarray(a)
        idx = shift % len(a)
        return np.concatenate([a[-idx:], a[:-idx]])

    def add_energy_ratio(price_srs, params, window=26):
        def energy_ratio_by_chunks(x, param_):
            # Share of the window's total energy (sum of squares) held by one chunk
            full_series_energy = np.sum(x ** 2)
            num_segments = param_["num_segments"]
            segment_focus = param_["segment_focus"]
            assert segment_focus < num_segments
            assert num_segments > 0
            if full_series_energy == 0:
                return np.nan
            else:
                res_data = np.sum(np.array_split(x, num_segments)[segment_focus] ** 2.0) / full_series_energy
                return res_data

        price_ = price_srs.copy()
        price_ = price_.swifter.rolling(window=window).apply(energy_ratio_by_chunks, args=(params[-1],))
        return price_

    def add_time_reversal_stats(price_srs, window=26):
        def time_reversal_asymmetry_statistic(x, lag):
            n = len(x)
            x = np.asarray(x)
            if 2 * lag >= n:
                return 0
            else:
                one_lag = _roll(x, -lag)
                two_lag = _roll(x, 2 * -lag)
                return np.mean(
                    (two_lag * two_lag * one_lag - one_lag * x * x)[0: (n - 2 * lag)]
                )

        price_ = price_srs.copy()
        price_ = price_.swifter.rolling(window=window).apply(time_reversal_asymmetry_statistic, args=(1,))
        return price_

    def add_augmented_dickey_fuller(price_srs, params, window=26):
        def augmented_dickey_fuller(x, param_):
            @functools.lru_cache()
            def compute_adf(autolag_):
                # Return NaNs when the test cannot be computed on this window
                try:
                    return adfuller(x, autolag=autolag_)
                except (LinAlgError, ValueError, MissingDataError):
                    return np.nan, np.nan, np.nan

            autolag = 'AIC'
            adf = compute_adf(autolag)
            if param_["attr"] == "teststat":
                return adf[0]
            elif param_["attr"] == "pvalue":
                return adf[1]
            elif param_["attr"] == "usedlag":
                return adf[2]
            else:
                return np.nan

        price_ = price_srs.copy()
        price_ = price_.swifter.rolling(window=window).apply(augmented_dickey_fuller, args=(params[1],))
        return price_

    def mean_change(x):
        # Average step between consecutive values in the window
        x = np.asarray(x)
        return (x[-1] - x[0]) / (len(x) - 1) if len(x) > 1 else np.nan

    # Log of a close-weighted typical price
    typical_price = np.log((price_df['Open'] + 2 * price_df['Close'] + price_df['High'] + price_df['Low']) / 5)
    energy_ratio = add_energy_ratio(typical_price, params=settings['energy_ratio_by_chunks'], window=26)
    time_reversal = add_time_reversal_stats(typical_price, window=26)
    aug_dfuller = add_augmented_dickey_fuller(typical_price, settings['augmented_dickey_fuller'], window=26)
    mean_chg = typical_price.swifter.rolling(window=26).apply(mean_change)
    abstract_factors = pd.concat([energy_ratio, time_reversal, aug_dfuller, mean_chg], axis=1)
    abstract_factors.columns = ['energy_ratio', 'time_reversal_asymmetry', 'augmented_dfuller_pvalue', 'mean_change']
    return abstract_factors
The statistical factors above are some of the features that tsfresh has to offer, the ones I have found most useful in predictive tasks. Again, some parameters (e.g., window) inside the functions are arbitrary.
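If you would rather skip swifter, the same rolling statistics work with plain pandas. The sketch below re-implements two of them on a tiny made-up series so the numbers can be checked by hand. The slicing form of the time-reversal statistic is my own rewrite of the roll-based version above and is assumed, not taken from tsfresh.

```python
import numpy as np
import pandas as pd

def mean_change(x):
    # Average step between consecutive values in the window
    x = np.asarray(x)
    return (x[-1] - x[0]) / (len(x) - 1) if len(x) > 1 else np.nan

def time_reversal_asymmetry(x, lag=1):
    # Mean of x[t+2*lag]^2 * x[t+lag] - x[t+lag] * x[t]^2 over all valid t
    x = np.asarray(x)
    n = len(x)
    if 2 * lag >= n:
        return 0.0
    mid = x[lag:n - lag]
    return np.mean(x[2 * lag:] ** 2 * mid - mid * x[:n - 2 * lag] ** 2)

series = pd.Series([1.0, 2.0, 4.0, 7.0, 11.0])

# Plain pandas rolling; raw=True hands each window to the function as a NumPy array
rolled_mean_change = series.rolling(window=4).apply(mean_change, raw=True)
rolled_tra = series.rolling(window=4).apply(time_reversal_asymmetry, raw=True, args=(1,))
print(pd.concat([rolled_mean_change, rolled_tra], axis=1, keys=['mean_change', 'time_reversal']))
```

For the first full window [1, 2, 4, 7] the mean change is (7 - 1) / 3 = 2, and the time-reversal statistic averages 16 * 2 - 2 * 1 = 30 and 49 * 4 - 4 * 16 = 180, giving 105.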
Finally, let’s see the scale_fracdiff function I use for scaling data.
Fractional differentiation
The concept of fractional differentiation is best explained in Marcos Lopez de Prado’s book, or simply in the documentation of the package I have used: FracDiff. Generally:
Fractional differentiation is a technique used in time series analysis to balance between stationarity and non-stationarity of data. Stationary time series have stable statistical properties, while non-stationary time series exhibit evolving patterns. Fractional differentiation involves applying fractional differencing operators to adjust the degree of differencing applied to the data. By selecting an appropriate fractional differencing order, we can reduce non-stationarity in highly non-stationary data or capture long-term dependencies in already stationary data. This balancing act allows us to transform the data into a more manageable and analyzable form using traditional stationary-based techniques, such as ARIMA models, while retaining important patterns and characteristics of the original data.
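To make the idea less abstract, here is a minimal NumPy sketch of fixed-window fractional differencing using the standard binomial-expansion weights. It is an illustration under stated assumptions, not the FracDiff package's implementation; both function names are hypothetical.

```python
import numpy as np

def fracdiff_weights(d, n_terms):
    # Weights of (1 - B)^d: w_0 = 1, w_k = -w_{k-1} * (d - k + 1) / k
    weights = [1.0]
    for k in range(1, n_terms):
        weights.append(-weights[-1] * (d - k + 1) / k)
    return np.array(weights)

def fracdiff_valid(x, d, window):
    # out[t] = sum_k w[k] * x[t - k], kept only where a full window is available
    w = fracdiff_weights(d, window)
    x = np.asarray(x, dtype=float)
    return np.convolve(x, w, mode='valid')

print(fracdiff_weights(1.0, 4))   # an ordinary first difference: weights 1, -1, then zeros
print(fracdiff_weights(0.7, 4))   # fractional order: weights decay slowly instead of cutting off
print(fracdiff_valid([1.0, 2.0, 4.0, 7.0], d=1.0, window=2))
```

With d = 1 only the two most recent observations matter, while with d = 0.7 older observations keep a small, slowly decaying weight. That lingering weight is the "memory" that fractional differencing preserves.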
The code snippet below shows its implementation:
from fracdiff.sklearn import Fracdiff

def scale_fracdiff(df, window=26, mode='valid', frac_weight=0.7):
    data = df.copy()
    # Shrink the window when the history is shorter than requested
    if len(data) < window:
        window = len(data) - 1
    scaler_frac = Fracdiff(d=frac_weight, window=window, mode=mode)
    # In 'valid' mode the first window-1 rows are consumed by the look-back
    scaled_data = pd.DataFrame(scaler_frac.fit_transform(data), index=data.index[window-1:], columns=list(data))
    return scaled_data
The values of window and frac_weight are, again, arbitrary. frac_weight is the order of differencing: 0 leaves the series unchanged and 1 is an ordinary first difference, so the closer it is to 1, the more stationary the result. I like setting everything to 0.7, although you may find another value more useful.
Before wrapping up this sub-section, there are, however, two critical facts to mention:
Fractional differentiation scales the underlying data by a moving window (here represented by the valid mode). This makes sure there is no data leakage and that there is no need to scale the training, validation, and testing sets separately, as the results would not differ between splitting and not splitting the scaling process...
...however, you must introduce gaps between these splits at least as long as the window parameter in the code snippet above, so that the beginnings of the validation and testing splits do not incorporate “seen” data from the previous splits. This concept will be discussed in a separate Substack post on time-series cross-validation and hyperparameter optimisation.
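The gap requirement can be sketched with plain index arithmetic. The helper below is hypothetical (the name, split sizes and row count are made up) and assumes the rows of a single symbol are already sorted by date.

```python
def gapped_splits(n_rows, train_size, val_size, gap):
    # Hypothetical helper: chronological train/validation/test positions
    # separated by `gap` unused rows on each boundary
    val_start = train_size + gap
    test_start = val_start + val_size + gap
    if test_start >= n_rows:
        raise ValueError('Not enough rows for the requested sizes and gap')
    train_idx = range(0, train_size)
    val_idx = range(val_start, val_start + val_size)
    test_idx = range(test_start, n_rows)
    return train_idx, val_idx, test_idx

# gap set to the fracdiff window used in prepare_panel_dataset (20 weekly bars)
train_idx, val_idx, test_idx = gapped_splits(n_rows=500, train_size=350, val_size=60, gap=20)
print(train_idx, val_idx, test_idx)
```

With a 20-bar look-back, the first validation row is built from rows that all sit after the last training row, so nothing the model was trained on leaks into it. In a multi-symbol panel the same positions would be applied per symbol, or the split would be made on dates instead.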
That’s it! Let’s summarise everything we have gone through thus far.
Conclusion
In this Substack post, we went through the essential steps of fetching data, pre-processing it, and generating features for a machine learning project that predicts US equities. We saw how libraries like alpaca_trade_api and yfinance give access to a wealth of financial data. With the pre-processing steps, we made sure that our dataset was clean and relevant, and later properly scaled thanks to fractional differentiation and the FracDiff library. Finally, through feature generation with packages like ta, talib, and tsfresh, we transformed our data into informative representations to feed predictive models.
Understanding the intricacies of data manipulation is crucial for any machine learning practitioner. Armed with the knowledge gained from this guide, you are now equipped to embark on your own adventures in the realm of financial machine learning, where the possibilities are boundless. Happy coding and may your data-driven endeavors lead to remarkable discoveries!
Call to Action
Feel free to leave a comment or ask a question if you have any doubts. Don’t forget to subscribe to stay on top of similar posts.
