diff --git a/.gitignore b/.gitignore index cb79de0..af5a240 100644 --- a/.gitignore +++ b/.gitignore @@ -67,3 +67,5 @@ target/ # IDEs .idea + +.env \ No newline at end of file diff --git a/README.md b/README.md index 5be3f5b..fdc8e67 100644 --- a/README.md +++ b/README.md @@ -45,12 +45,29 @@ client.auth.universal_auth.login( secrets = client.secrets.list_secrets(project_id="", environment_slug="dev", secret_path="/") ``` +## InfisicalSDKClient Parameters + +The `InfisicalSDKClient` takes the following parameters, which are used as a global configuration for the lifetime of the SDK instance. + +- **host** (`str`, _Optional_): The host URL for your Infisical instance. Defaults to `https://app.infisical.com`. +- **token** (`str`, _Optional_): Specify an authentication token to use for all requests. If provided, you will not need to call any of the `auth` methods. Defaults to `None` +- **cache_ttl** (`int`, _Optional_): The SDK has built-in client-side caching for secrets, greatly improving response times. By default, secrets are cached for 1 minute (60 seconds). You can disable caching by setting `cache_ttl` to `None`, or adjust the duration in seconds as needed. + +```python +client = InfisicalSDKClient( + host="https://app.infisical.com", # Defaults to https://app.infisical.com + token="", # If not set, use the client.auth() methods. + cache_ttl = 300 # `None` to disable caching +) +``` + ## Core Methods The SDK methods are organized into the following high-level categories: 1. `auth`: Handles authentication methods. 2. `secrets`: Manages CRUD operations for secrets. +3. `kms`: Perform cryptographic operations with Infisical KMS. ### `auth` diff --git a/example.py b/example.py deleted file mode 100644 index ac820b8..0000000 --- a/example.py +++ /dev/null @@ -1,48 +0,0 @@ -from infisical_sdk import InfisicalSDKClient - -sdkInstance = InfisicalSDKClient(host="https://app.infisical.com") - -sdkInstance.auth.universal_auth.login("<>", "<>") - -# new_secret = sdkInstance.secrets.create_secret_by_name( -# secret_name="NEW_SECRET", -# project_id="d7b2b891-2c07-4bc8-bb3f-d29ca4c7187b", -# secret_path="/", -# environment_slug="dev", -# secret_value="secret_value", -# secret_comment="Optional comment", -# skip_multiline_encoding=False, -# secret_reminder_repeat_days=30, # Optional -# secret_reminder_note="Remember to update this secret" # Optional -# ) - -# updated_secret = sdkInstance.secrets.update_secret_by_name( -# current_secret_name="NEW_SECRET", -# project_id="d7b2b891-2c07-4bc8-bb3f-d29ca4c7187b", -# secret_path="/", -# environment_slug="dev", -# secret_value="new_secret_value", -# secret_comment="Updated comment", # Optional -# skip_multiline_encoding=False, -# secret_reminder_repeat_days=10, # Optional -# secret_reminder_note="Updated reminder note", # Optional -# new_secret_name="NEW_NAME_2" # Optional -# ) - - -# secret = sdkInstance.secrets.get_secret_by_name( -# secret_name="NEW_NAME_2", -# project_id="d7b2b891-2c07-4bc8-bb3f-d29ca4c7187b", -# environment_slug="dev", -# secret_path="/", -# expand_secret_references=True, -# include_imports=True, -# version=None # Optional -# ) - -# deleted_secret = sdkInstance.secrets.delete_secret_by_name( -# secret_name="NEW_NAME_2", -# project_id="d7b2b891-2c07-4bc8-bb3f-d29ca4c7187b", -# environment_slug="dev", -# secret_path="/" -# ) diff --git a/infisical_sdk/__init__.py b/infisical_sdk/__init__.py index 508a560..6e7e124 100644 --- a/infisical_sdk/__init__.py +++ b/infisical_sdk/__init__.py @@ -1,3 +1,3 @@ from .client import InfisicalSDKClient # noqa from .infisical_requests import InfisicalError # noqa -from .api_types import SingleSecretResponse, ListSecretsResponse, BaseSecret # noqa \ No newline at end of file +from .api_types import SingleSecretResponse, ListSecretsResponse, BaseSecret, SymmetricEncryption # noqa \ No newline at end of file diff --git a/infisical_sdk/client.py b/infisical_sdk/client.py index 1cdac38..9913f12 100644 --- a/infisical_sdk/client.py +++ b/infisical_sdk/client.py @@ -1,34 +1,29 @@ -import base64 -import json -from typing import List, Union -import os -import datetime -from typing import Dict, Any - -import requests -import boto3 -from botocore.auth import SigV4Auth -from botocore.awsrequest import AWSRequest -from botocore.exceptions import NoCredentialsError - from .infisical_requests import InfisicalRequests -from .api_types import ListSecretsResponse, SingleSecretResponse, BaseSecret -from .api_types import SymmetricEncryption, KmsKeysOrderBy, OrderDirection -from .api_types import ListKmsKeysResponse, SingleKmsKeyResponse, MachineIdentityLoginResponse -from .api_types import KmsKey, KmsKeyEncryptDataResponse, KmsKeyDecryptDataResponse +from infisical_sdk.resources import Auth +from infisical_sdk.resources import V3RawSecrets +from infisical_sdk.resources import KMS +from infisical_sdk.util import SecretsCache class InfisicalSDKClient: - def __init__(self, host: str, token: str = None): + def __init__(self, host: str, token: str = None, cache_ttl: int = 60): + """ + Initialize the Infisical SDK client. + + :param str host: The host URL for your Infisical instance. Will default to `https://app.infisical.com` if not specified. + :param str token: The authentication token for the client. If not specified, you can use the `auth` methods to authenticate. + :param int cache_ttl: The time to live for the secrets cache. This is the number of seconds that secrets fetched from the API will be cached for. Set to `None` to disable caching. Defaults to `60` seconds. + """ + self.host = host self.access_token = token self.api = InfisicalRequests(host=host, token=token) - - self.auth = Auth(self) - self.secrets = V3RawSecrets(self) - self.kms = KMS(self) + self.cache = SecretsCache(cache_ttl) + self.auth = Auth(self.api, self.set_token) + self.secrets = V3RawSecrets(self.api, self.cache) + self.kms = KMS(self.api) def set_token(self, token: str): """ @@ -43,483 +38,3 @@ def get_token(self): """ return self.access_token - -class UniversalAuth: - def __init__(self, client: InfisicalSDKClient): - self.client = client - - def login(self, client_id: str, client_secret: str) -> MachineIdentityLoginResponse: - """ - Login with Universal Auth. - - Args: - client_id (str): Your Machine Identity Client ID. - client_secret (str): Your Machine Identity Client Secret. - - Returns: - Dict: A dictionary containing the access token and related information. - """ - - requestBody = { - "clientId": client_id, - "clientSecret": client_secret - } - - result = self.client.api.post( - path="/api/v1/auth/universal-auth/login", - json=requestBody, - model=MachineIdentityLoginResponse - ) - - self.client.set_token(result.data.accessToken) - - return result.data - - -class AWSAuth: - def __init__(self, client: InfisicalSDKClient) -> None: - self.client = client - - def login(self, identity_id: str) -> MachineIdentityLoginResponse: - """ - Login with AWS Authentication. - - Args: - identity_id (str): Your Machine Identity ID that has AWS Auth configured. - - Returns: - Dict: A dictionary containing the access token and related information. - """ - - identity_id = identity_id or os.getenv("INFISICAL_AWS_IAM_AUTH_IDENTITY_ID") - if not identity_id: - raise ValueError( - "Identity ID must be provided or set in the environment variable" + - "INFISICAL_AWS_IAM_AUTH_IDENTITY_ID." - ) - - aws_region = self.get_aws_region() - session = boto3.Session(region_name=aws_region) - - credentials = self._get_aws_credentials(session) - - iam_request_url = f"https://sts.{aws_region}.amazonaws.com/" - iam_request_body = "Action=GetCallerIdentity&Version=2011-06-15" - - request_headers = self._prepare_aws_request( - iam_request_url, - iam_request_body, - credentials, - aws_region - ) - - requestBody = { - "identityId": identity_id, - "iamRequestBody": base64.b64encode(iam_request_body.encode()).decode(), - "iamRequestHeaders": base64.b64encode(json.dumps(request_headers).encode()).decode(), - "iamHttpRequestMethod": "POST" - } - - result = self.client.api.post( - path="/api/v1/auth/aws-auth/login", - json=requestBody, - model=MachineIdentityLoginResponse - ) - - self.client.set_token(result.data.accessToken) - - return result.data - - def _get_aws_credentials(self, session: boto3.Session) -> Any: - try: - credentials = session.get_credentials() - if credentials is None: - raise NoCredentialsError("AWS credentials not found.") - return credentials.get_frozen_credentials() - except NoCredentialsError as e: - raise RuntimeError(f"AWS IAM Auth Login failed: {str(e)}") - - def _prepare_aws_request( - self, - url: str, - body: str, - credentials: Any, - region: str) -> Dict[str, str]: - - current_time = datetime.datetime.now(datetime.timezone.utc) - amz_date = current_time.strftime('%Y%m%dT%H%M%SZ') - - request = AWSRequest(method="POST", url=url, data=body) - request.headers["X-Amz-Date"] = amz_date - request.headers["Host"] = f"sts.{region}.amazonaws.com" - request.headers["Content-Type"] = "application/x-www-form-urlencoded; charset=utf-8" - request.headers["Content-Length"] = str(len(body)) - - signer = SigV4Auth(credentials, "sts", region) - signer.add_auth(request) - - return {k: v for k, v in request.headers.items() if k.lower() != "content-length"} - - @staticmethod - def get_aws_region() -> str: - region = os.getenv("AWS_REGION") # Typically found in lambda runtime environment - if region: - return region - - try: - return AWSAuth._get_aws_ec2_identity_document_region() - except Exception as e: - raise Exception("Failed to retrieve AWS region") from e - - @staticmethod - def _get_aws_ec2_identity_document_region(timeout: int = 5000) -> str: - session = requests.Session() - token_response = session.put( - "http://169.254.169.254/latest/api/token", - headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"}, - timeout=timeout / 1000 - ) - token_response.raise_for_status() - metadata_token = token_response.text - - identity_response = session.get( - "http://169.254.169.254/latest/dynamic/instance-identity/document", - headers={"X-aws-ec2-metadata-token": metadata_token, "Accept": "application/json"}, - timeout=timeout / 1000 - ) - - identity_response.raise_for_status() - return identity_response.json().get("region") - - -class Auth: - def __init__(self, client): - self.client = client - self.aws_auth = AWSAuth(client) - self.universal_auth = UniversalAuth(client) - - -class V3RawSecrets: - def __init__(self, client: InfisicalSDKClient) -> None: - self.client = client - - def list_secrets( - self, - project_id: str, - environment_slug: str, - secret_path: str, - expand_secret_references: bool = True, - view_secret_value: bool = True, - recursive: bool = False, - include_imports: bool = True, - tag_filters: List[str] = []) -> ListSecretsResponse: - - params = { - "workspaceId": project_id, - "environment": environment_slug, - "secretPath": secret_path, - "viewSecretValue": str(view_secret_value).lower(), - "expandSecretReferences": str(expand_secret_references).lower(), - "recursive": str(recursive).lower(), - "include_imports": str(include_imports).lower(), - } - - if tag_filters: - params["tagSlugs"] = ",".join(tag_filters) - - result = self.client.api.get( - path="/api/v3/secrets/raw", - params=params, - model=ListSecretsResponse - ) - - return result.data - - def get_secret_by_name( - self, - secret_name: str, - project_id: str, - environment_slug: str, - secret_path: str, - expand_secret_references: bool = True, - include_imports: bool = True, - view_secret_value: bool = True, - version: str = None) -> BaseSecret: - - params = { - "workspaceId": project_id, - "viewSecretValue": str(view_secret_value).lower(), - "environment": environment_slug, - "secretPath": secret_path, - "expandSecretReferences": str(expand_secret_references).lower(), - "include_imports": str(include_imports).lower(), - "version": version - } - - result = self.client.api.get( - path=f"/api/v3/secrets/raw/{secret_name}", - params=params, - model=SingleSecretResponse - ) - - return result.data.secret - - def create_secret_by_name( - self, - secret_name: str, - project_id: str, - secret_path: str, - environment_slug: str, - secret_value: str = None, - secret_comment: str = None, - skip_multiline_encoding: bool = False, - secret_reminder_repeat_days: Union[float, int] = None, - secret_reminder_note: str = None) -> BaseSecret: - - requestBody = { - "workspaceId": project_id, - "environment": environment_slug, - "secretPath": secret_path, - "secretValue": secret_value, - "secretComment": secret_comment, - "tagIds": None, - "skipMultilineEncoding": skip_multiline_encoding, - "type": "shared", - "secretReminderRepeatDays": secret_reminder_repeat_days, - "secretReminderNote": secret_reminder_note - } - result = self.client.api.post( - path=f"/api/v3/secrets/raw/{secret_name}", - json=requestBody, - model=SingleSecretResponse - ) - - return result.data.secret - - def update_secret_by_name( - self, - current_secret_name: str, - project_id: str, - secret_path: str, - environment_slug: str, - secret_value: str = None, - secret_comment: str = None, - skip_multiline_encoding: bool = False, - secret_reminder_repeat_days: Union[float, int] = None, - secret_reminder_note: str = None, - new_secret_name: str = None) -> BaseSecret: - - requestBody = { - "workspaceId": project_id, - "environment": environment_slug, - "secretPath": secret_path, - "secretValue": secret_value, - "secretComment": secret_comment, - "newSecretName": new_secret_name, - "tagIds": None, - "skipMultilineEncoding": skip_multiline_encoding, - "type": "shared", - "secretReminderRepeatDays": secret_reminder_repeat_days, - "secretReminderNote": secret_reminder_note - } - - result = self.client.api.patch( - path=f"/api/v3/secrets/raw/{current_secret_name}", - json=requestBody, - model=SingleSecretResponse - ) - return result.data.secret - - def delete_secret_by_name( - self, - secret_name: str, - project_id: str, - secret_path: str, - environment_slug: str) -> BaseSecret: - - requestBody = { - "workspaceId": project_id, - "environment": environment_slug, - "secretPath": secret_path, - "type": "shared", - } - - result = self.client.api.delete( - path=f"/api/v3/secrets/raw/{secret_name}", - json=requestBody, - model=SingleSecretResponse - ) - - return result.data.secret - - -class KMS: - def __init__(self, client: InfisicalSDKClient) -> None: - self.client = client - - def list_keys( - self, - project_id: str, - offset: int = 0, - limit: int = 100, - order_by: KmsKeysOrderBy = KmsKeysOrderBy.NAME, - order_direction: OrderDirection = OrderDirection.ASC, - search: str = None) -> ListKmsKeysResponse: - - params = { - "projectId": project_id, - "search": search, - "offset": offset, - "limit": limit, - "orderBy": order_by, - "orderDirection": order_direction, - } - - result = self.client.api.get( - path="/api/v1/kms/keys", - params=params, - model=ListKmsKeysResponse - ) - - return result.data - - def get_key_by_id( - self, - key_id: str) -> KmsKey: - - result = self.client.api.get( - path=f"/api/v1/kms/keys/{key_id}", - model=SingleKmsKeyResponse - ) - - return result.data.key - - def get_key_by_name( - self, - key_name: str, - project_id: str) -> KmsKey: - - params = { - "projectId": project_id, - } - - result = self.client.api.get( - path=f"/api/v1/kms/keys/key-name/{key_name}", - params=params, - model=SingleKmsKeyResponse - ) - - return result.data.key - - def create_key( - self, - name: str, - project_id: str, - encryption_algorithm: SymmetricEncryption, - description: str = None) -> KmsKey: - - request_body = { - "name": name, - "projectId": project_id, - "encryptionAlgorithm": encryption_algorithm, - "description": description, - } - - result = self.client.api.post( - path="/api/v1/kms/keys", - json=request_body, - model=SingleKmsKeyResponse - ) - - return result.data.key - - def update_key( - self, - key_id: str, - name: str = None, - is_disabled: bool = None, - description: str = None) -> KmsKey: - - request_body = { - "name": name, - "isDisabled": is_disabled, - "description": description, - } - - result = self.client.api.patch( - path=f"/api/v1/kms/keys/{key_id}", - json=request_body, - model=SingleKmsKeyResponse - ) - - return result.data.key - - def delete_key( - self, - key_id: str) -> KmsKey: - - result = self.client.api.delete( - path=f"/api/v1/kms/keys/{key_id}", - json={}, - model=SingleKmsKeyResponse - ) - - return result.data.key - - def encrypt_data( - self, - key_id: str, - base64EncodedPlaintext: str) -> str: - """ - Encrypt data with the specified KMS key. - - :param key_id: The ID of the key to decrypt the ciphertext with - :type key_id: str - :param base64EncodedPlaintext: The base64 encoded plaintext to encrypt - :type plaintext: str - - - :return: The encrypted base64 encoded plaintext (ciphertext) - :rtype: str - """ - - request_body = { - "plaintext": base64EncodedPlaintext - } - - result = self.client.api.post( - path=f"/api/v1/kms/keys/{key_id}/encrypt", - json=request_body, - model=KmsKeyEncryptDataResponse - ) - - return result.data.ciphertext - - def decrypt_data( - self, - key_id: str, - ciphertext: str) -> str: - """ - Decrypt data with the specified KMS key. - - :param key_id: The ID of the key to decrypt the ciphertext with - :type key_id: str - :param ciphertext: The encrypted base64 plaintext to decrypt - :type ciphertext: str - - - :return: The base64 encoded plaintext - :rtype: str - """ - - request_body = { - "ciphertext": ciphertext - } - - result = self.client.api.post( - path=f"/api/v1/kms/keys/{key_id}/decrypt", - json=request_body, - model=KmsKeyDecryptDataResponse - ) - - return result.data.plaintext diff --git a/infisical_sdk/resources/__init__.py b/infisical_sdk/resources/__init__.py new file mode 100644 index 0000000..ee1bcb2 --- /dev/null +++ b/infisical_sdk/resources/__init__.py @@ -0,0 +1,3 @@ +from .secrets import V3RawSecrets +from .kms import KMS +from .auth import Auth \ No newline at end of file diff --git a/infisical_sdk/resources/auth.py b/infisical_sdk/resources/auth.py new file mode 100644 index 0000000..1117bc5 --- /dev/null +++ b/infisical_sdk/resources/auth.py @@ -0,0 +1,10 @@ +from infisical_sdk.infisical_requests import InfisicalRequests +from infisical_sdk.resources.auth_methods import AWSAuth +from infisical_sdk.resources.auth_methods import UniversalAuth + +from typing import Callable +class Auth: + def __init__(self, requests: InfisicalRequests, setToken: Callable[[str], None]): + self.requests = requests + self.aws_auth = AWSAuth(requests, setToken) + self.universal_auth = UniversalAuth(requests, setToken) \ No newline at end of file diff --git a/infisical_sdk/resources/auth_methods/__init__.py b/infisical_sdk/resources/auth_methods/__init__.py new file mode 100644 index 0000000..151ae79 --- /dev/null +++ b/infisical_sdk/resources/auth_methods/__init__.py @@ -0,0 +1,2 @@ +from .aws_auth import AWSAuth +from .universal_auth import UniversalAuth diff --git a/infisical_sdk/resources/auth_methods/aws_auth.py b/infisical_sdk/resources/auth_methods/aws_auth.py new file mode 100644 index 0000000..77e0d66 --- /dev/null +++ b/infisical_sdk/resources/auth_methods/aws_auth.py @@ -0,0 +1,134 @@ +from botocore.auth import SigV4Auth +from botocore.awsrequest import AWSRequest +from botocore.exceptions import NoCredentialsError + +from infisical_sdk.infisical_requests import InfisicalRequests +from infisical_sdk.api_types import MachineIdentityLoginResponse + +from typing import Callable + +import requests +import boto3 +import base64 +import json +import os +import datetime + +from typing import Dict, Any + + +class AWSAuth: + def __init__(self, requests: InfisicalRequests, setToken: Callable[[str], None]) -> None: + self.requests = requests + self.setToken = setToken + + def login(self, identity_id: str) -> MachineIdentityLoginResponse: + """ + Login with AWS Authentication. + + Args: + identity_id (str): Your Machine Identity ID that has AWS Auth configured. + + Returns: + Dict: A dictionary containing the access token and related information. + """ + + identity_id = identity_id or os.getenv("INFISICAL_AWS_IAM_AUTH_IDENTITY_ID") + if not identity_id: + raise ValueError( + "Identity ID must be provided or set in the environment variable" + + "INFISICAL_AWS_IAM_AUTH_IDENTITY_ID." + ) + + aws_region = self.get_aws_region() + session = boto3.Session(region_name=aws_region) + + credentials = self._get_aws_credentials(session) + + iam_request_url = f"https://sts.{aws_region}.amazonaws.com/" + iam_request_body = "Action=GetCallerIdentity&Version=2011-06-15" + + request_headers = self._prepare_aws_request( + iam_request_url, + iam_request_body, + credentials, + aws_region + ) + + requestBody = { + "identityId": identity_id, + "iamRequestBody": base64.b64encode(iam_request_body.encode()).decode(), + "iamRequestHeaders": base64.b64encode(json.dumps(request_headers).encode()).decode(), + "iamHttpRequestMethod": "POST" + } + + result = self.requests.post( + path="/api/v1/auth/aws-auth/login", + json=requestBody, + model=MachineIdentityLoginResponse + ) + + self.setToken(result.data.accessToken) + + return result.data + + def _get_aws_credentials(self, session: boto3.Session) -> Any: + try: + credentials = session.get_credentials() + if credentials is None: + raise NoCredentialsError("AWS credentials not found.") + return credentials.get_frozen_credentials() + except NoCredentialsError as e: + raise RuntimeError(f"AWS IAM Auth Login failed: {str(e)}") + + def _prepare_aws_request( + self, + url: str, + body: str, + credentials: Any, + region: str) -> Dict[str, str]: + + current_time = datetime.datetime.now(datetime.timezone.utc) + amz_date = current_time.strftime('%Y%m%dT%H%M%SZ') + + request = AWSRequest(method="POST", url=url, data=body) + request.headers["X-Amz-Date"] = amz_date + request.headers["Host"] = f"sts.{region}.amazonaws.com" + request.headers["Content-Type"] = "application/x-www-form-urlencoded; charset=utf-8" + request.headers["Content-Length"] = str(len(body)) + + signer = SigV4Auth(credentials, "sts", region) + signer.add_auth(request) + + return {k: v for k, v in request.headers.items() if k.lower() != "content-length"} + + @staticmethod + def get_aws_region() -> str: + region = os.getenv("AWS_REGION") # Typically found in lambda runtime environment + if region: + return region + + try: + return AWSAuth._get_aws_ec2_identity_document_region() + except Exception as e: + raise Exception("Failed to retrieve AWS region") from e + + @staticmethod + def _get_aws_ec2_identity_document_region(timeout: int = 5000) -> str: + session = requests.Session() + token_response = session.put( + "http://169.254.169.254/latest/api/token", + headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"}, + timeout=timeout / 1000 + ) + token_response.raise_for_status() + metadata_token = token_response.text + + identity_response = session.get( + "http://169.254.169.254/latest/dynamic/instance-identity/document", + headers={"X-aws-ec2-metadata-token": metadata_token, "Accept": "application/json"}, + timeout=timeout / 1000 + ) + + identity_response.raise_for_status() + return identity_response.json().get("region") \ No newline at end of file diff --git a/infisical_sdk/resources/auth_methods/universal_auth.py b/infisical_sdk/resources/auth_methods/universal_auth.py new file mode 100644 index 0000000..1aae7a8 --- /dev/null +++ b/infisical_sdk/resources/auth_methods/universal_auth.py @@ -0,0 +1,35 @@ +from infisical_sdk.api_types import MachineIdentityLoginResponse + +from typing import Callable +from infisical_sdk.infisical_requests import InfisicalRequests +class UniversalAuth: + def __init__(self, requests: InfisicalRequests, setToken: Callable[[str], None]): + self.requests = requests + self.setToken = setToken + + def login(self, client_id: str, client_secret: str) -> MachineIdentityLoginResponse: + """ + Login with Universal Auth. + + Args: + client_id (str): Your Machine Identity Client ID. + client_secret (str): Your Machine Identity Client Secret. + + Returns: + Dict: A dictionary containing the access token and related information. + """ + + requestBody = { + "clientId": client_id, + "clientSecret": client_secret + } + + result = self.requests.post( + path="/api/v1/auth/universal-auth/login", + json=requestBody, + model=MachineIdentityLoginResponse + ) + + self.setToken(result.data.accessToken) + + return result.data \ No newline at end of file diff --git a/infisical_sdk/resources/kms.py b/infisical_sdk/resources/kms.py new file mode 100644 index 0000000..901df3a --- /dev/null +++ b/infisical_sdk/resources/kms.py @@ -0,0 +1,177 @@ +from infisical_sdk.api_types import SymmetricEncryption, KmsKeysOrderBy, OrderDirection +from infisical_sdk.api_types import ListKmsKeysResponse, SingleKmsKeyResponse +from infisical_sdk.api_types import KmsKey, KmsKeyEncryptDataResponse, KmsKeyDecryptDataResponse + +from infisical_sdk.infisical_requests import InfisicalRequests + + +class KMS: + def __init__(self, requests: InfisicalRequests) -> None: + self.requests = requests + + def list_keys( + self, + project_id: str, + offset: int = 0, + limit: int = 100, + order_by: KmsKeysOrderBy = KmsKeysOrderBy.NAME, + order_direction: OrderDirection = OrderDirection.ASC, + search: str = None) -> ListKmsKeysResponse: + + params = { + "projectId": project_id, + "search": search, + "offset": offset, + "limit": limit, + "orderBy": order_by, + "orderDirection": order_direction, + } + + result = self.requests.get( + path="/api/v1/kms/keys", + params=params, + model=ListKmsKeysResponse + ) + + return result.data + + def get_key_by_id( + self, + key_id: str) -> KmsKey: + + result = self.requests.get( + path=f"/api/v1/kms/keys/{key_id}", + model=SingleKmsKeyResponse + ) + + return result.data.key + + def get_key_by_name( + self, + key_name: str, + project_id: str) -> KmsKey: + + params = { + "projectId": project_id, + } + + result = self.requests.get( + path=f"/api/v1/kms/keys/key-name/{key_name}", + params=params, + model=SingleKmsKeyResponse + ) + + return result.data.key + + def create_key( + self, + name: str, + project_id: str, + encryption_algorithm: SymmetricEncryption, + description: str = None) -> KmsKey: + + request_body = { + "name": name, + "projectId": project_id, + "encryptionAlgorithm": encryption_algorithm, + "description": description, + } + + result = self.requests.post( + path="/api/v1/kms/keys", + json=request_body, + model=SingleKmsKeyResponse + ) + + return result.data.key + + def update_key( + self, + key_id: str, + name: str = None, + is_disabled: bool = None, + description: str = None) -> KmsKey: + + request_body = { + "name": name, + "isDisabled": is_disabled, + "description": description, + } + + result = self.requests.patch( + path=f"/api/v1/kms/keys/{key_id}", + json=request_body, + model=SingleKmsKeyResponse + ) + + return result.data.key + + def delete_key( + self, + key_id: str) -> KmsKey: + + result = self.requests.delete( + path=f"/api/v1/kms/keys/{key_id}", + json={}, + model=SingleKmsKeyResponse + ) + + return result.data.key + + def encrypt_data( + self, + key_id: str, + base64EncodedPlaintext: str) -> str: + """ + Encrypt data with the specified KMS key. + + :param key_id: The ID of the key to decrypt the ciphertext with + :type key_id: str + :param base64EncodedPlaintext: The base64 encoded plaintext to encrypt + :type plaintext: str + + + :return: The encrypted base64 encoded plaintext (ciphertext) + :rtype: str + """ + + request_body = { + "plaintext": base64EncodedPlaintext + } + + result = self.requests.post( + path=f"/api/v1/kms/keys/{key_id}/encrypt", + json=request_body, + model=KmsKeyEncryptDataResponse + ) + + return result.data.ciphertext + + def decrypt_data( + self, + key_id: str, + ciphertext: str) -> str: + """ + Decrypt data with the specified KMS key. + + :param key_id: The ID of the key to decrypt the ciphertext with + :type key_id: str + :param ciphertext: The encrypted base64 plaintext to decrypt + :type ciphertext: str + + + :return: The base64 encoded plaintext + :rtype: str + """ + + request_body = { + "ciphertext": ciphertext + } + + result = self.requests.post( + path=f"/api/v1/kms/keys/{key_id}/decrypt", + json=request_body, + model=KmsKeyDecryptDataResponse + ) + + return result.data.plaintext diff --git a/infisical_sdk/resources/secrets.py b/infisical_sdk/resources/secrets.py new file mode 100644 index 0000000..d5783bc --- /dev/null +++ b/infisical_sdk/resources/secrets.py @@ -0,0 +1,235 @@ +from typing import List, Union + +from infisical_sdk.infisical_requests import InfisicalRequests +from infisical_sdk.api_types import ListSecretsResponse, SingleSecretResponse, BaseSecret +from infisical_sdk.util import SecretsCache + +CACHE_KEY_LIST_SECRETS = "cache-list-secrets" +CACHE_KEY_SINGLE_SECRET = "cache-single-secret" + +class V3RawSecrets: + def __init__(self, requests: InfisicalRequests, cache: SecretsCache) -> None: + self.requests = requests + self.cache = cache + + def list_secrets( + self, + project_id: str, + environment_slug: str, + secret_path: str, + expand_secret_references: bool = True, + view_secret_value: bool = True, + recursive: bool = False, + include_imports: bool = True, + tag_filters: List[str] = []) -> ListSecretsResponse: + + params = { + "workspaceId": project_id, + "environment": environment_slug, + "secretPath": secret_path, + "viewSecretValue": str(view_secret_value).lower(), + "expandSecretReferences": str(expand_secret_references).lower(), + "recursive": str(recursive).lower(), + "include_imports": str(include_imports).lower(), + } + + if tag_filters: + params["tagSlugs"] = ",".join(tag_filters) + + + cache_key = self.cache.compute_cache_key(CACHE_KEY_LIST_SECRETS, **params) + if self.cache.enabled: + cached_response = self.cache.get(cache_key) + + if cached_response is not None and isinstance(cached_response, ListSecretsResponse): + return cached_response + + result = self.requests.get( + path="/api/v3/secrets/raw", + params=params, + model=ListSecretsResponse + ) + + if self.cache.enabled: + self.cache.set(cache_key, result.data) + + return result.data + + def get_secret_by_name( + self, + secret_name: str, + project_id: str, + environment_slug: str, + secret_path: str, + expand_secret_references: bool = True, + include_imports: bool = True, + view_secret_value: bool = True, + version: str = None) -> BaseSecret: + + params = { + "workspaceId": project_id, + "viewSecretValue": str(view_secret_value).lower(), + "environment": environment_slug, + "secretPath": secret_path, + "expandSecretReferences": str(expand_secret_references).lower(), + "include_imports": str(include_imports).lower(), + "version": version + } + + cache_params = { + "project_id": project_id, + "environment_slug": environment_slug, + "secret_path": secret_path, + "secret_name": secret_name, + } + + cache_key = self.cache.compute_cache_key(CACHE_KEY_SINGLE_SECRET, **cache_params) + + if self.cache.enabled: + cached_response = self.cache.get(cache_key) + + if cached_response is not None and isinstance(cached_response, BaseSecret): + return cached_response + + result = self.requests.get( + path=f"/api/v3/secrets/raw/{secret_name}", + params=params, + model=SingleSecretResponse + ) + + if self.cache.enabled: + self.cache.set(cache_key, result.data.secret) + + return result.data.secret + + def create_secret_by_name( + self, + secret_name: str, + project_id: str, + secret_path: str, + environment_slug: str, + secret_value: str = None, + secret_comment: str = None, + skip_multiline_encoding: bool = False, + secret_reminder_repeat_days: Union[float, int] = None, + secret_reminder_note: str = None) -> BaseSecret: + + requestBody = { + "workspaceId": project_id, + "environment": environment_slug, + "secretPath": secret_path, + "secretValue": secret_value, + "secretComment": secret_comment, + "tagIds": None, + "skipMultilineEncoding": skip_multiline_encoding, + "type": "shared", + "secretReminderRepeatDays": secret_reminder_repeat_days, + "secretReminderNote": secret_reminder_note + } + result = self.requests.post( + path=f"/api/v3/secrets/raw/{secret_name}", + json=requestBody, + model=SingleSecretResponse + ) + + + if self.cache.enabled: + cache_params = { + "project_id": project_id, + "environment_slug": environment_slug, + "secret_path": secret_path, + "secret_name": secret_name, + } + + cache_key = self.cache.compute_cache_key(CACHE_KEY_SINGLE_SECRET, **cache_params) + self.cache.set(cache_key, result.data.secret) + + # Invalidates all list secret cache + self.cache.invalidate_operation(CACHE_KEY_LIST_SECRETS) + + return result.data.secret + + def update_secret_by_name( + self, + current_secret_name: str, + project_id: str, + secret_path: str, + environment_slug: str, + secret_value: str = None, + secret_comment: str = None, + skip_multiline_encoding: bool = False, + secret_reminder_repeat_days: Union[float, int] = None, + secret_reminder_note: str = None, + new_secret_name: str = None) -> BaseSecret: + + requestBody = { + "workspaceId": project_id, + "environment": environment_slug, + "secretPath": secret_path, + "secretValue": secret_value, + "secretComment": secret_comment, + "newSecretName": new_secret_name, + "tagIds": None, + "skipMultilineEncoding": skip_multiline_encoding, + "type": "shared", + "secretReminderRepeatDays": secret_reminder_repeat_days, + "secretReminderNote": secret_reminder_note + } + + result = self.requests.patch( + path=f"/api/v3/secrets/raw/{current_secret_name}", + json=requestBody, + model=SingleSecretResponse + ) + + if self.cache.enabled: + cache_params = { + "project_id": project_id, + "environment_slug": environment_slug, + "secret_path": secret_path, + "secret_name": current_secret_name, + } + + cache_key = self.cache.compute_cache_key(CACHE_KEY_SINGLE_SECRET, **cache_params) + self.cache.unset(cache_key) + + # Invalidates all list secret cache + self.cache.invalidate_operation(CACHE_KEY_LIST_SECRETS) + + return result.data.secret + + def delete_secret_by_name( + self, + secret_name: str, + project_id: str, + secret_path: str, + environment_slug: str) -> BaseSecret: + + requestBody = { + "workspaceId": project_id, + "environment": environment_slug, + "secretPath": secret_path, + "type": "shared", + } + + result = self.requests.delete( + path=f"/api/v3/secrets/raw/{secret_name}", + json=requestBody, + model=SingleSecretResponse + ) + + if self.cache.enabled: + cache_params = { + "project_id": project_id, + "environment_slug": environment_slug, + "secret_path": secret_path, + "secret_name": secret_name, + } + + cache_key = self.cache.compute_cache_key(CACHE_KEY_SINGLE_SECRET, **cache_params) + self.cache.unset(cache_key) + + # Invalidates all list secret cache + self.cache.invalidate_operation(CACHE_KEY_LIST_SECRETS) + + return result.data.secret \ No newline at end of file diff --git a/infisical_sdk/util/__init__.py b/infisical_sdk/util/__init__.py new file mode 100644 index 0000000..96114a6 --- /dev/null +++ b/infisical_sdk/util/__init__.py @@ -0,0 +1 @@ +from .secrets_cache import SecretsCache \ No newline at end of file diff --git a/infisical_sdk/util/secrets_cache.py b/infisical_sdk/util/secrets_cache.py new file mode 100644 index 0000000..87218c7 --- /dev/null +++ b/infisical_sdk/util/secrets_cache.py @@ -0,0 +1,106 @@ +from typing import Dict, Tuple, Any + +from infisical_sdk.api_types import BaseSecret +import json +import time +import threading +from hashlib import sha256 +import pickle + +MAX_CACHE_SIZE = 1000 + +class SecretsCache: + def __init__(self, ttl_seconds: int = 60) -> None: + if ttl_seconds is None or ttl_seconds <= 0: + self.enabled = False + return + + self.enabled = True + self.ttl = ttl_seconds + self.cleanup_interval = 60 + + self.cache: Dict[str, Tuple[bytes, float]] = {} + + self.lock = threading.RLock() + + self.stop_cleanup_thread = False + self.cleanup_thread = threading.Thread(target=self._cleanup_worker, daemon=True) + self.cleanup_thread.start() + + def compute_cache_key(self, operation_name: str, **kwargs) -> str: + sorted_kwargs = sorted(kwargs.items()) + json_str = json.dumps(sorted_kwargs) + + return f"{operation_name}-{sha256(json_str.encode()).hexdigest()}" + + def get(self, cache_key: str) -> Any: + if not self.enabled: + return None + + with self.lock: + if cache_key in self.cache: + serialized_value, timestamp = self.cache[cache_key] + if time.time() - timestamp <= self.ttl: + return pickle.loads(serialized_value) + else: + self.cache.pop(cache_key, None) + return None + else: + return None + + + def set(self, cache_key: str, value: Any) -> None: + if not self.enabled: + return + + with self.lock: + serialized_value = pickle.dumps(value) + self.cache[cache_key] = (serialized_value, time.time()) + + if len(self.cache) > MAX_CACHE_SIZE: + oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k][1]) # oldest key based on timestamp + self.cache.pop(oldest_key) + + + + def unset(self, cache_key: str) -> None: + if not self.enabled: + return + + with self.lock: + self.cache.pop(cache_key, None) + + def invalidate_operation(self, operation_name: str) -> None: + if not self.enabled: + return + + with self.lock: + for key in list(self.cache.keys()): + if key.startswith(operation_name): + self.cache.pop(key, None) + + + def _cleanup_expired_items(self) -> None: + """Remove all expired items from the cache.""" + current_time = time.time() + with self.lock: + expired_keys = [ + key for key, (_, timestamp) in self.cache.items() + if current_time - timestamp > self.ttl + ] + for key in expired_keys: + self.cache.pop(key, None) + + def _cleanup_worker(self) -> None: + """Background worker that periodically cleans up expired items.""" + while not self.stop_cleanup_thread: + time.sleep(self.cleanup_interval) + self._cleanup_expired_items() + + def __del__(self) -> None: + """Ensure thread is properly stopped when the object is garbage collected.""" + self.stop_cleanup_thread = True + if self.enabled and self.cleanup_thread.is_alive(): + self.cleanup_thread.join(timeout=1.0) + + \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 3d094da..7af72a5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,3 @@ [flake8] max-line-length=99 -exclude = .venv,test.py \ No newline at end of file +exclude = .venv,test.py,sink \ No newline at end of file diff --git a/sink/.env.example b/sink/.env.example new file mode 100644 index 0000000..1c58a6d --- /dev/null +++ b/sink/.env.example @@ -0,0 +1,6 @@ +SECRETS_PROJECT_ID= +SECRETS_ENVIRONMENT_SLUG= +MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID= +MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET= +SITE_URL= +KMS_PROJECT_ID= \ No newline at end of file diff --git a/sink/README.md b/sink/README.md new file mode 100644 index 0000000..1bed823 --- /dev/null +++ b/sink/README.md @@ -0,0 +1,3 @@ +**Developer note:** + +To run the sink files, you must first run `pip install -e .` from inside the project root, to install the SDK in development mode. \ No newline at end of file diff --git a/sink/cache_deletion_test.py b/sink/cache_deletion_test.py new file mode 100644 index 0000000..f51dc4d --- /dev/null +++ b/sink/cache_deletion_test.py @@ -0,0 +1,78 @@ +from infisical_sdk import InfisicalSDKClient +import time +import os +import random +import string + + +def loadEnvVarsFromFileIntoEnv(): + d = dict() + with open("./.env", "r") as fp: + for line in fp: + line = line.strip() + if line and not line.startswith("#"): + line = line.split("=", 1) + d[line[0]] = line[1] + + for key, value in d.items(): + os.environ[key] = value + +loadEnvVarsFromFileIntoEnv() + +SECRETS_PROJECT_ID = os.getenv("SECRETS_PROJECT_ID") +SECRETS_ENVIRONMENT_SLUG = os.getenv("SECRETS_ENVIRONMENT_SLUG") + +MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID = os.getenv("MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID") +MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET = os.getenv("MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET") +SITE_URL = os.getenv("SITE_URL") + +cache_enabled_client = InfisicalSDKClient(host=SITE_URL, cache_ttl=10) +cache_enabled_client.auth.universal_auth.login(MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID, MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET) + + +time_start_cache_disabled = time.time() + +def randomStringNoSpecialChars(length: int = 10) -> str: + return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) + +created_sec = cache_enabled_client.secrets.create_secret_by_name( + secret_name=f"TEST_{randomStringNoSpecialChars()}", + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + secret_value=f"secret_value_{randomStringNoSpecialChars()}", +) + + +single_secret_cached = cache_enabled_client.secrets.get_secret_by_name( + secret_name=created_sec.secretKey, + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True) + +print(single_secret_cached) + + +deleted_secret = cache_enabled_client.secrets.delete_secret_by_name( + secret_name=created_sec.secretKey, + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", +) + +print(deleted_secret) + +# Should error +try: + single_secret_cached = cache_enabled_client.secrets.get_secret_by_name( + secret_name=created_sec.secretKey, + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True) +except Exception as e: + print(e) + print("Good, we errored as expected!") diff --git a/sink/cache_expire_test.py b/sink/cache_expire_test.py new file mode 100644 index 0000000..4ad6fb4 --- /dev/null +++ b/sink/cache_expire_test.py @@ -0,0 +1,72 @@ +from infisical_sdk import InfisicalSDKClient + +import time +import os + +def loadEnvVarsFromFileIntoEnv(): + d = dict() + with open("./.env", "r") as fp: + for line in fp: + line = line.strip() + if line and not line.startswith("#"): + line = line.split("=", 1) + d[line[0]] = line[1] + + for key, value in d.items(): + os.environ[key] = value + +loadEnvVarsFromFileIntoEnv() + +SECRETS_PROJECT_ID = os.getenv("SECRETS_PROJECT_ID") +SECRETS_ENVIRONMENT_SLUG = os.getenv("SECRETS_ENVIRONMENT_SLUG") + +MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID = os.getenv("MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID") +MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET = os.getenv("MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET") +SITE_URL = os.getenv("SITE_URL") + +cache_enabled_client = InfisicalSDKClient(host=SITE_URL, cache_ttl=10) +cache_enabled_client.auth.universal_auth.login(MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID, MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET) + + + +single_secret_cached = cache_enabled_client.secrets.get_secret_by_name( + secret_name="TEST", + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True) + + +time_start_cache_enabled = time.time() +# Running in loop 10 times or the time is so small that python messes up the print (which is a great sign for us!) +for i in range(10): + single_secret_cached = cache_enabled_client.secrets.get_secret_by_name( + secret_name="TEST", + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True) +time_end_cache_enabled = time.time() +print(f"[CACHE ENABLED] Time taken: {time_end_cache_enabled - time_start_cache_enabled} seconds") + + +print("Sleeping for 10 seconds") +time.sleep(10) + + +print("Getting secret again") +time_start_cache_enabled = time.time() +single_secret_cached = cache_enabled_client.secrets.get_secret_by_name( + secret_name="TEST", + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True) +time_end_cache_enabled = time.time() +print(f"[CACHE EXPIRED] Time taken: {time_end_cache_enabled - time_start_cache_enabled} seconds") + + + diff --git a/sink/cache_test.py b/sink/cache_test.py new file mode 100644 index 0000000..46e9e7a --- /dev/null +++ b/sink/cache_test.py @@ -0,0 +1,91 @@ +from infisical_sdk import InfisicalSDKClient +import time +import os + + +def loadEnvVarsFromFileIntoEnv(): + d = dict() + with open("./.env", "r") as fp: + for line in fp: + line = line.strip() + if line and not line.startswith("#"): + line = line.split("=", 1) + d[line[0]] = line[1] + + for key, value in d.items(): + os.environ[key] = value + +loadEnvVarsFromFileIntoEnv() + +SECRETS_PROJECT_ID = os.getenv("SECRETS_PROJECT_ID") +SECRETS_ENVIRONMENT_SLUG = os.getenv("SECRETS_ENVIRONMENT_SLUG") + +MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID = os.getenv("MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID") +MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET = os.getenv("MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET") +SITE_URL = os.getenv("SITE_URL") + +cache_disabled_client = InfisicalSDKClient(host=SITE_URL, cache_ttl=None) +cache_disabled_client.auth.universal_auth.login(MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID, MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET) + +cache_enabled_client = InfisicalSDKClient(host=SITE_URL, cache_ttl=10) +cache_enabled_client.auth.universal_auth.login(MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID, MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET) + + +time_start_cache_disabled = time.time() + + +for i in range(100): + all_secrets = cache_disabled_client.secrets.list_secrets( + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True + ) + +time_end_cache_disabled = time.time() + +print(f"[CACHE DISABLED] Time taken: {time_end_cache_disabled - time_start_cache_disabled} seconds") + + +time_start_cache_enabled = time.time() + +for i in range(100): + all_secrets = cache_enabled_client.secrets.list_secrets( + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True + ) + +time_end_cache_enabled = time.time() + +print(f"[CACHE ENABLED] Time taken: {time_end_cache_enabled - time_start_cache_enabled} seconds") + + + +single_secret_cached = cache_enabled_client.secrets.get_secret_by_name( + secret_name="TEST", + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True) + +single_secret_cached = cache_enabled_client.secrets.get_secret_by_name( + secret_name="TEST", + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=False, + include_imports=False) + + +single_secret_cached = cache_enabled_client.secrets.get_secret_by_name( + secret_name="TEST", + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True) \ No newline at end of file diff --git a/sink/example.py b/sink/example.py new file mode 100644 index 0000000..ae252e4 --- /dev/null +++ b/sink/example.py @@ -0,0 +1,167 @@ +from infisical_sdk import InfisicalSDKClient, SymmetricEncryption + +import random +import base64 +import os +import string + +def loadEnvVarsFromFileIntoEnv(): + d = dict() + with open("./.env", "r") as fp: + for line in fp: + line = line.strip() + if line and not line.startswith("#"): + line = line.split("=", 1) + d[line[0]] = line[1] + + for key, value in d.items(): + os.environ[key] = value + +loadEnvVarsFromFileIntoEnv() + +sdkInstance = InfisicalSDKClient(host=os.getenv("SITE_URL")) + + +SECRETS_PROJECT_ID = os.getenv("SECRETS_PROJECT_ID") +KMS_PROJECT_ID = os.getenv("KMS_PROJECT_ID") +SECRETS_ENVIRONMENT_SLUG = os.getenv("SECRETS_ENVIRONMENT_SLUG") + +MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID = os.getenv("MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID") +MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET = os.getenv("MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET") +SITE_URL = os.getenv("SITE_URL") + + +sdkInstance.auth.universal_auth.login(MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_ID, MACHINE_IDENTITY_UNIVERSAL_AUTH_CLIENT_SECRET) + + +def random_string(length: int = 10) -> str: + # Use only lowercase letters, numbers, and hyphens + allowed_chars = string.ascii_lowercase + string.digits + '-' + return ''.join(random.choices(allowed_chars, k=length)) + + + + +################################################# SECRET TESTS ################################################# + +new_secret = sdkInstance.secrets.create_secret_by_name( + secret_name=f"TEST_{random_string()}", + project_id=SECRETS_PROJECT_ID, + secret_path="/", + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_value=f"secret_value_{random_string()}", + secret_comment=f"Optional comment_{random_string()}", + skip_multiline_encoding=False, + secret_reminder_repeat_days=30, # Optional + secret_reminder_note=f"Remember to update this secret_{random_string()}" # Optional +) + +print(f"Created secret: [key={new_secret.secretKey}] | [value={new_secret.secretValue}]") + +updated_secret = sdkInstance.secrets.update_secret_by_name( + current_secret_name=new_secret.secretKey, + project_id=SECRETS_PROJECT_ID, + secret_path="/", + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_value=f"new_secret_value_{random_string()}", + secret_comment=f"Updated comment_{random_string()}", # Optional + skip_multiline_encoding=False, + secret_reminder_repeat_days=10, # Optional + secret_reminder_note=f"Updated reminder note_{random_string()}", # Optional + new_secret_name=f"NEW_NAME_{random_string()}" # Optional +) + +print(f"Updated secret: [key={updated_secret.secretKey}] | [value={updated_secret.secretValue}]") +secret = sdkInstance.secrets.get_secret_by_name( + secret_name=updated_secret.secretKey, + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True, + version=None # Optional +) + +print(f"Retrieved secret: [key={secret.secretKey}] | [value={secret.secretValue}]") + + +all_secrets = sdkInstance.secrets.list_secrets( + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/", + expand_secret_references=True, + include_imports=True +) + + +all_secrets.secrets = [secret for secret in all_secrets.secrets if secret.secretKey != "TEST"] +if len(all_secrets.secrets) != 1: + raise Exception("Expected 1 secret, got {}".format(len(all_secrets.secrets))) + + +# Print all secret keys +for idx, secret in enumerate(all_secrets.secrets): + print(f"Listed secrets key {idx}: [{secret.secretKey}] | [value={secret.secretValue}]") + +deleted_secret = sdkInstance.secrets.delete_secret_by_name( + secret_name=updated_secret.secretKey, + project_id=SECRETS_PROJECT_ID, + environment_slug=SECRETS_ENVIRONMENT_SLUG, + secret_path="/" +) + +print(f"Deleted secret: [key={deleted_secret.secretKey}] | [value={deleted_secret.secretValue}]") + +################################################# KMS TESTS ################################################# + +kms_key = sdkInstance.kms.create_key( + name=f"test-key-{random_string()}", + project_id=KMS_PROJECT_ID, + encryption_algorithm=SymmetricEncryption.AES_GCM_256, + description=f"Optional description_{random_string()}" +) + +print(f"Created KMS key: [key={kms_key.id}] | [name={kms_key.name}]") + + +plantext = "Hello, world!" + +encrypted = sdkInstance.kms.encrypt_data( + key_id=kms_key.id, + base64EncodedPlaintext=base64.b64encode(plantext.encode()).decode() +) + +print(f"Encrypted: {encrypted}") + +decrypted = sdkInstance.kms.decrypt_data( + key_id=kms_key.id, + ciphertext=encrypted +) + +print(f"Decrypted: {base64.b64decode(decrypted.encode()).decode()}") + +key_by_id = sdkInstance.kms.get_key_by_id( + key_id=kms_key.id +) + +print(f"Key by ID: {key_by_id}") + +key_by_name = sdkInstance.kms.get_key_by_name( + key_name=kms_key.name, + project_id=KMS_PROJECT_ID +) + +print(f"Key by Name: {key_by_name}") + +list_keys = sdkInstance.kms.list_keys( + project_id=KMS_PROJECT_ID +) + +print(f"List keys: {list_keys}") + + +deleted_key = sdkInstance.kms.delete_key( + key_id=kms_key.id +) + +print(f"Deleted key: {deleted_key}") \ No newline at end of file