Files
opensearch-tracker/utils.py
2025-08-18 21:21:29 +09:00

18 lines
611 B
Python

import os
from typing import Any, List, Optional
# =========================
# 유틸: 체크포인트 저장/로드
# =========================
def load_checkpoint(path: str) -> Optional[List[Any]]:
"""search_after에 전달할 마지막 sort 값 배열([timestamp, _id]) 로드."""
if not os.path.exists(path):
return None
with open(path, "r", encoding="utf-8") as f:
return f.readline()
def save_checkpoint(path: str, lastTime: str) -> None:
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(lastTime)