#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
던전 스토커즈 신규 유저 리텐션 분석 스크립트 (수정 버전)

OpenSearch 매핑 오류 수정: nested → object 쿼리 변경

수정사항:
- body 필드를 nested가 아닌 object로 처리
- retention_d1 중복 필드 제거
- 필드 경로 정확성 개선
- 키워드 필드 사용 (.keyword 추가)

작성자: Claude Code
기획서 기반: DS-new_users-analy.md
참고: OpenSearch 매핑 ds_opensearch_mappings.json
"""
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
import csv
|
|
|
|
|
import json
|
|
|
|
|
import time
|
2025-08-27 02:15:45 +09:00
|
|
|
import yaml
|
|
|
|
|
import logging
|
2025-08-27 01:10:50 +09:00
|
|
|
import argparse
|
|
|
|
|
import threading
|
|
|
|
|
from datetime import datetime, timedelta, timezone
|
2025-08-27 02:15:45 +09:00
|
|
|
from collections import defaultdict, Counter
|
|
|
|
|
from typing import Dict, List, Optional, Tuple, Generator, Any, Set
|
2025-08-27 01:10:50 +09:00
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
|
2025-08-27 02:15:45 +09:00
|
|
|
from pathlib import Path
|
2025-08-27 01:10:50 +09:00
|
|
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
from tqdm import tqdm
|
|
|
|
|
from opensearchpy import OpenSearch
|
2025-08-27 02:15:45 +09:00
|
|
|
from opensearchpy.helpers import scan
|
2025-08-27 01:10:50 +09:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# ==============================================================================
# 1. Configuration and constants
# ==============================================================================

# OpenSearch connection settings.
# NOTE(security): credentials used to be hard-coded in source. They can now be
# overridden via the OPENSEARCH_USERNAME / OPENSEARCH_PASSWORD environment
# variables; the previous hard-coded values remain the defaults so existing
# deployments keep working unchanged.
OPENSEARCH_CONFIG = {
    "host": "ds-opensearch.oneunivrs.com",
    "port": 9200,
    "auth": {
        "username": os.environ.get("OPENSEARCH_USERNAME", "admin"),
        "password": os.environ.get("OPENSEARCH_PASSWORD", "DHp5#r#GYQ9d"),
    },
    "use_ssl": True,
    # Self-signed cluster certificate; verification deliberately disabled.
    "verify_certs": False,
    "timeout": 120,
    "max_retries": 3,
    # Avoid keep-alive connections piling up against the cluster.
    "headers": {"Connection": "close"},
}

# Korea Standard Time (UTC+9) — all reported timestamps use this zone.
KST = timezone(timedelta(hours=9))

# Performance-tuning defaults (sized against the target OpenSearch cluster spec).
DEFAULT_BATCH_SIZE = 2000
DEFAULT_MAX_WORKERS = 16
DEFAULT_COMPOSITE_SIZE = 2000
DEFAULT_TIMEOUT = 180
SCROLL_TIMEOUT = "5m"
SESSION_GAP_MINUTES = 5
MAX_SESSION_HOURS = 3

# Output directory, created next to this script if it does not already exist.
BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR / "analysis_results"
OUTPUT_DIR.mkdir(exist_ok=True)

# Globals shared across threads / initialized later in main().
stop_timer_event = threading.Event()
logger = None
|
2025-08-27 01:10:50 +09:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# ==============================================================================
|
2025-08-27 02:15:45 +09:00
|
|
|
# 2. 로깅 시스템 설정
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
|
|
|
|
def setup_logging(log_file_path: str) -> logging.Logger:
    """Configure and return the 'NewUserAnalyzer' logger with KST timestamps.

    Attaches a UTF-8 file handler and a console handler, both formatted as
    '[LEVEL] YYYY-MM-DDTHH:MM:SS+09:00 - message', and silences the noisy
    opensearchpy / urllib3 loggers. Safe to call repeatedly: previously
    attached handlers are removed first so output is never duplicated.
    """

    class KSTFormatter(logging.Formatter):
        # Render record timestamps in Korea Standard Time (UTC+9).
        def formatTime(self, record, datefmt=None):
            seoul = timezone(timedelta(hours=9))
            stamp = datetime.fromtimestamp(record.created, seoul)
            return stamp.strftime('%Y-%m-%dT%H:%M:%S+09:00')

    log = logging.getLogger('NewUserAnalyzer')
    log.setLevel(logging.INFO)

    # Strip handlers left over from an earlier setup_logging() call.
    for stale in list(log.handlers):
        log.removeHandler(stale)

    line_format = '[%(levelname)s] %(asctime)s - %(message)s'

    to_file = logging.FileHandler(log_file_path, encoding='utf-8')
    to_file.setFormatter(KSTFormatter(line_format))
    log.addHandler(to_file)

    to_console = logging.StreamHandler()
    to_console.setFormatter(KSTFormatter(line_format))
    log.addHandler(to_console)

    # Quiet down chatty third-party libraries.
    for noisy in ('opensearchpy', 'urllib3'):
        logging.getLogger(noisy).setLevel(logging.WARNING)

    return log
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
# 3. OpenSearch 연결 및 유틸리티
|
2025-08-27 01:10:50 +09:00
|
|
|
# ==============================================================================
|
|
|
|
|
|
|
|
|
|
def create_opensearch_client() -> Optional[OpenSearch]:
    """Build an OpenSearch client from OPENSEARCH_CONFIG and verify it.

    Returns the connected client, or None when construction or the initial
    ping fails (the error is logged rather than raised).
    """
    logger.info("OpenSearch 클러스터 연결 시도 중...")

    scheme = "https" if OPENSEARCH_CONFIG['use_ssl'] else "http"
    credentials = (
        OPENSEARCH_CONFIG['auth']['username'],
        OPENSEARCH_CONFIG['auth']['password'],
    )

    try:
        client = OpenSearch(
            hosts=[{
                "host": OPENSEARCH_CONFIG['host'],
                "port": OPENSEARCH_CONFIG['port'],
                "scheme": scheme,
            }],
            http_auth=credentials,
            use_ssl=OPENSEARCH_CONFIG['use_ssl'],
            verify_certs=OPENSEARCH_CONFIG['verify_certs'],
            ssl_show_warn=False,
            timeout=OPENSEARCH_CONFIG['timeout'],
            max_retries=OPENSEARCH_CONFIG['max_retries'],
            retry_on_timeout=True,
            headers=OPENSEARCH_CONFIG['headers'],
        )

        # A client object can be built even when the cluster is unreachable;
        # ping() is the actual connectivity check.
        if not client.ping():
            raise ConnectionError("클러스터 PING 실패")

        logger.info("OpenSearch 연결 성공!")
        return client

    except Exception as e:
        logger.error(f"OpenSearch 연결 실패: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def exponential_backoff_retry(func, *args, **kwargs) -> Any:
    """Call *func*(*args, **kwargs), retrying with exponential backoff.

    Makes up to five attempts, sleeping 1, 2, 4, 8 and 16 seconds after each
    failed non-final attempt. After the final attempt fails, the exception is
    re-raised with its original traceback.

    Returns whatever *func* returns on the first successful attempt.

    Fixes over the previous version: the "last attempt" check was coupled to
    the literal delay value (``if delay == 16``), which silently breaks if the
    schedule is ever edited; and ``raise e`` is replaced with a bare ``raise``
    to preserve the original traceback cleanly.
    """
    delays = (1, 2, 4, 8, 16)
    for attempt, delay in enumerate(delays, start=1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if attempt == len(delays):
                # Out of attempts: propagate instead of swallowing.
                raise
            logger.warning(f"재시도 중... {delay}초 대기 (오류: {str(e)[:100]})")
            time.sleep(delay)
|
|
|
|
|
|
|
|
|
|
|
2025-08-27 02:15:45 +09:00
|
|
|
def format_kst_time(timestamp_str: str) -> str:
    """Convert an ISO-8601 UTC timestamp string to KST ('+09:00') form.

    A trailing 'Z' is accepted as a UTC marker. On any parse failure the
    input is returned unchanged — this is a deliberate best-effort fallback
    so one malformed timestamp cannot crash the pipeline.

    Fix: the previous bare ``except:`` also swallowed SystemExit and
    KeyboardInterrupt; only the exceptions a bad input can actually raise
    are caught now.
    """
    try:
        dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
        kst_dt = dt.astimezone(KST)
        return kst_dt.strftime('%Y-%m-%dT%H:%M:%S+09:00')
    except (ValueError, TypeError, AttributeError):
        # ValueError: unparseable string; TypeError/AttributeError: non-string input.
        return timestamp_str
|
|
|
|
|
|
|
|
|
|
|
2025-08-27 01:10:50 +09:00
|
|
|
# ==============================================================================
|
2025-08-27 02:15:45 +09:00
|
|
|
# 4. 수정된 분석 지표 정의 (OpenSearch 매핑 기반)
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
|
|
|
|
def get_fixed_metrics_config() -> Dict[str, Dict]:
    """Return the per-user metric definitions, keyed by metric name.

    Each entry is consumed by the msearch query builder and describes:
      index           -- OpenSearch index to query
      time_range      -- "d0" (first 24h after account creation) or
                         "d1" (24-48h after creation)
      agg_type        -- aggregation strategy name (count, sum, avg, max,
                         max_timestamp, first_value, conditional_sum,
                         exists, ...); interpreted by the query/result code
      field           -- document field to aggregate, where applicable
      filters         -- extra term/bool filter clauses ANDed into the query
      target_field    -- overrides the default "uid.keyword" user filter
      use_body_target -- match the user as the *target* of the event
                         (e.g. the player who was killed)

    Fields under "body" are queried as plain object paths (not nested),
    and string fields use their ".keyword" sub-field, matching the actual
    index mappings (see ds_opensearch_mappings.json).
    """
    return {
        # ==================== 3.2 Play time and sessions ====================
        "session_count": {
            "index": "ds-logs-live-login_comp",
            "time_range": "d0",
            "agg_type": "count"
        },
        "last_logout_time": {
            "index": "ds-logs-live-logout",
            "time_range": "d0",
            "agg_type": "max_timestamp"
        },

        # ==================== 3.3 Dungeon play performance ====================
        "dungeon_entry_count": {
            "index": "ds-logs-live-survival_sta",
            "time_range": "d0",
            "agg_type": "count"
        },
        "dungeon_first_mode": {
            "index": "ds-logs-live-survival_sta",
            "time_range": "d0",
            "agg_type": "first_value",
            "field": "body.dungeon_mode"
        },
        "dungeon_first_stalker": {
            "index": "ds-logs-live-survival_sta",
            "time_range": "d0",
            "agg_type": "first_value",
            "field": "body.stalker_name"
        },
        "dungeon_first_result": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "first_value",
            "field": "body.result"
        },
        "dungeon_escape_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "count",
            # result == 1 marks a successful escape (per filter below).
            "filters": [{"term": {"body.result": 1}}]
        },
        "avg_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "avg",
            "field": "body.play_stats.playtime"
        },
        "max_survival_time": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.play_stats.playtime"
        },
        "total_armor_break": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.armor_break_cnt"
        },
        "raid_play_count": {
            "index": "ds-logs-live-survival_end",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.play_stats.raid_play"
        },
        "escape_count": {
            "index": "ds-logs-live-dead",
            "time_range": "d0",
            "agg_type": "count",
            # inter_type == 0 on a 'dead' event is counted as an escape here;
            # death_count below counts the complement.
            "filters": [{"term": {"body.inter_type": 0}}]
        },

        # ==================== 3.4 Combat performance ====================
        "monster_kill_count": {
            "index": "ds-logs-live-monster_kill",
            "time_range": "d0",
            "agg_type": "count"
        },
        "player_kill_count": {
            "index": "ds-logs-live-player_kill",
            "time_range": "d0",
            "agg_type": "count",
            "target_field": "uid.keyword"
        },
        "player_killed_count": {
            "index": "ds-logs-live-player_kill",
            "time_range": "d0",
            "agg_type": "count",
            # Match where this user was the victim, not the killer.
            "target_field": "body.target_uid.keyword",
            "use_body_target": True
        },
        "death_count": {
            "index": "ds-logs-live-dead",
            "time_range": "d0",
            "agg_type": "count",
            # Everything except inter_type == 0 (escapes) counts as a death.
            "filters": [{"bool": {"must_not": {"term": {"body.inter_type": 0}}}}]
        },

        # ==================== 3.5 Progression and growth ====================
        "level_max": {
            "index": "ds-logs-live-level_up",
            "time_range": "d0",
            "agg_type": "max_level_with_stalker",
            "level_field": "body.level",
            "stalker_field": "body.stalker"
        },
        "level_max_stalker": {
            "index": "ds-logs-live-level_up",
            "time_range": "d0",
            "agg_type": "max_level_stalker_name",
            "level_field": "body.level",
            "stalker_field": "body.stalker"
        },
        "tutorial_entry": {
            "index": "ds-logs-live-tutorial_entry",
            "time_range": "d0",
            "agg_type": "count"
        },
        "tutorial_completed": {
            "index": "ds-logs-live-log_tutorial",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [
                {"term": {"body.action_type.keyword": "Complete"}},
                {"term": {"body.stage_type.keyword": "result"}}
            ]
        },
        "guide_quest_stage": {
            "index": "ds-logs-live-guide_quest_stage",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.guide_step"
        },
        "skill_points_earned": {
            "index": "ds-logs-live-skill_point_get",
            "time_range": "d0",
            "agg_type": "count"
        },

        # ==================== 3.6 Items and economy ====================
        "items_obtained_count": {
            "index": "ds-logs-live-item_get",
            "time_range": "d0",
            "agg_type": "count"
        },
        "highest_item_grade": {
            "index": "ds-logs-live-item_get",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.item_grade"
        },
        "blueprint_use_count": {
            "index": "ds-logs-live-craft_from_blueprint",
            "time_range": "d0",
            "agg_type": "count"
        },
        "shop_buy_count": {
            "index": "ds-logs-live-shop_buy",
            "time_range": "d0",
            "agg_type": "count"
        },
        "shop_sell_count": {
            "index": "ds-logs-live-shop_sell",
            "time_range": "d0",
            "agg_type": "count"
        },
        "gold_spent": {
            "index": "ds-logs-live-shop_buy",
            "time_range": "d0",
            "agg_type": "conditional_sum",
            "field": "body.amt",
            # i108000 is the currency item id used for gold
            # (presumably — confirm against the game's item table).
            "condition_field": "body.cost_id.keyword",
            "condition_value": "i108000"
        },
        "gold_earned": {
            "index": "ds-logs-live-shop_sell",
            "time_range": "d0",
            "agg_type": "conditional_sum",
            "field": "body.amt",
            "condition_field": "body.cost_id.keyword",
            "condition_value": "i108000"
        },
        "storage_in_count": {
            "index": "ds-logs-live-storage_use",
            "time_range": "d0",
            "agg_type": "count",
            # oper_type 1 = deposit, -1 = withdrawal (see storage_out_count).
            "filters": [{"term": {"body.oper_type": 1}}]
        },
        "storage_out_count": {
            "index": "ds-logs-live-storage_use",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.oper_type": -1}}]
        },
        "enchant_count": {
            "index": "ds-logs-live-enchant",
            "time_range": "d0",
            "agg_type": "count"
        },
        "enchant_gold_spent": {
            "index": "ds-logs-live-enchant",
            "time_range": "d0",
            "agg_type": "sum",
            "field": "body.amt"
        },

        # ==================== 3.7 Equipment management ====================
        "ingame_equip_count": {
            "index": "ds-logs-live-item_ingame_equip",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.base_type": 2}}]
        },

        # ==================== 3.8 Object interaction ====================
        "object_interaction_count": {
            "index": "ds-logs-live-obj_inter",
            "time_range": "d0",
            "agg_type": "count"
        },

        # ==================== 3.9 Matchmaking ====================
        "matching_start_count": {
            "index": "ds-logs-live-matching_start",
            "time_range": "d0",
            "agg_type": "count"
        },
        "matching_complete_count": {
            "index": "ds-logs-live-matching_complete",
            "time_range": "d0",
            "agg_type": "count"
        },
        "matching_failed_count": {
            "index": "ds-logs-live-matching_failed",
            "time_range": "d0",
            "agg_type": "count"
        },
        "avg_matching_time": {
            "index": "ds-logs-live-matching_complete",
            "time_range": "d0",
            "agg_type": "avg",
            "field": "body.matchingtime"
        },

        # ==================== 3.10 Social activity ====================
        "friend_add_count": {
            "index": "ds-logs-live-friend",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [
                {"term": {"body.oper_type": 0}},
                {"term": {"body.friend_type": 0}}
            ]
        },
        "friend_delete_count": {
            "index": "ds-logs-live-friend",
            "time_range": "d0",
            "agg_type": "count",
            "filters": [{"term": {"body.oper_type": 1}}]
        },
        "party_invite_sent": {
            "index": "ds-logs-live-player_invite",
            "time_range": "d0",
            "agg_type": "count"
        },
        "party_invite_received": {
            "index": "ds-logs-live-player_invite",
            "time_range": "d0",
            "agg_type": "count",
            # Invitations where this user is the recipient.
            "target_field": "body.target_uid"
        },
        "mail_read_count": {
            "index": "ds-logs-live-mail_read",
            "time_range": "d0",
            "agg_type": "count"
        },

        # ==================== 3.11 Exchange and economic activity ====================
        "exchange_register_count": {
            "index": "ds-logs-live-exchange_reg",
            "time_range": "d0",
            "agg_type": "count"
        },
        "exchange_use_count": {
            "index": "ds-logs-live-exchange_use",
            "time_range": "d0",
            "agg_type": "count"
        },
        "coupon_used": {
            "index": "ds-logs-live-coupon",
            "time_range": "d0",
            "agg_type": "exists"
        },

        # ==================== 3.12 Miscellaneous activity ====================
        "button_click_count": {
            "index": "ds-logs-live-button_click",
            "time_range": "d0",
            "agg_type": "count"
        },
        "hideout_upgrade_count": {
            "index": "ds-logs-live-log_hideout_upgrade",
            "time_range": "d0",
            "agg_type": "count"
        },
        "hideout_max_level": {
            "index": "ds-logs-live-log_hideout_upgrade",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.hideout_level"
        },
        "season_pass_buy": {
            "index": "ds-logs-live-season_pass",
            "time_range": "d0",
            "agg_type": "exists",
            "filters": [{"term": {"body.cause": 1}}]
        },
        "season_pass_max_step": {
            "index": "ds-logs-live-season_pass",
            "time_range": "d0",
            "agg_type": "max",
            "field": "body.season_pass_step"
        },

        # ==================== Retention check (duplicate retention_d1 removed) ====================
        # D1 retention: any login during hours 24-48 after account creation.
        "retention_check": {
            "index": "ds-logs-live-login_comp",
            "time_range": "d1",
            "agg_type": "exists"
        }
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
# 5. Composite Aggregation 신규 유저 코호트 선정 (동일)
|
2025-08-27 01:10:50 +09:00
|
|
|
# ==============================================================================
|
|
|
|
|
|
|
|
|
|
def get_new_user_cohort_optimized(
    client: OpenSearch,
    start_time: str,
    end_time: str,
    page_size: int = DEFAULT_COMPOSITE_SIZE
) -> Dict[str, Dict]:
    """Select the new-user cohort based on actual account creation time.

    Uses the ds-logs-live-create_uid index to find accounts created inside
    [start_time, end_time), then enriches each user with profile details.
    auth.id collection priority: 1) login_comp, 2) log_return_to_lobby.

    Args:
        client: connected OpenSearch client.
        start_time: inclusive ISO-8601 UTC lower bound for @timestamp.
        end_time: exclusive ISO-8601 UTC upper bound for @timestamp.
        page_size: composite-aggregation page size.

    Returns:
        Mapping of uid -> dict with keys 'auth_id', 'create_time_utc',
        'create_time_kst', 'create_time_dt', 'language', 'device',
        'nickname' ('N/A' where a value could not be collected).
    """

    logger.info("=" * 80)
    logger.info("1단계: 신규 유저 코호트 선정 (create_uid 기준, 최적화 버전)")
    logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}")
    logger.info(f"페이지 크기: {page_size}")

    cohort = {}
    after_key = None

    # Step 1: pull every (uid, auth_id) created during the analysis window
    # from create_uid via a paginated composite aggregation.
    create_uid_query = {
        "size": 0,
        "query": {
            "bool": {
                "filter": [
                    {"range": {"@timestamp": {"gte": start_time, "lt": end_time}}}
                ]
            }
        },
        "aggs": {
            "new_users": {
                "composite": {
                    "size": page_size,
                    "sources": [
                        {"uid": {"terms": {"field": "uid.keyword"}}},
                        {"auth_id": {"terms": {"field": "auth.id.keyword"}}}
                    ]
                },
                "aggs": {
                    # Earliest creation event per composite bucket.
                    "first_create": {"min": {"field": "@timestamp"}}
                }
            }
        }
    }

    # Collect newly created users.
    new_user_map = {}  # uid -> {'create_time': ISO-8601 UTC string}
    page_count = 0

    while True:
        page_count += 1
        # Shallow copy is sufficient: only the top-level "aggs" path is
        # mutated below via the shared nested dicts.
        query = create_uid_query.copy()
        if after_key:
            query["aggs"]["new_users"]["composite"]["after"] = after_key

        try:
            logger.info(f"create_uid 페이지 {page_count} 처리 중...")
            response = exponential_backoff_retry(
                client.search,
                index="ds-logs-live-create_uid",
                body=query,
                request_timeout=DEFAULT_TIMEOUT,
                track_total_hits=False
            )

            buckets = response["aggregations"]["new_users"]["buckets"]
            if not buckets:
                break

            for bucket in buckets:
                uid = bucket["key"]["uid"]
                first_create_utc = bucket["first_create"]["value_as_string"]

                # Keep only the earliest create time per uid (duplicates can
                # appear, e.g. caused by client_event).
                if uid not in new_user_map or first_create_utc < new_user_map[uid]["create_time"]:
                    new_user_map[uid] = {
                        "create_time": first_create_utc
                    }

            logger.info(f"create_uid 페이지 {page_count}: {len(buckets)}개 처리됨")

            after_key = response["aggregations"]["new_users"].get("after_key")
            if not after_key:
                break

        except Exception as e:
            logger.error(f"create_uid 처리 중 오류 (페이지 {page_count}): {e}")
            break

    logger.info(f"총 {len(new_user_map)}명의 신규 유저 확인됨")

    # Step 2: seed the cohort with every create_uid user.
    if not new_user_map:
        logger.warning("신규 유저가 없습니다.")
        return cohort

    uid_list = list(new_user_map.keys())
    chunk_size = 100
    total_users = 0

    # All create_uid users go into the cohort first; auth_id starts as N/A
    # and is filled in by the enrichment steps below.
    for uid in uid_list:
        cohort[uid] = {
            'auth_id': 'N/A',
            'create_time_utc': new_user_map[uid]["create_time"],
            'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]),
            'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')),
            'language': 'N/A',
            'device': 'N/A',
            'nickname': 'N/A'
        }
        total_users += 1

    logger.info(f"cohort에 {total_users}명의 신규 유저 추가 완료")

    # Step 3: enrich from login_comp (primary source for auth.id),
    # querying uids in chunks of 100 to keep terms filters bounded.
    logger.info("login_comp 인덱스에서 추가 정보 수집 중 (auth.id 1순위)...")
    login_comp_collected = set()

    for i in range(0, len(uid_list), chunk_size):
        chunk_uids = uid_list[i:i+chunk_size]

        login_query = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {"terms": {"uid.keyword": chunk_uids}}
                    ]
                }
            },
            "aggs": {
                "users": {
                    "terms": {
                        "field": "uid.keyword",
                        "size": chunk_size
                    },
                    "aggs": {
                        # Earliest login: source of the first-seen device model.
                        "user_info": {
                            "top_hits": {
                                "size": 1,
                                "sort": [{"@timestamp": {"order": "asc"}}],
                                "_source": ["body.device_mod", "body.nickname", "auth.id"]
                            }
                        },
                        # Latest login: preferred source for nickname/language.
                        "latest_info": {
                            "top_hits": {
                                "size": 1,
                                "sort": [{"@timestamp": {"order": "desc"}}],
                                "_source": ["body.nickname", "body.language", "auth.id"]
                            }
                        }
                    }
                }
            }
        }

        try:
            response = exponential_backoff_retry(
                client.search,
                index="ds-logs-live-login_comp",
                body=login_query,
                request_timeout=DEFAULT_TIMEOUT,
                track_total_hits=False
            )

            for bucket in response["aggregations"]["users"]["buckets"]:
                uid = bucket["key"]

                user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {}
                latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {}

                # Update cohort fields; nickname prefers the latest login and
                # falls back to the earliest one.
                if uid in cohort:
                    cohort[uid]['language'] = latest_info_hit.get('body', {}).get('language', 'N/A')
                    cohort[uid]['device'] = user_hit.get('body', {}).get('device_mod', 'N/A')
                    cohort[uid]['nickname'] = latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A')

                    # auth.id collection (priority 1).
                    auth_id = latest_info_hit.get('auth', {}).get('id') or user_hit.get('auth', {}).get('id')
                    if auth_id:
                        cohort[uid]['auth_id'] = auth_id

                    login_comp_collected.add(uid)

        except Exception as e:
            logger.error(f"login_comp 정보 수집 중 오류: {e}")

    logger.info(f"login_comp에서 {len(login_comp_collected)}명의 정보 수집 완료")

    # Step 4: fallback enrichment from log_return_to_lobby (auth.id priority 2,
    # plus a language fallback derived from 'country'). Covers users missing
    # from login_comp or whose language is still N/A.
    missing_uids = [uid for uid in uid_list if uid not in login_comp_collected or cohort[uid]['language'] == 'N/A']

    if missing_uids:
        logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중 (auth.id 2순위)...")
        lobby_collected = set()

        for i in range(0, len(missing_uids), chunk_size):
            chunk_uids = missing_uids[i:i+chunk_size]

            lobby_query = {
                "size": 0,
                "query": {
                    "bool": {
                        "filter": [
                            {"terms": {"uid.keyword": chunk_uids}}
                        ]
                    }
                },
                "aggs": {
                    "users": {
                        "terms": {
                            "field": "uid.keyword",
                            "size": chunk_size
                        },
                        "aggs": {
                            "info": {
                                "top_hits": {
                                    "size": 1,
                                    "sort": [{"@timestamp": {"order": "desc"}}],
                                    "_source": ["body.nickname", "auth.id", "country"]
                                }
                            }
                        }
                    }
                }
            }

            try:
                response = exponential_backoff_retry(
                    client.search,
                    index="ds-logs-live-log_return_to_lobby",
                    body=lobby_query,
                    request_timeout=DEFAULT_TIMEOUT,
                    track_total_hits=False
                )

                for bucket in response["aggregations"]["users"]["buckets"]:
                    uid = bucket["key"]

                    info_hit = bucket["info"]["hits"]["hits"][0]["_source"] if bucket["info"]["hits"]["hits"] else {}

                    # Secondary-source updates: fill fields only when still N/A.
                    if uid in cohort:
                        # Nickname: only if not already collected.
                        if cohort[uid]['nickname'] == 'N/A':
                            cohort[uid]['nickname'] = info_hit.get('body', {}).get('nickname', 'N/A')

                        # Language fallback: use the 'country' value,
                        # prefixed so its origin stays visible downstream.
                        if cohort[uid]['language'] == 'N/A':
                            country = info_hit.get('country')
                            if country:
                                cohort[uid]['language'] = f"country-{country}"

                        # auth.id collection (priority 2, only if still missing).
                        if cohort[uid]['auth_id'] == 'N/A':
                            auth_id = info_hit.get('auth', {}).get('id')
                            if auth_id:
                                cohort[uid]['auth_id'] = auth_id

                        lobby_collected.add(uid)

            except Exception as e:
                logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}")

        logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 차선 정보 수집 완료")

    # Final statistics.
    auth_id_count = sum(1 for uid in cohort if cohort[uid]['auth_id'] != 'N/A')
    logger.info(f"최종 auth.id 수집 완료: {auth_id_count}/{total_users}명")

    logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)")
    logger.info("=" * 80)
    return cohort
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ==============================================================================
|
2025-08-27 02:15:45 +09:00
|
|
|
# 6. 수정된 msearch 쿼리 빌더 (nested → object)
|
2025-08-27 01:10:50 +09:00
|
|
|
# ==============================================================================
|
|
|
|
|
|
2025-08-27 02:15:45 +09:00
|
|
|
def build_fixed_msearch_queries(
    uids: List[str],
    cohort: Dict[str, Dict],
    metrics_config: Dict[str, Dict]
) -> List[str]:
    """Build NDJSON msearch lines for every (uid, metric) pair.

    For each user the D0 window is the 24h following account creation and
    the D1 window is the following 24h (based on ``create_time_dt`` /
    ``create_time_utc`` in the cohort entry). Each metric produces two
    NDJSON lines: a ``{"index": ...}`` header and a query body whose shape
    depends on the metric's ``agg_type``. Nested queries are deliberately
    not used (body fields are plain objects in the mapping).

    Args:
        uids: user ids to build queries for (must all be keys of ``cohort``).
        cohort: per-uid info; each entry must provide ``create_time_dt``
            (aware datetime) and ``create_time_utc`` (ISO string).
        metrics_config: metric name -> config dict (``index``, ``time_range``,
            optional ``agg_type``, ``field``, ``filters`` etc.).

    Returns:
        Flat list of NDJSON strings, 2 * len(uids) * len(metrics_config) long.
    """
    # Aggregation types that need the raw documents back (size > 0)
    # instead of a pure aggregation-only response.
    needs_docs = {"first_value", "max_stalker", "max_level_with_stalker", "max_level_stalker_name"}

    queries: List[str] = []

    for uid in uids:
        user_data = cohort[uid]
        create_time_dt = user_data['create_time_dt']

        # Time windows keyed off account creation time.
        d0_start = user_data['create_time_utc']
        d0_end = (create_time_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ')
        d1_start = d0_end
        d1_end = (create_time_dt + timedelta(hours=48)).strftime('%Y-%m-%dT%H:%M:%SZ')

        for metric_name, config in metrics_config.items():
            # Select the time window for this metric.
            if config["time_range"] == "d0":
                time_filter = {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
            else:  # d1
                time_filter = {"range": {"@timestamp": {"gte": d1_start, "lt": d1_end}}}

            # User identification filter.
            # FIX: the previous code branched on config["use_body_target"]
            # but both branches built the exact same term filter, so the
            # dead branch was removed; target_field already carries the
            # full field path (e.g. body.target_uid).
            if "target_field" in config:
                user_filter = {"term": {config["target_field"]: uid}}
            else:
                user_filter = {"term": {"uid.keyword": uid}}

            query_filters = [user_filter, time_filter]

            # Extra static filters from the metric config (plain object
            # filters — nested filters were removed on purpose).
            if "filters" in config:
                query_filters.extend(config["filters"])

            # conditional_sum adds one more term filter before summing.
            if config.get("agg_type") == "conditional_sum":
                query_filters.append(
                    {"term": {config['condition_field']: config['condition_value']}}
                )

            agg_type = config.get("agg_type", "count")

            query_body = {
                "size": 1000 if agg_type in needs_docs else 0,
                "query": {"bool": {"filter": query_filters}}
            }

            # Aggregation / sort / _source, depending on aggregation type.
            if agg_type in ["sum", "avg", "max", "conditional_sum"]:
                agg_func = "sum" if agg_type in ["sum", "conditional_sum"] else agg_type
                query_body["aggs"] = {
                    "metric_value": {agg_func: {"field": config["field"]}}
                }
            elif agg_type == "max_timestamp":
                query_body["aggs"] = {
                    "metric_value": {"max": {"field": "@timestamp"}}
                }
            elif agg_type in ["first_value", "max_stalker"]:
                if agg_type == "first_value":
                    # Earliest event first.
                    query_body["sort"] = [{"@timestamp": {"order": "asc"}}]
                else:  # max_stalker: highest value of the configured field first
                    sort_field = config.get("field", "@timestamp")
                    query_body["sort"] = [{sort_field: {"order": "desc"}}]

                query_body["_source"] = [config.get("field", "@timestamp")]
            elif agg_type in ["max_level_with_stalker", "max_level_stalker_name"]:
                # Highest level first; ties broken by most recent timestamp.
                level_field = config.get("level_field", "body.level")
                stalker_field = config.get("stalker_field", "body.stalker")

                query_body["sort"] = [
                    {level_field: {"order": "desc"}},
                    {"@timestamp": {"order": "desc"}}
                ]
                query_body["_source"] = [level_field, stalker_field]

            # NDJSON: header line, then body line.
            queries.append(json.dumps({"index": config["index"]}, ensure_ascii=False))
            queries.append(json.dumps(query_body, ensure_ascii=False))

    return queries
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
# 7. 세션 지표 계산 (동일)
|
|
|
|
|
# ==============================================================================
|
|
|
|
|
|
|
|
|
|
def calculate_comprehensive_session_metrics(
    client: OpenSearch,
    uids: List[str],
    cohort_data: Dict[str, Dict]
) -> Dict[str, Dict]:
    """Compute per-user session metrics from D0 (first 24h) event logs.

    For each uid, streams its events from the ``ds-logs-live-*`` indices
    (restricted to the 24h after account creation) and derives:
    ``active_seconds``, ``total_playtime_minutes``, ``session_count``,
    ``avg_session_length`` (minutes) and ``logout_abnormal`` (0/1 flag).

    Args:
        client: connected OpenSearch client.
        uids: user ids to process.
        cohort_data: per-uid dicts with ``create_time_dt`` (aware datetime)
            and ``create_time_utc`` (ISO string) — same shape as the cohort
            built in step 1.

    Returns:
        uid -> metrics dict (always present for every uid in ``uids``).
    """

    def stream_user_events(uid: str) -> Generator[Dict, None, None]:
        # Yields {"time": datetime, "type": lowercased event type} for all
        # D0 events of this uid; yields nothing on unknown uid or any
        # scan failure (best-effort by design — see the broad except below).
        user_info = cohort_data.get(uid)
        if not user_info:
            return

        create_time_dt = user_info['create_time_dt']
        d0_end_dt = create_time_dt + timedelta(hours=24)

        query = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"uid.keyword": uid}},
                        {"range": {"@timestamp": {
                            "gte": user_info['create_time_utc'],
                            "lt": d0_end_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
                        }}}
                    ]
                }
            }
        }

        try:
            # SCROLL_TIMEOUT is a module-level constant defined elsewhere
            # in this file.
            for doc in scan(
                client,
                query=query,
                index="ds-logs-live-*",
                scroll=SCROLL_TIMEOUT,
                _source=["@timestamp", "type"]
            ):
                source = doc['_source']
                # 'Z' suffix is not accepted by fromisoformat on older
                # Pythons, so normalize it to an explicit UTC offset.
                event_dt = datetime.fromisoformat(source['@timestamp'].replace('Z', '+00:00'))

                # Defensive re-check of the window (the query already
                # filters by time, this guards against clock-edge docs).
                if create_time_dt <= event_dt < d0_end_dt:
                    yield {
                        "time": event_dt,
                        "type": source.get('type', '').lower()
                    }
        except Exception:
            # Best-effort: a failed scan simply yields no events for this
            # uid, producing all-zero metrics rather than aborting the batch.
            pass

    results = {}

    for uid in uids:
        events = list(stream_user_events(uid))

        # Default (all-zero) metrics; returned as-is when there are fewer
        # than 2 events, since no interval can be derived from one event.
        session_metrics = {
            'active_seconds': 0,
            'total_playtime_minutes': 0,
            'session_count': 0,
            'avg_session_length': 0,
            'logout_abnormal': 0
        }

        if len(events) < 2:
            results[uid] = session_metrics
            continue

        events.sort(key=lambda x: x['time'])

        login_events = [e for e in events if e['type'] == 'login_comp']
        logout_events = [e for e in events if e['type'] == 'logout']
        heartbeat_events = [e for e in events if e['type'] == 'heartbeat']

        session_metrics['session_count'] = len(login_events)

        # Active time: sum of gaps between consecutive heartbeats, counting
        # a gap only if it is within SESSION_GAP_MINUTES (larger gaps mean
        # the player was away), and capping each gap at MAX_SESSION_HOURS.
        if heartbeat_events:
            total_active_seconds = 0
            for i in range(len(heartbeat_events) - 1):
                time_diff = heartbeat_events[i + 1]['time'] - heartbeat_events[i]['time']
                if time_diff <= timedelta(minutes=SESSION_GAP_MINUTES):
                    total_active_seconds += min(time_diff.total_seconds(), MAX_SESSION_HOURS * 3600)

            session_metrics['active_seconds'] = int(total_active_seconds)

        # Total playtime: pair each login with the first later logout;
        # when no logout follows, fall back to the last observed event.
        if login_events:
            total_playtime_seconds = 0
            session_lengths = []

            for login_event in login_events:
                login_time = login_event['time']
                logout_time = None
                for logout_event in logout_events:
                    if logout_event['time'] > login_time:
                        logout_time = logout_event['time']
                        break

                if not logout_time and events:
                    logout_time = events[-1]['time']

                if logout_time:
                    session_duration = (logout_time - login_time).total_seconds()
                    # Cap a single session at MAX_SESSION_HOURS.
                    session_duration = min(session_duration, MAX_SESSION_HOURS * 3600)
                    total_playtime_seconds += session_duration
                    session_lengths.append(session_duration)

            session_metrics['total_playtime_minutes'] = int(total_playtime_seconds / 60)

            if session_lengths:
                session_metrics['avg_session_length'] = int(sum(session_lengths) / len(session_lengths) / 60)

        # Abnormal-exit flag: last login has no later logout (or there is
        # no logout at all) => the final session ended without a clean exit.
        if login_events and logout_events:
            last_login = max(login_events, key=lambda x: x['time'])['time']
            last_logout = max(logout_events, key=lambda x: x['time'])['time']
            session_metrics['logout_abnormal'] = 1 if last_logout < last_login else 0
        elif login_events and not logout_events:
            session_metrics['logout_abnormal'] = 1

        results[uid] = session_metrics

    return results
|
2025-08-27 01:10:50 +09:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# ==============================================================================
|
2025-08-27 02:15:45 +09:00
|
|
|
# 8. 수정된 배치 처리
|
2025-08-27 01:10:50 +09:00
|
|
|
# ==============================================================================
|
|
|
|
|
|
2025-08-27 02:15:45 +09:00
|
|
|
def process_fixed_batch(
    client: OpenSearch,
    batch_uids: List[str],
    cohort: Dict[str, Dict],
    metrics_config: Dict[str, Dict],
    include_session_metrics: bool = False
) -> List[Dict]:
    """Process one batch of uids: session metrics + msearch metric collection.

    Runs (optionally) the session-metric scan, then a single msearch over
    all (uid, metric) queries built by ``build_fixed_msearch_queries``, and
    parses each sub-response according to the metric's ``agg_type``.

    Args:
        client: connected OpenSearch client.
        batch_uids: uids in this batch (all must be keys of ``cohort``).
        cohort: per-uid info from step 1 (nickname, language, device,
            create_time_* fields, auth_id).
        metrics_config: metric name -> query/aggregation config.
        include_session_metrics: when False (default, fast mode) the
            expensive per-user event scan is skipped and all session
            metrics are zero.

    Returns:
        One result dict per successfully processed uid (failed uids are
        logged and skipped); empty list on a batch-level failure.
    """

    logger.info(f"배치 처리 시작: {len(batch_uids)}명")

    try:
        # 1. Session metrics (only with the --full option).
        if include_session_metrics:
            session_metrics = calculate_comprehensive_session_metrics(client, batch_uids, cohort)
        else:
            # Fast mode: zero-filled placeholders with the same keys.
            session_metrics = {uid: {
                'active_seconds': 0,
                'total_playtime_minutes': 0,
                'session_count': 0,
                'avg_session_length': 0,
                'logout_abnormal': 0
            } for uid in batch_uids}

        # 2. Run the (fixed, non-nested) msearch for all uids at once.
        msearch_queries = build_fixed_msearch_queries(batch_uids, cohort, metrics_config)

        # msearch NDJSON bodies must end with a trailing newline.
        body_ndjson = "\n".join(msearch_queries) + "\n"
        logger.info(f"msearch 실행: {len(msearch_queries)//2}개 쿼리")

        # exponential_backoff_retry is a module-level helper defined
        # elsewhere in this file.
        msearch_responses = exponential_backoff_retry(
            client.msearch,
            body=body_ndjson,
            request_timeout=300
        ).get('responses', [])

        # 3. Aggregate results. Responses come back in query order, i.e.
        # len(metrics_config) consecutive responses per uid.
        batch_results = []
        metrics_per_user = len(metrics_config)

        for idx, uid in enumerate(batch_uids):
            try:
                user_data = cohort[uid]
                user_session_metrics = session_metrics.get(uid, {})
                user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user]

                # Base row (first_login_time removed; create_time only).
                result = {
                    'uid': uid,
                    'auth_id': user_data.get('auth_id', 'N/A'),  # default when auth_id was never collected
                    'nickname': user_data['nickname'],
                    'create_time': user_data.get('create_time_kst', 'N/A'),
                    'retention_status': 'Retained_d0',  # default; upgraded below by retention_check
                    'language': user_data['language'],
                    'device': user_data['device'],
                    'active_seconds': user_session_metrics.get('active_seconds', 0),
                    'total_playtime_minutes': user_session_metrics.get('total_playtime_minutes', 0),
                    'session_count': user_session_metrics.get('session_count', 0),
                    'avg_session_length': user_session_metrics.get('avg_session_length', 0),
                    'logout_abnormal': user_session_metrics.get('logout_abnormal', 0)
                }

                # Parse each metric's sub-response by its agg_type.
                metric_names = list(metrics_config.keys())

                for i, metric_name in enumerate(metric_names):
                    # Fewer responses than expected -> treat missing as 0.
                    if i >= len(user_responses):
                        result[metric_name] = 0
                        continue

                    response = user_responses[i]
                    config = metrics_config[metric_name]

                    # Per-query error from msearch -> metric defaults to 0.
                    if 'error' in response:
                        result[metric_name] = 0
                        continue

                    agg_type = config.get("agg_type", "count")

                    if agg_type == "count":
                        hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
                        result[metric_name] = hits_total
                    elif agg_type == "exists":
                        # Boolean 0/1: did at least one matching doc exist?
                        hits_total = response.get('hits', {}).get('total', {}).get('value', 0)
                        result[metric_name] = 1 if hits_total > 0 else 0

                    elif agg_type in ["sum", "avg", "max", "conditional_sum"]:
                        agg_value = response.get('aggregations', {}).get('metric_value', {}).get('value')
                        result[metric_name] = int(agg_value) if agg_value else 0

                    elif agg_type == "max_timestamp":
                        # format_kst_time is a module-level helper defined
                        # elsewhere in this file.
                        timestamp_value = response.get('aggregations', {}).get('metric_value', {}).get('value_as_string')
                        result[metric_name] = format_kst_time(timestamp_value) if timestamp_value else None

                    elif agg_type == "first_value":
                        # Value of config["field"] from the earliest doc.
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            field_name = config.get("field", "")

                            if field_name.startswith("body."):
                                # Walk the dotted path through the _source dict.
                                keys = field_name.split(".")
                                value = source_value
                                for key in keys:
                                    if isinstance(value, dict):
                                        value = value.get(key)
                                        if value is None:
                                            # String-valued metrics default to "", numeric to 0.
                                            if metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
                                                value = ""
                                            else:
                                                value = 0
                                            break
                                    else:
                                        value = 0
                                        break
                                result[metric_name] = value
                            else:
                                result[metric_name] = source_value.get(field_name, 0)
                        else:
                            # No docs at all: metric-specific defaults.
                            if metric_name == "dungeon_first_result":
                                result[metric_name] = 2  # 2 = never played
                            elif metric_name in ["dungeon_first_mode", "dungeon_first_stalker"]:
                                result[metric_name] = ""
                            else:
                                result[metric_name] = 0

                    elif agg_type == "max_stalker":
                        # Field value from the top doc of the desc-sorted hits.
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            field_name = config.get("field", "")
                            keys = field_name.split(".")
                            value = source_value
                            for key in keys:
                                value = value.get(key, 'N/A') if isinstance(value, dict) else 'N/A'
                            result[metric_name] = value
                        else:
                            result[metric_name] = 'N/A'

                    elif agg_type == "max_level_with_stalker":
                        # Level value from the doc sorted by (level desc, time desc).
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            level_field = config.get("level_field", "body.level")

                            # Extract the dotted body.level value.
                            level_keys = level_field.split(".")
                            level_value = source_value
                            for key in level_keys:
                                level_value = level_value.get(key, 0) if isinstance(level_value, dict) else 0
                            result[metric_name] = level_value if level_value else 0
                        else:
                            result[metric_name] = 0

                    elif agg_type == "max_level_stalker_name":
                        # Stalker name from the same (level desc, time desc) doc.
                        hits = response.get('hits', {}).get('hits', [])
                        if hits:
                            source_value = hits[0]['_source']
                            stalker_field = config.get("stalker_field", "body.stalker")

                            # Extract the dotted body.stalker value.
                            stalker_keys = stalker_field.split(".")
                            stalker_value = source_value
                            for key in stalker_keys:
                                stalker_value = stalker_value.get(key, 0) if isinstance(stalker_value, dict) else 0
                            result[metric_name] = stalker_value if stalker_value else 0
                        else:
                            result[metric_name] = 0

                    else:
                        # Unknown agg_type -> 0.
                        result[metric_name] = 0

                    # Any D1 activity flips the retention status.
                    if metric_name == "retention_check" and result[metric_name] > 0:
                        result['retention_status'] = 'Retained_d1'

                # Derived metrics (only meaningful after >= 1 dungeon entry).
                if result.get('dungeon_entry_count', 0) > 0:
                    escape_count = result.get('dungeon_escape_count', 0)
                    result['dungeon_escape_rate'] = round((escape_count / result['dungeon_entry_count']) * 100, 2)

                    # Per-game average damage needs a direct query (these
                    # fields were removed from the msearch metric set).
                    dungeon_count = result['dungeon_entry_count']
                    try:
                        # D0 window again, based on create_time.
                        create_time_dt = user_data['create_time_dt']
                        d0_start = user_data['create_time_utc']
                        d0_end = (create_time_dt + timedelta(hours=24)).strftime('%Y-%m-%dT%H:%M:%SZ')

                        # Sum damage fields directly from the survival_end index.
                        damage_query = {
                            "size": 0,
                            "query": {
                                "bool": {
                                    "filter": [
                                        {"term": {"uid.keyword": uid}},
                                        {"range": {"@timestamp": {"gte": d0_start, "lt": d0_end}}}
                                    ]
                                }
                            },
                            "aggs": {
                                "total_monster_damage": {"sum": {"field": "body.play_stats.damage_dealt_monster"}},
                                "total_player_damage": {"sum": {"field": "body.play_stats.damage_dealt_player"}}
                            }
                        }

                        # DEFAULT_TIMEOUT is a module-level constant defined
                        # elsewhere in this file.
                        damage_response = exponential_backoff_retry(
                            client.search,
                            index="ds-logs-live-survival_end",
                            body=damage_query,
                            request_timeout=DEFAULT_TIMEOUT
                        )

                        # `or 0` also covers an explicit null aggregation value.
                        monster_damage = damage_response.get('aggregations', {}).get('total_monster_damage', {}).get('value', 0) or 0
                        player_damage = damage_response.get('aggregations', {}).get('total_player_damage', {}).get('value', 0) or 0

                        result['avg_damage_per_game_monster'] = round(monster_damage / dungeon_count, 2)
                        result['avg_damage_per_game_player'] = round(player_damage / dungeon_count, 2)
                    except Exception as e:
                        logger.warning(f"평균 데미지 계산 실패 (UID: {uid}): {e}")
                        result['avg_damage_per_game_monster'] = 0
                        result['avg_damage_per_game_player'] = 0
                else:
                    result['dungeon_escape_rate'] = 0
                    result['avg_damage_per_game_monster'] = 0
                    result['avg_damage_per_game_player'] = 0

                batch_results.append(result)

            except Exception as e:
                # A single bad uid must not kill the batch: log and skip it.
                logger.warning(f"UID '{uid}' 처리 중 오류: {e}")

        logger.info(f"배치 처리 완료: {len(batch_results)}명 성공")
        return batch_results

    except Exception as e:
        # Batch-level failure (e.g. msearch itself failed): the caller
        # treats an empty list as "retry this chunk".
        logger.error(f"배치 처리 중 심각한 오류: {e}")
        return []
|
|
|
|
|
|
2025-08-27 01:10:50 +09:00
|
|
|
|
2025-08-27 02:15:45 +09:00
|
|
|
# ==============================================================================
|
|
|
|
|
# 9. 병렬 처리 및 결과 저장 (동일)
|
|
|
|
|
# ==============================================================================
|
2025-08-27 01:10:50 +09:00
|
|
|
|
2025-08-27 02:15:45 +09:00
|
|
|
def process_cohort_fixed_parallel(
    client: OpenSearch,
    cohort: Dict[str, Dict],
    batch_size: int,
    max_workers: int,
    metrics_config: Dict[str, Dict],
    include_session_metrics: bool = False
) -> List[Dict]:
    """Fan the cohort out over a thread pool in fixed-size uid batches.

    Each batch is handled by ``process_fixed_batch``; batches that fail or
    return nothing are retried once sequentially at the end.

    Args:
        client: connected OpenSearch client.
        cohort: uid -> user info from step 1.
        batch_size: uids per batch.
        max_workers: thread-pool size.
        metrics_config: metric name -> query config.
        include_session_metrics: forwarded to the batch worker (--full mode).

    Returns:
        Combined per-user result rows from all successful batches.
    """
    logger.info("=" * 80)
    logger.info("2단계: 수정된 병렬 배치 처리")
    logger.info(f"총 사용자: {len(cohort)}명")
    logger.info(f"배치 크기: {batch_size}, 워커: {max_workers}")
    logger.info(f"분석 지표: {len(metrics_config)}개")
    logger.info(f"세션 지표 포함: {'예' if include_session_metrics else '아니오'}")

    all_uids = list(cohort.keys())
    batches = [all_uids[pos:pos + batch_size] for pos in range(0, len(all_uids), batch_size)]

    collected: List[Dict] = []
    retry_queue: List[List[str]] = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {}
        for batch in batches:
            fut = pool.submit(
                process_fixed_batch, client, batch, cohort,
                metrics_config, include_session_metrics
            )
            pending[fut] = batch

        with tqdm(total=len(batches), desc="배치 처리 진행률") as progress:
            for fut in as_completed(pending):
                batch = pending[fut]
                try:
                    outcome = fut.result(timeout=600)
                except Exception as e:
                    logger.warning(f"배치 처리 실패: {e}")
                    retry_queue.append(batch)
                else:
                    # An empty result list is the worker's failure signal.
                    if outcome:
                        collected.extend(outcome)
                    else:
                        retry_queue.append(batch)
                finally:
                    progress.update(1)

    # One sequential retry pass for anything that failed above.
    if retry_queue:
        logger.info(f"실패한 {len(retry_queue)}개 배치 재처리 중...")
        for batch in retry_queue:
            try:
                collected.extend(
                    process_fixed_batch(client, batch, cohort, metrics_config, include_session_metrics)
                )
            except Exception as e:
                logger.error(f"재처리 실패: {e}")

    logger.info(f"2단계 완료: {len(collected)}명 처리 성공")
    logger.info("=" * 80)
    return collected
|
|
|
|
|
|
|
|
|
|
|
2025-08-27 02:15:45 +09:00
|
|
|
def write_fixed_results(results: List[Dict], output_path: str) -> None:
    """Write the per-user analysis rows to a UTF-8-BOM CSV (retention_d1 column removed).

    Args:
        results: per-user result dicts from the batch workers; extra keys
            not listed in the header are silently dropped.
        output_path: destination CSV path.
    """
    logger.info("=" * 80)
    logger.info("3단계: 결과 저장")

    if not results:
        logger.error("저장할 결과 데이터가 없습니다.")
        return

    # Output column order (first_login_time dropped; create_time only).
    headers = [
        'uid', 'auth_id', 'nickname', 'create_time', 'retention_status', 'language', 'device',
        'active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal',
        'dungeon_entry_count', 'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result',
        'dungeon_escape_count', 'dungeon_escape_rate', 'avg_survival_time', 'max_survival_time',
        'total_armor_break', 'raid_play_count', 'escape_count',
        'monster_kill_count', 'player_kill_count', 'player_killed_count', 'death_count',
        'avg_damage_per_game_monster', 'avg_damage_per_game_player',
        'level_max', 'level_max_stalker', 'tutorial_entry', 'tutorial_completed',
        'guide_quest_stage', 'skill_points_earned',
        'items_obtained_count', 'highest_item_grade', 'blueprint_use_count', 'shop_buy_count',
        'shop_sell_count', 'gold_spent', 'gold_earned', 'storage_in_count', 'storage_out_count',
        'enchant_count', 'enchant_gold_spent',
        'ingame_equip_count', 'object_interaction_count',
        'matching_start_count', 'matching_complete_count', 'matching_failed_count', 'avg_matching_time',
        'friend_add_count', 'friend_delete_count', 'party_invite_sent', 'party_invite_received', 'mail_read_count',
        'exchange_register_count', 'exchange_use_count', 'coupon_used',
        'button_click_count', 'hideout_upgrade_count', 'hideout_max_level', 'season_pass_buy', 'season_pass_max_step',
        'last_logout_time'
    ]

    try:
        # utf-8-sig adds a BOM so Excel opens the Korean text correctly.
        with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=headers, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(results)

        logger.info(f"결과 파일 저장 완료: {output_path}")
        logger.info(f"총 {len(results)}명의 데이터 저장")
        logger.info(f"분석 지표: {len(headers)}개")

    except Exception as e:
        logger.error(f"CSV 파일 저장 실패: {e}")
|
2025-08-27 01:10:50 +09:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# ==============================================================================
|
2025-08-27 02:15:45 +09:00
|
|
|
# 10. 메인 함수
|
2025-08-27 01:10:50 +09:00
|
|
|
# ==============================================================================
|
|
|
|
|
|
|
|
|
|
def parse_arguments() -> argparse.Namespace:
    """Parse the command-line options for an analysis run.

    Returns:
        Namespace with start_time, end_time, batch_size, max_workers,
        sample_size and the --full flag.
    """
    cli = argparse.ArgumentParser(description="던전 스토커즈 신규 유저 분석 (수정 버전)")
    # Required analysis window (both interpreted as KST timestamps).
    cli.add_argument('--start-time', required=True, help='분석 시작 시간 (KST)')
    cli.add_argument('--end-time', required=True, help='분석 종료 시간 (KST)')
    # Tuning knobs with module-level defaults.
    cli.add_argument('--batch-size', type=int, default=DEFAULT_BATCH_SIZE, help='배치 크기')
    cli.add_argument('--max-workers', type=int, default=DEFAULT_MAX_WORKERS, help='병렬 워커 수')
    # Optional subset / full-analysis switches.
    cli.add_argument('--sample-size', type=int, help='샘플 분석 크기')
    cli.add_argument('--full', action='store_true', help='세션 관련 지표 포함한 전체 분석')
    parsed = cli.parse_args()
    return parsed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Entry point: run the full new-user retention analysis pipeline.

    Steps: set up logging, parse CLI args, convert the KST window to UTC,
    connect to OpenSearch, build the new-user cohort, (optionally) sample
    it, process it in parallel batches, write the CSV and log a summary.
    """
    # setup_logging() assigns the module-level logger used by all helpers.
    global logger

    overall_start_time = time.time()
    start_kst = datetime.now(KST)
    timestamp = start_kst.strftime('%Y%m%d_%H%M%S')

    # Log and CSV output share the same timestamped basename.
    log_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.log"
    csv_file_path = OUTPUT_DIR / f"ds-new_user_analy-{timestamp}.csv"

    logger = setup_logging(str(log_file_path))

    logger.info("=" * 80)
    logger.info("던전 스토커즈 신규 유저 분석 v5.0")
    logger.info("create_uid 기반 신규 유저 판별 + 세션 지표 조건부 수집")
    logger.info("=" * 80)

    args = parse_arguments()

    try:
        # CLI times are KST; OpenSearch queries use UTC.
        start_kst_arg = datetime.fromisoformat(args.start_time)
        end_kst_arg = datetime.fromisoformat(args.end_time)

        start_utc = start_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        end_utc = end_kst_arg.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

        logger.info(f"분석 기간 (KST): {args.start_time} ~ {args.end_time}")
        logger.info(f"분석 기간 (UTC): {start_utc} ~ {end_utc}")
        logger.info(f"세션 지표 포함: {'예 (--full 옵션 사용)' if args.full else '아니오 (빠른 분석 모드)'}")

    except Exception as e:
        # Malformed --start-time / --end-time: abort early.
        logger.error(f"시간 형식 오류: {e}")
        return

    client = create_opensearch_client()
    if not client:
        return

    try:
        metrics_config = get_fixed_metrics_config()
        logger.info(f"수정된 분석 지표 로드: {len(metrics_config)}개")

        # Step 1: build the new-user cohort for the window.
        cohort = get_new_user_cohort_optimized(client, start_utc, end_utc)

        if not cohort:
            logger.error("분석할 신규 유저가 없습니다.")
            return

        # Optional sampling: keep only the first N uids (insertion order).
        if args.sample_size and args.sample_size < len(cohort):
            uid_list = list(cohort.keys())
            sampled_uids = uid_list[:args.sample_size]
            cohort = {uid: cohort[uid] for uid in sampled_uids}
            logger.info(f"샘플링 모드: {args.sample_size}명만 분석")

        # Step 2: parallel batch processing (session metrics only with --full).
        results = process_cohort_fixed_parallel(
            client, cohort, args.batch_size, args.max_workers, metrics_config, args.full
        )

        # Step 3: persist to CSV.
        write_fixed_results(results, str(csv_file_path))

        # Final summary log.
        if results:
            total_users = len(results)
            retained_d1 = sum(1 for r in results if r.get('retention_status') == 'Retained_d1')
            retention_rate = (retained_d1 / total_users) * 100

            logger.info("=" * 80)
            logger.info("분석 요약")
            logger.info("=" * 80)
            logger.info(f"총 신규 유저: {total_users:,}명")
            logger.info(f"D+1 리텐션: {retained_d1:,}명 ({retention_rate:.1f}%)")
            logger.info(f"평균 활동 시간: {sum(r.get('active_seconds', 0) for r in results) / total_users / 60:.1f}분")

    except KeyboardInterrupt:
        logger.warning("사용자에 의해 중단되었습니다.")

    except Exception as e:
        logger.error(f"예상치 못한 오류: {e}")
        import traceback
        logger.error(traceback.format_exc())

    finally:
        # stop_timer_event is a module-level threading.Event defined
        # elsewhere in this file (presumably stops a progress/heartbeat
        # timer thread — confirm against the file header).
        stop_timer_event.set()
        end_time = time.time()
        total_time = str(timedelta(seconds=int(end_time - overall_start_time)))
        end_kst = datetime.now(KST)

        logger.info("=" * 80)
        logger.info(f"분석 종료 시간: {end_kst.strftime('%Y-%m-%dT%H:%M:%S+09:00')}")
        logger.info(f"총 소요 시간: {total_time}")
        logger.info("수정된 분석 완료!")
        logger.info("=" * 80)
|
2025-08-27 01:10:50 +09:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: run the pipeline only when executed directly.
if __name__ == "__main__":
    main()
|