#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 던전 스토커즈 신규 유저 리텐션 분석 스크립트 hack-detector의 고급 최적화 기법을 적용한 대용량 데이터 처리 주요 최적화 기법: 1. Composite Aggregation - 메모리 효율적 코호트 선정 2. Streaming Pattern - 활동 시간 계산 최적화 3. NDJSON + 백오프 재시도 - msearch 배치 처리 안정화 4. Future Pattern - 비동기 병렬 처리 5. Memory Optimization - 스트리밍 CSV 작성 작성자: Claude Code 기획서 기반: DS-new_users-analy.md """ import os import csv import json import time import argparse import threading from datetime import datetime, timedelta, timezone from collections import defaultdict from typing import Dict, List, Optional, Tuple, Generator, Any from concurrent.futures import ThreadPoolExecutor, as_completed, Future import pandas as pd from tqdm import tqdm from opensearchpy import OpenSearch # ============================================================================== # 1. 설정 및 상수 # ============================================================================== # OpenSearch 연결 설정 (기획서에서 업데이트됨) OPENSEARCH_CONFIG = { "host": "ds-opensearch.oneunivrs.com", "port": 9200, "auth": { "username": "admin", "password": "DHp5#r#GYQ9d" }, "use_ssl": True, "verify_certs": False, "timeout": 60, "max_retries": 3, "headers": {"Connection": "close"} } # 한국 표준시 KST = timezone(timedelta(hours=9)) # 성능 최적화 설정 DEFAULT_BATCH_SIZE = 1000 DEFAULT_MAX_WORKERS = 6 DEFAULT_COMPOSITE_SIZE = 1000 # composite aggregation 페이지 크기 DEFAULT_TIMEOUT = 120 SCROLL_TIMEOUT = "5m" SESSION_GAP_MINUTES = 5 # 세션 분리 기준 (5분) MAX_SESSION_HOURS = 3 # 최대 세션 길이 (3시간) # 출력 파일 설정 OUTPUT_DIR = r"E:\DS_Git\DS_data_center\DS Log 분석" # 전역 타이머 제어 stop_timer_event = threading.Event() # ============================================================================== # 2. OpenSearch 연결 및 유틸리티 # ============================================================================== def create_opensearch_client() -> Optional[OpenSearch]: """OpenSearch 클라이언트 생성 (hack-detector 방식)""" print("[INFO] OpenSearch 클러스터에 연결 중...") try: client = OpenSearch( hosts=[{ "host": OPENSEARCH_CONFIG['host'], "port": OPENSEARCH_CONFIG['port'], "scheme": "https" if OPENSEARCH_CONFIG['use_ssl'] else "http" }], http_auth=( OPENSEARCH_CONFIG['auth']['username'], OPENSEARCH_CONFIG['auth']['password'] ), use_ssl=OPENSEARCH_CONFIG['use_ssl'], verify_certs=OPENSEARCH_CONFIG['verify_certs'], ssl_show_warn=False, timeout=OPENSEARCH_CONFIG['timeout'], max_retries=OPENSEARCH_CONFIG['max_retries'], retry_on_timeout=True, headers=OPENSEARCH_CONFIG['headers'] ) if not client.ping(): raise ConnectionError("클러스터에 PING을 보낼 수 없습니다.") print("[SUCCESS] OpenSearch 연결 성공!") return client except Exception as e: print(f"[ERROR] OpenSearch 연결 실패: {e}") return None def exponential_backoff_retry(func, *args, **kwargs) -> Any: """지수 백오프 재시도 패턴 (hack-detector 기법)""" for delay in [1, 2, 4, 8, 16]: try: return func(*args, **kwargs) except Exception as e: if delay == 16: # 마지막 시도 raise e print(f"[WARNING] 재시도 중... {delay}초 대기 (오류: {str(e)[:100]})") time.sleep(delay) # ============================================================================== # 3. 핵심 알고리즘 - Composite Aggregation을 활용한 코호트 선정 # ============================================================================== def get_new_user_cohort_optimized( client: OpenSearch, start_time: str, end_time: str, page_size: int = DEFAULT_COMPOSITE_SIZE ) -> Dict[str, Dict]: """ Composite Aggregation을 활용한 메모리 효율적 신규 유저 코호트 선정 hack-detector의 고급 기법 적용 """ print(f"\n[1단계] 신규 유저 코호트 선정 (Composite Aggregation)") print(f" - 분석 기간: {start_time} ~ {end_time}") cohort = {} after_key = None # Composite aggregation 쿼리 base_query = { "size": 0, "query": { "bool": { "filter": [ {"range": {"@timestamp": {"gte": start_time, "lt": end_time}}} ] } }, "aggs": { "new_users": { "composite": { "size": page_size, "sources": [ {"auth_id": {"terms": {"field": "auth.id.keyword"}}}, {"uid": {"terms": {"field": "uid.keyword"}}} ] }, "aggs": { "first_login": {"min": {"field": "@timestamp"}}, "user_info": { "top_hits": { "size": 1, "sort": [{"@timestamp": {"order": "asc"}}], "_source": ["country", "body.device_mod"] } } } } } } total_users = 0 while True: query = base_query.copy() if after_key: query["aggs"]["new_users"]["composite"]["after"] = after_key try: response = exponential_backoff_retry( client.search, index="ds-logs-live-login_comp", body=query, request_timeout=DEFAULT_TIMEOUT, track_total_hits=False # 성능 최적화 ) buckets = response["aggregations"]["new_users"]["buckets"] if not buckets: break for bucket in buckets: auth_id = bucket["key"]["auth_id"] uid = bucket["key"]["uid"] first_login_utc = bucket["first_login"]["value_as_string"] # 사용자 정보 추출 user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] cohort[uid] = { 'auth_id': auth_id, 'first_login_utc': first_login_utc, 'first_login_dt': datetime.fromisoformat(first_login_utc.replace('Z', '+00:00')), 'country': user_hit.get('country', 'N/A'), 'device': user_hit.get('body', {}).get('device_mod', 'N/A') } total_users += 1 # 다음 페이지 키 확인 after_key = response["aggregations"]["new_users"].get("after_key") if not after_key: break except Exception as e: print(f"[ERROR] 코호트 선정 중 오류: {e}") break print(f" - [SUCCESS] 총 {total_users}명의 신규 유저 코호트 확정") return cohort # ============================================================================== # 4. Active Hours 계산 - 스트리밍 방식 (Generator Pattern) # ============================================================================== def calculate_active_hours_streaming( client: OpenSearch, uids: List[str], cohort_data: Dict[str, Dict] ) -> Dict[str, int]: """ 스트리밍 방식으로 활동 시간 계산 Generator 패턴으로 메모리 사용량 최소화 """ def stream_user_events(uid: str) -> Generator[Dict, None, None]: """개별 유저의 이벤트를 스트리밍으로 처리""" user_info = cohort_data.get(uid) if not user_info: return first_login_dt = user_info['first_login_dt'] d0_end_dt = first_login_dt + timedelta(hours=24) # 해당 유저의 D+0 이벤트만 스캔 query = { "query": { "bool": { "filter": [ {"term": {"uid.keyword": uid}}, {"range": {"@timestamp": { "gte": user_info['first_login_utc'], "lt": d0_end_dt.strftime('%Y-%m-%dT%H:%M:%SZ') }}} ] } } } try: from opensearchpy.helpers import scan for doc in scan( client, query=query, index="ds-logs-live-*", scroll=SCROLL_TIMEOUT, _source=["@timestamp", "type"] ): source = doc['_source'] event_dt = datetime.fromisoformat(source['@timestamp'].replace('Z', '+00:00')) if first_login_dt <= event_dt < d0_end_dt: yield { "time": event_dt, "type": source.get('type', '').lower() } except Exception: # 오류 시 빈 generator 반환 pass results = {} for uid in uids: events = list(stream_user_events(uid)) if len(events) < 2: results[uid] = 0 continue # 세션 기반 활동 시간 계산 events.sort(key=lambda x: x['time']) total_active_seconds = 0 i = 0 while i < len(events) - 1: current_event = events[i] next_event = events[i + 1] # 세션 간격 체크 (5분 이상 차이나면 다른 세션) time_diff = next_event['time'] - current_event['time'] if time_diff <= timedelta(minutes=SESSION_GAP_MINUTES): # 최대 세션 길이 제한 session_duration = min( time_diff.total_seconds(), MAX_SESSION_HOURS * 3600 ) total_active_seconds += session_duration i += 1 results[uid] = int(total_active_seconds) return results # ============================================================================== # 5. NDJSON + 백오프 재시도를 활용한 msearch 배치 처리 # ============================================================================== def build_msearch_queries(uids: List[str], cohort: Dict[str, Dict]) -> List[str]: """ msearch용 NDJSON 쿼리 생성 hack-detector의 NDJSON 직접 생성 기법 적용 """ queries = [] # 분석할 지표 정의 (기획서 기반) metrics_config = { "retention_d1": { "index": "ds-logs-live-login_comp", "time_range": "d1", # 24-48시간 "filters": [] }, "tutorial_entry": { "index": "ds-logs-live-tutorial_entry", "time_range": "d0", "filters": [{"nested": {"path": "body", "query": {"term": {"body.action.keyword": "Start"}}}}] }, "tutorial_completed": { "index": "ds-logs-live-log_tutorial", "time_range": "d0", "filters": [ {"nested": {"path": "body", "query": {"bool": {"must": [ {"term": {"body.action_type.keyword": "Complet"}}, {"term": {"body.stage_type.keyword": "result"}} ]}}}} ] }, "dungeon_entry_count": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "filters": [] }, "dungeon_escape_count": { "index": "ds-logs-live-survival_end", "time_range": "d0", "filters": [{"nested": {"path": "body", "query": {"term": {"body.result": 1}}}}] }, "monster_kill_count": { "index": "ds-logs-live-survival_end", "time_range": "d0", "agg_field": "body.play_stats.monster_kill_cnt" }, "player_kill_count": { "index": "ds-logs-live-player_kill", "time_range": "d0", "target_field": "body.instigator_uid" }, "matching_complete_count": { "index": "ds-logs-live-matching_complete", "time_range": "d0", "filters": [] }, "friend_add_count": { "index": "ds-logs-live-friend", "time_range": "d0", "filters": [{"nested": {"path": "body", "query": {"bool": {"must": [ {"term": {"body.oper_type": 0}}, {"term": {"body.friend_type": 0}} ]}}}}] } } for uid in uids: user_data = cohort[uid] first_login_dt = user_data['first_login_dt'] # 시간 범위 정의 d0_start = user_data['first_login_utc'] d0_end = (first_login_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ') d1_start = d0_end d1_end = (first_login_dt + timedelta(hours=48)).strftime('%Y-%m-%dT%H:%M:%SZ') for metric_name, config in metrics_config.items(): # 시간 범위 선택 if config["time_range"] == "d0": time_filter = {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}} else: # d1 time_filter = {"range": {"@timestamp": {"gte": d1_start, "lt": d1_end}}} # 사용자 식별 필터 user_filter = {"bool": {"should": [ {"term": {"uid.keyword": uid}}, {"term": {"auth.id.keyword": user_data['auth_id']}} ], "minimum_should_match": 1}} # 쿼리 구성 query_filters = [user_filter, time_filter] # 추가 필터 적용 if "filters" in config: query_filters.extend(config["filters"]) # 특별한 필드 처리 (player_kill의 경우) if "target_field" in config and config["target_field"] == "body.instigator_uid": query_filters.append({"nested": {"path": "body", "query": {"term": {"body.instigator_uid.keyword": uid}}}}) query_body = { "size": 0 if "agg_field" not in config else 1000, "query": {"bool": {"filter": query_filters}}, "track_total_hits": False } # Aggregation이 필요한 경우 if "agg_field" in config: query_body["aggs"] = { "total": {"sum": {"field": config["agg_field"]}} } # NDJSON 형태로 추가 queries.append(json.dumps({"index": config["index"]}, ensure_ascii=False)) queries.append(json.dumps(query_body, ensure_ascii=False)) return queries def execute_msearch_with_backoff(client: OpenSearch, queries: List[str]) -> List[Dict]: """ NDJSON + 지수 백오프 재시도로 msearch 실행 hack-detector의 안정화 기법 적용 """ # NDJSON 문자열 생성 body_ndjson = "\n".join(queries) + "\n" # 지수 백오프로 재시도 response = exponential_backoff_retry( client.msearch, body=body_ndjson, request_timeout=60 ) return response.get('responses', []) # ============================================================================== # 6. Future Pattern을 활용한 병렬 처리 최적화 # ============================================================================== def process_user_batch_optimized( client: OpenSearch, batch_uids: List[str], cohort: Dict[str, Dict] ) -> List[Dict]: """ 최적화된 배치 처리 함수 Future Pattern + 스트리밍 + NDJSON 기법 결합 """ # 1. 활동 시간 계산 (스트리밍 방식) active_hours_map = calculate_active_hours_streaming(client, batch_uids, cohort) # 2. msearch 쿼리 생성 및 실행 (NDJSON + 백오프) msearch_queries = build_msearch_queries(batch_uids, cohort) msearch_responses = execute_msearch_with_backoff(client, msearch_queries) # 3. 결과 집계 batch_results = [] metrics_per_user = 9 # 정의된 지표 수 for idx, uid in enumerate(batch_uids): try: user_data = cohort[uid] user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user] # 기본 정보 result = { 'uid': uid, 'auth_id': user_data['auth_id'], 'nickname': 'N/A', # TODO: nickname 조회 로직 추가 'first_login_time': user_data['first_login_utc'], 'retention_status': 'Retained_d0', # 기본값 'country': user_data['country'], 'device': user_data['device'], 'active_seconds': active_hours_map.get(uid, 0) } # msearch 결과 파싱 metrics = [ 'retention_d1', 'tutorial_entry', 'tutorial_completed', 'dungeon_entry_count', 'dungeon_escape_count', 'monster_kill_count', 'player_kill_count', 'matching_complete_count', 'friend_add_count' ] for i, metric in enumerate(metrics): response = user_responses[i] if 'error' in response: result[metric] = 0 continue hits_total = response.get('hits', {}).get('total', {}).get('value', 0) if metric == 'retention_d1': result['retention_status'] = 'Retained_d1' if hits_total > 0 else 'Retained_d0' result[metric] = 1 if hits_total > 0 else 0 elif metric == 'monster_kill_count': agg_value = response.get('aggregations', {}).get('total', {}).get('value', 0) result[metric] = int(agg_value) if agg_value else 0 else: result[metric] = hits_total batch_results.append(result) except Exception as e: print(f" - ⚠️ UID '{uid}' 처리 중 오류: {e}") return batch_results def process_cohort_parallel( client: OpenSearch, cohort: Dict[str, Dict], batch_size: int, max_workers: int ) -> List[Dict]: """ Future Pattern을 활용한 병렬 처리 동적 청크 크기 조정 + 실패 재처리 """ # 동적 배치 크기 조정 (사용자 수 기반) user_count = len(cohort) if user_count < 1000: adjusted_batch_size = min(batch_size, 100) else: adjusted_batch_size = batch_size print(f"\n[2단계] 병렬 배치 처리 시작 (배치크기: {adjusted_batch_size}, 워커: {max_workers})") # UID 리스트를 청크로 분할 uid_list = list(cohort.keys()) chunks = [uid_list[i:i + adjusted_batch_size] for i in range(0, len(uid_list), adjusted_batch_size)] all_results = [] failed_chunks = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: # Future 객체 생성 future_to_chunk = { executor.submit(process_user_batch_optimized, client, chunk, cohort): chunk for chunk in chunks } # 진행률 표시 with tqdm(total=len(chunks), desc=" - 배치 처리 진행률") as pbar: for future in as_completed(future_to_chunk): chunk = future_to_chunk[future] try: batch_results = future.result(timeout=300) # 5분 타임아웃 if batch_results: all_results.extend(batch_results) else: failed_chunks.append(chunk) except Exception as e: print(f" - ⚠️ 배치 처리 실패: {e}") failed_chunks.append(chunk) finally: pbar.update(1) # 실패한 청크 재처리 (단일 스레드) if failed_chunks: print(f"\n - 실패한 {len(failed_chunks)}개 배치 재처리 중...") for chunk in failed_chunks: try: batch_results = process_user_batch_optimized(client, chunk, cohort) all_results.extend(batch_results) except Exception as e: print(f" - ❌ 재처리 실패: {e}") return all_results # ============================================================================== # 7. 스트리밍 CSV 작성 (메모리 최적화) # ============================================================================== def write_results_streaming(results: List[Dict], output_path: str) -> None: """ 스트리밍 방식으로 CSV 작성 메모리에 모든 데이터를 올리지 않고 직접 파일에 쓰기 """ if not results: print(" - [ERROR] 저장할 결과 데이터가 없습니다.") return # CSV 헤더 정의 (기획서 기반) headers = [ 'uid', 'auth_id', 'nickname', 'first_login_time', 'retention_status', 'country', 'device', 'active_seconds', 'retention_d1', 'tutorial_entry', 'tutorial_completed', 'dungeon_entry_count', 'dungeon_escape_count', 'monster_kill_count', 'player_kill_count', 'matching_complete_count', 'friend_add_count' ] try: with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=headers, extrasaction='ignore') writer.writeheader() # 스트리밍으로 한 줄씩 작성 for result in results: writer.writerow(result) print(f" - [SUCCESS] 결과 파일 저장 완료: {output_path}") print(f" - [INFO] 총 {len(results)}명의 데이터가 저장되었습니다.") except Exception as e: print(f" - [ERROR] CSV 파일 저장 실패: {e}") # ============================================================================== # 8. 실시간 타이머 (유틸리티) # ============================================================================== def live_timer(start_time: float, pbar: tqdm) -> None: """실시간 경과 시간 표시""" while not stop_timer_event.is_set(): elapsed = str(timedelta(seconds=int(time.time() - start_time))) pbar.set_postfix_str(f"경과 시간: {elapsed}") time.sleep(1) # ============================================================================== # 9. 메인 함수 및 명령줄 인터페이스 # ============================================================================== def parse_arguments() -> argparse.Namespace: """명령줄 인자 파싱""" parser = argparse.ArgumentParser( description="던전 스토커즈 신규 유저 리텐션 분석 스크립트", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 예시: python ds_new_user_analy.py --start-time "2025-08-22T12:00:00+09:00" --end-time "2025-08-25T12:00:00+09:00" python ds_new_user_analy.py --start-time "2025-08-22T12:00:00+09:00" --end-time "2025-08-22T13:00:00+09:00" --sample-size 100 """ ) parser.add_argument( '--start-time', required=True, help='분석 시작 시간 (KST, ISO 형식): "2025-08-22T12:00:00+09:00"' ) parser.add_argument( '--end-time', required=True, help='분석 종료 시간 (KST, ISO 형식): "2025-08-25T12:00:00+09:00"' ) parser.add_argument( '--output-dir', default=OUTPUT_DIR, help=f'결과 파일 저장 경로 (기본값: {OUTPUT_DIR})' ) parser.add_argument( '--batch-size', type=int, default=DEFAULT_BATCH_SIZE, help=f'배치 처리 크기 (기본값: {DEFAULT_BATCH_SIZE})' ) parser.add_argument( '--max-workers', type=int, default=DEFAULT_MAX_WORKERS, help=f'병렬 처리 스레드 수 (기본값: {DEFAULT_MAX_WORKERS})' ) parser.add_argument( '--sample-size', type=int, help='샘플 분석 크기 (None이면 전체 분석)' ) return parser.parse_args() def main(): """메인 실행 함수""" # 시작 시간 기록 overall_start_time = time.time() print("=" * 80) print("던전 스토커즈 신규 유저 리텐션 분석 v2.0 (Claude Code)") print("hack-detector 고급 최적화 기법 적용") print("=" * 80) # 명령줄 인자 파싱 args = parse_arguments() # 시간 변환 (KST -> UTC) try: start_kst = datetime.fromisoformat(args.start_time) end_kst = datetime.fromisoformat(args.end_time) start_utc = start_kst.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') end_utc = end_kst.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') print(f"[INFO] 분석 기간: {args.start_time} ~ {args.end_time}") print(f"[INFO] UTC 변환: {start_utc} ~ {end_utc}") except Exception as e: print(f"[ERROR] 시간 형식 오류: {e}") return # OpenSearch 클라이언트 생성 client = create_opensearch_client() if not client: return try: # 1단계: 신규 유저 코호트 선정 (Composite Aggregation) cohort = get_new_user_cohort_optimized(client, start_utc, end_utc) if not cohort: print("\n[ERROR] 분석할 신규 유저가 없습니다.") return # 샘플링 모드 if args.sample_size and args.sample_size < len(cohort): uid_list = list(cohort.keys()) sampled_uids = uid_list[:args.sample_size] cohort = {uid: cohort[uid] for uid in sampled_uids} print(f"[WARNING] 샘플링 모드: {args.sample_size}명만 분석합니다.") # 2단계: 병렬 배치 처리 (Future Pattern) results = process_cohort_parallel( client, cohort, args.batch_size, args.max_workers ) # 3단계: 결과 저장 (스트리밍 CSV) print(f"\n[3단계] 결과 저장") timestamp = datetime.now(KST).strftime('%Y%m%d_%H%M%S') filename = f"ds_new_users_analy_{timestamp}.csv" output_path = os.path.join(args.output_dir, filename) write_results_streaming(results, output_path) # 통계 요약 if results: retained_d1 = sum(1 for r in results if r.get('retention_status') == 'Retained_d1') retention_rate = (retained_d1 / len(results)) * 100 print(f"\n[SUMMARY] 분석 요약:") print(f" - 총 신규 유저: {len(results)}명") print(f" - D+1 리텐션: {retained_d1}명 ({retention_rate:.1f}%)") print(f" - 평균 활동 시간: {sum(r.get('active_seconds', 0) for r in results) / len(results) / 60:.1f}분") except KeyboardInterrupt: print(f"\n[WARNING] 사용자에 의해 중단되었습니다.") except Exception as e: print(f"\n[ERROR] 예상치 못한 오류: {e}") import traceback traceback.print_exc() finally: # 타이머 정지 stop_timer_event.set() # 총 소요 시간 end_time = time.time() total_time = str(timedelta(seconds=int(end_time - overall_start_time))) print(f"\n[INFO] 총 소요 시간: {total_time}") print("\n[SUCCESS] 분석 완료!") if __name__ == "__main__": main()