import json import time import sys import csv from tqdm import tqdm from datetime import datetime, timezone from collections import defaultdict from typing import Dict, List, Optional, Tuple, Iterable, Set from opensearchpy import OpenSearch from nickname import fetch_nicknames from utils import load_checkpoint, save_checkpoint # ========================= # 설정 # ========================= OPENSEARCH_HOSTS = [{"host": "ds-opensearch.oneunivrs.com", "port": 9200, "scheme": "https"}] AUTH = ("admin", "DHp5#r#GYQ9d") ATTACK_LOG_INDEX = "ds-logs-live-player_attack" SKILL_USE_LOG_INDEX = "ds-logs-live-skill_use" SORT_FIELD = "@timestamp" # 1차 정렬 필드(날짜형 권장) PAGE_SIZE = 500 # 페이지 크기 INTERVAL_SEC = 60 # 주기(초) CHECKPOINT_PATH = "./checkpoint.txt" # 마지막 처리 지점을 저장 OUTPUT_DIR = "./data" # 저장 디렉터리(ndjson) REQUEST_TIMEOUT_SECONDS = 1800 TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S") OUTPUT_FILENAME = f"ds-info-hacklogs_{TIMESTAMP}.csv" # 검색 기본 쿼리(필요 시 수정). 여기선 새 문서 전체를 시간 오름차순으로 순회. BASE_QUERY = { "bool": { "must": [ # 예) {"term": {"type": "log"}} ], "filter": [ # 이후 search_after로 이어붙이므로 range는 필수 아님. ] } } # ========================= # OpenSearch 클라이언트 # ========================= def make_client() -> OpenSearch: return OpenSearch( hosts=OPENSEARCH_HOSTS, http_auth=AUTH, use_ssl=OPENSEARCH_HOSTS[0].get("scheme", "http") == "https", verify_certs=False, # 필요 시 True ssl_show_warn=False, timeout=60, max_retries=3, retry_on_timeout=True, headers={"Connection": "close"} ) # ========================= # 필요한 로그를 주어진 시간범위안에서 모두 가져온다 # ========================= def find_candidate_groups_all( client, start_time, end_time=None, index: str = ATTACK_LOG_INDEX, page_size: int = 1000, # 1000~2000 권장 loop = 10, ) -> set[tuple[str, str]]: """ (target_id, ingame_srl) 후보 그룹을 전 범위에서 추출. - start_time: ISO8601 문자열. 예) "2025-08-18T00:00:00Z" - end_time: 지정하지 않으면(start_time~로그 끝) lte 조건 없이 진행 - page_size: composite 페이지 크기 (권장 1000~2000) 반환: { (target_id, ingame_srl), ... } """ print(f"후보 그룹을 추출합니다. 한번 실행에 최대 {page_size * loop} 개의 데이터를 조사합니다") # 기본 range: gte만 필수, end_time 없으면 lte 생략 ts_range = {"gte": start_time, "format": "strict_date_optional_time"} if end_time: ts_range["lte"] = end_time # 공통 필터(필요 시 수정) filters = [ {"range": {"@timestamp": ts_range}}, {"terms": {"body.inter_type": [0, 2]}}, # 원 코드 로직 유지 ] body = { "size": 0, "query": {"bool": {"filter": filters}}, "aggs": { "candidate_groups": { "composite": { "size": page_size, "sources": [ {"target_id": {"terms": {"field": "body.target_id.keyword"}}}, {"ingame_srl": {"terms": {"field": "body.ingame_srl.keyword"}}} ] }, # 버킷 최소 문서 수 필터(예: 동일 키 2건 이상만) "aggs": { # 각 그룹 안에서 @timestamp의 최댓값(마지막 시간)을 구함 "last_ts": {"max": {"field": "@timestamp"}}, "min_doc_count_filter": { "bucket_selector": { "buckets_path": {"doc_count": "_count"}, "script": "params.doc_count >= 2" } } } } } } after = None out: set[tuple[str, str]] = set() def lastts_from_agg(agg_obj: dict) -> Optional[str]: """ max 집계 결과에서 ISO8601 문자열을 뽑아낸다. date 매핑이면 value_as_string이 오고, 아니면 epoch_millis가 value로 온다. """ if not agg_obj: return None # 선호: value_as_string (매핑이 date일 때 제공) vas = agg_obj.get("value_as_string") if vas: vas = vas.strip() if vas.endswith('Z'): vas = vas[:-1] return datetime.fromisoformat(vas) # 폴백: value(에포크 밀리초) → ISO v = agg_obj.get("value") if v is None: return None try: # OpenSearch는 epoch_millis를 float로 주기도 함 return datetime.fromtimestamp(float(v) / 1000.0, tz=timezone.utc) except Exception: return None max_last_ts = datetime.fromtimestamp(0) for _ in range(loop): if after: print("query after : " , after) body["aggs"]["candidate_groups"]["composite"]["after"] = after # composite agg는 PIT 미지원. 일반 search로 수행 resp = client.search(index=index, body=body, request_timeout=120) agg = resp.get("aggregations", {}).get("candidate_groups", {}) buckets = agg.get("buckets", []) if not buckets: break for b in buckets: k = b.get("key", {}) tgt = k.get("target_id") srl = k.get("ingame_srl") last_ts = lastts_from_agg(b.get("last_ts", {})) max_last_ts = last_ts if last_ts > max_last_ts else max_last_ts if tgt is not None and srl is not None: out.add((tgt, srl)) after = agg.get("after_key") if not after: break end_time_str = max_last_ts.isoformat().replace("+00:00", "Z") if not end_time_str.endswith("Z"): end_time_str += "Z" return out, end_time_str def _backoff_delays(): """1, 2, 4, 8초 대기.""" for i in range(4): yield 2 ** i def _build_msearch_body( pairs: List[Tuple[str, str]], start_time: str, end_time: Optional[str], source_fields: List[str], detail_size: int, ) -> str: """msearch NDJSON 바디 생성.""" ts_range = {"gte": start_time, "format": "strict_date_optional_time"} if end_time: ts_range["lte"] = end_time body_list: List[Dict] = [] for (target_id, ingame_srl) in pairs: header = {"index": ATTACK_LOG_INDEX} query = { "size": detail_size, "track_total_hits": False, "_source": source_fields, "query": { "bool": { "filter": [ {"term": {"body.target_id.keyword": target_id}}, {"term": {"body.ingame_srl.keyword": ingame_srl}}, {"term": {"body.skill_type": 1}}, {"range": {"@timestamp": ts_range}}, ] } } } body_list.append(header) body_list.append(query) return "\n".join(json.dumps(x, ensure_ascii=False) for x in body_list) + "\n" # ========================= # 1) 분석 전용 (파일 I/O 없음) # ========================= def analyse_candidates_msearch( client, candidate_keys: Iterable[Tuple[str, str]], start_time: str, end_time: Optional[str] = None, chunk_size: int = 50, # msearch 묶음 크기(권장 50~100) detail_size: int = 1000, # 각 소쿼리 size source_fields: Optional[List[str]] = None, ): """ 후보 (target_id, ingame_srl) 키들을 msearch로 조회해 - 그룹핑(동일 key) - 빈도 필터(2~8) - 스킬 로그 교차검증(제외) 을 거친 'verified_hack_logs'를 배치 단위로 yield. yield: (verified_hack_logs: List[Dict], headers: List[str]) """ if source_fields is None: source_fields = [ "uid", "@timestamp", "body.ingame_srl", "body.target_id", "body.bef_xyz", "body.xyz", "body.damage_part", "body.damage_direction", "body.inter_type", "body.skill_type", ] candidate_list = list(candidate_keys) if not candidate_list: print("\n[3단계] 핵 의심 후보 그룹이 없어 추가 작업을 진행하지 않습니다.") return chunk_size = max(10, min(100, chunk_size)) print(f"\n[3단계] 총 {len(candidate_list)}개의 후보 그룹을 {chunk_size}개씩 나누어 분석을 시작합니다...") try: with tqdm(total=len(candidate_list), desc=" - 후보군 처리 진행률", unit=" groups") as pbar: for i in range(0, len(candidate_list), chunk_size): chunk = candidate_list[i:i + chunk_size] # 1) msearch 조회 m_body = _build_msearch_body( pairs=chunk, start_time=start_time, end_time=end_time, source_fields=source_fields, detail_size=detail_size, ) resp = None last_err = None for delay in _backoff_delays(): try: resp = client.msearch(body=m_body, request_timeout=REQUEST_TIMEOUT_SECONDS) last_err = None break except Exception as e: last_err = e tqdm.write(f"\n - msearch 실패, {delay}s 후 재시도: {e}", file=sys.stderr) time.sleep(delay) if last_err is not None: tqdm.write(f"\n - ❌ 상세 로그 추출 중 오류 발생 (배치 {i//chunk_size + 1}): {last_err}", file=sys.stderr) pbar.update(len(chunk)) continue responses = resp.get("responses", []) batch_logs: List[Dict] = [] for sub in responses: hits = sub.get("hits", {}).get("hits", []) if hits: batch_logs.extend(h["_source"] for h in hits) # 2) 그룹핑 → 빈도 필터(2~8) grouped_logs: Dict[Tuple, List[Dict]] = defaultdict(list) for log in batch_logs: body = log.get('body', {}) bef_xyz = body.get('bef_xyz') xyz = body.get('xyz') dmg_dir = body.get('damage_direction') key = ( log.get('@timestamp'), body.get('target_id'), body.get('ingame_srl'), tuple(bef_xyz) if isinstance(bef_xyz, list) else bef_xyz, tuple(xyz) if isinstance(xyz, list) else xyz, body.get('damage_part'), tuple(dmg_dir) if isinstance(dmg_dir, list) else dmg_dir ) grouped_logs[key].append(log) initial_hack_logs: List[Dict] = [] for k, log_list in grouped_logs.items(): if 2 <= len(log_list) <= 8: initial_hack_logs.extend(log_list) # 3) 스킬 로그 교차 검증(있다면 제외) verified_hack_logs: List[Dict] = [] if initial_hack_logs: try: skill_use_markers = find_skill_uses_for_validation(client, initial_hack_logs) except Exception as e: tqdm.write(f"\n - 스킬 검증 중 오류(배치 {i//chunk_size + 1}): {e}", file=sys.stderr) skill_use_markers = set() if not skill_use_markers: verified_hack_logs = initial_hack_logs else: verified_hack_logs = [ log for log in initial_hack_logs if (log.get('uid'), log.get('@timestamp')) not in skill_use_markers ] if verified_hack_logs: # 파일 저장은 하지 않고 결과만 배출 yield verified_hack_logs pbar.update(len(chunk)) finally: pass def _chunks(seq, n): for i in range(0, len(seq), n): yield seq[i:i+n] def find_skill_uses_for_validation( client, logs_to_check: Iterable[Dict], uid_chunk_size: int = 50, # 한 번에 처리할 uid 수 (권장 30~100) ts_chunk_size: int = 500, # 한 uid에서 한 번에 던질 timestamp 개수 ) -> Set[Tuple[str, str]]: """ 의심 로그들의 (uid, @timestamp) 존재 여부를 스킬 로그 인덱스에서 교차 검증. - 기존: (uid, timestamp) 페어를 OR should로 한 방에 → 비효율 - 개선: uid별로 timestamps를 terms로 묶어 msearch로 배치 조회 반환: {(uid, @timestamp), ...} # 실제 스킬 사용이 확인된 페어 집합 """ # 0) 입력 정리 (유효 페어 집합) validation_keys: Set[Tuple[str, str]] = set() for log in logs_to_check or []: uid = log.get('uid'); ts = log.get('@timestamp') if uid and ts: validation_keys.add((uid, ts)) if not validation_keys: return set() # 1) uid → [timestamps] 로 묶기 (중복 제거) ts_by_uid: Dict[str, List[str]] = defaultdict(list) for uid, ts in validation_keys: ts_by_uid[uid].append(ts) for uid in ts_by_uid: # 중복 제거 및 소팅(선택) ts_by_uid[uid] = sorted(set(ts_by_uid[uid])) def build_msearch_body(uids: List[str]) -> str: body_list = [] for uid in uids: ts_list = ts_by_uid[uid] # timestamp가 매우 많으면 ts_chunk_size로 나눠 여러 서브쿼리 생성 for ts_sub in _chunks(ts_list, ts_chunk_size): header = {"index": SKILL_USE_LOG_INDEX} q = { "size": len(ts_sub), # 우리가 보낸 timestamp 만큼만 받으면 충분 "track_total_hits": False, "_source": ["uid", "@timestamp"], # 페이로드 최소화 "query": { "bool": { "filter": [ {"term": {"uid.keyword": uid}}, {"terms": {"@timestamp": ts_sub}}, # IN (여러 timestamp) ] } } } body_list.append(header) body_list.append(q) return "\n".join(json.dumps(x, ensure_ascii=False) for x in body_list) + "\n" def backoff(): for i in range(4): # 1,2,4,8s yield 2 ** i skill_use_set: Set[Tuple[str, str]] = set() # 3) uid를 배치로 나눠 msearch 수행 all_uids = list(ts_by_uid.keys()) print(f" - {len(validation_keys)}개 페어({len(all_uids)}명)에 대해 스킬 사용 여부 교차 검증 중...") for uid_batch in _chunks(all_uids, uid_chunk_size): body_ndjson = build_msearch_body(uid_batch) resp = None last_err = None for delay in backoff(): try: resp = client.msearch(body=body_ndjson, request_timeout=REQUEST_TIMEOUT_SECONDS) last_err = None break except Exception as e: last_err = e print(f" - msearch 재시도 {delay}s 대기: {e}", file=sys.stderr) time.sleep(delay) if last_err is not None: print(f" - ❌ msearch 실패(해당 uid 배치 건너뜀): {last_err}", file=sys.stderr) continue # msearch 응답 처리 for sub in resp.get("responses", []): hits = sub.get("hits", {}).get("hits", []) for h in hits: src = h.get("_source") or {} uid = src.get("uid"); ts = src.get("@timestamp") if uid and ts: skill_use_set.add((uid, ts)) print(f" - ✅ 교차 검증 완료. {len(skill_use_set)}건의 스킬 사용 내역을 확인했습니다.") return skill_use_set def collapse_duplicated(data): # 그룹핑 key: uid + timestamp + xyz + bef_xyz def make_key(item): return ( item["uid"], item["@timestamp"], tuple(item["body"]["xyz"]), tuple(item["body"]["bef_xyz"]) ) grouped = defaultdict(list) for d in data: grouped[make_key(d)].append(d) collapsed = [] for _, items in grouped.items(): base = items[0].copy() base["duplicated"] = len(items) collapsed.append(base) return collapsed # ========================= # 2) 저장 전용 (분석 없음) # ========================= def save_verified_batches_to_csv( client, verified_batches_iter: Iterable[Tuple[List[Dict], List[str]]], output_filename: str, ): """ analyse_candidates_msearch(...) 의 결과 배치를 받아 - uid→nickname 조회 - CSV 행 기록 만 수행. """ nickname_map = {} headers = ['uid', 'nickname', 'type', 'count'] with open(output_filename, 'w', newline='', encoding='utf-8-sig') as csvfile: writer = csv.writer(csvfile) writer.writerow(headers) for verified_hack_logs in verified_batches_iter: summary_data = defaultdict(lambda: defaultdict(int)) collapsed = collapse_duplicated(verified_hack_logs) for log in collapsed: uid = log.get('uid') if uid: hack_type = log.get('duplicated') summary_data[uid][hack_type] += 1 uids_to_lookup = {log['uid'] for log in verified_hack_logs if 'uid' in log} try: nickname_map.update(fetch_nicknames(client, uids_to_lookup)) except Exception as e: tqdm.write(f"\n - 닉네임 조회 중 오류: {e}", file=sys.stderr) # uid를 기준으로 정렬하여 보기 좋게 출력 for uid, type_counts in sorted(summary_data.items()): nickname = nickname_map.get(uid, 'N/A') # type(연속 횟수)을 기준으로 정렬 for type, count in sorted(type_counts.items()): writer.writerow([uid, nickname, type, count]) print(f"사용자별 핵 사용 통계 저장 완료 '{output_filename}' ---") def main(): try: client = make_client() last_sort_values = load_checkpoint(CHECKPOINT_PATH) print(f"[{datetime.now()}] 시작. 마지막 체크포인트: {last_sort_values}") start_time = last_sort_values or "2025-08-01T00:00:00Z" candidates, end_time = find_candidate_groups_all(client, start_time) verified_iter = analyse_candidates_msearch(client, candidates, start_time, end_time) save_verified_batches_to_csv(client, verified_iter, OUTPUT_FILENAME) save_checkpoint(CHECKPOINT_PATH, end_time) except Exception as e: print(e) if __name__ == "__main__": main()