import os import time import threading import argparse import signal import sys from dotenv import load_dotenv import logging # Modular imports from src.common import logger, setup_logger, HOLDINGS_FILE from src.config import load_config, read_symbols, get_symbols_file, build_runtime_config from src.threading_utils import run_sequential, run_with_threads from src.notifications import send_telegram, report_error, send_startup_test_message from src.holdings import load_holdings, holdings_lock from src.signals import check_stop_loss_conditions, check_profit_taking_conditions # NOTE: Keep pandas_ta exposure for test monkeypatch compatibility import pandas_ta as ta load_dotenv() # [중요] pyupbit/requests 교착 상태 방지용 초기화 코드 # dry_run=False로 설정 시 프로그램이 멈추는 현상을 해결합니다. try: import requests requests.get("https://api.upbit.com/v1/market/all", timeout=1) except Exception: pass def minutes_to_timeframe(minutes: int) -> str: """분 단위를 캔들봉 timeframe 문자열로 변환 (예: 60 -> '1h', 240 -> '4h')""" if minutes < 60: return f"{minutes}m" elif minutes % 1440 == 0: return f"{minutes // 1440}d" elif minutes % 60 == 0: return f"{minutes // 60}h" else: return f"{minutes}m" def _check_buy_signals(cfg, symbols_to_check, config): buy_signal_count = 0 buy_interval_minutes = config.get("buy_check_interval_minutes", 240) buy_timeframe = minutes_to_timeframe(buy_interval_minutes) logger.info("[SYSTEM] 매수 조건 확인 시작 (주기: %d분, 데이터: %s)", buy_interval_minutes, buy_timeframe) if symbols_to_check: from src.config import RuntimeConfig cfg_with_buy_timeframe = RuntimeConfig( timeframe=buy_timeframe, indicator_timeframe=buy_timeframe, candle_count=cfg.candle_count, symbol_delay=cfg.symbol_delay, interval=cfg.interval, loop=cfg.loop, dry_run=cfg.dry_run, max_threads=cfg.max_threads, telegram_parse_mode=cfg.telegram_parse_mode, trading_mode=cfg.trading_mode, telegram_bot_token=cfg.telegram_bot_token, telegram_chat_id=cfg.telegram_chat_id, upbit_access_key=cfg.upbit_access_key, upbit_secret_key=cfg.upbit_secret_key, aggregate_alerts=cfg.aggregate_alerts, benchmark=cfg.benchmark, telegram_test=getattr(cfg, "telegram_test", False), config=cfg.config, ) if cfg.max_threads > 1: buy_signal_count = run_with_threads( symbols_to_check, cfg=cfg_with_buy_timeframe, aggregate_enabled=cfg.aggregate_alerts ) else: buy_signal_count = run_sequential( symbols_to_check, cfg=cfg_with_buy_timeframe, aggregate_enabled=cfg.aggregate_alerts ) return buy_signal_count def _check_sell_signals(cfg, config, holdings, current_time, last_stop_loss_check_time, last_profit_taking_check_time): stop_loss_signal_count = 0 profit_taking_signal_count = 0 # 손절 분석 stop_loss_interval_min = config.get("stop_loss_check_interval_minutes", 60) stop_loss_interval = stop_loss_interval_min * 60 if current_time - last_stop_loss_check_time >= stop_loss_interval: logger.info("[SYSTEM] 손절 조건 확인 시작 (주기: %d분)", stop_loss_interval_min) if holdings: _, stop_loss_signal_count = check_stop_loss_conditions(holdings, cfg=cfg, config=config) logger.info("보유 코인 손절 조건 확인 완료: %d개 신호", stop_loss_signal_count) else: logger.debug("보유 코인 없음 (손절 검사 건너뜀)") last_stop_loss_check_time = current_time else: logger.debug( "[DEBUG] 손절 조건 확인 대기 중 (다음 확인까지 %.1f분 남음)", (stop_loss_interval - (current_time - last_stop_loss_check_time)) / 60, ) # 익절 분석 profit_taking_interval_min = config.get("profit_taking_check_interval_minutes", 240) profit_taking_interval = profit_taking_interval_min * 60 if current_time - last_profit_taking_check_time >= profit_taking_interval: logger.info("[SYSTEM] 익절 조건 확인 시작 (주기: %d분)", profit_taking_interval_min) if holdings: _, profit_taking_signal_count = check_profit_taking_conditions(holdings, cfg=cfg, config=config) logger.info("보유 코인 익절 조건 확인 완료: %d개 신호", profit_taking_signal_count) else: logger.debug("보유 코인 없음 (익절 검사 건너뜀)") last_profit_taking_check_time = current_time else: logger.debug( "[DEBUG] 익절 조건 확인 대기 중 (다음 확인까지 %.1f분 남음)", (profit_taking_interval - (current_time - last_profit_taking_check_time)) / 60, ) return stop_loss_signal_count, profit_taking_signal_count, last_stop_loss_check_time, last_profit_taking_check_time def process_symbols_and_holdings( cfg, symbols: list, config: dict, last_buy_check_time: float, last_stop_loss_check_time: float, last_profit_taking_check_time: float, last_balance_warning_time: float = 0, ) -> tuple: """Process all symbols once and check sell conditions for holdings.""" with holdings_lock: holdings = load_holdings(HOLDINGS_FILE) held_symbols = set(holdings.keys()) if holdings else set() buy_candidate_symbols = [s for s in symbols if s not in held_symbols] current_time = time.time() buy_signal_count = 0 # 매수 분석 buy_interval_minutes = config.get("buy_check_interval_minutes", 240) buy_interval = buy_interval_minutes * 60 if current_time - last_buy_check_time >= buy_interval: buy_signal_count = _check_buy_signals(cfg, buy_candidate_symbols, config) last_buy_check_time = current_time else: logger.debug( "[DEBUG] 매수 조건 확인 대기 중 (다음 확인까지 %.1f분 남음)", (buy_interval - (current_time - last_buy_check_time)) / 60, ) # Upbit 최신 보유 정보 동기화 if cfg.upbit_access_key and cfg.upbit_secret_key: from src.holdings import save_holdings, fetch_holdings_from_upbit updated_holdings = fetch_holdings_from_upbit(cfg) if updated_holdings is not None: holdings = updated_holdings save_holdings(holdings, HOLDINGS_FILE) else: logger.error("Upbit에서 보유 정보를 가져오지 못했습니다. 이번 주기에서는 매도 분석을 건너뜁니다.") # 매도 분석 stop_loss_signal_count, profit_taking_signal_count, last_stop_loss_check_time, last_profit_taking_check_time = ( _check_sell_signals( cfg, config, holdings, current_time, last_stop_loss_check_time, last_profit_taking_check_time ) ) logger.info( "[INFO] [요약] 매수 신호: %d개, 손절 신호: %d개, 익절 신호: %d개", buy_signal_count, stop_loss_signal_count, profit_taking_signal_count, ) return last_buy_check_time, last_stop_loss_check_time, last_profit_taking_check_time, last_balance_warning_time def execute_benchmark(cfg, symbols): """Execute benchmark to compare single-thread and multi-thread performance.""" logger.info("[SYSTEM] 간단 벤치마크 시작: 심볼=%d", len(symbols)) # Run with single-thread start = time.time() run_sequential(symbols, cfg=cfg, aggregate_enabled=False) elapsed_single = time.time() - start logger.info("[INFO] 순차 처리 소요 시간: %.2f초", elapsed_single) # Run with configured threads (but in dry-run; user should enable dry_run for safe benchmark) start = time.time() run_with_threads(symbols, cfg=cfg, aggregate_enabled=False) elapsed_parallel = time.time() - start logger.info("[INFO] 병렬 처리(%d 스레드) 소요 시간: %.2f초", cfg.max_threads, elapsed_parallel) # Simple recommendation if elapsed_parallel < elapsed_single: reduction = (elapsed_single - elapsed_parallel) / elapsed_single * 100 logger.info("[INFO] 병렬로 %.1f%% 빨라졌습니다 (권장 스레드=%d).", reduction, cfg.max_threads) else: logger.info("[INFO] 병렬이 순차보다 빠르지 않습니다. 네트워크/IO 패턴을 점검하세요.") # Global flag for graceful shutdown _shutdown_requested = False def _signal_handler(signum, frame): """Handle SIGTERM and SIGINT for graceful shutdown.""" global _shutdown_requested sig_name = signal.Signals(signum).name if hasattr(signal, "Signals") else str(signum) logger.info("[SYSTEM] 종료 시그널 수신: %s. 안전 종료를 시작합니다...", sig_name) _shutdown_requested = True def main(): # Parse command-line arguments parser = argparse.ArgumentParser(description="MACD 알림 봇") parser.add_argument("--benchmark", action="store_true", help="벤치마크 실행") args = parser.parse_args() # Load config config = load_config() if not config: logger.error("[ERROR] 설정 로드 실패; 종료합니다") return # Build runtime config and derive core settings cfg = build_runtime_config(config) # dry_run 값에 따라 logger 핸들러 재설정 setup_logger(cfg.dry_run) logger.info("[SYSTEM] " + "=" * 70) logger.info("[SYSTEM] MACD 알림 봇 시작") logger.info("[SYSTEM] " + "=" * 70) # Load symbols symbols = read_symbols(get_symbols_file(config)) if not symbols: logger.error("[ERROR] 심볼 로드 실패; 종료합니다") return # Replace runtime_settings references with cfg attributes if cfg.telegram_test: send_startup_test_message(cfg.telegram_bot_token, cfg.telegram_chat_id, cfg.telegram_parse_mode, cfg.dry_run) if not cfg.dry_run and (not cfg.telegram_bot_token or not cfg.telegram_chat_id): logger.error("[ERROR] dry-run이 아닐 때 텔레그램 환경변수 필수: TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID") return # 텔레그램 토큰 형식 검증 if cfg.telegram_bot_token: from src.config import validate_telegram_token if not validate_telegram_token(cfg.telegram_bot_token): logger.warning("[WARNING] 텔레그램 봇 토큰 형식이 올바르지 않을 수 있습니다") # Register signal handlers for graceful shutdown signal.signal(signal.SIGTERM, _signal_handler) signal.signal(signal.SIGINT, _signal_handler) buy_check_minutes = config.get("buy_check_interval_minutes", 240) stop_loss_check_minutes = config.get("stop_loss_check_interval_minutes", 60) profit_taking_check_minutes = config.get("profit_taking_check_interval_minutes", 240) logger.info( "[SYSTEM] 설정: symbols=%d, symbol_delay=%.2f초, candle_count=%d, loop=%s, dry_run=%s, max_threads=%d, trading_mode=%s", len(symbols), cfg.symbol_delay, cfg.candle_count, cfg.loop, cfg.dry_run, cfg.max_threads, cfg.trading_mode, ) logger.info( "[SYSTEM] 확인 주기: 매수=%d분, 손절=%d분, 익절=%d분", buy_check_minutes, stop_loss_check_minutes, profit_taking_check_minutes, ) # Check if benchmark flag is set if args.benchmark: execute_benchmark(cfg, symbols) return # Main execution loop last_buy_check_time = 0 last_stop_loss_check_time = 0 last_profit_taking_check_time = 0 last_balance_warning_time = 0 if not cfg.loop: process_symbols_and_holdings( cfg, symbols, config, last_buy_check_time, last_stop_loss_check_time, last_profit_taking_check_time, last_balance_warning_time, ) else: # 프로그램 루프 주기는 모든 확인 주기 중 가장 작은 값으로 자동 설정 loop_interval_minutes = min(buy_check_minutes, stop_loss_check_minutes, profit_taking_check_minutes) interval_seconds = max(10, loop_interval_minutes * 60) logger.info( "[SYSTEM] 루프 모드 시작: %d분 간격 (매수: %d분, 손절: %d분, 익절: %d분 마다)", loop_interval_minutes, buy_check_minutes, stop_loss_check_minutes, profit_taking_check_minutes, ) try: while not _shutdown_requested: start_time = time.time() try: ( last_buy_check_time, last_stop_loss_check_time, last_profit_taking_check_time, last_balance_warning_time, ) = process_symbols_and_holdings( cfg, symbols, config, last_buy_check_time, last_stop_loss_check_time, last_profit_taking_check_time, last_balance_warning_time, ) except Exception as e: logger.exception("[ERROR] 루프 내 작업 중 오류: %s", e) report_error( cfg.telegram_bot_token, cfg.telegram_chat_id, f"[오류] 루프 내 작업 실패: {e}", cfg.dry_run ) if _shutdown_requested: logger.info("[SYSTEM] 종료 요청으로 루프를 종료합니다") break # ✅ 작업 시간을 차감한 대기 시간 계산 (지연 누적 방지) elapsed = time.time() - start_time wait_seconds = max(10, interval_seconds - elapsed) logger.info( "[SYSTEM] 작업 소요: %.1f초 | 다음 실행까지 %.1f초 대기 (목표 주기: %d초)", elapsed, wait_seconds, interval_seconds, ) # Sleep in small intervals to check shutdown flag sleep_interval = 1.0 slept = 0.0 while slept < wait_seconds and not _shutdown_requested: time.sleep(min(sleep_interval, wait_seconds - slept)) slept += sleep_interval except KeyboardInterrupt: logger.info("[SYSTEM] 사용자가 루프를 중단함") finally: logger.info("[SYSTEM] 프로그램 종료 완료") if __name__ == "__main__": main()