Enhance new user cohort selection by utilizing create_uid index and improving logging details

This commit is contained in:
Gnill82
2025-08-28 21:54:25 +09:00
parent 829d504936
commit ff1878cc90

View File

@ -528,10 +528,13 @@ def get_new_user_cohort_optimized(
end_time: str,
page_size: int = DEFAULT_COMPOSITE_SIZE
) -> Dict[str, Dict]:
"""Composite Aggregation을 활용한 신규 유저 코호트 선정"""
"""Composite Aggregation을 활용한 신규 유저 코호트 선정
ds-logs-live-create_uid 인덱스를 사용하여 실제 계정 생성 시점 기준으로 신규 유저를 판별
"""
logger.info("=" * 80)
logger.info("1단계: 신규 유저 코호트 선정 (Composite Aggregation)")
logger.info("1단계: 신규 유저 코호트 선정 (create_uid 기준)")
logger.info(f"분석 기간: {format_kst_time(start_time)} ~ {format_kst_time(end_time)}")
logger.info(f"페이지 크기: {page_size}")
@ -539,7 +542,8 @@ def get_new_user_cohort_optimized(
after_key = None
total_users = 0
base_query = {
# Step 1: create_uid 인덱스에서 분석 기간 중 생성된 신규 유저 추출
create_uid_query = {
"size": 0,
"query": {
"bool": {
@ -553,43 +557,32 @@ def get_new_user_cohort_optimized(
"composite": {
"size": page_size,
"sources": [
{"auth_id": {"terms": {"field": "auth.id.keyword"}}},
{"uid": {"terms": {"field": "uid.keyword"}}}
{"uid": {"terms": {"field": "uid.keyword"}}},
{"auth_id": {"terms": {"field": "auth.id.keyword"}}}
]
},
"aggs": {
"first_login": {"min": {"field": "@timestamp"}},
"user_info": {
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "asc"}}],
"_source": ["body.device_mod", "body.nickname"]
}
},
"latest_info": {
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "desc"}}],
"_source": ["body.nickname", "body.language"]
}
}
"first_create": {"min": {"field": "@timestamp"}}
}
}
}
}
# 신규 생성된 유저 수집
new_user_map = {} # uid -> {'auth_id': ..., 'create_time': ...}
page_count = 0
while True:
page_count += 1
query = base_query.copy()
query = create_uid_query.copy()
if after_key:
query["aggs"]["new_users"]["composite"]["after"] = after_key
try:
logger.info(f"페이지 {page_count} 처리 중...")
logger.info(f"create_uid 페이지 {page_count} 처리 중...")
response = exponential_backoff_retry(
client.search,
index="ds-logs-live-login_comp",
index="ds-logs-live-create_uid",
body=query,
request_timeout=DEFAULT_TIMEOUT,
track_total_hits=False
@ -600,15 +593,98 @@ def get_new_user_cohort_optimized(
break
for bucket in buckets:
auth_id = bucket["key"]["auth_id"]
uid = bucket["key"]["uid"]
auth_id = bucket["key"]["auth_id"]
first_create_utc = bucket["first_create"]["value_as_string"]
# 가장 빠른 create 시간만 저장 (client_event로 인한 중복 처리)
if uid not in new_user_map or first_create_utc < new_user_map[uid]["create_time"]:
new_user_map[uid] = {
"auth_id": auth_id,
"create_time": first_create_utc
}
logger.info(f"create_uid 페이지 {page_count}: {len(buckets)}개 처리됨")
after_key = response["aggregations"]["new_users"].get("after_key")
if not after_key:
break
except Exception as e:
logger.error(f"create_uid 처리 중 오류 (페이지 {page_count}): {e}")
break
logger.info(f"{len(new_user_map)}명의 신규 유저 확인됨")
# Step 2: login_comp 인덱스에서 해당 유저들의 추가 정보 수집
if not new_user_map:
logger.warning("신규 유저가 없습니다.")
return cohort
# 유저 청크 단위로 처리
uid_list = list(new_user_map.keys())
chunk_size = 100
for i in range(0, len(uid_list), chunk_size):
chunk_uids = uid_list[i:i+chunk_size]
login_query = {
"size": 0,
"query": {
"bool": {
"filter": [
{"terms": {"uid.keyword": chunk_uids}}
]
}
},
"aggs": {
"users": {
"terms": {
"field": "uid.keyword",
"size": chunk_size
},
"aggs": {
"first_login": {"min": {"field": "@timestamp"}},
"user_info": {
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "asc"}}],
"_source": ["body.device_mod", "body.nickname"]
}
},
"latest_info": {
"top_hits": {
"size": 1,
"sort": [{"@timestamp": {"order": "desc"}}],
"_source": ["body.nickname", "body.language"]
}
}
}
}
}
}
try:
response = exponential_backoff_retry(
client.search,
index="ds-logs-live-login_comp",
body=login_query,
request_timeout=DEFAULT_TIMEOUT,
track_total_hits=False
)
for bucket in response["aggregations"]["users"]["buckets"]:
uid = bucket["key"]
first_login_utc = bucket["first_login"]["value_as_string"]
user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"]
latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"]
user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {}
latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {}
# create_uid 정보와 병합
cohort[uid] = {
'auth_id': auth_id,
'auth_id': new_user_map[uid]["auth_id"],
'create_time_utc': new_user_map[uid]["create_time"],
'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]),
'first_login_utc': first_login_utc,
'first_login_kst': format_kst_time(first_login_utc),
'first_login_dt': datetime.fromisoformat(first_login_utc.replace('Z', '+00:00')),
@ -617,18 +693,11 @@ def get_new_user_cohort_optimized(
'nickname': latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A')
}
total_users += 1
logger.info(f"페이지 {page_count}: {len(buckets)}명 처리됨 (누적: {total_users}명)")
after_key = response["aggregations"]["new_users"].get("after_key")
if not after_key:
break
except Exception as e:
logger.error(f"코호트 선정 중 오류 (페이지 {page_count}): {e}")
break
logger.error(f"login_comp 정보 수집 중 오류: {e}")
logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정")
logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)")
logger.info("=" * 80)
return cohort
@ -900,11 +969,12 @@ def process_fixed_batch(
user_session_metrics = session_metrics.get(uid, {})
user_responses = msearch_responses[idx * metrics_per_user : (idx + 1) * metrics_per_user]
# 기본 정보
# 기본 정보 (create_time 추가)
result = {
'uid': uid,
'auth_id': user_data['auth_id'],
'nickname': user_data['nickname'],
'create_time': user_data.get('create_time_kst', user_data.get('first_login_kst')), # create_time이 있으면 사용, 없으면 first_login 사용
'first_login_time': user_data['first_login_kst'],
'retention_status': 'Retained_d0',
'language': user_data['language'],
@ -1166,9 +1236,9 @@ def write_fixed_results(results: List[Dict], output_path: str) -> None:
logger.error("저장할 결과 데이터가 없습니다.")
return
# 수정된 헤더 (retention_d1 제거)
# 수정된 헤더 (create_time 추가)
headers = [
'uid', 'auth_id', 'nickname', 'first_login_time', 'retention_status', 'language', 'device',
'uid', 'auth_id', 'nickname', 'create_time', 'first_login_time', 'retention_status', 'language', 'device',
'active_seconds', 'total_playtime_minutes', 'session_count', 'avg_session_length', 'logout_abnormal',
'dungeon_entry_count', 'dungeon_first_mode', 'dungeon_first_stalker', 'dungeon_first_result',
'dungeon_escape_count', 'dungeon_escape_rate', 'avg_survival_time', 'max_survival_time',