fix: Improve indicator calculation with robust error handling and trend preservation

This commit is contained in:
Bobby (aider) 2025-02-13 23:12:50 -08:00
parent af51b62d49
commit 7fab19c9d6

View File

@ -18,25 +18,55 @@ class DynamicStrategy(Strategy):
# Initialize all selected indicators # Initialize all selected indicators
for ind_name, ind_config in self.indicator_configs.items(): for ind_name, ind_config in self.indicator_configs.items():
if ind_config['type'] == 'SMA': if ind_config['type'] == 'SMA':
self.indicators[ind_name] = self.I(lambda x: ta.sma(x, length=int(ind_config['params']['length'])).fillna(0).to_numpy(), self.data.Close) def sma_calc(x):
result = ta.sma(x, length=int(ind_config['params']['length']))
return result.fillna(method='ffill').fillna(0).values
self.indicators[ind_name] = self.I(sma_calc, self.data.Close)
elif ind_config['type'] == 'EMA': elif ind_config['type'] == 'EMA':
self.indicators[ind_name] = self.I(lambda x: ta.ema(x, length=int(ind_config['params']['length'])).fillna(0).to_numpy(), self.data.Close) def ema_calc(x):
result = ta.ema(x, length=int(ind_config['params']['length']))
return result.fillna(method='ffill').fillna(0).values
self.indicators[ind_name] = self.I(ema_calc, self.data.Close)
elif ind_config['type'] == 'RSI': elif ind_config['type'] == 'RSI':
self.indicators[ind_name] = self.I(lambda x: ta.rsi(x, length=int(ind_config['params']['length'])).fillna(0).to_numpy(), self.data.Close) def rsi_calc(x):
result = ta.rsi(x, length=int(ind_config['params']['length']))
return result.fillna(method='ffill').fillna(0).values
self.indicators[ind_name] = self.I(rsi_calc, self.data.Close)
elif ind_config['type'] == 'MACD': elif ind_config['type'] == 'MACD':
fast = int(ind_config['params']['fast']) fast = int(ind_config['params']['fast'])
slow = int(ind_config['params']['slow']) slow = int(ind_config['params']['slow'])
signal = int(ind_config['params']['signal']) signal = int(ind_config['params']['signal'])
macd = self.I(lambda x: ta.macd(x, fast=fast, slow=slow, signal=signal), self.data.Close)
self.indicators[f"{ind_name}_macd"] = self.I(lambda x: x.iloc[:,0].fillna(0).to_numpy(), macd) def macd_calc(x):
self.indicators[f"{ind_name}_signal"] = self.I(lambda x: x.iloc[:,2].fillna(0).to_numpy(), macd) result = ta.macd(x, fast=fast, slow=slow, signal=signal)
if result is None or result.empty:
return np.zeros(len(x)), np.zeros(len(x))
return (result.iloc[:,0].fillna(method='ffill').fillna(0).values,
result.iloc[:,2].fillna(method='ffill').fillna(0).values)
macd_vals = self.I(macd_calc, self.data.Close)
self.indicators[f"{ind_name}_macd"] = macd_vals[0]
self.indicators[f"{ind_name}_signal"] = macd_vals[1]
elif ind_config['type'] == 'BB': elif ind_config['type'] == 'BB':
length = int(ind_config['params']['length']) length = int(ind_config['params']['length'])
std = float(ind_config['params']['std']) std = float(ind_config['params']['std'])
bbands = self.I(lambda x: ta.bbands(x, length=length, std=std), self.data.Close)
self.indicators[f"{ind_name}_upper"] = self.I(lambda x: x.iloc[:,0].fillna(0).to_numpy(), bbands) def bb_calc(x):
self.indicators[f"{ind_name}_middle"] = self.I(lambda x: x.iloc[:,1].fillna(0).to_numpy(), bbands) result = ta.bbands(x, length=length, std=std)
self.indicators[f"{ind_name}_lower"] = self.I(lambda x: x.iloc[:,2].fillna(0).to_numpy(), bbands) if result is None or result.empty:
return np.zeros(len(x)), np.zeros(len(x)), np.zeros(len(x))
return (result.iloc[:,0].fillna(method='ffill').fillna(0).values,
result.iloc[:,1].fillna(method='ffill').fillna(0).values,
result.iloc[:,2].fillna(method='ffill').fillna(0).values)
bb_vals = self.I(bb_calc, self.data.Close)
self.indicators[f"{ind_name}_upper"] = bb_vals[0]
self.indicators[f"{ind_name}_middle"] = bb_vals[1]
self.indicators[f"{ind_name}_lower"] = bb_vals[2]
def next(self): def next(self):
# Simple example strategy - should be customized based on user input # Simple example strategy - should be customized based on user input