ds_new_user_analy/ds_new_user_analy.py
Gnill82 846a39ac7a Summary of completed fixes
1. Removed the first_login_time field and consolidated it into create_time

  - Removed the first_login_* fields from get_new_user_cohort_optimized and consolidated them into create_time_*
  - Renamed first_login_dt to create_time_dt in build_fixed_msearch_queries and calculate_comprehensive_session_metrics
  - Removed first_login_time from the CSV header

2. Fixed the auth.id collection logic

  - auth.id is now collected reliably for every new user and stored in new_user_map (it is empty in the create_uid index, so it is fetched from the heartbeat index instead)
  - Added default-value handling for the auth_id field when the result dictionary is built

3. retention_status is now determined relative to create_time

  - All time-range calculations now use create_time_dt as the reference point
  - D+0 and D+1 are judged relative to the account creation time

4. Session-related metrics now run only with the --full option

  - Added a --full command-line option
  - Added an include_session_metrics parameter to process_fixed_batch
  - Without --full, active_seconds, total_playtime_minutes, session_count and avg_session_length are set to 0, allowing a fast run

The script can now be run as follows:
  - Quick analysis: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00"
  - Full analysis: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full

With these fixes in place, get_new_user_cohort_optimized works in three steps:

1. Step 1: collect new users and their account creation times from the create_uid index (auth.id is empty there, so it is not collected)
2. Step 2: collect auth.id for each UID from the heartbeat index
3. Step 3: collect additional information (nickname, language, device) from the login_comp index
2025-08-29 12:36:33 +09:00


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Dungeon Stalkers new-user retention analysis script (revised version)
Fixes OpenSearch mapping errors: nested → object queries
Changes:
- body fields are treated as object rather than nested
- removed the duplicated retention_d1 field
- corrected field paths
- keyword fields are used where needed (.keyword suffix)
Author: Claude Code
Based on the spec: DS-new_users-analy.md
Reference: OpenSearch mappings in ds_opensearch_mappings.json
"""
import csv
import json
import time
import logging
import argparse
import threading
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Generator, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from tqdm import tqdm
from opensearchpy import OpenSearch
from opensearchpy.helpers import scan
# ==============================================================================
# 1. Configuration and constants
# ==============================================================================
# OpenSearch connection settings
OPENSEARCH_CONFIG = {
    "host": "ds-opensearch.oneunivrs.com",
    "port": 9200,
    "auth": {
        "username": "admin",
        "password": "DHp5#r#GYQ9d"
    },
    "use_ssl": True,
    "verify_certs": False,
    "timeout": 120,
    "max_retries": 3,
    "headers": {"Connection": "close"}
}
# Korea Standard Time
KST = timezone(timedelta(hours=9))
# Performance tuning
DEFAULT_BATCH_SIZE = 500
DEFAULT_MAX_WORKERS = 6
DEFAULT_COMPOSITE_SIZE = 500
DEFAULT_TIMEOUT = 180
SCROLL_TIMEOUT = "5m"
SESSION_GAP_MINUTES = 5
MAX_SESSION_HOURS = 3
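# The two constants above drive the session heuristics further down:
# consecutive heartbeats more than SESSION_GAP_MINUTES apart are treated as
# idle time and do not count toward active seconds, and no single session
# is credited with more than MAX_SESSION_HOURS of playtime.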
# Output directory
BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR / "analysis_results"
OUTPUT_DIR.mkdir(exist_ok=True)
# Globals
stop_timer_event = threading.Event()
logger = None
# ==============================================================================
# 2. Logging setup
# ==============================================================================
def setup_logging(log_file_path: str) -> logging.Logger:
    """Configure logging with timestamps in Korea Standard Time"""
    class KSTFormatter(logging.Formatter):
        def formatTime(self, record, datefmt=None):
            kst = timezone(timedelta(hours=9))
            ct = datetime.fromtimestamp(record.created, kst)
            return ct.strftime('%Y-%m-%dT%H:%M:%S+09:00')

    logger = logging.getLogger('NewUserAnalyzer')
    logger.setLevel(logging.INFO)
    # Remove any existing handlers
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
    # File handler
    file_handler = logging.FileHandler(log_file_path, encoding='utf-8')
    file_formatter = KSTFormatter('[%(levelname)s] %(asctime)s - %(message)s')
    file_handler.setFormatter(file_formatter)
    logger.addHandler(file_handler)
    # Console handler
    console_handler = logging.StreamHandler()
    console_formatter = KSTFormatter('[%(levelname)s] %(asctime)s - %(message)s')
    console_handler.setFormatter(console_formatter)
    logger.addHandler(console_handler)
    # Silence noisy third-party loggers
    logging.getLogger('opensearchpy').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)
    return logger
# ==============================================================================
# 3. OpenSearch connection and utilities
# ==============================================================================
def create_opensearch_client() -> Optional[OpenSearch]:
    """Create an OpenSearch client"""
    logger.info("Connecting to the OpenSearch cluster...")
    try:
        client = OpenSearch(
            hosts=[{
                "host": OPENSEARCH_CONFIG['host'],
                "port": OPENSEARCH_CONFIG['port'],
                "scheme": "https" if OPENSEARCH_CONFIG['use_ssl'] else "http"
            }],
            http_auth=(
                OPENSEARCH_CONFIG['auth']['username'],
                OPENSEARCH_CONFIG['auth']['password']
            ),
            use_ssl=OPENSEARCH_CONFIG['use_ssl'],
            verify_certs=OPENSEARCH_CONFIG['verify_certs'],
            ssl_show_warn=False,
            timeout=OPENSEARCH_CONFIG['timeout'],
            max_retries=OPENSEARCH_CONFIG['max_retries'],
            retry_on_timeout=True,
            headers=OPENSEARCH_CONFIG['headers']
        )
        if not client.ping():
            raise ConnectionError("Cluster PING failed")
        logger.info("OpenSearch connection established!")
        return client
    except Exception as e:
        logger.error(f"OpenSearch connection failed: {e}")
        return None
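# exponential_backoff_retry below makes up to five attempts, sleeping
# 1/2/4/8 seconds between failures; the fifth failure is re-raised so the
# caller sees the underlying OpenSearch error.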
def exponential_backoff_retry(func, *args, **kwargs) -> Any:
    """Retry with exponential backoff"""
    for delay in [1, 2, 4, 8, 16]:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if delay == 16:
                raise e
            logger.warning(f"Retrying in {delay}s (error: {str(e)[:100]})")
            time.sleep(delay)
def format_kst_time(timestamp_str: str) -> str:
    """Convert a UTC timestamp to KST"""
    try:
        dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
        kst_dt = dt.astimezone(KST)
        return kst_dt.strftime('%Y-%m-%dT%H:%M:%S+09:00')
    except Exception:
        return timestamp_str
# ==============================================================================
# 4. Revised metric definitions (based on the OpenSearch mappings)
# ==============================================================================
def get_fixed_metrics_config() -> Dict[str, Dict]:
    """Revised analysis metrics, based on the OpenSearch mappings"""
    return {
        # ==================== 3.2 Playtime and sessions ====================
        "session_count": {
            "index": "ds-logs-live-login_comp",
            "time_range": "d0",
            "agg_type": "count"
        },
        "last_logout_time": {
            "index": "ds-logs-live-logout",
            "time_range": "d0",
            "agg_type": "max_timestamp"
        },
        # ==================== 3.3 Dungeon play performance ====================
        "dungeon_entry_count": {
            "index": "ds-logs-live-survival_sta",
            "time_range": "d0",
            "agg_type": "count"
        },
        "dungeon_first_mode": {
            "index": "ds-logs-live-survival_sta",
            "time_range": "d0",
            "agg_type": "first_value",
            "field": "body.dungeon_mode"
        },
        "dungeon_first_stalker": {
            "index": "ds-logs-live-survival_sta",
            "time_range": "d0",
            "agg_type": "first_value",
            "field": "body.stalker_name"
        },
        "dungeon_first_result": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "first_value",
            "field": "body.result"
        },
        "dungeon_escape_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.result": 1}}]
        },
        "avg_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "avg",
            "field": "body.play_stats.playtime"
        },
        "max_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.play_stats.playtime"
        },
        "total_armor_break": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.armor_break_cnt"
        },
        "raid_play_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.raid_play"
        },
        "escape_count": {
            "index": "ds-logs-live-dead",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.inter_type": 0}}]
        },
        # ==================== 3.4 Combat performance ====================
        "monster_kill_count": {
            "index": "ds-logs-live-monster_kill",
            "time_range": "d0",
            "agg_type": "count"
        },
        "player_kill_count": {
            "index": "ds-logs-live-player_kill",
            "time_range": "d0",
            "agg_type": "count",
            "target_field": "uid.keyword"
        },
        "player_killed_count": {
            "index": "ds-logs-live-player_kill",
            "time_range": "d0",
            "agg_type": "count",
            "target_field": "body.target_uid.keyword",
            "use_body_target": True
        },
        "death_count": {
            "index": "ds-logs-live-dead",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"bool": {"must_not": {"term": {"body.inter_type": 0}}}}]
        },
        # ==================== 3.5 Progression and growth ====================
        "level_max": {
            "index": "ds-logs-live-level_up",
            "time_range": "d0",
            "agg_type": "max_level_with_stalker",
            "level_field": "body.level",
            "stalker_field": "body.stalker"
        },
        "level_max_stalker": {
            "index": "ds-logs-live-level_up",
            "time_range": "d0",
            "agg_type": "max_level_stalker_name",
            "level_field": "body.level",
            "stalker_field": "body.stalker"
        },
        "tutorial_entry": {
            "index": "ds-logs-live-tutorial_entry",
            "time_range": "d0",
            "agg_type": "count"
        },
        "tutorial_completed": {
            "index": "ds-logs-live-log_tutorial",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [
                {"term": {"body.action_type.keyword": "Complete"}},
                {"term": {"body.stage_type.keyword": "result"}}
            ]
        },
        "guide_quest_stage": {
            "index": "ds-logs-live-guide_quest_stage",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.guide_step"
        },
        "skill_points_earned": {
            "index": "ds-logs-live-skill_point_get",
            "time_range": "d0",
            "agg_type": "count"
        },
        # ==================== 3.6 Items and economy ====================
        "items_obtained_count": {
            "index": "ds-logs-live-item_get",
            "time_range": "d0",
            "agg_type": "count"
        },
        "highest_item_grade": {
            "index": "ds-logs-live-item_get",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.item_grade"
        },
        "blueprint_use_count": {
            "index": "ds-logs-live-craft_from_blueprint",
            "time_range": "d0",
            "agg_type": "count"
        },
        "shop_buy_count": {
            "index": "ds-logs-live-shop_buy",
            "time_range": "d0",
            "agg_type": "count"
        },
        "shop_sell_count": {
            "index": "ds-logs-live-shop_sell",
            "time_range": "d0",
            "agg_type": "count"
        },
        "gold_spent": {
            "index": "ds-logs-live-shop_buy",
            "time_range": "d0",
            "agg_type": "conditional_sum",
            "field": "body.amt",
            "condition_field": "body.cost_id.keyword",
            "condition_value": "i108000"
        },
        "gold_earned": {
            "index": "ds-logs-live-shop_sell",
            "time_range": "d0",
            "agg_type": "conditional_sum",
            "field": "body.amt",
            "condition_field": "body.cost_id.keyword",
            "condition_value": "i108000"
        },
        "storage_in_count": {
            "index": "ds-logs-live-storage_use",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.oper_type": 1}}]
        },
        "storage_out_count": {
            "index": "ds-logs-live-storage_use",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.oper_type": -1}}]
        },
        "enchant_count": {
            "index": "ds-logs-live-enchant",
            "time_range": "d0",
            "agg_type": "count"
        },
        "enchant_gold_spent": {
            "index": "ds-logs-live-enchant",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.amt"
        },
        # ==================== 3.7 Equipment management ====================
        "ingame_equip_count": {
            "index": "ds-logs-live-item_ingame_equip",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.base_type": 2}}]
        },
        # ==================== 3.8 Object interaction ====================
        "object_interaction_count": {
            "index": "ds-logs-live-obj_inter",
            "time_range": "d0",
            "agg_type": "count"
        },
        # ==================== 3.9 Matchmaking ====================
        "matching_start_count": {
            "index": "ds-logs-live-matching_start",
            "time_range": "d0",
            "agg_type": "count"
        },
        "matching_complete_count": {
            "index": "ds-logs-live-matching_complete",
            "time_range": "d0",
            "agg_type": "count"
        },
        "matching_failed_count": {
            "index": "ds-logs-live-matching_failed",
            "time_range": "d0",
            "agg_type": "count"
        },
        "avg_matching_time": {
            "index": "ds-logs-live-matching_complete",
            "time_range": "d0",
            "agg_type": "avg",
            "field": "body.matchingtime"
        },
        # ==================== 3.10 Social activity ====================
        "friend_add_count": {
            "index": "ds-logs-live-friend",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [
                {"term": {"body.oper_type": 0}},
                {"term": {"body.friend_type": 0}}
            ]
        },
        "friend_delete_count": {
            "index": "ds-logs-live-friend",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.oper_type": 1}}]
        },
        "party_invite_sent": {
            "index": "ds-logs-live-player_invite",
            "time_range": "d0",
            "agg_type": "count"
        },
        "party_invite_received": {
            "index": "ds-logs-live-player_invite",
            "time_range": "d0",
            "agg_type": "count",
            "target_field": "body.target_uid"
        },
        "mail_read_count": {
            "index": "ds-logs-live-mail_read",
            "time_range": "d0",
            "agg_type": "count"
        },
        # ==================== 3.11 Exchange and economic activity ====================
        "exchange_register_count": {
            "index": "ds-logs-live-exchange_reg",
            "time_range": "d0",
            "agg_type": "count"
        },
        "exchange_use_count": {
            "index": "ds-logs-live-exchange_use",
            "time_range": "d0",
            "agg_type": "count"
        },
        "coupon_used": {
            "index": "ds-logs-live-coupon",
            "time_range": "d0",
            "agg_type": "exists"
        },
        # ==================== 3.12 Miscellaneous activity ====================
        "button_click_count": {
            "index": "ds-logs-live-button_click",
            "time_range": "d0",
            "agg_type": "count"
        },
        "hideout_upgrade_count": {
            "index": "ds-logs-live-log_hideout_upgrade",
            "time_range": "d0",
            "agg_type": "count"
        },
        "hideout_max_level": {
            "index": "ds-logs-live-log_hideout_upgrade",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.hideout_level"
        },
        "season_pass_buy": {
            "index": "ds-logs-live-season_pass",
            "time_range": "d0",
            "agg_type": "exists",
            "filters": [{"term": {"body.cause": 1}}]
        },
        "season_pass_max_step": {
            "index": "ds-logs-live-season_pass",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.season_pass_step"
        },
        # ==================== Retention decision (retention_d1 removed) ====================
        "retention_check": {
            "index": "ds-logs-live-login_comp",
            "time_range": "d1",
            "agg_type": "exists"
        }
    }
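# Every entry above follows the same schema: "index" names the OpenSearch
# index to query, "time_range" picks the window ("d0" = first 24h after
# account creation, "d1" = the following 24h), "agg_type" selects how the
# value is computed, and "field" / "filters" / "condition_*" are optional
# extras. For example, "dungeon_escape_count" turns into roughly:
#   {"size": 0, "query": {"bool": {"filter": [
#       {"term": {"uid.keyword": "<uid>"}},
#       {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}},
#       {"term": {"body.result": 1}}]}}}
# and the hit count is read back as the metric value.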
# ==============================================================================
# 5. New-user cohort selection via composite aggregation (unchanged)
# ==============================================================================
def get_new_user_cohort_optimized(
    client: OpenSearch,
    start_time: str,
    end_time: str,
    page_size: int = DEFAULT_COMPOSITE_SIZE
) -> Dict[str, Dict]:
    """Select the new-user cohort with a composite aggregation.

    Uses the ds-logs-live-create_uid index so that new users are identified
    by their actual account creation time.
    """
    logger.info("=" * 80)
    logger.info("Step 1: selecting the new-user cohort (based on create_uid)")
    logger.info(f"Analysis window: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}")
    logger.info(f"Page size: {page_size}")
    cohort = {}
    after_key = None
    total_users = 0
    # Step 1: pull users created during the analysis window from create_uid
    create_uid_query = {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {"range": {"@timestamp": {"gte": start_time, "lt": end_time}}}
                ]
            }
        },
        "aggs": {
            "new_users": {
                "composite": {
                    "size": page_size,
                    "sources": [
                        {"uid": {"terms": {"field": "uid.keyword"}}},
                        {"auth_id": {"terms": {"field": "auth.id.keyword"}}}
                    ]
                },
                "aggs": {
                    "first_create": {"min": {"field": "@timestamp"}}
                }
            }
        }
    }
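    # Composite aggregations are paged with "after_key": each response
    # returns the key of its last bucket, which is fed back in as the
    # "after" cursor until a page comes back empty or without an after_key.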
    # Collect the newly created users
    new_user_map = {}  # uid -> {'auth_id': ..., 'create_time': ...}
    page_count = 0
    while True:
        page_count += 1
        query = create_uid_query.copy()
        if after_key:
            query["aggs"]["new_users"]["composite"]["after"] = after_key
        try:
            logger.info(f"Processing create_uid page {page_count}...")
            response = exponential_backoff_retry(
                client.search,
                index="ds-logs-live-create_uid",
                body=query,
                request_timeout=DEFAULT_TIMEOUT,
                track_total_hits=False
            )
            buckets = response["aggregations"]["new_users"]["buckets"]
            if not buckets:
                break
            for bucket in buckets:
                uid = bucket["key"]["uid"]
                auth_id = bucket["key"]["auth_id"]  # may be empty
                first_create_utc = bucket["first_create"]["value_as_string"]
                # Keep only the earliest create time (duplicates can occur via client_event)
                if uid not in new_user_map or first_create_utc < new_user_map[uid]["create_time"]:
                    new_user_map[uid] = {
                        "auth_id": None,  # collected later from heartbeat
                        "create_time": first_create_utc
                    }
            logger.info(f"create_uid page {page_count}: {len(buckets)} buckets processed")
            after_key = response["aggregations"]["new_users"].get("after_key")
            if not after_key:
                break
        except Exception as e:
            logger.error(f"Error while processing create_uid (page {page_count}): {e}")
            break
    logger.info(f"{len(new_user_map)} new users identified")
    # Step 2: collect auth.id from the heartbeat index
    if not new_user_map:
        logger.warning("No new users found.")
        return cohort
    logger.info("Collecting auth.id from the heartbeat index...")
    uid_list = list(new_user_map.keys())
    chunk_size = 100
    for i in range(0, len(uid_list), chunk_size):
        chunk_uids = uid_list[i:i+chunk_size]
        # Fetch auth.id from heartbeat
        heartbeat_query = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {"terms": {"uid.keyword": chunk_uids}}
                    ]
                }
            },
            "aggs": {
                "users": {
                    "terms": {
                        "field": "uid.keyword",
                        "size": chunk_size
                    },
                    "aggs": {
                        "auth_info": {
                            "top_hits": {
                                "size": 1,
                                "sort": [{"@timestamp": {"order": "asc"}}],
                                "_source": ["auth.id"]
                            }
                        }
                    }
                }
            }
        }
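        # top_hits with size=1, sorted by @timestamp ascending, returns each
        # uid bucket's earliest heartbeat document; its _source carries the
        # auth.id that is empty in the create_uid index.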
        try:
            response = exponential_backoff_retry(
                client.search,
                index="ds-logs-live-heartbeat",
                body=heartbeat_query,
                request_timeout=DEFAULT_TIMEOUT,
                track_total_hits=False
            )
            for bucket in response["aggregations"]["users"]["buckets"]:
                uid = bucket["key"]
                if bucket["auth_info"]["hits"]["hits"]:
                    auth_id = bucket["auth_info"]["hits"]["hits"][0]["_source"].get("auth", {}).get("id")
                    if auth_id and uid in new_user_map:
                        new_user_map[uid]["auth_id"] = auth_id
        except Exception as e:
            logger.error(f"Error while collecting auth.id from heartbeat: {e}")
    # Check how many auth.ids were resolved
    auth_id_count = sum(1 for uid in new_user_map if new_user_map[uid]["auth_id"] is not None)
    logger.info(f"auth.id collection complete: {auth_id_count}/{len(new_user_map)}")
    # Step 3: collect additional information from the login_comp index
    logger.info("Collecting additional information from the login_comp index...")
    # Process users chunk by chunk
    for i in range(0, len(uid_list), chunk_size):
        chunk_uids = uid_list[i:i+chunk_size]
        login_query = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {"terms": {"uid.keyword": chunk_uids}}
                    ]
                }
            },
            "aggs": {
                "users": {
                    "terms": {
                        "field": "uid.keyword",
                        "size": chunk_size
                    },
                    "aggs": {
                        "first_login": {"min": {"field": "@timestamp"}},
                        "user_info": {
                            "top_hits": {
                                "size": 1,
                                "sort": [{"@timestamp": {"order": "asc"}}],
                                "_source": ["body.device_mod", "body.nickname"]
                            }
                        },
                        "latest_info": {
                            "top_hits": {
                                "size": 1,
                                "sort": [{"@timestamp": {"order": "desc"}}],
                                "_source": ["body.nickname", "body.language"]
                            }
                        }
                    }
                }
            }
        }
        try:
            response = exponential_backoff_retry(
                client.search,
                index="ds-logs-live-login_comp",
                body=login_query,
                request_timeout=DEFAULT_TIMEOUT,
                track_total_hits=False
            )
            for bucket in response["aggregations"]["users"]["buckets"]:
                uid = bucket["key"]
                first_login_utc = bucket["first_login"]["value_as_string"]
                user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {}
                latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {}
                # Merge with the create_uid data (first_login dropped; create_time is the single reference)
                # auth_id was collected from heartbeat, or stays N/A
                cohort[uid] = {
                    'auth_id': new_user_map[uid]["auth_id"] or 'N/A',
                    'create_time_utc': new_user_map[uid]["create_time"],
                    'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]),
                    'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')),
                    'language': latest_info_hit.get('body', {}).get('language', 'N/A'),
                    'device': user_hit.get('body', {}).get('device_mod', 'N/A'),
                    'nickname': latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A')
                }
                total_users += 1
        except Exception as e:
            logger.error(f"Error while collecting login_comp data: {e}")
    logger.info(f"Step 1 complete: cohort of {total_users} new users confirmed (based on create_uid)")
    logger.info("=" * 80)
    return cohort
# ==============================================================================
# 6. Revised msearch query builder (nested → object)
# ==============================================================================
def build_fixed_msearch_queries(
    uids: List[str],
    cohort: Dict[str, Dict],
    metrics_config: Dict[str, Dict]
) -> List[str]:
    """Build the revised msearch queries (nested queries removed)"""
    queries = []
    for uid in uids:
        user_data = cohort[uid]
        create_time_dt = user_data['create_time_dt']
        # Time windows, anchored on create_time
        d0_start = user_data['create_time_utc']
        d0_end = (create_time_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ')
        d1_start = d0_end
        d1_end = (create_time_dt + timedelta(hours=48)).strftime('%Y-%m-%dT%H:%M:%SZ')
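        # D+0 covers the first 24 hours after account creation and D+1 the
        # following 24 hours; both windows are anchored on create_time
        # rather than on first login.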
        for metric_name, config in metrics_config.items():
            # Pick the time window
            if config["time_range"] == "d0":
                time_filter = {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
            else:  # d1
                time_filter = {"range": {"@timestamp": {"gte": d1_start, "lt": d1_end}}}
            # User identification filter
            if "target_field" in config:
                if config.get("use_body_target", False):
                    # Search by body.target_uid (the player was on the receiving end)
                    user_filter = {"term": {config["target_field"]: uid}}
                else:
                    # Regular uid search
                    user_filter = {"term": {config["target_field"]: uid}}
            else:
                user_filter = {"term": {"uid.keyword": uid}}
            # Assemble the query filters
            query_filters = [user_filter, time_filter]
            # Apply any extra filters (nested removed)
            if "filters" in config:
                query_filters.extend(config["filters"])
            # Conditional filter
            if config.get("agg_type") == "conditional_sum":
                condition_filter = {"term": {config['condition_field']: config['condition_value']}}
                query_filters.append(condition_filter)
            # Build the query body
            agg_type = config.get("agg_type", "count")
            needs_docs = ["first_value", "max_stalker", "max_level_with_stalker", "max_level_stalker_name"]
            query_body = {
                "size": 0 if agg_type not in needs_docs else 1000,
                "query": {"bool": {"filter": query_filters}}
            }
            # Aggregation setup
            if agg_type in ["sum", "avg", "max", "conditional_sum"]:
                agg_func = "sum" if agg_type in ["sum", "conditional_sum"] else agg_type
                query_body["aggs"] = {
                    "metric_value": {agg_func: {"field": config["field"]}}
                }
            elif agg_type == "max_timestamp":
                query_body["aggs"] = {
                    "metric_value": {"max": {"field": "@timestamp"}}
                }
            elif agg_type in ["first_value", "max_stalker"]:
                # first_value always sorts by @timestamp (earliest first);
                # max_stalker follows different logic and is handled separately
                if agg_type == "first_value":
                    query_body["sort"] = [{"@timestamp": {"order": "asc"}}]
                else:  # max_stalker
                    sort_field = config.get("field", "@timestamp")
                    query_body["sort"] = [{sort_field: {"order": "desc"}}]
                query_body["_source"] = [config.get("field", "@timestamp")]
            elif agg_type in ["max_level_with_stalker", "max_level_stalker_name"]:
                # Highest level first; ties broken by most recent timestamp
                level_field = config.get("level_field", "body.level")
                stalker_field = config.get("stalker_field", "body.stalker")
                query_body["sort"] = [
                    {level_field: {"order": "desc"}},
                    {"@timestamp": {"order": "desc"}}
                ]
                query_body["_source"] = [level_field, stalker_field]
            # Append as NDJSON
            queries.append(json.dumps({"index": config["index"]}, ensure_ascii=False))
            queries.append(json.dumps(query_body, ensure_ascii=False))
    return queries
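# The list built above is joined into an NDJSON msearch body: every query
# contributes two lines, a header ({"index": ...}) followed by the query
# itself, e.g.
#   {"index": "ds-logs-live-monster_kill"}
#   {"size": 0, "query": {"bool": {"filter": [...]}}}
# Responses come back in the same order, which is what lets
# process_fixed_batch() slice them per user and per metric.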
# ==============================================================================
# 7. Session metric calculation (unchanged)
# ==============================================================================
def calculate_comprehensive_session_metrics(
    client: OpenSearch,
    uids: List[str],
    cohort_data: Dict[str, Dict]
) -> Dict[str, Dict]:
    """Compute the full set of session metrics"""
    def stream_user_events(uid: str) -> Generator[Dict, None, None]:
        user_info = cohort_data.get(uid)
        if not user_info:
            return
        create_time_dt = user_info['create_time_dt']
        d0_end_dt = create_time_dt + timedelta(hours=24)
        query = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"uid.keyword": uid}},
                        {"range": {"@timestamp": {
                            "gte": user_info['create_time_utc'],
                            "lt": d0_end_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
                        }}}
                    ]
                }
            }
        }
        try:
            for doc in scan(
                client,
                query=query,
                index="ds-logs-live-*",
                scroll=SCROLL_TIMEOUT,
                _source=["@timestamp", "type"]
            ):
                source = doc['_source']
                event_dt = datetime.fromisoformat(source['@timestamp'].replace('Z', '+00:00'))
                if create_time_dt <= event_dt < d0_end_dt:
                    yield {
                        "time": event_dt,
                        "type": source.get('type', '').lower()
                    }
        except Exception:
            pass

    results = {}
    for uid in uids:
        events = list(stream_user_events(uid))
        session_metrics = {
            'active_seconds': 0,
            'total_playtime_minutes': 0,
            'session_count': 0,
            'avg_session_length': 0,
            'logout_abnormal': 0
        }
        if len(events) < 2:
            results[uid] = session_metrics
            continue
        events.sort(key=lambda x: x['time'])
        login_events = [e for e in events if e['type'] == 'login_comp']
        logout_events = [e for e in events if e['type'] == 'logout']
        heartbeat_events = [e for e in events if e['type'] == 'heartbeat']
        session_metrics['session_count'] = len(login_events)
        # Active time
        if heartbeat_events:
            total_active_seconds = 0
            for i in range(len(heartbeat_events) - 1):
                time_diff = heartbeat_events[i + 1]['time'] - heartbeat_events[i]['time']
                if time_diff <= timedelta(minutes=SESSION_GAP_MINUTES):
                    total_active_seconds += min(time_diff.total_seconds(), MAX_SESSION_HOURS * 3600)
            session_metrics['active_seconds'] = int(total_active_seconds)
        # Total playtime
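        # Each login is paired with the first logout that follows it; if no
        # logout is found (session still open or logout not logged), the
        # user's last event of the day stands in. Each session duration is
        # capped at MAX_SESSION_HOURS.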
        if login_events:
            total_playtime_seconds = 0
            session_lengths = []
            for login_event in login_events:
                login_time = login_event['time']
                logout_time = None
                for logout_event in logout_events:
                    if logout_event['time'] > login_time:
                        logout_time = logout_event['time']
                        break
                if not logout_time and events:
                    logout_time = events[-1]['time']
                if logout_time:
                    session_duration = (logout_time - login_time).total_seconds()
                    session_duration = min(session_duration, MAX_SESSION_HOURS * 3600)
                    total_playtime_seconds += session_duration
                    session_lengths.append(session_duration)
            session_metrics['total_playtime_minutes'] = int(total_playtime_seconds / 60)
            if session_lengths:
                session_metrics['avg_session_length'] = int(sum(session_lengths) / len(session_lengths) / 60)
        # Abnormal-logout check
        if login_events and logout_events:
            last_login = max(login_events, key=lambda x: x['time'])['time']
            last_logout = max(logout_events, key=lambda x: x['time'])['time']
            session_metrics['logout_abnormal'] = 1 if last_logout < last_login else 0
        elif login_events and not logout_events:
            session_metrics['logout_abnormal'] = 1
        results[uid] = session_metrics
    return results
# ==============================================================================
# 8. Revised batch processing
# ==============================================================================
def process_fixed_batch(
    client: OpenSearch,
    batch_uids: List[str],
    cohort: Dict[str, Dict],
    metrics_config: Dict[str, Dict],
    include_session_metrics: bool = False
) -> List[Dict]:
    """Revised batch processing function"""
    logger.info(f"Batch started: {len(batch_uids)} users")
    try:
        # 1. Session metrics (only with the --full option)
        if include_session_metrics:
            session_metrics = calculate_comprehensive_session_metrics(client, batch_uids, cohort)
        else:
            # Fill in zeroed defaults instead
            session_metrics = {uid: {
                'active_seconds': 0,
                'total_playtime_minutes': 0,
                'session_count': 0,
                'avg_session_length': 0,
                'logout_abnormal': 0
            } for uid in batch_uids}
        # 2. Run the revised msearch
        msearch_queries = build_fixed_msearch_queries(batch_uids, cohort, metrics_config)
        body_ndjson = "\n".join(msearch_queries) + "\n"
        logger.info(f"Running msearch: {len(msearch_queries)//2} queries")
        msearch_responses = exponential_backoff_retry(
            client.msearch,
            body=body_ndjson,
            request_timeout=300
        ).get('responses', [])
        # 3. Aggregate the results
        batch_results = []
        metrics_per_user = len(metrics_config)
        for idx, uid in enumerate(batch_uids):
            try:
                user_data = cohort[uid]
                user_session_metrics = session_metrics.get(uid, {})
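                # msearch preserves request order, so user idx owns the
                # contiguous block of len(metrics_config) responses starting
                # at idx * metrics_per_user.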
                user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user]
                # Base fields (first_login_time removed; create_time is the single reference)
                result = {
                    'uid': uid,
                    'auth_id': user_data.get('auth_id', 'N/A'),  # default handling for auth_id
                    'nickname': user_data['nickname'],
                    'create_time': user_data.get('create_time_kst', 'N/A'),
                    'retention_status': 'Retained_d0',  # default; updated below
                    'language': user_data['language'],
                    'device': user_data['device'],
                    'active_seconds': user_session_metrics.get('active_seconds', 0),
                    'total_playtime_minutes': user_session_metrics.get('total_playtime_minutes', 0),
                    'session_count': user_session_metrics.get('session_count', 0),
                    'avg_session_length': user_session_metrics.get('avg_session_length', 0),
                    'logout_abnormal': user_session_metrics.get('logout_abnormal', 0)
                }
                # Parse the msearch responses
                metric_names = list(metrics_config.keys())
                for i, metric_name in enumerate(metric_names):
                    if i >= len(user_responses):
                        result[metric_name] = 0
                        continue
                    response = user_responses[i]
                    config = metrics_config[metric_name]
                    if 'error' in response:
                        result[metric_name] = 0
                        continue
                    agg_type = config.get("agg_type", "count")
                    if agg_type == "count":
                        hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
                        result[metric_name] = hits_total
                    elif agg_type == "exists":
                        hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
                        result[metric_name] = 1 if hits_total > 0 else 0
                    elif agg_type in ["sum", "avg", "max", "conditional_sum"]:
                        agg_value = response.get('aggregations', {}).get('metric_value', {}).get('value')
                        result[metric_name] = int(agg_value) if agg_value else 0
                    elif agg_type == "max_timestamp":
                        timestamp_value = response.get('aggregations', {}).get('metric_value', {}).get('value_as_string')
                        result[metric_name] = format_kst_time(timestamp_value) if timestamp_value else None
                    elif agg_type == "first_value":
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            field_name = config.get("field", "")
                            if field_name.startswith("body."):
                                keys = field_name.split(".")
                                value = source_value
                                for key in keys:
                                    if isinstance(value, dict):
                                        value = value.get(key)
                                        if value is None:
                                            # String fields default to "" rather than 0
                                            if metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
                                                value = ""
                                            else:
                                                value = 0
                                            break
                                    else:
                                        value = 0
                                        break
                                result[metric_name] = value
                            else:
                                result[metric_name] = source_value.get(field_name, 0)
                        else:
                            if metric_name == "dungeon_first_result":
                                result[metric_name] = 2  # never played
                            elif metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
                                result[metric_name] = ""
                            else:
                                result[metric_name] = 0
                    elif agg_type == "max_stalker":
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            field_name = config.get("field", "")
                            keys = field_name.split(".")
                            value = source_value
                            for key in keys:
                                value = value.get(key, 'N/A') if isinstance(value, dict) else 'N/A'
                            result[metric_name] = value
                        else:
                            result[metric_name] = 'N/A'
                    elif agg_type == "max_level_with_stalker":
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            level_field = config.get("level_field", "body.level")
                            # Extract body.level
                            level_keys = level_field.split(".")
                            level_value = source_value
                            for key in level_keys:
                                level_value = level_value.get(key, 0) if isinstance(level_value, dict) else 0
                            result[metric_name] = level_value if level_value else 0
                        else:
                            result[metric_name] = 0
                    elif agg_type == "max_level_stalker_name":
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            stalker_field = config.get("stalker_field", "body.stalker")
                            # Extract body.stalker
                            stalker_keys = stalker_field.split(".")
                            stalker_value = source_value
                            for key in stalker_keys:
                                stalker_value = stalker_value.get(key, 0) if isinstance(stalker_value, dict) else 0
                            result[metric_name] = stalker_value if stalker_value else 0
                        else:
                            result[metric_name] = 0
                    else:
                        result[metric_name] = 0
                    # Update the retention status
                    if metric_name == "retention_check" and result[metric_name] > 0:
                        result['retention_status'] = 'Retained_d1'
                # Derived metrics
                if result.get('dungeon_entry_count', 0) > 0:
                    escape_count = result.get('dungeon_escape_count', 0)
                    result['dungeon_escape_rate'] = round((escape_count / result['dungeon_entry_count']) * 100, 2)
                    # Query damage directly for the per-game averages (instead of the removed fields)
                    dungeon_count = result['dungeon_entry_count']
                    try:
                        # Time window, anchored on create_time
                        create_time_dt = user_data['create_time_dt']
                        d0_start = user_data['create_time_utc']
                        d0_end = (create_time_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ')
                        # Sum the damage straight from the survival_end index
                        damage_query = {
                            "size": 0,
                            "query": {
                                "bool": {
                                    "filter": [
                                        {"term": {"uid.keyword": uid}},
                                        {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
                                    ]
                                }
                            },
                            "aggs": {
                                "total_monster_damage": {"sum": {"field": "body.play_stats.damage_dealt_monster"}},
                                "total_player_damage": {"sum": {"field": "body.play_stats.damage_dealt_player"}}
                            }
                        }
                        damage_response = exponential_backoff_retry(
                            client.search,
                            index="ds-logs-live-survival_end",
                            body=damage_query,
                            request_timeout=DEFAULT_TIMEOUT
                        )
                        monster_damage = damage_response.get('aggregations', {}).get('total_monster_damage', {}).get('value', 0) or 0
                        player_damage = damage_response.get('aggregations', {}).get('total_player_damage', {}).get('value', 0) or 0
                        result['avg_damage_per_game_monster'] = round(monster_damage / dungeon_count, 2)
                        result['avg_damage_per_game_player'] = round(player_damage / dungeon_count, 2)
                    except Exception as e:
                        logger.warning(f"Average damage calculation failed (UID: {uid}): {e}")
                        result['avg_damage_per_game_monster'] = 0
                        result['avg_damage_per_game_player'] = 0
                else:
                    result['dungeon_escape_rate'] = 0
                    result['avg_damage_per_game_monster'] = 0
                    result['avg_damage_per_game_player'] = 0
                batch_results.append(result)
            except Exception as e:
                logger.warning(f"Error while processing UID '{uid}': {e}")
        logger.info(f"Batch finished: {len(batch_results)} users succeeded")
        return batch_results
    except Exception as e:
        logger.error(f"Fatal error during batch processing: {e}")
        return []
# ==============================================================================
# 9. Parallel processing and result writing (unchanged)
# ==============================================================================
def process_cohort_fixed_parallel(
    client: OpenSearch,
    cohort: Dict[str, Dict],
    batch_size: int,
    max_workers: int,
    metrics_config: Dict[str, Dict],
    include_session_metrics: bool = False
) -> List[Dict]:
    """Revised parallel processing"""
    logger.info("=" * 80)
    logger.info("Step 2: revised parallel batch processing")
    logger.info(f"Total users: {len(cohort)}")
    logger.info(f"Batch size: {batch_size}, workers: {max_workers}")
    logger.info(f"Metrics: {len(metrics_config)}")
    logger.info(f"Session metrics included: {'yes' if include_session_metrics else 'no'}")
    uid_list = list(cohort.keys())
    chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)]
    all_results = []
    failed_chunks = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_chunk = {
            executor.submit(process_fixed_batch, client, chunk, cohort, metrics_config, include_session_metrics): chunk
            for chunk in chunks
        }
        with tqdm(total=len(chunks), desc="Batch progress") as pbar:
            for future in as_completed(future_to_chunk):
                chunk = future_to_chunk[future]
                try:
                    batch_results = future.result(timeout=600)
                    if batch_results:
                        all_results.extend(batch_results)
                    else:
                        failed_chunks.append(chunk)
                except Exception as e:
                    logger.warning(f"Batch failed: {e}")
                    failed_chunks.append(chunk)
                finally:
                    pbar.update(1)
    # Retry the failed chunks
    if failed_chunks:
        logger.info(f"Retrying {len(failed_chunks)} failed batches...")
        for chunk in failed_chunks:
            try:
                batch_results = process_fixed_batch(client, chunk, cohort, metrics_config, include_session_metrics)
                all_results.extend(batch_results)
            except Exception as e:
                logger.error(f"Retry failed: {e}")
    logger.info(f"Step 2 complete: {len(all_results)} users processed")
    logger.info("=" * 80)
    return all_results
def write_fixed_results(results: List[Dict], output_path: str) -> None:
    """Write the revised results (retention_d1 removed)"""
    logger.info("=" * 80)
    logger.info("Step 3: writing results")
    if not results:
        logger.error("No result data to write.")
        return
    # Revised header (first_login_time removed; only create_time is kept)
    headers = [
        'uid', 'auth_id', 'nickname', 'create_time', 'retention_status', 'language', 'device',
        'active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal',
        'dungeon_entry_count', 'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result',
        'dungeon_escape_count', 'dungeon_escape_rate', 'avg_survival_time', 'max_survival_time',
        'total_armor_break', 'raid_play_count', 'escape_count',
        'monster_kill_count', 'player_kill_count', 'player_killed_count', 'death_count',
        'avg_damage_per_game_monster', 'avg_damage_per_game_player',
        'level_max', 'level_max_stalker', 'tutorial_entry', 'tutorial_completed',
        'guide_quest_stage', 'skill_points_earned',
        'items_obtained_count', 'highest_item_grade', 'blueprint_use_count', 'shop_buy_count',
        'shop_sell_count', 'gold_spent', 'gold_earned', 'storage_in_count', 'storage_out_count',
        'enchant_count', 'enchant_gold_spent',
        'ingame_equip_count', 'object_interaction_count',
        'matching_start_count', 'matching_complete_count', 'matching_failed_count', 'avg_matching_time',
        'friend_add_count', 'friend_delete_count', 'party_invite_sent', 'party_invite_received', 'mail_read_count',
        'exchange_register_count', 'exchange_use_count', 'coupon_used',
        'button_click_count', 'hideout_upgrade_count', 'hideout_max_level', 'season_pass_buy', 'season_pass_max_step',
        'last_logout_time'
    ]
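    # utf-8-sig writes a BOM so spreadsheet tools such as Excel open the
    # Korean nickname column correctly; extrasaction='ignore' silently
    # drops any result key that is not in the header list.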
    try:
        with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=headers, extrasaction='ignore')
            writer.writeheader()
            for result in results:
                writer.writerow(result)
        logger.info(f"Result file written: {output_path}")
        logger.info(f"{len(results)} user rows written")
        logger.info(f"Metrics: {len(headers)} columns")
    except Exception as e:
        logger.error(f"Failed to write the CSV file: {e}")
# ==============================================================================
# 10. Main
# ==============================================================================
def parse_arguments() -> argparse.Namespace:
    """Parse command-line arguments"""
    parser = argparse.ArgumentParser(description="Dungeon Stalkers new-user analysis (revised version)")
    parser.add_argument('--start-time', required=True, help='analysis start time (KST)')
    parser.add_argument('--end-time', required=True, help='analysis end time (KST)')
    parser.add_argument('--batch-size', type=int, default=DEFAULT_BATCH_SIZE, help='batch size')
    parser.add_argument('--max-workers', type=int, default=DEFAULT_MAX_WORKERS, help='number of parallel workers')
    parser.add_argument('--sample-size', type=int, help='sample size for a partial run')
    parser.add_argument('--full', action='store_true', help='full analysis including session metrics')
    return parser.parse_args()
def main():
    """Main entry point"""
    global logger
    overall_start_time = time.time()
    start_kst = datetime.now(KST)
    timestamp = start_kst.strftime('%Y%m%d_%H%M%S')
    log_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.log"
    csv_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.csv"
    logger = setup_logging(str(log_file_path))
    logger.info("=" * 80)
    logger.info("Dungeon Stalkers new-user analysis v5.0")
    logger.info("create_uid-based new-user detection + conditional session metrics")
    logger.info("=" * 80)
    args = parse_arguments()
    try:
        start_kst_arg = datetime.fromisoformat(args.start_time)
        end_kst_arg = datetime.fromisoformat(args.end_time)
        start_utc = start_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        end_utc = end_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
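        # The inputs are expected to carry an explicit offset (e.g. +09:00);
        # a naive timestamp would be interpreted in the machine's local
        # timezone by astimezone() before the UTC conversion.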
logger.info(f"분석 기간 (KST): {args.start_time} ~ {args.end_time}")
logger.info(f"분석 기간 (UTC): {start_utc} ~ {end_utc}")
logger.info(f"세션 지표 포함: {'예 (--full 옵션 사용)' if args.full else '아니오 (빠른 분석 모드)'}")
except Exception as e:
logger.error(f"시간 형식 오류: {e}")
return
client = create_opensearch_client()
if not client:
return
try:
metrics_config = get_fixed_metrics_config()
logger.info(f"수정된 분석 지표 로드: {len(metrics_config)}")
cohort = get_new_user_cohort_optimized(client, start_utc, end_utc)
if not cohort:
logger.error("분석할 신규 유저가 없습니다.")
return
if args.sample_size and args.sample_size < len(cohort):
uid_list = list(cohort.keys())
sampled_uids = uid_list[:args.sample_size]
cohort = {uid: cohort[uid] for uid in sampled_uids}
logger.info(f"샘플링 모드: {args.sample_size}명만 분석")
results = process_cohort_fixed_parallel(
client, cohort, args.batch_size, args.max_workers, metrics_config, args.full
)
write_fixed_results(results, str(csv_file_path))
# 요약
if results:
total_users = len(results)
retained_d1 = sum(1 for r in results if r.get('retention_status') == 'Retained_d1')
retention_rate = (retained_d1 / total_users) * 100
logger.info("=" * 80)
logger.info("분석 요약")
logger.info("=" * 80)
logger.info(f"총 신규 유저: {total_users:,}")
logger.info(f"D+1 리텐션: {retained_d1:,}명 ({retention_rate:.1f}%)")
logger.info(f"평균 활동 시간: {sum(r.get('active_seconds', 0) for r in results) / total_users / 60:.1f}")
except KeyboardInterrupt:
logger.warning("사용자에 의해 중단되었습니다.")
except Exception as e:
logger.error(f"예상치 못한 오류: {e}")
import traceback
logger.error(traceback.format_exc())
finally:
stop_timer_event.set()
end_time = time.time()
total_time = str(timedelta(seconds=int(end_time - overall_start_time)))
end_kst = datetime.now(KST)
logger.info("=" * 80)
logger.info(f"분석 종료 시간: {end_kst.strftime('%Y-%m-%dT%H:%M:%S+09:00')}")
logger.info(f"총 소요 시간: {total_time}")
logger.info("수정된 분석 완료!")
logger.info("=" * 80)
if __name__ == "__main__":
main()