commit 6654bfe4a27f72075f316cb5be8e98678b5365a2 Author: Langley Date: Mon Aug 18 21:21:29 2025 +0900 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ec7f06b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/__pycache__/ +*.txt +*.csv \ No newline at end of file diff --git a/crawler.py b/crawler.py new file mode 100644 index 0000000..3d5a395 --- /dev/null +++ b/crawler.py @@ -0,0 +1,515 @@ +import json +import time +import sys +import csv +from tqdm import tqdm +from datetime import datetime, timezone +from collections import defaultdict +from typing import Dict, List, Optional, Tuple, Iterable, Set + +from opensearchpy import OpenSearch +from nickname import fetch_nicknames +from utils import load_checkpoint, save_checkpoint + +# ========================= +# 설정 +# ========================= +OPENSEARCH_HOSTS = [{"host": "ds-opensearch.oneunivrs.com", "port": 9200, "scheme": "https"}] +AUTH = ("admin", "DHp5#r#GYQ9d") + +ATTACK_LOG_INDEX = "ds-logs-live-player_attack" +SKILL_USE_LOG_INDEX = "ds-logs-live-skill_use" + +SORT_FIELD = "@timestamp" # 1차 정렬 필드(날짜형 권장) +PAGE_SIZE = 500 # 페이지 크기 +INTERVAL_SEC = 60 # 주기(초) +CHECKPOINT_PATH = "./checkpoint.txt" # 마지막 처리 지점을 저장 +OUTPUT_DIR = "./data" # 저장 디렉터리(ndjson) +REQUEST_TIMEOUT_SECONDS = 1800 + +TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S") +OUTPUT_FILENAME = f"ds-info-hacklogs_{TIMESTAMP}.csv" + +# 검색 기본 쿼리(필요 시 수정). 여기선 새 문서 전체를 시간 오름차순으로 순회. +BASE_QUERY = { + "bool": { + "must": [ + # 예) {"term": {"type": "log"}} + ], + "filter": [ + # 이후 search_after로 이어붙이므로 range는 필수 아님. + ] + } +} + + +# ========================= +# OpenSearch 클라이언트 +# ========================= +def make_client() -> OpenSearch: + return OpenSearch( + hosts=OPENSEARCH_HOSTS, + http_auth=AUTH, + use_ssl=OPENSEARCH_HOSTS[0].get("scheme", "http") == "https", + verify_certs=False, # 필요 시 True + ssl_show_warn=False, + timeout=60, + max_retries=3, + retry_on_timeout=True, + headers={"Connection": "close"} + ) + +# ========================= +# 필요한 로그를 주어진 시간범위안에서 모두 가져온다 +# ========================= +def find_candidate_groups_all( + client, + start_time, + end_time=None, + index: str = ATTACK_LOG_INDEX, + page_size: int = 1000, # 1000~2000 권장 + loop = 10, +) -> set[tuple[str, str]]: + """ + (target_id, ingame_srl) 후보 그룹을 전 범위에서 추출. + - start_time: ISO8601 문자열. 예) "2025-08-18T00:00:00Z" + - end_time: 지정하지 않으면(start_time~로그 끝) lte 조건 없이 진행 + - page_size: composite 페이지 크기 (권장 1000~2000) + + 반환: { (target_id, ingame_srl), ... } + """ + + print(f"후보 그룹을 추출합니다. 한번 실행에 최대 {page_size * loop} 개의 데이터를 조사합니다") + # 기본 range: gte만 필수, end_time 없으면 lte 생략 + ts_range = {"gte": start_time, "format": "strict_date_optional_time"} + if end_time: + ts_range["lte"] = end_time + + # 공통 필터(필요 시 수정) + filters = [ + {"range": {"@timestamp": ts_range}}, + {"terms": {"body.inter_type": [0, 2]}}, # 원 코드 로직 유지 + ] + + body = { + "size": 0, + "query": {"bool": {"filter": filters}}, + + "aggs": { + "candidate_groups": { + "composite": { + "size": page_size, + "sources": [ + {"target_id": {"terms": {"field": "body.target_id.keyword"}}}, + {"ingame_srl": {"terms": {"field": "body.ingame_srl.keyword"}}} + ] + }, + # 버킷 최소 문서 수 필터(예: 동일 키 2건 이상만) + "aggs": { + # 각 그룹 안에서 @timestamp의 최댓값(마지막 시간)을 구함 + "last_ts": {"max": {"field": "@timestamp"}}, + "min_doc_count_filter": { + "bucket_selector": { + "buckets_path": {"doc_count": "_count"}, + "script": "params.doc_count >= 2" + } + } + } + } + } + } + + after = None + out: set[tuple[str, str]] = set() + + def lastts_from_agg(agg_obj: dict) -> Optional[str]: + """ + max 집계 결과에서 ISO8601 문자열을 뽑아낸다. + date 매핑이면 value_as_string이 오고, 아니면 epoch_millis가 value로 온다. + """ + if not agg_obj: + return None + # 선호: value_as_string (매핑이 date일 때 제공) + vas = agg_obj.get("value_as_string") + if vas: + vas = vas.strip() + if vas.endswith('Z'): + vas = vas[:-1] + return datetime.fromisoformat(vas) + # 폴백: value(에포크 밀리초) → ISO + v = agg_obj.get("value") + if v is None: + return None + try: + # OpenSearch는 epoch_millis를 float로 주기도 함 + return datetime.fromtimestamp(float(v) / 1000.0, tz=timezone.utc) + except Exception: + return None + + max_last_ts = datetime.fromtimestamp(0) + for _ in range(loop): + if after: + print("query after : " , after) + body["aggs"]["candidate_groups"]["composite"]["after"] = after + + # composite agg는 PIT 미지원. 일반 search로 수행 + resp = client.search(index=index, body=body, request_timeout=120) + + agg = resp.get("aggregations", {}).get("candidate_groups", {}) + buckets = agg.get("buckets", []) + if not buckets: + break + + for b in buckets: + k = b.get("key", {}) + tgt = k.get("target_id") + srl = k.get("ingame_srl") + last_ts = lastts_from_agg(b.get("last_ts", {})) + max_last_ts = last_ts if last_ts > max_last_ts else max_last_ts + if tgt is not None and srl is not None: + out.add((tgt, srl)) + + after = agg.get("after_key") + if not after: + break + + end_time_str = max_last_ts.isoformat().replace("+00:00", "Z") + if not end_time_str.endswith("Z"): + end_time_str += "Z" + + return out, end_time_str + +def _backoff_delays(): + """1, 2, 4, 8초 대기.""" + for i in range(4): + yield 2 ** i + + +def _build_msearch_body( + pairs: List[Tuple[str, str]], + start_time: str, + end_time: Optional[str], + source_fields: List[str], + detail_size: int, +) -> str: + """msearch NDJSON 바디 생성.""" + ts_range = {"gte": start_time, "format": "strict_date_optional_time"} + if end_time: + ts_range["lte"] = end_time + + body_list: List[Dict] = [] + for (target_id, ingame_srl) in pairs: + header = {"index": ATTACK_LOG_INDEX} + query = { + "size": detail_size, + "track_total_hits": False, + "_source": source_fields, + "query": { + "bool": { + "filter": [ + {"term": {"body.target_id.keyword": target_id}}, + {"term": {"body.ingame_srl.keyword": ingame_srl}}, + {"term": {"body.skill_type": 1}}, + {"range": {"@timestamp": ts_range}}, + ] + } + } + } + + body_list.append(header) + body_list.append(query) + + return "\n".join(json.dumps(x, ensure_ascii=False) for x in body_list) + "\n" + + +# ========================= +# 1) 분석 전용 (파일 I/O 없음) +# ========================= +def analyse_candidates_msearch( + client, + candidate_keys: Iterable[Tuple[str, str]], + start_time: str, + end_time: Optional[str] = None, + chunk_size: int = 50, # msearch 묶음 크기(권장 50~100) + detail_size: int = 1000, # 각 소쿼리 size + source_fields: Optional[List[str]] = None, +): + """ + 후보 (target_id, ingame_srl) 키들을 msearch로 조회해 + - 그룹핑(동일 key) + - 빈도 필터(2~8) + - 스킬 로그 교차검증(제외) + 을 거친 'verified_hack_logs'를 배치 단위로 yield. + + yield: (verified_hack_logs: List[Dict], headers: List[str]) + """ + if source_fields is None: + source_fields = [ + "uid", "@timestamp", + "body.ingame_srl", "body.target_id", + "body.bef_xyz", "body.xyz", + "body.damage_part", "body.damage_direction", + "body.inter_type", "body.skill_type", + ] + + candidate_list = list(candidate_keys) + if not candidate_list: + print("\n[3단계] 핵 의심 후보 그룹이 없어 추가 작업을 진행하지 않습니다.") + return + + chunk_size = max(10, min(100, chunk_size)) + print(f"\n[3단계] 총 {len(candidate_list)}개의 후보 그룹을 {chunk_size}개씩 나누어 분석을 시작합니다...") + + try: + with tqdm(total=len(candidate_list), desc=" - 후보군 처리 진행률", unit=" groups") as pbar: + for i in range(0, len(candidate_list), chunk_size): + chunk = candidate_list[i:i + chunk_size] + + # 1) msearch 조회 + m_body = _build_msearch_body( + pairs=chunk, + start_time=start_time, + end_time=end_time, + source_fields=source_fields, + detail_size=detail_size, + ) + + resp = None + last_err = None + for delay in _backoff_delays(): + try: + resp = client.msearch(body=m_body, request_timeout=REQUEST_TIMEOUT_SECONDS) + last_err = None + break + except Exception as e: + last_err = e + tqdm.write(f"\n - msearch 실패, {delay}s 후 재시도: {e}", file=sys.stderr) + time.sleep(delay) + if last_err is not None: + tqdm.write(f"\n - ❌ 상세 로그 추출 중 오류 발생 (배치 {i//chunk_size + 1}): {last_err}", file=sys.stderr) + pbar.update(len(chunk)) + continue + + responses = resp.get("responses", []) + batch_logs: List[Dict] = [] + for sub in responses: + hits = sub.get("hits", {}).get("hits", []) + if hits: + batch_logs.extend(h["_source"] for h in hits) + + # 2) 그룹핑 → 빈도 필터(2~8) + grouped_logs: Dict[Tuple, List[Dict]] = defaultdict(list) + for log in batch_logs: + body = log.get('body', {}) + bef_xyz = body.get('bef_xyz') + xyz = body.get('xyz') + dmg_dir = body.get('damage_direction') + key = ( + log.get('@timestamp'), + body.get('target_id'), + body.get('ingame_srl'), + tuple(bef_xyz) if isinstance(bef_xyz, list) else bef_xyz, + tuple(xyz) if isinstance(xyz, list) else xyz, + body.get('damage_part'), + tuple(dmg_dir) if isinstance(dmg_dir, list) else dmg_dir + ) + grouped_logs[key].append(log) + + initial_hack_logs: List[Dict] = [] + for k, log_list in grouped_logs.items(): + if 2 <= len(log_list) <= 8: + initial_hack_logs.extend(log_list) + + # 3) 스킬 로그 교차 검증(있다면 제외) + verified_hack_logs: List[Dict] = [] + if initial_hack_logs: + try: + skill_use_markers = find_skill_uses_for_validation(client, initial_hack_logs) + except Exception as e: + tqdm.write(f"\n - 스킬 검증 중 오류(배치 {i//chunk_size + 1}): {e}", file=sys.stderr) + skill_use_markers = set() + + if not skill_use_markers: + verified_hack_logs = initial_hack_logs + else: + verified_hack_logs = [ + log for log in initial_hack_logs + if (log.get('uid'), log.get('@timestamp')) not in skill_use_markers + ] + + if verified_hack_logs: + # 파일 저장은 하지 않고 결과만 배출 + yield verified_hack_logs + + pbar.update(len(chunk)) + finally: + pass + + +def _chunks(seq, n): + for i in range(0, len(seq), n): + yield seq[i:i+n] + +def find_skill_uses_for_validation( + client, + logs_to_check: Iterable[Dict], + uid_chunk_size: int = 50, # 한 번에 처리할 uid 수 (권장 30~100) + ts_chunk_size: int = 500, # 한 uid에서 한 번에 던질 timestamp 개수 +) -> Set[Tuple[str, str]]: + """ + 의심 로그들의 (uid, @timestamp) 존재 여부를 스킬 로그 인덱스에서 교차 검증. + - 기존: (uid, timestamp) 페어를 OR should로 한 방에 → 비효율 + - 개선: uid별로 timestamps를 terms로 묶어 msearch로 배치 조회 + 반환: {(uid, @timestamp), ...} # 실제 스킬 사용이 확인된 페어 집합 + """ + # 0) 입력 정리 (유효 페어 집합) + validation_keys: Set[Tuple[str, str]] = set() + for log in logs_to_check or []: + uid = log.get('uid'); ts = log.get('@timestamp') + if uid and ts: + validation_keys.add((uid, ts)) + if not validation_keys: + return set() + + # 1) uid → [timestamps] 로 묶기 (중복 제거) + ts_by_uid: Dict[str, List[str]] = defaultdict(list) + for uid, ts in validation_keys: + ts_by_uid[uid].append(ts) + for uid in ts_by_uid: + # 중복 제거 및 소팅(선택) + ts_by_uid[uid] = sorted(set(ts_by_uid[uid])) + + def build_msearch_body(uids: List[str]) -> str: + body_list = [] + for uid in uids: + ts_list = ts_by_uid[uid] + # timestamp가 매우 많으면 ts_chunk_size로 나눠 여러 서브쿼리 생성 + for ts_sub in _chunks(ts_list, ts_chunk_size): + header = {"index": SKILL_USE_LOG_INDEX} + q = { + "size": len(ts_sub), # 우리가 보낸 timestamp 만큼만 받으면 충분 + "track_total_hits": False, + "_source": ["uid", "@timestamp"], # 페이로드 최소화 + "query": { + "bool": { + "filter": [ + {"term": {"uid.keyword": uid}}, + {"terms": {"@timestamp": ts_sub}}, # IN (여러 timestamp) + ] + } + } + } + body_list.append(header) + body_list.append(q) + return "\n".join(json.dumps(x, ensure_ascii=False) for x in body_list) + "\n" + + def backoff(): + for i in range(4): # 1,2,4,8s + yield 2 ** i + + skill_use_set: Set[Tuple[str, str]] = set() + + # 3) uid를 배치로 나눠 msearch 수행 + all_uids = list(ts_by_uid.keys()) + print(f" - {len(validation_keys)}개 페어({len(all_uids)}명)에 대해 스킬 사용 여부 교차 검증 중...") + + for uid_batch in _chunks(all_uids, uid_chunk_size): + body_ndjson = build_msearch_body(uid_batch) + + resp = None + last_err = None + for delay in backoff(): + try: + resp = client.msearch(body=body_ndjson, request_timeout=REQUEST_TIMEOUT_SECONDS) + last_err = None + break + except Exception as e: + last_err = e + print(f" - msearch 재시도 {delay}s 대기: {e}", file=sys.stderr) + time.sleep(delay) + if last_err is not None: + print(f" - ❌ msearch 실패(해당 uid 배치 건너뜀): {last_err}", file=sys.stderr) + continue + + # msearch 응답 처리 + for sub in resp.get("responses", []): + hits = sub.get("hits", {}).get("hits", []) + for h in hits: + src = h.get("_source") or {} + uid = src.get("uid"); ts = src.get("@timestamp") + if uid and ts: + skill_use_set.add((uid, ts)) + + print(f" - ✅ 교차 검증 완료. {len(skill_use_set)}건의 스킬 사용 내역을 확인했습니다.") + return skill_use_set + + +# ========================= +# 2) 저장 전용 (분석 없음) +# ========================= +def save_verified_batches_to_csv( + client, + verified_batches_iter: Iterable[Tuple[List[Dict], List[str]]], + output_filename: str, +): + """ + analyse_candidates_msearch(...) 의 결과 배치를 받아 + - uid→nickname 조회 + - CSV 행 기록 + 만 수행. + """ + + nickname_map = {} + headers = ['uid', 'nickname', 'type', 'count'] + + with open(output_filename, 'w', newline='', encoding='utf-8-sig') as csvfile: + writer = csv.writer(csvfile) + writer.writerow(headers) + + for verified_hack_logs in verified_batches_iter: + summary_data = defaultdict(lambda: defaultdict(int)) + + for log in verified_hack_logs: + uid = log.get('uid') + if uid: + hack_type = len(log) + summary_data[uid][hack_type] += 1 + + uids_to_lookup = {log['uid'] for log in verified_hack_logs if 'uid' in log} + try: + nickname_map.update(fetch_nicknames(client, uids_to_lookup)) + except Exception as e: + tqdm.write(f"\n - 닉네임 조회 중 오류: {e}", file=sys.stderr) + + + # uid를 기준으로 정렬하여 보기 좋게 출력 + for uid, type_counts in sorted(summary_data.items()): + nickname = nickname_map.get(uid, 'N/A') + # type(연속 횟수)을 기준으로 정렬 + for type, count in sorted(type_counts.items()): + writer.writerow([uid, nickname, type, count]) + + print(f"사용자별 핵 사용 통계 저장 완료 '{output_filename}' ---") + + +def main(): + try: + client = make_client() + last_sort_values = load_checkpoint(CHECKPOINT_PATH) + + print(f"[{datetime.now()}] 시작. 마지막 체크포인트: {last_sort_values}") + + start_time = last_sort_values or "2025-08-01T00:00:00Z" + + candidates, end_time = find_candidate_groups_all(client, start_time) + verified_iter = analyse_candidates_msearch(client, candidates, start_time, end_time) + save_verified_batches_to_csv(client, verified_iter, OUTPUT_FILENAME) + + save_checkpoint(CHECKPOINT_PATH, end_time) + + except Exception as e: + print(e) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/nickname.py b/nickname.py new file mode 100644 index 0000000..df78726 --- /dev/null +++ b/nickname.py @@ -0,0 +1,29 @@ +import tqdm +import sys + +NICKNAME_LOG_INDEX = "ds-logs-live-log_return_to_lobby" +REQUEST_TIMEOUT_SECONDS = 180 + +def fetch_nicknames(client, uids): + if not uids: return {} + query = { + "size": 0, "query": {"terms": {"uid.keyword": list(uids)}}, + "aggs": {"users": {"terms": {"field": "uid.keyword", "size": len(uids)}, + "aggs": {"latest_nickname": {"top_hits": { + "sort": [{"@timestamp": {"order": "desc"}}], "_source": {"includes": ["body.nickname"]}, "size": 1 + }}} + }} + } + nickname_map = {} + try: + response = client.search(index=NICKNAME_LOG_INDEX, body=query, request_timeout=REQUEST_TIMEOUT_SECONDS) + buckets = response.get("aggregations", {}).get("users", {}).get("buckets", []) + for bucket in buckets: + uid = bucket.get("key") + latest_hit = bucket.get("latest_nickname", {}).get("hits", {}).get("hits", [{}])[0] + nickname = latest_hit.get("_source", {}).get("body", {}).get("nickname", "N/A") + if uid: nickname_map[uid] = nickname + return nickname_map + except Exception as e: + tqdm.write(f" - ❌ 닉네임 조회 중 오류: {e}", file=sys.stderr) + return {} \ No newline at end of file diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..30dc6e7 --- /dev/null +++ b/utils.py @@ -0,0 +1,17 @@ +import os +from typing import Any, List, Optional + +# ========================= +# 유틸: 체크포인트 저장/로드 +# ========================= +def load_checkpoint(path: str) -> Optional[List[Any]]: + """search_after에 전달할 마지막 sort 값 배열([timestamp, _id]) 로드.""" + if not os.path.exists(path): + return None + with open(path, "r", encoding="utf-8") as f: + return f.readline() + +def save_checkpoint(path: str, lastTime: str) -> None: + os.makedirs(os.path.dirname(path) or ".", exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + f.write(lastTime)