diff --git a/ds_new_user_analy.py b/ds_new_user_analy.py index 7389610..d6fa43c 100644 --- a/ds_new_user_analy.py +++ b/ds_new_user_analy.py @@ -528,10 +528,13 @@ def get_new_user_cohort_optimized( end_time: str, page_size: int = DEFAULT_COMPOSITE_SIZE ) -> Dict[str, Dict]: - """Composite Aggregation을 활용한 신규 유저 코호트 선정""" + """Composite Aggregation을 활용한 신규 유저 코호트 선정 + + ds-logs-live-create_uid 인덱스를 사용하여 실제 계정 생성 시점 기준으로 신규 유저를 판별 + """ logger.info("=" * 80) - logger.info("1단계: 신규 유저 코호트 선정 (Composite Aggregation)") + logger.info("1단계: 신규 유저 코호트 선정 (create_uid 기준)") logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}") logger.info(f"페이지 크기: {page_size}") @@ -539,7 +542,8 @@ def get_new_user_cohort_optimized( after_key = None total_users = 0 - base_query = { + # Step 1: create_uid 인덱스에서 분석 기간 중 생성된 신규 유저 추출 + create_uid_query = { "size": 0, "query": { "bool": { @@ -553,43 +557,32 @@ def get_new_user_cohort_optimized( "composite": { "size": page_size, "sources": [ - {"auth_id": {"terms": {"field": "auth.id.keyword"}}}, - {"uid": {"terms": {"field": "uid.keyword"}}} + {"uid": {"terms": {"field": "uid.keyword"}}}, + {"auth_id": {"terms": {"field": "auth.id.keyword"}}} ] }, "aggs": { - "first_login": {"min": {"field": "@timestamp"}}, - "user_info": { - "top_hits": { - "size": 1, - "sort": [{"@timestamp": {"order": "asc"}}], - "_source": ["body.device_mod", "body.nickname"] - } - }, - "latest_info": { - "top_hits": { - "size": 1, - "sort": [{"@timestamp": {"order": "desc"}}], - "_source": ["body.nickname", "body.language"] - } - } + "first_create": {"min": {"field": "@timestamp"}} } } } } + # 신규 생성된 유저 수집 + new_user_map = {} # uid -> {'auth_id': ..., 'create_time': ...} page_count = 0 + while True: page_count += 1 - query = base_query.copy() + query = create_uid_query.copy() if after_key: query["aggs"]["new_users"]["composite"]["after"] = after_key try: - logger.info(f"페이지 {page_count} 처리 중...") + logger.info(f"create_uid 페이지 {page_count} 처리 중...") response = exponential_backoff_retry( client.search, - index="ds-logs-live-login_comp", + index="ds-logs-live-create_uid", body=query, request_timeout=DEFAULT_TIMEOUT, track_total_hits=False @@ -600,15 +593,98 @@ def get_new_user_cohort_optimized( break for bucket in buckets: - auth_id = bucket["key"]["auth_id"] uid = bucket["key"]["uid"] + auth_id = bucket["key"]["auth_id"] + first_create_utc = bucket["first_create"]["value_as_string"] + + # 가장 빠른 create 시간만 저장 (client_event로 인한 중복 처리) + if uid not in new_user_map or first_create_utc < new_user_map[uid]["create_time"]: + new_user_map[uid] = { + "auth_id": auth_id, + "create_time": first_create_utc + } + + logger.info(f"create_uid 페이지 {page_count}: {len(buckets)}개 처리됨") + + after_key = response["aggregations"]["new_users"].get("after_key") + if not after_key: + break + + except Exception as e: + logger.error(f"create_uid 처리 중 오류 (페이지 {page_count}): {e}") + break + + logger.info(f"총 {len(new_user_map)}명의 신규 유저 확인됨") + + # Step 2: login_comp 인덱스에서 해당 유저들의 추가 정보 수집 + if not new_user_map: + logger.warning("신규 유저가 없습니다.") + return cohort + + # 유저 청크 단위로 처리 + uid_list = list(new_user_map.keys()) + chunk_size = 100 + + for i in range(0, len(uid_list), chunk_size): + chunk_uids = uid_list[i:i+chunk_size] + + login_query = { + "size": 0, + "query": { + "bool": { + "filter": [ + {"terms": {"uid.keyword": chunk_uids}} + ] + } + }, + "aggs": { + "users": { + "terms": { + "field": "uid.keyword", + "size": chunk_size + }, + "aggs": { + "first_login": {"min": {"field": "@timestamp"}}, + "user_info": { + "top_hits": { + "size": 1, + "sort": [{"@timestamp": {"order": "asc"}}], + "_source": ["body.device_mod", "body.nickname"] + } + }, + "latest_info": { + "top_hits": { + "size": 1, + "sort": [{"@timestamp": {"order": "desc"}}], + "_source": ["body.nickname", "body.language"] + } + } + } + } + } + } + + try: + response = exponential_backoff_retry( + client.search, + index="ds-logs-live-login_comp", + body=login_query, + request_timeout=DEFAULT_TIMEOUT, + track_total_hits=False + ) + + for bucket in response["aggregations"]["users"]["buckets"]: + uid = bucket["key"] first_login_utc = bucket["first_login"]["value_as_string"] - user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] - latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] + user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {} + latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {} + # create_uid 정보와 병합 cohort[uid] = { - 'auth_id': auth_id, + 'auth_id': new_user_map[uid]["auth_id"], + 'create_time_utc': new_user_map[uid]["create_time"], + 'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]), 'first_login_utc': first_login_utc, 'first_login_kst': format_kst_time(first_login_utc), 'first_login_dt': datetime.fromisoformat(first_login_utc.replace('Z', '+00:00')), @@ -617,18 +693,11 @@ def get_new_user_cohort_optimized( 'nickname': latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A') } total_users += 1 - - logger.info(f"페이지 {page_count}: {len(buckets)}명 처리됨 (누적: {total_users}명)") - - after_key = response["aggregations"]["new_users"].get("after_key") - if not after_key: - break except Exception as e: - logger.error(f"코호트 선정 중 오류 (페이지 {page_count}): {e}") - break + logger.error(f"login_comp 정보 수집 중 오류: {e}") - logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정") + logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)") logger.info("=" * 80) return cohort @@ -900,11 +969,12 @@ def process_fixed_batch( user_session_metrics = session_metrics.get(uid, {}) user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user] - # 기본 정보 + # 기본 정보 (create_time 추가) result = { 'uid': uid, 'auth_id': user_data['auth_id'], 'nickname': user_data['nickname'], + 'create_time': user_data.get('create_time_kst', user_data.get('first_login_kst')), # create_time이 있으면 사용, 없으면 first_login 사용 'first_login_time': user_data['first_login_kst'], 'retention_status': 'Retained_d0', 'language': user_data['language'], @@ -1166,9 +1236,9 @@ def write_fixed_results(results: List[Dict], output_path: str) -> None: logger.error("저장할 결과 데이터가 없습니다.") return - # 수정된 헤더 (retention_d1 제거) + # 수정된 헤더 (create_time 추가) headers = [ - 'uid', 'auth_id', 'nickname', 'first_login_time', 'retention_status', 'language', 'device', + 'uid', 'auth_id', 'nickname', 'create_time', 'first_login_time', 'retention_status', 'language', 'device', 'active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal', 'dungeon_entry_count', 'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result', 'dungeon_escape_count', 'dungeon_escape_rate', 'avg_survival_time', 'max_survival_time',