From 6075a2577f1fdc4ab97b2e850896ba33b8cb767f Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 02:29:02 +0800 Subject: [PATCH 01/12] move code to main function --- AutoBuild/nrdlist.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index de056f7..3d7f1c1 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -165,8 +165,9 @@ async def write_files(datalist: List[Dict[str, List[str]]]): ) -if __name__ == "__main__": +async def main(): import time + start = time.time() loop = asyncio.get_event_loop() ph1 = Phase1() @@ -180,3 +181,7 @@ if __name__ == "__main__": loop.run_until_complete(write_files([ph1.data, ph2.data, ph3.data, ph4.data])) end = time.time() - start logger.info(f"Time taken: {end:.2f} seconds") + + +if __name__ == "__main__": + asyncio.run(main()) From ba17662f6648c34fae539af1d12caa5101b7ba28 Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 02:31:08 +0800 Subject: [PATCH 02/12] Fix linting errors --- AutoBuild/nrdlist.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index 3d7f1c1..d8237a7 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -18,7 +18,7 @@ logger = logging.getLogger(__name__) class Phase1: def __init__(self): - self.base_url = os.getenv("PHASE1_URL") + self.base_url = os.getenv("PHASE1_URL", "") if not self.base_url: raise ValueError("PHASE1_URL not set") self.data: Dict[str, List[str]] = {} @@ -57,7 +57,7 @@ class Phase1: class Phase2: def __init__(self): - self.base_url = os.getenv("PHASE2_URL") + self.base_url = os.getenv("PHASE2_URL", "") if not self.base_url: raise ValueError("PHASE2_URL not set") self.data: Dict[str, List[str]] = {} @@ -87,7 +87,7 @@ class Phase2: class Phase3: def __init__(self): - self.base_url = os.getenv("PHASE3_URL") + self.base_url = os.getenv("PHASE3_URL", "") if not self.base_url: raise ValueError("PHASE3_URL not 
set") self.data: Dict[str, List[str]] = {} @@ -115,7 +115,7 @@ class Phase3: class Phase4: def __init__(self): - self.base_url = os.getenv("PHASE4_URL") + self.base_url = os.getenv("PHASE4_URL", "") if not self.base_url: raise ValueError("PHASE4_URL not set") self.data: Dict[str, List[str]] = {} @@ -132,7 +132,7 @@ class Phase4: self.data[date] = r.text.splitlines()[2:-2] async def run(self): - for i in range(5): + for _ in range(5): try: await self.fetch() except httpx.ReadTimeout: From a58542307ffac3f97359011599b860714b67b4c7 Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 02:52:34 +0800 Subject: [PATCH 03/12] Eliminate manual event loop --- AutoBuild/nrdlist.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index d8237a7..b69bd39 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -45,13 +45,13 @@ class Phase1: return False return True - async def run(self, loop: asyncio.AbstractEventLoop): + async def run(self): today = arrow.utcnow() for i in range(1, 31, 5): task = [] for j in range(i, i + 5): date = today.shift(days=-j) - task.append(loop.create_task(self.fetch(date))) + task.append(asyncio.create_task(self.fetch(date))) await asyncio.gather(*task) @@ -169,16 +169,20 @@ async def main(): import time start = time.time() - loop = asyncio.get_event_loop() ph1 = Phase1() ph2 = Phase2() ph3 = Phase3() ph4 = Phase4() - task = [ph1.run(loop), ph2.run(), ph3.run(), ph4.run()] - loop.run_until_complete(asyncio.gather(*task)) + task = [ + asyncio.create_task(ph1.run()), + asyncio.create_task(ph2.run()), + asyncio.create_task(ph3.run()), + asyncio.create_task(ph4.run()), + ] + await asyncio.gather(*task) logger.info("Download Complete, Now writing") - loop.run_until_complete(write_files([ph1.data, ph2.data, ph3.data, ph4.data])) + await write_files([ph1.data, ph2.data, ph3.data, ph4.data]) end = time.time() - start logger.info(f"Time taken: {end:.2f} seconds") 
From 14f0a3ba7f92d97fe40fceb50b0643a8d3d93e9a Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 02:54:59 +0800 Subject: [PATCH 04/12] Manipulating bytes directly --- AutoBuild/nrdlist.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index b69bd39..d5136b5 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -6,7 +6,7 @@ import os import pathlib from base64 import b64encode from io import BytesIO, StringIO -from typing import Dict, List +from typing import Dict, List, Set from zipfile import ZipFile, BadZipfile import arrow @@ -21,7 +21,7 @@ class Phase1: self.base_url = os.getenv("PHASE1_URL", "") if not self.base_url: raise ValueError("PHASE1_URL not set") - self.data: Dict[str, List[str]] = {} + self.data: Dict[str, List[bytes]] = {} async def fetch(self, date: arrow.Arrow) -> bool: logger.info("Downloading: %s", date.format("YYYY-MM-DD")) @@ -37,9 +37,9 @@ class Phase1: try: with ZipFile(zip_file, "r") as zip_obj: # print(zip_obj.read('domain-names.txt')) - self.data[date.format("YYYY-MM-DD")] = ( - zip_obj.read("domain-names.txt").decode().splitlines() - ) + self.data[date.format("YYYY-MM-DD")] = zip_obj.read( + "domain-names.txt" + ).splitlines() except BadZipfile: logger.error("Bad Zipfile: %s", url) return False @@ -60,7 +60,7 @@ class Phase2: self.base_url = os.getenv("PHASE2_URL", "") if not self.base_url: raise ValueError("PHASE2_URL not set") - self.data: Dict[str, List[str]] = {} + self.data: Dict[str, List[bytes]] = {} async def fetch(self): now = arrow.utcnow() @@ -74,11 +74,11 @@ class Phase2: return False if files == "nrd-1m.csv": self.data[now.shift(months=-1).date().strftime("%Y-%m-%d")] = ( - BytesIO(r.content).getvalue().decode().splitlines() + BytesIO(r.content).getvalue().splitlines() ) else: self.data[now.shift(weeks=-1).date().strftime("%Y-%m-%d")] = ( - BytesIO(r.content).getvalue().decode().splitlines() + 
BytesIO(r.content).getvalue().splitlines() ) async def run(self): @@ -90,7 +90,7 @@ class Phase3: self.base_url = os.getenv("PHASE3_URL", "") if not self.base_url: raise ValueError("PHASE3_URL not set") - self.data: Dict[str, List[str]] = {} + self.data: Dict[str, List[bytes]] = {} async def fetch(self): async with httpx.AsyncClient() as client: @@ -107,7 +107,9 @@ class Phase3: reader = csv.DictReader(data_file) for row in reader: if row["create_date"]: - self.data.setdefault(row["create_date"], []).append(row["domain_name"]) + self.data.setdefault(row["create_date"], []).append( + row["domain_name"].encode() + ) async def run(self): await self.fetch() @@ -118,7 +120,7 @@ class Phase4: self.base_url = os.getenv("PHASE4_URL", "") if not self.base_url: raise ValueError("PHASE4_URL not set") - self.data: Dict[str, List[str]] = {} + self.data: Dict[str, List[bytes]] = {} async def fetch(self): now = arrow.utcnow() @@ -129,7 +131,7 @@ class Phase4: logger.error("Download failed: %s", self.base_url) return False date = now.shift(days=-7).date().strftime("%Y-%m-%d") - self.data[date] = r.text.splitlines()[2:-2] + self.data[date] = r.content.splitlines()[2:-2] async def run(self): for _ in range(5): @@ -142,12 +144,12 @@ class Phase4: break -async def write_files(datalist: List[Dict[str, List[str]]]): +async def write_files(datalist: List[Dict[str, List[bytes]]]): base_path = pathlib.Path("nrd") if not base_path.exists(): base_path.mkdir() - combined_data: Dict[str, set] = {} + combined_data: Dict[str, Set[bytes]] = {} for data in datalist: for key, value in data.items(): if key not in combined_data: @@ -156,13 +158,11 @@ async def write_files(datalist: List[Dict[str, List[str]]]): combined_data[key].update(value) sort_date = sorted(combined_data.keys(), reverse=True)[:30] - accumulate = "" + accumulate = b"" for date in range(len(sort_date)): - accumulate += "\n".join(combined_data[sort_date[date]]) + accumulate += b"\n".join(combined_data[sort_date[date]]) # accumulate 
= "\n".join(sorted(set(accumulate.split("\n")))) - base_path.joinpath(f"past-{(date + 1):02d}day.txt").write_bytes( - accumulate.encode() - ) + base_path.joinpath(f"past-{(date + 1):02d}day.txt").write_bytes(accumulate) async def main(): From 624f5c8b586f09c801cd89dfcda01b67733c65b5 Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 02:58:45 +0800 Subject: [PATCH 05/12] Optimize task creation with list comprehension --- AutoBuild/nrdlist.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index d5136b5..634a065 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -48,10 +48,10 @@ class Phase1: async def run(self): today = arrow.utcnow() for i in range(1, 31, 5): - task = [] - for j in range(i, i + 5): - date = today.shift(days=-j) - task.append(asyncio.create_task(self.fetch(date))) + task = [ + asyncio.create_task(self.fetch(today.shift(days=-j))) + for j in range(i, i + 5) + ] await asyncio.gather(*task) From 8d8e97afa09d98e314b148cc1c797af2f9cb2a18 Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 03:09:56 +0800 Subject: [PATCH 06/12] Optimize: Replace StringIO with TextIOWrapper --- AutoBuild/nrdlist.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index 634a065..4dc4fb8 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -5,7 +5,7 @@ import logging import os import pathlib from base64 import b64encode -from io import BytesIO, StringIO +from io import BytesIO, TextIOWrapper from typing import Dict, List, Set from zipfile import ZipFile, BadZipfile @@ -101,9 +101,8 @@ class Phase3: return False with gzip.GzipFile(fileobj=BytesIO(r.content), mode="rb") as f: - raw_data = BytesIO(f.read()).getvalue().decode() + data_file = TextIOWrapper(BytesIO(f.read())) - data_file = StringIO(raw_data) reader = csv.DictReader(data_file) for row in reader: if row["create_date"]: From 
ea2bba853c034fde0b066c5819e7e1a189b6cae8 Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 03:14:14 +0800 Subject: [PATCH 07/12] Fix missing line break for accumulated files --- AutoBuild/nrdlist.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index 4dc4fb8..80771ba 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -159,7 +159,10 @@ async def write_files(datalist: List[Dict[str, List[bytes]]]): sort_date = sorted(combined_data.keys(), reverse=True)[:30] accumulate = b"" for date in range(len(sort_date)): - accumulate += b"\n".join(combined_data[sort_date[date]]) + if not accumulate: + accumulate = b"\n".join(combined_data[sort_date[date]]) + else: + accumulate += b"\n" + b"\n".join(combined_data[sort_date[date]]) # accumulate = "\n".join(sorted(set(accumulate.split("\n")))) base_path.joinpath(f"past-{(date + 1):02d}day.txt").write_bytes(accumulate) From 4ac75ba90d5bd12ef75869104df3ef8d5352384b Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 03:17:32 +0800 Subject: [PATCH 08/12] Use enumerate for idiomatic looping --- AutoBuild/nrdlist.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index 80771ba..1cd5a96 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -158,13 +158,13 @@ async def write_files(datalist: List[Dict[str, List[bytes]]]): sort_date = sorted(combined_data.keys(), reverse=True)[:30] accumulate = b"" - for date in range(len(sort_date)): + for i, date in enumerate(sort_date): if not accumulate: - accumulate = b"\n".join(combined_data[sort_date[date]]) + accumulate = b"\n".join(combined_data[date]) else: - accumulate += b"\n" + b"\n".join(combined_data[sort_date[date]]) + accumulate += b"\n" + b"\n".join(combined_data[date]) # accumulate = "\n".join(sorted(set(accumulate.split("\n")))) - base_path.joinpath(f"past-{(date + 1):02d}day.txt").write_bytes(accumulate) 
+ base_path.joinpath(f"past-{(i + 1):02d}day.txt").write_bytes(accumulate) async def main(): From 8b26bcfaab04e8eb80b179d25b632ba14b166a9b Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 03:19:56 +0800 Subject: [PATCH 09/12] Fix data range in Phase4 --- AutoBuild/nrdlist.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index 1cd5a96..17a6c38 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -130,7 +130,7 @@ class Phase4: logger.error("Download failed: %s", self.base_url) return False date = now.shift(days=-7).date().strftime("%Y-%m-%d") - self.data[date] = r.content.splitlines()[2:-2] + self.data[date] = r.content.splitlines()[2:-1] async def run(self): for _ in range(5): From a552b790c918de3137266b7cf0a62848bf5b3c39 Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 04:00:57 +0800 Subject: [PATCH 10/12] Combine only the first 30 days of data and leverage the insertion order behavior of the dictionary --- AutoBuild/nrdlist.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index 17a6c38..887b691 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -6,6 +6,7 @@ import os import pathlib from base64 import b64encode from io import BytesIO, TextIOWrapper +from itertools import chain, islice from typing import Dict, List, Set from zipfile import ZipFile, BadZipfile @@ -148,21 +149,24 @@ async def write_files(datalist: List[Dict[str, List[bytes]]]): if not base_path.exists(): base_path.mkdir() - combined_data: Dict[str, Set[bytes]] = {} + sorted_date = sorted(chain.from_iterable(datalist), reverse=True) + # combine only the first 30 days + combined_data: Dict[str, Set[bytes]] = { + date: set() for date in islice(sorted_date, 30) + } for data in datalist: for key, value in data.items(): if key not in combined_data: - combined_data[key] = set(value) - else: - 
combined_data[key].update(value) + continue + combined_data[key].update(value) - sort_date = sorted(combined_data.keys(), reverse=True)[:30] accumulate = b"" - for i, date in enumerate(sort_date): + # combined_data is ordered by insertion (sorted date) + for i, data in enumerate(combined_data.values()): if not accumulate: - accumulate = b"\n".join(combined_data[date]) + accumulate = b"\n".join(data) else: - accumulate += b"\n" + b"\n".join(combined_data[date]) + accumulate += b"\n" + b"\n".join(data) # accumulate = "\n".join(sorted(set(accumulate.split("\n")))) base_path.joinpath(f"past-{(i + 1):02d}day.txt").write_bytes(accumulate) From 506fbbf0b1276bf629c36885be59add7aae4087b Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 04:05:31 +0800 Subject: [PATCH 11/12] Prevent storing duplicate records --- AutoBuild/nrdlist.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/AutoBuild/nrdlist.py b/AutoBuild/nrdlist.py index 887b691..a5cecb7 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -160,15 +160,14 @@ async def write_files(datalist: List[Dict[str, List[bytes]]]): continue combined_data[key].update(value) - accumulate = b"" + dataset = set() # combined_data is ordered by insertion (sorted date) for i, data in enumerate(combined_data.values()): - if not accumulate: - accumulate = b"\n".join(data) - else: - accumulate += b"\n" + b"\n".join(data) + dataset.update(data) # accumulate = "\n".join(sorted(set(accumulate.split("\n")))) - base_path.joinpath(f"past-{(i + 1):02d}day.txt").write_bytes(accumulate) + base_path.joinpath(f"past-{(i + 1):02d}day.txt").write_bytes( + b"\n".join(dataset) + ) async def main(): From eab244b94266b1f1cf311af2cf38f237e543cbef Mon Sep 17 00:00:00 2001 From: eth3lbert Date: Sun, 14 Apr 2024 11:47:10 +0800 Subject: [PATCH 12/12] Change enumerate to start from 1 --- AutoBuild/nrdlist.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/AutoBuild/nrdlist.py 
b/AutoBuild/nrdlist.py index a5cecb7..87402ad 100644 --- a/AutoBuild/nrdlist.py +++ b/AutoBuild/nrdlist.py @@ -162,10 +162,10 @@ async def write_files(datalist: List[Dict[str, List[bytes]]]): dataset = set() # combined_data is ordered by insertion (sorted date) - for i, data in enumerate(combined_data.values()): + for i, data in enumerate(combined_data.values(), start=1): dataset.update(data) # accumulate = "\n".join(sorted(set(accumulate.split("\n")))) - base_path.joinpath(f"past-{(i + 1):02d}day.txt").write_bytes( + base_path.joinpath(f"past-{i:02d}day.txt").write_bytes( b"\n".join(dataset) )