# strategy.py # ----------------------------------------------------------------------------- # Finder - Strategy Logic Module # ----------------------------------------------------------------------------- import config import pandas as pd import numpy as np from scipy.signal import find_peaks # '직전 언덕' 탐지를 위해 임포트 from collections import deque from typing import Optional, Tuple def find_last_peak_price(df_window: pd.DataFrame) -> Optional[float]: """ 매수일 직전 N일(config.STOPLOSS_PEAK_FIND_PERIOD)의 데이터에서 '직전 언덕(peak)'의 고가(High)를 찾아 손절 라인으로 반환합니다. (scipy.signal.find_peaks 사용) Args: df_window: 'High' 컬럼을 포함하는 주가 데이터프레임 Returns: 마지막 피크의 고가, 피크가 없으면 기간 내 최고가, 데이터가 없으면 None """ if df_window.empty: return None # 'High' 가격을 기준으로 피크 탐지 peaks_indices, _ = find_peaks(df_window['High'], distance=config.PEAK_DETECTION_MIN_DISTANCE) if len(peaks_indices) > 0: # 피크가 하나 이상 발견된 경우, 가장 마지막(최근) 피크의 'High' 가격 반환 last_peak_index = peaks_indices[-1] last_peak_price = df_window.iloc[last_peak_index]['High'] return last_peak_price else: # 피크를 찾지 못한 경우 (예: 지속적 하락 또는 횡보) # 해당 기간의 최고가를 손절 라인으로 사용하는 등 대체 전략 필요 # 여기서는 기간 내 최고가를 대체 손절 라인으로 반환 return df_window['High'].max() def compute_last_peak_series(df: pd.DataFrame, period: int = 30) -> pd.Series: """ 종목 전체 기간에 대해, 각 날짜 기준으로 "직전 period 기간 내의 마지막 피크(High) 값"을 빠르게 조회할 수 있는 Series를 생성합니다. Args: df: 'High' 컬럼을 포함하는 주가 데이터프레임 period: 피크 탐색 기간 (일 단위) Returns: 각 날짜별 마지막 피크 가격을 담은 Series (df.index와 동일한 길이) Note: - 각 날짜 i에 대해, (i 이전의) 최근 period 범위 내에서 발견된 마지막 피크의 High를 반환. - 피크가 없으면 해당 윈도우의 최고가(max)를 반환합니다. - 첫 번째 날(이전 데이터 없음)은 None으로 남깁니다. """ if df.empty: return pd.Series([], dtype=float) highs = df['High'].values n = len(highs) peaks_indices, _ = find_peaks(highs, distance=config.PEAK_DETECTION_MIN_DISTANCE) last_peak_price = [None] * n dq = deque() peak_ptr = 0 # Iterate days; for day i we consider peaks with index < i and >= i-period for i in range(n): # add peaks that are before day i while peak_ptr < len(peaks_indices) and peaks_indices[peak_ptr] < i: dq.append(peaks_indices[peak_ptr]) peak_ptr += 1 # drop peaks older than window start while dq and dq[0] < i - period: dq.popleft() if dq: last_idx = dq[-1] last_peak_price[i] = highs[last_idx] else: # no peak in window: use max High in the window (previous days only) start = max(0, i - period) end = i # exclusive (we only look at days before i) if end - start >= 1: last_peak_price[i] = highs[start:end].max() else: last_peak_price[i] = None return pd.Series(last_peak_price, index=df.index) def compute_buy_signals_vectorized(df: pd.DataFrame) -> pd.DataFrame: """ 주어진 데이터프레임에 대해 벡터화된 방식으로 매수 신호를 계산합니다. config.py에 정의된 모든 매수 조건을 종합적으로 평가합니다. - 이동평균 계산 - 역배열 조건 확인 - 골든 크로스 조건 확인 - 이격도 필터 적용 - 강한 상승 돌파 필터 적용 Args: df: 'Open', 'High', 'Low', 'Close', 'Volume' 및 'LAST_PEAK' 컬럼을 포함하는 주가 데이터 Returns: 'BUY_SIGNAL'(bool), 'STOP_LOSS_PRICE'(float), 'SIGNAL_TYPE'(str) 컬럼을 포함하며, 입력 df와 동일한 인덱스를 갖는 데이터프레임 """ if df.empty or len(df) < max(config.ALL_MA_LIST): return pd.DataFrame( columns=['BUY_SIGNAL', 'STOP_LOSS_PRICE', 'SIGNAL_TYPE'], index=df.index ) # --- 0. 이동평균선 계산 또는 확인 --- # 이미 MA 컬럼이 있으면 재사용, 없으면 계산 missing_mas = [ma for ma in config.ALL_MA_LIST if f'MA_{ma}' not in df.columns] if missing_mas: # 누락된 이평선만 계산 (이때만 copy) df_ma = df.copy() for ma in missing_mas: df_ma[f'MA_{ma}'] = df_ma['Close'].rolling(window=ma, min_periods=ma).mean() # 이평선 계산 후 생긴 NA 값 제거 df_ma.dropna(inplace=True) else: # 이미 모든 MA가 있으면 복사 없이 그대로 사용 df_ma = df if df_ma.empty: # 빈 DataFrame도 원본 인덱스 유지 return pd.DataFrame( columns=['BUY_SIGNAL', 'STOP_LOSS_PRICE', 'SIGNAL_TYPE'], index=df.index ) # --- 1. 역배열 조건 --- # 모든 역배열 조건을 만족하는지 여부를 나타내는 boolean Series reverse_array_signals = pd.Series(True, index=df_ma.index) active_reverse_conditions = [ (int(k.split('_vs_')[0]), int(k.split('_vs_')[1])) for k, v in config.REVERSE_ARRAY_CONDITION.items() if v ] for ma1, ma2 in active_reverse_conditions: reverse_array_signals &= (df_ma[f'MA_{ma1}'] < df_ma[f'MA_{ma2}']) # --- 2. 골든 크로스 조건 --- # 설정된 골든크로스 조건 중 하나라도 만족하는지 여부 golden_cross_signals = pd.Series(False, index=df_ma.index) signal_type_map = {} # {date: signal_name} active_gc_conditions = [ (int(k.split('_vs_')[0]), int(k.split('_vs_')[1])) for k, v in config.GOLDEN_CROSS_CONDITION.items() if v ] for ma_short, ma_long in active_gc_conditions: # 어제는 단기 < 장기, 오늘은 단기 > 장기 was_below = df_ma[f'MA_{ma_short}'].shift(1) < df_ma[f'MA_{ma_long}'].shift(1) is_above = df_ma[f'MA_{ma_short}'] > df_ma[f'MA_{ma_long}'] current_gc = was_below & is_above # 현재 GC가 발생한 날짜에 신호 타입 기록 signal_name = f"GC_{ma_short}_{ma_long}" for date in df_ma.index[current_gc]: if date not in signal_type_map: # 첫 번째 발생한 GC 타입만 기록 signal_type_map[date] = signal_name golden_cross_signals |= current_gc # --- 3. 이격도 필터 --- disparity_filter_signals = pd.Series(True, index=df_ma.index) if config.USE_DISPARITY_FILTER: ma_base = config.DISPARITY_MA_BASE ma_target = config.DISPARITY_MA_TARGET min_disparity = config.MIN_DISPARITY_PCT / 100.0 # target 이평선이 base 이평선보다 N% 이상 '아래'에 있어야 함 disparity_filter_signals = (df_ma[f'MA_{ma_target}'] < df_ma[f'MA_{ma_base}'] * (1 - min_disparity)) # --- 4. 강한 상승 돌파 필터 --- breakthrough_signals = pd.Series(True, index=df_ma.index) if config.USE_STRONG_BREAKTHROUGH_FILTER: cfg = config.STRONG_BREAKTHROUGH_CONDITIONS # 4-1. 양봉 몸통 % 조건 if cfg.get("check_candle_body", False): candle_body_pct = (df_ma['Close'] - df_ma['Open']) / df_ma['Open'] * 100 body_condition = candle_body_pct >= cfg.get("min_candle_body_pct", 3.0) breakthrough_signals &= body_condition # 4-2. 거래량 조건 if cfg.get("check_volume", False): avg_volume = df_ma['Volume'].rolling(window=cfg.get("volume_avg_period", 20), min_periods=1).mean() # 어제까지의 평균 거래량과 비교 volume_condition = df_ma['Volume'] > (avg_volume.shift(1) * cfg.get("volume_multiplier", 1.5)) breakthrough_signals &= volume_condition # --- 5. 최종 매수 신호 조합 --- final_buy_signal = (reverse_array_signals & golden_cross_signals & disparity_filter_signals & breakthrough_signals) # --- 6. 결과 DataFrame 생성 --- results = pd.DataFrame(index=df.index) results['BUY_SIGNAL'] = final_buy_signal.reindex(df.index, fill_value=False) # 신호가 True인 경우에만 손절가와 신호 타입 기록 results['STOP_LOSS_PRICE'] = np.nan results['SIGNAL_TYPE'] = '' buy_dates = results.index[results['BUY_SIGNAL']] if not buy_dates.empty: # 손절가 설정 (backtester에서 미리 계산된 'LAST_PEAK' 값 사용) if 'LAST_PEAK' in df.columns: # NaN이 없는 유효한 피크만 설정 valid_peaks = df.loc[buy_dates, 'LAST_PEAK'].dropna() results.loc[valid_peaks.index, 'STOP_LOSS_PRICE'] = valid_peaks # NaN이 있는 경우 해당 매수 신호 무효화 (손절가 없으면 매수 불가) invalid_peaks = df.loc[buy_dates, 'LAST_PEAK'].isna() if invalid_peaks.any(): results.loc[invalid_peaks[invalid_peaks].index, 'BUY_SIGNAL'] = False # 신호 타입 설정 (유효한 매수 신호에만 적용) final_buy_dates = results.index[results['BUY_SIGNAL']] signal_type_series = pd.Series(signal_type_map) results.loc[final_buy_dates, 'SIGNAL_TYPE'] = signal_type_series.reindex(final_buy_dates).values return results[['BUY_SIGNAL', 'STOP_LOSS_PRICE', 'SIGNAL_TYPE']] def check_sell_signal( today_data: pd.Series, yesterday_data: pd.Series, position_info: dict ) -> Tuple[Optional[str], float]: """ 보유 중인 포지션에 대해 새로운 매도 전략에 따라 매도 신호를 확인합니다. 매도 전략: 1. 매수가 대비 5% 이상 하락 시 전량 매도 (Stop-loss). 2. 수익률 10% 이하: 최고점 대비 5% 하락 시 전량 매도. 3. 수익률 10% 이상 도달 시: 50% 부분 익절 (1회). 4. 수익률 10% 초과 ~ 30% 이하: 최고점 대비 5% 하락 또는 수익률 10% 이하로 하락 시 전량 매도. 5. 수익률 30% 초과: 최고점 대비 15% 하락 또는 수익률 30% 이하로 하락 시 전량 매도. 6. 그 외의 경우는 보유. Args: today_data: 오늘 날짜의 데이터 (Open, High, Low, Close 등 포함) yesterday_data: 어제 날짜의 데이터 (현재 로직에서는 사용되지 않음) position_info: 포지션 정보 딕셔너리 - 'buy_price' (float): 매수 가격 - 'highest_price' (float): 매수 이후 최고가 - 'partial_sold' (bool): 부분 익절 여부 Returns: (매도 신호 타입, 매도 비율) 튜플. 매도 신호 없으면 (None, 0.0) """ buy_price = position_info['buy_price'] highest_price = position_info['highest_price'] partial_sold = position_info['partial_sold'] low_price = today_data['Low'] high_price = today_data['High'] # 수익률 계산 (저가 기준) profit_ratio_low = (low_price - buy_price) / buy_price # 1. 매수가격 대비 5% 이상 하락 시 전량 매도 (절대 손절 라인) if low_price <= buy_price * (1 - config.SELL_STOP_LOSS_PCT): return 'STOP_LOSS_5PCT', 1.0 # 3. 수익률 15% 이상 도달 시 전량 매도 (27.65% 성공 설정) # 장중 고가가 15% 수익을 달성했는지 체크 # SELL_PROFIT_TAKE_RATIO = 1.0이면 전량 매도 (partial_sold 무시) if (high_price - buy_price) / buy_price >= config.SELL_PROFIT_TAKE_PCT: if config.SELL_PROFIT_TAKE_RATIO >= 1.0: return 'PROFIT_TAKE_FULL', 1.0 # 전량 매도 elif not partial_sold: return 'PROFIT_TAKE_10PCT_HALF', config.SELL_PROFIT_TAKE_RATIO # 부분 익절 # --- 전량 매도 조건 (트레일링 스탑) --- # 매수 후 최고가 기준 수익률 highest_profit_ratio = (highest_price - buy_price) / buy_price # 5. 최고 수익률이 30%를 초과했던 경우 if highest_profit_ratio > config.SELL_PROFIT_THRESHOLD_HIGH: # 최고점 대비 15% 하락 또는 현재 수익률이 30% 이하로 떨어지면 전량 매도 if (low_price <= highest_price * (1 - config.SELL_TRAILING_STOP_HIGH_PCT) or profit_ratio_low <= config.SELL_PROFIT_THRESHOLD_HIGH): return 'TRAILING_STOP_GT30PCT', 1.0 # 4. 최고 수익률이 10% 초과 30% 이하인 경우 elif highest_profit_ratio > config.SELL_PROFIT_THRESHOLD_MID: # 최고점 대비 5% 하락 또는 현재 수익률이 10% 이하로 떨어지면 전량 매도 if (low_price <= highest_price * (1 - config.SELL_TRAILING_STOP_MID_PCT) or profit_ratio_low <= config.SELL_PROFIT_THRESHOLD_MID): return 'TRAILING_STOP_10_30PCT', 1.0 # 2. 최고 수익률이 10% 이하인 경우 else: # 최고점 대비 5% 하락 시 전량 매도 if low_price <= highest_price * (1 - config.SELL_TRAILING_STOP_LOW_PCT): return 'TRAILING_STOP_LT10PCT', 1.0 # 6. 매도 조건 없음, 보유 return None, 0.0