Compare commits

..

1 Commits

Author SHA1 Message Date
Oscar Krause
d56456d2f5 Merge branch 'v18.x-support' into 'main'
v18.x support / NLS 3.4.x compatibility

See merge request oscar.krause/fastapi-dls!46
2025-04-10 09:02:19 +02:00
2 changed files with 122 additions and 190 deletions

View File

@ -6,7 +6,7 @@ from datetime import datetime, timedelta, UTC
from hashlib import sha256
from json import loads as json_loads
from os import getenv as env
from os.path import join, dirname, isfile
from os.path import join, dirname
from uuid import uuid4
from dateutil.relativedelta import relativedelta
@ -18,11 +18,10 @@ from jose.constants import ALGORITHMS
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse, JSONResponse as JSONr, HTMLResponse as HTMLr, Response, \
RedirectResponse
from starlette.responses import StreamingResponse, JSONResponse as JSONr, HTMLResponse as HTMLr, Response, RedirectResponse
from orm import Origin, Lease, init as db_init, migrate
from util import PrivateKey, PublicKey, load_file, Cert
from util import PrivateKey, PublicKey, load_file
# Load variables
load_dotenv('../version.env')
@ -422,7 +421,7 @@ async def leasing_v1_config_token(request: Request):
logger.debug(f'Headers: {request.headers}')
logger.debug(f'Request: {j}')
# todo: THIS IS A DEMO ONLY
# todo: THIS IS A DEMO ONLY - THIS ENDPOINT GENERATES A NEW ROOT-CA EVERY TIME IT IS CALLED !!!
###
#
@ -430,14 +429,6 @@ async def leasing_v1_config_token(request: Request):
#
###
root_private_key_filename = join(dirname(__file__), 'cert/my_demo_root_private_key.pem')
root_certificate_filename = join(dirname(__file__), 'cert/my_demo_root_certificate.pem')
ca_private_key_filename = join(dirname(__file__), 'cert/my_demo_ca_private_key.pem')
ca_certificate_filename = join(dirname(__file__), 'cert/my_demo_ca_certificate.pem')
si_private_key_filename = join(dirname(__file__), 'cert/my_demo_si_private_key.pem')
si_certificate_filename = join(dirname(__file__), 'cert/my_demo_si_certificate.pem')
def init_config_token_demo():
from cryptography import x509
from cryptography.hazmat._oid import NameOID
from cryptography.hazmat.primitives import serialization, hashes
@ -472,18 +463,6 @@ async def leasing_v1_config_token(request: Request):
.add_extension(x509.SubjectKeyIdentifier.from_public_key(my_root_public_key), critical=False)
.sign(my_root_private_key, hashes.SHA256()))
my_root_private_key_as_pem = my_root_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(root_private_key_filename, 'wb') as f:
f.write(my_root_private_key_as_pem)
with open(root_certificate_filename, 'wb') as f:
f.write(my_root_certificate.public_bytes(encoding=Encoding.PEM))
""" Create CA (Intermediate) Key and Certificate """
# create ca keypair
@ -519,17 +498,8 @@ async def leasing_v1_config_token(request: Request):
), critical=False)
.sign(my_root_private_key, hashes.SHA256()))
my_ca_private_key_as_pem = my_ca_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(ca_private_key_filename, 'wb') as f:
f.write(my_ca_private_key_as_pem)
with open(ca_certificate_filename, 'wb') as f:
f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM))
# with open('caChain_my.pem', 'wb') as f:
# f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM))
""" Create Service-Instance Key and Certificate """
@ -547,8 +517,8 @@ async def leasing_v1_config_token(request: Request):
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
with open(si_private_key_filename, 'wb') as f:
f.write(my_si_private_key_as_pem)
# with open('instance.private.pem', 'wb') as f:
# f.write(my_si_private_key_as_pem)
# with open('instance.public.pem', 'wb') as f:
# f.write(my_si_public_key_as_pem)
@ -589,22 +559,8 @@ async def leasing_v1_config_token(request: Request):
my_si_public_key_exp = my_si_certificate.public_key().public_numbers().e
my_si_public_key_mod = f'{my_si_certificate.public_key().public_numbers().n:x}' # hex value without "0x" prefix
with open(si_certificate_filename, 'wb') as f:
f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM))
if not (isfile(root_private_key_filename)
and isfile(ca_private_key_filename)
and isfile(ca_certificate_filename)
and isfile(si_private_key_filename)
and isfile(si_certificate_filename)):
init_config_token_demo()
my_ca_certificate = Cert.from_file(ca_certificate_filename)
my_si_certificate = Cert.from_file(si_certificate_filename)
my_si_private_key = PrivateKey.from_file(si_private_key_filename)
my_si_private_key_as_pem = my_si_private_key.pem()
my_si_public_key = my_si_private_key.public_key().raw()
my_si_public_key_as_pem = my_si_private_key.public_key().pem()
# with open('cert_my.pem', 'wb') as f:
# f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM))
""" build out payload """
@ -634,8 +590,8 @@ async def leasing_v1_config_token(request: Request):
my_jwt_encode_key = jwk.construct(my_si_private_key_as_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256)
config_token = jws.sign(payload, key=my_jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256)
response_ca_chain = my_ca_certificate.pem().decode('utf-8')
response_si_certificate = my_si_certificate.pem().decode('utf-8')
response_ca_chain = my_ca_certificate.public_bytes(encoding=Encoding.PEM).decode('utf-8')
response_si_certificate = my_si_certificate.public_bytes(encoding=Encoding.PEM).decode('utf-8')
response = {
"certificateConfiguration": {
@ -644,8 +600,8 @@ async def leasing_v1_config_token(request: Request):
# 76 chars per line
"publicCert": response_si_certificate,
"publicKey": {
"exp": int(my_si_certificate.raw().public_key().public_numbers().e),
"mod": [hex(my_si_certificate.raw().public_key().public_numbers().n)[2:]],
"exp": int(my_si_certificate.public_key().public_numbers().e),
"mod": [hex(my_si_certificate.public_key().public_numbers().n)[2:]],
},
},
"configToken": config_token,

View File

@ -3,7 +3,6 @@ import logging
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey, generate_private_key
from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key
from cryptography.x509 import load_pem_x509_certificate, Certificate
logging.basicConfig()
@ -77,29 +76,6 @@ class PublicKey:
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
class Cert:
def __init__(self, data: bytes):
self.__cert = load_pem_x509_certificate(data)
@staticmethod
def from_file(filename: str) -> "Cert":
log = logging.getLogger(__name__)
log.debug(f'Importing Certificate from "{filename}"')
with open(filename, 'rb') as f:
data = f.read()
return Cert(data=data.strip())
def raw(self) -> Certificate:
return self.__cert
def pem(self) -> bytes:
return self.__cert.public_bytes(encoding=serialization.Encoding.PEM)
def load_file(filename: str) -> bytes:
log = logging.getLogger(f'{__name__}')
log.debug(f'Loading contents of file "{filename}')