#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 던전 스토커즈 신규 유저 리텐션 분석 스크립트 (수정 버전) OpenSearch 매핑 오류 수정: nested → object 쿼리 변경 수정사항: - body 필드를 nested가 아닌 object로 처리 - retention_d1 중복 필드 제거 - 필드 경로 정확성 개선 - 키워드 필드 사용 (.keyword 추가) 작성자: Claude Code 기획서 기반: DS-new_users-analy.md 참고: OpenSearch 매핑 ds_opensearch_mappings.json """ import os import csv import json import time import yaml import logging import argparse import threading from datetime import datetime, timedelta, timezone from collections import defaultdict, Counter from typing import Dict, List, Optional, Tuple, Generator, Any, Set from concurrent.futures import ThreadPoolExecutor, as_completed, Future from pathlib import Path import pandas as pd from tqdm import tqdm from opensearchpy import OpenSearch from opensearchpy.helpers import scan # ============================================================================== # 1. 설정 및 상수 # ============================================================================== # OpenSearch 연결 설정 OPENSEARCH_CONFIG = { "host": "ds-opensearch.oneunivrs.com", "port": 9200, "auth": { "username": "admin", "password": "DHp5#r#GYQ9d" }, "use_ssl": True, "verify_certs": False, "timeout": 120, "max_retries": 3, "headers": {"Connection": "close"} } # 한국 표준시 설정 KST = timezone(timedelta(hours=9)) # 성능 최적화 설정 (오픈서치 스펙 기반 최적화) DEFAULT_BATCH_SIZE = 2000 DEFAULT_MAX_WORKERS = 16 DEFAULT_COMPOSITE_SIZE = 2000 DEFAULT_TIMEOUT = 180 SCROLL_TIMEOUT = "5m" SESSION_GAP_MINUTES = 5 MAX_SESSION_HOURS = 3 # 출력 디렉토리 설정 BASE_DIR = Path(__file__).parent OUTPUT_DIR = BASE_DIR / "analysis_results" OUTPUT_DIR.mkdir(exist_ok=True) # 전역 변수 stop_timer_event = threading.Event() logger = None # ============================================================================== # 2. 로깅 시스템 설정 # ============================================================================== def setup_logging(log_file_path: str) -> logging.Logger: """한국 시간 기준 로깅 설정""" class KSTFormatter(logging.Formatter): def formatTime(self, record, datefmt=None): kst = timezone(timedelta(hours=9)) ct = datetime.fromtimestamp(record.created, kst) return ct.strftime('%Y-%m-%dT%H:%M:%S+09:00') logger = logging.getLogger('NewUserAnalyzer') logger.setLevel(logging.INFO) # 기존 핸들러 제거 for handler in logger.handlers[:]: logger.removeHandler(handler) # 파일 핸들러 file_handler = logging.FileHandler(log_file_path, encoding='utf-8') file_formatter = KSTFormatter('[%(levelname)s] %(asctime)s - %(message)s') file_handler.setFormatter(file_formatter) logger.addHandler(file_handler) # 콘솔 핸들러 console_handler = logging.StreamHandler() console_formatter = KSTFormatter('[%(levelname)s] %(asctime)s - %(message)s') console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) # 외부 라이브러리 로그 억제 logging.getLogger('opensearchpy').setLevel(logging.WARNING) logging.getLogger('urllib3').setLevel(logging.WARNING) return logger # ============================================================================== # 3. OpenSearch 연결 및 유틸리티 # ============================================================================== def create_opensearch_client() -> Optional[OpenSearch]: """OpenSearch 클라이언트 생성""" logger.info("OpenSearch 클러스터 연결 시도 중...") try: client = OpenSearch( hosts=[{ "host": OPENSEARCH_CONFIG['host'], "port": OPENSEARCH_CONFIG['port'], "scheme": "https" if OPENSEARCH_CONFIG['use_ssl'] else "http" }], http_auth=( OPENSEARCH_CONFIG['auth']['username'], OPENSEARCH_CONFIG['auth']['password'] ), use_ssl=OPENSEARCH_CONFIG['use_ssl'], verify_certs=OPENSEARCH_CONFIG['verify_certs'], ssl_show_warn=False, timeout=OPENSEARCH_CONFIG['timeout'], max_retries=OPENSEARCH_CONFIG['max_retries'], retry_on_timeout=True, headers=OPENSEARCH_CONFIG['headers'] ) if not client.ping(): raise ConnectionError("클러스터 PING 실패") logger.info("OpenSearch 연결 성공!") return client except Exception as e: logger.error(f"OpenSearch 연결 실패: {e}") return None def exponential_backoff_retry(func, *args, **kwargs) -> Any: """지수 백오프 재시도 패턴""" for delay in [1, 2, 4, 8, 16]: try: return func(*args, **kwargs) except Exception as e: if delay == 16: raise e logger.warning(f"재시도 중... {delay}초 대기 (오류: {str(e)[:100]})") time.sleep(delay) def format_kst_time(timestamp_str: str) -> str: """UTC 타임스탬프를 KST로 변환""" try: dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) kst_dt = dt.astimezone(KST) return kst_dt.strftime('%Y-%m-%dT%H:%M:%S+09:00') except: return timestamp_str # ============================================================================== # 4. 수정된 분석 지표 정의 (OpenSearch 매핑 기반) # ============================================================================== def get_fixed_metrics_config() -> Dict[str, Dict]: """OpenSearch 매핑에 기반한 수정된 분석 지표""" return { # ==================== 3.2 플레이 시간 및 세션 ==================== "session_count": { "index": "ds-logs-live-login_comp", "time_range": "d0", "agg_type": "count" }, "last_logout_time": { "index": "ds-logs-live-logout", "time_range": "d0", "agg_type": "max_timestamp" }, # ==================== 3.3 던전 플레이 성과 ==================== "dungeon_entry_count": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "count" }, "dungeon_first_mode": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "first_value", "field": "body.dungeon_mode" }, "dungeon_first_stalker": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "first_value", "field": "body.stalker_name" }, "dungeon_first_result": { "index": "ds-logs-live-survival_end", "time_range": "d0", "agg_type": "first_value", "field": "body.result" }, "dungeon_escape_count": { "index": "ds-logs-live-survival_end", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.result": 1}}] }, "avg_survival_time": { "index": "ds-logs-live-survival_end", "time_range": "d0", "agg_type": "avg", "field": "body.play_stats.playtime" }, "max_survival_time": { "index": "ds-logs-live-survival_end", "time_range": "d0", "agg_type": "max", "field": "body.play_stats.playtime" }, "total_armor_break": { "index": "ds-logs-live-survival_end", "time_range": "d0", "agg_type": "sum", "field": "body.play_stats.armor_break_cnt" }, "raid_play_count": { "index": "ds-logs-live-survival_end", "time_range": "d0", "agg_type": "sum", "field": "body.play_stats.raid_play" }, "escape_count": { "index": "ds-logs-live-dead", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.inter_type": 0}}] }, # ==================== 3.4 전투 성과 ==================== "monster_kill_count": { "index": "ds-logs-live-monster_kill", "time_range": "d0", "agg_type": "count" }, "player_kill_count": { "index": "ds-logs-live-player_kill", "time_range": "d0", "agg_type": "count", "target_field": "uid.keyword" }, "player_killed_count": { "index": "ds-logs-live-player_kill", "time_range": "d0", "agg_type": "count", "target_field": "body.target_uid.keyword", "use_body_target": True }, "death_count": { "index": "ds-logs-live-dead", "time_range": "d0", "agg_type": "count", "filters": [{"bool": {"must_not": {"term": {"body.inter_type": 0}}}}] }, # ==================== 3.5 진행도 및 성장 ==================== "level_max": { "index": "ds-logs-live-level_up", "time_range": "d0", "agg_type": "max_level_with_stalker", "level_field": "body.level", "stalker_field": "body.stalker" }, "level_max_stalker": { "index": "ds-logs-live-level_up", "time_range": "d0", "agg_type": "max_level_stalker_name", "level_field": "body.level", "stalker_field": "body.stalker" }, "tutorial_entry": { "index": "ds-logs-live-tutorial_entry", "time_range": "d0", "agg_type": "count" }, "tutorial_completed": { "index": "ds-logs-live-log_tutorial", "time_range": "d0", "agg_type": "count", "filters": [ {"term": {"body.action_type.keyword": "Complete"}}, {"term": {"body.stage_type.keyword": "result"}} ] }, "guide_quest_stage": { "index": "ds-logs-live-guide_quest_stage", "time_range": "d0", "agg_type": "max", "field": "body.guide_step" }, "skill_points_earned": { "index": "ds-logs-live-skill_point_get", "time_range": "d0", "agg_type": "count" }, # ==================== 3.6 아이템 및 경제 ==================== "items_obtained_count": { "index": "ds-logs-live-item_get", "time_range": "d0", "agg_type": "count" }, "highest_item_grade": { "index": "ds-logs-live-item_get", "time_range": "d0", "agg_type": "max", "field": "body.item_grade" }, "blueprint_use_count": { "index": "ds-logs-live-craft_from_blueprint", "time_range": "d0", "agg_type": "count" }, "shop_buy_count": { "index": "ds-logs-live-shop_buy", "time_range": "d0", "agg_type": "count" }, "shop_sell_count": { "index": "ds-logs-live-shop_sell", "time_range": "d0", "agg_type": "count" }, "gold_spent": { "index": "ds-logs-live-shop_buy", "time_range": "d0", "agg_type": "conditional_sum", "field": "body.amt", "condition_field": "body.cost_id.keyword", "condition_value": "i108000" }, "gold_earned": { "index": "ds-logs-live-shop_sell", "time_range": "d0", "agg_type": "conditional_sum", "field": "body.amt", "condition_field": "body.cost_id.keyword", "condition_value": "i108000" }, "storage_in_count": { "index": "ds-logs-live-storage_use", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.oper_type": 1}}] }, "storage_out_count": { "index": "ds-logs-live-storage_use", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.oper_type": -1}}] }, "enchant_count": { "index": "ds-logs-live-enchant", "time_range": "d0", "agg_type": "count" }, "enchant_gold_spent": { "index": "ds-logs-live-enchant", "time_range": "d0", "agg_type": "sum", "field": "body.amt" }, # ==================== 3.7 장비 관리 ==================== "ingame_equip_count": { "index": "ds-logs-live-item_ingame_equip", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.base_type": 2}}] }, # ==================== 3.8 오브젝트 상호작용 ==================== "object_interaction_count": { "index": "ds-logs-live-obj_inter", "time_range": "d0", "agg_type": "count" }, # ==================== 3.9 매칭 시스템 ==================== "matching_start_count": { "index": "ds-logs-live-matching_start", "time_range": "d0", "agg_type": "count" }, "matching_complete_count": { "index": "ds-logs-live-matching_complete", "time_range": "d0", "agg_type": "count" }, "matching_failed_count": { "index": "ds-logs-live-matching_failed", "time_range": "d0", "agg_type": "count" }, "avg_matching_time": { "index": "ds-logs-live-matching_complete", "time_range": "d0", "agg_type": "avg", "field": "body.matchingtime" }, # ==================== 3.10 소셜 활동 ==================== "friend_add_count": { "index": "ds-logs-live-friend", "time_range": "d0", "agg_type": "count", "filters": [ {"term": {"body.oper_type": 0}}, {"term": {"body.friend_type": 0}} ] }, "friend_delete_count": { "index": "ds-logs-live-friend", "time_range": "d0", "agg_type": "count", "filters": [{"term": {"body.oper_type": 1}}] }, "party_invite_sent": { "index": "ds-logs-live-player_invite", "time_range": "d0", "agg_type": "count" }, "party_invite_received": { "index": "ds-logs-live-player_invite", "time_range": "d0", "agg_type": "count", "target_field": "body.target_uid" }, "mail_read_count": { "index": "ds-logs-live-mail_read", "time_range": "d0", "agg_type": "count" }, # ==================== 3.11 거래소 및 경제 활동 ==================== "exchange_register_count": { "index": "ds-logs-live-exchange_reg", "time_range": "d0", "agg_type": "count" }, "exchange_use_count": { "index": "ds-logs-live-exchange_use", "time_range": "d0", "agg_type": "count" }, "coupon_used": { "index": "ds-logs-live-coupon", "time_range": "d0", "agg_type": "exists" }, # ==================== 3.12 기타 활동 ==================== "button_click_count": { "index": "ds-logs-live-button_click", "time_range": "d0", "agg_type": "count" }, "hideout_upgrade_count": { "index": "ds-logs-live-log_hideout_upgrade", "time_range": "d0", "agg_type": "count" }, "hideout_max_level": { "index": "ds-logs-live-log_hideout_upgrade", "time_range": "d0", "agg_type": "max", "field": "body.hideout_level" }, "season_pass_buy": { "index": "ds-logs-live-season_pass", "time_range": "d0", "agg_type": "exists", "filters": [{"term": {"body.cause": 1}}] }, "season_pass_max_step": { "index": "ds-logs-live-season_pass", "time_range": "d0", "agg_type": "max", "field": "body.season_pass_step" }, # ==================== 리텐션 판정 (retention_d1 삭제됨) ==================== "retention_check": { "index": "ds-logs-live-login_comp", "time_range": "d1", "agg_type": "exists" } } # ============================================================================== # 5. Composite Aggregation 신규 유저 코호트 선정 (동일) # ============================================================================== def get_new_user_cohort_optimized( client: OpenSearch, start_time: str, end_time: str, page_size: int = DEFAULT_COMPOSITE_SIZE ) -> Dict[str, Dict]: """최적화된 신규 유저 코호트 선정 (auth.id 수집 우선순위 적용) ds-logs-live-create_uid 인덱스를 사용하여 실제 계정 생성 시점 기준으로 신규 유저를 판별 auth.id 수집 우선순위: 1) login_comp, 2) log_return_to_lobby """ logger.info("=" * 80) logger.info("1단계: 신규 유저 코호트 선정 (create_uid 기준, 최적화 버전)") logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}") logger.info(f"페이지 크기: {page_size}") cohort = {} after_key = None # Step 1: create_uid 인덱스에서 분석 기간 중 생성된 신규 유저 추출 create_uid_query = { "size": 0, "query": { "bool": { "filter": [ {"range": {"@timestamp": {"gte": start_time, "lt": end_time}}} ] } }, "aggs": { "new_users": { "composite": { "size": page_size, "sources": [ {"uid": {"terms": {"field": "uid.keyword"}}}, {"auth_id": {"terms": {"field": "auth.id.keyword"}}} ] }, "aggs": { "first_create": {"min": {"field": "@timestamp"}} } } } } # 신규 생성된 유저 수집 new_user_map = {} # uid -> {'create_time': ...} page_count = 0 while True: page_count += 1 query = create_uid_query.copy() if after_key: query["aggs"]["new_users"]["composite"]["after"] = after_key try: logger.info(f"create_uid 페이지 {page_count} 처리 중...") response = exponential_backoff_retry( client.search, index="ds-logs-live-create_uid", body=query, request_timeout=DEFAULT_TIMEOUT, track_total_hits=False ) buckets = response["aggregations"]["new_users"]["buckets"] if not buckets: break for bucket in buckets: uid = bucket["key"]["uid"] first_create_utc = bucket["first_create"]["value_as_string"] # 가장 빠른 create 시간만 저장 (client_event로 인한 중복 처리) if uid not in new_user_map or first_create_utc < new_user_map[uid]["create_time"]: new_user_map[uid] = { "create_time": first_create_utc } logger.info(f"create_uid 페이지 {page_count}: {len(buckets)}개 처리됨") after_key = response["aggregations"]["new_users"].get("after_key") if not after_key: break except Exception as e: logger.error(f"create_uid 처리 중 오류 (페이지 {page_count}): {e}") break logger.info(f"총 {len(new_user_map)}명의 신규 유저 확인됨") # Step 2: 모든 create_uid 유저를 cohort에 추가 if not new_user_map: logger.warning("신규 유저가 없습니다.") return cohort uid_list = list(new_user_map.keys()) chunk_size = 100 total_users = 0 # 모든 create_uid 유저를 cohort에 먼저 추가 (auth_id는 N/A로 초기화) for uid in uid_list: cohort[uid] = { 'auth_id': 'N/A', 'create_time_utc': new_user_map[uid]["create_time"], 'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]), 'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')), 'language': 'N/A', 'device': 'N/A', 'nickname': 'N/A' } total_users += 1 logger.info(f"cohort에 {total_users}명의 신규 유저 추가 완료") # Step 3: login_comp 인덱스에서 추가 정보 수집 (auth.id 1순위) logger.info("login_comp 인덱스에서 추가 정보 수집 중 (auth.id 1순위)...") login_comp_collected = set() for i in range(0, len(uid_list), chunk_size): chunk_uids = uid_list[i:i+chunk_size] login_query = { "size": 0, "query": { "bool": { "filter": [ {"terms": {"uid.keyword": chunk_uids}} ] } }, "aggs": { "users": { "terms": { "field": "uid.keyword", "size": chunk_size }, "aggs": { "user_info": { "top_hits": { "size": 1, "sort": [{"@timestamp": {"order": "asc"}}], "_source": ["body.device_mod", "body.nickname", "auth.id"] } }, "latest_info": { "top_hits": { "size": 1, "sort": [{"@timestamp": {"order": "desc"}}], "_source": ["body.nickname", "body.language", "auth.id"] } } } } } } try: response = exponential_backoff_retry( client.search, index="ds-logs-live-login_comp", body=login_query, request_timeout=DEFAULT_TIMEOUT, track_total_hits=False ) for bucket in response["aggregations"]["users"]["buckets"]: uid = bucket["key"] user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {} latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {} # cohort 정보 업데이트 (auth.id 1순위 수집) if uid in cohort: cohort[uid]['language'] = latest_info_hit.get('body', {}).get('language', 'N/A') cohort[uid]['device'] = user_hit.get('body', {}).get('device_mod', 'N/A') cohort[uid]['nickname'] = latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A') # auth.id 수집 (1순위) auth_id = latest_info_hit.get('auth', {}).get('id') or user_hit.get('auth', {}).get('id') if auth_id: cohort[uid]['auth_id'] = auth_id login_comp_collected.add(uid) except Exception as e: logger.error(f"login_comp 정보 수집 중 오류: {e}") logger.info(f"login_comp에서 {len(login_comp_collected)}명의 정보 수집 완료") # Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (auth.id 2순위) missing_uids = [uid for uid in uid_list if uid not in login_comp_collected] if missing_uids: logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중 (auth.id 2순위)...") lobby_collected = set() for i in range(0, len(missing_uids), chunk_size): chunk_uids = missing_uids[i:i+chunk_size] lobby_query = { "size": 0, "query": { "bool": { "filter": [ {"terms": {"uid.keyword": chunk_uids}} ] } }, "aggs": { "users": { "terms": { "field": "uid.keyword", "size": chunk_size }, "aggs": { "info": { "top_hits": { "size": 1, "sort": [{"@timestamp": {"order": "desc"}}], "_source": ["body.nickname", "auth.id"] } } } } } } try: response = exponential_backoff_retry( client.search, index="ds-logs-live-log_return_to_lobby", body=lobby_query, request_timeout=DEFAULT_TIMEOUT, track_total_hits=False ) for bucket in response["aggregations"]["users"]["buckets"]: uid = bucket["key"] info_hit = bucket["info"]["hits"]["hits"][0]["_source"] if bucket["info"]["hits"]["hits"] else {} # 차선 정보 업데이트 (login_comp에서 수집되지 않은 유저만) if uid in cohort: # nickname 업데이트 (없는 경우에만) if cohort[uid]['nickname'] == 'N/A': cohort[uid]['nickname'] = info_hit.get('body', {}).get('nickname', 'N/A') # auth.id 수집 (2순위, 없는 경우에만) if cohort[uid]['auth_id'] == 'N/A': auth_id = info_hit.get('auth', {}).get('id') if auth_id: cohort[uid]['auth_id'] = auth_id lobby_collected.add(uid) except Exception as e: logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}") logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 차선 정보 수집 완료") # 최종 통계 auth_id_count = sum(1 for uid in cohort if cohort[uid]['auth_id'] != 'N/A') logger.info(f"최종 auth.id 수집 완료: {auth_id_count}/{total_users}명") logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)") logger.info("=" * 80) return cohort # ============================================================================== # 6. 수정된 msearch 쿼리 빌더 (nested → object) # ============================================================================== def build_fixed_msearch_queries( uids: List[str], cohort: Dict[str, Dict], metrics_config: Dict[str, Dict] ) -> List[str]: """수정된 msearch 쿼리 생성 (nested 쿼리 제거)""" queries = [] for uid in uids: user_data = cohort[uid] create_time_dt = user_data['create_time_dt'] # 시간 범위 정의 (create_time 기준) d0_start = user_data['create_time_utc'] d0_end = (create_time_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ') d1_start = d0_end d1_end = (create_time_dt + timedelta(hours=48)).strftime('%Y-%m-%dT%H:%M:%SZ') for metric_name, config in metrics_config.items(): # 시간 범위 선택 if config["time_range"] == "d0": time_filter = {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}} else: # d1 time_filter = {"range": {"@timestamp": {"gte": d1_start, "lt": d1_end}}} # 사용자 식별 필터 if "target_field" in config: if config.get("use_body_target", False): # body.target_uid로 검색 (플레이어가 당한 경우) user_filter = {"term": {config["target_field"]: uid}} else: # 일반적인 uid 검색 user_filter = {"term": {config["target_field"]: uid}} else: user_filter = {"term": {"uid.keyword": uid}} # 쿼리 필터 구성 query_filters = [user_filter, time_filter] # 추가 필터 적용 (nested 제거) if "filters" in config: query_filters.extend(config["filters"]) # 조건부 필터 if config.get("agg_type") == "conditional_sum": condition_filter = {"term": {config['condition_field']: config['condition_value']}} query_filters.append(condition_filter) # 쿼리 바디 구성 agg_type = config.get("agg_type", "count") needs_docs = ["first_value", "max_stalker", "max_level_with_stalker", "max_level_stalker_name"] query_body = { "size": 0 if agg_type not in needs_docs else 1000, "query": {"bool": {"filter": query_filters}} } # Aggregation 설정 if agg_type in ["sum", "avg", "max", "conditional_sum"]: agg_func = "sum" if agg_type in ["sum", "conditional_sum"] else agg_type query_body["aggs"] = { "metric_value": {agg_func: {"field": config["field"]}} } elif agg_type == "max_timestamp": query_body["aggs"] = { "metric_value": {"max": {"field": "@timestamp"}} } elif agg_type in ["first_value", "max_stalker"]: # first_value는 항상 @timestamp로 정렬 (earliest first) # max_stalker는 다른 로직이므로 별도 처리 필요 if agg_type == "first_value": query_body["sort"] = [{"@timestamp": {"order": "asc"}}] else: # max_stalker sort_field = config.get("field", "@timestamp") query_body["sort"] = [{sort_field: {"order": "desc"}}] query_body["_source"] = [config.get("field", "@timestamp")] elif agg_type in ["max_level_with_stalker", "max_level_stalker_name"]: # 최고 레벨 우선, 같은 레벨이면 최신 순 level_field = config.get("level_field", "body.level") stalker_field = config.get("stalker_field", "body.stalker") query_body["sort"] = [ {level_field: {"order": "desc"}}, {"@timestamp": {"order": "desc"}} ] query_body["_source"] = [level_field, stalker_field] # NDJSON 추가 queries.append(json.dumps({"index": config["index"]}, ensure_ascii=False)) queries.append(json.dumps(query_body, ensure_ascii=False)) return queries # ============================================================================== # 7. 세션 지표 계산 (동일) # ============================================================================== def calculate_comprehensive_session_metrics( client: OpenSearch, uids: List[str], cohort_data: Dict[str, Dict] ) -> Dict[str, Dict]: """포괄적인 세션 지표 계산""" def stream_user_events(uid: str) -> Generator[Dict, None, None]: user_info = cohort_data.get(uid) if not user_info: return create_time_dt = user_info['create_time_dt'] d0_end_dt = create_time_dt + timedelta(hours=24) query = { "query": { "bool": { "filter": [ {"term": {"uid.keyword": uid}}, {"range": {"@timestamp": { "gte": user_info['create_time_utc'], "lt": d0_end_dt.strftime('%Y-%m-%dT%H:%M:%SZ') }}} ] } } } try: for doc in scan( client, query=query, index="ds-logs-live-*", scroll=SCROLL_TIMEOUT, _source=["@timestamp", "type"] ): source = doc['_source'] event_dt = datetime.fromisoformat(source['@timestamp'].replace('Z', '+00:00')) if create_time_dt <= event_dt < d0_end_dt: yield { "time": event_dt, "type": source.get('type', '').lower() } except Exception: pass results = {} for uid in uids: events = list(stream_user_events(uid)) session_metrics = { 'active_seconds': 0, 'total_playtime_minutes': 0, 'session_count': 0, 'avg_session_length': 0, 'logout_abnormal': 0 } if len(events) < 2: results[uid] = session_metrics continue events.sort(key=lambda x: x['time']) login_events = [e for e in events if e['type'] == 'login_comp'] logout_events = [e for e in events if e['type'] == 'logout'] heartbeat_events = [e for e in events if e['type'] == 'heartbeat'] session_metrics['session_count'] = len(login_events) # 활동 시간 계산 if heartbeat_events: total_active_seconds = 0 for i in range(len(heartbeat_events) - 1): time_diff = heartbeat_events[i + 1]['time'] - heartbeat_events[i]['time'] if time_diff <= timedelta(minutes=SESSION_GAP_MINUTES): total_active_seconds += min(time_diff.total_seconds(), MAX_SESSION_HOURS * 3600) session_metrics['active_seconds'] = int(total_active_seconds) # 총 플레이 시간 if login_events: total_playtime_seconds = 0 session_lengths = [] for login_event in login_events: login_time = login_event['time'] logout_time = None for logout_event in logout_events: if logout_event['time'] > login_time: logout_time = logout_event['time'] break if not logout_time and events: logout_time = events[-1]['time'] if logout_time: session_duration = (logout_time - login_time).total_seconds() session_duration = min(session_duration, MAX_SESSION_HOURS * 3600) total_playtime_seconds += session_duration session_lengths.append(session_duration) session_metrics['total_playtime_minutes'] = int(total_playtime_seconds / 60) if session_lengths: session_metrics['avg_session_length'] = int(sum(session_lengths) / len(session_lengths) / 60) # 비정상 종료 체크 if login_events and logout_events: last_login = max(login_events, key=lambda x: x['time'])['time'] last_logout = max(logout_events, key=lambda x: x['time'])['time'] session_metrics['logout_abnormal'] = 1 if last_logout < last_login else 0 elif login_events and not logout_events: session_metrics['logout_abnormal'] = 1 results[uid] = session_metrics return results # ============================================================================== # 8. 수정된 배치 처리 # ============================================================================== def process_fixed_batch( client: OpenSearch, batch_uids: List[str], cohort: Dict[str, Dict], metrics_config: Dict[str, Dict], include_session_metrics: bool = False ) -> List[Dict]: """수정된 배치 처리 함수""" logger.info(f"배치 처리 시작: {len(batch_uids)}명") try: # 1. 세션 지표 계산 (--full 옵션일 때만) if include_session_metrics: session_metrics = calculate_comprehensive_session_metrics(client, batch_uids, cohort) else: # 기본값으로 빈 딕셔너리 생성 session_metrics = {uid: { 'active_seconds': 0, 'total_playtime_minutes': 0, 'session_count': 0, 'avg_session_length': 0, 'logout_abnormal': 0 } for uid in batch_uids} # 2. 수정된 msearch 실행 msearch_queries = build_fixed_msearch_queries(batch_uids, cohort, metrics_config) body_ndjson = "\n".join(msearch_queries) + "\n" logger.info(f"msearch 실행: {len(msearch_queries)//2}개 쿼리") msearch_responses = exponential_backoff_retry( client.msearch, body=body_ndjson, request_timeout=300 ).get('responses', []) # 3. 결과 집계 batch_results = [] metrics_per_user = len(metrics_config) for idx, uid in enumerate(batch_uids): try: user_data = cohort[uid] user_session_metrics = session_metrics.get(uid, {}) user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user] # 기본 정보 (first_login_time 제거, create_time으로 통합) result = { 'uid': uid, 'auth_id': user_data.get('auth_id', 'N/A'), # auth_id 기본값 처리 'nickname': user_data['nickname'], 'create_time': user_data.get('create_time_kst', 'N/A'), 'retention_status': 'Retained_d0', # 기본값, 나중에 업데이트 'language': user_data['language'], 'device': user_data['device'], 'active_seconds': user_session_metrics.get('active_seconds', 0), 'total_playtime_minutes': user_session_metrics.get('total_playtime_minutes', 0), 'session_count': user_session_metrics.get('session_count', 0), 'avg_session_length': user_session_metrics.get('avg_session_length', 0), 'logout_abnormal': user_session_metrics.get('logout_abnormal', 0) } # msearch 결과 파싱 metric_names = list(metrics_config.keys()) for i, metric_name in enumerate(metric_names): if i >= len(user_responses): result[metric_name] = 0 continue response = user_responses[i] config = metrics_config[metric_name] if 'error' in response: result[metric_name] = 0 continue agg_type = config.get("agg_type", "count") if agg_type == "count": hits_total = response.get('hits', {}).get('total', {}).get('value', 0) result[metric_name] = hits_total elif agg_type == "exists": hits_total = response.get('hits', {}).get('total', {}).get('value', 0) result[metric_name] = 1 if hits_total > 0 else 0 elif agg_type in ["sum", "avg", "max", "conditional_sum"]: agg_value = response.get('aggregations', {}).get('metric_value', {}).get('value') result[metric_name] = int(agg_value) if agg_value else 0 elif agg_type == "max_timestamp": timestamp_value = response.get('aggregations', {}).get('metric_value', {}).get('value_as_string') result[metric_name] = format_kst_time(timestamp_value) if timestamp_value else None elif agg_type == "first_value": hits = response.get('hits', {}).get('hits', []) if hits: source_value = hits[0]['_source'] field_name = config.get("field", "") if field_name.startswith("body."): keys = field_name.split(".") value = source_value for key in keys: if isinstance(value, dict): value = value.get(key) if value is None: # 문자열 필드는 빈 문자열 또는 적절한 기본값 설정 if metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]: value = "" else: value = 0 break else: value = 0 break result[metric_name] = value else: result[metric_name] = source_value.get(field_name, 0) else: if metric_name == "dungeon_first_result": result[metric_name] = 2 # 미플레이 elif metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]: result[metric_name] = "" else: result[metric_name] = 0 elif agg_type == "max_stalker": hits = response.get('hits', {}).get('hits', []) if hits: source_value = hits[0]['_source'] field_name = config.get("field", "") keys = field_name.split(".") value = source_value for key in keys: value = value.get(key, 'N/A') if isinstance(value, dict) else 'N/A' result[metric_name] = value else: result[metric_name] = 'N/A' elif agg_type == "max_level_with_stalker": hits = response.get('hits', {}).get('hits', []) if hits: source_value = hits[0]['_source'] level_field = config.get("level_field", "body.level") # body.level 값 추출 level_keys = level_field.split(".") level_value = source_value for key in level_keys: level_value = level_value.get(key, 0) if isinstance(level_value, dict) else 0 result[metric_name] = level_value if level_value else 0 else: result[metric_name] = 0 elif agg_type == "max_level_stalker_name": hits = response.get('hits', {}).get('hits', []) if hits: source_value = hits[0]['_source'] stalker_field = config.get("stalker_field", "body.stalker") # body.stalker 값 추출 stalker_keys = stalker_field.split(".") stalker_value = source_value for key in stalker_keys: stalker_value = stalker_value.get(key, 0) if isinstance(stalker_value, dict) else 0 result[metric_name] = stalker_value if stalker_value else 0 else: result[metric_name] = 0 else: result[metric_name] = 0 # 리텐션 상태 업데이트 if metric_name == "retention_check" and result[metric_name] > 0: result['retention_status'] = 'Retained_d1' # 계산된 지표 if result.get('dungeon_entry_count', 0) > 0: escape_count = result.get('dungeon_escape_count', 0) result['dungeon_escape_rate'] = round((escape_count / result['dungeon_entry_count']) * 100, 2) # 게임당 평균 데미지 계산을 위해 직접 쿼리 (제거된 필드들 대신) dungeon_count = result['dungeon_entry_count'] try: # 시간 범위 가져오기 (create_time 기준) create_time_dt = user_data['create_time_dt'] d0_start = user_data['create_time_utc'] d0_end = (create_time_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ') # survival_end 인덱스에서 직접 데미지 합계 조회 damage_query = { "size": 0, "query": { "bool": { "filter": [ {"term": {"uid.keyword": uid}}, {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}} ] } }, "aggs": { "total_monster_damage": {"sum": {"field": "body.play_stats.damage_dealt_monster"}}, "total_player_damage": {"sum": {"field": "body.play_stats.damage_dealt_player"}} } } damage_response = exponential_backoff_retry( client.search, index="ds-logs-live-survival_end", body=damage_query, request_timeout=DEFAULT_TIMEOUT ) monster_damage = damage_response.get('aggregations', {}).get('total_monster_damage', {}).get('value', 0) or 0 player_damage = damage_response.get('aggregations', {}).get('total_player_damage', {}).get('value', 0) or 0 result['avg_damage_per_game_monster'] = round(monster_damage / dungeon_count, 2) result['avg_damage_per_game_player'] = round(player_damage / dungeon_count, 2) except Exception as e: logger.warning(f"평균 데미지 계산 실패 (UID: {uid}): {e}") result['avg_damage_per_game_monster'] = 0 result['avg_damage_per_game_player'] = 0 else: result['dungeon_escape_rate'] = 0 result['avg_damage_per_game_monster'] = 0 result['avg_damage_per_game_player'] = 0 batch_results.append(result) except Exception as e: logger.warning(f"UID '{uid}' 처리 중 오류: {e}") logger.info(f"배치 처리 완료: {len(batch_results)}명 성공") return batch_results except Exception as e: logger.error(f"배치 처리 중 심각한 오류: {e}") return [] # ============================================================================== # 9. 병렬 처리 및 결과 저장 (동일) # ============================================================================== def process_cohort_fixed_parallel( client: OpenSearch, cohort: Dict[str, Dict], batch_size: int, max_workers: int, metrics_config: Dict[str, Dict], include_session_metrics: bool = False ) -> List[Dict]: """수정된 병렬 처리""" logger.info("=" * 80) logger.info("2단계: 수정된 병렬 배치 처리") logger.info(f"총 사용자: {len(cohort)}명") logger.info(f"배치 크기: {batch_size}, 워커: {max_workers}") logger.info(f"분석 지표: {len(metrics_config)}개") logger.info(f"세션 지표 포함: {'예' if include_session_metrics else '아니오'}") uid_list = list(cohort.keys()) chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)] all_results = [] failed_chunks = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_chunk = { executor.submit(process_fixed_batch, client, chunk, cohort, metrics_config, include_session_metrics): chunk for chunk in chunks } with tqdm(total=len(chunks), desc="배치 처리 진행률") as pbar: for future in as_completed(future_to_chunk): chunk = future_to_chunk[future] try: batch_results = future.result(timeout=600) if batch_results: all_results.extend(batch_results) else: failed_chunks.append(chunk) except Exception as e: logger.warning(f"배치 처리 실패: {e}") failed_chunks.append(chunk) finally: pbar.update(1) # 실패한 청크 재처리 if failed_chunks: logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...") for chunk in failed_chunks: try: batch_results = process_fixed_batch(client, chunk, cohort, metrics_config, include_session_metrics) all_results.extend(batch_results) except Exception as e: logger.error(f"재처리 실패: {e}") logger.info(f"2단계 완료: {len(all_results)}명 처리 성공") logger.info("=" * 80) return all_results def write_fixed_results(results: List[Dict], output_path: str) -> None: """수정된 결과 저장 (retention_d1 제거)""" logger.info("=" * 80) logger.info("3단계: 결과 저장") if not results: logger.error("저장할 결과 데이터가 없습니다.") return # 수정된 헤더 (first_login_time 제거, create_time만 사용) headers = [ 'uid', 'auth_id', 'nickname', 'create_time', 'retention_status', 'language', 'device', 'active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal', 'dungeon_entry_count', 'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result', 'dungeon_escape_count', 'dungeon_escape_rate', 'avg_survival_time', 'max_survival_time', 'total_armor_break', 'raid_play_count', 'escape_count', 'monster_kill_count', 'player_kill_count', 'player_killed_count', 'death_count', 'avg_damage_per_game_monster', 'avg_damage_per_game_player', 'level_max', 'level_max_stalker', 'tutorial_entry', 'tutorial_completed', 'guide_quest_stage', 'skill_points_earned', 'items_obtained_count', 'highest_item_grade', 'blueprint_use_count', 'shop_buy_count', 'shop_sell_count', 'gold_spent', 'gold_earned', 'storage_in_count', 'storage_out_count', 'enchant_count', 'enchant_gold_spent', 'ingame_equip_count', 'object_interaction_count', 'matching_start_count', 'matching_complete_count', 'matching_failed_count', 'avg_matching_time', 'friend_add_count', 'friend_delete_count', 'party_invite_sent', 'party_invite_received', 'mail_read_count', 'exchange_register_count', 'exchange_use_count', 'coupon_used', 'button_click_count', 'hideout_upgrade_count', 'hideout_max_level', 'season_pass_buy', 'season_pass_max_step', 'last_logout_time' ] try: with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=headers, extrasaction='ignore') writer.writeheader() for result in results: writer.writerow(result) logger.info(f"결과 파일 저장 완료: {output_path}") logger.info(f"총 {len(results)}명의 데이터 저장") logger.info(f"분석 지표: {len(headers)}개") except Exception as e: logger.error(f"CSV 파일 저장 실패: {e}") # ============================================================================== # 10. 메인 함수 # ============================================================================== def parse_arguments() -> argparse.Namespace: """명령줄 인자 파싱""" parser = argparse.ArgumentParser(description="던전 스토커즈 신규 유저 분석 (수정 버전)") parser.add_argument('--start-time', required=True, help='분석 시작 시간 (KST)') parser.add_argument('--end-time', required=True, help='분석 종료 시간 (KST)') parser.add_argument('--batch-size', type=int, default=DEFAULT_BATCH_SIZE, help='배치 크기') parser.add_argument('--max-workers', type=int, default=DEFAULT_MAX_WORKERS, help='병렬 워커 수') parser.add_argument('--sample-size', type=int, help='샘플 분석 크기') parser.add_argument('--full', action='store_true', help='세션 관련 지표 포함한 전체 분석') return parser.parse_args() def main(): """메인 실행 함수""" global logger overall_start_time = time.time() start_kst = datetime.now(KST) timestamp = start_kst.strftime('%Y%m%d_%H%M%S') log_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.log" csv_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.csv" logger = setup_logging(str(log_file_path)) logger.info("=" * 80) logger.info("던전 스토커즈 신규 유저 분석 v5.0") logger.info("create_uid 기반 신규 유저 판별 + 세션 지표 조건부 수집") logger.info("=" * 80) args = parse_arguments() try: start_kst_arg = datetime.fromisoformat(args.start_time) end_kst_arg = datetime.fromisoformat(args.end_time) start_utc = start_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') end_utc = end_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') logger.info(f"분석 기간 (KST): {args.start_time} ~ {args.end_time}") logger.info(f"분석 기간 (UTC): {start_utc} ~ {end_utc}") logger.info(f"세션 지표 포함: {'예 (--full 옵션 사용)' if args.full else '아니오 (빠른 분석 모드)'}") except Exception as e: logger.error(f"시간 형식 오류: {e}") return client = create_opensearch_client() if not client: return try: metrics_config = get_fixed_metrics_config() logger.info(f"수정된 분석 지표 로드: {len(metrics_config)}개") cohort = get_new_user_cohort_optimized(client, start_utc, end_utc) if not cohort: logger.error("분석할 신규 유저가 없습니다.") return if args.sample_size and args.sample_size < len(cohort): uid_list = list(cohort.keys()) sampled_uids = uid_list[:args.sample_size] cohort = {uid: cohort[uid] for uid in sampled_uids} logger.info(f"샘플링 모드: {args.sample_size}명만 분석") results = process_cohort_fixed_parallel( client, cohort, args.batch_size, args.max_workers, metrics_config, args.full ) write_fixed_results(results, str(csv_file_path)) # 요약 if results: total_users = len(results) retained_d1 = sum(1 for r in results if r.get('retention_status') == 'Retained_d1') retention_rate = (retained_d1 / total_users) * 100 logger.info("=" * 80) logger.info("분석 요약") logger.info("=" * 80) logger.info(f"총 신규 유저: {total_users:,}명") logger.info(f"D+1 리텐션: {retained_d1:,}명 ({retention_rate:.1f}%)") logger.info(f"평균 활동 시간: {sum(r.get('active_seconds', 0) for r in results) / total_users / 60:.1f}분") except KeyboardInterrupt: logger.warning("사용자에 의해 중단되었습니다.") except Exception as e: logger.error(f"예상치 못한 오류: {e}") import traceback logger.error(traceback.format_exc()) finally: stop_timer_event.set() end_time = time.time() total_time = str(timedelta(seconds=int(end_time - overall_start_time))) end_kst = datetime.now(KST) logger.info("=" * 80) logger.info(f"분석 종료 시간: {end_kst.strftime('%Y-%m-%dT%H:%M:%S+09:00')}") logger.info(f"총 소요 시간: {total_time}") logger.info("수정된 분석 완료!") logger.info("=" * 80) if __name__ == "__main__": main()