Files
AutoCoinTrader/src/notifications.py

173 lines
6.3 KiB
Python

import threading
import time
import requests
from .common import logger
from .constants import (
TELEGRAM_MAX_MESSAGE_LENGTH,
TELEGRAM_RATE_LIMIT_DELAY,
TELEGRAM_REQUEST_TIMEOUT,
)
__all__ = ["send_telegram", "send_telegram_with_retry", "report_error", "send_startup_test_message"]
def send_telegram_with_retry(
token: str,
chat_id: str,
text: str,
add_thread_prefix: bool = True,
parse_mode: str = None,
max_retries: int | None = None,
) -> bool:
"""
재시도 로직이 포함된 텔레그램 메시지 전송
Args:
token: 텔레그램 봇 토큰
chat_id: 채팅 ID
text: 메시지 내용
add_thread_prefix: 스레드 prefix 추가 여부
parse_mode: HTML/Markdown 파싱 모드
max_retries: 최대 재시도 횟수 (None이면 기본값 3)
Returns:
성공 여부 (True/False)
"""
if max_retries is None:
max_retries = 3
for attempt in range(max_retries):
try:
# 이제 send_telegram은 실패 시 예외를 발생시킴
send_telegram(token, chat_id, text, add_thread_prefix, parse_mode)
return True
except Exception as e:
if attempt < max_retries - 1:
wait_time = 2**attempt # Exponential backoff: 1s, 2s, 4s
logger.warning(
"텔레그램 전송 실패 (시도 %d/%d), %d초 후 재시도: %s", attempt + 1, max_retries, wait_time, e
)
time.sleep(wait_time)
else:
logger.error("텔레그램 전송 최종 실패 (%d회 시도): %s", max_retries, e)
return False
return False
def send_telegram(
token: str,
chat_id: str,
text: str,
add_thread_prefix: bool = True,
parse_mode: str = None,
max_length: int = TELEGRAM_MAX_MESSAGE_LENGTH,
):
"""
텔레그램 메시지를 전송합니다 (자동 분할 지원).
Args:
token: 텔레그램 봇 토큰
chat_id: 채팅 ID
text: 메시지 내용
add_thread_prefix: 스레드 이름 prefix 추가 여부
parse_mode: HTML/Markdown 파싱 모드
max_length: 최대 메시지 길이 (Telegram 제한 4096자, 안전하게 4000자)
Note:
- 메시지가 max_length를 초과하면 자동으로 분할하여 전송합니다.
- 실패 시 예외를 발생시킵니다.
- 프로덕션에서는 send_telegram_with_retry() 사용 권장
"""
if add_thread_prefix:
thread_name = threading.current_thread().name
# 기본 Thread-N 이름이면 prefix 생략 (의미 없는 정보)
if not thread_name.startswith("Thread-"):
payload_text = f"[{thread_name}] {text}"
else:
payload_text = text
else:
payload_text = text
url = f"https://api.telegram.org/bot{token}/sendMessage"
# ✅ 메시지 길이 확인 및 분할
if len(payload_text) <= max_length:
# 단일 메시지 전송
payload = {"chat_id": chat_id, "text": payload_text}
if parse_mode:
payload["parse_mode"] = parse_mode
try:
resp = requests.post(url, json=payload, timeout=TELEGRAM_REQUEST_TIMEOUT)
resp.raise_for_status()
logger.debug("텔레그램 메시지 전송 성공: %s", payload_text[:80])
return True
except requests.exceptions.Timeout as e:
logger.warning("텔레그램 타임아웃: %s", e)
raise
except requests.exceptions.ConnectionError as e:
logger.warning("텔레그램 연결 오류: %s", e)
raise
except requests.exceptions.HTTPError as e:
logger.warning("텔레그램 HTTP 오류: %s", e)
raise
except requests.exceptions.RequestException as e:
logger.warning("텔레그램 API 요청 실패: %s", e)
raise
else:
# ✅ 메시지 분할 전송
chunks = [payload_text[i : i + max_length] for i in range(0, len(payload_text), max_length)]
logger.info("텔레그램 메시지 길이 초과 (%d자), %d개로 분할 전송", len(payload_text), len(chunks))
for i, chunk in enumerate(chunks, 1):
header = f"[메시지 {i}/{len(chunks)}]\n" if len(chunks) > 1 else ""
payload = {"chat_id": chat_id, "text": header + chunk}
if parse_mode:
payload["parse_mode"] = parse_mode
try:
resp = requests.post(url, json=payload, timeout=TELEGRAM_REQUEST_TIMEOUT)
resp.raise_for_status()
logger.debug("텔레그램 분할 메시지 전송 성공 (%d/%d)", i, len(chunks))
# Rate Limit 방지
if i < len(chunks):
time.sleep(TELEGRAM_RATE_LIMIT_DELAY)
except requests.exceptions.RequestException as e:
logger.error("텔레그램 분할 메시지 전송 실패 (%d/%d): %s", i, len(chunks), e)
raise
return True
def report_error(bot_token: str, chat_id: str, message: str, dry_run: bool):
"""
Report an error via Telegram.
"""
if not dry_run and bot_token and chat_id:
# 재시도 로직이 포함된 함수 사용
send_telegram_with_retry(bot_token, chat_id, message, add_thread_prefix=True)
def send_startup_test_message(bot_token: str, chat_id: str, parse_mode: str, dry_run: bool):
"""
Send a startup test message to verify Telegram settings.
"""
if dry_run:
logger.info("[dry-run] Telegram 테스트 메시지 전송 생략")
return
if bot_token and chat_id:
test_msg = "[테스트] Telegram 설정 확인용 메시지입니다. 봇/채팅 설정이 올바르면 이 메시지가 도착합니다."
logger.info("텔레그램 테스트 메시지 전송 시도")
# 재시도 로직이 포함된 함수 사용
if send_telegram_with_retry(bot_token, chat_id, test_msg, add_thread_prefix=False, parse_mode=parse_mode):
logger.info("텔레그램 테스트 메시지 전송 성공")
else:
logger.warning("텔레그램 테스트 메시지 전송 실패")
else:
logger.warning("TELEGRAM_TEST=1 이지만 TELEGRAM_BOT_TOKEN/TELEGRAM_CHAT_ID가 설정되어 있지 않습니다")