From 901e5ba6479678664c5645db98c656a03d14b596 Mon Sep 17 00:00:00 2001 From: Gnill82 Date: Sat, 30 Aug 2025 17:41:43 +0900 Subject: [PATCH] Enhance logging and progress reporting: implement console locking for thread-safe output, improve progress and stage messages, and replace tqdm with custom progress functions. --- ds_new_user_analy.py | 169 +++++++++++++++++++++++++++++++++---------- 1 file changed, 132 insertions(+), 37 deletions(-) diff --git a/ds_new_user_analy.py b/ds_new_user_analy.py index 80d938f..9b447a9 100644 --- a/ds_new_user_analy.py +++ b/ds_new_user_analy.py @@ -32,7 +32,6 @@ from concurrent.futures import ThreadPoolExecutor, as_completed, Future from pathlib import Path import pandas as pd -from tqdm import tqdm from opensearchpy import OpenSearch from opensearchpy.helpers import scan @@ -60,7 +59,7 @@ OPENSEARCH_CONFIG = { KST = timezone(timedelta(hours=9)) # 성능 최적화 설정 (오픈서치 스펙 기반 최적화) -DEFAULT_BATCH_SIZE = 2000 +DEFAULT_BATCH_SIZE = 1000 DEFAULT_MAX_WORKERS = 16 DEFAULT_COMPOSITE_SIZE = 2000 DEFAULT_TIMEOUT = 180 @@ -75,6 +74,7 @@ OUTPUT_DIR.mkdir(exist_ok=True) # 전역 변수 stop_timer_event = threading.Event() +console_lock = threading.Lock() # 콘솔 출력용 스레드 락 logger = None @@ -151,12 +151,83 @@ def setup_logging(log_file_path: str) -> logging.Logger: def print_progress(message: str, end_with_newline: bool = False): - """진행률을 제자리에서 갱신하여 출력""" - if end_with_newline: - print(f"\r{message}") - sys.stdout.flush() - else: - print(f"\r{message}", end="", flush=True) + """진행률을 제자리에서 갱신하여 출력 (세부 진행률용)""" + with console_lock: + try: + if end_with_newline: + print(f"\r{message}") + sys.stdout.flush() + else: + print(f"\r{message}", end="", flush=True) + except (OSError, IOError) as e: + # 파이프 오류 시 로깅으로 대체 + if logger: + logger.info(f"PROGRESS: {message}") + pass + + +def print_stage(message: str): + """주요 단계 시작을 새 줄로 표시""" + with console_lock: + try: + print(f"\n{message}") + sys.stdout.flush() + except (OSError, IOError) as e: + # 파이프 오류 시 로깅으로 대체 + if logger: + logger.info(f"STAGE: {message}") + pass + + +def print_complete(message: str): + """단계 완료를 새 줄로 표시""" + with console_lock: + try: + print(f"\r✓ {message}") + sys.stdout.flush() + except (OSError, IOError) as e: + # 파이프 오류 시 로깅으로 대체 + if logger: + logger.info(f"COMPLETE: {message}") + pass + + +class ProcessingTimer: + """긴 작업 중 타임아웃 메시지 표시용 타이머""" + def __init__(self, timeout_seconds: int = 10): + self.timeout_seconds = timeout_seconds + self.timer = None + self.is_active = False + self.last_message = "" + + def start(self, message: str): + """타이머 시작""" + self.stop() # 기존 타이머 정리 + self.last_message = message + self.is_active = True + self._schedule_next() + + def stop(self): + """타이머 중지""" + self.is_active = False + if self.timer: + self.timer.cancel() + self.timer = None + + def _schedule_next(self): + """다음 타임아웃 메시지 스케줄링""" + if self.is_active: + def show_timeout(): + if self.is_active: + print(f"\r{self.last_message} (처리 중...)", end="", flush=True) + self._schedule_next() + + self.timer = threading.Timer(self.timeout_seconds, show_timeout) + self.timer.start() + + +# 전역 타이머 인스턴스 +processing_timer = ProcessingTimer() # ============================================================================== @@ -854,6 +925,8 @@ def get_new_user_cohort_optimized( logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}") logger.info(f"페이지 크기: {page_size}") + print_stage("🔍 신규 유저 코호트 선정 시작...") + cohort = {} after_key = None @@ -936,8 +1009,8 @@ def get_new_user_cohort_optimized( logger.error(f"create_uid 처리 중 오류 (페이지 {page_count}): {e}") break - # 최종 진행률 표시 - print_progress(f"코호트 선정 완료: 총 {len(new_user_map)}명 확인", end_with_newline=True) + # 최종 진행률 표시 및 완료 메시지 + print_complete(f"신규 유저 코호트 선정 완료: 총 {len(new_user_map)}명 확인") logger.info(f"총 {len(new_user_map)}명의 신규 유저 확인됨") # Step 2: 모든 create_uid 유저를 cohort에 추가 @@ -963,12 +1036,14 @@ def get_new_user_cohort_optimized( } total_users += 1 - print_progress(f"코호트 초기화 완료: {total_users}명") + print_complete(f"코호트 초기화 완료: {total_users}명") logger.info(f"cohort에 {total_users}명의 신규 유저 추가 완료") # Step 3: login_comp 인덱스에서 추가 정보 수집 (auth.id 1순위) + print_stage("📊 login_comp 인덱스에서 추가 정보 수집 중...") logger.info("login_comp 인덱스에서 추가 정보 수집 중 (auth.id 1순위)...") login_comp_collected = set() + processing_timer.start("login_comp 정보 수집") for i in range(0, len(uid_list), chunk_size): chunk_uids = uid_list[i:i+chunk_size] @@ -1044,7 +1119,8 @@ def get_new_user_cohort_optimized( except Exception as e: logger.error(f"login_comp 정보 수집 중 오류: {e}") - print_progress(f"login_comp 정보 수집: {len(login_comp_collected)}/{total_users}명 완료") + processing_timer.stop() + print_complete(f"login_comp 정보 수집 완료: {len(login_comp_collected)}/{total_users}명") logger.info(f"login_comp에서 {len(login_comp_collected)}명의 정보 수집 완료") # Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (auth.id 2순위) @@ -1052,8 +1128,10 @@ def get_new_user_cohort_optimized( missing_uids = [uid for uid in uid_list if uid not in login_comp_collected] if missing_uids: + print_stage(f"🏠 lobby 인덱스에서 차선 정보 수집 중 ({len(missing_uids)}명)...") logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중 (auth.id 2순위)...") lobby_collected = set() + processing_timer.start("lobby 정보 수집") for i in range(0, len(missing_uids), chunk_size): chunk_uids = missing_uids[i:i+chunk_size] @@ -1123,11 +1201,13 @@ def get_new_user_cohort_optimized( except Exception as e: logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}") - print_progress(f"lobby 추가 정보 수집: {len(lobby_collected)}명 완료") + processing_timer.stop() + print_complete(f"lobby 추가 정보 수집 완료: {len(lobby_collected)}명") logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 차선 정보 수집 완료") # 최종 통계 auth_id_count = sum(1 for uid in cohort if cohort[uid]['auth_id'] != 'N/A') + print_complete(f"코호트 정보 수집 완료: auth.id {auth_id_count}/{total_users}명 수집") logger.info(f"최종 auth.id 수집 완료: {auth_id_count}/{total_users}명") logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)") @@ -1647,11 +1727,11 @@ def process_cohort_fixed_parallel( logger.info(f"분석 지표: {len(metrics_config)}개") logger.info(f"세션 지표 포함: {'예' if include_session_metrics else '아니오'}") - # 진행률 추적용 로거 - uid_list = list(cohort.keys()) chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)] + print_stage(f"⚡ 병렬 배치 처리 시작 ({len(cohort)}명, {len(chunks)}개 배치)...") + all_results = [] failed_chunks = [] @@ -1662,32 +1742,35 @@ def process_cohort_fixed_parallel( } completed_chunks = 0 - with tqdm(total=len(chunks), desc="배치 처리 진행률") as pbar: - for future in as_completed(future_to_chunk): - chunk = future_to_chunk[future] - try: - batch_results = future.result(timeout=600) - if batch_results: - all_results.extend(batch_results) - else: - failed_chunks.append(chunk) - - completed_chunks += 1 - # 콘솔 진행률 업데이트 - print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료, {len(all_results)}명 처리됨") - - except Exception as e: - logger.warning(f"배치 처리 실패: {e}") + processing_timer.start("병렬 배치 처리") + + for future in as_completed(future_to_chunk): + chunk = future_to_chunk[future] + try: + batch_results = future.result(timeout=600) + if batch_results: + all_results.extend(batch_results) + else: failed_chunks.append(chunk) - completed_chunks += 1 - print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료 (오류 1개), {len(all_results)}명 처리됨") - finally: - pbar.update(1) + + completed_chunks += 1 + # 콘솔 진행률 업데이트 + print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료, {len(all_results)}명 처리됨") + + except Exception as e: + logger.warning(f"배치 처리 실패: {e}") + failed_chunks.append(chunk) + completed_chunks += 1 + print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료 (오류 1개), {len(all_results)}명 처리됨") + + processing_timer.stop() # 실패한 청크 재처리 if failed_chunks: + print_stage(f"🔄 실패한 배치 재처리 중 ({len(failed_chunks)}개)...") logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...") - print_progress(f"재처리 중: {len(failed_chunks)}개 배치...") + processing_timer.start("배치 재처리") + for i, chunk in enumerate(failed_chunks): try: batch_results = process_fixed_batch(client, chunk, cohort, metrics_config, include_session_metrics) @@ -1695,9 +1778,12 @@ def process_cohort_fixed_parallel( print_progress(f"재처리 중: {i+1}/{len(failed_chunks)} 완료") except Exception as e: logger.error(f"재처리 실패: {e}") + + processing_timer.stop() + print_complete(f"재처리 완료: {len(failed_chunks)}개 배치 처리됨") # 최종 결과 - print_progress(f"병렬 처리 완료: 총 {len(all_results)}명 성공", end_with_newline=True) + print_complete(f"병렬 처리 완료: 총 {len(all_results)}명 성공") logger.info(f"2단계 완료: {len(all_results)}명 처리 성공") logger.info("=" * 80) return all_results @@ -1709,6 +1795,9 @@ def write_fixed_results(results: List[Dict], output_path: str, include_session_m logger.info("=" * 80) logger.info("3단계: 결과 저장") + print_stage("💾 분석 결과 CSV 파일 저장 중...") + processing_timer.start("CSV 파일 저장") + if not results: logger.error("저장할 결과 데이터가 없습니다.") return @@ -1764,6 +1853,8 @@ def write_fixed_results(results: List[Dict], output_path: str, include_session_m for result in results: writer.writerow(result) + processing_timer.stop() + print_complete(f"CSV 파일 저장 완료: {len(results)}명 데이터, {len(headers)}개 지표") logger.info(f"결과 파일 저장 완료: {output_path}") logger.info(f"총 {len(results)}명의 데이터 저장") logger.info(f"분석 지표: {len(headers)}개") @@ -1935,6 +2026,10 @@ def main(): logger.info(f"총 소요 시간: {total_time}") logger.info("수정된 분석 완료!") logger.info("=" * 80) + + print_complete(f"🎉 전체 분석 완료! 소요 시간: {total_time}") + # 타이머 정리 + processing_timer.stop() if __name__ == "__main__":