완료된 수정사항
1. create_uid 기준 전체 코호트 포함
- create_uid에서 식별된 모든 신규 유저(126,666명)를 코호트에 포함
- 계정만 생성하고 이탈한 유저도 분석 대상에 포함
2. log_return_to_lobby 차선책 로직 추가
- login_comp에서 데이터를 수집하지 못한 유저들에 대해
- log_return_to_lobby 인덱스에서 body.nickname 수집
- 우선순위: login_comp → log_return_to_lobby
3. 성능 최적화 설정 업데이트
- DEFAULT_BATCH_SIZE: 500 → 2000 (4배 증가)
- DEFAULT_MAX_WORKERS: 6 → 16 (2.7배 증가)
- DEFAULT_COMPOSITE_SIZE: 500 → 2000 (4배 증가)
🔍 변경된 처리 로직
1. Step 1: create_uid에서 신규 유저 식별 (기존과 동일)
2. Step 2: heartbeat에서 auth.id 수집 (기존과 동일)
3. Step 3: 모든 create_uid 유저를 코호트에 우선 추가
4. Step 4: login_comp에서 추가 정보 수집 (1차 우선)
5. Step 5: log_return_to_lobby에서 차선 정보 수집
이제 누락 없이 모든 신규 유저가 분석 대상에 포함되며, 처리 속도도 크게 향상될 것입니다.
This commit is contained in:
@ -58,10 +58,10 @@ OPENSEARCH_CONFIG = {
|
||||
# 한국 표준시 설정
|
||||
KST = timezone(timedelta(hours=9))
|
||||
|
||||
# 성능 최적화 설정
|
||||
DEFAULT_BATCH_SIZE = 500
|
||||
DEFAULT_MAX_WORKERS = 6
|
||||
DEFAULT_COMPOSITE_SIZE = 500
|
||||
# 성능 최적화 설정 (오픈서치 스펙 기반 최적화)
|
||||
DEFAULT_BATCH_SIZE = 2000
|
||||
DEFAULT_MAX_WORKERS = 16
|
||||
DEFAULT_COMPOSITE_SIZE = 2000
|
||||
DEFAULT_TIMEOUT = 180
|
||||
SCROLL_TIMEOUT = "5m"
|
||||
SESSION_GAP_MINUTES = 5
|
||||
@ -680,9 +680,25 @@ def get_new_user_cohort_optimized(
|
||||
auth_id_count = sum(1 for uid in new_user_map if new_user_map[uid]["auth_id"] is not None)
|
||||
logger.info(f"auth.id 수집 완료: {auth_id_count}/{len(new_user_map)}명")
|
||||
|
||||
# Step 3: login_comp 인덱스에서 추가 정보 수집
|
||||
# Step 3: 모든 create_uid 유저를 cohort에 추가하고, 추가 정보 수집
|
||||
logger.info("모든 신규 유저를 cohort에 추가하고 추가 정보 수집 중...")
|
||||
|
||||
# 모든 create_uid 유저를 cohort에 먼저 추가
|
||||
for uid in uid_list:
|
||||
cohort[uid] = {
|
||||
'auth_id': new_user_map[uid]["auth_id"] or 'N/A',
|
||||
'create_time_utc': new_user_map[uid]["create_time"],
|
||||
'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]),
|
||||
'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')),
|
||||
'language': 'N/A',
|
||||
'device': 'N/A',
|
||||
'nickname': 'N/A'
|
||||
}
|
||||
total_users += 1
|
||||
|
||||
# login_comp 인덱스에서 추가 정보 수집 (1차 우선)
|
||||
logger.info("login_comp 인덱스에서 추가 정보 수집 중...")
|
||||
# 유저 청크 단위로 처리
|
||||
login_comp_collected = set()
|
||||
|
||||
for i in range(0, len(uid_list), chunk_size):
|
||||
chunk_uids = uid_list[i:i+chunk_size]
|
||||
@ -703,7 +719,6 @@ def get_new_user_cohort_optimized(
|
||||
"size": chunk_size
|
||||
},
|
||||
"aggs": {
|
||||
"first_login": {"min": {"field": "@timestamp"}},
|
||||
"user_info": {
|
||||
"top_hits": {
|
||||
"size": 1,
|
||||
@ -734,27 +749,84 @@ def get_new_user_cohort_optimized(
|
||||
|
||||
for bucket in response["aggregations"]["users"]["buckets"]:
|
||||
uid = bucket["key"]
|
||||
first_login_utc = bucket["first_login"]["value_as_string"]
|
||||
|
||||
user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {}
|
||||
latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {}
|
||||
|
||||
# create_uid 정보와 병합 (first_login 제거, create_time으로 통합)
|
||||
# auth_id는 heartbeat에서 수집되었거나 N/A
|
||||
cohort[uid] = {
|
||||
'auth_id': new_user_map[uid]["auth_id"] or 'N/A',
|
||||
'create_time_utc': new_user_map[uid]["create_time"],
|
||||
'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]),
|
||||
'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')),
|
||||
'language': latest_info_hit.get('body', {}).get('language', 'N/A'),
|
||||
'device': user_hit.get('body', {}).get('device_mod', 'N/A'),
|
||||
'nickname': latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A')
|
||||
}
|
||||
total_users += 1
|
||||
# 기존 cohort 정보 업데이트
|
||||
if uid in cohort:
|
||||
cohort[uid]['language'] = latest_info_hit.get('body', {}).get('language', 'N/A')
|
||||
cohort[uid]['device'] = user_hit.get('body', {}).get('device_mod', 'N/A')
|
||||
cohort[uid]['nickname'] = latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A')
|
||||
login_comp_collected.add(uid)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"login_comp 정보 수집 중 오류: {e}")
|
||||
|
||||
logger.info(f"login_comp에서 {len(login_comp_collected)}명의 추가 정보 수집 완료")
|
||||
|
||||
# Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (nickname만)
|
||||
missing_uids = [uid for uid in uid_list if uid not in login_comp_collected and cohort[uid]['nickname'] == 'N/A']
|
||||
|
||||
if missing_uids:
|
||||
logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중...")
|
||||
lobby_collected = set()
|
||||
|
||||
for i in range(0, len(missing_uids), chunk_size):
|
||||
chunk_uids = missing_uids[i:i+chunk_size]
|
||||
|
||||
lobby_query = {
|
||||
"size": 0,
|
||||
"query": {
|
||||
"bool": {
|
||||
"filter": [
|
||||
{"terms": {"uid.keyword": chunk_uids}}
|
||||
]
|
||||
}
|
||||
},
|
||||
"aggs": {
|
||||
"users": {
|
||||
"terms": {
|
||||
"field": "uid.keyword",
|
||||
"size": chunk_size
|
||||
},
|
||||
"aggs": {
|
||||
"nickname_info": {
|
||||
"top_hits": {
|
||||
"size": 1,
|
||||
"sort": [{"@timestamp": {"order": "desc"}}],
|
||||
"_source": ["body.nickname"]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try:
|
||||
response = exponential_backoff_retry(
|
||||
client.search,
|
||||
index="ds-logs-live-log_return_to_lobby",
|
||||
body=lobby_query,
|
||||
request_timeout=DEFAULT_TIMEOUT,
|
||||
track_total_hits=False
|
||||
)
|
||||
|
||||
for bucket in response["aggregations"]["users"]["buckets"]:
|
||||
uid = bucket["key"]
|
||||
|
||||
nickname_hit = bucket["nickname_info"]["hits"]["hits"][0]["_source"] if bucket["nickname_info"]["hits"]["hits"] else {}
|
||||
|
||||
# nickname만 업데이트 (다른 정보는 login_comp가 우선)
|
||||
if uid in cohort and cohort[uid]['nickname'] == 'N/A':
|
||||
cohort[uid]['nickname'] = nickname_hit.get('body', {}).get('nickname', 'N/A')
|
||||
lobby_collected.add(uid)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}")
|
||||
|
||||
logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 닉네임 수집 완료")
|
||||
|
||||
logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)")
|
||||
logger.info("=" * 80)
|
||||
return cohort
|
||||
|
||||
Reference in New Issue
Block a user