Add integration of multiple NRD source lists

踢低吸 2024-04-13 21:52:16 +08:00
parent fc6ced7f5a
commit 11ffc8dea9


@@ -1,26 +1,27 @@
-import signal
-from typing import Dict
-import httpx
-import arrow
-from base64 import b64encode
-import pathlib
-import logging
 import asyncio
+import csv
+import gzip
+import logging
+import os
+import pathlib
+from base64 import b64encode
+from io import BytesIO, StringIO
+from typing import Dict, List
 from zipfile import ZipFile, BadZipfile
-from io import BytesIO
+
+import arrow
+import httpx
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 
-class Downloader:
+class Phase1:
     def __init__(self):
-        self.base_url = "https://www.whoisds.com//whois-database/newly-registered-domains/{args}/nrd"
-        self.base_path = pathlib.Path("nrd")
-        self.data: Dict[str, BytesIO] = {}
-        if not self.base_path.exists():
-            self.base_path.mkdir()
+        self.base_url = os.getenv("PHASE1_URL")
+        if not self.base_url:
+            raise ValueError("PHASE1_URL not set")
+        self.data: Dict[str, List[str]] = {}
 
     async def fetch(self, date: arrow.Arrow) -> bool:
         logger.info("Downloading: %s", date.format("YYYY-MM-DD"))
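
Phase1 keeps the old per-day download but now reads its base URL from a PHASE1_URL environment variable instead of the hardcoded whoisds.com address, and stores each day as a list of domain strings rather than raw bytes. A minimal sketch of exercising the refactored Phase1 for a single date follows; the module name nrd_fetch and the assumption that PHASE1_URL keeps the old {args} placeholder (filled with a base64-encoded name, per the b64encode import) are illustrative, not part of this commit.

import asyncio
import os

import arrow

# Assumption (not in this commit): PHASE1_URL follows the old
# whoisds.com pattern with an {args} placeholder.
os.environ.setdefault(
    "PHASE1_URL",
    "https://www.whoisds.com//whois-database/newly-registered-domains/{args}/nrd",
)

from nrd_fetch import Phase1  # hypothetical module name for this script


async def main() -> None:
    ph1 = Phase1()  # raises ValueError when PHASE1_URL is unset
    ok = await ph1.fetch(arrow.utcnow().shift(days=-1))
    if ok:
        for day, domains in ph1.data.items():
            print(day, len(domains), "domains")


asyncio.run(main())
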
@@ -36,37 +37,107 @@ class Downloader:
         try:
             with ZipFile(zip_file, "r") as zip_obj:
-                # print(zip_obj.read('domain-names.txt'))
-                self.data[date.format("YYYY-MM-DD")] = zip_obj.read(
-                    "domain-names.txt"
+                self.data[date.format("YYYY-MM-DD")] = (
+                    zip_obj.read("domain-names.txt").decode().splitlines()
                 )
         except BadZipfile:
             logger.error("Bad Zipfile: %s", url)
             return False
         return True
 
-    async def write(self):
-        sort_date = sorted(self.data.keys(), reverse=True)
-        accumulate = ""
-        for date in range(len(sort_date)):
-            accumulate += self.data[sort_date[date]].decode()
-            accumulate = "\n".join(sorted(set(accumulate.split("\n"))))
-            self.base_path.joinpath(f"past-{date+1}day.txt").write_bytes(
-                accumulate.encode()
-            )
-
-    def run(self):
-        loop = asyncio.get_event_loop()
-        loop.add_signal_handler(signal.SIGILL, loop.stop)
-        loop.add_signal_handler(signal.SIGINT, loop.stop)
+    async def run(self, loop: asyncio.AbstractEventLoop):
         today = arrow.utcnow()
-        task = []
-        for i in range(1, 30, 5):
+        for i in range(1, 31, 5):
+            task = []
             for j in range(i, i + 5):
                 date = today.shift(days=-j)
                 task.append(loop.create_task(self.fetch(date)))
-            loop.run_until_complete(asyncio.gather(*task))
-        asyncio.run(self.write())
+            await asyncio.gather(*task)
+
+
+class Phase2:
+    def __init__(self):
+        self.base_url = os.getenv("PHASE2_URL")
+        if not self.base_url:
+            raise ValueError("PHASE2_URL not set")
+        self.data: Dict[str, List[str]] = {}
+
+    async def fetch(self):
+        now = arrow.utcnow()
+        async with httpx.AsyncClient() as client:
+            for files in ["nrd-1m.csv", "nrd-1w.csv"]:
+                url = self.base_url + files
+                logger.info("Downloading: %s", files)
+                r = await client.get(url)
+                if r.status_code != 200:
+                    logger.error("Download failed: %s", files)
+                    return False
+                if files == "nrd-1m.csv":
+                    self.data[now.shift(months=-1).date().strftime("%Y-%m-%d")] = (
+                        BytesIO(r.content).getvalue().decode().splitlines()
+                    )
+                else:
+                    self.data[now.shift(weeks=-1).date().strftime("%Y-%m-%d")] = (
+                        BytesIO(r.content).getvalue().decode().splitlines()
+                    )
+
+    async def run(self):
+        await self.fetch()
+
+
+class Phase3:
+    def __init__(self):
+        self.base_url = os.getenv("PHASE3_URL")
+        if not self.base_url:
+            raise ValueError("PHASE3_URL not set")
+        self.data: Dict[str, List[str]] = {}
+
+    async def fetch(self):
+        async with httpx.AsyncClient() as client:
+            logger.info("Downloading: %s", self.base_url)
+            r = await client.get(self.base_url)
+            if r.status_code != 200:
+                logger.error("Download failed: %s", self.base_url)
+                return False
+        with gzip.GzipFile(fileobj=BytesIO(r.content), mode="rb") as f:
+            raw_data = BytesIO(f.read()).getvalue().decode()
+        data_file = StringIO(raw_data)
+        reader = csv.DictReader(data_file)
+        for row in reader:
+            if row["create_date"]:
+                self.data.setdefault(row["create_date"], []).append(row["domain_name"])
+
+    async def run(self):
+        await self.fetch()
 
 
 if __name__ == "__main__":
-    Downloader().run()
+    loop = asyncio.get_event_loop()
+    ph1 = Phase1()
+    ph2 = Phase2()
+    ph3 = Phase3()
+    task = [ph1.run(loop), ph2.run(), ph3.run()]
+    loop.run_until_complete(asyncio.gather(*task))
+    logger.info("Download Complete, Now writing")
+    base_path = pathlib.Path("nrd")
+    if not base_path.exists():
+        base_path.mkdir()
+    combined_data: Dict[str, set] = {}
+    for data in [ph1.data, ph2.data, ph3.data]:
+        for key, value in data.items():
+            if key not in combined_data:
+                combined_data[key] = set(value)
+            else:
+                combined_data[key].update(value)
+    sort_date = sorted(combined_data.keys(), reverse=True)
+    accumulate = ""
+    for date in range(len(sort_date)):
+        accumulate += "\n".join(combined_data[sort_date[date]])
+        accumulate = "\n".join(sorted(set(accumulate.split("\n"))))
+        base_path.joinpath(f"past-{date + 1}day.txt").write_bytes(accumulate.encode())
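
The new __main__ block runs the three phases concurrently, unions their per-date dicts into sets keyed by "YYYY-MM-DD", then writes cumulative past-Nday.txt files from the newest date backwards. One wrinkle in the write loop: accumulate += "\n".join(...) appends each day's block without a separating newline, which can glue the last domain of one day to the first of the next before deduplication. Below is a standalone sketch of the same merge-and-write step built around an explicit running set, which sidesteps that; the helper name write_accumulated is hypothetical, not part of this commit.

import pathlib
from typing import Dict, List, Set


def write_accumulated(sources: List[Dict[str, List[str]]], out_dir: str = "nrd") -> None:
    # Hypothetical helper; assumes the same input shape as
    # ph1.data / ph2.data / ph3.data: Dict[str, List[str]] keyed by date.
    base_path = pathlib.Path(out_dir)
    base_path.mkdir(exist_ok=True)
    # Union the sources per date; sets deduplicate as we go.
    combined: Dict[str, Set[str]] = {}
    for data in sources:
        for day, domains in data.items():
            combined.setdefault(day, set()).update(domains)
    # Newest date first: past-1day.txt holds only the most recent date,
    # past-2day.txt the two most recent, and so on.
    running: Set[str] = set()
    for n, day in enumerate(sorted(combined, reverse=True), start=1):
        running |= combined[day]
        text = "\n".join(sorted(running))
        base_path.joinpath(f"past-{n}day.txt").write_text(text, encoding="utf-8")


# Usage, matching the __main__ block above:
# write_accumulated([ph1.data, ph2.data, ph3.data])
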