# data_manager.py # ----------------------------------------------------------------------------- # Finder - Data Management Module # ----------------------------------------------------------------------------- import pandas as pd import yfinance as yf from pykrx.stock import get_market_cap_by_ticker, get_market_ticker_name import requests from io import StringIO import time import os import config # 설정 파일 임포트 import pandas_ta as ta from datetime import datetime import pickle from typing import Dict, Optional # 데이터 캐시 디렉토리 생성 if not os.path.exists(config.DATA_CACHE_DIR): os.makedirs(config.DATA_CACHE_DIR) def normalize_timezone(df: pd.DataFrame) -> pd.DataFrame: """타임존을 제거하고 UTC 기준으로 정규화 Args: df: 인덱스가 DatetimeIndex인 데이터프레임 Returns: 타임존이 제거된 데이터프레임 """ if df.index.tz is not None: df.index = df.index.tz_localize(None) return df def get_kr_tickers(start_rank: int, end_rank: int) -> Dict[str, str]: """ [v1.0 업그레이드] KOSPI/KOSDAQ 시총 상위 종목을 {티커: 이름} 딕셔너리로 반환 """ print("한국 주식 시가총액 순위 로드 중...") today_str = datetime.now().strftime('%Y%m%d') cache_file = os.path.join(config.DATA_CACHE_DIR, f"kr_tickers_{start_rank}_{end_rank}_{today_str}.pkl") # 캐시가 있으면 바로 로드하여 네트워크 호출을 피함 if config.LOAD_FROM_LOCAL and os.path.exists(cache_file): try: with open(cache_file, 'rb') as f: tickers_dict = pickle.load(f) print(f"한국 티커 캐시 로드: {cache_file}") return tickers_dict except Exception as e: print(f"경고: 티커 캐시 로드 실패({cache_file}): {e}") df_kospi = get_market_cap_by_ticker(date=today_str, market="KOSPI") df_kosdaq = get_market_cap_by_ticker(date=today_str, market="KOSDAQ") df_all = pd.concat([df_kospi, df_kosdaq]).sort_values(by="시가총액", ascending=False) # 순위에 따라 슬라이싱 df_sliced = df_all.iloc[start_rank - 1 : end_rank] # [수정] 리스트 대신 딕셔너리 생성 tickers_dict = {} for ticker in df_sliced.index: try: # pykrx에서 티커 이름 조회 name = get_market_ticker_name(ticker) tickers_dict[ticker] = name except (KeyError, ValueError) as e: # 알려진 에러: 조회 실패 시 (예: ETF, ETN 등) tickers_dict[ticker] = f"Unknown_KR_{ticker}" except Exception as e: # 예상치 못한 에러는 로그 출력 후 재발생 print(f"예상치 못한 에러: {ticker} 이름 조회 - {type(e).__name__}: {e}") raise print(f"한국 주식 {len(tickers_dict)}개 종목 로드 완료.") # 캐시에 저장 try: with open(cache_file, 'wb') as f: pickle.dump(tickers_dict, f) except Exception as e: print(f"경고: 티커 캐시 저장 실패({cache_file}): {e}") return tickers_dict # <-- 딕셔너리 반환 def get_us_tickers(start_rank: int, end_rank: int) -> Dict[str, str]: """ [v1.0 업그레이드] S&P 500 종목을 {티커: 이름} 딕셔너리로 반환 (403 Forbidden 방지 및 StringIO 경고 수정 포함) Args: start_rank: 시작 순위 (1부터 시작) end_rank: 종료 순위 (포함) Returns: {티커: 종목명} 딕셔너리 """ print("미국 S&P 500 종목 리스트 로드 중 (Wikipedia)...") try: url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } response = requests.get(url, headers=headers) response.raise_for_status() html_content = response.text # [수정] FutureWarning 해결 tables = pd.read_html(StringIO(html_content)) # [v3.2 수정] 'Symbol'과 'Security' 컬럼이 있는 테이블을 찾아 안정성 확보 sp500_table = None for table in tables: if 'Symbol' in table.columns and 'Security' in table.columns: sp500_table = table break if sp500_table is None: raise ValueError("Could not find S&P 500 table with 'Symbol' and 'Security' columns in Wikipedia page.") # 순위에 따라 슬라이싱 df_selected = sp500_table.iloc[start_rank - 1 : end_rank] # [수정] yfinance 티커 형식(. -> -)으로 변환하고 딕셔너리 생성 df_selected['Symbol_yf'] = df_selected['Symbol'].str.replace('.', '-', regex=False) stock_dict = dict(zip(df_selected['Symbol_yf'], df_selected['Security'])) # {티커: 이름} print(f"미국 S&P 500 종목 {len(stock_dict)}개 로드 완료.") return stock_dict # <-- 딕셔너리 반환 except Exception as e: print(f"에러: 미국 S&P 500 리스트 스크래핑 실패 - {e}") return {} def get_stock_data(ticker: str, start_date: str, end_date: str) -> pd.DataFrame: """ [v2.4 업그레이드] 캐시 파일 범위 재사용 기능 추가 지정된 티커의 주가 데이터를 로드합니다. Pickle(.pkl)과 CSV(.csv)를 모두 지원합니다. Args: ticker: 종목 티커 (예: '005930', 'AAPL') start_date: 시작 날짜 (YYYY-MM-DD) end_date: 종료 날짜 (YYYY-MM-DD) Returns: 주가 데이터 데이터프레임 (인덱스: Date) LOAD_FROM_LOCAL = True일 때: - .pkl 파일이 있으면 우선적으로 로드합니다 (속도 향상). - 정확히 일치하지 않으면 요청 범위를 포함하는 더 넓은 .pkl 파일을 찾아 재활용합니다. - .pkl 파일이 없고 .csv 파일만 있으면, .csv를 로드 후 .pkl로 변환 저장합니다. LOAD_FROM_LOCAL = False일 때: - API에서 새로 다운로드하고, 보조지표를 계산한 후, - SAVE_TO_LOCAL=True이면 .pkl과 .csv 두 가지 포맷으로 모두 저장합니다. """ import glob # 요청된 범위 변환 start_dt = pd.to_datetime(start_date) end_dt = pd.to_datetime(end_date) # --- 1. 티커 및 캐시 파일 경로 설정 --- if len(ticker) == 6 and ticker.isdigit(): yf_ticker = f"{ticker}.KS" else: yf_ticker = ticker base_filename = f"{yf_ticker}_{start_date}_{end_date}" pkl_file = os.path.join(config.DATA_CACHE_DIR, f"{base_filename}.pkl") csv_file = os.path.join(config.DATA_CACHE_DIR, f"{base_filename}.csv") # --- 2. 로컬 데이터 로드 모드 (LOAD_FROM_LOCAL = True) --- if config.LOAD_FROM_LOCAL: # 2-1. 정확히 일치하는 .pkl 파일 먼저 체크 if os.path.exists(pkl_file): try: df = pd.read_pickle(pkl_file) df = normalize_timezone(df) if not df.empty: # print(f"'{yf_ticker}' Pickle 로드 성공.") # 디버깅용 return df except (pickle.UnpicklingError, EOFError) as e: print(f"경고: Pickle 파일 {pkl_file}이 손상되었습니다: {e}") except Exception as e: print(f"경고: Pickle 로드 예상치 못한 오류 ({pkl_file}): {type(e).__name__}: {e}") # 2-2. 요청 범위를 포함하는 더 넓은 .pkl 파일 찾아 재활용 try: # 같은 티커로 시작하는 모든 pkl 파일 탐색 pattern = os.path.join(config.DATA_CACHE_DIR, f"{yf_ticker}_*.pkl") pkl_files = glob.glob(pattern) # 파일명에서 시작/종료 날짜 추출하여 요청된 범위를 포함하는 파일 찾기 valid_files = [] for f in pkl_files: try: # 파일명 예: "000080.KS_2018-01-01_2023-12-31.pkl" basename = os.path.basename(f) parts = basename.replace('.pkl', '').split('_') if len(parts) >= 3: file_start = pd.to_datetime(parts[-2]) file_end = pd.to_datetime(parts[-1]) # 요청 범위를 포함하는 파일인지 확인 if file_start <= start_dt and file_end >= end_dt: valid_files.append((f, file_start, file_end)) except (ValueError, IndexError) as e: # 파일명 형식이 맞지 않는 경우 무시 continue # 유효한 파일이 있으면 가장 범위가 넓은 것 선택 if valid_files: valid_files.sort(key=lambda x: (x[2] - x[1]), reverse=True) # 범위가 넓은 순 best_file = valid_files[0][0] try: df = pd.read_pickle(best_file) df = normalize_timezone(df) if not df.empty: # 요청된 범위로 slice df_sliced = df.loc[(df.index >= start_dt) & (df.index <= end_dt)] if not df_sliced.empty: # print(f"'{yf_ticker}' 캐시 {os.path.basename(best_file)}에서 재사용 로드 완료.") return df_sliced except Exception as e: print(f"경고: 캐시 파일 파싱 실패 ({f}): {e}") except Exception as e: print(f"경고: 캐시 재활용 실패 ({best_file}): {e}") except Exception as e: print(f"경고: glob 처리 실패: {e}") # .pkl 파일이 없거나 손상된 경우, .csv 파일 체크 if os.path.exists(csv_file): try: df = pd.read_csv(csv_file, index_col='Date', parse_dates=True) df = normalize_timezone(df) if not df.empty: # .csv로 로드 성공 시, 다음을 위해 .pkl 파일 생성 print(f"'{yf_ticker}' CSV 로드 성공. Pickle 파일로 변환 저장합니다...") df.to_pickle(pkl_file) return df except (pd.errors.ParserError, KeyError) as e: print(f"경고: CSV 파일 {csv_file} 파싱 실패: {e}") except Exception as e: print(f"경고: CSV 로드 예상치 못한 오류 ({csv_file}): {type(e).__name__}: {e}") return pd.DataFrame() # --- 3. 신규 다운로드 모드 (LOAD_FROM_LOCAL = False) --- print(f"{yf_ticker}: yfinance API로부터 데이터 다운로드 중...") try: time.sleep(config.API_REQUEST_DELAY) data = yf.Ticker(yf_ticker) df = data.history(start=start_date, end=end_date, interval="1d") if df.empty and yf_ticker.endswith(".KS"): yf_ticker_kq = f"{ticker}.KQ" print(f".KS 실패, {yf_ticker_kq} 재시도...") time.sleep(config.API_REQUEST_DELAY) data_kq = yf.Ticker(yf_ticker_kq) df = data_kq.history(start=start_date, end=end_date, interval="1d") if df.empty: print(f"경고: {ticker}({yf_ticker}) 데이터 로드 실패. 빈 파일 저장.") if config.SAVE_TO_LOCAL: # 빈 파일도 pkl과 csv 둘 다 생성하여 불필요한 재시도 방지 pd.DataFrame().to_pickle(pkl_file) pd.DataFrame().to_csv(csv_file) return pd.DataFrame() df = normalize_timezone(df) # --- 다운로드 직후 보조지표 계산 --- print(f"{yf_ticker}: 보조지표 계산 중...") df = calculate_indicators(df) # 4. 계산된 지표 포함하여 두 포맷으로 모두 저장 if config.SAVE_TO_LOCAL: print(f"{yf_ticker}: 데이터 및 지표를 pkl, csv 캐시 파일로 저장 중...") df.to_pickle(pkl_file) df.to_csv(csv_file) return df except (ConnectionError, TimeoutError) as e: print(f"에러: {yf_ticker} 네트워크 오류 - {e}") return pd.DataFrame() except Exception as e: print(f"에러: {yf_ticker} 예상치 못한 오류 - {type(e).__name__}: {e}") return pd.DataFrame() def calculate_indicators(df: pd.DataFrame) -> pd.DataFrame: """ [v1.0 업그레이드 / FIX] pandas_ta를 사용하여 필요한 모든 보조지표를 계산합니다. (KeyError 방지를 위해 '명시적 할당' 방식으로 수정) Args: df: OHLCV 데이터를 포함하는 데이터프레임 Returns: 보조지표가 추가된 데이터프레임 """ if df.empty: return df try: # 1. 이동평균선 계산 (v2.0에서 사용하는 모든 MA) # [FIX] 'df.ta.sma(append=True)' 대신 명시적 할당 'df['col'] = ta.sma(...)' 사용 for ma in config.ALL_MA_LIST: df[f'MA_{ma}'] = ta.sma(df['Close'], length=ma) # 2. 거래량 평균 계산 vol_period = config.STRONG_BREAKTHROUGH_CONDITIONS.get('volume_avg_period', 20) vol_ma_name = f'Volume_MA_{vol_period}' # [FIX] 'df.ta.sma'가 아닌 'ta.sma'를 직접 호출하여 할당 df[vol_ma_name] = ta.sma(df['Volume'], length=vol_period) # 3. (선택) v1.0의 다른 지표들도 미리 계산 (향후 전략 확장을 위함) # [FIX] 'df.ta.rsi(append=True)' 대신 명시적 할당 df['RSI_14'] = ta.rsi(df['Close'], length=14) # bbands와 macd는 여러 컬럼을 반환하므로 df.ta.xxx(append=True) 방식 유지 df.ta.bbands(length=20, std=2, append=True) df.ta.macd(fast=12, slow=26, signal=9, append=True) except (AttributeError, KeyError) as e: print(f" [경고] pandas_ta 보조 지표 계산 중 오류 (컬럼 누락): {e}") except Exception as e: print(f" [경고] pandas_ta 예상치 못한 오류: {type(e).__name__}: {e}") return df def get_financial_data(ticker: str) -> Optional[pd.Series]: """ yfinance를 사용해 펀더멘털 데이터 (매출액) 로드 (백테스트 결과 출력용) Args: ticker: 종목 티커 Returns: 최근 8분기 매출액 Series, 로드 실패 시 None """ try: yf_ticker = yf.Ticker(ticker) # yfinance는 티커 자동 보정 시도 quarterly_financials = yf_ticker.quarterly_financials if 'Total Revenue' in quarterly_financials.index: revenue = quarterly_financials.loc['Total Revenue'] # 최근 8분기(2년) 데이터 반환 return revenue.head(8) else: return None except (KeyError, AttributeError) as e: print(f"경고: {ticker} 재무 데이터 없음 - {e}") return None except Exception as e: print(f"경고: {ticker} 재무 데이터 예상치 못한 오류 - {type(e).__name__}: {e}") return None