Files
ds_new_user_analy/ds_new_user_analy.py

2036 lines
83 KiB
Python
Raw Normal View History

2025-08-27 01:10:50 +09:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
던전 스토커즈 신규 유저 리텐션 분석 스크립트 (수정 버전)
OpenSearch 매핑 오류 수정: nested object 쿼리 변경
2025-08-27 01:10:50 +09:00
수정사항:
- body 필드를 nested가 아닌 object로 처리
- retention_d1 중복 필드 제거
- 필드 경로 정확성 개선
- 키워드 필드 사용 (.keyword 추가)
2025-08-27 01:10:50 +09:00
작성자: Claude Code
기획서 기반: DS-new_users-analy.md
참고: OpenSearch 매핑 ds_opensearch_mappings.json
2025-08-27 01:10:50 +09:00
"""
import os
import sys
2025-08-27 01:10:50 +09:00
import csv
import json
import time
import yaml
import logging
2025-08-27 01:10:50 +09:00
import argparse
import threading
from datetime import datetime, timedelta, timezone
from collections import defaultdict, Counter
from typing import Dict, List, Optional, Tuple, Generator, Any, Set
2025-08-27 01:10:50 +09:00
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from pathlib import Path
2025-08-27 01:10:50 +09:00
import pandas as pd
from opensearchpy import OpenSearch
from opensearchpy.helpers import scan
2025-08-27 01:10:50 +09:00
# ==============================================================================
# 1. 설정 및 상수
# ==============================================================================
# OpenSearch 연결 설정
2025-08-27 01:10:50 +09:00
OPENSEARCH_CONFIG = {
"host": "ds-opensearch.oneunivrs.com",
"port": 9200,
"auth": {
"username": "admin",
"password": "DHp5#r#GYQ9d"
},
"use_ssl": True,
"verify_certs": False,
"timeout": 120,
2025-08-27 01:10:50 +09:00
"max_retries": 3,
"headers": {"Connection": "close"}
}
# 한국 표준시 설정
2025-08-27 01:10:50 +09:00
KST = timezone(timedelta(hours=9))
# 성능 최적화 설정 (오픈서치 스펙 기반 최적화)
DEFAULT_BATCH_SIZE = 1000
DEFAULT_MAX_WORKERS = 16
DEFAULT_COMPOSITE_SIZE = 2000
DEFAULT_TIMEOUT = 180
2025-08-27 01:10:50 +09:00
SCROLL_TIMEOUT = "5m"
SESSION_GAP_MINUTES = 5
MAX_SESSION_HOURS = 3
2025-08-27 01:10:50 +09:00
# 출력 디렉토리 설정
BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR / "analysis_results"
OUTPUT_DIR.mkdir(exist_ok=True)
2025-08-27 01:10:50 +09:00
# 전역 변수
2025-08-27 01:10:50 +09:00
stop_timer_event = threading.Event()
console_lock = threading.Lock() # 콘솔 출력용 스레드 락
logger = None
2025-08-27 01:10:50 +09:00
# ==============================================================================
# 2. 로깅 시스템 설정
# ==============================================================================
def setup_logging(log_file_path: str) -> logging.Logger:
"""한국 시간 기준 로깅 설정 (콘솔 간결 모드)"""
class KSTFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
kst = timezone(timedelta(hours=9))
ct = datetime.fromtimestamp(record.created, kst)
return ct.strftime('%Y-%m-%dT%H:%M:%S+09:00')
class ConsoleFormatter(logging.Formatter):
"""Console-optimized formatter - 간결한 콘솔 출력"""
def format(self, record):
# WARNING 이상은 상세 표시
if record.levelno >= logging.WARNING:
return f"[{record.levelname}] {record.getMessage()}"
# 일반 INFO 로그는 중요한 것만 표시
msg = record.getMessage()
important_keywords = [
"던전 스토커즈 신규 유저 분석",
"분석 기간",
"세션 지표 포함",
"총 신규 유저",
"분석 완료",
"분석 요약",
"리텐션 퍼널",
"D0→D1", "D1→D2", "D2→D3", "D3→D4", "D4→D5", "D5→D6", "D6→D7",
"총 소요 시간",
"Retained_d",
"평균 활동 시간",
"="*20 # 구분선
]
if any(keyword in msg for keyword in important_keywords):
return msg
# 중요하지 않은 로그는 빈 문자열 반환 (출력 안함)
return ""
# 기본 로거 설정
logger = logging.getLogger('NewUserAnalyzer')
logger.setLevel(logging.INFO)
# 기존 핸들러 제거
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# 파일 핸들러 (상세 로그)
file_handler = logging.FileHandler(log_file_path, encoding='utf-8')
file_formatter = KSTFormatter('[%(levelname)s] %(asctime)s - %(message)s')
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
# 콘솔 핸들러 (간결 로그)
console_handler = logging.StreamHandler()
console_formatter = ConsoleFormatter()
console_handler.setFormatter(console_formatter)
# 빈 문자열은 출력하지 않도록 필터링
console_handler.addFilter(lambda record: console_formatter.format(record) != "")
logger.addHandler(console_handler)
# 외부 라이브러리 로그 억제
logging.getLogger('opensearchpy').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
return logger
def print_progress(message: str, end_with_newline: bool = False):
"""진행률을 제자리에서 갱신하여 출력 (세부 진행률용)"""
with console_lock:
try:
if end_with_newline:
print(f"\r{message}")
sys.stdout.flush()
else:
print(f"\r{message}", end="", flush=True)
except (OSError, IOError) as e:
# 파이프 오류 시 로깅으로 대체
if logger:
logger.info(f"PROGRESS: {message}")
pass
def print_stage(message: str):
"""주요 단계 시작을 새 줄로 표시"""
with console_lock:
try:
print(f"\n{message}")
sys.stdout.flush()
except (OSError, IOError) as e:
# 파이프 오류 시 로깅으로 대체
if logger:
logger.info(f"STAGE: {message}")
pass
def print_complete(message: str):
"""단계 완료를 새 줄로 표시"""
with console_lock:
try:
print(f"\r{message}")
sys.stdout.flush()
except (OSError, IOError) as e:
# 파이프 오류 시 로깅으로 대체
if logger:
logger.info(f"COMPLETE: {message}")
pass
class ProcessingTimer:
"""긴 작업 중 타임아웃 메시지 표시용 타이머"""
def __init__(self, timeout_seconds: int = 10):
self.timeout_seconds = timeout_seconds
self.timer = None
self.is_active = False
self.last_message = ""
def start(self, message: str):
"""타이머 시작"""
self.stop() # 기존 타이머 정리
self.last_message = message
self.is_active = True
self._schedule_next()
def stop(self):
"""타이머 중지"""
self.is_active = False
if self.timer:
self.timer.cancel()
self.timer = None
def _schedule_next(self):
"""다음 타임아웃 메시지 스케줄링"""
if self.is_active:
def show_timeout():
if self.is_active:
print(f"\r{self.last_message} (처리 중...)", end="", flush=True)
self._schedule_next()
self.timer = threading.Timer(self.timeout_seconds, show_timeout)
self.timer.start()
# 전역 타이머 인스턴스
processing_timer = ProcessingTimer()
# ==============================================================================
# 3. OpenSearch 연결 및 유틸리티
2025-08-27 01:10:50 +09:00
# ==============================================================================
def create_opensearch_client() -> Optional[OpenSearch]:
"""OpenSearch 클라이언트 생성"""
logger.info("OpenSearch 클러스터 연결 시도 중...")
2025-08-27 01:10:50 +09:00
try:
client = OpenSearch(
hosts=[{
"host": OPENSEARCH_CONFIG['host'],
"port": OPENSEARCH_CONFIG['port'],
"scheme": "https" if OPENSEARCH_CONFIG['use_ssl'] else "http"
}],
http_auth=(
OPENSEARCH_CONFIG['auth']['username'],
OPENSEARCH_CONFIG['auth']['password']
),
use_ssl=OPENSEARCH_CONFIG['use_ssl'],
verify_certs=OPENSEARCH_CONFIG['verify_certs'],
ssl_show_warn=False,
timeout=OPENSEARCH_CONFIG['timeout'],
max_retries=OPENSEARCH_CONFIG['max_retries'],
retry_on_timeout=True,
headers=OPENSEARCH_CONFIG['headers']
)
if not client.ping():
raise ConnectionError("클러스터 PING 실패")
2025-08-27 01:10:50 +09:00
logger.info("OpenSearch 연결 성공!")
2025-08-27 01:10:50 +09:00
return client
except Exception as e:
logger.error(f"OpenSearch 연결 실패: {e}")
2025-08-27 01:10:50 +09:00
return None
def exponential_backoff_retry(func, *args, **kwargs) -> Any:
"""지수 백오프 재시도 패턴"""
2025-08-27 01:10:50 +09:00
for delay in [1, 2, 4, 8, 16]:
try:
return func(*args, **kwargs)
except Exception as e:
if delay == 16:
2025-08-27 01:10:50 +09:00
raise e
logger.warning(f"재시도 중... {delay}초 대기 (오류: {str(e)[:100]})")
2025-08-27 01:10:50 +09:00
time.sleep(delay)
def format_kst_time(timestamp_str: str) -> str:
"""UTC 타임스탬프를 KST로 변환"""
try:
dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
kst_dt = dt.astimezone(KST)
return kst_dt.strftime('%Y-%m-%dT%H:%M:%S+09:00')
except:
return timestamp_str
2025-08-27 01:10:50 +09:00
# ==============================================================================
# 4. 수정된 분석 지표 정의 (OpenSearch 매핑 기반)
# ==============================================================================
def get_fixed_metrics_config() -> Dict[str, Dict]:
"""OpenSearch 매핑에 기반한 수정된 분석 지표"""
return {
# ==================== 3.2 플레이 시간 및 세션 ====================
"session_count": {
"index": "ds-logs-live-login_comp",
"time_range": "d0",
"agg_type": "count"
},
"last_logout_time": {
"index": "ds-logs-live-logout",
"time_range": "d0",
"agg_type": "max_timestamp"
},
# ==================== 3.3 던전 플레이 성과 (모드별) ====================
# 각 모드별 entry count
"COOP_entry_count": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
},
"Solo_entry_count": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
},
"Survival_entry_count": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
},
"Survival_BOT_entry_count": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
},
"Survival_Unprotected_entry_count": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
},
"dungeon_first_mode": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.dungeon_mode"
},
"dungeon_first_stalker": {
"index": "ds-logs-live-survival_sta",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.stalker_name"
},
"dungeon_first_result": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.result"
},
# 각 모드별 첫 플레이 결과
"COOP_first_result": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.result",
"filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
},
"Solo_first_result": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.result",
"filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
},
"Survival_first_result": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.result",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
},
"Survival_BOT_first_result": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.result",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
},
"Survival_Unprotected_first_result": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "first_value",
"field": "body.result",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
},
# 각 모드별 escape count
"COOP_escape_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.result": 1}}, {"term": {"body.dungeon_mode.keyword": "COOP"}}]
},
"Solo_escape_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.result": 1}}, {"term": {"body.dungeon_mode.keyword": "Solo"}}]
},
"Survival_escape_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.result": 1}}, {"term": {"body.dungeon_mode.keyword": "Survival"}}]
},
"Survival_BOT_escape_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.result": 1}}, {"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
},
"Survival_Unprotected_escape_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.result": 1}}, {"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
},
# 각 모드별 avg survival time
"COOP_avg_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "avg",
"field": "body.play_stats.playtime",
"filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
},
"Solo_avg_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "avg",
"field": "body.play_stats.playtime",
"filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
},
"Survival_avg_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "avg",
"field": "body.play_stats.playtime",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
},
"Survival_BOT_avg_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "avg",
"field": "body.play_stats.playtime",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
},
"Survival_Unprotected_avg_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "avg",
"field": "body.play_stats.playtime",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
},
# 각 모드별 max survival time
"COOP_max_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "max",
"field": "body.play_stats.playtime",
"filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
},
"Solo_max_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "max",
"field": "body.play_stats.playtime",
"filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
},
"Survival_max_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "max",
"field": "body.play_stats.playtime",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
},
"Survival_BOT_max_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "max",
"field": "body.play_stats.playtime",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
},
"Survival_Unprotected_max_survival_time": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "max",
"field": "body.play_stats.playtime",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
},
# 각 모드별 raid play count
"COOP_raid_play_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.raid_play",
"filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
},
"Solo_raid_play_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.raid_play",
"filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
},
"Survival_raid_play_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.raid_play",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
},
"Survival_BOT_raid_play_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.raid_play",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
},
"Survival_Unprotected_raid_play_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.raid_play",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
},
# ==================== 3.4 전투 성과 (모드별) ====================
# 각 모드별 monster kill count
"COOP_monster_kill_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.monster_kill_cnt",
"filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
},
"Solo_monster_kill_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.monster_kill_cnt",
"filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
},
"Survival_monster_kill_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.monster_kill_cnt",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
},
"Survival_BOT_monster_kill_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.monster_kill_cnt",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
},
"Survival_Unprotected_monster_kill_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.monster_kill_cnt",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
},
# 각 모드별 player kill count
"COOP_player_kill_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.player_kill_cnt",
"filters": [{"term": {"body.dungeon_mode.keyword": "COOP"}}]
},
"Solo_player_kill_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.player_kill_cnt",
"filters": [{"term": {"body.dungeon_mode.keyword": "Solo"}}]
},
"Survival_player_kill_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.player_kill_cnt",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival"}}]
},
"Survival_BOT_player_kill_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.player_kill_cnt",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_BOT"}}]
},
"Survival_Unprotected_player_kill_count": {
"index": "ds-logs-live-survival_end",
"time_range": "d0",
"agg_type": "sum",
"field": "body.play_stats.player_kill_cnt",
"filters": [{"term": {"body.dungeon_mode.keyword": "Survival_Unprotected"}}]
},
# ==================== 3.4.2 사망 원인 분석 ====================
"death_PK": {
"index": "ds-logs-live-player_kill",
"time_range": "d0",
"agg_type": "count",
"target_field": "body.target_uid.keyword",
"use_body_target": True
},
"death_GiveUp": {
"index": "ds-logs-live-dead",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.inter_type": 0}}]
},
"death_Mob": {
"index": "ds-logs-live-dead",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.inter_type": 1}}]
},
"death_Trap": {
"index": "ds-logs-live-dead",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.inter_type": 10}}]
},
"death_Red": {
"index": "ds-logs-live-dead",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.inter_type": 11}}]
},
"death_Others": {
"index": "ds-logs-live-dead",
"time_range": "d0",
"agg_type": "count",
"filters": [{"bool": {"must_not": [{"terms": {"body.inter_type": [0, 1, 10, 11]}}]}}]
},
# ==================== 3.5 진행도 및 성장 ====================
"level_max": {
"index": "ds-logs-live-level_up",
"time_range": "d0",
"agg_type": "max_level_with_stalker",
"level_field": "body.level",
"stalker_field": "body.stalker"
},
"level_max_stalker": {
"index": "ds-logs-live-level_up",
"time_range": "d0",
"agg_type": "max_level_stalker_name",
"level_field": "body.level",
"stalker_field": "body.stalker"
},
"tutorial_entry": {
"index": "ds-logs-live-tutorial_entry",
"time_range": "d0",
"agg_type": "count"
},
"tutorial_completed": {
"index": "ds-logs-live-log_tutorial",
"time_range": "d0",
"agg_type": "count",
"filters": [
{"term": {"body.action_type.keyword": "Complete"}},
{"term": {"body.stage_type.keyword": "tutorial_escape_portal"}}
]
},
"guide_quest_stage": {
"index": "ds-logs-live-guide_quest_stage",
"time_range": "d0",
"agg_type": "max",
"field": "body.guide_step"
},
# ==================== 3.6 아이템 및 경제 ====================
"highest_item_grade": {
"index": "ds-logs-live-item_get",
"time_range": "d0",
"agg_type": "max",
"field": "body.item_grade",
"filters": [{"term": {"body.base_type": 2}}]
},
"blueprint_use_count": {
"index": "ds-logs-live-craft_from_blueprint",
"time_range": "d0",
"agg_type": "count"
},
"shop_buy_count": {
"index": "ds-logs-live-shop_buy",
"time_range": "d0",
"agg_type": "count"
},
"shop_sell_count": {
"index": "ds-logs-live-shop_sell",
"time_range": "d0",
"agg_type": "count"
},
"gold_spent": {
"index": "ds-logs-live-shop_buy",
"time_range": "d0",
"agg_type": "conditional_sum",
"field": "body.amt",
"condition_field": "body.cost_id.keyword",
"condition_value": "i108000"
},
"gold_earned": {
"index": "ds-logs-live-shop_sell",
"time_range": "d0",
"agg_type": "conditional_sum",
"field": "body.amt",
"condition_field": "body.cost_id.keyword",
"condition_value": "i108000"
},
"storage_in_count": {
"index": "ds-logs-live-storage_use",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.oper_type": 1}}]
},
"storage_out_count": {
"index": "ds-logs-live-storage_use",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.oper_type": -1}}]
},
"enchant_count": {
"index": "ds-logs-live-enchant",
"time_range": "d0",
"agg_type": "count"
},
"enchant_gold_spent": {
"index": "ds-logs-live-enchant",
"time_range": "d0",
"agg_type": "sum",
"field": "body.amt"
},
# ==================== 3.7 장비 관리 ====================
"ingame_equip_count": {
"index": "ds-logs-live-item_ingame_equip",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.base_type": 2}}]
},
# ==================== 3.8 오브젝트 상호작용 ====================
"object_interaction_count": {
"index": "ds-logs-live-obj_inter",
"time_range": "d0",
"agg_type": "count"
},
# ==================== 3.9 매칭 시스템 ====================
"matching_start_count": {
"index": "ds-logs-live-matching_start",
"time_range": "d0",
"agg_type": "count"
},
"matching_complete_count": {
"index": "ds-logs-live-matching_complete",
"time_range": "d0",
"agg_type": "count"
},
"matching_failed_count": {
"index": "ds-logs-live-matching_failed",
"time_range": "d0",
"agg_type": "count"
},
"avg_matching_time": {
"index": "ds-logs-live-matching_complete",
"time_range": "d0",
"agg_type": "avg",
"field": "body.matchingtime"
},
# ==================== 3.10 소셜 활동 ====================
"friend_add_count": {
"index": "ds-logs-live-friend",
"time_range": "d0",
"agg_type": "count",
"filters": [
{"term": {"body.oper_type": 0}},
{"term": {"body.friend_type": 0}}
]
},
"friend_delete_count": {
"index": "ds-logs-live-friend",
"time_range": "d0",
"agg_type": "count",
"filters": [{"term": {"body.oper_type": 1}}]
},
"party_invite_sent": {
"index": "ds-logs-live-player_invite",
"time_range": "d0",
"agg_type": "count"
},
"party_invite_received": {
"index": "ds-logs-live-player_invite",
"time_range": "d0",
"agg_type": "count",
"target_field": "body.target_uid"
},
"mail_read_count": {
"index": "ds-logs-live-mail_read",
"time_range": "d0",
"agg_type": "count"
},
# ==================== 3.11 거래소 및 경제 활동 ====================
"exchange_register_count": {
"index": "ds-logs-live-exchange_reg",
"time_range": "d0",
"agg_type": "count"
},
"exchange_use_count": {
"index": "ds-logs-live-exchange_use",
"time_range": "d0",
"agg_type": "count"
},
"coupon_used": {
"index": "ds-logs-live-coupon",
"time_range": "d0",
"agg_type": "exists"
},
# ==================== 3.12 기타 활동 ====================
"button_click_count": {
"index": "ds-logs-live-button_click",
"time_range": "d0",
"agg_type": "count"
},
"hideout_upgrade_count": {
"index": "ds-logs-live-log_hideout_upgrade",
"time_range": "d0",
"agg_type": "count"
},
"hideout_max_level": {
"index": "ds-logs-live-log_hideout_upgrade",
"time_range": "d0",
"agg_type": "max",
"field": "body.hideout_level"
},
"season_pass_buy": {
"index": "ds-logs-live-season_pass",
"time_range": "d0",
"agg_type": "exists",
"filters": [{"term": {"body.cause": 1}}]
},
"season_pass_max_step": {
"index": "ds-logs-live-season_pass",
"time_range": "d0",
"agg_type": "max",
"field": "body.season_pass_step"
},
# ==================== 리텐션 판정 ====================
"retention_check_d1": {
"index": "ds-logs-live-heartbeat",
"time_range": "d1",
"agg_type": "exists"
},
"retention_check_d2": {
"index": "ds-logs-live-heartbeat",
"time_range": "d2",
"agg_type": "exists"
},
"retention_check_d3": {
"index": "ds-logs-live-heartbeat",
"time_range": "d3",
"agg_type": "exists"
},
"retention_check_d4": {
"index": "ds-logs-live-heartbeat",
"time_range": "d4",
"agg_type": "exists"
},
"retention_check_d5": {
"index": "ds-logs-live-heartbeat",
"time_range": "d5",
"agg_type": "exists"
},
"retention_check_d6": {
"index": "ds-logs-live-heartbeat",
"time_range": "d6",
"agg_type": "exists"
},
"retention_check_d7_plus": {
"index": "ds-logs-live-heartbeat",
"time_range": "d7_plus",
"agg_type": "exists"
}
}
# ==============================================================================
# 5. Composite Aggregation 신규 유저 코호트 선정 (동일)
2025-08-27 01:10:50 +09:00
# ==============================================================================
def get_new_user_cohort_optimized(
client: OpenSearch,
start_time: str,
end_time: str,
page_size: int = DEFAULT_COMPOSITE_SIZE
) -> Dict[str, Dict]:
"""최적화된 신규 유저 코호트 선정 (auth.id 수집 우선순위 적용)
ds-logs-live-create_uid 인덱스를 사용하여 실제 계정 생성 시점 기준으로 신규 유저를 판별
auth.id 수집 우선순위: 1) login_comp, 2) log_return_to_lobby
"""
logger.info("=" * 80)
logger.info("1단계: 신규 유저 코호트 선정 (create_uid 기준, 최적화 버전)")
logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}")
logger.info(f"페이지 크기: {page_size}")
2025-08-27 01:10:50 +09:00
print_stage("🔍 신규 유저 코호트 선정 시작...")
2025-08-27 01:10:50 +09:00
cohort = {}
after_key = None
# Step 1: create_uid 인덱스에서 분석 기간 중 생성된 신규 유저 추출
create_uid_query = {
2025-08-27 01:10:50 +09:00
"size": 0,
"query": {
"bool": {
"filter": [
{"range": {"@timestamp": {"gte": start_time, "lt": end_time}}}
]
}
},
"aggs": {
"new_users": {
"composite": {
"size": page_size,
"sources": [
{"uid": {"terms": {"field": "uid.keyword"}}},
{"auth_id": {"terms": {"field": "auth.id.keyword"}}}
2025-08-27 01:10:50 +09:00
]
},
"aggs": {
"first_create": {"min": {"field": "@timestamp"}}
2025-08-27 01:10:50 +09:00
}
}
}
}
# 신규 생성된 유저 수집
new_user_map = {} # uid -> {'create_time': ...}
page_count = 0
total_pages_estimate = 0
2025-08-27 01:10:50 +09:00
while True:
page_count += 1
query = create_uid_query.copy()
2025-08-27 01:10:50 +09:00
if after_key:
query["aggs"]["new_users"]["composite"]["after"] = after_key
try:
# 진행률 표시 (제자리 갱신)
if page_count == 1:
print_progress("코호트 선정: 페이지 1 처리 중...")
else:
print_progress(f"코호트 선정: 페이지 {page_count} 처리 중... (수집: {len(new_user_map)}명)")
logger.info(f"create_uid 페이지 {page_count} 처리 중...")
2025-08-27 01:10:50 +09:00
response = exponential_backoff_retry(
client.search,
index="ds-logs-live-create_uid",
2025-08-27 01:10:50 +09:00
body=query,
request_timeout=DEFAULT_TIMEOUT,
track_total_hits=False
2025-08-27 01:10:50 +09:00
)
buckets = response["aggregations"]["new_users"]["buckets"]
if not buckets:
break
for bucket in buckets:
uid = bucket["key"]["uid"]
first_create_utc = bucket["first_create"]["value_as_string"]
# 가장 빠른 create 시간만 저장 (client_event로 인한 중복 처리)
if uid not in new_user_map or first_create_utc < new_user_map[uid]["create_time"]:
new_user_map[uid] = {
"create_time": first_create_utc
}
logger.info(f"create_uid 페이지 {page_count}: {len(buckets)}개 처리됨")
# 콘솔에는 간결하게
print_progress(f"코호트 선정: 페이지 {page_count} 완료 (수집: {len(new_user_map)}명)")
after_key = response["aggregations"]["new_users"].get("after_key")
if not after_key:
break
except Exception as e:
logger.error(f"create_uid 처리 중 오류 (페이지 {page_count}): {e}")
break
# 최종 진행률 표시 및 완료 메시지
print_complete(f"신규 유저 코호트 선정 완료: 총 {len(new_user_map)}명 확인")
logger.info(f"{len(new_user_map)}명의 신규 유저 확인됨")
# Step 2: 모든 create_uid 유저를 cohort에 추가
if not new_user_map:
logger.warning("신규 유저가 없습니다.")
return cohort
uid_list = list(new_user_map.keys())
chunk_size = 100
total_users = 0
# 모든 create_uid 유저를 cohort에 먼저 추가 (auth_id는 N/A로 초기화)
for uid in uid_list:
cohort[uid] = {
'auth_id': 'N/A',
'create_time_utc': new_user_map[uid]["create_time"],
'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]),
'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')),
'language': 'N/A',
'country': 'N/A',
'device': 'N/A',
'nickname': 'N/A'
}
total_users += 1
print_complete(f"코호트 초기화 완료: {total_users}")
logger.info(f"cohort에 {total_users}명의 신규 유저 추가 완료")
# Step 3: login_comp 인덱스에서 추가 정보 수집 (auth.id 1순위)
print_stage("📊 login_comp 인덱스에서 추가 정보 수집 중...")
logger.info("login_comp 인덱스에서 추가 정보 수집 중 (auth.id 1순위)...")
login_comp_collected = set()
processing_timer.start("login_comp 정보 수집")
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
for i in range(0, len(uid_list), chunk_size):
chunk_uids = uid_list[i:i+chunk_size]
login_query = {
"size": 0,
"query": {
"bool": {
"filter": [
{"terms": {"uid.keyword": chunk_uids}}
]
}
},
"aggs": {
"users": {
"terms": {
"field": "uid.keyword",
"size": chunk_size
},
"aggs": {
"user_info": {
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "asc"}}],
"_source": ["body.device_mod", "body.nickname", "auth.id", "country"]
}
},
"latest_info": {
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "desc"}}],
"_source": ["body.nickname", "body.language", "auth.id", "country"]
}
}
}
}
}
}
try:
response = exponential_backoff_retry(
client.search,
index="ds-logs-live-login_comp",
body=login_query,
request_timeout=DEFAULT_TIMEOUT,
track_total_hits=False
)
for bucket in response["aggregations"]["users"]["buckets"]:
uid = bucket["key"]
2025-08-27 01:10:50 +09:00
user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {}
latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {}
2025-08-27 01:10:50 +09:00
# cohort 정보 업데이트 (auth.id 1순위 수집)
if uid in cohort:
cohort[uid]['language'] = latest_info_hit.get('body', {}).get('language', 'N/A')
cohort[uid]['device'] = user_hit.get('body', {}).get('device_mod', 'N/A')
cohort[uid]['nickname'] = latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A')
# country 수집 (login_comp에서)
country = latest_info_hit.get('country') or user_hit.get('country')
if country:
cohort[uid]['country'] = country
# auth.id 수집 (1순위)
auth_id = latest_info_hit.get('auth', {}).get('id') or user_hit.get('auth', {}).get('id')
if auth_id:
cohort[uid]['auth_id'] = auth_id
login_comp_collected.add(uid)
2025-08-27 01:10:50 +09:00
except Exception as e:
logger.error(f"login_comp 정보 수집 중 오류: {e}")
2025-08-27 01:10:50 +09:00
processing_timer.stop()
print_complete(f"login_comp 정보 수집 완료: {len(login_comp_collected)}/{total_users}")
logger.info(f"login_comp에서 {len(login_comp_collected)}명의 정보 수집 완료")
# Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (auth.id 2순위)
# login_comp에서 수집되지 않은 유저들 처리
missing_uids = [uid for uid in uid_list if uid not in login_comp_collected]
if missing_uids:
print_stage(f"🏠 lobby 인덱스에서 차선 정보 수집 중 ({len(missing_uids)}명)...")
logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중 (auth.id 2순위)...")
lobby_collected = set()
processing_timer.start("lobby 정보 수집")
for i in range(0, len(missing_uids), chunk_size):
chunk_uids = missing_uids[i:i+chunk_size]
lobby_query = {
"size": 0,
"query": {
"bool": {
"filter": [
{"terms": {"uid.keyword": chunk_uids}}
]
}
},
"aggs": {
"users": {
"terms": {
"field": "uid.keyword",
"size": chunk_size
},
"aggs": {
"info": {
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "desc"}}],
"_source": ["body.nickname", "auth.id", "country"]
}
}
}
}
}
}
try:
response = exponential_backoff_retry(
client.search,
index="ds-logs-live-log_return_to_lobby",
body=lobby_query,
request_timeout=DEFAULT_TIMEOUT,
track_total_hits=False
)
for bucket in response["aggregations"]["users"]["buckets"]:
uid = bucket["key"]
info_hit = bucket["info"]["hits"]["hits"][0]["_source"] if bucket["info"]["hits"]["hits"] else {}
# 차선 정보 업데이트
if uid in cohort:
# nickname 업데이트 (없는 경우에만)
if cohort[uid]['nickname'] == 'N/A':
cohort[uid]['nickname'] = info_hit.get('body', {}).get('nickname', 'N/A')
# country 수집 (없는 경우에만)
if cohort[uid]['country'] == 'N/A':
country = info_hit.get('country')
if country:
cohort[uid]['country'] = country
# auth.id 수집 (2순위, 없는 경우에만)
if cohort[uid]['auth_id'] == 'N/A':
auth_id = info_hit.get('auth', {}).get('id')
if auth_id:
cohort[uid]['auth_id'] = auth_id
lobby_collected.add(uid)
except Exception as e:
logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}")
processing_timer.stop()
print_complete(f"lobby 추가 정보 수집 완료: {len(lobby_collected)}")
logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 차선 정보 수집 완료")
# 최종 통계
auth_id_count = sum(1 for uid in cohort if cohort[uid]['auth_id'] != 'N/A')
print_complete(f"코호트 정보 수집 완료: auth.id {auth_id_count}/{total_users}명 수집")
logger.info(f"최종 auth.id 수집 완료: {auth_id_count}/{total_users}")
logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)")
logger.info("=" * 80)
2025-08-27 01:10:50 +09:00
return cohort
# ==============================================================================
# 6. 수정된 msearch 쿼리 빌더 (nested → object)
2025-08-27 01:10:50 +09:00
# ==============================================================================
def build_fixed_msearch_queries(
uids: List[str],
cohort: Dict[str, Dict],
metrics_config: Dict[str, Dict]
) -> List[str]:
"""수정된 msearch 쿼리 생성 (nested 쿼리 제거)"""
queries = []
for uid in uids:
user_data = cohort[uid]
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
create_time_dt = user_data['create_time_dt']
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
# 시간 범위 정의 (create_time 기준)
d0_start = user_data['create_time_utc']
d0_end = (create_time_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ')
d1_start = d0_end
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
d1_end = (create_time_dt + timedelta(hours=48)).strftime('%Y-%m-%dT%H:%M:%SZ')
d2_start = d1_end
d2_end = (create_time_dt + timedelta(hours=72)).strftime('%Y-%m-%dT%H:%M:%SZ')
d3_start = d2_end
d3_end = (create_time_dt + timedelta(hours=96)).strftime('%Y-%m-%dT%H:%M:%SZ')
d4_start = d3_end
d4_end = (create_time_dt + timedelta(hours=120)).strftime('%Y-%m-%dT%H:%M:%SZ')
d5_start = d4_end
d5_end = (create_time_dt + timedelta(hours=144)).strftime('%Y-%m-%dT%H:%M:%SZ')
d6_start = d5_end
d6_end = (create_time_dt + timedelta(hours=168)).strftime('%Y-%m-%dT%H:%M:%SZ')
d7_plus_start = d6_end
for metric_name, config in metrics_config.items():
# 시간 범위 선택
if config["time_range"] == "d0":
time_filter = {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
elif config["time_range"] == "d1":
time_filter = {"range": {"@timestamp": {"gte": d1_start, "lt": d1_end}}}
elif config["time_range"] == "d2":
time_filter = {"range": {"@timestamp": {"gte": d2_start, "lt": d2_end}}}
elif config["time_range"] == "d3":
time_filter = {"range": {"@timestamp": {"gte": d3_start, "lt": d3_end}}}
elif config["time_range"] == "d4":
time_filter = {"range": {"@timestamp": {"gte": d4_start, "lt": d4_end}}}
elif config["time_range"] == "d5":
time_filter = {"range": {"@timestamp": {"gte": d5_start, "lt": d5_end}}}
elif config["time_range"] == "d6":
time_filter = {"range": {"@timestamp": {"gte": d6_start, "lt": d6_end}}}
elif config["time_range"] == "d7_plus":
time_filter = {"range": {"@timestamp": {"gte": d7_plus_start}}}
else: # 기본값
time_filter = {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
# 사용자 식별 필터
if "target_field" in config:
if config.get("use_body_target", False):
# body.target_uid로 검색 (플레이어가 당한 경우)
user_filter = {"term": {config["target_field"]: uid}}
else:
# 일반적인 uid 검색
user_filter = {"term": {config["target_field"]: uid}}
else:
user_filter = {"term": {"uid.keyword": uid}}
# 쿼리 필터 구성
query_filters = [user_filter, time_filter]
# 추가 필터 적용 (nested 제거)
if "filters" in config:
query_filters.extend(config["filters"])
# 조건부 필터
if config.get("agg_type") == "conditional_sum":
condition_filter = {"term": {config['condition_field']: config['condition_value']}}
query_filters.append(condition_filter)
# 쿼리 바디 구성
agg_type = config.get("agg_type", "count")
needs_docs = ["first_value", "max_stalker", "max_level_with_stalker", "max_level_stalker_name"]
query_body = {
"size": 0 if agg_type not in needs_docs else 1000,
"query": {"bool": {"filter": query_filters}}
}
# Aggregation 설정
if agg_type in ["sum", "avg", "max", "conditional_sum"]:
agg_func = "sum" if agg_type in ["sum", "conditional_sum"] else agg_type
query_body["aggs"] = {
"metric_value": {agg_func: {"field": config["field"]}}
}
elif agg_type == "max_timestamp":
query_body["aggs"] = {
"metric_value": {"max": {"field": "@timestamp"}}
}
elif agg_type in ["first_value", "max_stalker"]:
# first_value는 항상 @timestamp로 정렬 (earliest first)
# max_stalker는 다른 로직이므로 별도 처리 필요
if agg_type == "first_value":
query_body["sort"] = [{"@timestamp": {"order": "asc"}}]
else: # max_stalker
sort_field = config.get("field", "@timestamp")
query_body["sort"] = [{sort_field: {"order": "desc"}}]
query_body["_source"] = [config.get("field", "@timestamp")]
elif agg_type in ["max_level_with_stalker", "max_level_stalker_name"]:
# 최고 레벨 우선, 같은 레벨이면 최신 순
level_field = config.get("level_field", "body.level")
stalker_field = config.get("stalker_field", "body.stalker")
query_body["sort"] = [
{level_field: {"order": "desc"}},
{"@timestamp": {"order": "desc"}}
]
query_body["_source"] = [level_field, stalker_field]
# NDJSON 추가
queries.append(json.dumps({"index": config["index"]}, ensure_ascii=False))
queries.append(json.dumps(query_body, ensure_ascii=False))
return queries
# ==============================================================================
# 7. 세션 지표 계산 (동일)
# ==============================================================================
def calculate_comprehensive_session_metrics(
2025-08-27 01:10:50 +09:00
client: OpenSearch,
uids: List[str],
cohort_data: Dict[str, Dict]
) -> Dict[str, Dict]:
"""포괄적인 세션 지표 계산"""
2025-08-27 01:10:50 +09:00
def stream_user_events(uid: str) -> Generator[Dict, None, None]:
user_info = cohort_data.get(uid)
if not user_info:
return
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
create_time_dt = user_info['create_time_dt']
d0_end_dt = create_time_dt + timedelta(hours=24)
2025-08-27 01:10:50 +09:00
query = {
"query": {
"bool": {
"filter": [
{"term": {"uid.keyword": uid}},
{"range": {"@timestamp": {
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
"gte": user_info['create_time_utc'],
2025-08-27 01:10:50 +09:00
"lt": d0_end_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
}}}
]
}
}
}
try:
for doc in scan(
client,
query=query,
index="ds-logs-live-*",
scroll=SCROLL_TIMEOUT,
_source=["@timestamp", "type"]
):
source = doc['_source']
event_dt = datetime.fromisoformat(source['@timestamp'].replace('Z', '+00:00'))
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
if create_time_dt <= event_dt < d0_end_dt:
2025-08-27 01:10:50 +09:00
yield {
"time": event_dt,
"type": source.get('type', '').lower()
}
except Exception:
pass
results = {}
for uid in uids:
events = list(stream_user_events(uid))
session_metrics = {
'active_seconds': 0,
'total_playtime_minutes': 0,
'session_count': 0,
'avg_session_length': 0,
'logout_abnormal': 0
}
2025-08-27 01:10:50 +09:00
if len(events) < 2:
results[uid] = session_metrics
2025-08-27 01:10:50 +09:00
continue
events.sort(key=lambda x: x['time'])
login_events = [e for e in events if e['type'] == 'login_comp']
logout_events = [e for e in events if e['type'] == 'logout']
heartbeat_events = [e for e in events if e['type'] == 'heartbeat']
2025-08-27 01:10:50 +09:00
session_metrics['session_count'] = len(login_events)
2025-08-27 01:10:50 +09:00
# 활동 시간 계산
if heartbeat_events:
total_active_seconds = 0
for i in range(len(heartbeat_events) - 1):
time_diff = heartbeat_events[i + 1]['time'] - heartbeat_events[i]['time']
if time_diff <= timedelta(minutes=SESSION_GAP_MINUTES):
total_active_seconds += min(time_diff.total_seconds(), MAX_SESSION_HOURS * 3600)
2025-08-27 01:10:50 +09:00
session_metrics['active_seconds'] = int(total_active_seconds)
# 총 플레이 시간
if login_events:
total_playtime_seconds = 0
session_lengths = []
2025-08-27 01:10:50 +09:00
for login_event in login_events:
login_time = login_event['time']
logout_time = None
for logout_event in logout_events:
if logout_event['time'] > login_time:
logout_time = logout_event['time']
break
if not logout_time and events:
logout_time = events[-1]['time']
if logout_time:
session_duration = (logout_time - login_time).total_seconds()
session_duration = min(session_duration, MAX_SESSION_HOURS * 3600)
total_playtime_seconds += session_duration
session_lengths.append(session_duration)
2025-08-27 01:10:50 +09:00
session_metrics['total_playtime_minutes'] = int(total_playtime_seconds / 60)
2025-08-27 01:10:50 +09:00
if session_lengths:
session_metrics['avg_session_length'] = int(sum(session_lengths) / len(session_lengths) / 60)
# 비정상 종료 체크
if login_events and logout_events:
last_login = max(login_events, key=lambda x: x['time'])['time']
last_logout = max(logout_events, key=lambda x: x['time'])['time']
session_metrics['logout_abnormal'] = 1 if last_logout < last_login else 0
elif login_events and not logout_events:
session_metrics['logout_abnormal'] = 1
results[uid] = session_metrics
2025-08-27 01:10:50 +09:00
return results
2025-08-27 01:10:50 +09:00
# ==============================================================================
# 8. 수정된 배치 처리
2025-08-27 01:10:50 +09:00
# ==============================================================================
def process_fixed_batch(
2025-08-27 01:10:50 +09:00
client: OpenSearch,
batch_uids: List[str],
cohort: Dict[str, Dict],
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
metrics_config: Dict[str, Dict],
include_session_metrics: bool = False
2025-08-27 01:10:50 +09:00
) -> List[Dict]:
"""수정된 배치 처리 함수"""
2025-08-27 01:10:50 +09:00
# 진행률 추적용 로거
2025-08-27 01:10:50 +09:00
try:
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
# 1. 세션 지표 계산 (--full 옵션일 때만)
if include_session_metrics:
session_metrics = calculate_comprehensive_session_metrics(client, batch_uids, cohort)
else:
# 기본값으로 빈 딕셔너리 생성
session_metrics = {uid: {
'active_seconds': 0,
'total_playtime_minutes': 0,
'session_count': 0,
'avg_session_length': 0,
'logout_abnormal': 0
} for uid in batch_uids}
# 2. 수정된 msearch 실행
msearch_queries = build_fixed_msearch_queries(batch_uids, cohort, metrics_config)
body_ndjson = "\n".join(msearch_queries) + "\n"
# 콘솔에는 간단히, 파일에는 상세히
print_progress(f"배치 처리 중: {len(batch_uids)}명, {len(msearch_queries)//2}개 쿼리")
logger.info(f"msearch 실행: {len(msearch_queries)//2}개 쿼리")
msearch_responses = exponential_backoff_retry(
client.msearch,
body=body_ndjson,
request_timeout=300
).get('responses', [])
# 3. 결과 집계
batch_results = []
metrics_per_user = len(metrics_config)
for idx, uid in enumerate(batch_uids):
try:
user_data = cohort[uid]
user_session_metrics = session_metrics.get(uid, {})
user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user]
2025-08-27 01:10:50 +09:00
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
# 기본 정보 (first_login_time 제거, create_time으로 통합)
result = {
'uid': uid,
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
'auth_id': user_data.get('auth_id', 'N/A'), # auth_id 기본값 처리
'nickname': user_data['nickname'],
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
'create_time': user_data.get('create_time_kst', 'N/A'),
'retention_status': 'Retained_d0', # 기본값, 나중에 업데이트
'language': user_data['language'],
'country': user_data.get('country', 'N/A'),
'device': user_data['device'],
'active_seconds': user_session_metrics.get('active_seconds', 0),
'total_playtime_minutes': user_session_metrics.get('total_playtime_minutes', 0),
'session_count': user_session_metrics.get('session_count', 0),
'avg_session_length': user_session_metrics.get('avg_session_length', 0),
'logout_abnormal': user_session_metrics.get('logout_abnormal', 0)
}
# msearch 결과 파싱
metric_names = list(metrics_config.keys())
2025-08-27 01:10:50 +09:00
for i, metric_name in enumerate(metric_names):
if i >= len(user_responses):
result[metric_name] = 0
continue
response = user_responses[i]
config = metrics_config[metric_name]
if 'error' in response:
result[metric_name] = 0
continue
agg_type = config.get("agg_type", "count")
if agg_type == "count":
hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
result[metric_name] = hits_total
elif agg_type == "exists":
hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
result[metric_name] = 1 if hits_total > 0 else 0
elif agg_type in ["sum", "avg", "max", "conditional_sum"]:
agg_value = response.get('aggregations', {}).get('metric_value', {}).get('value')
result[metric_name] = int(agg_value) if agg_value else 0
elif agg_type == "max_timestamp":
timestamp_value = response.get('aggregations', {}).get('metric_value', {}).get('value_as_string')
result[metric_name] = format_kst_time(timestamp_value) if timestamp_value else None
elif agg_type == "first_value":
hits = response.get('hits', {}).get('hits', [])
if hits:
source_value = hits[0]['_source']
field_name = config.get("field", "")
if field_name.startswith("body."):
keys = field_name.split(".")
value = source_value
for key in keys:
if isinstance(value, dict):
value = value.get(key)
if value is None:
# 문자열 필드는 빈 문자열 또는 적절한 기본값 설정
if metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
value = ""
else:
value = 0
break
else:
value = 0
break
result[metric_name] = value
else:
result[metric_name] = source_value.get(field_name, 0)
else:
if metric_name == "dungeon_first_result":
result[metric_name] = 2 # 미플레이
elif metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
result[metric_name] = ""
else:
result[metric_name] = 0
elif agg_type == "max_stalker":
hits = response.get('hits', {}).get('hits', [])
if hits:
source_value = hits[0]['_source']
field_name = config.get("field", "")
keys = field_name.split(".")
value = source_value
for key in keys:
value = value.get(key, 'N/A') if isinstance(value, dict) else 'N/A'
result[metric_name] = value
else:
result[metric_name] = 'N/A'
elif agg_type == "max_level_with_stalker":
hits = response.get('hits', {}).get('hits', [])
if hits:
source_value = hits[0]['_source']
level_field = config.get("level_field", "body.level")
# body.level 값 추출
level_keys = level_field.split(".")
level_value = source_value
for key in level_keys:
level_value = level_value.get(key, 0) if isinstance(level_value, dict) else 0
result[metric_name] = level_value if level_value else 0
else:
result[metric_name] = 0
elif agg_type == "max_level_stalker_name":
hits = response.get('hits', {}).get('hits', [])
if hits:
source_value = hits[0]['_source']
stalker_field = config.get("stalker_field", "body.stalker")
# body.stalker 값 추출
stalker_keys = stalker_field.split(".")
stalker_value = source_value
for key in stalker_keys:
stalker_value = stalker_value.get(key, 0) if isinstance(stalker_value, dict) else 0
result[metric_name] = stalker_value if stalker_value else 0
else:
result[metric_name] = 0
else:
result[metric_name] = 0
# 모든 메트릭 처리 후 리텐션 상태 판정
# D7+ > D6 > D5 > D4 > D3 > D2 > D1 > D0 순서로 판정 (마지막 접속일 기준)
if result.get('retention_check_d7_plus', 0) > 0:
result['retention_status'] = 'Retained_d7+'
elif result.get('retention_check_d6', 0) > 0:
result['retention_status'] = 'Retained_d6'
elif result.get('retention_check_d5', 0) > 0:
result['retention_status'] = 'Retained_d5'
elif result.get('retention_check_d4', 0) > 0:
result['retention_status'] = 'Retained_d4'
elif result.get('retention_check_d3', 0) > 0:
result['retention_status'] = 'Retained_d3'
elif result.get('retention_check_d2', 0) > 0:
result['retention_status'] = 'Retained_d2'
elif result.get('retention_check_d1', 0) > 0:
result['retention_status'] = 'Retained_d1'
else:
# D0에만 접속 (기본값 유지)
result['retention_status'] = 'Retained_d0'
2025-08-27 01:10:50 +09:00
# 모드별 계산된 지표
modes = ['COOP', 'Solo', 'Survival', 'Survival_BOT', 'Survival_Unprotected']
for mode in modes:
# 각 모드별 escape rate 계산
entry_key = f'{mode}_entry_count'
escape_key = f'{mode}_escape_count'
if result.get(entry_key, 0) > 0:
escape_count = result.get(escape_key, 0)
result[f'{mode}_escape_rate'] = round((escape_count / result[entry_key]), 4) # 소수점 4자리
# 각 모드별 평균 킬 수 계산
monster_kills = result.get(f'{mode}_monster_kill_count', 0)
player_kills = result.get(f'{mode}_player_kill_count', 0)
entry_count = result[entry_key]
result[f'{mode}_avg_monster_kills'] = round(monster_kills / entry_count, 2)
result[f'{mode}_avg_player_kills'] = round(player_kills / entry_count, 2)
else:
result[f'{mode}_escape_rate'] = 0.0
result[f'{mode}_avg_monster_kills'] = 0.0
result[f'{mode}_avg_player_kills'] = 0.0
batch_results.append(result)
except Exception as e:
logger.warning(f"UID '{uid}' 처리 중 오류: {e}")
print_progress(f"배치 완료: {len(batch_results)}")
logger.info(f"배치 처리 완료: {len(batch_results)}명 성공")
return batch_results
except Exception as e:
logger.error(f"배치 처리 중 심각한 오류: {e}")
return []
2025-08-27 01:10:50 +09:00
# ==============================================================================
# 9. 병렬 처리 및 결과 저장 (동일)
# ==============================================================================
2025-08-27 01:10:50 +09:00
def process_cohort_fixed_parallel(
2025-08-27 01:10:50 +09:00
client: OpenSearch,
cohort: Dict[str, Dict],
batch_size: int,
max_workers: int,
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
metrics_config: Dict[str, Dict],
include_session_metrics: bool = False
2025-08-27 01:10:50 +09:00
) -> List[Dict]:
"""수정된 병렬 처리"""
2025-08-27 01:10:50 +09:00
logger.info("=" * 80)
logger.info("2단계: 수정된 병렬 배치 처리")
logger.info(f"총 사용자: {len(cohort)}")
logger.info(f"배치 크기: {batch_size}, 워커: {max_workers}")
logger.info(f"분석 지표: {len(metrics_config)}")
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
logger.info(f"세션 지표 포함: {'' if include_session_metrics else '아니오'}")
2025-08-27 01:10:50 +09:00
uid_list = list(cohort.keys())
chunks = [uid_list[i:i + batch_size] for i in range(0, len(uid_list), batch_size)]
2025-08-27 01:10:50 +09:00
print_stage(f"⚡ 병렬 배치 처리 시작 ({len(cohort)}명, {len(chunks)}개 배치)...")
2025-08-27 01:10:50 +09:00
all_results = []
failed_chunks = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_chunk = {
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
executor.submit(process_fixed_batch, client, chunk, cohort, metrics_config, include_session_metrics): chunk
2025-08-27 01:10:50 +09:00
for chunk in chunks
}
completed_chunks = 0
processing_timer.start("병렬 배치 처리")
for future in as_completed(future_to_chunk):
chunk = future_to_chunk[future]
try:
batch_results = future.result(timeout=600)
if batch_results:
all_results.extend(batch_results)
else:
2025-08-27 01:10:50 +09:00
failed_chunks.append(chunk)
completed_chunks += 1
# 콘솔 진행률 업데이트
print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료, {len(all_results)}명 처리됨")
except Exception as e:
logger.warning(f"배치 처리 실패: {e}")
failed_chunks.append(chunk)
completed_chunks += 1
print_progress(f"병렬 처리 중: {completed_chunks}/{len(chunks)} 배치 완료 (오류 1개), {len(all_results)}명 처리됨")
processing_timer.stop()
2025-08-27 01:10:50 +09:00
# 실패한 청크 재처리
2025-08-27 01:10:50 +09:00
if failed_chunks:
print_stage(f"🔄 실패한 배치 재처리 중 ({len(failed_chunks)}개)...")
logger.info(f"실패한 {len(failed_chunks)}개 배치 재처리 중...")
processing_timer.start("배치 재처리")
for i, chunk in enumerate(failed_chunks):
2025-08-27 01:10:50 +09:00
try:
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
batch_results = process_fixed_batch(client, chunk, cohort, metrics_config, include_session_metrics)
2025-08-27 01:10:50 +09:00
all_results.extend(batch_results)
print_progress(f"재처리 중: {i+1}/{len(failed_chunks)} 완료")
2025-08-27 01:10:50 +09:00
except Exception as e:
logger.error(f"재처리 실패: {e}")
processing_timer.stop()
print_complete(f"재처리 완료: {len(failed_chunks)}개 배치 처리됨")
2025-08-27 01:10:50 +09:00
# 최종 결과
print_complete(f"병렬 처리 완료: 총 {len(all_results)}명 성공")
logger.info(f"2단계 완료: {len(all_results)}명 처리 성공")
logger.info("=" * 80)
2025-08-27 01:10:50 +09:00
return all_results
def write_fixed_results(results: List[Dict], output_path: str, include_session_metrics: bool = False) -> None:
"""수정된 결과 저장 (retention_d1 제거)"""
logger.info("=" * 80)
logger.info("3단계: 결과 저장")
2025-08-27 01:10:50 +09:00
print_stage("💾 분석 결과 CSV 파일 저장 중...")
processing_timer.start("CSV 파일 저장")
2025-08-27 01:10:50 +09:00
if not results:
logger.error("저장할 결과 데이터가 없습니다.")
2025-08-27 01:10:50 +09:00
return
# 수정된 헤더 (first_login_time 제거, create_time만 사용, country 및 last_active_day 추가)
# 기본 헤더
2025-08-27 01:10:50 +09:00
headers = [
'uid', 'auth_id', 'nickname', 'create_time', 'retention_status', 'language', 'country', 'device',
'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result',
'death_PK', 'death_GiveUp', 'death_Mob', 'death_Trap', 'death_Red', 'death_Others'
]
# 세션 지표 (옵션에 따른 조건부 포함)
if include_session_metrics:
session_headers = ['active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal']
headers = headers[:4] + session_headers + headers[4:] # retention_status 뒤에 삽입
# 각 모드별 헤더 추가 (삭제된 지표 제외)
modes = ['COOP', 'Solo', 'Survival', 'Survival_BOT', 'Survival_Unprotected']
for mode in modes:
headers.extend([
f'{mode}_entry_count',
f'{mode}_escape_count',
f'{mode}_escape_rate',
f'{mode}_avg_survival_time',
f'{mode}_max_survival_time',
f'{mode}_raid_play_count',
f'{mode}_first_result',
f'{mode}_avg_monster_kills',
f'{mode}_avg_player_kills'
])
# 나머지 헤더 (삭제된 지표 제외)
headers.extend([
'level_max', 'level_max_stalker', 'tutorial_entry', 'tutorial_completed',
'guide_quest_stage',
'highest_item_grade', 'blueprint_use_count', 'shop_buy_count',
'shop_sell_count', 'gold_spent', 'gold_earned', 'storage_in_count', 'storage_out_count',
'enchant_count', 'enchant_gold_spent',
'ingame_equip_count', 'object_interaction_count',
'matching_start_count', 'matching_complete_count', 'matching_failed_count', 'avg_matching_time',
'friend_add_count', 'friend_delete_count', 'party_invite_sent', 'party_invite_received', 'mail_read_count',
'exchange_register_count', 'exchange_use_count', 'coupon_used',
'button_click_count', 'hideout_upgrade_count', 'hideout_max_level', 'season_pass_buy', 'season_pass_max_step',
'last_logout_time'
])
2025-08-27 01:10:50 +09:00
try:
with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=headers, extrasaction='ignore')
writer.writeheader()
for result in results:
writer.writerow(result)
processing_timer.stop()
print_complete(f"CSV 파일 저장 완료: {len(results)}명 데이터, {len(headers)}개 지표")
logger.info(f"결과 파일 저장 완료: {output_path}")
logger.info(f"{len(results)}명의 데이터 저장")
logger.info(f"분석 지표: {len(headers)}")
2025-08-27 01:10:50 +09:00
except Exception as e:
logger.error(f"CSV 파일 저장 실패: {e}")
2025-08-27 01:10:50 +09:00
# ==============================================================================
# 10. 메인 함수
2025-08-27 01:10:50 +09:00
# ==============================================================================
def parse_arguments() -> argparse.Namespace:
"""명령줄 인자 파싱"""
parser = argparse.ArgumentParser(description="던전 스토커즈 신규 유저 분석 (수정 버전)")
parser.add_argument('--start-time', required=True, help='분석 시작 시간 (KST)')
parser.add_argument('--end-time', required=True, help='분석 종료 시간 (KST)')
parser.add_argument('--batch-size', type=int, default=DEFAULT_BATCH_SIZE, help='배치 크기')
parser.add_argument('--max-workers', type=int, default=DEFAULT_MAX_WORKERS, help='병렬 워커 수')
parser.add_argument('--sample-size', type=int, help='샘플 분석 크기')
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
parser.add_argument('--full', action='store_true', help='세션 관련 지표 포함한 전체 분석')
2025-08-27 01:10:50 +09:00
return parser.parse_args()
def main():
"""메인 실행 함수"""
global logger
2025-08-27 01:10:50 +09:00
overall_start_time = time.time()
start_kst = datetime.now(KST)
timestamp = start_kst.strftime('%Y%m%d_%H%M%S')
2025-08-27 01:10:50 +09:00
2025-08-27 17:21:05 +09:00
log_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.log"
csv_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.csv"
logger = setup_logging(str(log_file_path))
logger.info("=" * 80)
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
logger.info("던전 스토커즈 신규 유저 분석 v5.0")
logger.info("create_uid 기반 신규 유저 판별 + 세션 지표 조건부 수집")
logger.info("=" * 80)
2025-08-27 01:10:50 +09:00
args = parse_arguments()
try:
start_kst_arg = datetime.fromisoformat(args.start_time)
end_kst_arg = datetime.fromisoformat(args.end_time)
2025-08-27 01:10:50 +09:00
start_utc = start_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
end_utc = end_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
2025-08-27 01:10:50 +09:00
logger.info(f"분석 기간 (KST): {args.start_time} ~ {args.end_time}")
logger.info(f"분석 기간 (UTC): {start_utc} ~ {end_utc}")
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
logger.info(f"세션 지표 포함: {'예 (--full 옵션 사용)' if args.full else '아니오 (빠른 분석 모드)'}")
2025-08-27 01:10:50 +09:00
except Exception as e:
logger.error(f"시간 형식 오류: {e}")
2025-08-27 01:10:50 +09:00
return
client = create_opensearch_client()
if not client:
return
try:
metrics_config = get_fixed_metrics_config()
logger.info(f"수정된 분석 지표 로드: {len(metrics_config)}")
2025-08-27 01:10:50 +09:00
cohort = get_new_user_cohort_optimized(client, start_utc, end_utc)
if not cohort:
logger.error("분석할 신규 유저가 없습니다.")
2025-08-27 01:10:50 +09:00
return
if args.sample_size and args.sample_size < len(cohort):
uid_list = list(cohort.keys())
sampled_uids = uid_list[:args.sample_size]
cohort = {uid: cohort[uid] for uid in sampled_uids}
logger.info(f"샘플링 모드: {args.sample_size}명만 분석")
2025-08-27 01:10:50 +09:00
results = process_cohort_fixed_parallel(
수정 완료 사항 1. first_login_time 필드 제거 및 create_time으로 통합 - get_new_user_cohort_optimized 함수에서 first_login_* 필드들을 제거하고 create_time_*으로 통합 - build_fixed_msearch_queries와 calculate_comprehensive_session_metrics에서 first_login_dt를 create_time_dt로 변경 - CSV 헤더에서 first_login_time 제거 2. auth.id 수집 로직 수정 - create_uid 인덱스에서 auth.id를 정확히 수집하여 new_user_map에 저장 - result 딕셔너리 생성 시 auth_id 필드에 기본값 처리 추가 3. retention_status 판정을 create_time 기준으로 변경 - 모든 시간 범위 계산을 create_time_dt 기준으로 변경 - D+0, D+1 판정이 계정 생성 시점 기준으로 작동 4. 세션 관련 지표를 --full 옵션으로 조건부 실행 - --full 명령줄 옵션 추가 - process_fixed_batch 함수에 include_session_metrics 파라미터 추가 - --full 옵션이 없으면 active_seconds, total_playtime_minutes, session_count, avg_session_length을 0으로 설정하여 빠른 실행 가능 이제 스크립트는 다음과 같이 실행할 수 있습니다: - 빠른 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" - 전체 분석: python ds_new_user_analy.py --start-time "2025-08-16T12:00:00+09:00" --end-time "2025-08-16T14:00:00+09:00" --full 수정이 완료되었습니다. 이제 get_new_user_cohort_optimized 함수는: 1. Step 1: create_uid 인덱스에서 신규 유저와 계정 생성 시간 수집 (auth.id는 빈값이므로 수집하지 않음) 2. Step 2: heartbeat 인덱스에서 각 UID에 대한 auth.id 수집 3. Step 3: login_comp 인덱스에서 추가 정보(닉네임, 언어, 디바이스) 수집
2025-08-29 12:36:33 +09:00
client, cohort, args.batch_size, args.max_workers, metrics_config, args.full
)
2025-08-27 01:10:50 +09:00
write_fixed_results(results, str(csv_file_path), args.full)
2025-08-27 01:10:50 +09:00
# 요약
2025-08-27 01:10:50 +09:00
if results:
total_users = len(results)
retention_counts = {}
retention_groups = ['Retained_d0', 'Retained_d1', 'Retained_d2', 'Retained_d3', 'Retained_d4', 'Retained_d5', 'Retained_d6', 'Retained_d7+']
# 각 그룹별 카운트
for group in retention_groups:
retention_counts[group] = sum(1 for r in results if r.get('retention_status') == group)
2025-08-27 01:10:50 +09:00
logger.info("=" * 80)
logger.info("분석 요약")
logger.info("=" * 80)
logger.info(f"총 신규 유저: {total_users:,}")
# 기본 리텐션 분포
for group in retention_groups:
count = retention_counts[group]
percentage = (count / total_users) * 100
logger.info(f"{group}: {count:,}명 ({percentage:.1f}%)")
# 리텐션 퍼널 요약 (단계별 + 누적)
logger.info("")
logger.info("=" * 50)
logger.info("리텐션 퍼널 분석")
logger.info("=" * 50)
# 누적 리텐션 계산 (D1 이상 접속한 유저들)
cumulative_d1_plus = sum(retention_counts[group] for group in retention_groups[1:]) # D1부터
cumulative_d2_plus = sum(retention_counts[group] for group in retention_groups[2:]) # D2부터
cumulative_d3_plus = sum(retention_counts[group] for group in retention_groups[3:]) # D3부터
cumulative_d4_plus = sum(retention_counts[group] for group in retention_groups[4:]) # D4부터
cumulative_d5_plus = sum(retention_counts[group] for group in retention_groups[5:]) # D5부터
cumulative_d6_plus = sum(retention_counts[group] for group in retention_groups[6:]) # D6부터
cumulative_d7_plus = retention_counts['Retained_d7+']
# 퍼널 분석 (단계별 잔존율 + 누적 리텐션율)
if cumulative_d1_plus > 0:
logger.info(f"D0→D1: {(cumulative_d1_plus/total_users)*100:.1f}% ({cumulative_d1_plus:,}/{total_users:,}명) | 누적: {(cumulative_d1_plus/total_users)*100:.1f}%")
if cumulative_d2_plus > 0:
d1_to_d2_rate = (cumulative_d2_plus/cumulative_d1_plus)*100
logger.info(f"D1→D2: {d1_to_d2_rate:.1f}% ({cumulative_d2_plus:,}/{cumulative_d1_plus:,}명) | 누적: {(cumulative_d2_plus/total_users)*100:.1f}%")
if cumulative_d3_plus > 0:
d2_to_d3_rate = (cumulative_d3_plus/cumulative_d2_plus)*100
logger.info(f"D2→D3: {d2_to_d3_rate:.1f}% ({cumulative_d3_plus:,}/{cumulative_d2_plus:,}명) | 누적: {(cumulative_d3_plus/total_users)*100:.1f}%")
if cumulative_d4_plus > 0:
d3_to_d4_rate = (cumulative_d4_plus/cumulative_d3_plus)*100
logger.info(f"D3→D4: {d3_to_d4_rate:.1f}% ({cumulative_d4_plus:,}/{cumulative_d3_plus:,}명) | 누적: {(cumulative_d4_plus/total_users)*100:.1f}%")
if cumulative_d5_plus > 0:
d4_to_d5_rate = (cumulative_d5_plus/cumulative_d4_plus)*100
logger.info(f"D4→D5: {d4_to_d5_rate:.1f}% ({cumulative_d5_plus:,}/{cumulative_d4_plus:,}명) | 누적: {(cumulative_d5_plus/total_users)*100:.1f}%")
if cumulative_d6_plus > 0:
d5_to_d6_rate = (cumulative_d6_plus/cumulative_d5_plus)*100
logger.info(f"D5→D6: {d5_to_d6_rate:.1f}% ({cumulative_d6_plus:,}/{cumulative_d5_plus:,}명) | 누적: {(cumulative_d6_plus/total_users)*100:.1f}%")
if cumulative_d7_plus > 0:
d6_to_d7_rate = (cumulative_d7_plus/cumulative_d6_plus)*100
logger.info(f"D6→D7+: {d6_to_d7_rate:.1f}% ({cumulative_d7_plus:,}/{cumulative_d6_plus:,}명) | 누적: {(cumulative_d7_plus/total_users)*100:.1f}%")
logger.info("")
logger.info(f"평균 활동 시간: {sum(r.get('active_seconds', 0) for r in results) / total_users / 60:.1f}")
2025-08-27 01:10:50 +09:00
except KeyboardInterrupt:
logger.warning("사용자에 의해 중단되었습니다.")
2025-08-27 01:10:50 +09:00
except Exception as e:
logger.error(f"예상치 못한 오류: {e}")
2025-08-27 01:10:50 +09:00
import traceback
logger.error(traceback.format_exc())
2025-08-27 01:10:50 +09:00
finally:
stop_timer_event.set()
end_time = time.time()
total_time = str(timedelta(seconds=int(end_time - overall_start_time)))
end_kst = datetime.now(KST)
logger.info("=" * 80)
logger.info(f"분석 종료 시간: {end_kst.strftime('%Y-%m-%dT%H:%M:%S+09:00')}")
logger.info(f"총 소요 시간: {total_time}")
logger.info("수정된 분석 완료!")
logger.info("=" * 80)
print_complete(f"🎉 전체 분석 완료! 소요 시간: {total_time}")
# 타이머 정리
processing_timer.stop()
2025-08-27 01:10:50 +09:00
if __name__ == "__main__":
main()