Enhance logging and progress reporting: implement console locking for thread-safe output, improve progress and stage messages, and replace tqdm with custom progress functions.

This commit is contained in:
Gnill82
2025-08-30 17:41:43 +09:00
parent 21e53a5164
commit 901e5ba647

View File

@ -32,7 +32,6 @@ from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from pathlib import Path from pathlib import Path
import pandas as pd import pandas as pd
from tqdm import tqdm
from opensearchpy import OpenSearch from opensearchpy import OpenSearch
from opensearchpy.helpers import scan from opensearchpy.helpers import scan
@ -60,7 +59,7 @@ OPENSEARCH_CONFIG = {
KST = timezone(timedelta(hours=9)) KST = timezone(timedelta(hours=9))
# 성능 최적화 설정 (오픈서치 스펙 기반 최적화) # 성능 최적화 설정 (오픈서치 스펙 기반 최적화)
DEFAULT_BATCH_SIZE = 2000 DEFAULT_BATCH_SIZE = 1000
DEFAULT_MAX_WORKERS = 16 DEFAULT_MAX_WORKERS = 16
DEFAULT_COMPOSITE_SIZE = 2000 DEFAULT_COMPOSITE_SIZE = 2000
DEFAULT_TIMEOUT = 180 DEFAULT_TIMEOUT = 180
@ -75,6 +74,7 @@ OUTPUT_DIR.mkdir(exist_ok=True)
# 전역 변수 # 전역 변수
stop_timer_event = threading.Event() stop_timer_event = threading.Event()
console_lock = threading.Lock() # 콘솔 출력용 스레드 락
logger = None logger = None
@ -151,12 +151,83 @@ def setup_logging(log_file_path: str) -> logging.Logger:
def print_progress(message: str, end_with_newline: bool = False): def print_progress(message: str, end_with_newline: bool = False):
"""진행률을 제자리에서 갱신하여 출력""" """진행률을 제자리에서 갱신하여 출력 (세부 진행률용)"""
if end_with_newline: with console_lock:
print(f"\r{message}") try:
sys.stdout.flush() if end_with_newline:
else: print(f"\r{message}")
print(f"\r{message}", end="", flush=True) sys.stdout.flush()
else:
print(f"\r{message}", end="", flush=True)
except (OSError, IOError) as e:
# 파이프 오류 시 로깅으로 대체
if logger:
logger.info(f"PROGRESS: {message}")
pass
def print_stage(message: str):
"""주요 단계 시작을 새 줄로 표시"""
with console_lock:
try:
print(f"\n{message}")
sys.stdout.flush()
except (OSError, IOError) as e:
# 파이프 오류 시 로깅으로 대체
if logger:
logger.info(f"STAGE: {message}")
pass
def print_complete(message: str):
"""단계 완료를 새 줄로 표시"""
with console_lock:
try:
print(f"\r{message}")
sys.stdout.flush()
except (OSError, IOError) as e:
# 파이프 오류 시 로깅으로 대체
if logger:
logger.info(f"COMPLETE: {message}")
pass
class ProcessingTimer:
"""긴 작업 중 타임아웃 메시지 표시용 타이머"""
def __init__(self, timeout_seconds: int = 10):
self.timeout_seconds = timeout_seconds
self.timer = None
self.is_active = False
self.last_message = ""
def start(self, message: str):
"""타이머 시작"""
self.stop() # 기존 타이머 정리
self.last_message = message
self.is_active = True
self._schedule_next()
def stop(self):
"""타이머 중지"""
self.is_active = False
if self.timer:
self.timer.cancel()
self.timer = None
def _schedule_next(self):
"""다음 타임아웃 메시지 스케줄링"""
if self.is_active:
def show_timeout():
if self.is_active:
print(f"\r{self.last_message} (처리 중...)", end="", flush=True)
self._schedule_next()
self.timer = threading.Timer(self.timeout_seconds, show_timeout)
self.timer.start()
# 전역 타이머 인스턴스
processing_timer = ProcessingTimer()
# ============================================================================== # ==============================================================================
@ -854,6 +925,8 @@ def get_new_user_cohort_optimized(
logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}") logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}")
logger.info(f"페이지 크기: {page_size}") logger.info(f"페이지 크기: {page_size}")
print_stage("🔍 신규 유저 코호트 선정 시작...")
cohort = {} cohort = {}
after_key = None after_key = None
@ -936,8 +1009,8 @@ def get_new_user_cohort_optimized(
logger.error(f"create_uid 처리 중 오류 (페이지 {page_count}): {e}") logger.error(f"create_uid 처리 중 오류 (페이지 {page_count}): {e}")
break break
# 최종 진행률 표시 # 최종 진행률 표시 및 완료 메시지
print_progress(f"코호트 선정 완료: 총 {len(new_user_map)}명 확인", end_with_newline=True) print_complete(f"신규 유저 코호트 선정 완료: 총 {len(new_user_map)}명 확인")
logger.info(f"{len(new_user_map)}명의 신규 유저 확인됨") logger.info(f"{len(new_user_map)}명의 신규 유저 확인됨")
# Step 2: 모든 create_uid 유저를 cohort에 추가 # Step 2: 모든 create_uid 유저를 cohort에 추가
@ -963,12 +1036,14 @@ def get_new_user_cohort_optimized(
} }
total_users += 1 total_users += 1
print_progress(f"코호트 초기화 완료: {total_users}") print_complete(f"코호트 초기화 완료: {total_users}")
logger.info(f"cohort에 {total_users}명의 신규 유저 추가 완료") logger.info(f"cohort에 {total_users}명의 신규 유저 추가 완료")
# Step 3: login_comp 인덱스에서 추가 정보 수집 (auth.id 1순위) # Step 3: login_comp 인덱스에서 추가 정보 수집 (auth.id 1순위)
print_stage("📊 login_comp 인덱스에서 추가 정보 수집 중...")
logger.info("login_comp 인덱스에서 추가 정보 수집 중 (auth.id 1순위)...") logger.info("login_comp 인덱스에서 추가 정보 수집 중 (auth.id 1순위)...")
login_comp_collected = set() login_comp_collected = set()
processing_timer.start("login_comp 정보 수집")
for i in range(0, len(uid_list), chunk_size): for i in range(0, len(uid_list), chunk_size):
chunk_uids = uid_list[i:i+chunk_size] chunk_uids = uid_list[i:i+chunk_size]
@ -1044,7 +1119,8 @@ def get_new_user_cohort_optimized(
except Exception as e: except Exception as e:
logger.error(f"login_comp 정보 수집 중 오류: {e}") logger.error(f"login_comp 정보 수집 중 오류: {e}")
print_progress(f"login_comp 정보 수집: {len(login_comp_collected)}/{total_users}명 완료") processing_timer.stop()
print_complete(f"login_comp 정보 수집 완료: {len(login_comp_collected)}/{total_users}")
logger.info(f"login_comp에서 {len(login_comp_collected)}명의 정보 수집 완료") logger.info(f"login_comp에서 {len(login_comp_collected)}명의 정보 수집 완료")
# Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (auth.id 2순위) # Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (auth.id 2순위)
@ -1052,8 +1128,10 @@ def get_new_user_cohort_optimized(
missing_uids = [uid for uid in uid_list if uid not in login_comp_collected] missing_uids = [uid for uid in uid_list if uid not in login_comp_collected]
if missing_uids: if missing_uids:
print_stage(f"🏠 lobby 인덱스에서 차선 정보 수집 중 ({len(missing_uids)}명)...")
logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중 (auth.id 2순위)...") logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중 (auth.id 2순위)...")
lobby_collected = set() lobby_collected = set()
processing_timer.start("lobby 정보 수집")
for i in range(0, len(missing_uids), chunk_size): for i in range(0, len(missing_uids), chunk_size):
chunk_uids = missing_uids[i:i+chunk_size] chunk_uids = missing_uids[i:i+chunk_size]
@ -1123,11 +1201,13 @@ def get_new_user_cohort_optimized(
except Exception as e: except Exception as e:
logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}") logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}")
print_progress(f"lobby 추가 정보 수집: {len(lobby_collected)}명 완료") processing_timer.stop()
print_complete(f"lobby 추가 정보 수집 완료: {len(lobby_collected)}")
logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 차선 정보 수집 완료") logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 차선 정보 수집 완료")
# 최종 통계 # 최종 통계
auth_id_count = sum(1 for uid in cohort if cohort[uid]['auth_id'] != 'N/A') auth_id_count = sum(1 for uid in cohort if cohort[uid]['auth_id'] != 'N/A')
print_complete(f"코호트 정보 수집 완료: auth.id {auth_id_count}/{total_users}명 수집")
logger.info(f"최종 auth.id 수집 완료: {auth_id_count}/{total_users}") logger.info(f"최종 auth.id 수집 완료: {auth_id_count}/{total_users}")
logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)") logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)")
@ -1647,11 +1727,11 @@ def process_cohort_fixed_parallel(
logger.info(f"분석 지표: {len(metrics_config)}") logger.info(f"분석 지표: {len(metrics_config)}")
logger.info(f"세션 지표 포함: {'' if include_session_metrics else '아니오'}") logger.info(f"세션 지표 포함: {'' if include_session_metrics else '아니오'}")
# 진행률 추적용 로거
uid_list = list(cohort.keys()) uid_list = list(cohort.keys())
chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)] chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)]
print_stage(f"⚡ 병렬 배치 처리 시작 ({len(cohort)}명, {len(chunks)}개 배치)...")
all_results = [] all_results = []
failed_chunks = [] failed_chunks = []
@ -1662,32 +1742,35 @@ def process_cohort_fixed_parallel(
} }
completed_chunks = 0 completed_chunks = 0
with tqdm(total=len(chunks), desc="배치 처리 진행률") as pbar: processing_timer.start("병렬 배치 처리")
for future in as_completed(future_to_chunk):
chunk = future_to_chunk[future]
try:
batch_results = future.result(timeout=600)
if batch_results:
all_results.extend(batch_results)
else:
failed_chunks.append(chunk)
completed_chunks += 1 for future in as_completed(future_to_chunk):
# 콘솔 진행률 업데이트 chunk = future_to_chunk[future]
print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료, {len(all_results)}명 처리됨") try:
batch_results = future.result(timeout=600)
except Exception as e: if batch_results:
logger.warning(f"배치 처리 실패: {e}") all_results.extend(batch_results)
else:
failed_chunks.append(chunk) failed_chunks.append(chunk)
completed_chunks += 1
print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료 (오류 1개), {len(all_results)}명 처리됨") completed_chunks += 1
finally: # 콘솔 진행률 업데이트
pbar.update(1) print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료, {len(all_results)}명 처리됨")
except Exception as e:
logger.warning(f"배치 처리 실패: {e}")
failed_chunks.append(chunk)
completed_chunks += 1
print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료 (오류 1개), {len(all_results)}명 처리됨")
processing_timer.stop()
# 실패한 청크 재처리 # 실패한 청크 재처리
if failed_chunks: if failed_chunks:
print_stage(f"🔄 실패한 배치 재처리 중 ({len(failed_chunks)}개)...")
logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...") logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...")
print_progress(f"재처리 중: {len(failed_chunks)}개 배치...") processing_timer.start("배치 재처리")
for i, chunk in enumerate(failed_chunks): for i, chunk in enumerate(failed_chunks):
try: try:
batch_results = process_fixed_batch(client, chunk, cohort, metrics_config, include_session_metrics) batch_results = process_fixed_batch(client, chunk, cohort, metrics_config, include_session_metrics)
@ -1696,8 +1779,11 @@ def process_cohort_fixed_parallel(
except Exception as e: except Exception as e:
logger.error(f"재처리 실패: {e}") logger.error(f"재처리 실패: {e}")
processing_timer.stop()
print_complete(f"재처리 완료: {len(failed_chunks)}개 배치 처리됨")
# 최종 결과 # 최종 결과
print_progress(f"병렬 처리 완료: 총 {len(all_results)}명 성공", end_with_newline=True) print_complete(f"병렬 처리 완료: 총 {len(all_results)}명 성공")
logger.info(f"2단계 완료: {len(all_results)}명 처리 성공") logger.info(f"2단계 완료: {len(all_results)}명 처리 성공")
logger.info("=" * 80) logger.info("=" * 80)
return all_results return all_results
@ -1709,6 +1795,9 @@ def write_fixed_results(results: List[Dict], output_path: str, include_session_m
logger.info("=" * 80) logger.info("=" * 80)
logger.info("3단계: 결과 저장") logger.info("3단계: 결과 저장")
print_stage("💾 분석 결과 CSV 파일 저장 중...")
processing_timer.start("CSV 파일 저장")
if not results: if not results:
logger.error("저장할 결과 데이터가 없습니다.") logger.error("저장할 결과 데이터가 없습니다.")
return return
@ -1764,6 +1853,8 @@ def write_fixed_results(results: List[Dict], output_path: str, include_session_m
for result in results: for result in results:
writer.writerow(result) writer.writerow(result)
processing_timer.stop()
print_complete(f"CSV 파일 저장 완료: {len(results)}명 데이터, {len(headers)}개 지표")
logger.info(f"결과 파일 저장 완료: {output_path}") logger.info(f"결과 파일 저장 완료: {output_path}")
logger.info(f"{len(results)}명의 데이터 저장") logger.info(f"{len(results)}명의 데이터 저장")
logger.info(f"분석 지표: {len(headers)}") logger.info(f"분석 지표: {len(headers)}")
@ -1936,6 +2027,10 @@ def main():
logger.info("수정된 분석 완료!") logger.info("수정된 분석 완료!")
logger.info("=" * 80) logger.info("=" * 80)
print_complete(f"🎉 전체 분석 완료! 소요 시간: {total_time}")
# 타이머 정리
processing_timer.stop()
if __name__ == "__main__": if __name__ == "__main__":
main() main()