diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index de056f7..87402ad 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -5,8 +5,9 @@ import logging import os import pathlib from base64 import b64encode -from io import BytesIO, StringIO -from typing import Dict, List +from io import BytesIO, TextIOWrapper +from itertools import chain, islice +from typing import Dict, List, Set from zipfile import ZipFile, BadZipfile import arrow @@ -18,10 +19,10 @@ logger = logging.getLogger(__name__) class Phase1: def __init__(self): - self.base_url = os.getenv("PHASE1_URL") + self.base_url = os.getenv("PHASE1_URL", "") if not self.base_url: raise ValueError("PHASE1_URL not set") - self.data: Dict[str, List[str]] = {} + self.data: Dict[str, List[bytes]] = {} async def fetch(self, date: arrow.Arrow) -> bool: logger.info("Downloading: %s", date.format("YYYY-MM-DD")) @@ -37,30 +38,30 @@ class Phase1: try: with ZipFile(zip_file, "r") as zip_obj: # print(zip_obj.read('domain-names.txt')) - self.data[date.format("YYYY-MM-DD")] = ( - zip_obj.read("domain-names.txt").decode().splitlines() - ) + self.data[date.format("YYYY-MM-DD")] = zip_obj.read( + "domain-names.txt" + ).splitlines() except BadZipfile: logger.error("Bad Zipfile: %s", url) return False return True - async def run(self, loop: asyncio.AbstractEventLoop): + async def run(self): today = arrow.utcnow() for i in range(1, 31, 5): - task = [] - for j in range(i, i + 5): - date = today.shift(days=-j) - task.append(loop.create_task(self.fetch(date))) + task = [ + asyncio.create_task(self.fetch(today.shift(days=-j))) + for j in range(i, i + 5) + ] await asyncio.gather(*task) class Phase2: def __init__(self): - self.base_url = os.getenv("PHASE2_URL") + self.base_url = os.getenv("PHASE2_URL", "") if not self.base_url: raise ValueError("PHASE2_URL not set") - self.data: Dict[str, List[str]] = {} + self.data: Dict[str, List[bytes]] = {} async def fetch(self): now = arrow.utcnow() @@ -74,11 +75,11 @@ class Phase2: return False if files == "nrd-1m.csv": self.data[now.shift(months=-1).date().strftime("%Y-%m-%d")] = ( - BytesIO(r.content).getvalue().decode().splitlines() + BytesIO(r.content).getvalue().splitlines() ) else: self.data[now.shift(weeks=-1).date().strftime("%Y-%m-%d")] = ( - BytesIO(r.content).getvalue().decode().splitlines() + BytesIO(r.content).getvalue().splitlines() ) async def run(self): @@ -87,10 +88,10 @@ class Phase2: class Phase3: def __init__(self): - self.base_url = os.getenv("PHASE3_URL") + self.base_url = os.getenv("PHASE3_URL", "") if not self.base_url: raise ValueError("PHASE3_URL not set") - self.data: Dict[str, List[str]] = {} + self.data: Dict[str, List[bytes]] = {} async def fetch(self): async with httpx.AsyncClient() as client: @@ -101,13 +102,14 @@ class Phase3: return False with gzip.GzipFile(fileobj=BytesIO(r.content), mode="rb") as f: - raw_data = BytesIO(f.read()).getvalue().decode() + data_file = TextIOWrapper(BytesIO(f.read())) - data_file = StringIO(raw_data) reader = csv.DictReader(data_file) for row in reader: if row["create_date"]: - self.data.setdefault(row["create_date"], []).append(row["domain_name"]) + self.data.setdefault(row["create_date"], []).append( + row["domain_name"].encode() + ) async def run(self): await self.fetch() @@ -115,10 +117,10 @@ class Phase3: class Phase4: def __init__(self): - self.base_url = os.getenv("PHASE4_URL") + self.base_url = os.getenv("PHASE4_URL", "") if not self.base_url: raise ValueError("PHASE4_URL not set") - self.data: Dict[str, List[str]] = {} + self.data: Dict[str, List[bytes]] = {} async def fetch(self): now = arrow.utcnow() @@ -129,10 +131,10 @@ class Phase4: logger.error("Download failed: %s", self.base_url) return False date = now.shift(days=-7).date().strftime("%Y-%m-%d") - self.data[date] = r.text.splitlines()[2:-2] + self.data[date] = r.content.splitlines()[2:-1] async def run(self): - for i in range(5): + for _ in range(5): try: await self.fetch() except httpx.ReadTimeout: @@ -142,41 +144,53 @@ class Phase4: break -async def write_files(datalist: List[Dict[str, List[str]]]): +async def write_files(datalist: List[Dict[str, List[bytes]]]): base_path = pathlib.Path("nrd") if not base_path.exists(): base_path.mkdir() - combined_data: Dict[str, set] = {} + sorted_date = sorted(chain.from_iterable(datalist), reverse=True) + # combine only the first 30 days + combined_data: Dict[str, Set[bytes]] = { + date: set() for date in islice(sorted_date, 30) + } for data in datalist: for key, value in data.items(): if key not in combined_data: - combined_data[key] = set(value) - else: - combined_data[key].update(value) + continue + combined_data[key].update(value) - sort_date = sorted(combined_data.keys(), reverse=True)[:30] - accumulate = "" - for date in range(len(sort_date)): - accumulate += "\n".join(combined_data[sort_date[date]]) + dataset = set() + # combined_data is ordered by insertion (sorted date) + for i, data in enumerate(combined_data.values(), start=1): + dataset.update(data) # accumulate = "\n".join(sorted(set(accumulate.split("\n")))) - base_path.joinpath(f"past-{(date + 1):02d}day.txt").write_bytes( - accumulate.encode() + base_path.joinpath(f"past-{i:02d}day.txt").write_bytes( + b"\n".join(dataset) ) -if __name__ == "__main__": +async def main(): import time + start = time.time() - loop = asyncio.get_event_loop() ph1 = Phase1() ph2 = Phase2() ph3 = Phase3() ph4 = Phase4() - task = [ph1.run(loop), ph2.run(), ph3.run(), ph4.run()] - loop.run_until_complete(asyncio.gather(*task)) + task = [ + asyncio.create_task(ph1.run()), + asyncio.create_task(ph2.run()), + asyncio.create_task(ph3.run()), + asyncio.create_task(ph4.run()), + ] + await asyncio.gather(*task) logger.info("Download Complete, Now writing") - loop.run_until_complete(write_files([ph1.data, ph2.data, ph3.data, ph4.data])) + await write_files([ph1.data, ph2.data, ph3.data, ph4.data]) end = time.time() - start logger.info(f"Time taken: {end:.2f} seconds") + + +if __name__ == "__main__": + asyncio.run(main())