From 02333ae272dd72163ffaa7b427be3a2641296a56 Mon Sep 17 00:00:00 2001 From: Gnill82 Date: Fri, 29 Aug 2025 13:43:23 +0900 Subject: [PATCH] =?UTF-8?q?=EC=99=84=EB=A3=8C=EB=90=9C=20=EC=88=98?= =?UTF-8?q?=EC=A0=95=EC=82=AC=ED=95=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. create_uid 기준 전체 코호트 포함 - create_uid에서 식별된 모든 신규 유저(126,666명)를 코호트에 포함 - 계정만 생성하고 이탈한 유저도 분석 대상에 포함 2. log_return_to_lobby 차선책 로직 추가 - login_comp에서 데이터를 수집하지 못한 유저들에 대해 - log_return_to_lobby 인덱스에서 body.nickname 수집 - 우선순위: login_comp → log_return_to_lobby 3. 성능 최적화 설정 업데이트 - DEFAULT_BATCH_SIZE: 500 → 2000 (4배 증가) - DEFAULT_MAX_WORKERS: 6 → 16 (2.7배 증가) - DEFAULT_COMPOSITE_SIZE: 500 → 2000 (4배 증가) 🔍 변경된 처리 로직 1. Step 1: create_uid에서 신규 유저 식별 (기존과 동일) 2. Step 2: heartbeat에서 auth.id 수집 (기존과 동일) 3. Step 3: 모든 create_uid 유저를 코호트에 우선 추가 4. Step 4: login_comp에서 추가 정보 수집 (1차 우선) 5. Step 5: log_return_to_lobby에서 차선 정보 수집 이제 누락 없이 모든 신규 유저가 분석 대상에 포함되며, 처리 속도도 크게 향상될 것입니다. --- ds_new_user_analy.py | 112 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 92 insertions(+), 20 deletions(-) diff --git a/ds_new_user_analy.py b/ds_new_user_analy.py index d8c0b6b..cf84da3 100644 --- a/ds_new_user_analy.py +++ b/ds_new_user_analy.py @@ -58,10 +58,10 @@ OPENSEARCH_CONFIG = { # 한국 표준시 설정 KST = timezone(timedelta(hours=9)) -# 성능 최적화 설정 -DEFAULT_BATCH_SIZE = 500 -DEFAULT_MAX_WORKERS = 6 -DEFAULT_COMPOSITE_SIZE = 500 +# 성능 최적화 설정 (오픈서치 스펙 기반 최적화) +DEFAULT_BATCH_SIZE = 2000 +DEFAULT_MAX_WORKERS = 16 +DEFAULT_COMPOSITE_SIZE = 2000 DEFAULT_TIMEOUT = 180 SCROLL_TIMEOUT = "5m" SESSION_GAP_MINUTES = 5 @@ -680,9 +680,25 @@ def get_new_user_cohort_optimized( auth_id_count = sum(1 for uid in new_user_map if new_user_map[uid]["auth_id"] is not None) logger.info(f"auth.id 수집 완료: {auth_id_count}/{len(new_user_map)}명") - # Step 3: login_comp 인덱스에서 추가 정보 수집 + # Step 3: 모든 create_uid 유저를 cohort에 추가하고, 추가 정보 수집 + logger.info("모든 신규 유저를 cohort에 추가하고 추가 정보 수집 중...") + + # 모든 create_uid 유저를 cohort에 먼저 추가 + for uid in uid_list: + cohort[uid] = { + 'auth_id': new_user_map[uid]["auth_id"] or 'N/A', + 'create_time_utc': new_user_map[uid]["create_time"], + 'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]), + 'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')), + 'language': 'N/A', + 'device': 'N/A', + 'nickname': 'N/A' + } + total_users += 1 + + # login_comp 인덱스에서 추가 정보 수집 (1차 우선) logger.info("login_comp 인덱스에서 추가 정보 수집 중...") - # 유저 청크 단위로 처리 + login_comp_collected = set() for i in range(0, len(uid_list), chunk_size): chunk_uids = uid_list[i:i+chunk_size] @@ -703,7 +719,6 @@ def get_new_user_cohort_optimized( "size": chunk_size }, "aggs": { - "first_login": {"min": {"field": "@timestamp"}}, "user_info": { "top_hits": { "size": 1, @@ -734,27 +749,84 @@ def get_new_user_cohort_optimized( for bucket in response["aggregations"]["users"]["buckets"]: uid = bucket["key"] - first_login_utc = bucket["first_login"]["value_as_string"] user_hit = bucket["user_info"]["hits"]["hits"][0]["_source"] if bucket["user_info"]["hits"]["hits"] else {} latest_info_hit = bucket["latest_info"]["hits"]["hits"][0]["_source"] if bucket["latest_info"]["hits"]["hits"] else {} - # create_uid 정보와 병합 (first_login 제거, create_time으로 통합) - # auth_id는 heartbeat에서 수집되었거나 N/A - cohort[uid] = { - 'auth_id': new_user_map[uid]["auth_id"] or 'N/A', - 'create_time_utc': new_user_map[uid]["create_time"], - 'create_time_kst': format_kst_time(new_user_map[uid]["create_time"]), - 'create_time_dt': datetime.fromisoformat(new_user_map[uid]["create_time"].replace('Z', '+00:00')), - 'language': latest_info_hit.get('body', {}).get('language', 'N/A'), - 'device': user_hit.get('body', {}).get('device_mod', 'N/A'), - 'nickname': latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A') - } - total_users += 1 + # 기존 cohort 정보 업데이트 + if uid in cohort: + cohort[uid]['language'] = latest_info_hit.get('body', {}).get('language', 'N/A') + cohort[uid]['device'] = user_hit.get('body', {}).get('device_mod', 'N/A') + cohort[uid]['nickname'] = latest_info_hit.get('body', {}).get('nickname') or user_hit.get('body', {}).get('nickname', 'N/A') + login_comp_collected.add(uid) except Exception as e: logger.error(f"login_comp 정보 수집 중 오류: {e}") + logger.info(f"login_comp에서 {len(login_comp_collected)}명의 추가 정보 수집 완료") + + # Step 4: log_return_to_lobby 인덱스에서 차선 정보 수집 (nickname만) + missing_uids = [uid for uid in uid_list if uid not in login_comp_collected and cohort[uid]['nickname'] == 'N/A'] + + if missing_uids: + logger.info(f"log_return_to_lobby 인덱스에서 {len(missing_uids)}명의 차선 정보 수집 중...") + lobby_collected = set() + + for i in range(0, len(missing_uids), chunk_size): + chunk_uids = missing_uids[i:i+chunk_size] + + lobby_query = { + "size": 0, + "query": { + "bool": { + "filter": [ + {"terms": {"uid.keyword": chunk_uids}} + ] + } + }, + "aggs": { + "users": { + "terms": { + "field": "uid.keyword", + "size": chunk_size + }, + "aggs": { + "nickname_info": { + "top_hits": { + "size": 1, + "sort": [{"@timestamp": {"order": "desc"}}], + "_source": ["body.nickname"] + } + } + } + } + } + } + + try: + response = exponential_backoff_retry( + client.search, + index="ds-logs-live-log_return_to_lobby", + body=lobby_query, + request_timeout=DEFAULT_TIMEOUT, + track_total_hits=False + ) + + for bucket in response["aggregations"]["users"]["buckets"]: + uid = bucket["key"] + + nickname_hit = bucket["nickname_info"]["hits"]["hits"][0]["_source"] if bucket["nickname_info"]["hits"]["hits"] else {} + + # nickname만 업데이트 (다른 정보는 login_comp가 우선) + if uid in cohort and cohort[uid]['nickname'] == 'N/A': + cohort[uid]['nickname'] = nickname_hit.get('body', {}).get('nickname', 'N/A') + lobby_collected.add(uid) + + except Exception as e: + logger.error(f"log_return_to_lobby 정보 수집 중 오류: {e}") + + logger.info(f"log_return_to_lobby에서 {len(lobby_collected)}명의 닉네임 수집 완료") + logger.info(f"1단계 완료: 총 {total_users}명의 신규 유저 코호트 확정 (create_uid 기준)") logger.info("=" * 80) return cohort