From f859a31e7d10461516ebde11b3c130c52843913f Mon Sep 17 00:00:00 2001 From: Gnill82 Date: Wed, 27 Aug 2025 04:45:07 +0900 Subject: [PATCH] Refactor metrics configuration and improve query efficiency by updating field references and aggregation types --- ds_new_user_analy.py | 205 ++++++++++++++++++++++++++++++------------- 1 file changed, 146 insertions(+), 59 deletions(-) diff --git a/ds_new_user_analy.py b/ds_new_user_analy.py index 25069ff..af3b3b0 100644 --- a/ds_new_user_analy.py +++ b/ds_new_user_analy.py @@ -206,13 +206,13 @@ def get_fixed_metrics_config() -> Dict[str, Dict]: "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "first_value", - "field": "body.dungeon_mode.keyword" + "field": "body.dungeon_mode" }, "dungeon_first_stalker": { "index": "ds-logs-live-survival_sta", "time_range": "d0", "agg_type": "first_value", - "field": "body.stalker_name.keyword" + "field": "body.stalker_name" }, "dungeon_first_result": { "index": "ds-logs-live-survival_end", @@ -259,22 +259,22 @@ def get_fixed_metrics_config() -> Dict[str, Dict]: # ==================== 3.4 전투 성과 ==================== "monster_kill_count": { - "index": "ds-logs-live-survival_end", + "index": "ds-logs-live-monster_kill", "time_range": "d0", - "agg_type": "sum", - "field": "body.play_stats.monster_kill_cnt" + "agg_type": "count" }, "player_kill_count": { "index": "ds-logs-live-player_kill", "time_range": "d0", "agg_type": "count", - "target_field": "body.instigator_uid.keyword" + "target_field": "uid.keyword" }, "player_killed_count": { "index": "ds-logs-live-player_kill", "time_range": "d0", "agg_type": "count", - "target_field": "body.target_uid.keyword" + "target_field": "body.target_uid.keyword", + "use_body_target": True }, "death_count": { "index": "ds-logs-live-dead", @@ -282,44 +282,33 @@ def get_fixed_metrics_config() -> Dict[str, Dict]: "agg_type": "count", "filters": [{"bool": {"must_not": {"term": {"body.inter_type": 0}}}}] }, - "damage_dealt_monster": { - "index": "ds-logs-live-survival_end", - "time_range": "d0", - "agg_type": "sum", - "field": "body.play_stats.damage_dealt_monster" - }, - "damage_dealt_player": { - "index": "ds-logs-live-survival_end", - "time_range": "d0", - "agg_type": "sum", - "field": "body.play_stats.damage_dealt_player" - }, # ==================== 3.5 진행도 및 성장 ==================== "level_max": { - "index": "ds-logs-live-matching_start", + "index": "ds-logs-live-level_up", "time_range": "d0", - "agg_type": "max", - "field": "body.class_level" + "agg_type": "max_level_with_stalker", + "level_field": "body.level", + "stalker_field": "body.stalker" }, "level_max_stalker": { "index": "ds-logs-live-level_up", "time_range": "d0", - "agg_type": "max_stalker", - "field": "body.stalker.keyword" + "agg_type": "max_level_stalker_name", + "level_field": "body.level", + "stalker_field": "body.stalker" }, "tutorial_entry": { "index": "ds-logs-live-tutorial_entry", "time_range": "d0", - "agg_type": "exists", - "filters": [{"term": {"body.action.keyword": "Start"}}] + "agg_type": "count" }, "tutorial_completed": { "index": "ds-logs-live-log_tutorial", "time_range": "d0", - "agg_type": "exists", + "agg_type": "count", "filters": [ - {"term": {"body.action_type.keyword": "Complet"}}, + {"term": {"body.action_type.keyword": "Complete"}}, {"term": {"body.stage_type.keyword": "result"}} ] }, @@ -465,7 +454,7 @@ def get_fixed_metrics_config() -> Dict[str, Dict]: "index": "ds-logs-live-player_invite", "time_range": "d0", "agg_type": "count", - "target_field": "body.target_uid.keyword" + "target_field": "body.target_uid" }, "mail_read_count": { "index": "ds-logs-live-mail_read", @@ -574,14 +563,14 @@ def get_new_user_cohort_optimized( "top_hits": { "size": 1, "sort": [{"@timestamp": {"order": "asc"}}], - "_source": ["country", "body.device_mod", "body.nickname"] + "_source": ["body.device_mod", "body.nickname"] } }, - "latest_nickname": { + "latest_info": { "top_hits": { "size": 1, "sort": [{"@timestamp": {"order": "desc"}}], - "_source": ["body.nickname"] + "_source": ["body.nickname", "body.language"] } } } @@ -616,16 +605,16 @@ def get_new_user_cohort_optimized( first_login_utc = bucket["first_login"]["value_as_string"] user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] - latest_nickname_hit = bucket["latest_nickname"]["hits"]["hits"][0]["_source"] + latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] cohort[uid] = { 'auth_id': auth_id, 'first_login_utc': first_login_utc, 'first_login_kst': format_kst_time(first_login_utc), 'first_login_dt': datetime.fromisoformat(first_login_utc.replace('Z', '+00:00')), - 'country': user_hit.get('country', 'N/A'), + 'language': latest_info_hit.get('body', {}).get('language', 'N/A'), 'device': user_hit.get('body', {}).get('device_mod', 'N/A'), - 'nickname': latest_nickname_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A') + 'nickname': latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A') } total_users += 1 @@ -676,20 +665,14 @@ def build_fixed_msearch_queries( # 사용자 식별 필터 if "target_field" in config: - if config["target_field"] == "body.instigator_uid.keyword": - user_filter = {"term": {config["target_field"]: uid}} - elif config["target_field"] == "body.target_uid.keyword": + if config.get("use_body_target", False): + # body.target_uid로 검색 (플레이어가 당한 경우) user_filter = {"term": {config["target_field"]: uid}} else: - user_filter = {"bool": {"should": [ - {"term": {"uid.keyword": uid}}, - {"term": {"auth.id.keyword": user_data['auth_id']}} - ], "minimum_should_match": 1}} + # 일반적인 uid 검색 + user_filter = {"term": {config["target_field"]: uid}} else: - user_filter = {"bool": {"should": [ - {"term": {"uid.keyword": uid}}, - {"term": {"auth.id.keyword": user_data['auth_id']}} - ], "minimum_should_match": 1}} + user_filter = {"term": {"uid.keyword": uid}} # 쿼리 필터 구성 query_filters = [user_filter, time_filter] @@ -704,15 +687,15 @@ def build_fixed_msearch_queries( query_filters.append(condition_filter) # 쿼리 바디 구성 + agg_type = config.get("agg_type", "count") + needs_docs = ["first_value", "max_stalker", "max_level_with_stalker", "max_level_stalker_name"] + query_body = { - "size": 0 if config.get("agg_type") not in ["first_value", "max_stalker"] else 1000, - "query": {"bool": {"filter": query_filters}}, - "track_total_hits": False + "size": 0 if agg_type not in needs_docs else 1000, + "query": {"bool": {"filter": query_filters}} } # Aggregation 설정 - agg_type = config.get("agg_type", "count") - if agg_type in ["sum", "avg", "max", "conditional_sum"]: agg_func = "sum" if agg_type in ["sum", "conditional_sum"] else agg_type query_body["aggs"] = { @@ -723,11 +706,25 @@ def build_fixed_msearch_queries( "metric_value": {"max": {"field": "@timestamp"}} } elif agg_type in ["first_value", "max_stalker"]: - sort_field = config.get("field", "@timestamp") - sort_order = "asc" if agg_type == "first_value" else "desc" + # first_value는 항상 @timestamp로 정렬 (earliest first) + # max_stalker는 다른 로직이므로 별도 처리 필요 + if agg_type == "first_value": + query_body["sort"] = [{"@timestamp": {"order": "asc"}}] + else: # max_stalker + sort_field = config.get("field", "@timestamp") + query_body["sort"] = [{sort_field: {"order": "desc"}}] - query_body["sort"] = [{sort_field: {"order": sort_order}}] query_body["_source"] = [config.get("field", "@timestamp")] + elif agg_type in ["max_level_with_stalker", "max_level_stalker_name"]: + # 최고 레벨 우선, 같은 레벨이면 최신 순 + level_field = config.get("level_field", "body.level") + stalker_field = config.get("stalker_field", "body.stalker") + + query_body["sort"] = [ + {level_field: {"order": "desc"}}, + {"@timestamp": {"order": "desc"}} + ] + query_body["_source"] = [level_field, stalker_field] # NDJSON 추가 queries.append(json.dumps({"index": config["index"]}, ensure_ascii=False)) @@ -910,7 +907,7 @@ def process_fixed_batch( 'nickname': user_data['nickname'], 'first_login_time': user_data['first_login_kst'], 'retention_status': 'Retained_d0', - 'country': user_data['country'], + 'language': user_data['language'], 'device': user_data['device'], 'active_seconds': user_session_metrics.get('active_seconds', 0), 'total_playtime_minutes': user_session_metrics.get('total_playtime_minutes', 0), @@ -936,7 +933,10 @@ def process_fixed_batch( agg_type = config.get("agg_type", "count") - if agg_type == "count" or agg_type == "exists": + if agg_type == "count": + hits_total = response.get('hits', {}).get('total', {}).get('value', 0) + result[metric_name] = hits_total + elif agg_type == "exists": hits_total = response.get('hits', {}).get('total', {}).get('value', 0) result[metric_name] = 1 if hits_total > 0 else 0 @@ -958,13 +958,26 @@ def process_fixed_batch( keys = field_name.split(".") value = source_value for key in keys: - value = value.get(key, 0) if isinstance(value, dict) else 0 + if isinstance(value, dict): + value = value.get(key) + if value is None: + # 문자열 필드는 빈 문자열 또는 적절한 기본값 설정 + if metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]: + value = "" + else: + value = 0 + break + else: + value = 0 + break result[metric_name] = value else: result[metric_name] = source_value.get(field_name, 0) else: if metric_name == "dungeon_first_result": result[metric_name] = 2 # 미플레이 + elif metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]: + result[metric_name] = "" else: result[metric_name] = 0 @@ -981,6 +994,36 @@ def process_fixed_batch( else: result[metric_name] = 'N/A' + elif agg_type == "max_level_with_stalker": + hits = response.get('hits', {}).get('hits', []) + if hits: + source_value = hits[0]['_source'] + level_field = config.get("level_field", "body.level") + + # body.level 값 추출 + level_keys = level_field.split(".") + level_value = source_value + for key in level_keys: + level_value = level_value.get(key, 0) if isinstance(level_value, dict) else 0 + result[metric_name] = level_value if level_value else 0 + else: + result[metric_name] = 0 + + elif agg_type == "max_level_stalker_name": + hits = response.get('hits', {}).get('hits', []) + if hits: + source_value = hits[0]['_source'] + stalker_field = config.get("stalker_field", "body.stalker") + + # body.stalker 값 추출 + stalker_keys = stalker_field.split(".") + stalker_value = source_value + for key in stalker_keys: + stalker_value = stalker_value.get(key, 0) if isinstance(stalker_value, dict) else 0 + result[metric_name] = stalker_value if stalker_value else 0 + else: + result[metric_name] = 0 + else: result[metric_name] = 0 @@ -992,8 +1035,52 @@ def process_fixed_batch( if result.get('dungeon_entry_count', 0) > 0: escape_count = result.get('dungeon_escape_count', 0) result['dungeon_escape_rate'] = round((escape_count / result['dungeon_entry_count']) * 100, 2) + + # 게임당 평균 데미지 계산을 위해 직접 쿼리 (제거된 필드들 대신) + dungeon_count = result['dungeon_entry_count'] + try: + # 시간 범위 가져오기 + first_login_dt = user_data['first_login_dt'] + d0_start = user_data['first_login_utc'] + d0_end = (first_login_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ') + + # survival_end 인덱스에서 직접 데미지 합계 조회 + damage_query = { + "size": 0, + "query": { + "bool": { + "filter": [ + {"term": {"uid.keyword": uid}}, + {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}} + ] + } + }, + "aggs": { + "total_monster_damage": {"sum": {"field": "body.play_stats.damage_dealt_monster"}}, + "total_player_damage": {"sum": {"field": "body.play_stats.damage_dealt_player"}} + } + } + + damage_response = exponential_backoff_retry( + client.search, + index="ds-logs-live-survival_end", + body=damage_query, + request_timeout=DEFAULT_TIMEOUT + ) + + monster_damage = damage_response.get('aggregations', {}).get('total_monster_damage', {}).get('value', 0) or 0 + player_damage = damage_response.get('aggregations', {}).get('total_player_damage', {}).get('value', 0) or 0 + + result['avg_damage_per_game_monster'] = round(monster_damage / dungeon_count, 2) + result['avg_damage_per_game_player'] = round(player_damage / dungeon_count, 2) + except Exception as e: + logger.warning(f"평균 데미지 계산 실패 (UID: {uid}): {e}") + result['avg_damage_per_game_monster'] = 0 + result['avg_damage_per_game_player'] = 0 else: result['dungeon_escape_rate'] = 0 + result['avg_damage_per_game_monster'] = 0 + result['avg_damage_per_game_player'] = 0 batch_results.append(result) @@ -1081,13 +1168,13 @@ def write_fixed_results(results: List[Dict], output_path: str) -> None: # 수정된 헤더 (retention_d1 제거) headers = [ - 'uid', 'auth_id', 'nickname', 'first_login_time', 'retention_status', 'country', 'device', + 'uid', 'auth_id', 'nickname', 'first_login_time', 'retention_status', 'language', 'device', 'active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal', 'dungeon_entry_count', 'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result', 'dungeon_escape_count', 'dungeon_escape_rate', 'avg_survival_time', 'max_survival_time', 'total_armor_break', 'raid_play_count', 'escape_count', 'monster_kill_count', 'player_kill_count', 'player_killed_count', 'death_count', - 'damage_dealt_monster', 'damage_dealt_player', + 'avg_damage_per_game_monster', 'avg_damage_per_game_player', 'level_max', 'level_max_stalker', 'tutorial_entry', 'tutorial_completed', 'guide_quest_stage', 'skill_points_earned', 'items_obtained_count', 'highest_item_grade', 'blueprint_use_count', 'shop_buy_count',