diff --git a/app/main.py b/app/main.py index f11ba5c..b3d5247 100644 --- a/app/main.py +++ b/app/main.py @@ -412,6 +412,206 @@ async def auth_v1_token(request: Request): return JSONr(response) +# NLS 3.4.0 - venv/lib/python3.12/site-packages/nls_services_lease/test/test_lease_single_controller.py +@app.post('/leasing/v1/config-token', description='request to get config token for lease operations') +async def leasing_v1_config_token(request: Request): + j, cur_time = json_loads((await request.body()).decode('utf-8')), datetime.now(UTC) + + logger.debug(f'CALLED /leasing/v1/config-token') + logger.debug(f'Headers: {request.headers}') + logger.debug(f'Request: {j}') + + # todo: THIS IS A DEMO ONLY - THIS ENDPOINT GENERATES A NEW ROOT-CA EVERY TIME IT IS CALLED !!! + + ### + # + # https://git.collinwebdesigns.de/nvidia/nls/-/blob/main/src/test/test_config_token.py + # + ### + + from cryptography import x509 + from cryptography.hazmat._oid import NameOID + from cryptography.hazmat.primitives import serialization, hashes + from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key + from cryptography.hazmat.primitives.serialization import Encoding + + """ Create Root Key and Certificate """ + + # create root keypair + my_root_private_key = generate_private_key(public_exponent=65537, key_size=4096) + my_root_public_key = my_root_private_key.public_key() + + # create root-certificate subject + my_root_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'), + x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Root CA'), + ]) + + # create self-signed root-certificate + my_root_certificate = ( + x509.CertificateBuilder() + .subject_name(my_root_subject) + .issuer_name(my_root_subject) + .public_key(my_root_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_root_public_key), critical=False) + .sign(my_root_private_key, hashes.SHA256())) + + """ Create CA (Intermediate) Key and Certificate """ + + # create ca keypair + my_ca_private_key = generate_private_key(public_exponent=65537, key_size=4096) + my_ca_public_key = my_ca_private_key.public_key() + + # create ca-certificate subject + my_ca_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'), + x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Intermediate CA'), + ]) + + # create self-signed ca-certificate + my_ca_certificate = ( + x509.CertificateBuilder() + .subject_name(my_ca_subject) + .issuer_name(my_root_subject) + .public_key(my_ca_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .add_extension(x509.KeyUsage(digital_signature=False, key_encipherment=False, key_cert_sign=True, + key_agreement=False, content_commitment=False, data_encipherment=False, + crl_sign=True, encipher_only=False, decipher_only=False), critical=True) + .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_ca_public_key), critical=False) + # .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_root_public_key), critical=False) + .add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( + my_root_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value + ), critical=False) + .sign(my_root_private_key, hashes.SHA256())) + + # with open('caChain_my.pem', 'wb') as f: + # f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM)) + + """ Create Service-Instance Key and Certificate """ + + # create si keypair + my_si_private_key = generate_private_key(public_exponent=65537, key_size=2048) + my_si_public_key = my_si_private_key.public_key() + + my_si_private_key_as_pem = my_si_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + my_si_public_key_as_pem = my_si_public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + # with open('instance.private.pem', 'wb') as f: + # f.write(my_si_private_key_as_pem) + + # with open('instance.public.pem', 'wb') as f: + # f.write(my_si_public_key_as_pem) + + # create si-certificate subject + my_si_subject = x509.Name([ + #x509.NameAttribute(NameOID.COMMON_NAME, INSTANCE_REF), + x509.NameAttribute(NameOID.COMMON_NAME, j.get('service_instance_ref')), + ]) + + # create self-signed si-certificate + my_si_certificate = ( + x509.CertificateBuilder() + .subject_name(my_si_subject) + .issuer_name(my_ca_subject) + .public_key(my_si_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) + .add_extension(x509.KeyUsage(digital_signature=True, key_encipherment=True, key_cert_sign=False, + key_agreement=True, content_commitment=False, data_encipherment=False, + crl_sign=False, encipher_only=False, decipher_only=False), critical=True) + .add_extension(x509.ExtendedKeyUsage([ + x509.oid.ExtendedKeyUsageOID.SERVER_AUTH, + x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH] + ), critical=False) + .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_si_public_key), critical=False) + # .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_ca_public_key), critical=False) + .add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( + my_ca_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value + ), critical=False) + .add_extension(x509.SubjectAlternativeName([ + #x509.DNSName(INSTANCE_REF) + x509.DNSName(j.get('service_instance_ref')) + ]), critical=False) + .sign(my_ca_private_key, hashes.SHA256())) + + my_si_public_key_exp = my_si_certificate.public_key().public_numbers().e + my_si_public_key_mod = f'{my_si_certificate.public_key().public_numbers().n:x}' # hex value without "0x" prefix + + # with open('cert_my.pem', 'wb') as f: + # f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM)) + + """ build out payload """ + + cur_time = datetime.now(UTC) + exp_time = cur_time + CLIENT_TOKEN_EXPIRE_DELTA + + payload = { + "iss": "NLS Service Instance", + "aud": "NLS Licensed Client", + "iat": timegm(cur_time.timetuple()), + "nbf": timegm(cur_time.timetuple()), + "exp": timegm(exp_time.timetuple()), + "protocol_version": "2.0", + "d_name": "DLS", + "service_instance_ref": j.get('service_instance_ref'), + "service_instance_public_key_configuration": { + "service_instance_public_key_me": { + "mod": hex(my_si_public_key.public_numbers().n)[2:], + "exp": int(my_si_public_key.public_numbers().e), + }, + # 64 chars per line (pem default) + "service_instance_public_key_pem": my_si_public_key_as_pem.decode('utf-8').strip(), + "key_retention_mode": "LATEST_ONLY" + }, + } + + my_jwt_encode_key = jwk.construct(my_si_private_key_as_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256) + config_token = jws.sign(payload, key=my_jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256) + + response_ca_chain = my_ca_certificate.public_bytes(encoding=Encoding.PEM).decode('utf-8') + response_si_certificate = my_si_certificate.public_bytes(encoding=Encoding.PEM).decode('utf-8') + + response = { + "certificateConfiguration": { + # 76 chars per line + "caChain": [response_ca_chain], + # 76 chars per line + "publicCert": response_si_certificate, + "publicKey": { + "exp": int(my_si_certificate.public_key().public_numbers().e), + "mod": [hex(my_si_certificate.public_key().public_numbers().n)[2:]], + }, + }, + "configToken": config_token, + } + + logging.debug(response) + + return JSONr(response, status_code=200) + + # venv/lib/python3.9/site-packages/nls_services_lease/test/test_lease_multi_controller.py @app.post('/leasing/v1/lessor', description='request multiple leases (borrow) for current origin') async def leasing_v1_lessor(request: Request): diff --git a/test/main.py b/test/main.py index 653548f..014a86f 100644 --- a/test/main.py +++ b/test/main.py @@ -1,3 +1,4 @@ +import json import sys from base64 import b64encode as b64enc from calendar import timegm @@ -7,7 +8,7 @@ from os.path import dirname, join from uuid import uuid4, UUID from dateutil.relativedelta import relativedelta -from jose import jwt, jwk +from jose import jwt, jwk, jws from jose.constants import ALGORITHMS from starlette.testclient import TestClient @@ -20,6 +21,7 @@ from util import PrivateKey, PublicKey client = TestClient(main.app) +INSTANCE_REF = '10000000-0000-0000-0000-000000000001' ORIGIN_REF, ALLOTMENT_REF, SECRET = str(uuid4()), '20000000-0000-0000-0000-000000000001', 'HelloWorld' # INSTANCE_KEY_RSA = generate_key() @@ -69,6 +71,31 @@ def test_client_token(): assert response.status_code == 200 +def test_config_token(): # todo: /leasing/v1/config-token + # https://git.collinwebdesigns.de/nvidia/nls/-/blob/main/src/test/test_config_token.py + + response = client.post('/leasing/v1/config-token', json={"service_instance_ref": INSTANCE_REF}) + assert response.status_code == 200 + + nv_response_certificate_configuration = response.json().get('certificateConfiguration') + nv_response_public_cert = nv_response_certificate_configuration.get('publicCert').encode('utf-8') + nv_jwt_decode_key = jwk.construct(nv_response_public_cert, algorithm=ALGORITHMS.RS256) + + nv_response_config_token = response.json().get('configToken') + + payload = jws.verify(nv_response_config_token, key=nv_jwt_decode_key, algorithms=ALGORITHMS.RS256) + payload = json.loads(payload) + assert payload.get('iss') == 'NLS Service Instance' + assert payload.get('aud') == 'NLS Licensed Client' + assert payload.get('service_instance_ref') == INSTANCE_REF + + nv_si_public_key_configuration = payload.get('service_instance_public_key_configuration') + nv_si_public_key_me = nv_si_public_key_configuration.get('service_instance_public_key_me') + # assert nv_si_public_key_me.get('mod') == 1 #nv_si_public_key_mod + assert len(nv_si_public_key_me.get('mod')) == 512 + assert nv_si_public_key_me.get('exp') == 65537 # nv_si_public_key_exp + + def test_origins(): pass