Files
ds_new_user_analy/ds_new_user_analy.py

1387 lines
54 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
던전 스토커즈 신규 유저 리텐션 분석 스크립트 (수정 버전)
OpenSearch 매핑 오류 수정: nested → object 쿼리 변경
수정사항:
- body 필드를 nested가 아닌 object로 처리
- retention_d1 중복 필드 제거
- 필드 경로 정확성 개선
- 키워드 필드 사용 (.keyword 추가)
작성자: Claude Code
기획서 기반: DS-new_users-analy.md
참고: OpenSearch 매핑 ds_opensearch_mappings.json
"""
import os
import csv
import json
import time
import yaml
import logging
import argparse
import threading
from datetime import datetime, timedelta, timezone
from collections import defaultdict, Counter
from typing import Dict, List, Optional, Tuple, Generator, Any, Set
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from pathlib import Path
import pandas as pd
from tqdm import tqdm
from opensearchpy import OpenSearch
from opensearchpy.helpers import scan
# ==============================================================================
# 1. 설정 및 상수
# ==============================================================================
# OpenSearch 연결 설정
OPENSEARCH_CONFIG = {
"host": "ds-opensearch.oneunivrs.com",
"port": 9200,
"auth": {
"username": "admin",
"password": "DHp5#r#GYQ9d"
},
"use_ssl": True,
"verify_certs": False,
"timeout": 120,
"max_retries": 3,
"headers": {"Connection": "close"}
}
# 한국 표준시 설정
KST = timezone(timedelta(hours=9))
# 성능 최적화 설정
DEFAULT_BATCH_SIZE = 500
DEFAULT_MAX_WORKERS = 6
DEFAULT_COMPOSITE_SIZE = 500
DEFAULT_TIMEOUT = 180
SCROLL_TIMEOUT = "5m"
SESSION_GAP_MINUTES = 5
MAX_SESSION_HOURS = 3
# 출력 디렉토리 설정
BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR / "analysis_results"
OUTPUT_DIR.mkdir(exist_ok=True)
# 전역 변수
stop_timer_event = threading.Event()
logger = None
# ==============================================================================
# 2. 로깅 시스템 설정
# ==============================================================================
def setup_logging(log_file_path: str) -> logging.Logger:
"""한국 시간 기준 로깅 설정"""
class KSTFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
kst = timezone(timedelta(hours=9))
ct = datetime.fromtimestamp(record.created, kst)
return ct.strftime('%Y-%m-%dT%H:%M:%S+09:00')
logger = logging.getLogger('NewUserAnalyzer')
logger.setLevel(logging.INFO)
# 기존 핸들러 제거
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# 파일 핸들러
file_handler = logging.FileHandler(log_file_path, encoding='utf-8')
file_formatter = KSTFormatter('[%(levelname)s] %(asctime)s - %(message)s')
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
# 콘솔 핸들러
console_handler = logging.StreamHandler()
console_formatter = KSTFormatter('[%(levelname)s] %(asctime)s - %(message)s')
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
# 외부 라이브러리 로그 억제
logging.getLogger('opensearchpy').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
return logger
# ==============================================================================
# 3. OpenSearch 연결 및 유틸리티
# ==============================================================================
def create_opensearch_client() -> Optional[OpenSearch]:
"""OpenSearch 클라이언트 생성"""
logger.info("OpenSearch 클러스터 연결 시도 중...")
try:
client = OpenSearch(
hosts=[{
"host": OPENSEARCH_CONFIG['host'],
"port": OPENSEARCH_CONFIG['port'],
"scheme": "https" if OPENSEARCH_CONFIG['use_ssl'] else "http"
}],
http_auth=(
OPENSEARCH_CONFIG['auth']['username'],
OPENSEARCH_CONFIG['auth']['password']
),
use_ssl=OPENSEARCH_CONFIG['use_ssl'],
verify_certs=OPENSEARCH_CONFIG['verify_certs'],
ssl_show_warn=False,
timeout=OPENSEARCH_CONFIG['timeout'],
max_retries=OPENSEARCH_CONFIG['max_retries'],
retry_on_timeout=True,
headers=OPENSEARCH_CONFIG['headers']
)
if not client.ping():
raise ConnectionError("클러스터 PING 실패")
logger.info("OpenSearch 연결 성공!")
return client
except Exception as e:
logger.error(f"OpenSearch 연결 실패: {e}")
return None
def exponential_backoff_retry(func, *args, **kwargs) -> Any:
"""지수 백오프 재시도 패턴"""
for delay in [1, 2, 4, 8, 16]:
try:
return func(*args, **kwargs)
except Exception as e:
if delay == 16:
raise e
logger.warning(f"재시도 중... {delay}초 대기 (오류: {str(e)[:100]})")
time.sleep(delay)
def format_kst_time(timestamp_str: str) -> str:
"""UTC 타임스탬프를 KST로 변환"""
try:
dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
kst_dt = dt.astimezone(KST)
return kst_dt.strftime('%Y-%m-%dT%H:%M:%S+09:00')
except:
return timestamp_str
# ==============================================================================
# 4. 수정된 분석 지표 정의 (OpenSearch 매핑 기반)
# ==============================================================================
def get_fixed_metrics_config() -> Dict[str, Dict]:
"""OpenSearch 매핑에 기반한 수정된 분석 지표"""
return {
# ==================== 3.2 플레이 시간 및 세션 ====================
"session_count": {
"index": "ds-logs-live-login_comp",
"time_range": "d0",
"agg_type": "count"
},
"last_logout_time": {
"index": "ds-logs-live-logout",
"time_range": "d0",
"agg_type": "max_timestamp"
},
# ==================== 3.3 던전 플레이 성과 ====================
"dungeon_entry_count": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "count"
},
"dungeon_first_mode": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.dungeon_mode"
},
"dungeon_first_stalker": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.stalker_name"
},
"dungeon_first_result": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.result"
},
"dungeon_escape_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.result": 1}}]
},
"avg_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "avg",
"field": "body.play_stats.playtime"
},
"max_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "max",
"field": "body.play_stats.playtime"
},
"total_armor_break": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.armor_break_cnt"
},
"raid_play_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.raid_play"
},
"escape_count": {
"index": "ds-logs-live-dead",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.inter_type": 0}}]
},
# ==================== 3.4 전투 성과 ====================
"monster_kill_count": {
"index": "ds-logs-live-monster_kill",
"time_range": "d0",
"agg_type": "count"
},
"player_kill_count": {
"index": "ds-logs-live-player_kill",
"time_range": "d0",
"agg_type": "count",
"target_field": "uid.keyword"
},
"player_killed_count": {
"index": "ds-logs-live-player_kill",
"time_range": "d0",
"agg_type": "count",
"target_field": "body.target_uid.keyword",
"use_body_target": True
},
"death_count": {
"index": "ds-logs-live-dead",
"time_range": "d0",
"agg_type": "count",
"filters": [{"bool": {"must_not": {"term": {"body.inter_type": 0}}}}]
},
# ==================== 3.5 진행도 및 성장 ====================
"level_max": {
"index": "ds-logs-live-level_up",
"time_range": "d0",
"agg_type": "max_level_with_stalker",
"level_field": "body.level",
"stalker_field": "body.stalker"
},
"level_max_stalker": {
"index": "ds-logs-live-level_up",
"time_range": "d0",
"agg_type": "max_level_stalker_name",
"level_field": "body.level",
"stalker_field": "body.stalker"
},
"tutorial_entry": {
"index": "ds-logs-live-tutorial_entry",
"time_range": "d0",
"agg_type": "count"
},
"tutorial_completed": {
"index": "ds-logs-live-log_tutorial",
"time_range": "d0",
"agg_type": "count",
"filters": [
{"term": {"body.action_type.keyword": "Complete"}},
{"term": {"body.stage_type.keyword": "result"}}
]
},
"guide_quest_stage": {
"index": "ds-logs-live-guide_quest_stage",
"time_range": "d0",
"agg_type": "max",
"field": "body.guide_step"
},
"skill_points_earned": {
"index": "ds-logs-live-skill_point_get",
"time_range": "d0",
"agg_type": "count"
},
# ==================== 3.6 아이템 및 경제 ====================
"items_obtained_count": {
"index": "ds-logs-live-item_get",
"time_range": "d0",
"agg_type": "count"
},
"highest_item_grade": {
"index": "ds-logs-live-item_get",
"time_range": "d0",
"agg_type": "max",
"field": "body.item_grade"
},
"blueprint_use_count": {
"index": "ds-logs-live-craft_from_blueprint",
"time_range": "d0",
"agg_type": "count"
},
"shop_buy_count": {
"index": "ds-logs-live-shop_buy",
"time_range": "d0",
"agg_type": "count"
},
"shop_sell_count": {
"index": "ds-logs-live-shop_sell",
"time_range": "d0",
"agg_type": "count"
},
"gold_spent": {
"index": "ds-logs-live-shop_buy",
"time_range": "d0",
"agg_type": "conditional_sum",
"field": "body.amt",
"condition_field": "body.cost_id.keyword",
"condition_value": "i108000"
},
"gold_earned": {
"index": "ds-logs-live-shop_sell",
"time_range": "d0",
"agg_type": "conditional_sum",
"field": "body.amt",
"condition_field": "body.cost_id.keyword",
"condition_value": "i108000"
},
"storage_in_count": {
"index": "ds-logs-live-storage_use",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.oper_type": 1}}]
},
"storage_out_count": {
"index": "ds-logs-live-storage_use",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.oper_type": -1}}]
},
"enchant_count": {
"index": "ds-logs-live-enchant",
"time_range": "d0",
"agg_type": "count"
},
"enchant_gold_spent": {
"index": "ds-logs-live-enchant",
"time_range": "d0",
"agg_type": "sum",
"field": "body.amt"
},
# ==================== 3.7 장비 관리 ====================
"ingame_equip_count": {
"index": "ds-logs-live-item_ingame_equip",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.base_type": 2}}]
},
# ==================== 3.8 오브젝트 상호작용 ====================
"object_interaction_count": {
"index": "ds-logs-live-obj_inter",
"time_range": "d0",
"agg_type": "count"
},
# ==================== 3.9 매칭 시스템 ====================
"matching_start_count": {
"index": "ds-logs-live-matching_start",
"time_range": "d0",
"agg_type": "count"
},
"matching_complete_count": {
"index": "ds-logs-live-matching_complete",
"time_range": "d0",
"agg_type": "count"
},
"matching_failed_count": {
"index": "ds-logs-live-matching_failed",
"time_range": "d0",
"agg_type": "count"
},
"avg_matching_time": {
"index": "ds-logs-live-matching_complete",
"time_range": "d0",
"agg_type": "avg",
"field": "body.matchingtime"
},
# ==================== 3.10 소셜 활동 ====================
"friend_add_count": {
"index": "ds-logs-live-friend",
"time_range": "d0",
"agg_type": "count",
"filters": [
{"term": {"body.oper_type": 0}},
{"term": {"body.friend_type": 0}}
]
},
"friend_delete_count": {
"index": "ds-logs-live-friend",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.oper_type": 1}}]
},
"party_invite_sent": {
"index": "ds-logs-live-player_invite",
"time_range": "d0",
"agg_type": "count"
},
"party_invite_received": {
"index": "ds-logs-live-player_invite",
"time_range": "d0",
"agg_type": "count",
"target_field": "body.target_uid"
},
"mail_read_count": {
"index": "ds-logs-live-mail_read",
"time_range": "d0",
"agg_type": "count"
},
# ==================== 3.11 거래소 및 경제 활동 ====================
"exchange_register_count": {
"index": "ds-logs-live-exchange_reg",
"time_range": "d0",
"agg_type": "count"
},
"exchange_use_count": {
"index": "ds-logs-live-exchange_use",
"time_range": "d0",
"agg_type": "count"
},
"coupon_used": {
"index": "ds-logs-live-coupon",
"time_range": "d0",
"agg_type": "exists"
},
# ==================== 3.12 기타 활동 ====================
"button_click_count": {
"index": "ds-logs-live-button_click",
"time_range": "d0",
"agg_type": "count"
},
"hideout_upgrade_count": {
"index": "ds-logs-live-log_hideout_upgrade",
"time_range": "d0",
"agg_type": "count"
},
"hideout_max_level": {
"index": "ds-logs-live-log_hideout_upgrade",
"time_range": "d0",
"agg_type": "max",
"field": "body.hideout_level"
},
"season_pass_buy": {
"index": "ds-logs-live-season_pass",
"time_range": "d0",
"agg_type": "exists",
"filters": [{"term": {"body.cause": 1}}]
},
"season_pass_max_step": {
"index": "ds-logs-live-season_pass",
"time_range": "d0",
"agg_type": "max",
"field": "body.season_pass_step"
},
# ==================== 리텐션 판정 (retention_d1 삭제됨) ====================
"retention_check": {
"index": "ds-logs-live-login_comp",
"time_range": "d1",
"agg_type": "exists"
}
}
# ==============================================================================
# 5. Composite Aggregation 신규 유저 코호트 선정 (동일)
# ==============================================================================
def get_new_user_cohort_optimized(
client: OpenSearch,
start_time: str,
end_time: str,
page_size: int = DEFAULT_COMPOSITE_SIZE
) -> Dict[str, Dict]:
"""Composite Aggregation을 활용한 신규 유저 코호트 선정
ds-logs-live-create_uid 인덱스를 사용하여 실제 계정 생성 시점 기준으로 신규 유저를 판별
"""
logger.info("=" * 80)
logger.info("1단계: 신규 유저 코호트 선정 (create_uid 기준)")
logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}")
logger.info(f"페이지 크기: {page_size}")
cohort = {}
after_key = None
total_users = 0
# Step 1: create_uid 인덱스에서 분석 기간 중 생성된 신규 유저 추출
create_uid_query = {
"size": 0,
"query": {
"bool": {
"filter": [
{"range": {"@timestamp": {"gte": start_time, "lt": end_time}}}
]
}
},
"aggs": {
"new_users": {
"composite": {
"size": page_size,
"sources": [
{"uid": {"terms": {"field": "uid.keyword"}}},
{"auth_id": {"terms": {"field": "auth.id.keyword"}}}
]
},
"aggs": {
"first_create": {"min": {"field": "@timestamp"}}
}
}
}
}
# 신규 생성된 유저 수집
new_user_map = {} # uid -> {'auth_id': ..., 'create_time': ...}
page_count = 0
while True:
page_count += 1
query = create_uid_query.copy()
if after_key:
query["aggs"]["new_users"]["composite"]["after"] = after_key
try:
logger.info(f"create_uid 페이지 {page_count} 처리 중...")
response = exponential_backoff_retry(
client.search,
index="ds-logs-live-create_uid",
body=query,
request_timeout=DEFAULT_TIMEOUT,
track_total_hits=False
)
buckets = response["aggregations"]["new_users"]["buckets"]
if not buckets:
break
for bucket in buckets:
uid = bucket["key"]["uid"]
auth_id = bucket["key"]["auth_id"]
first_create_utc = bucket["first_create"]["value_as_string"]
# 가장 빠른 create 시간만 저장 (client_event로 인한 중복 처리)
if uid not in new_user_map or first_create_utc < new_user_map[uid]["create_time"]:
new_user_map[uid] = {
"auth_id": auth_id,
"create_time": first_create_utc
}
logger.info(f"create_uid 페이지 {page_count}: {len(buckets)}개 처리됨")
after_key = response["aggregations"]["new_users"].get("after_key")
if not after_key:
break
except Exception as e:
logger.error(f"create_uid 처리 중 오류 (페이지 {page_count}): {e}")
break
logger.info(f"{len(new_user_map)}명의 신규 유저 확인됨")
# Step 2: login_comp 인덱스에서 해당 유저들의 추가 정보 수집
if not new_user_map:
logger.warning("신규 유저가 없습니다.")
return cohort
# 유저 청크 단위로 처리
uid_list = list(new_user_map.keys())
chunk_size = 100
for i in range(0, len(uid_list), chunk_size):
chunk_uids = uid_list[i:i+chunk_size]
login_query = {
"size": 0,
"query": {
"bool": {
"filter": [
{"terms": {"uid.keyword": chunk_uids}}
]
}
},
"aggs": {
"users": {
"terms": {
"field": "uid.keyword",
"size": chunk_size
},
"aggs": {
"first_login": {"min": {"field": "@timestamp"}},
"user_info": {
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "asc"}}],
"_source": ["body.device_mod", "body.nickname"]
}
},
"latest_info": {
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "desc"}}],
"_source": ["body.nickname", "body.language"]
}
}
}
}
}
}
try:
response = exponential_backoff_retry(
client.search,
index="ds-logs-live-login_comp",
body=login_query,
request_timeout=DEFAULT_TIMEOUT,
track_total_hits=False
)
for bucket in response["aggregations"]["users"]["buckets"]:
uid = bucket["key"]
first_login_utc = bucket["first_login"]["value_as_string"]
user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {}
latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {}
# create_uid 정보와 병합
cohort[uid] = {
'auth_id': new_user_map[uid]["auth_id"],
'create_time_utc': new_user_map[uid]["create_time"],
'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]),
'first_login_utc': first_login_utc,
'first_login_kst': format_kst_time(first_login_utc),
'first_login_dt': datetime.fromisoformat(first_login_utc.replace('Z', '+00:00')),
'language': latest_info_hit.get('body', {}).get('language', 'N/A'),
'device': user_hit.get('body', {}).get('device_mod', 'N/A'),
'nickname': latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A')
}
total_users += 1
except Exception as e:
logger.error(f"login_comp 정보 수집 중 오류: {e}")
logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)")
logger.info("=" * 80)
return cohort
# ==============================================================================
# 6. 수정된 msearch 쿼리 빌더 (nested → object)
# ==============================================================================
def build_fixed_msearch_queries(
uids: List[str],
cohort: Dict[str, Dict],
metrics_config: Dict[str, Dict]
) -> List[str]:
"""수정된 msearch 쿼리 생성 (nested 쿼리 제거)"""
queries = []
for uid in uids:
user_data = cohort[uid]
first_login_dt = user_data['first_login_dt']
# 시간 범위 정의
d0_start = user_data['first_login_utc']
d0_end = (first_login_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ')
d1_start = d0_end
d1_end = (first_login_dt + timedelta(hours=48)).strftime('%Y-%m-%dT%H:%M:%SZ')
for metric_name, config in metrics_config.items():
# 시간 범위 선택
if config["time_range"] == "d0":
time_filter = {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
else: # d1
time_filter = {"range": {"@timestamp": {"gte": d1_start, "lt": d1_end}}}
# 사용자 식별 필터
if "target_field" in config:
if config.get("use_body_target", False):
# body.target_uid로 검색 (플레이어가 당한 경우)
user_filter = {"term": {config["target_field"]: uid}}
else:
# 일반적인 uid 검색
user_filter = {"term": {config["target_field"]: uid}}
else:
user_filter = {"term": {"uid.keyword": uid}}
# 쿼리 필터 구성
query_filters = [user_filter, time_filter]
# 추가 필터 적용 (nested 제거)
if "filters" in config:
query_filters.extend(config["filters"])
# 조건부 필터
if config.get("agg_type") == "conditional_sum":
condition_filter = {"term": {config['condition_field']: config['condition_value']}}
query_filters.append(condition_filter)
# 쿼리 바디 구성
agg_type = config.get("agg_type", "count")
needs_docs = ["first_value", "max_stalker", "max_level_with_stalker", "max_level_stalker_name"]
query_body = {
"size": 0 if agg_type not in needs_docs else 1000,
"query": {"bool": {"filter": query_filters}}
}
# Aggregation 설정
if agg_type in ["sum", "avg", "max", "conditional_sum"]:
agg_func = "sum" if agg_type in ["sum", "conditional_sum"] else agg_type
query_body["aggs"] = {
"metric_value": {agg_func: {"field": config["field"]}}
}
elif agg_type == "max_timestamp":
query_body["aggs"] = {
"metric_value": {"max": {"field": "@timestamp"}}
}
elif agg_type in ["first_value", "max_stalker"]:
# first_value는 항상 @timestamp로 정렬 (earliest first)
# max_stalker는 다른 로직이므로 별도 처리 필요
if agg_type == "first_value":
query_body["sort"] = [{"@timestamp": {"order": "asc"}}]
else: # max_stalker
sort_field = config.get("field", "@timestamp")
query_body["sort"] = [{sort_field: {"order": "desc"}}]
query_body["_source"] = [config.get("field", "@timestamp")]
elif agg_type in ["max_level_with_stalker", "max_level_stalker_name"]:
# 최고 레벨 우선, 같은 레벨이면 최신 순
level_field = config.get("level_field", "body.level")
stalker_field = config.get("stalker_field", "body.stalker")
query_body["sort"] = [
{level_field: {"order": "desc"}},
{"@timestamp": {"order": "desc"}}
]
query_body["_source"] = [level_field, stalker_field]
# NDJSON 추가
queries.append(json.dumps({"index": config["index"]}, ensure_ascii=False))
queries.append(json.dumps(query_body, ensure_ascii=False))
return queries
# ==============================================================================
# 7. 세션 지표 계산 (동일)
# ==============================================================================
def calculate_comprehensive_session_metrics(
client: OpenSearch,
uids: List[str],
cohort_data: Dict[str, Dict]
) -> Dict[str, Dict]:
"""포괄적인 세션 지표 계산"""
def stream_user_events(uid: str) -> Generator[Dict, None, None]:
user_info = cohort_data.get(uid)
if not user_info:
return
first_login_dt = user_info['first_login_dt']
d0_end_dt = first_login_dt + timedelta(hours=24)
query = {
"query": {
"bool": {
"filter": [
{"term": {"uid.keyword": uid}},
{"range": {"@timestamp": {
"gte": user_info['first_login_utc'],
"lt": d0_end_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
}}}
]
}
}
}
try:
for doc in scan(
client,
query=query,
index="ds-logs-live-*",
scroll=SCROLL_TIMEOUT,
_source=["@timestamp", "type"]
):
source = doc['_source']
event_dt = datetime.fromisoformat(source['@timestamp'].replace('Z', '+00:00'))
if first_login_dt <= event_dt < d0_end_dt:
yield {
"time": event_dt,
"type": source.get('type', '').lower()
}
except Exception:
pass
results = {}
for uid in uids:
events = list(stream_user_events(uid))
session_metrics = {
'active_seconds': 0,
'total_playtime_minutes': 0,
'session_count': 0,
'avg_session_length': 0,
'logout_abnormal': 0
}
if len(events) < 2:
results[uid] = session_metrics
continue
events.sort(key=lambda x: x['time'])
login_events = [e for e in events if e['type'] == 'login_comp']
logout_events = [e for e in events if e['type'] == 'logout']
heartbeat_events = [e for e in events if e['type'] == 'heartbeat']
session_metrics['session_count'] = len(login_events)
# 활동 시간 계산
if heartbeat_events:
total_active_seconds = 0
for i in range(len(heartbeat_events) - 1):
time_diff = heartbeat_events[i + 1]['time'] - heartbeat_events[i]['time']
if time_diff <= timedelta(minutes=SESSION_GAP_MINUTES):
total_active_seconds += min(time_diff.total_seconds(), MAX_SESSION_HOURS * 3600)
session_metrics['active_seconds'] = int(total_active_seconds)
# 총 플레이 시간
if login_events:
total_playtime_seconds = 0
session_lengths = []
for login_event in login_events:
login_time = login_event['time']
logout_time = None
for logout_event in logout_events:
if logout_event['time'] > login_time:
logout_time = logout_event['time']
break
if not logout_time and events:
logout_time = events[-1]['time']
if logout_time:
session_duration = (logout_time - login_time).total_seconds()
session_duration = min(session_duration, MAX_SESSION_HOURS * 3600)
total_playtime_seconds += session_duration
session_lengths.append(session_duration)
session_metrics['total_playtime_minutes'] = int(total_playtime_seconds / 60)
if session_lengths:
session_metrics['avg_session_length'] = int(sum(session_lengths) / len(session_lengths) / 60)
# 비정상 종료 체크
if login_events and logout_events:
last_login = max(login_events, key=lambda x: x['time'])['time']
last_logout = max(logout_events, key=lambda x: x['time'])['time']
session_metrics['logout_abnormal'] = 1 if last_logout < last_login else 0
elif login_events and not logout_events:
session_metrics['logout_abnormal'] = 1
results[uid] = session_metrics
return results
# ==============================================================================
# 8. 수정된 배치 처리
# ==============================================================================
def process_fixed_batch(
client: OpenSearch,
batch_uids: List[str],
cohort: Dict[str, Dict],
metrics_config: Dict[str, Dict]
) -> List[Dict]:
"""수정된 배치 처리 함수"""
logger.info(f"배치 처리 시작: {len(batch_uids)}")
try:
# 1. 세션 지표 계산
session_metrics = calculate_comprehensive_session_metrics(client, batch_uids, cohort)
# 2. 수정된 msearch 실행
msearch_queries = build_fixed_msearch_queries(batch_uids, cohort, metrics_config)
body_ndjson = "\n".join(msearch_queries) + "\n"
logger.info(f"msearch 실행: {len(msearch_queries)//2}개 쿼리")
msearch_responses = exponential_backoff_retry(
client.msearch,
body=body_ndjson,
request_timeout=300
).get('responses', [])
# 3. 결과 집계
batch_results = []
metrics_per_user = len(metrics_config)
for idx, uid in enumerate(batch_uids):
try:
user_data = cohort[uid]
user_session_metrics = session_metrics.get(uid, {})
user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user]
# 기본 정보 (create_time 추가)
result = {
'uid': uid,
'auth_id': user_data['auth_id'],
'nickname': user_data['nickname'],
'create_time': user_data.get('create_time_kst', user_data.get('first_login_kst')), # create_time이 있으면 사용, 없으면 first_login 사용
'first_login_time': user_data['first_login_kst'],
'retention_status': 'Retained_d0',
'language': user_data['language'],
'device': user_data['device'],
'active_seconds': user_session_metrics.get('active_seconds', 0),
'total_playtime_minutes': user_session_metrics.get('total_playtime_minutes', 0),
'session_count': user_session_metrics.get('session_count', 0),
'avg_session_length': user_session_metrics.get('avg_session_length', 0),
'logout_abnormal': user_session_metrics.get('logout_abnormal', 0)
}
# msearch 결과 파싱
metric_names = list(metrics_config.keys())
for i, metric_name in enumerate(metric_names):
if i >= len(user_responses):
result[metric_name] = 0
continue
response = user_responses[i]
config = metrics_config[metric_name]
if 'error' in response:
result[metric_name] = 0
continue
agg_type = config.get("agg_type", "count")
if agg_type == "count":
hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
result[metric_name] = hits_total
elif agg_type == "exists":
hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
result[metric_name] = 1 if hits_total > 0 else 0
elif agg_type in ["sum", "avg", "max", "conditional_sum"]:
agg_value = response.get('aggregations', {}).get('metric_value', {}).get('value')
result[metric_name] = int(agg_value) if agg_value else 0
elif agg_type == "max_timestamp":
timestamp_value = response.get('aggregations', {}).get('metric_value', {}).get('value_as_string')
result[metric_name] = format_kst_time(timestamp_value) if timestamp_value else None
elif agg_type == "first_value":
hits = response.get('hits', {}).get('hits', [])
if hits:
source_value = hits[0]['_source']
field_name = config.get("field", "")
if field_name.startswith("body."):
keys = field_name.split(".")
value = source_value
for key in keys:
if isinstance(value, dict):
value = value.get(key)
if value is None:
# 문자열 필드는 빈 문자열 또는 적절한 기본값 설정
if metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
value = ""
else:
value = 0
break
else:
value = 0
break
result[metric_name] = value
else:
result[metric_name] = source_value.get(field_name, 0)
else:
if metric_name == "dungeon_first_result":
result[metric_name] = 2 # 미플레이
elif metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
result[metric_name] = ""
else:
result[metric_name] = 0
elif agg_type == "max_stalker":
hits = response.get('hits', {}).get('hits', [])
if hits:
source_value = hits[0]['_source']
field_name = config.get("field", "")
keys = field_name.split(".")
value = source_value
for key in keys:
value = value.get(key, 'N/A') if isinstance(value, dict) else 'N/A'
result[metric_name] = value
else:
result[metric_name] = 'N/A'
elif agg_type == "max_level_with_stalker":
hits = response.get('hits', {}).get('hits', [])
if hits:
source_value = hits[0]['_source']
level_field = config.get("level_field", "body.level")
# body.level 값 추출
level_keys = level_field.split(".")
level_value = source_value
for key in level_keys:
level_value = level_value.get(key, 0) if isinstance(level_value, dict) else 0
result[metric_name] = level_value if level_value else 0
else:
result[metric_name] = 0
elif agg_type == "max_level_stalker_name":
hits = response.get('hits', {}).get('hits', [])
if hits:
source_value = hits[0]['_source']
stalker_field = config.get("stalker_field", "body.stalker")
# body.stalker 값 추출
stalker_keys = stalker_field.split(".")
stalker_value = source_value
for key in stalker_keys:
stalker_value = stalker_value.get(key, 0) if isinstance(stalker_value, dict) else 0
result[metric_name] = stalker_value if stalker_value else 0
else:
result[metric_name] = 0
else:
result[metric_name] = 0
# 리텐션 상태 업데이트
if metric_name == "retention_check" and result[metric_name] > 0:
result['retention_status'] = 'Retained_d1'
# 계산된 지표
if result.get('dungeon_entry_count', 0) > 0:
escape_count = result.get('dungeon_escape_count', 0)
result['dungeon_escape_rate'] = round((escape_count / result['dungeon_entry_count']) * 100, 2)
# 게임당 평균 데미지 계산을 위해 직접 쿼리 (제거된 필드들 대신)
dungeon_count = result['dungeon_entry_count']
try:
# 시간 범위 가져오기
first_login_dt = user_data['first_login_dt']
d0_start = user_data['first_login_utc']
d0_end = (first_login_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ')
# survival_end 인덱스에서 직접 데미지 합계 조회
damage_query = {
"size": 0,
"query": {
"bool": {
"filter": [
{"term": {"uid.keyword": uid}},
{"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
]
}
},
"aggs": {
"total_monster_damage": {"sum": {"field": "body.play_stats.damage_dealt_monster"}},
"total_player_damage": {"sum": {"field": "body.play_stats.damage_dealt_player"}}
}
}
damage_response = exponential_backoff_retry(
client.search,
index="ds-logs-live-survival_end",
body=damage_query,
request_timeout=DEFAULT_TIMEOUT
)
monster_damage = damage_response.get('aggregations', {}).get('total_monster_damage', {}).get('value', 0) or 0
player_damage = damage_response.get('aggregations', {}).get('total_player_damage', {}).get('value', 0) or 0
result['avg_damage_per_game_monster'] = round(monster_damage / dungeon_count, 2)
result['avg_damage_per_game_player'] = round(player_damage / dungeon_count, 2)
except Exception as e:
logger.warning(f"평균 데미지 계산 실패 (UID: {uid}): {e}")
result['avg_damage_per_game_monster'] = 0
result['avg_damage_per_game_player'] = 0
else:
result['dungeon_escape_rate'] = 0
result['avg_damage_per_game_monster'] = 0
result['avg_damage_per_game_player'] = 0
batch_results.append(result)
except Exception as e:
logger.warning(f"UID '{uid}' 처리 중 오류: {e}")
logger.info(f"배치 처리 완료: {len(batch_results)}명 성공")
return batch_results
except Exception as e:
logger.error(f"배치 처리 중 심각한 오류: {e}")
return []
# ==============================================================================
# 9. 병렬 처리 및 결과 저장 (동일)
# ==============================================================================
def process_cohort_fixed_parallel(
client: OpenSearch,
cohort: Dict[str, Dict],
batch_size: int,
max_workers: int,
metrics_config: Dict[str, Dict]
) -> List[Dict]:
"""수정된 병렬 처리"""
logger.info("=" * 80)
logger.info("2단계: 수정된 병렬 배치 처리")
logger.info(f"총 사용자: {len(cohort)}")
logger.info(f"배치 크기: {batch_size}, 워커: {max_workers}")
logger.info(f"분석 지표: {len(metrics_config)}")
uid_list = list(cohort.keys())
chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)]
all_results = []
failed_chunks = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_chunk = {
executor.submit(process_fixed_batch, client, chunk, cohort, metrics_config): chunk
for chunk in chunks
}
with tqdm(total=len(chunks), desc="배치 처리 진행률") as pbar:
for future in as_completed(future_to_chunk):
chunk = future_to_chunk[future]
try:
batch_results = future.result(timeout=600)
if batch_results:
all_results.extend(batch_results)
else:
failed_chunks.append(chunk)
except Exception as e:
logger.warning(f"배치 처리 실패: {e}")
failed_chunks.append(chunk)
finally:
pbar.update(1)
# 실패한 청크 재처리
if failed_chunks:
logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...")
for chunk in failed_chunks:
try:
batch_results = process_fixed_batch(client, chunk, cohort, metrics_config)
all_results.extend(batch_results)
except Exception as e:
logger.error(f"재처리 실패: {e}")
logger.info(f"2단계 완료: {len(all_results)}명 처리 성공")
logger.info("=" * 80)
return all_results
def write_fixed_results(results: List[Dict], output_path: str) -> None:
"""수정된 결과 저장 (retention_d1 제거)"""
logger.info("=" * 80)
logger.info("3단계: 결과 저장")
if not results:
logger.error("저장할 결과 데이터가 없습니다.")
return
# 수정된 헤더 (create_time 추가)
headers = [
'uid', 'auth_id', 'nickname', 'create_time', 'first_login_time', 'retention_status', 'language', 'device',
'active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal',
'dungeon_entry_count', 'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result',
'dungeon_escape_count', 'dungeon_escape_rate', 'avg_survival_time', 'max_survival_time',
'total_armor_break', 'raid_play_count', 'escape_count',
'monster_kill_count', 'player_kill_count', 'player_killed_count', 'death_count',
'avg_damage_per_game_monster', 'avg_damage_per_game_player',
'level_max', 'level_max_stalker', 'tutorial_entry', 'tutorial_completed',
'guide_quest_stage', 'skill_points_earned',
'items_obtained_count', 'highest_item_grade', 'blueprint_use_count', 'shop_buy_count',
'shop_sell_count', 'gold_spent', 'gold_earned', 'storage_in_count', 'storage_out_count',
'enchant_count', 'enchant_gold_spent',
'ingame_equip_count', 'object_interaction_count',
'matching_start_count', 'matching_complete_count', 'matching_failed_count', 'avg_matching_time',
'friend_add_count', 'friend_delete_count', 'party_invite_sent', 'party_invite_received', 'mail_read_count',
'exchange_register_count', 'exchange_use_count', 'coupon_used',
'button_click_count', 'hideout_upgrade_count', 'hideout_max_level', 'season_pass_buy', 'season_pass_max_step',
'last_logout_time'
]
try:
with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=headers, extrasaction='ignore')
writer.writeheader()
for result in results:
writer.writerow(result)
logger.info(f"결과 파일 저장 완료: {output_path}")
logger.info(f"{len(results)}명의 데이터 저장")
logger.info(f"분석 지표: {len(headers)}")
except Exception as e:
logger.error(f"CSV 파일 저장 실패: {e}")
# ==============================================================================
# 10. 메인 함수
# ==============================================================================
def parse_arguments() -> argparse.Namespace:
"""명령줄 인자 파싱"""
parser = argparse.ArgumentParser(description="던전 스토커즈 신규 유저 분석 (수정 버전)")
parser.add_argument('--start-time', required=True, help='분석 시작 시간 (KST)')
parser.add_argument('--end-time', required=True, help='분석 종료 시간 (KST)')
parser.add_argument('--batch-size', type=int, default=DEFAULT_BATCH_SIZE, help='배치 크기')
parser.add_argument('--max-workers', type=int, default=DEFAULT_MAX_WORKERS, help='병렬 워커 수')
parser.add_argument('--sample-size', type=int, help='샘플 분석 크기')
return parser.parse_args()
def main():
"""메인 실행 함수"""
global logger
overall_start_time = time.time()
start_kst = datetime.now(KST)
timestamp = start_kst.strftime('%Y%m%d_%H%M%S')
log_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.log"
csv_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.csv"
logger = setup_logging(str(log_file_path))
logger.info("=" * 80)
logger.info("던전 스토커즈 신규 유저 분석 v4.0 (OpenSearch 매핑 수정)")
logger.info("nested 쿼리 오류 수정 + retention_d1 필드 제거")
logger.info("=" * 80)
args = parse_arguments()
try:
start_kst_arg = datetime.fromisoformat(args.start_time)
end_kst_arg = datetime.fromisoformat(args.end_time)
start_utc = start_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
end_utc = end_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
logger.info(f"분석 기간 (KST): {args.start_time} ~ {args.end_time}")
logger.info(f"분석 기간 (UTC): {start_utc} ~ {end_utc}")
except Exception as e:
logger.error(f"시간 형식 오류: {e}")
return
client = create_opensearch_client()
if not client:
return
try:
metrics_config = get_fixed_metrics_config()
logger.info(f"수정된 분석 지표 로드: {len(metrics_config)}")
cohort = get_new_user_cohort_optimized(client, start_utc, end_utc)
if not cohort:
logger.error("분석할 신규 유저가 없습니다.")
return
if args.sample_size and args.sample_size < len(cohort):
uid_list = list(cohort.keys())
sampled_uids = uid_list[:args.sample_size]
cohort = {uid: cohort[uid] for uid in sampled_uids}
logger.info(f"샘플링 모드: {args.sample_size}명만 분석")
results = process_cohort_fixed_parallel(
client, cohort, args.batch_size, args.max_workers, metrics_config
)
write_fixed_results(results, str(csv_file_path))
# 요약
if results:
total_users = len(results)
retained_d1 = sum(1 for r in results if r.get('retention_status') == 'Retained_d1')
retention_rate = (retained_d1 / total_users) * 100
logger.info("=" * 80)
logger.info("분석 요약")
logger.info("=" * 80)
logger.info(f"총 신규 유저: {total_users:,}")
logger.info(f"D+1 리텐션: {retained_d1:,}명 ({retention_rate:.1f}%)")
logger.info(f"평균 활동 시간: {sum(r.get('active_seconds', 0) for r in results) / total_users / 60:.1f}")
except KeyboardInterrupt:
logger.warning("사용자에 의해 중단되었습니다.")
except Exception as e:
logger.error(f"예상치 못한 오류: {e}")
import traceback
logger.error(traceback.format_exc())
finally:
stop_timer_event.set()
end_time = time.time()
total_time = str(timedelta(seconds=int(end_time - overall_start_time)))
end_kst = datetime.now(KST)
logger.info("=" * 80)
logger.info(f"분석 종료 시간: {end_kst.strftime('%Y-%m-%dT%H:%M:%S+09:00')}")
logger.info(f"총 소요 시간: {total_time}")
logger.info("수정된 분석 완료!")
logger.info("=" * 80)
if __name__ == "__main__":
main()