import os import signal import threading import time from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed from typing import Any from .common import logger from .config import RuntimeConfig from .constants import THREADPOOL_MAX_WORKERS_CAP from .notifications import send_telegram_with_retry from .signals import process_symbol # ============================================================================ # MEDIUM-004: Graceful Shutdown 지원 # ============================================================================ _shutdown_requested = False _shutdown_lock = threading.Lock() def _signal_handler(signum, frame): """ SIGTERM/SIGINT 신호 수신 시 graceful shutdown 시작 Args: signum: 신호 번호 (SIGTERM=15, SIGINT=2) frame: 현재 스택 프레임 """ global _shutdown_requested with _shutdown_lock: if not _shutdown_requested: _shutdown_requested = True logger.warning( "[Graceful Shutdown] 종료 신호 수신 (signal=%d). 진행 중인 작업 완료 후 종료합니다...", signum ) def request_shutdown(): """프로그래밍 방식으로 shutdown 요청 (테스트용)""" global _shutdown_requested with _shutdown_lock: _shutdown_requested = True logger.info("[Graceful Shutdown] 프로그래밍 방식 종료 요청") def is_shutdown_requested() -> bool: """Shutdown 요청 상태 확인""" with _shutdown_lock: return _shutdown_requested # Signal handler 등록 (프로그램 시작 시 자동 등록) try: signal.signal(signal.SIGTERM, _signal_handler) signal.signal(signal.SIGINT, _signal_handler) logger.debug("[Graceful Shutdown] Signal handler 등록 완료 (SIGTERM, SIGINT)") except (ValueError, OSError) as e: # Windows에서 SIGTERM이 없거나, 메인 스레드가 아닌 경우 무시 logger.debug("[Graceful Shutdown] Signal handler 등록 실패 (무시): %s", e) def _get_optimal_thread_count(max_threads: int | None) -> int: """CPU 코어 수 기반으로 최적 스레드 수 계산. I/O bound 작업이므로 CPU 코어 수 * 2를 기본값으로 사용합니다. 사용자가 명시적으로 값을 설정한 경우 해당 값을 사용합니다. Args: max_threads: 사용자 지정 스레드 수 (None이면 자동 계산) Returns: 최적 스레드 수 (최대 8개로 제한) """ cap = int(os.getenv("THREADPOOL_MAX_WORKERS_CAP", THREADPOOL_MAX_WORKERS_CAP)) if max_threads is not None and max_threads > 0: return min(max_threads, cap) # I/O bound 작업이므로 CPU 코어 수 * 2 cpu_count = os.cpu_count() or 4 optimal = cpu_count * 2 # 최대 8개로 제한 (너무 많은 스레드는 오히려 성능 저하) return min(optimal, cap) def _process_result_and_notify( symbol: str, res: dict[str, Any], cfg: RuntimeConfig, alerts: list[dict[str, str]] ) -> bool: """ Process the result of process_symbol and send notifications if needed. Returns True if a buy signal was triggered (telegram message sent), False otherwise. """ if not res: logger.warning("심볼 결과 없음: %s", symbol) return False for line in res.get("summary", []): logger.info(line) buy_signal_triggered = False if res.get("telegram"): buy_signal_triggered = True if cfg.dry_run: logger.info("[dry-run] 알림 내용:\n%s", res["telegram"]) if cfg.telegram_bot_token and cfg.telegram_chat_id: # ✅ 재시도 로직 포함 if not send_telegram_with_retry( cfg.telegram_bot_token, cfg.telegram_chat_id, res["telegram"], add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode, ): logger.error("심볼 %s 알림 전송 최종 실패", symbol) else: logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 메시지 전송 불가") alerts.append({"symbol": symbol, "text": res["telegram"]}) return buy_signal_triggered def _send_aggregated_summary(alerts: list[dict[str, str]], cfg: RuntimeConfig): """Send aggregated summary if multiple alerts occurred.""" if len(alerts) > 1: summary_lines = [f"알림 발생 심볼 수: {len(alerts)}", "\n"] summary_lines += [f"- {a['symbol']}" for a in alerts] summary_text = "\n".join(summary_lines) if cfg.dry_run: logger.info("[dry-run] 알림 요약:\n%s", summary_text) else: if cfg.telegram_bot_token and cfg.telegram_chat_id: # ✅ 재시도 로직 포함 if not send_telegram_with_retry( cfg.telegram_bot_token, cfg.telegram_chat_id, summary_text, add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode, ): logger.error("알림 요약 전송 최종 실패") else: logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 요약 메시지 전송 불가") def _notify_no_signals(alerts: list[dict[str, str]], cfg: RuntimeConfig): """Notify if no buy signals were triggered.""" if cfg.telegram_bot_token and cfg.telegram_chat_id and not any(a.get("text") for a in alerts): # ✅ 재시도 로직 포함 if not send_telegram_with_retry( cfg.telegram_bot_token, cfg.telegram_chat_id, "[알림] 충족된 매수 조건 없음 (프로그램 정상 작동 중)", add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode, ): logger.error("정상 작동 알림 전송 최종 실패") def run_sequential(symbols: list[str], cfg: RuntimeConfig, aggregate_enabled: bool = False): logger.info("순차 처리 시작 (심볼 수=%d)", len(symbols)) alerts = [] buy_signal_count = 0 for i, sym in enumerate(symbols): try: res = process_symbol(sym, cfg=cfg) if _process_result_and_notify(sym, res, cfg, alerts): buy_signal_count += 1 except Exception as e: logger.exception("심볼 처리 오류: %s -> %s", sym, e) if i < len(symbols) - 1 and cfg.symbol_delay is not None: logger.debug("다음 심볼까지 %.2f초 대기", cfg.symbol_delay) time.sleep(cfg.symbol_delay) if aggregate_enabled: _send_aggregated_summary(alerts, cfg) _notify_no_signals(alerts, cfg) return buy_signal_count def run_with_threads(symbols: list[str], cfg: RuntimeConfig, aggregate_enabled: bool = False): """ 병렬 처리로 여러 심볼 분석 (MEDIUM-004: Graceful Shutdown 지원) Args: symbols: 처리할 심볼 리스트 cfg: RuntimeConfig 객체 aggregate_enabled: 집계 알림 활성화 여부 Returns: 매수 신호 발생 횟수 """ global _shutdown_requested max_workers = _get_optimal_thread_count(cfg.max_threads) cpu_cores = os.cpu_count() or 4 logger.info( "병렬 처리 시작 (심볼 수=%d, 스레드 수=%d [CPU 코어: %d], 심볼 간 지연=%.2f초)", len(symbols), max_workers, cpu_cores, cfg.symbol_delay or 0.0, ) alerts = [] buy_signal_count = 0 # Throttle control last_request_time = [0.0] throttle_lock = threading.Lock() def worker(symbol: str): """워커 함수 (조기 종료 지원)""" # 종료 요청 확인 if is_shutdown_requested(): logger.info("[%s] 종료 요청으로 스킵", symbol) return symbol, None try: with throttle_lock: elapsed = time.time() - last_request_time[0] if cfg.symbol_delay is not None and elapsed < cfg.symbol_delay: sleep_time = cfg.symbol_delay - elapsed logger.debug("[%s] 스로틀 대기: %.2f초", symbol, sleep_time) time.sleep(sleep_time) last_request_time[0] = time.time() return symbol, process_symbol(symbol, cfg=cfg) except Exception as e: logger.exception("[%s] 워커 스레드 오류: %s", symbol, e) return symbol, None with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_symbol = {} # 심볼 제출 (조기 종료 지원) for sym in symbols: if is_shutdown_requested(): logger.warning( "[Graceful Shutdown] 종료 요청으로 나머지 심볼 제출 중단 (%d/%d 제출 완료)", len(future_to_symbol), len(symbols), ) break future = executor.submit(worker, sym) future_to_symbol[future] = sym # 결과 수집 (타임아웃 적용) results = {} timeout_seconds = 90 # 전체 작업 타임아웃 90초 individual_timeout = 15 # 개별 결과 조회 타임아웃 15초 try: for future in as_completed(future_to_symbol, timeout=timeout_seconds): # 종료 요청 시 즉시 중단 if is_shutdown_requested(): logger.warning( "[Graceful Shutdown] 종료 요청으로 결과 수집 중단 (%d/%d 수집 완료)", len(results), len(future_to_symbol), ) break sym = future_to_symbol[future] try: symbol, res = future.result(timeout=individual_timeout) results[symbol] = res except TimeoutError: logger.warning("[%s] 결과 조회 타임아웃 (%d초 초과), 건너뜀", sym, individual_timeout) except Exception as e: logger.exception("[%s] Future 결과 조회 오류: %s", sym, e) except TimeoutError: logger.error( "[경고] 전체 작업 타임아웃 (%d초 초과). 진행 중인 작업 강제 종료 중... (%d/%d 완료)", timeout_seconds, len(results), len(future_to_symbol), ) # Graceful shutdown 완료 체크 if is_shutdown_requested(): logger.warning( "[Graceful Shutdown] 병렬 처리 조기 종료 완료 (처리 심볼: %d/%d, 매수 신호: %d)", len(results), len(symbols), buy_signal_count, ) return buy_signal_count # 결과 처리 (원래 순서대로) for sym in symbols: res = results.get(sym) if res: if _process_result_and_notify(sym, res, cfg, alerts): buy_signal_count += 1 if aggregate_enabled: _send_aggregated_summary(alerts, cfg) _notify_no_signals(alerts, cfg) logger.info("병렬 처리 완료 (처리 심볼: %d, 매수 신호: %d)", len(results), buy_signal_count) return buy_signal_count