업데이트

This commit is contained in:
2025-12-09 21:39:23 +09:00
parent dd9acf62a3
commit 37a150bd0d
35 changed files with 5587 additions and 493 deletions

View File

@@ -1,160 +1,170 @@
import time
import threading
from typing import List
from .config import RuntimeConfig
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
from .common import logger
from .config import RuntimeConfig
from .notifications import send_telegram_with_retry
from .signals import process_symbol
from .notifications import send_telegram
def run_sequential(symbols: List[str], cfg: RuntimeConfig, aggregate_enabled: bool = False):
logger.info("순차 처리 시작 (심볼 수=%d)", len(symbols))
alerts = []
buy_signal_count = 0
for i, sym in enumerate(symbols):
try:
res = process_symbol(sym, cfg=cfg)
for line in res.get("summary", []):
logger.info(line)
if res.get("telegram"):
buy_signal_count += 1
if cfg.dry_run:
logger.info("[dry-run] 알림 내용:\n%s", res["telegram"])
else:
# dry_run이 아닐 때는 콘솔에 메시지 출력하지 않음
pass
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
res["telegram"],
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 메시지 전송 불가")
alerts.append({"symbol": sym, "text": res["telegram"]})
except Exception as e:
logger.exception("심볼 처리 오류: %s -> %s", sym, e)
if i < len(symbols) - 1 and cfg.symbol_delay is not None:
logger.debug("다음 심볼까지 %.2f초 대기", cfg.symbol_delay)
time.sleep(cfg.symbol_delay)
if aggregate_enabled and len(alerts) > 1:
def _process_result_and_notify(
symbol: str, res: dict[str, Any], cfg: RuntimeConfig, alerts: list[dict[str, str]]
) -> bool:
"""
Process the result of process_symbol and send notifications if needed.
Returns True if a buy signal was triggered (telegram message sent), False otherwise.
"""
if not res:
logger.warning("심볼 결과 없음: %s", symbol)
return False
for line in res.get("summary", []):
logger.info(line)
buy_signal_triggered = False
if res.get("telegram"):
buy_signal_triggered = True
if cfg.dry_run:
logger.info("[dry-run] 알림 내용:\n%s", res["telegram"])
if cfg.telegram_bot_token and cfg.telegram_chat_id:
# ✅ 재시도 로직 포함
if not send_telegram_with_retry(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
res["telegram"],
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
):
logger.error("심볼 %s 알림 전송 최종 실패", symbol)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 메시지 전송 불가")
alerts.append({"symbol": symbol, "text": res["telegram"]})
return buy_signal_triggered
def _send_aggregated_summary(alerts: list[dict[str, str]], cfg: RuntimeConfig):
"""Send aggregated summary if multiple alerts occurred."""
if len(alerts) > 1:
summary_lines = [f"알림 발생 심볼 수: {len(alerts)}", "\n"]
summary_lines += [f"- {a['symbol']}" for a in alerts]
summary_text = "\n".join(summary_lines)
if cfg.dry_run:
logger.info("[dry-run] 알림 요약:\n%s", summary_text)
else:
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
# ✅ 재시도 로직 포함
if not send_telegram_with_retry(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
summary_text,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
):
logger.error("알림 요약 전송 최종 실패")
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 요약 메시지 전송 불가")
# 매수 조건이 하나도 충족되지 않은 경우 알림 전송
def _notify_no_signals(alerts: list[dict[str, str]], cfg: RuntimeConfig):
"""Notify if no buy signals were triggered."""
if cfg.telegram_bot_token and cfg.telegram_chat_id and not any(a.get("text") for a in alerts):
send_telegram(
# ✅ 재시도 로직 포함
if not send_telegram_with_retry(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
"[알림] 충족된 매수 조건 없음 (프로그램 정상 작동 중)",
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
):
logger.error("정상 작동 알림 전송 최종 실패")
def run_sequential(symbols: list[str], cfg: RuntimeConfig, aggregate_enabled: bool = False):
logger.info("순차 처리 시작 (심볼 수=%d)", len(symbols))
alerts = []
buy_signal_count = 0
for i, sym in enumerate(symbols):
try:
res = process_symbol(sym, cfg=cfg)
if _process_result_and_notify(sym, res, cfg, alerts):
buy_signal_count += 1
except Exception as e:
logger.exception("심볼 처리 오류: %s -> %s", sym, e)
if i < len(symbols) - 1 and cfg.symbol_delay is not None:
logger.debug("다음 심볼까지 %.2f초 대기", cfg.symbol_delay)
time.sleep(cfg.symbol_delay)
if aggregate_enabled:
_send_aggregated_summary(alerts, cfg)
_notify_no_signals(alerts, cfg)
return buy_signal_count
def run_with_threads(symbols: List[str], cfg: RuntimeConfig, aggregate_enabled: bool = False):
def run_with_threads(symbols: list[str], cfg: RuntimeConfig, aggregate_enabled: bool = False):
logger.info(
"병렬 처리 시작 (심볼 수=%d, 스레드 수=%d, 심볼 간 지연=%.2f초)",
len(symbols),
cfg.max_threads or 0,
cfg.symbol_delay or 0.0,
)
semaphore = threading.Semaphore(cfg.max_threads)
threads = []
last_request_time = [0]
alerts = []
buy_signal_count = 0
max_workers = cfg.max_threads or 4
# Throttle control
last_request_time = [0.0]
throttle_lock = threading.Lock()
results = {}
results_lock = threading.Lock()
def worker(symbol: str):
try:
with semaphore:
with throttle_lock:
elapsed = time.time() - last_request_time[0]
if cfg.symbol_delay is not None and elapsed < cfg.symbol_delay:
sleep_time = cfg.symbol_delay - elapsed
logger.debug("[%s] 스로틀 대기: %.2f", symbol, sleep_time)
time.sleep(sleep_time)
last_request_time[0] = time.time()
res = process_symbol(symbol, cfg=cfg)
with results_lock:
results[symbol] = res
with throttle_lock:
elapsed = time.time() - last_request_time[0]
if cfg.symbol_delay is not None and elapsed < cfg.symbol_delay:
sleep_time = cfg.symbol_delay - elapsed
logger.debug("[%s] 스로틀 대기: %.2f", symbol, sleep_time)
time.sleep(sleep_time)
last_request_time[0] = time.time()
return symbol, process_symbol(symbol, cfg=cfg)
except Exception as e:
logger.exception("[%s] 워커 스레드 오류: %s", symbol, e)
return symbol, None
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_symbol = {executor.submit(worker, sym): sym for sym in symbols}
# Collect results as they complete
results = {}
for future in as_completed(future_to_symbol):
sym = future_to_symbol[future]
try:
symbol, res = future.result()
results[symbol] = res
except Exception as e:
logger.exception("[%s] Future 결과 조회 오류: %s", sym, e)
# Process results in original order to maintain consistent log/alert order if desired,
# or just process as is. Here we process in original symbol order.
for sym in symbols:
t = threading.Thread(target=worker, args=(sym,), name=f"Worker-{sym}")
threads.append(t)
t.start()
for t in threads:
t.join()
alerts = []
buy_signal_count = 0
for sym in symbols:
with results_lock:
res = results.get(sym)
if not res:
logger.warning("심볼 결과 없음: %s", sym)
continue
for line in res.get("summary", []):
logger.info(line)
if res.get("telegram"):
buy_signal_count += 1
if cfg.dry_run:
logger.info("[dry-run] 알림 내용:\n%s", res["telegram"])
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
res["telegram"],
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 메시지 전송 불가")
alerts.append({"symbol": sym, "text": res["telegram"]})
if aggregate_enabled and len(alerts) > 1:
summary_lines = [f"알림 발생 심볼 수: {len(alerts)}", "\n"]
summary_lines += [f"- {a['symbol']}" for a in alerts]
summary_text = "\n".join(summary_lines)
if cfg.dry_run:
logger.info("[dry-run] 알림 요약:\n%s", summary_text)
else:
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
summary_text,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 요약 메시지 전송 불가")
# 매수 조건이 하나도 충족되지 않은 경우 알림 전송
if cfg.telegram_bot_token and cfg.telegram_chat_id and not any(a.get("text") for a in alerts):
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
"[알림] 충족된 매수 조건 없음 (프로그램 정상 작동 중)",
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
res = results.get(sym)
if res:
if _process_result_and_notify(sym, res, cfg, alerts):
buy_signal_count += 1
if aggregate_enabled:
_send_aggregated_summary(alerts, cfg)
_notify_no_signals(alerts, cfg)
logger.info("병렬 처리 완료")
return buy_signal_count