Refactor logging system: streamline console output, enhance message filtering, and replace progress logger with direct print function for better clarity.

This commit is contained in:
Gnill82
2025-08-30 17:08:15 +09:00
parent f431fdb19d
commit 21e53a5164

View File

@ -17,6 +17,7 @@ OpenSearch 매핑 오류 수정: nested → object 쿼리 변경
"""
import os
import sys
import csv
import json
import time
@ -93,37 +94,34 @@ def setup_logging(log_file_path: str) -> logging.Logger:
class ConsoleFormatter(logging.Formatter):
"""Console-optimized formatter - 간결한 콘솔 출력"""
def format(self, record):
# 콘솔에는 시간 없이 중요 메시지만 표시
if record.levelno >= logging.WARNING: # WARNING 이상은 상세 표시
# WARNING 이상은 상세 표시
if record.levelno >= logging.WARNING:
return f"[{record.levelname}] {record.getMessage()}"
elif record.name == 'progress': # 진행률 추적용 특별 로거
return record.getMessage() # 시간 없이 출력
else:
# 일반 INFO 로그는 상당히 간결하게
msg = record.getMessage()
# 주요 정보만 콘솔에 표시
if any(keyword in msg for keyword in [
"던전 스토커즈 신규 유저 분석",
"분석 기간",
"세션 지표 포함",
"총 신규 유저",
"분석 완료",
"분석 요약",
"리텐션 퍼널",
"D0→D1", "D1→D2", "D2→D3", "D3→D4", "D4→D5", "D5→D6", "D6→D7",
"총 소요 시간",
"Retained_d",
"평균 활동 시간",
"="*20 # 구분선
]):
return msg
else:
return None
def formatMessage(self, record):
formatted = self.format(record)
return formatted if formatted is not None else ""
# 일반 INFO 로그는 중요한 것만 표시
msg = record.getMessage()
important_keywords = [
"던전 스토커즈 신규 유저 분석",
"분석 기간",
"세션 지표 포함",
" 신규 유저",
"분석 완료",
"분석 요약",
"리텐션 퍼널",
"D0→D1", "D1→D2", "D2→D3", "D3→D4", "D4→D5", "D5→D6", "D6→D7",
"총 소요 시간",
"Retained_d",
"평균 활동 시간",
"="*20 # 구분선
]
if any(keyword in msg for keyword in important_keywords):
return msg
# 중요하지 않은 로그는 빈 문자열 반환 (출력 안함)
return ""
# 기본 로거 설정
logger = logging.getLogger('NewUserAnalyzer')
logger.setLevel(logging.INFO)
@ -141,8 +139,8 @@ def setup_logging(log_file_path: str) -> logging.Logger:
console_handler = logging.StreamHandler()
console_formatter = ConsoleFormatter()
console_handler.setFormatter(console_formatter)
# 콘솔용 필터 추가
console_handler.addFilter(lambda record: console_formatter.format(record) is not None)
# 빈 문자열은 출력하지 않도록 필터링
console_handler.addFilter(lambda record: console_formatter.format(record) != "")
logger.addHandler(console_handler)
# 외부 라이브러리 로그 억제
@ -152,6 +150,15 @@ def setup_logging(log_file_path: str) -> logging.Logger:
return logger
def print_progress(message: str, end_with_newline: bool = False):
"""진행률을 제자리에서 갱신하여 출력"""
if end_with_newline:
print(f"\r{message}")
sys.stdout.flush()
else:
print(f"\r{message}", end="", flush=True)
# ==============================================================================
# 3. OpenSearch 연결 및 유틸리티
# ==============================================================================
@ -880,7 +887,6 @@ def get_new_user_cohort_optimized(
new_user_map = {} # uid -> {'create_time': ...}
page_count = 0
total_pages_estimate = 0
progress_logger = logging.getLogger('progress')
while True:
page_count += 1
@ -891,9 +897,9 @@ def get_new_user_cohort_optimized(
try:
# 진행률 표시 (제자리 갱신)
if page_count == 1:
progress_logger.info("\r코호트 선정: 페이지 1 처리 중...")
print_progress("코호트 선정: 페이지 1 처리 중...")
else:
progress_logger.info(f"\r코호트 선정: 페이지 {page_count} 처리 중... (수집: {len(new_user_map)}명)")
print_progress(f"코호트 선정: 페이지 {page_count} 처리 중... (수집: {len(new_user_map)}명)")
logger.info(f"create_uid 페이지 {page_count} 처리 중...")
response = exponential_backoff_retry(
@ -920,7 +926,7 @@ def get_new_user_cohort_optimized(
logger.info(f"create_uid 페이지 {page_count}: {len(buckets)}개 처리됨")
# 콘솔에는 간결하게
progress_logger.info(f"\r코호트 선정: 페이지 {page_count} 완료 (수집: {len(new_user_map)}명)")
print_progress(f"코호트 선정: 페이지 {page_count} 완료 (수집: {len(new_user_map)}명)")
after_key = response["aggregations"]["new_users"].get("after_key")
if not after_key:
@ -931,7 +937,7 @@ def get_new_user_cohort_optimized(
break
# 최종 진행률 표시
progress_logger.info(f"\r코호트 선정 완료: 총 {len(new_user_map)}명 확인 \n")
print_progress(f"코호트 선정 완료: 총 {len(new_user_map)}명 확인", end_with_newline=True)
logger.info(f"{len(new_user_map)}명의 신규 유저 확인됨")
# Step 2: 모든 create_uid 유저를 cohort에 추가
@ -957,7 +963,7 @@ def get_new_user_cohort_optimized(
}
total_users += 1
progress_logger.info(f"\r코호트 초기화 완료: {total_users} ")
print_progress(f"코호트 초기화 완료: {total_users}")
logger.info(f"cohort에 {total_users}명의 신규 유저 추가 완료")
# Step 3: login_comp 인덱스에서 추가 정보 수집 (auth.id 1순위)
@ -1038,7 +1044,7 @@ def get_new_user_cohort_optimized(
except Exception as e:
logger.error(f"login_comp 정보 수집 중 오류: {e}")
progress_logger.info(f"\rlogin_comp 정보 수집: {len(login_comp_collected)}/{total_users}명 완료 ")
print_progress(f"login_comp 정보 수집: {len(login_comp_collected)}/{total_users}명 완료")
logger.info(f"login_comp에서 {len(login_comp_collected)}명의 정보 수집 완료")
# Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (auth.id 2순위)
@ -1117,7 +1123,7 @@ def get_new_user_cohort_optimized(
except Exception as e:
logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}")
progress_logger.info(f"\rlobby 추가 정보 수집: {len(lobby_collected)}명 완료 ")
print_progress(f"lobby 추가 정보 수집: {len(lobby_collected)}명 완료")
logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 차선 정보 수집 완료")
# 최종 통계
@ -1395,7 +1401,6 @@ def process_fixed_batch(
"""수정된 배치 처리 함수"""
# 진행률 추적용 로거
progress_logger = logging.getLogger('progress')
try:
# 1. 세션 지표 계산 (--full 옵션일 때만)
@ -1416,7 +1421,7 @@ def process_fixed_batch(
body_ndjson = "\n".join(msearch_queries) + "\n"
# 콘솔에는 간단히, 파일에는 상세히
progress_logger.info(f"\r배치 처리 중: {len(batch_uids)}명, {len(msearch_queries)//2}개 쿼리")
print_progress(f"배치 처리 중: {len(batch_uids)}명, {len(msearch_queries)//2}개 쿼리")
logger.info(f"msearch 실행: {len(msearch_queries)//2}개 쿼리")
msearch_responses = exponential_backoff_retry(
@ -1612,7 +1617,7 @@ def process_fixed_batch(
except Exception as e:
logger.warning(f"UID '{uid}' 처리 중 오류: {e}")
progress_logger.info(f"\r배치 완료: {len(batch_results)} ")
print_progress(f"배치 완료: {len(batch_results)}")
logger.info(f"배치 처리 완료: {len(batch_results)}명 성공")
return batch_results
@ -1643,7 +1648,6 @@ def process_cohort_fixed_parallel(
logger.info(f"세션 지표 포함: {'' if include_session_metrics else '아니오'}")
# 진행률 추적용 로거
progress_logger = logging.getLogger('progress')
uid_list = list(cohort.keys())
chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)]
@ -1670,30 +1674,30 @@ def process_cohort_fixed_parallel(
completed_chunks += 1
# 콘솔 진행률 업데이트
progress_logger.info(f"\r병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료, {len(all_results)}명 처리됨")
print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료, {len(all_results)}명 처리됨")
except Exception as e:
logger.warning(f"배치 처리 실패: {e}")
failed_chunks.append(chunk)
completed_chunks += 1
progress_logger.info(f"\r병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료 (오류 1개), {len(all_results)}명 처리됨")
print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료 (오류 1개), {len(all_results)}명 처리됨")
finally:
pbar.update(1)
# 실패한 청크 재처리
if failed_chunks:
logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...")
progress_logger.info(f"\r재처리 중: {len(failed_chunks)}개 배치...")
print_progress(f"재처리 중: {len(failed_chunks)}개 배치...")
for i, chunk in enumerate(failed_chunks):
try:
batch_results = process_fixed_batch(client, chunk, cohort, metrics_config, include_session_metrics)
all_results.extend(batch_results)
progress_logger.info(f"\r재처리 중: {i+1}/{len(failed_chunks)} 완료")
print_progress(f"재처리 중: {i+1}/{len(failed_chunks)} 완료")
except Exception as e:
logger.error(f"재처리 실패: {e}")
# 최종 결과
progress_logger.info(f"\r병렬 처리 완료: 총 {len(all_results)}명 성공 \\n")
print_progress(f"병렬 처리 완료: 총 {len(all_results)}명 성공", end_with_newline=True)
logger.info(f"2단계 완료: {len(all_results)}명 처리 성공")
logger.info("=" * 80)
return all_results