Merge pull request #122 from FutaGuard/refactoring

Refactoring NRD
This commit is contained in:
踢低吸 2024-04-14 13:18:31 +08:00 committed by GitHub
commit 1cd0d902a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -5,8 +5,9 @@ import logging
import os
import pathlib
from base64 import b64encode
from io import BytesIO, StringIO
from typing import Dict, List
from io import BytesIO, TextIOWrapper
from itertools import chain, islice
from typing import Dict, List, Set
from zipfile import ZipFile, BadZipfile
import arrow
@ -18,10 +19,10 @@ logger = logging.getLogger(__name__)
class Phase1:
def __init__(self):
self.base_url = os.getenv("PHASE1_URL")
self.base_url = os.getenv("PHASE1_URL", "")
if not self.base_url:
raise ValueError("PHASE1_URL not set")
self.data: Dict[str, List[str]] = {}
self.data: Dict[str, List[bytes]] = {}
async def fetch(self, date: arrow.Arrow) -> bool:
logger.info("Downloading: %s", date.format("YYYY-MM-DD"))
@ -37,30 +38,30 @@ class Phase1:
try:
with ZipFile(zip_file, "r") as zip_obj:
# print(zip_obj.read('domain-names.txt'))
self.data[date.format("YYYY-MM-DD")] = (
zip_obj.read("domain-names.txt").decode().splitlines()
)
self.data[date.format("YYYY-MM-DD")] = zip_obj.read(
"domain-names.txt"
).splitlines()
except BadZipfile:
logger.error("Bad Zipfile: %s", url)
return False
return True
async def run(self, loop: asyncio.AbstractEventLoop):
async def run(self):
today = arrow.utcnow()
for i in range(1, 31, 5):
task = []
for j in range(i, i + 5):
date = today.shift(days=-j)
task.append(loop.create_task(self.fetch(date)))
task = [
asyncio.create_task(self.fetch(today.shift(days=-j)))
for j in range(i, i + 5)
]
await asyncio.gather(*task)
class Phase2:
def __init__(self):
self.base_url = os.getenv("PHASE2_URL")
self.base_url = os.getenv("PHASE2_URL", "")
if not self.base_url:
raise ValueError("PHASE2_URL not set")
self.data: Dict[str, List[str]] = {}
self.data: Dict[str, List[bytes]] = {}
async def fetch(self):
now = arrow.utcnow()
@ -74,11 +75,11 @@ class Phase2:
return False
if files == "nrd-1m.csv":
self.data[now.shift(months=-1).date().strftime("%Y-%m-%d")] = (
BytesIO(r.content).getvalue().decode().splitlines()
BytesIO(r.content).getvalue().splitlines()
)
else:
self.data[now.shift(weeks=-1).date().strftime("%Y-%m-%d")] = (
BytesIO(r.content).getvalue().decode().splitlines()
BytesIO(r.content).getvalue().splitlines()
)
async def run(self):
@ -87,10 +88,10 @@ class Phase2:
class Phase3:
def __init__(self):
self.base_url = os.getenv("PHASE3_URL")
self.base_url = os.getenv("PHASE3_URL", "")
if not self.base_url:
raise ValueError("PHASE3_URL not set")
self.data: Dict[str, List[str]] = {}
self.data: Dict[str, List[bytes]] = {}
async def fetch(self):
async with httpx.AsyncClient() as client:
@ -101,13 +102,14 @@ class Phase3:
return False
with gzip.GzipFile(fileobj=BytesIO(r.content), mode="rb") as f:
raw_data = BytesIO(f.read()).getvalue().decode()
data_file = TextIOWrapper(BytesIO(f.read()))
data_file = StringIO(raw_data)
reader = csv.DictReader(data_file)
for row in reader:
if row["create_date"]:
self.data.setdefault(row["create_date"], []).append(row["domain_name"])
self.data.setdefault(row["create_date"], []).append(
row["domain_name"].encode()
)
async def run(self):
await self.fetch()
@ -115,10 +117,10 @@ class Phase3:
class Phase4:
def __init__(self):
self.base_url = os.getenv("PHASE4_URL")
self.base_url = os.getenv("PHASE4_URL", "")
if not self.base_url:
raise ValueError("PHASE4_URL not set")
self.data: Dict[str, List[str]] = {}
self.data: Dict[str, List[bytes]] = {}
async def fetch(self):
now = arrow.utcnow()
@ -129,10 +131,10 @@ class Phase4:
logger.error("Download failed: %s", self.base_url)
return False
date = now.shift(days=-7).date().strftime("%Y-%m-%d")
self.data[date] = r.text.splitlines()[2:-2]
self.data[date] = r.content.splitlines()[2:-1]
async def run(self):
for i in range(5):
for _ in range(5):
try:
await self.fetch()
except httpx.ReadTimeout:
@ -142,41 +144,53 @@ class Phase4:
break
async def write_files(datalist: List[Dict[str, List[str]]]):
async def write_files(datalist: List[Dict[str, List[bytes]]]):
base_path = pathlib.Path("nrd")
if not base_path.exists():
base_path.mkdir()
combined_data: Dict[str, set] = {}
sorted_date = sorted(chain.from_iterable(datalist), reverse=True)
# combine only the first 30 days
combined_data: Dict[str, Set[bytes]] = {
date: set() for date in islice(sorted_date, 30)
}
for data in datalist:
for key, value in data.items():
if key not in combined_data:
combined_data[key] = set(value)
else:
combined_data[key].update(value)
continue
combined_data[key].update(value)
sort_date = sorted(combined_data.keys(), reverse=True)[:30]
accumulate = ""
for date in range(len(sort_date)):
accumulate += "\n".join(combined_data[sort_date[date]])
dataset = set()
# combined_data is ordered by insertion (sorted date)
for i, data in enumerate(combined_data.values(), start=1):
dataset.update(data)
# accumulate = "\n".join(sorted(set(accumulate.split("\n"))))
base_path.joinpath(f"past-{(date + 1):02d}day.txt").write_bytes(
accumulate.encode()
base_path.joinpath(f"past-{i:02d}day.txt").write_bytes(
b"\n".join(dataset)
)
if __name__ == "__main__":
async def main():
import time
start = time.time()
loop = asyncio.get_event_loop()
ph1 = Phase1()
ph2 = Phase2()
ph3 = Phase3()
ph4 = Phase4()
task = [ph1.run(loop), ph2.run(), ph3.run(), ph4.run()]
loop.run_until_complete(asyncio.gather(*task))
task = [
asyncio.create_task(ph1.run()),
asyncio.create_task(ph2.run()),
asyncio.create_task(ph3.run()),
asyncio.create_task(ph4.run()),
]
await asyncio.gather(*task)
logger.info("Download Complete, Now writing")
loop.run_until_complete(write_files([ph1.data, ph2.data, ph3.data, ph4.data]))
await write_files([ph1.data, ph2.data, ph3.data, ph4.data])
end = time.time() - start
logger.info(f"Time taken: {end:.2f} seconds")
if __name__ == "__main__":
asyncio.run(main())