#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Dungeon Stalkers new-user retention analysis script (fixed version).

OpenSearch mapping fix: nested -> object query handling.
Fixes applied:
- Treat ``body`` fields as plain objects, not ``nested``
- Remove the duplicated ``retention_d1`` field
- Improve field-path accuracy
- Use keyword sub-fields (append ``.keyword``)

Author: Claude Code
Based on spec: DS-new_users-analy.md
Reference: OpenSearch mapping ds_opensearch_mappings.json
"""

import os
import sys
import csv
import json
import time
import yaml
import logging
import argparse
import threading
from datetime import datetime, timedelta, timezone
from collections import defaultdict, Counter
from typing import Dict, List, Optional, Tuple, Generator, Any, Set
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from pathlib import Path

import pandas as pd
from opensearchpy import OpenSearch
from opensearchpy.helpers import scan

# ==============================================================================
# 1. Configuration and constants
# ==============================================================================

# OpenSearch connection settings.
# NOTE(review): credentials are hardcoded in source and TLS verification is
# disabled (verify_certs=False) — move the secret to env/config management and
# enable certificate checks before wider distribution.
OPENSEARCH_CONFIG = {
    "host": "ds-opensearch.oneunivrs.com",
    "port": 9200,
    "auth": {
        "username": "admin",
        "password": "DHp5#r#GYQ9d"
    },
    "use_ssl": True,
    "verify_certs": False,
    "timeout": 120,
    "max_retries": 3,
    "headers": {"Connection": "close"}
}

# Korea Standard Time (UTC+9)
KST = timezone(timedelta(hours=9))

# Performance tuning (sized against the OpenSearch cluster spec)
DEFAULT_BATCH_SIZE = 1000
DEFAULT_MAX_WORKERS = 16
DEFAULT_COMPOSITE_SIZE = 2000
DEFAULT_TIMEOUT = 180
SCROLL_TIMEOUT = "5m"
SESSION_GAP_MINUTES = 5
MAX_SESSION_HOURS = 3

# Output directory (created next to this script)
BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR / "analysis_results"
OUTPUT_DIR.mkdir(exist_ok=True)

# Module-level globals
stop_timer_event = threading.Event()
console_lock = threading.Lock()  # serializes console output across threads
logger = None  # initialized by setup_logging()

# ==============================================================================
# 2. Logging setup
# ==============================================================================

def setup_logging(log_file_path: str) -> logging.Logger:
    """Configure KST-timestamped logging (concise console mode).

    The file handler records everything in detail; the console handler shows
    WARNING+ records and only a whitelist of "important" INFO messages.
    """
    class KSTFormatter(logging.Formatter):
        # Render record timestamps in Korea Standard Time (UTC+9).
        def formatTime(self, record, datefmt=None):
            kst = timezone(timedelta(hours=9))
            ct = datetime.fromtimestamp(record.created, kst)
            return ct.strftime('%Y-%m-%dT%H:%M:%S+09:00')

    class ConsoleFormatter(logging.Formatter):
        """Console-optimized formatter - concise console output."""
        def format(self, record):
            # WARNING and above are always shown, with a level prefix.
            if record.levelno >= logging.WARNING:
                return f"[{record.levelname}] {record.getMessage()}"
            # Plain INFO records pass through only when they contain one of
            # the "important" keywords below.
            msg = record.getMessage()
            important_keywords = [
                "던전 스토커즈 신규 유저 분석",
                "분석 기간",
                "세션 지표 포함",
                "총 신규 유저",
                "분석 완료",
                "분석 요약",
                "리텐션 퍼널",
                "D0→D1", "D1→D2", "D2→D3", "D3→D4", "D4→D5", "D5→D6", "D6→D7",
                "총 소요 시간",
                "Retained_d",
                "평균 활동 시간",
                "="*20  # section divider
            ]
            if any(keyword in msg for keyword in important_keywords):
                return msg
            # Unimportant records format to "" and are filtered out below.
            return ""

    # Base logger
    logger = logging.getLogger('NewUserAnalyzer')
    logger.setLevel(logging.INFO)
    # Drop handlers from any previous call so setup stays idempotent.
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
    # File handler (detailed log)
    file_handler = logging.FileHandler(log_file_path, encoding='utf-8')
    file_formatter = KSTFormatter('[%(levelname)s] %(asctime)s - %(message)s')
    file_handler.setFormatter(file_formatter)
    logger.addHandler(file_handler)
    # Console handler (concise log)
    console_handler = logging.StreamHandler()
    console_formatter = ConsoleFormatter()
    console_handler.setFormatter(console_formatter)
    # Suppress records whose formatted console output is empty.
    console_handler.addFilter(lambda record: console_formatter.format(record) != "")
    logger.addHandler(console_handler)
    # Quiet noisy third-party loggers.
    logging.getLogger('opensearchpy').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)
    return logger

def print_progress(message: str, end_with_newline: bool = False):
    """Print progress in place (carriage-return overwrite) for sub-steps."""
    with console_lock:
        try:
            if end_with_newline:
                print(f"\r{message}")
sys.stdout.flush() else: print(f"\r{message}", end="", flush=True) except (OSError, IOError) as e: # 파이프 오류 시 로깅으로 대체 if logger: logger.info(f"PROGRESS: {message}") pass def print_stage(message: str): """주요 단계 시작을 새 줄로 표시""" with console_lock: try: print(f"\n{message}") sys.stdout.flush() except (OSError, IOError) as e: # 파이프 오류 시 로깅으로 대체 if logger: logger.info(f"STAGE: {message}") pass def print_complete(message: str): """단계 완료를 새 줄로 표시""" with console_lock: try: print(f"\r✓ {message}") sys.stdout.flush() except (OSError, IOError) as e: # 파이프 오류 시 로깅으로 대체 if logger: logger.info(f"COMPLETE: {message}") pass class ProcessingTimer: """긴 작업 중 타임아웃 메시지 표시용 타이머""" def __init__(self, timeout_seconds: int = 10): self.timeout_seconds = timeout_seconds self.timer = None self.is_active = False self.last_message = "" def start(self, message: str): """타이머 시작""" self.stop() # 기존 타이머 정리 self.last_message = message self.is_active = True self._schedule_next() def stop(self): """타이머 중지""" self.is_active = False if self.timer: self.timer.cancel() self.timer = None def _schedule_next(self): """다음 타임아웃 메시지 스케줄링""" if self.is_active: def show_timeout(): if self.is_active: print(f"\r{self.last_message} (처리 중...)", end="", flush=True) self._schedule_next() self.timer = threading.Timer(self.timeout_seconds, show_timeout) self.timer.start() # 전역 타이머 인스턴스 processing_timer = ProcessingTimer() # ============================================================================== # 3. 
OpenSearch 연결 및 유틸리티 # ============================================================================== def create_opensearch_client() -> Optional[OpenSearch]: """OpenSearch 클라이언트 생성""" logger.info("OpenSearch 클러스터 연결 시도 중...") try: client = OpenSearch( hosts=[{ "host": OPENSEARCH_CONFIG['host'], "port": OPENSEARCH_CONFIG['port'], "scheme": "https" if OPENSEARCH_CONFIG['use_ssl'] else "http" }], http_auth=( OPENSEARCH_CONFIG['auth']['username'], OPENSEARCH_CONFIG['auth']['password'] ), use_ssl=OPENSEARCH_CONFIG['use_ssl'], verify_certs=OPENSEARCH_CONFIG['verify_certs'], ssl_show_warn=False, timeout=OPENSEARCH_CONFIG['timeout'], max_retries=OPENSEARCH_CONFIG['max_retries'], retry_on_timeout=True, headers=OPENSEARCH_CONFIG['headers'] ) if not client.ping(): raise ConnectionError("클러스터 PING 실패") logger.info("OpenSearch 연결 성공!") return client except Exception as e: logger.error(f"OpenSearch 연결 실패: {e}") return None def exponential_backoff_retry(func, *args, **kwargs) -> Any: """지수 백오프 재시도 패턴""" for delay in [1, 2, 4, 8, 16]: try: return func(*args, **kwargs) except Exception as e: if delay == 16: raise e logger.warning(f"재시도 중... {delay}초 대기 (오류: {str(e)[:100]})") time.sleep(delay) def format_kst_time(timestamp_str: str) -> str: """UTC 타임스탬프를 KST로 변환""" try: dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) kst_dt = dt.astimezone(KST) return kst_dt.strftime('%Y-%m-%dT%H:%M:%S+09:00') except: return timestamp_str # ============================================================================== # 4. 
수정된 분석 지표 정의 (OpenSearch 매핑 기반) # ============================================================================== def get_fixed_metrics_config() -> Dict[str, Dict]: """OpenSearch 매핑에 기반한 수정된 분석 지표""" return { # ==================== 3.2 플레이 시간 및 세션 ==================== "session_count": { "index": "ds-logs-live-login_comp", "time_range": "d0", "agg_type": "count" }, "last_logout_time": { "index": "ds-logs-live-logout", "time_range": "d0", "agg_type": "max_timestamp" }, # ==================== 3.3 던전 플레이 성과 (모드별) ==================== # 각 모드별 entry count "COOP_entry_count": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}] }, "Solo_entry_count": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}] }, "Survival_entry_count": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}] }, "Survival_BOT_entry_count": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}] }, "Survival_Unprotected_entry_count": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}] }, "dungeon_first_mode": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "first_value", "field": "body.dungeon_mode" }, "dungeon_first_stalker": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "first_value", "field": "body.stalker_name" }, "dungeon_first_result": { "index": "ds-logs-live-survival_end", "time_range": "d0", "agg_type": "first_value", "field": "body.result" }, # 각 모드별 첫 플레이 결과 "COOP_first_result": { "index": "ds-logs-live-survival_end", "time_range": "d0", "agg_type": "first_value", "field": 
            "body.result",
            "filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
        },
        "Solo_first_result": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "first_value",
            "field": "body.result",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
        },
        "Survival_first_result": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "first_value",
            "field": "body.result",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
        },
        "Survival_BOT_first_result": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "first_value",
            "field": "body.result",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
        },
        "Survival_Unprotected_first_result": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "first_value",
            "field": "body.result",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
        },
        # Escape count per mode (body.result == 1 means escaped)
        "COOP_escape_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.result": 1}},
                        {"term": {"body.dungeon_mode.keyword": "COOP"}}]
        },
        "Solo_escape_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.result": 1}},
                        {"term": {"body.dungeon_mode.keyword": "Solo"}}]
        },
        "Survival_escape_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.result": 1}},
                        {"term": {"body.dungeon_mode.keyword": "Survival"}}]
        },
        "Survival_BOT_escape_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.result": 1}},
                        {"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
        },
        "Survival_Unprotected_escape_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.result": 1}},
                        {"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
        },
        # Average survival time per mode
        "COOP_avg_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "avg",
            "field": "body.play_stats.playtime",
            "filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
        },
        "Solo_avg_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "avg",
            "field": "body.play_stats.playtime",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
        },
        "Survival_avg_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "avg",
            "field": "body.play_stats.playtime",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
        },
        "Survival_BOT_avg_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "avg",
            "field": "body.play_stats.playtime",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
        },
        "Survival_Unprotected_avg_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "avg",
            "field": "body.play_stats.playtime",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
        },
        # Max survival time per mode
        "COOP_max_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.play_stats.playtime",
            "filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
        },
        "Solo_max_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.play_stats.playtime",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
        },
        "Survival_max_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.play_stats.playtime",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
        },
        "Survival_BOT_max_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.play_stats.playtime",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
        },
        "Survival_Unprotected_max_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.play_stats.playtime",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
        },
        # Raid play count per mode
        "COOP_raid_play_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.raid_play",
            "filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
        },
        "Solo_raid_play_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.raid_play",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
        },
        "Survival_raid_play_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.raid_play",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
        },
        "Survival_BOT_raid_play_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.raid_play",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
        },
        "Survival_Unprotected_raid_play_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.raid_play",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
        },
        # ==================== 3.4 Combat performance (per mode) ====================
        # Monster kill count per mode
        "COOP_monster_kill_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.monster_kill_cnt",
            "filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
        },
        "Solo_monster_kill_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.monster_kill_cnt",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
        },
        "Survival_monster_kill_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.monster_kill_cnt",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
        },
        "Survival_BOT_monster_kill_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.monster_kill_cnt",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
        },
        "Survival_Unprotected_monster_kill_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.monster_kill_cnt",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
        },
        # Player kill count per mode
        "COOP_player_kill_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.player_kill_cnt",
            "filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
        },
        "Solo_player_kill_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.player_kill_cnt",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
        },
        "Survival_player_kill_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.player_kill_cnt",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
        },
        "Survival_BOT_player_kill_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.player_kill_cnt",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
        },
        "Survival_Unprotected_player_kill_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.player_kill_cnt",
            "filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
        },
        # ==================== 3.4.2 Death cause analysis ====================
        "death_PK": {
            "index": "ds-logs-live-player_kill",
            "time_range": "d0",
            "agg_type": "count",
            "target_field": "body.target_uid.keyword",
            "use_body_target": True
        },
        "death_GiveUp": {
            "index": "ds-logs-live-dead",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.inter_type": 0}}]
        },
        "death_Mob": {
            "index": "ds-logs-live-dead",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.inter_type": 1}}]
        },
        "death_Trap": {
            "index": "ds-logs-live-dead",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.inter_type": 10}}]
        },
        "death_Red": {
            "index": "ds-logs-live-dead",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.inter_type": 11}}]
        },
        "death_Others": {
            "index": "ds-logs-live-dead",
            "time_range": "d0",
            "agg_type": "count",
            # Everything not already categorized above.
            "filters": [{"bool": {"must_not": [{"terms": {"body.inter_type": [0, 1, 10, 11]}}]}}]
        },
        # ==================== 3.5 Progression and growth ====================
        "level_max": {
            "index": "ds-logs-live-level_up",
            "time_range": "d0",
            "agg_type": "max_level_with_stalker",
            "level_field": "body.level",
            "stalker_field": "body.stalker"
        },
        "level_max_stalker": {
            "index": "ds-logs-live-level_up",
            "time_range": "d0",
            "agg_type": "max_level_stalker_name",
            "level_field": "body.level",
            "stalker_field": "body.stalker"
        },
        "tutorial_entry": {
            "index": "ds-logs-live-tutorial_entry",
            "time_range": "d0",
            "agg_type": "count"
        },
        "tutorial_completed": {
            "index": "ds-logs-live-log_tutorial",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [
                {"term": {"body.action_type.keyword": "Complete"}},
                {"term": {"body.stage_type.keyword": "tutorial_escape_portal"}}
            ]
        },
        "guide_quest_stage": {
            "index": "ds-logs-live-guide_quest_stage",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.guide_step"
        },
        # ==================== 3.6 Items and economy ====================
        "highest_item_grade": {
            "index": "ds-logs-live-item_get",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.item_grade",
            "filters": [{"term": {"body.base_type": 2}}]
        },
        "blueprint_use_count": {
            "index": "ds-logs-live-craft_from_blueprint",
            "time_range": "d0",
            "agg_type": "count"
        },
        "shop_buy_count": {
            "index": "ds-logs-live-shop_buy",
            "time_range": "d0",
            "agg_type": "count"
        },
        "shop_sell_count": {
            "index": "ds-logs-live-shop_sell",
            "time_range": "d0",
            "agg_type": "count"
        },
        # Gold flows: restricted to currency id "i108000".
        "gold_spent": {
            "index": "ds-logs-live-shop_buy",
            "time_range": "d0",
            "agg_type": "conditional_sum",
            "field": "body.amt",
            "condition_field": "body.cost_id.keyword",
            "condition_value": "i108000"
        },
        "gold_earned": {
            "index": "ds-logs-live-shop_sell",
            "time_range": "d0",
            "agg_type": "conditional_sum",
            "field": "body.amt",
            "condition_field": "body.cost_id.keyword",
            "condition_value": "i108000"
        },
        "storage_in_count": {
            "index": "ds-logs-live-storage_use",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.oper_type": 1}}]
        },
        "storage_out_count": {
            "index": "ds-logs-live-storage_use",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.oper_type": -1}}]
        },
        "enchant_count": {
            "index": "ds-logs-live-enchant",
            "time_range": "d0",
            "agg_type": "count"
        },
        "enchant_gold_spent": {
            "index": "ds-logs-live-enchant",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.amt"
        },
        # ==================== 3.7 Equipment management ====================
        "ingame_equip_count": {
            "index": "ds-logs-live-item_ingame_equip",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.base_type": 2}}]
        },
        # ==================== 3.8 Object interaction ====================
        "object_interaction_count": {
            "index": "ds-logs-live-obj_inter",
            "time_range": "d0",
            "agg_type": "count"
        },
        # ==================== 3.9 Matching system ====================
        "matching_start_count": {
            "index": "ds-logs-live-matching_start",
            "time_range": "d0",
            "agg_type": "count"
        },
        "matching_complete_count": {
            "index": "ds-logs-live-matching_complete",
            "time_range": "d0",
            "agg_type": "count"
        },
        "matching_failed_count": {
            "index": "ds-logs-live-matching_failed",
            "time_range": "d0",
            "agg_type": "count"
        },
        "avg_matching_time": {
            "index": "ds-logs-live-matching_complete",
            "time_range": "d0",
            "agg_type": "avg",
            "field": "body.matchingtime"
        },
        # ==================== 3.10 Social activity ====================
        "friend_add_count": {
            "index": "ds-logs-live-friend",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [
                {"term": {"body.oper_type": 0}},
                {"term": {"body.friend_type": 0}}
            ]
        },
        "friend_delete_count": {
            "index": "ds-logs-live-friend",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.oper_type": 1}}]
        },
        "party_invite_sent": {
            "index": "ds-logs-live-player_invite",
            "time_range": "d0",
            "agg_type": "count"
        },
        "party_invite_received": {
            "index": "ds-logs-live-player_invite",
            "time_range": "d0",
            "agg_type": "count",
            "target_field": "body.target_uid"
        },
        "mail_read_count": {
            "index": "ds-logs-live-mail_read",
            "time_range": "d0",
            "agg_type": "count"
        },
        # ==================== 3.11 Exchange and economic activity ====================
        "exchange_register_count": {
            "index": "ds-logs-live-exchange_reg",
            "time_range": "d0",
            "agg_type": "count"
        },
        "exchange_use_count": {
            "index": "ds-logs-live-exchange_use",
            "time_range": "d0",
            "agg_type": "count"
        },
        "coupon_used": {
            "index": "ds-logs-live-coupon",
            "time_range": "d0",
            "agg_type": "exists"
        },
        # ==================== 3.12 Miscellaneous activity ====================
        "button_click_count": {
            "index": "ds-logs-live-button_click",
            "time_range": "d0",
            "agg_type": "count"
        },
        "hideout_upgrade_count": {
            "index": "ds-logs-live-log_hideout_upgrade",
            "time_range": "d0",
            "agg_type": "count"
        },
        "hideout_max_level": {
            "index": "ds-logs-live-log_hideout_upgrade",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.hideout_level"
        },
        "season_pass_buy": {
            "index": "ds-logs-live-season_pass",
            "time_range": "d0",
            "agg_type": "exists",
            "filters": [{"term": {"body.cause": 1}}]
        },
        "season_pass_max_step": {
            "index": "ds-logs-live-season_pass",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.season_pass_step"
        },
        # ==================== Retention checks (heartbeat presence per day) ====================
        "retention_check_d1": {
            "index": "ds-logs-live-heartbeat",
            "time_range": "d1",
            "agg_type": "exists"
        },
        "retention_check_d2": {
            "index": "ds-logs-live-heartbeat",
            "time_range": "d2",
            "agg_type": "exists"
        },
        "retention_check_d3": {
            "index": "ds-logs-live-heartbeat",
            "time_range": "d3",
            "agg_type": "exists"
        },
        "retention_check_d4": {
            "index": "ds-logs-live-heartbeat",
            "time_range": "d4",
            "agg_type": "exists"
        },
        "retention_check_d5": {
            "index": "ds-logs-live-heartbeat",
            "time_range": "d5",
            "agg_type": "exists"
        },
        "retention_check_d6": {
            "index": "ds-logs-live-heartbeat",
            "time_range": "d6",
            "agg_type": "exists"
        },
        "retention_check_d7_plus": {
            "index": "ds-logs-live-heartbeat",
            "time_range": "d7_plus",
            "agg_type": "exists"
        }
    }

# ==============================================================================
# 5. Composite-aggregation new-user cohort selection (unchanged)
# ==============================================================================

def get_new_user_cohort_optimized(
    client: OpenSearch,
    start_time: str,
    end_time: str,
    page_size: int = DEFAULT_COMPOSITE_SIZE
) -> Dict[str, Dict]:
    """Select the new-user cohort (optimized; auth.id collection priority).

    Uses the ds-logs-live-create_uid index so new users are identified by
    their actual account-creation time.

    auth.id collection priority: 1) login_comp, 2) log_return_to_lobby
    """
    logger.info("=" * 80)
    logger.info("1단계: 신규 유저 코호트 선정 (create_uid 기준, 최적화 버전)")
    logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}")
    logger.info(f"페이지 크기: {page_size}")
    print_stage("🔍 신규 유저 코호트 선정 시작...")
    cohort = {}
    after_key = None
    # Step 1: pull users created within the analysis window from create_uid,
    # paginating with a composite aggregation over (uid, auth_id).
    create_uid_query = {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {"range": {"@timestamp": {"gte": start_time, "lt": end_time}}}
                ]
            }
        },
        "aggs": {
            "new_users": {
                "composite": {
                    "size": page_size,
                    "sources": [
                        {"uid": {"terms": {"field": "uid.keyword"}}},
                        {"auth_id": {"terms": {"field": "auth.id.keyword"}}}
                    ]
                },
                "aggs": {
                    "first_create": {"min": {"field": "@timestamp"}}
                }
            }
        }
    }
    # Collect newly created users: uid -> {'create_time': ...}
    new_user_map = {}
    page_count = 0
    total_pages_estimate = 0
    while True:
        page_count += 1
        # NOTE(review): shallow copy — the nested "after" assignment below also
        # mutates create_uid_query; harmless here because it is overwritten on
        # every page, but a deepcopy would be cleaner.
        query = create_uid_query.copy()
        if after_key:
            query["aggs"]["new_users"]["composite"]["after"] = after_key
        try:
            # Progress display (in-place refresh)
            if page_count == 1:
                print_progress("코호트 선정: 페이지 1 처리 중...")
            else:
                print_progress(f"코호트 선정: 페이지 {page_count} 처리 중... (수집: {len(new_user_map)}명)")
            logger.info(f"create_uid 페이지 {page_count} 처리 중...")
            response = exponential_backoff_retry(
                client.search,
                index="ds-logs-live-create_uid",
                body=query,
                request_timeout=DEFAULT_TIMEOUT,
                track_total_hits=False
            )
            buckets = response["aggregations"]["new_users"]["buckets"]
            if not buckets:
                break
            for bucket in buckets:
                uid = bucket["key"]["uid"]
                first_create_utc = bucket["first_create"]["value_as_string"]
                # Keep only the earliest create time (dedupes client_event copies).
                if uid not in new_user_map or first_create_utc < new_user_map[uid]["create_time"]:
                    new_user_map[uid] = {
                        "create_time": first_create_utc
                    }
            logger.info(f"create_uid 페이지 {page_count}: {len(buckets)}개 처리됨")
            # Keep console output concise.
            print_progress(f"코호트 선정: 페이지 {page_count} 완료 (수집: {len(new_user_map)}명)")
            after_key = response["aggregations"]["new_users"].get("after_key")
            if not after_key:
                break
        except Exception as e:
            logger.error(f"create_uid 처리 중 오류 (페이지 {page_count}): {e}")
            break
    # Final progress and completion message.
    print_complete(f"신규 유저 코호트 선정 완료: 총 {len(new_user_map)}명 확인")
    logger.info(f"총 {len(new_user_map)}명의 신규 유저 확인됨")
    # Step 2: seed the cohort with every create_uid user.
    if not new_user_map:
        logger.warning("신규 유저가 없습니다.")
        return cohort
    uid_list = list(new_user_map.keys())
    chunk_size = 100
    total_users = 0
    # Initialize each cohort entry (auth_id and profile fields start as N/A).
    for uid in uid_list:
        cohort[uid] = {
            'auth_id': 'N/A',
            'create_time_utc': new_user_map[uid]["create_time"],
            'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]),
            'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')),
            'language': 'N/A',
            'country': 'N/A',
            'device': 'N/A',
            'nickname': 'N/A'
        }
        total_users += 1
    print_complete(f"코호트 초기화 완료: {total_users}명")
    logger.info(f"cohort에 {total_users}명의 신규 유저 추가 완료")
    # Step 3: enrich from the login_comp index (auth.id priority 1).
    print_stage("📊 login_comp 인덱스에서 추가 정보 수집 중...")
    logger.info("login_comp 인덱스에서 추가 정보 수집 중 (auth.id 1순위)...")
    login_comp_collected = set()
    processing_timer.start("login_comp 정보 수집")
    for i in range(0, len(uid_list), chunk_size):
        chunk_uids = uid_list[i:i+chunk_size]
        login_query = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {"terms": {"uid.keyword": chunk_uids}}
                    ]
                }
            },
            "aggs": {
                "users": {
                    "terms": {
                        "field": "uid.keyword",
                        "size": chunk_size
                    },
                    "aggs": {
                        # Earliest record: device model at first login.
                        "user_info": {
                            "top_hits": {
                                "size": 1,
                                "sort": [{"@timestamp": {"order": "asc"}}],
                                "_source": ["body.device_mod", "body.nickname", "auth.id", "country"]
                            }
                        },
                        # Latest record: most recent nickname/language.
                        "latest_info": {
                            "top_hits": {
                                "size": 1,
                                "sort": [{"@timestamp": {"order": "desc"}}],
                                "_source": ["body.nickname", "body.language", "auth.id", "country"]
                            }
                        }
                    }
                }
            }
        }
        try:
            response = exponential_backoff_retry(
                client.search,
                index="ds-logs-live-login_comp",
                body=login_query,
                request_timeout=DEFAULT_TIMEOUT,
                track_total_hits=False
            )
            for bucket in response["aggregations"]["users"]["buckets"]:
                uid = bucket["key"]
                user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {}
                latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {}
                # Update cohort info (auth.id collected with priority 1).
                if uid in cohort:
                    cohort[uid]['language'] = latest_info_hit.get('body', {}).get('language', 'N/A')
                    cohort[uid]['device'] = user_hit.get('body', {}).get('device_mod', 'N/A')
                    cohort[uid]['nickname'] = latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A')
                    # country (from login_comp)
                    country = latest_info_hit.get('country') or user_hit.get('country')
                    if country:
                        cohort[uid]['country'] = country
                    # auth.id (priority 1)
                    auth_id = latest_info_hit.get('auth', {}).get('id') or user_hit.get('auth', {}).get('id')
                    if auth_id:
                        cohort[uid]['auth_id'] = auth_id
                        # NOTE(review): a uid counts as "collected" only when an
                        # auth.id was found — users seen in login_comp without
                        # auth.id are re-queried in the lobby fallback below.
                        login_comp_collected.add(uid)
        except Exception as e:
            logger.error(f"login_comp 정보 수집 중 오류: {e}")
    processing_timer.stop()
    print_complete(f"login_comp 정보 수집 완료: {len(login_comp_collected)}/{total_users}명")
    logger.info(f"login_comp에서 {len(login_comp_collected)}명의 정보 수집 완료")
    # Step 4: fallback enrichment from log_return_to_lobby (auth.id priority 2)
    # for users not covered by login_comp.
    missing_uids = [uid for uid in uid_list if uid not in login_comp_collected]
    if missing_uids:
        print_stage(f"🏠 lobby 인덱스에서 차선 정보 수집 중 ({len(missing_uids)}명)...")
        logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중 (auth.id 2순위)...")
        lobby_collected = set()
        processing_timer.start("lobby 정보 수집")
        for i in range(0, len(missing_uids), chunk_size):
            chunk_uids = missing_uids[i:i+chunk_size]
            lobby_query = {
                "size": 0,
                "query": {
                    "bool": {
                        "filter": [
                            {"terms": {"uid.keyword": chunk_uids}}
                        ]
                    }
                },
                "aggs": {
                    "users": {
                        "terms": {
                            "field": "uid.keyword",
                            "size": chunk_size
                        },
                        "aggs": {
                            "info": {
                                "top_hits": {
                                    "size": 1,
                                    "sort": [{"@timestamp": {"order": "desc"}}],
                                    "_source": ["body.nickname", "auth.id", "country"]
                                }
                            }
                        }
                    }
                }
            }
            try:
                response = exponential_backoff_retry(
                    client.search,
                    index="ds-logs-live-log_return_to_lobby",
                    body=lobby_query,
                    request_timeout=DEFAULT_TIMEOUT,
                    track_total_hits=False
                )
                for bucket in response["aggregations"]["users"]["buckets"]:
                    uid = bucket["key"]
                    info_hit = bucket["info"]["hits"]["hits"][0]["_source"] if bucket["info"]["hits"]["hits"] else {}
                    # Fallback updates: only fill fields still missing.
                    if uid in cohort:
                        # nickname (only if still unset)
                        if cohort[uid]['nickname'] == 'N/A':
                            cohort[uid]['nickname'] = info_hit.get('body', {}).get('nickname', 'N/A')
                        # country (only if still unset)
                        if cohort[uid]['country'] == 'N/A':
                            country = info_hit.get('country')
                            if country:
                                cohort[uid]['country'] = country
                        # auth.id (priority 2, only if still unset)
                        if cohort[uid]['auth_id'] == 'N/A':
                            auth_id = info_hit.get('auth', {}).get('id')
                            if auth_id:
                                cohort[uid]['auth_id'] = auth_id
                                lobby_collected.add(uid)
            except Exception as e:
                logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}")
        processing_timer.stop()
        print_complete(f"lobby 추가 정보 수집 완료: {len(lobby_collected)}명")
        logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 차선 정보 수집 완료")
    # Final statistics
    auth_id_count = sum(1 for uid in cohort if cohort[uid]['auth_id'] != 'N/A')
    print_complete(f"코호트 정보 수집 완료: auth.id {auth_id_count}/{total_users}명 수집")
    logger.info(f"최종 auth.id 수집 완료: {auth_id_count}/{total_users}명")
    logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)")
    logger.info("=" * 80)
    return cohort

# ==============================================================================
# 6. Fixed msearch query builder (nested -> object)
# ==============================================================================

def build_fixed_msearch_queries(
    uids: List[str],
    cohort: Dict[str, Dict],
    metrics_config: Dict[str, Dict]
) -> List[str]:
    """Build the fixed msearch NDJSON lines (nested queries removed).

    Emits two lines per (uid, metric): an index header and a query body.
    """
    queries = []
    for uid in uids:
        user_data = cohort[uid]
        create_time_dt = user_data['create_time_dt']
        # Day windows (24h each) relative to this user's create_time.
        d0_start = user_data['create_time_utc']
        d0_end = (create_time_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ')
        d1_start = d0_end
        d1_end = (create_time_dt + timedelta(hours=48)).strftime('%Y-%m-%dT%H:%M:%SZ')
        d2_start = d1_end
        d2_end = (create_time_dt + timedelta(hours=72)).strftime('%Y-%m-%dT%H:%M:%SZ')
        d3_start = d2_end
        d3_end = (create_time_dt + timedelta(hours=96)).strftime('%Y-%m-%dT%H:%M:%SZ')
        d4_start = d3_end
        d4_end = (create_time_dt + timedelta(hours=120)).strftime('%Y-%m-%dT%H:%M:%SZ')
        d5_start = d4_end
        d5_end = (create_time_dt + timedelta(hours=144)).strftime('%Y-%m-%dT%H:%M:%SZ')
        d6_start = d5_end
        d6_end = (create_time_dt + timedelta(hours=168)).strftime('%Y-%m-%dT%H:%M:%SZ')
        d7_plus_start = d6_end
        for metric_name, config in metrics_config.items():
            # Pick the time window for this metric.
            if config["time_range"] == "d0":
                time_filter = {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
            elif config["time_range"] == "d1":
                time_filter = {"range": {"@timestamp": {"gte": d1_start, "lt": d1_end}}}
            elif config["time_range"] == "d2":
                time_filter = {"range": {"@timestamp": {"gte": d2_start, "lt": d2_end}}}
            elif config["time_range"] == "d3":
                time_filter = {"range": {"@timestamp": {"gte": d3_start, "lt": d3_end}}}
            elif config["time_range"] == "d4":
                time_filter = {"range": {"@timestamp": {"gte": d4_start, "lt": d4_end}}}
            elif config["time_range"] == "d5":
                time_filter = {"range": {"@timestamp": {"gte": d5_start, "lt": d5_end}}}
            elif config["time_range"] == "d6":
                time_filter = {"range": {"@timestamp": {"gte": d6_start, "lt": d6_end}}}
            elif config["time_range"] == "d7_plus":
                time_filter = {"range": {"@timestamp": {"gte": d7_plus_start}}}
            else:
                # Default window: d0
                time_filter = {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
            # User-identity filter.
            if "target_field" in config:
                if config.get("use_body_target", False):
                    # Search by body.target_uid (events where the player was the target).
                    # NOTE(review): both branches build the identical filter, so
                    # the use_body_target flag currently has no effect — confirm
                    # whether a different filter was intended here.
                    user_filter = {"term": {config["target_field"]: uid}}
                else:
                    # Ordinary uid lookup via the configured field.
                    user_filter = {"term": {config["target_field"]: uid}}
            else:
                user_filter = {"term": {"uid.keyword": uid}}
            # Assemble the filter clause.
            query_filters = [user_filter, time_filter]
            # Extra metric filters (nested handling removed).
            if "filters" in config:
                query_filters.extend(config["filters"])
            # Conditional filter for conditional_sum metrics.
            if config.get("agg_type") == "conditional_sum":
                condition_filter = {"term": {config['condition_field']: config['condition_value']}}
                query_filters.append(condition_filter)
            # Query body: doc-returning agg types need hits; others use size 0.
            agg_type = config.get("agg_type", "count")
            needs_docs = ["first_value", "max_stalker", "max_level_with_stalker", "max_level_stalker_name"]
            query_body = {
                "size": 0 if agg_type not in needs_docs else 1000,
                "query": {"bool": {"filter": query_filters}}
            }
            # Aggregation setup per agg type.
            if agg_type in ["sum", "avg", "max", "conditional_sum"]:
                agg_func = "sum" if agg_type in ["sum", "conditional_sum"] else agg_type
                query_body["aggs"] = {
                    "metric_value": {agg_func: {"field": config["field"]}}
                }
            elif agg_type == "max_timestamp":
                query_body["aggs"] = {
                    "metric_value": {"max": {"field": "@timestamp"}}
                }
            elif agg_type in ["first_value", "max_stalker"]:
                # first_value always sorts by @timestamp ascending (earliest first);
                # max_stalker sorts its configured field descending instead.
                if agg_type == "first_value":
                    query_body["sort"] = [{"@timestamp": {"order": "asc"}}]
                else:
                    # max_stalker
                    sort_field = config.get("field", "@timestamp")
                    query_body["sort"] = [{sort_field: {"order": "desc"}}]
                query_body["_source"] = [config.get("field", "@timestamp")]
            elif agg_type in ["max_level_with_stalker", "max_level_stalker_name"]:
                # Highest level first; ties broken by most recent timestamp.
                level_field = config.get("level_field", "body.level")
                stalker_field = config.get("stalker_field", "body.stalker")
                query_body["sort"] = [
                    {level_field: {"order": "desc"}},
                    {"@timestamp": {"order": "desc"}}
                ]
                query_body["_source"] = [level_field, stalker_field]
            # Append the NDJSON pair (index header line + query line).
            queries.append(json.dumps({"index": config["index"]}, ensure_ascii=False))
            queries.append(json.dumps(query_body, ensure_ascii=False))
    return queries

# ==============================================================================
# 7. Session metric computation (unchanged)
# ==============================================================================

def calculate_comprehensive_session_metrics(
    client: OpenSearch,
    uids: List[str],
    cohort_data: Dict[str, Dict]
) -> Dict[str, Dict]:
    """Compute comprehensive session metrics for each uid over its D0 window."""
    def stream_user_events(uid: str) -> Generator[Dict, None, None]:
        # Stream this user's D0 events (timestamp + type) via scroll.
        user_info = cohort_data.get(uid)
        if not user_info:
            return
        create_time_dt = user_info['create_time_dt']
        d0_end_dt = create_time_dt + timedelta(hours=24)
        query = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"uid.keyword": uid}},
                        {"range": {"@timestamp": {
                            "gte": user_info['create_time_utc'],
                            "lt": d0_end_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
                        }}}
                    ]
                }
            }
        }
        try:
            for doc in scan(
                client,
                query=query,
                index="ds-logs-live-*",
                scroll=SCROLL_TIMEOUT,
                _source=["@timestamp", "type"]
            ):
                source = doc['_source']
                event_dt = datetime.fromisoformat(source['@timestamp'].replace('Z', '+00:00'))
                # Double-check the window client-side before yielding.
                if create_time_dt <= event_dt < d0_end_dt:
                    yield {
                        "time": event_dt,
                        "type": source.get('type', '').lower()
                    }
        except Exception:
            # Best-effort: scroll failures simply end this user's stream.
            pass
    results = {}
    for uid in uids:
        events = list(stream_user_events(uid))
        session_metrics = {
            'active_seconds': 0,
            'total_playtime_minutes': 0,
            'session_count':
0, 'avg_session_length': 0, 'logout_abnormal': 0 } if len(events) < 2: results[uid] = session_metrics continue events.sort(key=lambda x: x['time']) login_events = [e for e in events if e['type'] == 'login_comp'] logout_events = [e for e in events if e['type'] == 'logout'] heartbeat_events = [e for e in events if e['type'] == 'heartbeat'] session_metrics['session_count'] = len(login_events) # 활동 시간 계산 if heartbeat_events: total_active_seconds = 0 for i in range(len(heartbeat_events) - 1): time_diff = heartbeat_events[i + 1]['time'] - heartbeat_events[i]['time'] if time_diff <= timedelta(minutes=SESSION_GAP_MINUTES): total_active_seconds += min(time_diff.total_seconds(), MAX_SESSION_HOURS * 3600) session_metrics['active_seconds'] = int(total_active_seconds) # 총 플레이 시간 if login_events: total_playtime_seconds = 0 session_lengths = [] for login_event in login_events: login_time = login_event['time'] logout_time = None for logout_event in logout_events: if logout_event['time'] > login_time: logout_time = logout_event['time'] break if not logout_time and events: logout_time = events[-1]['time'] if logout_time: session_duration = (logout_time - login_time).total_seconds() session_duration = min(session_duration, MAX_SESSION_HOURS * 3600) total_playtime_seconds += session_duration session_lengths.append(session_duration) session_metrics['total_playtime_minutes'] = int(total_playtime_seconds / 60) if session_lengths: session_metrics['avg_session_length'] = int(sum(session_lengths) / len(session_lengths) / 60) # 비정상 종료 체크 if login_events and logout_events: last_login = max(login_events, key=lambda x: x['time'])['time'] last_logout = max(logout_events, key=lambda x: x['time'])['time'] session_metrics['logout_abnormal'] = 1 if last_logout < last_login else 0 elif login_events and not logout_events: session_metrics['logout_abnormal'] = 1 results[uid] = session_metrics return results # ============================================================================== # 8. 
# -- Fixed batch processing --
# ==============================================================================

def process_fixed_batch(
    client: OpenSearch,
    batch_uids: List[str],
    cohort: Dict[str, Dict],
    metrics_config: Dict[str, Dict],
    include_session_metrics: bool = False
) -> List[Dict]:
    """Process one batch of uids: msearch all metrics and build result rows.

    Args:
        client: OpenSearch client.
        batch_uids: uids of this batch (keys of ``cohort``).
        cohort: per-user base info (nickname, create_time, device, ...).
        metrics_config: metric name -> query config.
        include_session_metrics: when True (--full), compute real session
            metrics; otherwise zero-filled placeholders are used.

    Returns:
        One result dict per successfully processed uid; ``[]`` on a
        batch-level failure (caller treats that as a retryable chunk).
    """
    try:
        # 1. Session metrics (only with --full; otherwise zero-filled).
        if include_session_metrics:
            session_metrics = calculate_comprehensive_session_metrics(client, batch_uids, cohort)
        else:
            session_metrics = {uid: {
                'active_seconds': 0,
                'total_playtime_minutes': 0,
                'session_count': 0,
                'avg_session_length': 0,
                'logout_abnormal': 0
            } for uid in batch_uids}

        # 2. Fixed msearch execution.
        msearch_queries = build_fixed_msearch_queries(batch_uids, cohort, metrics_config)
        body_ndjson = "\n".join(msearch_queries) + "\n"

        # Console gets the short form; the log file gets the detail.
        print_progress(f"배치 처리 중: {len(batch_uids)}명, {len(msearch_queries)//2}개 쿼리")
        logger.info(f"msearch 실행: {len(msearch_queries)//2}개 쿼리")

        msearch_responses = exponential_backoff_retry(
            client.msearch, body=body_ndjson, request_timeout=300
        ).get('responses', [])

        # 3. Aggregate responses into per-user rows.
        batch_results = []
        metrics_per_user = len(metrics_config)
        # Hoisted out of the per-uid loop: the metric order is invariant.
        metric_names = list(metrics_config.keys())

        for idx, uid in enumerate(batch_uids):
            try:
                user_data = cohort[uid]
                user_session_metrics = session_metrics.get(uid, {})
                # Responses come back in submission order, metrics_per_user
                # entries per uid.
                user_responses = msearch_responses[idx * metrics_per_user:(idx + 1) * metrics_per_user]

                # Base row (first_login_time was folded into create_time).
                result = {
                    'uid': uid,
                    'auth_id': user_data.get('auth_id', 'N/A'),
                    'nickname': user_data['nickname'],
                    'create_time': user_data.get('create_time_kst', 'N/A'),
                    'retention_status': 'Retained_d0',  # provisional; updated below
                    'language': user_data['language'],
                    'country': user_data.get('country', 'N/A'),
                    'device': user_data['device'],
                    'active_seconds': user_session_metrics.get('active_seconds', 0),
                    'total_playtime_minutes': user_session_metrics.get('total_playtime_minutes', 0),
                    'session_count': user_session_metrics.get('session_count', 0),
                    'avg_session_length': user_session_metrics.get('avg_session_length', 0),
                    'logout_abnormal': user_session_metrics.get('logout_abnormal', 0)
                }

                # Parse each metric's msearch response by its agg_type.
                for i, metric_name in enumerate(metric_names):
                    if i >= len(user_responses):
                        # Short response array (partial msearch failure).
                        result[metric_name] = 0
                        continue
                    response = user_responses[i]
                    config = metrics_config[metric_name]
                    if 'error' in response:
                        result[metric_name] = 0
                        continue
                    agg_type = config.get("agg_type", "count")

                    if agg_type == "count":
                        result[metric_name] = response.get('hits', {}).get('total', {}).get('value', 0)
                    elif agg_type == "exists":
                        hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
                        result[metric_name] = 1 if hits_total > 0 else 0
                    elif agg_type in ["sum", "avg", "max", "conditional_sum"]:
                        agg_value = response.get('aggregations', {}).get('metric_value', {}).get('value')
                        result[metric_name] = int(agg_value) if agg_value else 0
                    elif agg_type == "max_timestamp":
                        timestamp_value = response.get('aggregations', {}).get('metric_value', {}).get('value_as_string')
                        result[metric_name] = format_kst_time(timestamp_value) if timestamp_value else None
                    elif agg_type == "first_value":
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            field_name = config.get("field", "")
                            if field_name.startswith("body."):
                                # Walk the dotted path through the object.
                                keys = field_name.split(".")
                                value = source_value
                                for key in keys:
                                    if isinstance(value, dict):
                                        value = value.get(key)
                                        if value is None:
                                            # String-typed metrics default to ""
                                            # rather than 0.
                                            if metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
                                                value = ""
                                            else:
                                                value = 0
                                            break
                                    else:
                                        value = 0
                                        break
                                result[metric_name] = value
                            else:
                                result[metric_name] = source_value.get(field_name, 0)
                        else:
                            # No documents: metric-specific "not played" defaults.
                            if metric_name == "dungeon_first_result":
                                result[metric_name] = 2  # 미플레이
                            elif metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
                                result[metric_name] = ""
                            else:
                                result[metric_name] = 0
                    elif agg_type == "max_stalker":
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            field_name = config.get("field", "")
                            keys = field_name.split(".")
                            value = source_value
                            for key in keys:
                                value = value.get(key, 'N/A') if isinstance(value, dict) else 'N/A'
                            result[metric_name] = value
                        else:
                            result[metric_name] = 'N/A'
                    elif agg_type == "max_level_with_stalker":
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            level_field = config.get("level_field", "body.level")
                            # Extract the body.level value along the dotted path.
                            level_keys = level_field.split(".")
                            level_value = source_value
                            for key in level_keys:
                                level_value = level_value.get(key, 0) if isinstance(level_value, dict) else 0
                            result[metric_name] = level_value if level_value else 0
                        else:
                            result[metric_name] = 0
                    elif agg_type == "max_level_stalker_name":
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            stalker_field = config.get("stalker_field", "body.stalker")
                            # Extract the body.stalker value along the dotted path.
                            stalker_keys = stalker_field.split(".")
                            stalker_value = source_value
                            for key in stalker_keys:
                                stalker_value = stalker_value.get(key, 0) if isinstance(stalker_value, dict) else 0
                            result[metric_name] = stalker_value if stalker_value else 0
                        else:
                            result[metric_name] = 0
                    else:
                        result[metric_name] = 0

                # Retention status: latest day with any activity wins
                # (D7+ > D6 > ... > D1; default stays Retained_d0).
                retention_priority = (
                    ('retention_check_d7_plus', 'Retained_d7+'),
                    ('retention_check_d6', 'Retained_d6'),
                    ('retention_check_d5', 'Retained_d5'),
                    ('retention_check_d4', 'Retained_d4'),
                    ('retention_check_d3', 'Retained_d3'),
                    ('retention_check_d2', 'Retained_d2'),
                    ('retention_check_d1', 'Retained_d1'),
                )
                for check_key, status in retention_priority:
                    if result.get(check_key, 0) > 0:
                        result['retention_status'] = status
                        break
                else:
                    # Only D0 activity observed.
                    result['retention_status'] = 'Retained_d0'

                # Per-mode derived metrics (escape rate, average kills).
                modes = ['COOP', 'Solo', 'Survival', 'Survival_BOT', 'Survival_Unprotected']
                for mode in modes:
                    entry_key = f'{mode}_entry_count'
                    escape_key = f'{mode}_escape_count'
                    if result.get(entry_key, 0) > 0:
                        entry_count = result[entry_key]
                        escape_count = result.get(escape_key, 0)
                        # 4 decimal places for the rate, 2 for averages.
                        result[f'{mode}_escape_rate'] = round(escape_count / entry_count, 4)
                        monster_kills = result.get(f'{mode}_monster_kill_count', 0)
                        player_kills = result.get(f'{mode}_player_kill_count', 0)
                        result[f'{mode}_avg_monster_kills'] = round(monster_kills / entry_count, 2)
                        result[f'{mode}_avg_player_kills'] = round(player_kills / entry_count, 2)
                    else:
                        result[f'{mode}_escape_rate'] = 0.0
                        result[f'{mode}_avg_monster_kills'] = 0.0
                        result[f'{mode}_avg_player_kills'] = 0.0

                batch_results.append(result)
            except Exception as e:
                # Per-uid failure: log and continue with the rest of the batch.
                logger.warning(f"UID '{uid}' 처리 중 오류: {e}")

        print_progress(f"배치 완료: {len(batch_results)}명")
        logger.info(f"배치 처리 완료: {len(batch_results)}명 성공")
        return batch_results

    except Exception as e:
        logger.error(f"배치 처리 중 심각한 오류: {e}")
        return []


# ==============================================================================
# 9. Parallel processing and result output
# ==============================================================================

def process_cohort_fixed_parallel(
    client: OpenSearch,
    cohort: Dict[str, Dict],
    batch_size: int,
    max_workers: int,
    metrics_config: Dict[str, Dict],
    include_session_metrics: bool = False
) -> List[Dict]:
    """Run process_fixed_batch over the cohort in parallel batches.

    Failed chunks (empty result or exception) are collected and retried
    once sequentially.

    Returns:
        Combined list of result rows from all successful batches.
    """
    logger.info("=" * 80)
    logger.info("2단계: 수정된 병렬 배치 처리")
    logger.info(f"총 사용자: {len(cohort)}명")
    logger.info(f"배치 크기: {batch_size}, 워커: {max_workers}")
    logger.info(f"분석 지표: {len(metrics_config)}개")
    logger.info(f"세션 지표 포함: {'예' if include_session_metrics else '아니오'}")

    uid_list = list(cohort.keys())
    chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)]
    print_stage(f"⚡ 병렬 배치 처리 시작 ({len(cohort)}명, {len(chunks)}개 배치)...")

    all_results = []
    failed_chunks = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_chunk = {
            executor.submit(process_fixed_batch, client, chunk, cohort,
                            metrics_config, include_session_metrics): chunk
            for chunk in chunks
        }
        completed_chunks = 0
        processing_timer.start("병렬 배치 처리")
        for future in as_completed(future_to_chunk):
            chunk = future_to_chunk[future]
            try:
                batch_results = future.result(timeout=600)
                if batch_results:
                    all_results.extend(batch_results)
                else:
                    # Empty result means the batch-level except fired.
                    failed_chunks.append(chunk)
                completed_chunks += 1
                print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료, {len(all_results)}명 처리됨")
            except Exception as e:
                logger.warning(f"배치 처리 실패: {e}")
                failed_chunks.append(chunk)
                completed_chunks += 1
                print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료 (오류 1개), {len(all_results)}명 처리됨")
        processing_timer.stop()

    # Sequential retry of failed chunks.
    if failed_chunks:
        print_stage(f"🔄 실패한 배치 재처리 중 ({len(failed_chunks)}개)...")
        logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...")
        processing_timer.start("배치 재처리")
        for i, chunk in enumerate(failed_chunks):
            try:
                batch_results = process_fixed_batch(client, chunk, cohort,
                                                   metrics_config, include_session_metrics)
                all_results.extend(batch_results)
                print_progress(f"재처리 중: {i+1}/{len(failed_chunks)} 완료")
            except Exception as e:
                logger.error(f"재처리 실패: {e}")
        processing_timer.stop()
        print_complete(f"재처리 완료: {len(failed_chunks)}개 배치 처리됨")

    print_complete(f"병렬 처리 완료: 총 {len(all_results)}명 성공")
    logger.info(f"2단계 완료: {len(all_results)}명 처리 성공")
    logger.info("=" * 80)
    return all_results


def write_fixed_results(results: List[Dict], output_path: str,
                        include_session_metrics: bool = False) -> None:
    """Write result rows to a UTF-8-BOM CSV (retention_d1 column removed).

    Args:
        results: rows from process_cohort_fixed_parallel.
        output_path: destination CSV path.
        include_session_metrics: when True, session columns are included.
    """
    logger.info("=" * 80)
    logger.info("3단계: 결과 저장")
    print_stage("💾 분석 결과 CSV 파일 저장 중...")
    processing_timer.start("CSV 파일 저장")

    if not results:
        logger.error("저장할 결과 데이터가 없습니다.")
        # Fix: stop the timer before the early return (it used to be left
        # running forever on this path).
        processing_timer.stop()
        return

    # Header layout: first_login_time removed (create_time only);
    # country added; per-mode blocks follow the base columns.
    headers = [
        'uid', 'auth_id', 'nickname', 'create_time', 'retention_status',
        'language', 'country', 'device',
        'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result',
        'death_PK', 'death_GiveUp', 'death_Mob', 'death_Trap', 'death_Red', 'death_Others'
    ]

    # Session columns are optional; they are spliced in right after
    # create_time (index 4), i.e. just before retention_status.
    if include_session_metrics:
        session_headers = ['active_seconds', 'total_playtime_minutes',
                           'session_count', 'avg_session_length', 'logout_abnormal']
        headers = headers[:4] + session_headers + headers[4:]

    # Per-mode column block (deleted metrics excluded).
    modes = ['COOP', 'Solo', 'Survival', 'Survival_BOT', 'Survival_Unprotected']
    for mode in modes:
        headers.extend([
            f'{mode}_entry_count', f'{mode}_escape_count', f'{mode}_escape_rate',
            f'{mode}_avg_survival_time', f'{mode}_max_survival_time',
            f'{mode}_raid_play_count', f'{mode}_first_result',
            f'{mode}_avg_monster_kills', f'{mode}_avg_player_kills'
        ])

    # Remaining scalar columns (deleted metrics excluded).
    headers.extend([
        'level_max', 'level_max_stalker', 'tutorial_entry', 'tutorial_completed',
        'guide_quest_stage', 'highest_item_grade', 'blueprint_use_count',
        'shop_buy_count', 'shop_sell_count', 'gold_spent', 'gold_earned',
        'storage_in_count', 'storage_out_count', 'enchant_count', 'enchant_gold_spent',
        'ingame_equip_count', 'object_interaction_count', 'matching_start_count',
        'matching_complete_count', 'matching_failed_count', 'avg_matching_time',
        'friend_add_count', 'friend_delete_count', 'party_invite_sent',
        'party_invite_received', 'mail_read_count', 'exchange_register_count',
        'exchange_use_count', 'coupon_used', 'button_click_count',
        'hideout_upgrade_count', 'hideout_max_level', 'season_pass_buy',
        'season_pass_max_step', 'last_logout_time'
    ])

    try:
        with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
            # extrasaction='ignore': rows may carry helper keys not in headers.
            writer = csv.DictWriter(csvfile, fieldnames=headers, extrasaction='ignore')
            writer.writeheader()
            for result in results:
                writer.writerow(result)
        processing_timer.stop()
        print_complete(f"CSV 파일 저장 완료: {len(results)}명 데이터, {len(headers)}개 지표")
        logger.info(f"결과 파일 저장 완료: {output_path}")
        logger.info(f"총 {len(results)}명의 데이터 저장")
        logger.info(f"분석 지표: {len(headers)}개")
    except Exception as e:
        logger.error(f"CSV 파일 저장 실패: {e}")
        # Fix: the timer also has to be stopped on the failure path.
        processing_timer.stop()

# ==============================================================================
# 10.
# -- Main entry point --
# ==============================================================================

def parse_arguments() -> argparse.Namespace:
    """Parse command-line arguments for the analysis run."""
    parser = argparse.ArgumentParser(description="던전 스토커즈 신규 유저 분석 (수정 버전)")
    parser.add_argument('--start-time', required=True, help='분석 시작 시간 (KST)')
    parser.add_argument('--end-time', required=True, help='분석 종료 시간 (KST)')
    parser.add_argument('--batch-size', type=int, default=DEFAULT_BATCH_SIZE, help='배치 크기')
    parser.add_argument('--max-workers', type=int, default=DEFAULT_MAX_WORKERS, help='병렬 워커 수')
    parser.add_argument('--sample-size', type=int, help='샘플 분석 크기')
    parser.add_argument('--full', action='store_true', help='세션 관련 지표 포함한 전체 분석')
    return parser.parse_args()


def main():
    """Run the full analysis: cohort → parallel metrics → CSV → summary."""
    global logger
    overall_start_time = time.time()
    start_kst = datetime.now(KST)
    timestamp = start_kst.strftime('%Y%m%d_%H%M%S')

    log_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.log"
    csv_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.csv"

    logger = setup_logging(str(log_file_path))
    logger.info("=" * 80)
    logger.info("던전 스토커즈 신규 유저 분석 v5.0")
    logger.info("create_uid 기반 신규 유저 판별 + 세션 지표 조건부 수집")
    logger.info("=" * 80)

    args = parse_arguments()

    # Parse the KST input range and convert to UTC for OpenSearch queries.
    try:
        start_kst_arg = datetime.fromisoformat(args.start_time)
        end_kst_arg = datetime.fromisoformat(args.end_time)
        start_utc = start_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        end_utc = end_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        logger.info(f"분석 기간 (KST): {args.start_time} ~ {args.end_time}")
        logger.info(f"분석 기간 (UTC): {start_utc} ~ {end_utc}")
        logger.info(f"세션 지표 포함: {'예 (--full 옵션 사용)' if args.full else '아니오 (빠른 분석 모드)'}")
    except Exception as e:
        logger.error(f"시간 형식 오류: {e}")
        return

    client = create_opensearch_client()
    if not client:
        return

    try:
        metrics_config = get_fixed_metrics_config()
        logger.info(f"수정된 분석 지표 로드: {len(metrics_config)}개")

        cohort = get_new_user_cohort_optimized(client, start_utc, end_utc)
        if not cohort:
            logger.error("분석할 신규 유저가 없습니다.")
            return

        # Optional sampling: keep only the first N cohort members.
        if args.sample_size and args.sample_size < len(cohort):
            uid_list = list(cohort.keys())
            sampled_uids = uid_list[:args.sample_size]
            cohort = {uid: cohort[uid] for uid in sampled_uids}
            logger.info(f"샘플링 모드: {args.sample_size}명만 분석")

        results = process_cohort_fixed_parallel(
            client, cohort, args.batch_size, args.max_workers,
            metrics_config, args.full
        )
        write_fixed_results(results, str(csv_file_path), args.full)

        # ---- Summary ----
        if results:
            total_users = len(results)
            retention_groups = ['Retained_d0', 'Retained_d1', 'Retained_d2', 'Retained_d3',
                                'Retained_d4', 'Retained_d5', 'Retained_d6', 'Retained_d7+']
            # Counter is O(results) instead of one scan per group.
            status_counter = Counter(r.get('retention_status') for r in results)
            retention_counts = {group: status_counter.get(group, 0) for group in retention_groups}

            logger.info("=" * 80)
            logger.info("분석 요약")
            logger.info("=" * 80)
            logger.info(f"총 신규 유저: {total_users:,}명")

            # Basic retention distribution.
            for group in retention_groups:
                count = retention_counts[group]
                percentage = (count / total_users) * 100
                logger.info(f"{group}: {count:,}명 ({percentage:.1f}%)")

            logger.info("")
            logger.info("=" * 50)
            logger.info("리텐션 퍼널 분석")
            logger.info("=" * 50)

            # Cumulative retention: cumulative[k] = users whose last activity
            # was on day k+1 or later (D1+, D2+, ..., D7+). Since each entry
            # is a suffix sum over non-overlapping groups, the sequence is
            # non-increasing, so the previous step's count is a safe divisor
            # whenever the current count is positive.
            cumulative = [
                sum(retention_counts[g] for g in retention_groups[i:])
                for i in range(1, len(retention_groups))
            ]
            funnel_labels = ["D0→D1", "D1→D2", "D2→D3", "D3→D4", "D4→D5", "D5→D6", "D6→D7+"]
            prev_count = total_users
            for label, cum_count in zip(funnel_labels, cumulative):
                if cum_count > 0:
                    step_rate = (cum_count / prev_count) * 100
                    overall_rate = (cum_count / total_users) * 100
                    logger.info(f"{label}: {step_rate:.1f}% ({cum_count:,}/{prev_count:,}명) | 누적: {overall_rate:.1f}%")
                prev_count = cum_count

            logger.info("")
            logger.info(f"평균 활동 시간: {sum(r.get('active_seconds', 0) for r in results) / total_users / 60:.1f}분")

    except KeyboardInterrupt:
        logger.warning("사용자에 의해 중단되었습니다.")
    except Exception as e:
        logger.error(f"예상치 못한 오류: {e}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        stop_timer_event.set()
        end_time = time.time()
        total_time = str(timedelta(seconds=int(end_time - overall_start_time)))
        end_kst = datetime.now(KST)
        logger.info("=" * 80)
        logger.info(f"분석 종료 시간: {end_kst.strftime('%Y-%m-%dT%H:%M:%S+09:00')}")
        logger.info(f"총 소요 시간: {total_time}")
        logger.info("수정된 분석 완료!")
        logger.info("=" * 80)
        print_complete(f"🎉 전체 분석 완료! 소요 시간: {total_time}")
        # Final timer cleanup.
        processing_timer.stop()


if __name__ == "__main__":
    main()