Refactor metrics configuration and improve query efficiency by updating field references and aggregation types

This commit is contained in:
Gnill82
2025-08-27 04:45:07 +09:00
parent 2a7a481e55
commit f859a31e7d

View File

@ -206,13 +206,13 @@ def get_fixed_metrics_config() -> Dict[str, Dict]:
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.dungeon_mode.keyword"
"field": "body.dungeon_mode"
},
"dungeon_first_stalker": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.stalker_name.keyword"
"field": "body.stalker_name"
},
"dungeon_first_result": {
"index": "ds-logs-live-survival_end",
@ -259,22 +259,22 @@ def get_fixed_metrics_config() -> Dict[str, Dict]:
# ==================== 3.4 전투 성과 ====================
"monster_kill_count": {
"index": "ds-logs-live-survival_end",
"index": "ds-logs-live-monster_kill",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.monster_kill_cnt"
"agg_type": "count"
},
"player_kill_count": {
"index": "ds-logs-live-player_kill",
"time_range": "d0",
"agg_type": "count",
"target_field": "body.instigator_uid.keyword"
"target_field": "uid.keyword"
},
"player_killed_count": {
"index": "ds-logs-live-player_kill",
"time_range": "d0",
"agg_type": "count",
"target_field": "body.target_uid.keyword"
"target_field": "body.target_uid.keyword",
"use_body_target": True
},
"death_count": {
"index": "ds-logs-live-dead",
@ -282,44 +282,33 @@ def get_fixed_metrics_config() -> Dict[str, Dict]:
"agg_type": "count",
"filters": [{"bool": {"must_not": {"term": {"body.inter_type": 0}}}}]
},
"damage_dealt_monster": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.damage_dealt_monster"
},
"damage_dealt_player": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.damage_dealt_player"
},
# ==================== 3.5 진행도 및 성장 ====================
"level_max": {
"index": "ds-logs-live-matching_start",
"index": "ds-logs-live-level_up",
"time_range": "d0",
"agg_type": "max",
"field": "body.class_level"
"agg_type": "max_level_with_stalker",
"level_field": "body.level",
"stalker_field": "body.stalker"
},
"level_max_stalker": {
"index": "ds-logs-live-level_up",
"time_range": "d0",
"agg_type": "max_stalker",
"field": "body.stalker.keyword"
"agg_type": "max_level_stalker_name",
"level_field": "body.level",
"stalker_field": "body.stalker"
},
"tutorial_entry": {
"index": "ds-logs-live-tutorial_entry",
"time_range": "d0",
"agg_type": "exists",
"filters": [{"term": {"body.action.keyword": "Start"}}]
"agg_type": "count"
},
"tutorial_completed": {
"index": "ds-logs-live-log_tutorial",
"time_range": "d0",
"agg_type": "exists",
"agg_type": "count",
"filters": [
{"term": {"body.action_type.keyword": "Complet"}},
{"term": {"body.action_type.keyword": "Complete"}},
{"term": {"body.stage_type.keyword": "result"}}
]
},
@ -465,7 +454,7 @@ def get_fixed_metrics_config() -> Dict[str, Dict]:
"index": "ds-logs-live-player_invite",
"time_range": "d0",
"agg_type": "count",
"target_field": "body.target_uid.keyword"
"target_field": "body.target_uid"
},
"mail_read_count": {
"index": "ds-logs-live-mail_read",
@ -574,14 +563,14 @@ def get_new_user_cohort_optimized(
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "asc"}}],
"_source": ["country", "body.device_mod", "body.nickname"]
"_source": ["body.device_mod", "body.nickname"]
}
},
"latest_nickname": {
"latest_info": {
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "desc"}}],
"_source": ["body.nickname"]
"_source": ["body.nickname", "body.language"]
}
}
}
@ -616,16 +605,16 @@ def get_new_user_cohort_optimized(
first_login_utc = bucket["first_login"]["value_as_string"]
user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"]
latest_nickname_hit = bucket["latest_nickname"]["hits"]["hits"][0]["_source"]
latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"]
cohort[uid] = {
'auth_id': auth_id,
'first_login_utc': first_login_utc,
'first_login_kst': format_kst_time(first_login_utc),
'first_login_dt': datetime.fromisoformat(first_login_utc.replace('Z', '+00:00')),
'country': user_hit.get('country', 'N/A'),
'language': latest_info_hit.get('body', {}).get('language', 'N/A'),
'device': user_hit.get('body', {}).get('device_mod', 'N/A'),
'nickname': latest_nickname_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A')
'nickname': latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A')
}
total_users += 1
@ -676,20 +665,14 @@ def build_fixed_msearch_queries(
# 사용자 식별 필터
if "target_field" in config:
if config["target_field"] == "body.instigator_uid.keyword":
user_filter = {"term": {config["target_field"]: uid}}
elif config["target_field"] == "body.target_uid.keyword":
if config.get("use_body_target", False):
# body.target_uid로 검색 (플레이어가 당한 경우)
user_filter = {"term": {config["target_field"]: uid}}
else:
user_filter = {"bool": {"should": [
{"term": {"uid.keyword": uid}},
{"term": {"auth.id.keyword": user_data['auth_id']}}
], "minimum_should_match": 1}}
# 일반적인 uid 검색
user_filter = {"term": {config["target_field"]: uid}}
else:
user_filter = {"bool": {"should": [
{"term": {"uid.keyword": uid}},
{"term": {"auth.id.keyword": user_data['auth_id']}}
], "minimum_should_match": 1}}
user_filter = {"term": {"uid.keyword": uid}}
# 쿼리 필터 구성
query_filters = [user_filter, time_filter]
@ -704,15 +687,15 @@ def build_fixed_msearch_queries(
query_filters.append(condition_filter)
# 쿼리 바디 구성
agg_type = config.get("agg_type", "count")
needs_docs = ["first_value", "max_stalker", "max_level_with_stalker", "max_level_stalker_name"]
query_body = {
"size": 0 if config.get("agg_type") not in ["first_value", "max_stalker"] else 1000,
"query": {"bool": {"filter": query_filters}},
"track_total_hits": False
"size": 0 if agg_type not in needs_docs else 1000,
"query": {"bool": {"filter": query_filters}}
}
# Aggregation 설정
agg_type = config.get("agg_type", "count")
if agg_type in ["sum", "avg", "max", "conditional_sum"]:
agg_func = "sum" if agg_type in ["sum", "conditional_sum"] else agg_type
query_body["aggs"] = {
@ -723,11 +706,25 @@ def build_fixed_msearch_queries(
"metric_value": {"max": {"field": "@timestamp"}}
}
elif agg_type in ["first_value", "max_stalker"]:
sort_field = config.get("field", "@timestamp")
sort_order = "asc" if agg_type == "first_value" else "desc"
# first_value는 항상 @timestamp로 정렬 (earliest first)
# max_stalker는 다른 로직이므로 별도 처리 필요
if agg_type == "first_value":
query_body["sort"] = [{"@timestamp": {"order": "asc"}}]
else: # max_stalker
sort_field = config.get("field", "@timestamp")
query_body["sort"] = [{sort_field: {"order": "desc"}}]
query_body["sort"] = [{sort_field: {"order": sort_order}}]
query_body["_source"] = [config.get("field", "@timestamp")]
elif agg_type in ["max_level_with_stalker", "max_level_stalker_name"]:
# 최고 레벨 우선, 같은 레벨이면 최신 순
level_field = config.get("level_field", "body.level")
stalker_field = config.get("stalker_field", "body.stalker")
query_body["sort"] = [
{level_field: {"order": "desc"}},
{"@timestamp": {"order": "desc"}}
]
query_body["_source"] = [level_field, stalker_field]
# NDJSON 추가
queries.append(json.dumps({"index": config["index"]}, ensure_ascii=False))
@ -910,7 +907,7 @@ def process_fixed_batch(
'nickname': user_data['nickname'],
'first_login_time': user_data['first_login_kst'],
'retention_status': 'Retained_d0',
'country': user_data['country'],
'language': user_data['language'],
'device': user_data['device'],
'active_seconds': user_session_metrics.get('active_seconds', 0),
'total_playtime_minutes': user_session_metrics.get('total_playtime_minutes', 0),
@ -936,7 +933,10 @@ def process_fixed_batch(
agg_type = config.get("agg_type", "count")
if agg_type == "count" or agg_type == "exists":
if agg_type == "count":
hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
result[metric_name] = hits_total
elif agg_type == "exists":
hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
result[metric_name] = 1 if hits_total > 0 else 0
@ -958,13 +958,26 @@ def process_fixed_batch(
keys = field_name.split(".")
value = source_value
for key in keys:
value = value.get(key, 0) if isinstance(value, dict) else 0
if isinstance(value, dict):
value = value.get(key)
if value is None:
# 문자열 필드는 빈 문자열 또는 적절한 기본값 설정
if metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
value = ""
else:
value = 0
break
else:
value = 0
break
result[metric_name] = value
else:
result[metric_name] = source_value.get(field_name, 0)
else:
if metric_name == "dungeon_first_result":
result[metric_name] = 2 # 미플레이
elif metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
result[metric_name] = ""
else:
result[metric_name] = 0
@ -981,6 +994,36 @@ def process_fixed_batch(
else:
result[metric_name] = 'N/A'
elif agg_type == "max_level_with_stalker":
hits = response.get('hits', {}).get('hits', [])
if hits:
source_value = hits[0]['_source']
level_field = config.get("level_field", "body.level")
# body.level 값 추출
level_keys = level_field.split(".")
level_value = source_value
for key in level_keys:
level_value = level_value.get(key, 0) if isinstance(level_value, dict) else 0
result[metric_name] = level_value if level_value else 0
else:
result[metric_name] = 0
elif agg_type == "max_level_stalker_name":
hits = response.get('hits', {}).get('hits', [])
if hits:
source_value = hits[0]['_source']
stalker_field = config.get("stalker_field", "body.stalker")
# body.stalker 값 추출
stalker_keys = stalker_field.split(".")
stalker_value = source_value
for key in stalker_keys:
stalker_value = stalker_value.get(key, 0) if isinstance(stalker_value, dict) else 0
result[metric_name] = stalker_value if stalker_value else 0
else:
result[metric_name] = 0
else:
result[metric_name] = 0
@ -992,8 +1035,52 @@ def process_fixed_batch(
if result.get('dungeon_entry_count', 0) > 0:
escape_count = result.get('dungeon_escape_count', 0)
result['dungeon_escape_rate'] = round((escape_count / result['dungeon_entry_count']) * 100, 2)
# 게임당 평균 데미지 계산을 위해 직접 쿼리 (제거된 필드들 대신)
dungeon_count = result['dungeon_entry_count']
try:
# 시간 범위 가져오기
first_login_dt = user_data['first_login_dt']
d0_start = user_data['first_login_utc']
d0_end = (first_login_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ')
# survival_end 인덱스에서 직접 데미지 합계 조회
damage_query = {
"size": 0,
"query": {
"bool": {
"filter": [
{"term": {"uid.keyword": uid}},
{"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
]
}
},
"aggs": {
"total_monster_damage": {"sum": {"field": "body.play_stats.damage_dealt_monster"}},
"total_player_damage": {"sum": {"field": "body.play_stats.damage_dealt_player"}}
}
}
damage_response = exponential_backoff_retry(
client.search,
index="ds-logs-live-survival_end",
body=damage_query,
request_timeout=DEFAULT_TIMEOUT
)
monster_damage = damage_response.get('aggregations', {}).get('total_monster_damage', {}).get('value', 0) or 0
player_damage = damage_response.get('aggregations', {}).get('total_player_damage', {}).get('value', 0) or 0
result['avg_damage_per_game_monster'] = round(monster_damage / dungeon_count, 2)
result['avg_damage_per_game_player'] = round(player_damage / dungeon_count, 2)
except Exception as e:
logger.warning(f"평균 데미지 계산 실패 (UID: {uid}): {e}")
result['avg_damage_per_game_monster'] = 0
result['avg_damage_per_game_player'] = 0
else:
result['dungeon_escape_rate'] = 0
result['avg_damage_per_game_monster'] = 0
result['avg_damage_per_game_player'] = 0
batch_results.append(result)
@ -1081,13 +1168,13 @@ def write_fixed_results(results: List[Dict], output_path: str) -> None:
# 수정된 헤더 (retention_d1 제거)
headers = [
'uid', 'auth_id', 'nickname', 'first_login_time', 'retention_status', 'country', 'device',
'uid', 'auth_id', 'nickname', 'first_login_time', 'retention_status', 'language', 'device',
'active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal',
'dungeon_entry_count', 'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result',
'dungeon_escape_count', 'dungeon_escape_rate', 'avg_survival_time', 'max_survival_time',
'total_armor_break', 'raid_play_count', 'escape_count',
'monster_kill_count', 'player_kill_count', 'player_killed_count', 'death_count',
'damage_dealt_monster', 'damage_dealt_player',
'avg_damage_per_game_monster', 'avg_damage_per_game_player',
'level_max', 'level_max_stalker', 'tutorial_entry', 'tutorial_completed',
'guide_quest_stage', 'skill_points_earned',
'items_obtained_count', 'highest_item_grade', 'blueprint_use_count', 'shop_buy_count',