import time import threading from typing import List from .config import RuntimeConfig from .common import logger from .signals import process_symbol from .notifications import send_telegram def run_sequential(symbols: List[str], cfg: RuntimeConfig, aggregate_enabled: bool = False): logger.info("순차 처리 시작 (심볼 수=%d)", len(symbols)) alerts = [] buy_signal_count = 0 for i, sym in enumerate(symbols): try: res = process_symbol(sym, cfg=cfg) for line in res.get("summary", []): logger.info(line) if res.get("telegram"): buy_signal_count += 1 if cfg.dry_run: logger.info("[dry-run] 알림 내용:\n%s", res["telegram"]) else: # dry_run이 아닐 때는 콘솔에 메시지 출력하지 않음 pass if cfg.telegram_bot_token and cfg.telegram_chat_id: send_telegram( cfg.telegram_bot_token, cfg.telegram_chat_id, res["telegram"], add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode, ) else: logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 메시지 전송 불가") alerts.append({"symbol": sym, "text": res["telegram"]}) except Exception as e: logger.exception("심볼 처리 오류: %s -> %s", sym, e) if i < len(symbols) - 1 and cfg.symbol_delay is not None: logger.debug("다음 심볼까지 %.2f초 대기", cfg.symbol_delay) time.sleep(cfg.symbol_delay) if aggregate_enabled and len(alerts) > 1: summary_lines = [f"알림 발생 심볼 수: {len(alerts)}", "\n"] summary_lines += [f"- {a['symbol']}" for a in alerts] summary_text = "\n".join(summary_lines) if cfg.dry_run: logger.info("[dry-run] 알림 요약:\n%s", summary_text) else: if cfg.telegram_bot_token and cfg.telegram_chat_id: send_telegram( cfg.telegram_bot_token, cfg.telegram_chat_id, summary_text, add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode, ) else: logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 요약 메시지 전송 불가") # 매수 조건이 하나도 충족되지 않은 경우 알림 전송 if cfg.telegram_bot_token and cfg.telegram_chat_id and not any(a.get("text") for a in alerts): send_telegram( cfg.telegram_bot_token, cfg.telegram_chat_id, "[알림] 충족된 매수 조건 없음 (프로그램 정상 작동 중)", add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode, ) return buy_signal_count def run_with_threads(symbols: List[str], cfg: RuntimeConfig, aggregate_enabled: bool = False): logger.info( "병렬 처리 시작 (심볼 수=%d, 스레드 수=%d, 심볼 간 지연=%.2f초)", len(symbols), cfg.max_threads or 0, cfg.symbol_delay or 0.0, ) semaphore = threading.Semaphore(cfg.max_threads) threads = [] last_request_time = [0] throttle_lock = threading.Lock() results = {} results_lock = threading.Lock() def worker(symbol: str): try: with semaphore: with throttle_lock: elapsed = time.time() - last_request_time[0] if cfg.symbol_delay is not None and elapsed < cfg.symbol_delay: sleep_time = cfg.symbol_delay - elapsed logger.debug("[%s] 스로틀 대기: %.2f초", symbol, sleep_time) time.sleep(sleep_time) last_request_time[0] = time.time() res = process_symbol(symbol, cfg=cfg) with results_lock: results[symbol] = res except Exception as e: logger.exception("[%s] 워커 스레드 오류: %s", symbol, e) for sym in symbols: t = threading.Thread(target=worker, args=(sym,), name=f"Worker-{sym}") threads.append(t) t.start() for t in threads: t.join() alerts = [] buy_signal_count = 0 for sym in symbols: with results_lock: res = results.get(sym) if not res: logger.warning("심볼 결과 없음: %s", sym) continue for line in res.get("summary", []): logger.info(line) if res.get("telegram"): buy_signal_count += 1 if cfg.dry_run: logger.info("[dry-run] 알림 내용:\n%s", res["telegram"]) if cfg.telegram_bot_token and cfg.telegram_chat_id: send_telegram( cfg.telegram_bot_token, cfg.telegram_chat_id, res["telegram"], add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode, ) else: logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 메시지 전송 불가") alerts.append({"symbol": sym, "text": res["telegram"]}) if aggregate_enabled and len(alerts) > 1: summary_lines = [f"알림 발생 심볼 수: {len(alerts)}", "\n"] summary_lines += [f"- {a['symbol']}" for a in alerts] summary_text = "\n".join(summary_lines) if cfg.dry_run: logger.info("[dry-run] 알림 요약:\n%s", summary_text) else: if cfg.telegram_bot_token and cfg.telegram_chat_id: send_telegram( cfg.telegram_bot_token, cfg.telegram_chat_id, summary_text, add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode, ) else: logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 요약 메시지 전송 불가") # 매수 조건이 하나도 충족되지 않은 경우 알림 전송 if cfg.telegram_bot_token and cfg.telegram_chat_id and not any(a.get("text") for a in alerts): send_telegram( cfg.telegram_bot_token, cfg.telegram_chat_id, "[알림] 충족된 매수 조건 없음 (프로그램 정상 작동 중)", add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode, ) logger.info("병렬 처리 완료") return buy_signal_count