515 lines
19 KiB
Python
515 lines
19 KiB
Python
|
|
import json
|
||
|
|
import time
|
||
|
|
import sys
|
||
|
|
import csv
|
||
|
|
from tqdm import tqdm
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from collections import defaultdict
|
||
|
|
from typing import Dict, List, Optional, Tuple, Iterable, Set
|
||
|
|
|
||
|
|
from opensearchpy import OpenSearch
|
||
|
|
from nickname import fetch_nicknames
|
||
|
|
from utils import load_checkpoint, save_checkpoint
|
||
|
|
|
||
|
|
# =========================
|
||
|
|
# 설정
|
||
|
|
# =========================
|
||
|
|
OPENSEARCH_HOSTS = [{"host": "ds-opensearch.oneunivrs.com", "port": 9200, "scheme": "https"}]
|
||
|
|
AUTH = ("admin", "DHp5#r#GYQ9d")
|
||
|
|
|
||
|
|
ATTACK_LOG_INDEX = "ds-logs-live-player_attack"
|
||
|
|
SKILL_USE_LOG_INDEX = "ds-logs-live-skill_use"
|
||
|
|
|
||
|
|
SORT_FIELD = "@timestamp" # 1차 정렬 필드(날짜형 권장)
|
||
|
|
PAGE_SIZE = 500 # 페이지 크기
|
||
|
|
INTERVAL_SEC = 60 # 주기(초)
|
||
|
|
CHECKPOINT_PATH = "./checkpoint.txt" # 마지막 처리 지점을 저장
|
||
|
|
OUTPUT_DIR = "./data" # 저장 디렉터리(ndjson)
|
||
|
|
REQUEST_TIMEOUT_SECONDS = 1800
|
||
|
|
|
||
|
|
TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
|
|
OUTPUT_FILENAME = f"ds-info-hacklogs_{TIMESTAMP}.csv"
|
||
|
|
|
||
|
|
# 검색 기본 쿼리(필요 시 수정). 여기선 새 문서 전체를 시간 오름차순으로 순회.
|
||
|
|
BASE_QUERY = {
|
||
|
|
"bool": {
|
||
|
|
"must": [
|
||
|
|
# 예) {"term": {"type": "log"}}
|
||
|
|
],
|
||
|
|
"filter": [
|
||
|
|
# 이후 search_after로 이어붙이므로 range는 필수 아님.
|
||
|
|
]
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
# =========================
|
||
|
|
# OpenSearch 클라이언트
|
||
|
|
# =========================
|
||
|
|
def make_client() -> OpenSearch:
|
||
|
|
return OpenSearch(
|
||
|
|
hosts=OPENSEARCH_HOSTS,
|
||
|
|
http_auth=AUTH,
|
||
|
|
use_ssl=OPENSEARCH_HOSTS[0].get("scheme", "http") == "https",
|
||
|
|
verify_certs=False, # 필요 시 True
|
||
|
|
ssl_show_warn=False,
|
||
|
|
timeout=60,
|
||
|
|
max_retries=3,
|
||
|
|
retry_on_timeout=True,
|
||
|
|
headers={"Connection": "close"}
|
||
|
|
)
|
||
|
|
|
||
|
|
# =========================
|
||
|
|
# 필요한 로그를 주어진 시간범위안에서 모두 가져온다
|
||
|
|
# =========================
|
||
|
|
def find_candidate_groups_all(
|
||
|
|
client,
|
||
|
|
start_time,
|
||
|
|
end_time=None,
|
||
|
|
index: str = ATTACK_LOG_INDEX,
|
||
|
|
page_size: int = 1000, # 1000~2000 권장
|
||
|
|
loop = 10,
|
||
|
|
) -> set[tuple[str, str]]:
|
||
|
|
"""
|
||
|
|
(target_id, ingame_srl) 후보 그룹을 전 범위에서 추출.
|
||
|
|
- start_time: ISO8601 문자열. 예) "2025-08-18T00:00:00Z"
|
||
|
|
- end_time: 지정하지 않으면(start_time~로그 끝) lte 조건 없이 진행
|
||
|
|
- page_size: composite 페이지 크기 (권장 1000~2000)
|
||
|
|
|
||
|
|
반환: { (target_id, ingame_srl), ... }
|
||
|
|
"""
|
||
|
|
|
||
|
|
print(f"후보 그룹을 추출합니다. 한번 실행에 최대 {page_size * loop} 개의 데이터를 조사합니다")
|
||
|
|
# 기본 range: gte만 필수, end_time 없으면 lte 생략
|
||
|
|
ts_range = {"gte": start_time, "format": "strict_date_optional_time"}
|
||
|
|
if end_time:
|
||
|
|
ts_range["lte"] = end_time
|
||
|
|
|
||
|
|
# 공통 필터(필요 시 수정)
|
||
|
|
filters = [
|
||
|
|
{"range": {"@timestamp": ts_range}},
|
||
|
|
{"terms": {"body.inter_type": [0, 2]}}, # 원 코드 로직 유지
|
||
|
|
]
|
||
|
|
|
||
|
|
body = {
|
||
|
|
"size": 0,
|
||
|
|
"query": {"bool": {"filter": filters}},
|
||
|
|
|
||
|
|
"aggs": {
|
||
|
|
"candidate_groups": {
|
||
|
|
"composite": {
|
||
|
|
"size": page_size,
|
||
|
|
"sources": [
|
||
|
|
{"target_id": {"terms": {"field": "body.target_id.keyword"}}},
|
||
|
|
{"ingame_srl": {"terms": {"field": "body.ingame_srl.keyword"}}}
|
||
|
|
]
|
||
|
|
},
|
||
|
|
# 버킷 최소 문서 수 필터(예: 동일 키 2건 이상만)
|
||
|
|
"aggs": {
|
||
|
|
# 각 그룹 안에서 @timestamp의 최댓값(마지막 시간)을 구함
|
||
|
|
"last_ts": {"max": {"field": "@timestamp"}},
|
||
|
|
"min_doc_count_filter": {
|
||
|
|
"bucket_selector": {
|
||
|
|
"buckets_path": {"doc_count": "_count"},
|
||
|
|
"script": "params.doc_count >= 2"
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
after = None
|
||
|
|
out: set[tuple[str, str]] = set()
|
||
|
|
|
||
|
|
def lastts_from_agg(agg_obj: dict) -> Optional[str]:
|
||
|
|
"""
|
||
|
|
max 집계 결과에서 ISO8601 문자열을 뽑아낸다.
|
||
|
|
date 매핑이면 value_as_string이 오고, 아니면 epoch_millis가 value로 온다.
|
||
|
|
"""
|
||
|
|
if not agg_obj:
|
||
|
|
return None
|
||
|
|
# 선호: value_as_string (매핑이 date일 때 제공)
|
||
|
|
vas = agg_obj.get("value_as_string")
|
||
|
|
if vas:
|
||
|
|
vas = vas.strip()
|
||
|
|
if vas.endswith('Z'):
|
||
|
|
vas = vas[:-1]
|
||
|
|
return datetime.fromisoformat(vas)
|
||
|
|
# 폴백: value(에포크 밀리초) → ISO
|
||
|
|
v = agg_obj.get("value")
|
||
|
|
if v is None:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
# OpenSearch는 epoch_millis를 float로 주기도 함
|
||
|
|
return datetime.fromtimestamp(float(v) / 1000.0, tz=timezone.utc)
|
||
|
|
except Exception:
|
||
|
|
return None
|
||
|
|
|
||
|
|
max_last_ts = datetime.fromtimestamp(0)
|
||
|
|
for _ in range(loop):
|
||
|
|
if after:
|
||
|
|
print("query after : " , after)
|
||
|
|
body["aggs"]["candidate_groups"]["composite"]["after"] = after
|
||
|
|
|
||
|
|
# composite agg는 PIT 미지원. 일반 search로 수행
|
||
|
|
resp = client.search(index=index, body=body, request_timeout=120)
|
||
|
|
|
||
|
|
agg = resp.get("aggregations", {}).get("candidate_groups", {})
|
||
|
|
buckets = agg.get("buckets", [])
|
||
|
|
if not buckets:
|
||
|
|
break
|
||
|
|
|
||
|
|
for b in buckets:
|
||
|
|
k = b.get("key", {})
|
||
|
|
tgt = k.get("target_id")
|
||
|
|
srl = k.get("ingame_srl")
|
||
|
|
last_ts = lastts_from_agg(b.get("last_ts", {}))
|
||
|
|
max_last_ts = last_ts if last_ts > max_last_ts else max_last_ts
|
||
|
|
if tgt is not None and srl is not None:
|
||
|
|
out.add((tgt, srl))
|
||
|
|
|
||
|
|
after = agg.get("after_key")
|
||
|
|
if not after:
|
||
|
|
break
|
||
|
|
|
||
|
|
end_time_str = max_last_ts.isoformat().replace("+00:00", "Z")
|
||
|
|
if not end_time_str.endswith("Z"):
|
||
|
|
end_time_str += "Z"
|
||
|
|
|
||
|
|
return out, end_time_str
|
||
|
|
|
||
|
|
def _backoff_delays():
|
||
|
|
"""1, 2, 4, 8초 대기."""
|
||
|
|
for i in range(4):
|
||
|
|
yield 2 ** i
|
||
|
|
|
||
|
|
|
||
|
|
def _build_msearch_body(
|
||
|
|
pairs: List[Tuple[str, str]],
|
||
|
|
start_time: str,
|
||
|
|
end_time: Optional[str],
|
||
|
|
source_fields: List[str],
|
||
|
|
detail_size: int,
|
||
|
|
) -> str:
|
||
|
|
"""msearch NDJSON 바디 생성."""
|
||
|
|
ts_range = {"gte": start_time, "format": "strict_date_optional_time"}
|
||
|
|
if end_time:
|
||
|
|
ts_range["lte"] = end_time
|
||
|
|
|
||
|
|
body_list: List[Dict] = []
|
||
|
|
for (target_id, ingame_srl) in pairs:
|
||
|
|
header = {"index": ATTACK_LOG_INDEX}
|
||
|
|
query = {
|
||
|
|
"size": detail_size,
|
||
|
|
"track_total_hits": False,
|
||
|
|
"_source": source_fields,
|
||
|
|
"query": {
|
||
|
|
"bool": {
|
||
|
|
"filter": [
|
||
|
|
{"term": {"body.target_id.keyword": target_id}},
|
||
|
|
{"term": {"body.ingame_srl.keyword": ingame_srl}},
|
||
|
|
{"term": {"body.skill_type": 1}},
|
||
|
|
{"range": {"@timestamp": ts_range}},
|
||
|
|
]
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
body_list.append(header)
|
||
|
|
body_list.append(query)
|
||
|
|
|
||
|
|
return "\n".join(json.dumps(x, ensure_ascii=False) for x in body_list) + "\n"
|
||
|
|
|
||
|
|
|
||
|
|
# =========================
|
||
|
|
# 1) 분석 전용 (파일 I/O 없음)
|
||
|
|
# =========================
|
||
|
|
def analyse_candidates_msearch(
|
||
|
|
client,
|
||
|
|
candidate_keys: Iterable[Tuple[str, str]],
|
||
|
|
start_time: str,
|
||
|
|
end_time: Optional[str] = None,
|
||
|
|
chunk_size: int = 50, # msearch 묶음 크기(권장 50~100)
|
||
|
|
detail_size: int = 1000, # 각 소쿼리 size
|
||
|
|
source_fields: Optional[List[str]] = None,
|
||
|
|
):
|
||
|
|
"""
|
||
|
|
후보 (target_id, ingame_srl) 키들을 msearch로 조회해
|
||
|
|
- 그룹핑(동일 key)
|
||
|
|
- 빈도 필터(2~8)
|
||
|
|
- 스킬 로그 교차검증(제외)
|
||
|
|
을 거친 'verified_hack_logs'를 배치 단위로 yield.
|
||
|
|
|
||
|
|
yield: (verified_hack_logs: List[Dict], headers: List[str])
|
||
|
|
"""
|
||
|
|
if source_fields is None:
|
||
|
|
source_fields = [
|
||
|
|
"uid", "@timestamp",
|
||
|
|
"body.ingame_srl", "body.target_id",
|
||
|
|
"body.bef_xyz", "body.xyz",
|
||
|
|
"body.damage_part", "body.damage_direction",
|
||
|
|
"body.inter_type", "body.skill_type",
|
||
|
|
]
|
||
|
|
|
||
|
|
candidate_list = list(candidate_keys)
|
||
|
|
if not candidate_list:
|
||
|
|
print("\n[3단계] 핵 의심 후보 그룹이 없어 추가 작업을 진행하지 않습니다.")
|
||
|
|
return
|
||
|
|
|
||
|
|
chunk_size = max(10, min(100, chunk_size))
|
||
|
|
print(f"\n[3단계] 총 {len(candidate_list)}개의 후보 그룹을 {chunk_size}개씩 나누어 분석을 시작합니다...")
|
||
|
|
|
||
|
|
try:
|
||
|
|
with tqdm(total=len(candidate_list), desc=" - 후보군 처리 진행률", unit=" groups") as pbar:
|
||
|
|
for i in range(0, len(candidate_list), chunk_size):
|
||
|
|
chunk = candidate_list[i:i + chunk_size]
|
||
|
|
|
||
|
|
# 1) msearch 조회
|
||
|
|
m_body = _build_msearch_body(
|
||
|
|
pairs=chunk,
|
||
|
|
start_time=start_time,
|
||
|
|
end_time=end_time,
|
||
|
|
source_fields=source_fields,
|
||
|
|
detail_size=detail_size,
|
||
|
|
)
|
||
|
|
|
||
|
|
resp = None
|
||
|
|
last_err = None
|
||
|
|
for delay in _backoff_delays():
|
||
|
|
try:
|
||
|
|
resp = client.msearch(body=m_body, request_timeout=REQUEST_TIMEOUT_SECONDS)
|
||
|
|
last_err = None
|
||
|
|
break
|
||
|
|
except Exception as e:
|
||
|
|
last_err = e
|
||
|
|
tqdm.write(f"\n - msearch 실패, {delay}s 후 재시도: {e}", file=sys.stderr)
|
||
|
|
time.sleep(delay)
|
||
|
|
if last_err is not None:
|
||
|
|
tqdm.write(f"\n - ❌ 상세 로그 추출 중 오류 발생 (배치 {i//chunk_size + 1}): {last_err}", file=sys.stderr)
|
||
|
|
pbar.update(len(chunk))
|
||
|
|
continue
|
||
|
|
|
||
|
|
responses = resp.get("responses", [])
|
||
|
|
batch_logs: List[Dict] = []
|
||
|
|
for sub in responses:
|
||
|
|
hits = sub.get("hits", {}).get("hits", [])
|
||
|
|
if hits:
|
||
|
|
batch_logs.extend(h["_source"] for h in hits)
|
||
|
|
|
||
|
|
# 2) 그룹핑 → 빈도 필터(2~8)
|
||
|
|
grouped_logs: Dict[Tuple, List[Dict]] = defaultdict(list)
|
||
|
|
for log in batch_logs:
|
||
|
|
body = log.get('body', {})
|
||
|
|
bef_xyz = body.get('bef_xyz')
|
||
|
|
xyz = body.get('xyz')
|
||
|
|
dmg_dir = body.get('damage_direction')
|
||
|
|
key = (
|
||
|
|
log.get('@timestamp'),
|
||
|
|
body.get('target_id'),
|
||
|
|
body.get('ingame_srl'),
|
||
|
|
tuple(bef_xyz) if isinstance(bef_xyz, list) else bef_xyz,
|
||
|
|
tuple(xyz) if isinstance(xyz, list) else xyz,
|
||
|
|
body.get('damage_part'),
|
||
|
|
tuple(dmg_dir) if isinstance(dmg_dir, list) else dmg_dir
|
||
|
|
)
|
||
|
|
grouped_logs[key].append(log)
|
||
|
|
|
||
|
|
initial_hack_logs: List[Dict] = []
|
||
|
|
for k, log_list in grouped_logs.items():
|
||
|
|
if 2 <= len(log_list) <= 8:
|
||
|
|
initial_hack_logs.extend(log_list)
|
||
|
|
|
||
|
|
# 3) 스킬 로그 교차 검증(있다면 제외)
|
||
|
|
verified_hack_logs: List[Dict] = []
|
||
|
|
if initial_hack_logs:
|
||
|
|
try:
|
||
|
|
skill_use_markers = find_skill_uses_for_validation(client, initial_hack_logs)
|
||
|
|
except Exception as e:
|
||
|
|
tqdm.write(f"\n - 스킬 검증 중 오류(배치 {i//chunk_size + 1}): {e}", file=sys.stderr)
|
||
|
|
skill_use_markers = set()
|
||
|
|
|
||
|
|
if not skill_use_markers:
|
||
|
|
verified_hack_logs = initial_hack_logs
|
||
|
|
else:
|
||
|
|
verified_hack_logs = [
|
||
|
|
log for log in initial_hack_logs
|
||
|
|
if (log.get('uid'), log.get('@timestamp')) not in skill_use_markers
|
||
|
|
]
|
||
|
|
|
||
|
|
if verified_hack_logs:
|
||
|
|
# 파일 저장은 하지 않고 결과만 배출
|
||
|
|
yield verified_hack_logs
|
||
|
|
|
||
|
|
pbar.update(len(chunk))
|
||
|
|
finally:
|
||
|
|
pass
|
||
|
|
|
||
|
|
|
||
|
|
def _chunks(seq, n):
|
||
|
|
for i in range(0, len(seq), n):
|
||
|
|
yield seq[i:i+n]
|
||
|
|
|
||
|
|
def find_skill_uses_for_validation(
|
||
|
|
client,
|
||
|
|
logs_to_check: Iterable[Dict],
|
||
|
|
uid_chunk_size: int = 50, # 한 번에 처리할 uid 수 (권장 30~100)
|
||
|
|
ts_chunk_size: int = 500, # 한 uid에서 한 번에 던질 timestamp 개수
|
||
|
|
) -> Set[Tuple[str, str]]:
|
||
|
|
"""
|
||
|
|
의심 로그들의 (uid, @timestamp) 존재 여부를 스킬 로그 인덱스에서 교차 검증.
|
||
|
|
- 기존: (uid, timestamp) 페어를 OR should로 한 방에 → 비효율
|
||
|
|
- 개선: uid별로 timestamps를 terms로 묶어 msearch로 배치 조회
|
||
|
|
반환: {(uid, @timestamp), ...} # 실제 스킬 사용이 확인된 페어 집합
|
||
|
|
"""
|
||
|
|
# 0) 입력 정리 (유효 페어 집합)
|
||
|
|
validation_keys: Set[Tuple[str, str]] = set()
|
||
|
|
for log in logs_to_check or []:
|
||
|
|
uid = log.get('uid'); ts = log.get('@timestamp')
|
||
|
|
if uid and ts:
|
||
|
|
validation_keys.add((uid, ts))
|
||
|
|
if not validation_keys:
|
||
|
|
return set()
|
||
|
|
|
||
|
|
# 1) uid → [timestamps] 로 묶기 (중복 제거)
|
||
|
|
ts_by_uid: Dict[str, List[str]] = defaultdict(list)
|
||
|
|
for uid, ts in validation_keys:
|
||
|
|
ts_by_uid[uid].append(ts)
|
||
|
|
for uid in ts_by_uid:
|
||
|
|
# 중복 제거 및 소팅(선택)
|
||
|
|
ts_by_uid[uid] = sorted(set(ts_by_uid[uid]))
|
||
|
|
|
||
|
|
def build_msearch_body(uids: List[str]) -> str:
|
||
|
|
body_list = []
|
||
|
|
for uid in uids:
|
||
|
|
ts_list = ts_by_uid[uid]
|
||
|
|
# timestamp가 매우 많으면 ts_chunk_size로 나눠 여러 서브쿼리 생성
|
||
|
|
for ts_sub in _chunks(ts_list, ts_chunk_size):
|
||
|
|
header = {"index": SKILL_USE_LOG_INDEX}
|
||
|
|
q = {
|
||
|
|
"size": len(ts_sub), # 우리가 보낸 timestamp 만큼만 받으면 충분
|
||
|
|
"track_total_hits": False,
|
||
|
|
"_source": ["uid", "@timestamp"], # 페이로드 최소화
|
||
|
|
"query": {
|
||
|
|
"bool": {
|
||
|
|
"filter": [
|
||
|
|
{"term": {"uid.keyword": uid}},
|
||
|
|
{"terms": {"@timestamp": ts_sub}}, # IN (여러 timestamp)
|
||
|
|
]
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
body_list.append(header)
|
||
|
|
body_list.append(q)
|
||
|
|
return "\n".join(json.dumps(x, ensure_ascii=False) for x in body_list) + "\n"
|
||
|
|
|
||
|
|
def backoff():
|
||
|
|
for i in range(4): # 1,2,4,8s
|
||
|
|
yield 2 ** i
|
||
|
|
|
||
|
|
skill_use_set: Set[Tuple[str, str]] = set()
|
||
|
|
|
||
|
|
# 3) uid를 배치로 나눠 msearch 수행
|
||
|
|
all_uids = list(ts_by_uid.keys())
|
||
|
|
print(f" - {len(validation_keys)}개 페어({len(all_uids)}명)에 대해 스킬 사용 여부 교차 검증 중...")
|
||
|
|
|
||
|
|
for uid_batch in _chunks(all_uids, uid_chunk_size):
|
||
|
|
body_ndjson = build_msearch_body(uid_batch)
|
||
|
|
|
||
|
|
resp = None
|
||
|
|
last_err = None
|
||
|
|
for delay in backoff():
|
||
|
|
try:
|
||
|
|
resp = client.msearch(body=body_ndjson, request_timeout=REQUEST_TIMEOUT_SECONDS)
|
||
|
|
last_err = None
|
||
|
|
break
|
||
|
|
except Exception as e:
|
||
|
|
last_err = e
|
||
|
|
print(f" - msearch 재시도 {delay}s 대기: {e}", file=sys.stderr)
|
||
|
|
time.sleep(delay)
|
||
|
|
if last_err is not None:
|
||
|
|
print(f" - ❌ msearch 실패(해당 uid 배치 건너뜀): {last_err}", file=sys.stderr)
|
||
|
|
continue
|
||
|
|
|
||
|
|
# msearch 응답 처리
|
||
|
|
for sub in resp.get("responses", []):
|
||
|
|
hits = sub.get("hits", {}).get("hits", [])
|
||
|
|
for h in hits:
|
||
|
|
src = h.get("_source") or {}
|
||
|
|
uid = src.get("uid"); ts = src.get("@timestamp")
|
||
|
|
if uid and ts:
|
||
|
|
skill_use_set.add((uid, ts))
|
||
|
|
|
||
|
|
print(f" - ✅ 교차 검증 완료. {len(skill_use_set)}건의 스킬 사용 내역을 확인했습니다.")
|
||
|
|
return skill_use_set
|
||
|
|
|
||
|
|
|
||
|
|
# =========================
|
||
|
|
# 2) 저장 전용 (분석 없음)
|
||
|
|
# =========================
|
||
|
|
def save_verified_batches_to_csv(
|
||
|
|
client,
|
||
|
|
verified_batches_iter: Iterable[Tuple[List[Dict], List[str]]],
|
||
|
|
output_filename: str,
|
||
|
|
):
|
||
|
|
"""
|
||
|
|
analyse_candidates_msearch(...) 의 결과 배치를 받아
|
||
|
|
- uid→nickname 조회
|
||
|
|
- CSV 행 기록
|
||
|
|
만 수행.
|
||
|
|
"""
|
||
|
|
|
||
|
|
nickname_map = {}
|
||
|
|
headers = ['uid', 'nickname', 'type', 'count']
|
||
|
|
|
||
|
|
with open(output_filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
|
||
|
|
writer = csv.writer(csvfile)
|
||
|
|
writer.writerow(headers)
|
||
|
|
|
||
|
|
for verified_hack_logs in verified_batches_iter:
|
||
|
|
summary_data = defaultdict(lambda: defaultdict(int))
|
||
|
|
|
||
|
|
for log in verified_hack_logs:
|
||
|
|
uid = log.get('uid')
|
||
|
|
if uid:
|
||
|
|
hack_type = len(log)
|
||
|
|
summary_data[uid][hack_type] += 1
|
||
|
|
|
||
|
|
uids_to_lookup = {log['uid'] for log in verified_hack_logs if 'uid' in log}
|
||
|
|
try:
|
||
|
|
nickname_map.update(fetch_nicknames(client, uids_to_lookup))
|
||
|
|
except Exception as e:
|
||
|
|
tqdm.write(f"\n - 닉네임 조회 중 오류: {e}", file=sys.stderr)
|
||
|
|
|
||
|
|
|
||
|
|
# uid를 기준으로 정렬하여 보기 좋게 출력
|
||
|
|
for uid, type_counts in sorted(summary_data.items()):
|
||
|
|
nickname = nickname_map.get(uid, 'N/A')
|
||
|
|
# type(연속 횟수)을 기준으로 정렬
|
||
|
|
for type, count in sorted(type_counts.items()):
|
||
|
|
writer.writerow([uid, nickname, type, count])
|
||
|
|
|
||
|
|
print(f"사용자별 핵 사용 통계 저장 완료 '{output_filename}' ---")
|
||
|
|
|
||
|
|
|
||
|
|
def main():
|
||
|
|
try:
|
||
|
|
client = make_client()
|
||
|
|
last_sort_values = load_checkpoint(CHECKPOINT_PATH)
|
||
|
|
|
||
|
|
print(f"[{datetime.now()}] 시작. 마지막 체크포인트: {last_sort_values}")
|
||
|
|
|
||
|
|
start_time = last_sort_values or "2025-08-01T00:00:00Z"
|
||
|
|
|
||
|
|
candidates, end_time = find_candidate_groups_all(client, start_time)
|
||
|
|
verified_iter = analyse_candidates_msearch(client, candidates, start_time, end_time)
|
||
|
|
save_verified_batches_to_csv(client, verified_iter, OUTPUT_FILENAME)
|
||
|
|
|
||
|
|
save_checkpoint(CHECKPOINT_PATH, end_time)
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(e)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|