Files
StockBackTester/strategy.py

318 lines
13 KiB
Python

# strategy.py
# -----------------------------------------------------------------------------
# Finder - Strategy Logic Module
# -----------------------------------------------------------------------------
import config
import pandas as pd
import numpy as np
from scipy.signal import find_peaks # '직전 언덕' 탐지를 위해 임포트
from collections import deque
from typing import Optional, Tuple
def find_last_peak_price(df_window: pd.DataFrame) -> Optional[float]:
"""
매수일 직전 N일(config.STOPLOSS_PEAK_FIND_PERIOD)의 데이터에서
'직전 언덕(peak)'의 고가(High)를 찾아 손절 라인으로 반환합니다.
(scipy.signal.find_peaks 사용)
Args:
df_window: 'High' 컬럼을 포함하는 주가 데이터프레임
Returns:
마지막 피크의 고가, 피크가 없으면 기간 내 최고가, 데이터가 없으면 None
"""
if df_window.empty:
return None
# 'High' 가격을 기준으로 피크 탐지
peaks_indices, _ = find_peaks(df_window['High'], distance=config.PEAK_DETECTION_MIN_DISTANCE)
if len(peaks_indices) > 0:
# 피크가 하나 이상 발견된 경우, 가장 마지막(최근) 피크의 'High' 가격 반환
last_peak_index = peaks_indices[-1]
last_peak_price = df_window.iloc[last_peak_index]['High']
return last_peak_price
else:
# 피크를 찾지 못한 경우 (예: 지속적 하락 또는 횡보)
# 해당 기간의 최고가를 손절 라인으로 사용하는 등 대체 전략 필요
# 여기서는 기간 내 최고가를 대체 손절 라인으로 반환
return df_window['High'].max()
def compute_last_peak_series(df: pd.DataFrame, period: int = 30) -> pd.Series:
"""
종목 전체 기간에 대해, 각 날짜 기준으로 "직전 period 기간 내의 마지막 피크(High) 값"
빠르게 조회할 수 있는 Series를 생성합니다.
Args:
df: 'High' 컬럼을 포함하는 주가 데이터프레임
period: 피크 탐색 기간 (일 단위)
Returns:
각 날짜별 마지막 피크 가격을 담은 Series (df.index와 동일한 길이)
Note:
- 각 날짜 i에 대해, (i 이전의) 최근 period 범위 내에서 발견된 마지막 피크의 High를 반환.
- 피크가 없으면 해당 윈도우의 최고가(max)를 반환합니다.
- 첫 번째 날(이전 데이터 없음)은 None으로 남깁니다.
"""
if df.empty:
return pd.Series([], dtype=float)
highs = df['High'].values
n = len(highs)
peaks_indices, _ = find_peaks(highs, distance=config.PEAK_DETECTION_MIN_DISTANCE)
last_peak_price = [None] * n
dq = deque()
peak_ptr = 0
# Iterate days; for day i we consider peaks with index < i and >= i-period
for i in range(n):
# add peaks that are before day i
while peak_ptr < len(peaks_indices) and peaks_indices[peak_ptr] < i:
dq.append(peaks_indices[peak_ptr])
peak_ptr += 1
# drop peaks older than window start
while dq and dq[0] < i - period:
dq.popleft()
if dq:
last_idx = dq[-1]
last_peak_price[i] = highs[last_idx]
else:
# no peak in window: use max High in the window (previous days only)
start = max(0, i - period)
end = i # exclusive (we only look at days before i)
if end - start >= 1:
last_peak_price[i] = highs[start:end].max()
else:
last_peak_price[i] = None
return pd.Series(last_peak_price, index=df.index)
def compute_buy_signals_vectorized(df: pd.DataFrame) -> pd.DataFrame:
"""
주어진 데이터프레임에 대해 벡터화된 방식으로 매수 신호를 계산합니다.
config.py에 정의된 모든 매수 조건을 종합적으로 평가합니다.
- 이동평균 계산
- 역배열 조건 확인
- 골든 크로스 조건 확인
- 이격도 필터 적용
- 강한 상승 돌파 필터 적용
Args:
df: 'Open', 'High', 'Low', 'Close', 'Volume''LAST_PEAK' 컬럼을 포함하는 주가 데이터
Returns:
'BUY_SIGNAL'(bool), 'STOP_LOSS_PRICE'(float), 'SIGNAL_TYPE'(str) 컬럼을 포함하며,
입력 df와 동일한 인덱스를 갖는 데이터프레임
"""
if df.empty or len(df) < max(config.ALL_MA_LIST):
return pd.DataFrame(
columns=['BUY_SIGNAL', 'STOP_LOSS_PRICE', 'SIGNAL_TYPE'],
index=df.index
)
# --- 0. 이동평균선 계산 또는 확인 ---
# 이미 MA 컬럼이 있으면 재사용, 없으면 계산
missing_mas = [ma for ma in config.ALL_MA_LIST if f'MA_{ma}' not in df.columns]
if missing_mas:
# 누락된 이평선만 계산 (이때만 copy)
df_ma = df.copy()
for ma in missing_mas:
df_ma[f'MA_{ma}'] = df_ma['Close'].rolling(window=ma, min_periods=ma).mean()
# 이평선 계산 후 생긴 NA 값 제거
df_ma.dropna(inplace=True)
else:
# 이미 모든 MA가 있으면 복사 없이 그대로 사용
df_ma = df
if df_ma.empty:
# 빈 DataFrame도 원본 인덱스 유지
return pd.DataFrame(
columns=['BUY_SIGNAL', 'STOP_LOSS_PRICE', 'SIGNAL_TYPE'],
index=df.index
)
# --- 1. 역배열 조건 ---
# 모든 역배열 조건을 만족하는지 여부를 나타내는 boolean Series
reverse_array_signals = pd.Series(True, index=df_ma.index)
active_reverse_conditions = [
(int(k.split('_vs_')[0]), int(k.split('_vs_')[1]))
for k, v in config.REVERSE_ARRAY_CONDITION.items() if v
]
for ma1, ma2 in active_reverse_conditions:
reverse_array_signals &= (df_ma[f'MA_{ma1}'] < df_ma[f'MA_{ma2}'])
# --- 2. 골든 크로스 조건 ---
# 설정된 골든크로스 조건 중 하나라도 만족하는지 여부
golden_cross_signals = pd.Series(False, index=df_ma.index)
signal_type_map = {} # {date: signal_name}
active_gc_conditions = [
(int(k.split('_vs_')[0]), int(k.split('_vs_')[1]))
for k, v in config.GOLDEN_CROSS_CONDITION.items() if v
]
for ma_short, ma_long in active_gc_conditions:
# 어제는 단기 < 장기, 오늘은 단기 > 장기
was_below = df_ma[f'MA_{ma_short}'].shift(1) < df_ma[f'MA_{ma_long}'].shift(1)
is_above = df_ma[f'MA_{ma_short}'] > df_ma[f'MA_{ma_long}']
current_gc = was_below & is_above
# 현재 GC가 발생한 날짜에 신호 타입 기록
signal_name = f"GC_{ma_short}_{ma_long}"
for date in df_ma.index[current_gc]:
if date not in signal_type_map: # 첫 번째 발생한 GC 타입만 기록
signal_type_map[date] = signal_name
golden_cross_signals |= current_gc
# --- 3. 이격도 필터 ---
disparity_filter_signals = pd.Series(True, index=df_ma.index)
if config.USE_DISPARITY_FILTER:
ma_base = config.DISPARITY_MA_BASE
ma_target = config.DISPARITY_MA_TARGET
min_disparity = config.MIN_DISPARITY_PCT / 100.0
# target 이평선이 base 이평선보다 N% 이상 '아래'에 있어야 함
disparity_filter_signals = (df_ma[f'MA_{ma_target}'] < df_ma[f'MA_{ma_base}'] * (1 - min_disparity))
# --- 4. 강한 상승 돌파 필터 ---
breakthrough_signals = pd.Series(True, index=df_ma.index)
if config.USE_STRONG_BREAKTHROUGH_FILTER:
cfg = config.STRONG_BREAKTHROUGH_CONDITIONS
# 4-1. 양봉 몸통 % 조건
if cfg.get("check_candle_body", False):
candle_body_pct = (df_ma['Close'] - df_ma['Open']) / df_ma['Open'] * 100
body_condition = candle_body_pct >= cfg.get("min_candle_body_pct", 3.0)
breakthrough_signals &= body_condition
# 4-2. 거래량 조건
if cfg.get("check_volume", False):
avg_volume = df_ma['Volume'].rolling(window=cfg.get("volume_avg_period", 20), min_periods=1).mean()
# 어제까지의 평균 거래량과 비교
volume_condition = df_ma['Volume'] > (avg_volume.shift(1) * cfg.get("volume_multiplier", 1.5))
breakthrough_signals &= volume_condition
# --- 5. 최종 매수 신호 조합 ---
final_buy_signal = (reverse_array_signals &
golden_cross_signals &
disparity_filter_signals &
breakthrough_signals)
# --- 6. 결과 DataFrame 생성 ---
results = pd.DataFrame(index=df.index)
results['BUY_SIGNAL'] = final_buy_signal.reindex(df.index, fill_value=False)
# 신호가 True인 경우에만 손절가와 신호 타입 기록
results['STOP_LOSS_PRICE'] = np.nan
results['SIGNAL_TYPE'] = ''
buy_dates = results.index[results['BUY_SIGNAL']]
if not buy_dates.empty:
# 손절가 설정 (backtester에서 미리 계산된 'LAST_PEAK' 값 사용)
if 'LAST_PEAK' in df.columns:
# NaN이 없는 유효한 피크만 설정
valid_peaks = df.loc[buy_dates, 'LAST_PEAK'].dropna()
results.loc[valid_peaks.index, 'STOP_LOSS_PRICE'] = valid_peaks
# NaN이 있는 경우 해당 매수 신호 무효화 (손절가 없으면 매수 불가)
invalid_peaks = df.loc[buy_dates, 'LAST_PEAK'].isna()
if invalid_peaks.any():
results.loc[invalid_peaks[invalid_peaks].index, 'BUY_SIGNAL'] = False
# 신호 타입 설정 (유효한 매수 신호에만 적용)
final_buy_dates = results.index[results['BUY_SIGNAL']]
signal_type_series = pd.Series(signal_type_map)
results.loc[final_buy_dates, 'SIGNAL_TYPE'] = signal_type_series.reindex(final_buy_dates).values
return results[['BUY_SIGNAL', 'STOP_LOSS_PRICE', 'SIGNAL_TYPE']]
def check_sell_signal(
today_data: pd.Series,
yesterday_data: pd.Series,
position_info: dict
) -> Tuple[Optional[str], float]:
"""
보유 중인 포지션에 대해 새로운 매도 전략에 따라 매도 신호를 확인합니다.
매도 전략:
1. 매수가 대비 5% 이상 하락 시 전량 매도 (Stop-loss).
2. 수익률 10% 이하: 최고점 대비 5% 하락 시 전량 매도.
3. 수익률 10% 이상 도달 시: 50% 부분 익절 (1회).
4. 수익률 10% 초과 ~ 30% 이하: 최고점 대비 5% 하락 또는 수익률 10% 이하로 하락 시 전량 매도.
5. 수익률 30% 초과: 최고점 대비 15% 하락 또는 수익률 30% 이하로 하락 시 전량 매도.
6. 그 외의 경우는 보유.
Args:
today_data: 오늘 날짜의 데이터 (Open, High, Low, Close 등 포함)
yesterday_data: 어제 날짜의 데이터 (현재 로직에서는 사용되지 않음)
position_info: 포지션 정보 딕셔너리
- 'buy_price' (float): 매수 가격
- 'highest_price' (float): 매수 이후 최고가
- 'partial_sold' (bool): 부분 익절 여부
Returns:
(매도 신호 타입, 매도 비율) 튜플. 매도 신호 없으면 (None, 0.0)
"""
buy_price = position_info['buy_price']
highest_price = position_info['highest_price']
partial_sold = position_info['partial_sold']
low_price = today_data['Low']
high_price = today_data['High']
# 수익률 계산 (저가 기준)
profit_ratio_low = (low_price - buy_price) / buy_price
# 1. 매수가격 대비 5% 이상 하락 시 전량 매도 (절대 손절 라인)
if low_price <= buy_price * (1 - config.SELL_STOP_LOSS_PCT):
return 'STOP_LOSS_5PCT', 1.0
# 3. 수익률 15% 이상 도달 시 전량 매도 (27.65% 성공 설정)
# 장중 고가가 15% 수익을 달성했는지 체크
# SELL_PROFIT_TAKE_RATIO = 1.0이면 전량 매도 (partial_sold 무시)
if (high_price - buy_price) / buy_price >= config.SELL_PROFIT_TAKE_PCT:
if config.SELL_PROFIT_TAKE_RATIO >= 1.0:
return 'PROFIT_TAKE_FULL', 1.0 # 전량 매도
elif not partial_sold:
return 'PROFIT_TAKE_10PCT_HALF', config.SELL_PROFIT_TAKE_RATIO # 부분 익절
# --- 전량 매도 조건 (트레일링 스탑) ---
# 매수 후 최고가 기준 수익률
highest_profit_ratio = (highest_price - buy_price) / buy_price
# 5. 최고 수익률이 30%를 초과했던 경우
if highest_profit_ratio > config.SELL_PROFIT_THRESHOLD_HIGH:
# 최고점 대비 15% 하락 또는 현재 수익률이 30% 이하로 떨어지면 전량 매도
if (low_price <= highest_price * (1 - config.SELL_TRAILING_STOP_HIGH_PCT) or
profit_ratio_low <= config.SELL_PROFIT_THRESHOLD_HIGH):
return 'TRAILING_STOP_GT30PCT', 1.0
# 4. 최고 수익률이 10% 초과 30% 이하인 경우
elif highest_profit_ratio > config.SELL_PROFIT_THRESHOLD_MID:
# 최고점 대비 5% 하락 또는 현재 수익률이 10% 이하로 떨어지면 전량 매도
if (low_price <= highest_price * (1 - config.SELL_TRAILING_STOP_MID_PCT) or
profit_ratio_low <= config.SELL_PROFIT_THRESHOLD_MID):
return 'TRAILING_STOP_10_30PCT', 1.0
# 2. 최고 수익률이 10% 이하인 경우
else:
# 최고점 대비 5% 하락 시 전량 매도
if low_price <= highest_price * (1 - config.SELL_TRAILING_STOP_LOW_PCT):
return 'TRAILING_STOP_LT10PCT', 1.0
# 6. 매도 조건 없음, 보유
return None, 0.0