Compare commits

..

15 Commits

Author SHA1 Message Date
踢低吸
c60bdfa574
Merge pull request #123 from FutaGuard/fix-combined
Fixed missing dates in combined_data
2024-04-14 13:55:42 +08:00
eth3lbert
8a15ae947a
Fixed missing dates in combined_data 2024-04-14 13:52:10 +08:00
踢低吸
1cd0d902a6
Merge pull request #122 from FutaGuard/refactoring
Refactoring NRD
2024-04-14 13:18:31 +08:00
eth3lbert
eab244b942
Change enumerate to start from 1 2024-04-14 11:47:10 +08:00
eth3lbert
506fbbf0b1
Prevent storing duplicate records 2024-04-14 04:05:31 +08:00
eth3lbert
a552b790c9
Combine only the first 30 days of data and leverage the insertion order behavior of the dictionary 2024-04-14 04:00:57 +08:00
eth3lbert
8b26bcfaab
Fix data range in Phase4 2024-04-14 03:19:56 +08:00
eth3lbert
4ac75ba90d
Use enuerate for idiomatic looping 2024-04-14 03:17:32 +08:00
eth3lbert
ea2bba853c
Fix missing line break for accumulated files 2024-04-14 03:14:14 +08:00
eth3lbert
8d8e97afa0
Optimize: Replace StringIO with TextWrapIO 2024-04-14 03:09:56 +08:00
eth3lbert
624f5c8b58
Optimize task creation with list comprehension 2024-04-14 02:58:45 +08:00
eth3lbert
14f0a3ba7f
Manipulating bytes directly 2024-04-14 02:54:59 +08:00
eth3lbert
a58542307f
Eliminate manual event loop 2024-04-14 02:52:34 +08:00
eth3lbert
ba17662f66
Fix linting errors 2024-04-14 02:31:08 +08:00
eth3lbert
6075a2577f
move code to main function 2024-04-14 02:29:02 +08:00

View File

@ -5,8 +5,9 @@ import logging
import os
import pathlib
from base64 import b64encode
from io import BytesIO, StringIO
from typing import Dict, List
from io import BytesIO, TextIOWrapper
from itertools import chain, islice
from typing import Dict, List, Set
from zipfile import ZipFile, BadZipfile
import arrow
@ -18,10 +19,10 @@ logger = logging.getLogger(__name__)
class Phase1:
def __init__(self):
self.base_url = os.getenv("PHASE1_URL")
self.base_url = os.getenv("PHASE1_URL", "")
if not self.base_url:
raise ValueError("PHASE1_URL not set")
self.data: Dict[str, List[str]] = {}
self.data: Dict[str, List[bytes]] = {}
async def fetch(self, date: arrow.Arrow) -> bool:
logger.info("Downloading: %s", date.format("YYYY-MM-DD"))
@ -37,30 +38,30 @@ class Phase1:
try:
with ZipFile(zip_file, "r") as zip_obj:
# print(zip_obj.read('domain-names.txt'))
self.data[date.format("YYYY-MM-DD")] = (
zip_obj.read("domain-names.txt").decode().splitlines()
)
self.data[date.format("YYYY-MM-DD")] = zip_obj.read(
"domain-names.txt"
).splitlines()
except BadZipfile:
logger.error("Bad Zipfile: %s", url)
return False
return True
async def run(self, loop: asyncio.AbstractEventLoop):
async def run(self):
today = arrow.utcnow()
for i in range(1, 31, 5):
task = []
for j in range(i, i + 5):
date = today.shift(days=-j)
task.append(loop.create_task(self.fetch(date)))
task = [
asyncio.create_task(self.fetch(today.shift(days=-j)))
for j in range(i, i + 5)
]
await asyncio.gather(*task)
class Phase2:
def __init__(self):
self.base_url = os.getenv("PHASE2_URL")
self.base_url = os.getenv("PHASE2_URL", "")
if not self.base_url:
raise ValueError("PHASE2_URL not set")
self.data: Dict[str, List[str]] = {}
self.data: Dict[str, List[bytes]] = {}
async def fetch(self):
now = arrow.utcnow()
@ -74,11 +75,11 @@ class Phase2:
return False
if files == "nrd-1m.csv":
self.data[now.shift(months=-1).date().strftime("%Y-%m-%d")] = (
BytesIO(r.content).getvalue().decode().splitlines()
BytesIO(r.content).getvalue().splitlines()
)
else:
self.data[now.shift(weeks=-1).date().strftime("%Y-%m-%d")] = (
BytesIO(r.content).getvalue().decode().splitlines()
BytesIO(r.content).getvalue().splitlines()
)
async def run(self):
@ -87,10 +88,10 @@ class Phase2:
class Phase3:
def __init__(self):
self.base_url = os.getenv("PHASE3_URL")
self.base_url = os.getenv("PHASE3_URL", "")
if not self.base_url:
raise ValueError("PHASE3_URL not set")
self.data: Dict[str, List[str]] = {}
self.data: Dict[str, List[bytes]] = {}
async def fetch(self):
async with httpx.AsyncClient() as client:
@ -101,13 +102,14 @@ class Phase3:
return False
with gzip.GzipFile(fileobj=BytesIO(r.content), mode="rb") as f:
raw_data = BytesIO(f.read()).getvalue().decode()
data_file = TextIOWrapper(BytesIO(f.read()))
data_file = StringIO(raw_data)
reader = csv.DictReader(data_file)
for row in reader:
if row["create_date"]:
self.data.setdefault(row["create_date"], []).append(row["domain_name"])
self.data.setdefault(row["create_date"], []).append(
row["domain_name"].encode()
)
async def run(self):
await self.fetch()
@ -115,10 +117,10 @@ class Phase3:
class Phase4:
def __init__(self):
self.base_url = os.getenv("PHASE4_URL")
self.base_url = os.getenv("PHASE4_URL", "")
if not self.base_url:
raise ValueError("PHASE4_URL not set")
self.data: Dict[str, List[str]] = {}
self.data: Dict[str, List[bytes]] = {}
async def fetch(self):
now = arrow.utcnow()
@ -129,10 +131,10 @@ class Phase4:
logger.error("Download failed: %s", self.base_url)
return False
date = now.shift(days=-7).date().strftime("%Y-%m-%d")
self.data[date] = r.text.splitlines()[2:-2]
self.data[date] = r.content.splitlines()[2:-1]
async def run(self):
for i in range(5):
for _ in range(5):
try:
await self.fetch()
except httpx.ReadTimeout:
@ -142,41 +144,53 @@ class Phase4:
break
async def write_files(datalist: List[Dict[str, List[str]]]):
async def write_files(datalist: List[Dict[str, List[bytes]]]):
base_path = pathlib.Path("nrd")
if not base_path.exists():
base_path.mkdir()
combined_data: Dict[str, set] = {}
sorted_date = sorted(set(chain.from_iterable(datalist)), reverse=True)
# combine only the first 30 days
combined_data: Dict[str, Set[bytes]] = {
date: set() for date in islice(sorted_date, 30)
}
for data in datalist:
for key, value in data.items():
if key not in combined_data:
combined_data[key] = set(value)
else:
combined_data[key].update(value)
continue
combined_data[key].update(value)
sort_date = sorted(combined_data.keys(), reverse=True)[:30]
accumulate = ""
for date in range(len(sort_date)):
accumulate += "\n".join(combined_data[sort_date[date]])
dataset = set()
# combined_data is ordered by insertion (sorted date)
for i, data in enumerate(combined_data.values(), start=1):
dataset.update(data)
# accumulate = "\n".join(sorted(set(accumulate.split("\n"))))
base_path.joinpath(f"past-{(date + 1):02d}day.txt").write_bytes(
accumulate.encode()
base_path.joinpath(f"past-{i:02d}day.txt").write_bytes(
b"\n".join(dataset)
)
if __name__ == "__main__":
async def main():
import time
start = time.time()
loop = asyncio.get_event_loop()
ph1 = Phase1()
ph2 = Phase2()
ph3 = Phase3()
ph4 = Phase4()
task = [ph1.run(loop), ph2.run(), ph3.run(), ph4.run()]
loop.run_until_complete(asyncio.gather(*task))
task = [
asyncio.create_task(ph1.run()),
asyncio.create_task(ph2.run()),
asyncio.create_task(ph3.run()),
asyncio.create_task(ph4.run()),
]
await asyncio.gather(*task)
logger.info("Download Complete, Now writing")
loop.run_until_complete(write_files([ph1.data, ph2.data, ph3.data, ph4.data]))
await write_files([ph1.data, ph2.data, ph3.data, ph4.data])
end = time.time() - start
logger.info(f"Time taken: {end:.2f} seconds")
if __name__ == "__main__":
asyncio.run(main())