Update the NRD script, and get sanctioned by rust again

tdc 2024-04-10 11:32:02 +08:00
parent 247b4b6306
commit 93247ca8f2
6 changed files with 191 additions and 128 deletions

AutoBuild/build.sh Executable file

@@ -0,0 +1,13 @@
#!/bin/bash
# Get the first argument
arg=$1
# Check the argument value
if [ "$arg" == "build" ]; then
pip install -r AutoBuild/requirements.txt && python AutoBuild/builder.py
elif [ "$arg" == "nrdlist" ]; then
pip install -r AutoBuild/requirements.txt && python AutoBuild/nrdlist.py
else
echo "Invalid argument. Please use 'build' or 'nrdlist'."
fi
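For reference, the script above is expected to be invoked from the repository root, e.g. "bash AutoBuild/build.sh build" to rebuild the filter lists or "bash AutoBuild/build.sh nrdlist" to refresh the NRD data; the invocation context is an assumption, only the argument handling comes from the script itself.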


@@ -4,31 +4,51 @@ from shutil import copyfile
import requests
filterlist = {
'abp': ['experimental.txt', 'filter.txt', 'PureView/news.txt', 'PureView/news_mobile.txt'],
'hosts': ['hosts.txt', 'nofarm.txt', 'TW165.txt', 'TWNIC-RPZ.txt']
"abp": [
"experimental.txt",
"filter.txt",
"PureView/news.txt",
"PureView/news_mobile.txt",
],
"hosts": [
"hosts.txt",
"nofarm.txt",
"TW165.txt",
"TWNIC-RPZ.txt",
"nrd/past-1day.txt",
"nrd/past-2day.txt",
"nrd/past-3day.txt",
"nrd/past-4day.txt",
"nrd/past-5day.txt",
"nrd/past-6day.txt",
],
}
url = 'https://filter.futa.gg/'
url = "https://filter.futa.gg/"
tz = timezone(timedelta(hours=+8))
today = datetime.now(tz).date()
class HEAD:
abp: str = '[Adblock Plus]\n' \
'! Title: LowTechFilter {name}\n' \
'! Version: {version}\n' \
'! Expires: 1 hour\n' \
'! Homepage: https://t.me/AdBlock_TW\n' \
'! ----------------------------------------------------------------------\n'
hosts: str = '! FutaHosts\n' \
'! Title: LowTechFilter {name}\n' \
'! URL: <https://github.com/FutaGuard/LowTechFilter>\n' \
'! Version: {version}\n' \
'! --------------------------------------------------\n'
abp: str = (
"[Adblock Plus]\n"
"! Title: LowTechFilter {name}\n"
"! Version: {version}\n"
"! Expires: 1 hour\n"
"! Homepage: https://t.me/AdBlock_TW\n"
"! ----------------------------------------------------------------------\n"
)
hosts: str = (
"! FutaHosts\n"
"! Title: LowTechFilter {name}\n"
"! URL: <https://github.com/FutaGuard/LowTechFilter>\n"
"! Version: {version}\n"
"! --------------------------------------------------\n"
)
def update_version(filename: str) -> str:
pattern = r'(?<=Version: )(\d+\.\d+\.)(\d+)'
newversion = ''
pattern = r"(?<=Version: )(\d+\.\d+\.)(\d+)"
newversion = ""
r = requests.get(url + filename)
first = None
@@ -36,18 +56,18 @@ def update_version(filename: str) -> str:
if r.status_code != 200:
pass
else:
first = '\n'.join(r.text.splitlines()[:5])
first = "\n".join(r.text.splitlines()[:5])
try:
version = re.findall(pattern, first, re.MULTILINE)[0]
except:
# https://www.ptt.cc/bbs/Battlegirlhs/M.1506615677.A.1A4.html
version = ('2017.0929.', '1')
version = ("2017.0929.", "1")
dt = datetime.strptime(version[0], '%Y.%m%d.').date()
newversion = today.strftime('%Y.%m%d.')
dt = datetime.strptime(version[0], "%Y.%m%d.").date()
newversion = today.strftime("%Y.%m%d.")
if dt != today:
newversion += '1'
newversion += "1"
else:
newversion += str(int(version[1]) + 1)
return newversion
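For reference, a minimal standalone sketch of the date-based version scheme that update_version implements above; the helper name and sample inputs are illustrative, not part of the repo:

from datetime import date

def bump_version(previous: str, today: date) -> str:
    # previous looks like "2024.0410.3": a date prefix plus a same-day counter
    date_part, counter = previous.rsplit(".", 1)
    new_prefix = today.strftime("%Y.%m%d.")
    if date_part + "." != new_prefix:
        return new_prefix + "1"  # first build of a new day restarts the counter
    return new_prefix + str(int(counter) + 1)  # later builds on the same day increment it

# bump_version("2024.0410.3", date(2024, 4, 10)) -> "2024.0410.4"
# bump_version("2024.0409.7", date(2024, 4, 10)) -> "2024.0410.1"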
@@ -57,100 +77,106 @@ for category in filterlist:
for filename in filterlist[category]:
newversion = update_version(filename)
with open(f'{filename}', 'r') as files:
with open(f"{filename}", "r") as files:
data = files.read()
with open(f'{filename}', 'w') as output:
with open(f"{filename}", "w") as output:
heads: str = HEAD().__getattribute__(category)
newhead = heads.format(
name=filename.split('.')[0].replace('_', ' ').replace('/', ' ').title(),
version=newversion
name=filename.split(".")[0]
.replace("_", " ")
.replace("/", " ")
.title(),
version=newversion,
)
output.write(newhead + data)
# hide farm sites from google, converted to abp
if filename == 'nofarm.txt':
domain_list = ''
if filename == "nofarm.txt":
domain_list = ""
for domains in data.splitlines():
if not domains.startswith('!'):
if not domains.startswith("!"):
domain = domains[2:-1]
domain_list += 'google.*##div.g:has(div[data-hveid] a[href*="{domain}"])\n'.format(
domain=domain
)
heads: str = HEAD().__getattribute__('abp')
heads: str = HEAD().__getattribute__("abp")
newhead = heads.format(
name='hide farm content from google',
version=newversion
name="hide farm content from google", version=newversion
)
with open('hide_farm_from_search.txt', 'w') as f:
with open("hide_farm_from_search.txt", "w") as f:
f.write(newhead + domain_list)
if filename == 'TW165.txt':
newfilename = 'TW165-redirect.txt'
heads: str = HEAD().__getattribute__('abp')
newhead = heads.format(
name='TW165 redirect',
version=newversion
)
with open(newfilename, 'w') as f:
if filename == "TW165.txt":
newfilename = "TW165-redirect.txt"
heads: str = HEAD().__getattribute__("abp")
newhead = heads.format(name="TW165 redirect", version=newversion)
with open(newfilename, "w") as f:
f.write(newhead)
f.write(''.join(f'||{e}^$dnsrewrite=NOERROR;A;34.102.218.71\n' for e in data.splitlines()))
f.write(
"".join(
f"||{e}^$dnsrewrite=NOERROR;A;34.102.218.71\n"
for e in data.splitlines()
)
)
# hosts to domains
def to_pure_domain(filename: str, data: str):
data = data.splitlines()
newdata = '\n'.join(data)
name = filename.split('.txt')[0].split('_')[0]
with open(name+'_domains.txt', 'w') as output:
if name == 'hosts':
pattern = r'(?<=^\|\|)\S+\.\S{2,}(?=\^)'
newoutput = '\n'.join(re.findall(pattern, newdata, re.MULTILINE))
newdata = "\n".join(data)
name = filename.split(".txt")[0].split("_")[0]
with open(name + "_domains.txt", "w") as output:
if name == "hosts":
pattern = r"(?<=^\|\|)\S+\.\S{2,}(?=\^)"
newoutput = "\n".join(
re.findall(pattern, newdata, re.MULTILINE)
)
else:
newoutput = '\n'.join(data)
newoutput = "\n".join(data)
output.write(newoutput)
if filename in filterlist['hosts']:
if filename in filterlist["hosts"]:
to_pure_domain(filename, data)
# make hosts formats
def to_hosts(filename: str, data: str):
data = data.splitlines()
newdata = '\n'.join(data)
name = filename.split('.txt')[0].split('_')[0]
heads: str = HEAD().__getattribute__('hosts')
newhead = heads.format(
name=name + ' hosts',
version=newversion
)
newfilename = name + '_hosts.txt' if name != 'hosts' else 'hosts.txt'
with open(newfilename, 'w') as output:
if name == 'hosts':
pattern = r'(?<=^\|\|)\S+\.\S{2,}(?=\^)'
newoutput = '\n'.join('0.0.0.0 ' + e for e in re.findall(pattern, newdata, re.MULTILINE))
newdata = "\n".join(data)
name = filename.split(".txt")[0].split("_")[0]
heads: str = HEAD().__getattribute__("hosts")
newhead = heads.format(name=name + " hosts", version=newversion)
newfilename = name + "_hosts.txt" if name != "hosts" else "hosts.txt"
with open(newfilename, "w") as output:
if name == "hosts":
pattern = r"(?<=^\|\|)\S+\.\S{2,}(?=\^)"
newoutput = "\n".join(
"0.0.0.0 " + e
for e in re.findall(pattern, newdata, re.MULTILINE)
)
else:
newoutput = '\n'.join('0.0.0.0 ' + e for e in data)
newoutput = "\n".join("0.0.0.0 " + e for e in data)
output.write(newhead)
output.write(newoutput)
# if filename in filterlist['hosts']:
# to_hosts(filename, data)
# convert to abp format
def to_abp(filename: str, data: str):
data = data.splitlines()
newdata = '\n'.join(data)
name = filename.split('.txt')[0].split('_')[0]
heads: str = HEAD().__getattribute__('abp')
newhead = heads.format(
name=name + ' abp',
version=newversion
)
newdata = "\n".join(data)
name = filename.split(".txt")[0].split("_")[0]
heads: str = HEAD().__getattribute__("abp")
newhead = heads.format(name=name + " abp", version=newversion)
with open(name+'_abp.txt', 'w') as output:
if name == 'hosts':
with open(name + "_abp.txt", "w") as output:
if name == "hosts":
output.write(newhead + newdata)
else:
newoutput = '\n'.join(f'||{e}^' for e in data)
newoutput = "\n".join(f"||{e}^" for e in data)
output.write(newhead)
output.write(newoutput)
if filename in filterlist['hosts']:
if filename in filterlist["hosts"]:
to_abp(filename, data)
to_hosts(filename, data)
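For reference, a small sketch of the three output flavours the helpers above produce for a plain (non-hosts.txt) domain entry; the domain value is illustrative:

domain = "ads.example.com"  # one line from a hosts-category source list

pure_line = domain                    # *_domains.txt: bare domain
hosts_line = "0.0.0.0 " + domain      # *_hosts.txt: hosts format
abp_line = f"||{domain}^"             # *_abp.txt: Adblock Plus format

print(pure_line, hosts_line, abp_line, sep="\n")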


@@ -7,7 +7,7 @@ from base64 import b64encode
import pathlib
import logging
import asyncio
from zipfile import ZipFile
from zipfile import ZipFile, BadZipfile
from io import BytesIO
logging.basicConfig(level=logging.INFO)
@@ -16,9 +16,7 @@ logger = logging.getLogger(__name__)
class Downloader:
def __init__(self):
self.base_url = (
"https://www.whoisds.com//whois-database/newly-registered-domains/{args}/nrd"
)
self.base_url = "https://www.whoisds.com//whois-database/newly-registered-domains/{args}/nrd"
self.base_path = pathlib.Path("nrd")
self.data: Dict[str, BytesIO] = {}
if not self.base_path.exists():
@@ -39,19 +37,25 @@ class Downloader:
logger.error("Download failed: %s", url)
return False
zip_file = BytesIO(r.content)
with ZipFile(zip_file, 'r') as zip_obj:
# print(zip_obj.read('domain-names.txt'))
self.data[date.format("YYYY-MM-DD")] = zip_obj.read('domain-names.txt')
try:
with ZipFile(zip_file, "r") as zip_obj:
# print(zip_obj.read('domain-names.txt'))
self.data[date.format("YYYY-MM-DD")] = zip_obj.read(
"domain-names.txt"
)
except BadZipfile:
logger.error("Bad Zipfile: %s", url)
return False
return True
async def write(self):
# todo: extract zip file and write to disk
sort_date = sorted(self.data.keys(), reverse=True)
accumulate = ''
accumulate = ""
for date in range(len(sort_date)):
accumulate += self.data[sort_date[date]].decode()
self.base_path.joinpath(f"ndr_past_{date}.txt").write_bytes(accumulate.encode())
self.base_path.joinpath(f"past-{date+1}day.txt").write_bytes(
accumulate.encode()
)
def run(self):
loop = asyncio.get_event_loop()
@@ -68,4 +72,4 @@ class Downloader:
if __name__ == "__main__":
Downloader().run()
Downloader().run()
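For reference, a sketch of how write() above accumulates the downloaded days into the past-{N}day.txt files that builder.py now consumes; dates and domains are illustrative:

# newest date first, as sorted(..., reverse=True) yields
data = {
    "2024-04-09": b"new1.example\n",
    "2024-04-08": b"new2.example\n",
    "2024-04-07": b"new3.example\n",
}

accumulate = ""
for i, day in enumerate(sorted(data, reverse=True), start=1):
    accumulate += data[day].decode()
    # past-1day.txt holds the newest day, past-2day.txt the newest two days, and so on
    print(f"past-{i}day.txt", repr(accumulate))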


@@ -1,5 +1,17 @@
anyio==4.3.0
arrow==1.3.0
certifi==2023.7.22
charset-normalizer==2.1.1
cssselect==1.2.0
h11==0.14.0
httpcore==1.0.5
httpx==0.27.0
idna==3.3
lxml==5.2.1
python-dateutil==2.9.0.post0
requests==2.28.1
urllib3==1.26.18
ruff==0.3.5
six==1.16.0
sniffio==1.3.1
types-python-dateutil==2.9.0.20240316
urllib3==1.26.18


@@ -8,71 +8,79 @@ import requests
from requests.auth import HTTPBasicAuth
logger = logging.getLogger(__name__)
IP_PATTERN = r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'
IP_PATTERN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"
def exclude_list(domain: str) -> bool:
exclude = ['google.com']
exclude = ["google.com"]
for e in exclude:
if domain.endswith(e):
return True
return False
def is_pure_ip(domain: str) -> bool:
return True if re.match(IP_PATTERN, domain) else False
def main():
auth = os.getenv('auth', None)
jsonurl = os.getenv('tw165json', None)
csvurl = os.getenv('tw165csv', None)
auth = os.getenv("auth", None)
jsonurl = os.getenv("tw165json", None)
csvurl = os.getenv("tw165csv", None)
if not jsonurl or not csvurl:
logger.critical('URL NOT SET')
logger.critical("URL NOT SET")
return
if not auth:
logger.critical('AUTH NOT SET')
logger.critical("AUTH NOT SET")
return
user, passwd = auth.split(':')
user, passwd = auth.split(":")
basic = HTTPBasicAuth(user, passwd)
def fetchdata(url):
r = requests.get(url, auth=basic)
if r.status_code != 200:
logger.critical('Fetch Data Err')
logger.critical("Fetch Data Err")
return
return r
r = fetchdata(jsonurl)
try:
r_json = r.json()['result']['records']
r_json = r.json()["result"]["records"]
except (JSONDecodeError, KeyError):
logger.critical('Parse JSON Err')
logger.critical("Parse JSON Err")
raise
domains = dict.fromkeys([
urlparse(row['WEBURL']).hostname if row['WEBURL'].startswith('http') else urlparse(
'http://' + row['WEBURL']).hostname
for row in r_json[1:]
])
domains = dict.fromkeys(
[
urlparse(row["WEBURL"]).hostname
if row["WEBURL"].startswith("http")
else urlparse("http://" + row["WEBURL"]).hostname
for row in r_json[1:]
]
)
r = fetchdata(csvurl)
domains.update(dict.fromkeys(
[
urlparse(x.split(',')[1]).hostname if x.split(',')[1].startswith('http') else urlparse(
'http://' + x.split(',')[1]).hostname
for x in r.text.splitlines()[2:]
]
))
domains.update(
dict.fromkeys(
[
urlparse(x.split(",")[1]).hostname
if x.split(",")[1].startswith("http")
else urlparse("http://" + x.split(",")[1]).hostname
for x in r.text.splitlines()[2:]
]
)
)
# remove bare IPs & remove allow-listed domains
domains = {k: v for k, v in domains.items() if not is_pure_ip(k) \
and not exclude_list(k)}
filename = 'TW165.txt'
with open(filename, 'w') as f:
f.write('\n'.join(domains.keys()))
domains = {
k: v for k, v in domains.items() if not is_pure_ip(k) and not exclude_list(k)
}
filename = "TW165.txt"
with open(filename, "w") as f:
f.write("\n".join(domains.keys()))
if __name__ == '__main__':
if __name__ == "__main__":
main()
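For reference, a sketch of the WEBURL-to-hostname normalisation used in both comprehensions above; the sample values are illustrative:

from urllib.parse import urlparse

def to_hostname(weburl: str) -> str:
    # urlparse only yields a hostname when a scheme is present,
    # so bare entries get an http:// prefix first
    if not weburl.startswith("http"):
        weburl = "http://" + weburl
    return urlparse(weburl).hostname

# to_hostname("scam.example.tw/login") -> "scam.example.tw"
# to_hostname("https://scam.example.tw") -> "scam.example.tw"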


@@ -9,24 +9,24 @@ logger = logging.getLogger(__name__)
def main():
r = requests.get('https://rpz.twnic.tw/e.html')
r = requests.get("https://rpz.twnic.tw/e.html")
if r.status_code != 200:
logger.critical('Fetch Data Err')
logger.critical("Fetch Data Err")
sys.exit(1)
# split text from <script> tag
raw: str = r.text.split('<script>')[1].split(';')[0].split('= ')[1]
raw: str = r.text.split("<script>")[1].split(";")[0].split("= ")[1]
parse_data: List[dict] = [dict()]
try:
parse_data = json.loads(raw)
except JSONDecodeError:
logger.critical('Parse JSON Err')
logger.critical("Parse JSON Err")
sys.exit(1)
output = [domain for in_dic in parse_data for domain in in_dic['domains']]
with open('TWNIC-RPZ.txt', 'w') as f:
f.write('\n'.join(output))
output = [domain for in_dic in parse_data for domain in in_dic["domains"]]
with open("TWNIC-RPZ.txt", "w") as f:
f.write("\n".join(output))
if __name__ == '__main__':
if __name__ == "__main__":
main()
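For reference, a sketch of the <script> extraction above; the HTML snippet is hypothetical and shaped only to match the string operations in the code, not taken from rpz.twnic.tw:

import json

html = '<html><script>var data = [{"domains": ["bad.example.tw"]}];</script></html>'
raw = html.split("<script>")[1].split(";")[0].split("= ")[1]
parsed = json.loads(raw)  # [{'domains': ['bad.example.tw']}]
print([d for entry in parsed for d in entry["domains"]])  # ['bad.example.tw']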