From ca719d4fc68867eaa075c78d5e3908eae5862d7b Mon Sep 17 00:00:00 2001 From: Douglas Coburn Date: Thu, 1 Jan 2026 12:59:31 -0800 Subject: [PATCH] feat: Add comprehensive SDK enhancements and new endpoint modules Major Features: - Add environment variable support for API token initialization (SOCKET_SECURITY_API_TOKEN, SOCKET_SECURITY_API_KEY, SOCKET_API_KEY, SOCKET_API_TOKEN) - Add 6 new endpoint modules: AlertFullScanSearch, Alerts, Fixes, SupportedFiles, Webhooks, Telemetry - Add archive upload functionality with support for .tar, .tar.gz, .tgz, and .zip files - Implement lazy file loading with FileDescriptorManager to prevent file descriptor exhaustion - Add full scan rescan and tar file download capabilities Enhancements: - Export: Add openvex_bom method for OpenVEX SBOM export - Historical: Add dependencies_trend method for historical dependency analysis - FullScans: Improve type safety with Optional types, fix integration_type handling, add archive/rescan/get_tar_files methods - Repos: Fix from_dict null handling for better robustness - Utils: Add create_tar_gz_from_files and prepare_archive_files_for_upload helper methods Bug Fixes: - Fix integration_type parameter handling to properly preserve None values - Fix dataclass from_dict methods to handle None data safely - Improve SocketArtifact deserialization with explicit field extraction - Add proper null checks in response parsing Developer Experience: - Make token parameter optional in socketdev constructor with automatic environment variable fallback - Add comprehensive error messages when token is missing - Improve file handling with lazy loading to support large workspaces --- socketdev/__init__.py | 31 ++++- socketdev/alertfullscansearch/__init__.py | 33 +++++ socketdev/alerts/__init__.py | 33 +++++ socketdev/export/__init__.py | 25 ++++ socketdev/fixes/__init__.py | 33 +++++ socketdev/fullscans/__init__.py | 162 ++++++++++++++++++++-- socketdev/historical/__init__.py | 19 +++ 
socketdev/repos/__init__.py | 3 +- socketdev/supportedfiles/__init__.py | 29 ++++ socketdev/telemetry/__init__.py | 53 +++++++ socketdev/utils/__init__.py | 102 +++++++++++++- socketdev/version.py | 2 +- socketdev/webhooks/__init__.py | 125 +++++++++++++++++ 13 files changed, 637 insertions(+), 13 deletions(-) create mode 100644 socketdev/alertfullscansearch/__init__.py create mode 100644 socketdev/alerts/__init__.py create mode 100644 socketdev/fixes/__init__.py create mode 100644 socketdev/supportedfiles/__init__.py create mode 100644 socketdev/telemetry/__init__.py create mode 100644 socketdev/webhooks/__init__.py diff --git a/socketdev/__init__.py b/socketdev/__init__.py index 0de575b..6326cb6 100644 --- a/socketdev/__init__.py +++ b/socketdev/__init__.py @@ -1,3 +1,4 @@ +import os from socketdev.core.api import API from socketdev.dependencies import Dependencies from socketdev.diffscans import DiffScans @@ -26,7 +27,14 @@ from socketdev.alerttypes import AlertTypes from socketdev.basics import Basics from socketdev.uploadmanifests import UploadManifests +from socketdev.alertfullscansearch import AlertFullScanSearch +from socketdev.alerts import Alerts +from socketdev.fixes import Fixes +from socketdev.supportedfiles import SupportedFiles +from socketdev.webhooks import Webhooks +from socketdev.telemetry import Telemetry from socketdev.log import log +from typing import Optional __author__ = "socket.dev" __version__ = __version__ @@ -44,7 +52,22 @@ class socketdev: - def __init__(self, token: str, timeout: int = 1200, allow_unverified: bool = False): + def __init__(self, token: Optional[str] = None, timeout: int = 1200, allow_unverified: bool = False): + # Try to get token from environment variables if not provided + if token is None: + token = ( + os.getenv("SOCKET_SECURITY_API_TOKEN") or + os.getenv("SOCKET_SECURITY_API_KEY") or + os.getenv("SOCKET_API_KEY") or + os.getenv("SOCKET_API_TOKEN") + ) + + if token is None: + raise ValueError( + "API token is 
required. Provide it as a parameter or set one of these environment variables: " + "SOCKET_SECURITY_API_TOKEN, SOCKET_SECURITY_API_KEY, SOCKET_API_KEY, SOCKET_API_TOKEN" + ) + self.api = API() self.token = token + ":" self.api.encode_key(self.token) @@ -77,6 +100,12 @@ def __init__(self, token: str, timeout: int = 1200, allow_unverified: bool = Fal self.alerttypes = AlertTypes(self.api) self.basics = Basics(self.api) self.uploadmanifests = UploadManifests(self.api) + self.alertfullscansearch = AlertFullScanSearch(self.api) + self.alerts = Alerts(self.api) + self.fixes = Fixes(self.api) + self.supportedfiles = SupportedFiles(self.api) + self.webhooks = Webhooks(self.api) + self.telemetry = Telemetry(self.api) @staticmethod def set_timeout(timeout: int): diff --git a/socketdev/alertfullscansearch/__init__.py b/socketdev/alertfullscansearch/__init__.py new file mode 100644 index 0000000..52a8a6c --- /dev/null +++ b/socketdev/alertfullscansearch/__init__.py @@ -0,0 +1,33 @@ +import logging +from urllib.parse import urlencode + +log = logging.getLogger("socketdev") + + +class AlertFullScanSearch: + def __init__(self, api): + self.api = api + + def search(self, org_slug: str, **query_params) -> dict: + """ + Search alerts across full scans. + + Args: + org_slug: Organization slug + **query_params: Optional query parameters for filtering + + Returns: + dict containing search results + """ + path = f"orgs/{org_slug}/alert-full-scan-search" + if query_params: + path += "?" 
+ urlencode(query_params) + + response = self.api.do_request(path=path) + + if response.status_code == 200: + return response.json() + + log.error(f"Error searching alerts: {response.status_code}") + log.error(response.text) + return {} diff --git a/socketdev/alerts/__init__.py b/socketdev/alerts/__init__.py new file mode 100644 index 0000000..6ce5682 --- /dev/null +++ b/socketdev/alerts/__init__.py @@ -0,0 +1,33 @@ +import logging +from urllib.parse import urlencode + +log = logging.getLogger("socketdev") + + +class Alerts: + def __init__(self, api): + self.api = api + + def get(self, org_slug: str, **query_params) -> dict: + """ + Get alerts for an organization. + + Args: + org_slug: Organization slug + **query_params: Optional query parameters for filtering + + Returns: + dict containing alerts data + """ + path = f"orgs/{org_slug}/alerts" + if query_params: + path += "?" + urlencode(query_params) + + response = self.api.do_request(path=path) + + if response.status_code == 200: + return response.json() + + log.error(f"Error getting alerts: {response.status_code}") + log.error(response.text) + return {} diff --git a/socketdev/export/__init__.py b/socketdev/export/__init__.py index 8f27117..89f642e 100644 --- a/socketdev/export/__init__.py +++ b/socketdev/export/__init__.py @@ -73,3 +73,28 @@ def spdx_bom( log.error(f"Error exporting SPDX BOM: {response.status_code}") log.error(response.text) return {} + + def openvex_bom( + self, org_slug: str, id: str, query_params: Optional[ExportQueryParams] = None, use_types: bool = False + ) -> dict: + """ + Export a Socket SBOM as an OpenVEX SBOM + :param org_slug: String - The slug of the organization + :param id: String - The id of either a full scan or an sbom report + :param query_params: Optional[ExportQueryParams] - Query parameters for filtering + :param use_types: Optional[bool] - Whether to return typed responses + :return: dict + """ + path = f"orgs/{org_slug}/export/openvex/{id}" + if query_params: + path += 
query_params.to_query_params() + response = self.api.do_request(path=path) + + if response.status_code == 200: + return response.json() + # TODO: Add typed response when types are defined + + log.error(f"Error exporting OpenVEX BOM: {response.status_code}") + log.error(response.text) + return {} + diff --git a/socketdev/fixes/__init__.py b/socketdev/fixes/__init__.py new file mode 100644 index 0000000..a03be01 --- /dev/null +++ b/socketdev/fixes/__init__.py @@ -0,0 +1,33 @@ +import logging +from urllib.parse import urlencode + +log = logging.getLogger("socketdev") + + +class Fixes: + def __init__(self, api): + self.api = api + + def get(self, org_slug: str, **query_params) -> dict: + """ + Get available fixes for an organization. + + Args: + org_slug: Organization slug + **query_params: Optional query parameters for filtering + + Returns: + dict containing available fixes + """ + path = f"orgs/{org_slug}/fixes" + if query_params: + path += "?" + urlencode(query_params) + + response = self.api.do_request(path=path) + + if response.status_code == 200: + return response.json() + + log.error(f"Error getting fixes: {response.status_code}") + log.error(response.text) + return {} diff --git a/socketdev/fullscans/__init__.py b/socketdev/fullscans/__init__.py index d3e955b..0cc7023 100644 --- a/socketdev/fullscans/__init__.py +++ b/socketdev/fullscans/__init__.py @@ -117,7 +117,7 @@ def from_dict(cls, data: dict) -> "FullScanParams": commit_hash=data.get("commit_hash"), pull_request=data.get("pull_request"), committers=data.get("committers"), - integration_type=IntegrationType(integration_type) if integration_type else None, + integration_type=integration_type if integration_type is not None else None, integration_org_slug=data.get("integration_org_slug"), make_default_branch=data.get("make_default_branch"), set_as_pending_head=data.get("set_as_pending_head"), @@ -181,11 +181,12 @@ def to_dict(self): @classmethod def from_dict(cls, data: dict) -> "CreateFullScanResponse": + 
data_value = data.get("data") return cls( success=data["success"], status=data["status"], message=data.get("message"), - data=FullScanMetadata.from_dict(data.get("data")) if data.get("data") else None, + data=FullScanMetadata.from_dict(data_value) if data_value else None, ) @@ -204,11 +205,12 @@ def to_dict(self): @classmethod def from_dict(cls, data: dict) -> "GetFullScanMetadataResponse": + data_value = data.get("data") return cls( success=data["success"], status=data["status"], message=data.get("message"), - data=FullScanMetadata.from_dict(data.get("data")) if data.get("data") else None, + data=FullScanMetadata.from_dict(data_value) if data_value else None, ) @@ -619,11 +621,12 @@ def to_dict(self): @classmethod def from_dict(cls, data: dict) -> "StreamDiffResponse": + data_value = data.get("data") return cls( success=data["success"], status=data["status"], message=data.get("message"), - data=FullScanDiffReport.from_dict(data.get("data")) if data.get("data") else None, + data=FullScanDiffReport.from_dict(data_value) if data_value else None, ) @@ -631,7 +634,7 @@ def from_dict(cls, data: dict) -> "StreamDiffResponse": class SocketArtifact(SocketPURL, SocketArtifactLink): id: str alerts: List[SocketAlert] - score: SocketScore + score: Optional[SocketScore] = None author: Optional[List[str]] = field(default_factory=list) batchIndex: Optional[int] = None license: Optional[str] = None @@ -647,8 +650,25 @@ def to_dict(self): @classmethod def from_dict(cls, data: dict) -> "SocketArtifact": - purl_data = {k: data.get(k) for k in SocketPURL.__dataclass_fields__} - link_data = {k: data.get(k) for k in SocketArtifactLink.__dataclass_fields__} + # Extract PURL fields + purl_type = data.get("type") + purl_data = { + "type": SocketPURL_Type(purl_type) if purl_type else SocketPURL_Type.UNKNOWN, + "name": data.get("name"), + "namespace": data.get("namespace"), + "release": data.get("release"), + "subpath": data.get("subpath"), + "version": data.get("version"), + } + + # Extract 
Link fields + link_data = { + "topLevelAncestors": data.get("topLevelAncestors", []), + "direct": data.get("direct", False), + "artifact": data.get("artifact"), + "dependencies": data.get("dependencies"), + "manifestFiles": [SocketManifestReference.from_dict(m) for m in data["manifestFiles"]] if data.get("manifestFiles") else None, + } alerts = data.get("alerts") license_attrib = data.get("licenseAttrib") @@ -728,7 +748,17 @@ def get(self, org_slug: str, params: dict, use_types: bool = False) -> Union[dic ) return {} - def post(self, files: list, params: FullScanParams, use_types: bool = False, use_lazy_loading: bool = False, workspace: str = None, max_open_files: int = 100, base_path: str = None, base_paths: List[str] = None) -> Union[dict, CreateFullScanResponse]: + def post( + self, + files: list, + params: FullScanParams, + use_types: bool = False, + use_lazy_loading: bool = False, + workspace: Optional[str] = None, + max_open_files: int = 100, + base_path: Optional[str] = None, + base_paths: Optional[List[str]] = None + ) -> Union[dict, CreateFullScanResponse]: """ Create a new full scan by uploading manifest files. @@ -926,4 +956,118 @@ def finalize_tier1( if response.status_code in (200, 201, 204): return True - return False \ No newline at end of file + return False + + def archive(self, tar_files: Optional[Union[str, List[str]]] = None, files: Optional[List[str]] = None, workspace: Optional[str] = None, use_lazy_loading: bool = True, params: Optional[FullScanParams] = None) -> dict: + """ + Create a full scan by uploading one or more archives. + + Supported archive formats include .tar, .tar.gz/.tgz, and .zip. 
+ + Args: + tar_files: Path(s) to archive file(s) to upload (.tar, .tar.gz, .tgz, or .zip) + Can be a single string or a list of strings + files: List of files to bundle into a .tar.gz and upload (alternative to tar_files) + workspace: Base directory path to make file paths relative to when creating tar.gz + use_lazy_loading: Whether to use lazy file loading (default: True) + params: FullScanParams object containing scan configuration (repo, org_slug, branch, + commit_message, commit_hash, pull_request, committers, integration_type, + integration_org_slug, make_default_branch, set_as_pending_head, tmp) + + Returns: + dict with the full scan creation response + + Raises: + ValueError: If neither tar_files nor files is provided, or if both are provided, + or if params is None + """ + if tar_files is None and files is None: + raise ValueError("Either tar_files or files must be provided") + + if tar_files is not None and files is not None: + raise ValueError("Cannot provide both tar_files and files - choose one") + + if params is None: + raise ValueError("params argument is required") + + Utils.validate_integration_type(params.integration_type if params.integration_type else "api") + org_slug = str(params.org_slug) + params_dict = params.to_dict() + params_dict.pop("org_slug") + params_arg = urllib.parse.urlencode(params_dict) + path = f"orgs/{org_slug}/full-scans/archive?" 
+ str(params_arg) + + # Prepare files for upload + if tar_files: + # Archive file(s) - use lazy loading to prepare them + if use_lazy_loading: + upload_files = Utils.prepare_archive_files_for_upload(tar_files) + else: + # For backward compatibility, fall back to opening files directly + files_list = [tar_files] if isinstance(tar_files, str) else tar_files + upload_files = [] + for file_path in files_list: + filename = file_path.rsplit("/", 1)[-1] if "/" in file_path else file_path + with open(file_path, 'rb') as f: + upload_files.append(("file", (filename, f.read()))) + else: + # Multiple files - bundle into tar.gz + log.debug(f"Creating tar.gz archive from {len(files)} files") + tar_buffer = Utils.create_tar_gz_from_files(files, workspace) + + # Prepare the tar.gz for upload + archive_name = "archive.tar.gz" + upload_files = [("file", (archive_name, tar_buffer))] + + response = self.api.do_request(path=path, method="POST", files=upload_files) + + if response.status_code in (200, 201): + return response.json() + + error_message = response.json().get("error", {}).get("message", "Unknown error") + log.error(f"Error creating full scan from archive: {response.status_code}, message: {error_message}") + return {} + + def rescan(self, org_slug: str, full_scan_id: str) -> dict: + """ + Trigger a rescan of an existing full scan. + + Args: + org_slug: Organization slug + full_scan_id: The ID of the full scan to rescan + + Returns: + dict with the rescan response + """ + path = f"orgs/{org_slug}/full-scans/{full_scan_id}/rescan" + + response = self.api.do_request(path=path, method="POST", payload="{}") + + if response.status_code in (200, 201): + return response.json() + + error_message = response.json().get("error", {}).get("message", "Unknown error") + log.error(f"Error rescanning full scan: {response.status_code}, message: {error_message}") + return {} + + def get_tar_files(self, org_slug: str, full_scan_id: str) -> bytes: + """ + Download full scan files as a tar archive. 
+ + Args: + org_slug: Organization slug + full_scan_id: The ID of the full scan + + Returns: + bytes containing the tar archive, or empty bytes on error + """ + path = f"orgs/{org_slug}/full-scans/{full_scan_id}/files/tar" + + response = self.api.do_request(path=path, method="GET") + + if response.status_code == 200: + return response.content + + error_message = response.json().get("error", {}).get("message", "Unknown error") if response.text else "Unknown error" + log.error(f"Error downloading tar files: {response.status_code}, message: {error_message}") + return b"" \ No newline at end of file diff --git a/socketdev/historical/__init__.py b/socketdev/historical/__init__.py index 565aff7..eac837d 100644 --- a/socketdev/historical/__init__.py +++ b/socketdev/historical/__init__.py @@ -47,6 +47,25 @@ def trend(self, org_slug: str, query_params: dict = None) -> dict: log.error(response.text) return {} + def dependencies_trend(self, org_slug: str, query_params: dict = None) -> dict: + """Get historical dependency trends data for an org. + + Args: + org_slug: Organization slug + query_params: Optional dictionary of query parameters + """ + path = f"orgs/{org_slug}/historical/dependencies/trend" + if query_params: + path += "?" 
+ urlencode(query_params) + + response = self.api.do_request(path=path) + if response.status_code == 200: + return response.json() + + log.error(f"Error getting historical dependencies trend: {response.status_code}") + log.error(response.text) + return {} + class Snapshots: """Submodule for managing historical snapshots.""" diff --git a/socketdev/repos/__init__.py b/socketdev/repos/__init__.py index c4851c8..a4268db 100644 --- a/socketdev/repos/__init__.py +++ b/socketdev/repos/__init__.py @@ -58,11 +58,12 @@ def to_dict(self): @classmethod def from_dict(cls, data: dict) -> "GetRepoResponse": + data_field = data.get("data") return cls( success=data["success"], status=data["status"], message=data.get("message"), - data=RepositoryInfo.from_dict(data.get("data")) if data.get("data") else None, + data=RepositoryInfo.from_dict(data_field) if data_field is not None else None, ) diff --git a/socketdev/supportedfiles/__init__.py b/socketdev/supportedfiles/__init__.py new file mode 100644 index 0000000..4ecdd03 --- /dev/null +++ b/socketdev/supportedfiles/__init__.py @@ -0,0 +1,29 @@ +import logging + +log = logging.getLogger("socketdev") + + +class SupportedFiles: + def __init__(self, api): + self.api = api + + def get(self, org_slug: str) -> dict: + """ + Get list of supported manifest file types. 
+ + Args: + org_slug: Organization slug + + Returns: + dict containing list of supported file types + """ + path = f"orgs/{org_slug}/supported-files" + + response = self.api.do_request(path=path) + + if response.status_code == 200: + return response.json() + + log.error(f"Error getting supported files: {response.status_code}") + log.error(response.text) + return {} diff --git a/socketdev/telemetry/__init__.py b/socketdev/telemetry/__init__.py new file mode 100644 index 0000000..f182fb2 --- /dev/null +++ b/socketdev/telemetry/__init__.py @@ -0,0 +1,53 @@ +import logging +import json + +log = logging.getLogger("socketdev") + + +class Telemetry: + def __init__(self, api): + self.api = api + + def get_config(self, org_slug: str) -> dict: + """ + Get telemetry configuration. + + Args: + org_slug: Organization slug + + Returns: + dict containing telemetry configuration + """ + path = f"orgs/{org_slug}/telemetry/config" + + response = self.api.do_request(path=path) + + if response.status_code == 200: + return response.json() + + log.error(f"Error getting telemetry config: {response.status_code}") + log.error(response.text) + return {} + + def update_config(self, org_slug: str, **kwargs) -> dict: + """ + Update telemetry configuration. 
+ + Args: + org_slug: Organization slug + **kwargs: Configuration parameters to update + + Returns: + dict containing the updated telemetry configuration + """ + path = f"orgs/{org_slug}/telemetry/config" + payload = json.dumps(kwargs) if kwargs else "{}" + + response = self.api.do_request(path=path, method="PUT", payload=payload) + + if response.status_code == 200: + return response.json() + + log.error(f"Error updating telemetry config: {response.status_code}") + log.error(response.text) + return {} diff --git a/socketdev/utils/__init__.py b/socketdev/utils/__init__.py index 718eeee..74be447 100644 --- a/socketdev/utils/__init__.py +++ b/socketdev/utils/__init__.py @@ -1,8 +1,11 @@ -from typing import Literal, List, Tuple, Optional +from typing import Literal, List, Tuple, Optional, Union import logging import os import weakref from threading import Lock +import tarfile +import tempfile +import io log = logging.getLogger("socketdev") @@ -133,6 +136,7 @@ def _get_size(self): def read(self, size: int = -1): """Read from the file, opening it if needed.""" self._ensure_open() + assert self._file is not None data = self._file.read(size) self._position = self._file.tell() @@ -145,6 +149,7 @@ def read(self, size: int = -1): def readline(self, size: int = -1): """Read a line from the file.""" self._ensure_open() + assert self._file is not None data = self._file.readline(size) self._position = self._file.tell() return data @@ -162,6 +167,7 @@ def seek(self, offset: int, whence: int = 0): elif whence == 2: # SEEK_END # We need to open the file to get its size self._ensure_open() + assert self._file is not None result = self._file.seek(offset, whence) self._position = self._file.tell() return result @@ -340,3 +346,97 @@ def load_files_for_sending_lazy(files: List[str], workspace: Optional[str] = Non log.debug(f"Prepared {len(send_files)} files for lazy loading") return send_files + + @staticmethod + def create_tar_gz_from_files(files: List[str], workspace: Optional[str] = 
None) -> io.BytesIO: + """ + Create a tar.gz archive from a list of files. + + Args: + files: List of file paths to include in the archive + workspace: Base directory path to make paths relative to + + Returns: + io.BytesIO: In-memory tar.gz archive + """ + tar_buffer = io.BytesIO() + + # Normalize workspace path + if workspace and "\\" in workspace: + workspace = workspace.replace("\\", "/") + if workspace: + workspace = workspace.rstrip("/") + + with tarfile.open(fileobj=tar_buffer, mode='w:gz') as tar: + for file_path in files: + # Normalize file path + normalized_path = file_path.replace("\\", "/") if "\\" in file_path else file_path + + # Skip if file doesn't exist + if not os.path.exists(normalized_path): + log.warning(f"File not found, skipping: {normalized_path}") + continue + + # Skip directories + if os.path.isdir(normalized_path): + log.debug(f"Skipping directory: {normalized_path}") + continue + + # Calculate arcname (the name in the archive) + arcname = normalized_path + if workspace: + workspace_with_slash = workspace + "/" + if normalized_path.startswith(workspace_with_slash): + arcname = normalized_path[len(workspace_with_slash):] + elif normalized_path.startswith(workspace): + arcname = normalized_path[len(workspace):].lstrip("/") + + # Clean up relative path prefixes + while arcname.startswith("./"): + arcname = arcname[2:] + while arcname.startswith("../"): + arcname = arcname[3:] + arcname = arcname.lstrip("/") + + # Remove Windows drive letter if present + if len(arcname) > 2 and arcname[1] == ':' and (arcname[2] == '/' or arcname[2] == '\\'): + arcname = arcname[2:].lstrip("/") + + log.debug(f"Adding to archive: {normalized_path} as {arcname}") + tar.add(normalized_path, arcname=arcname) + + # Seek to beginning so it can be read + tar_buffer.seek(0) + log.debug(f"Created tar.gz archive with {len(files)} files") + return tar_buffer + + @staticmethod + def prepare_archive_files_for_upload(tar_files: Union[str, List[str]]) -> List[Tuple[str, 
Tuple[str, LazyFileLoader]]]: + """ + Prepare archive files for upload to the API. + + Args: + tar_files: Path or list of paths to archive files (.tar, .tar.gz, .tgz, .zip) + + Returns: + List of tuples formatted for requests multipart upload + """ + files_list = [tar_files] if isinstance(tar_files, str) else tar_files + prepared_files = [] + + for file_path in files_list: + # Normalize path + normalized_path = file_path.replace("\\", "/") if "\\" in file_path else file_path + + # Get filename + if "/" in normalized_path: + _, filename = normalized_path.rsplit("/", 1) + else: + filename = normalized_path + + # Create lazy file loader + lazy_file = LazyFileLoader(normalized_path, filename) + prepared_files.append(("file", (filename, lazy_file))) + + log.debug(f"Prepared {len(prepared_files)} archive files for upload") + return prepared_files diff --git a/socketdev/version.py b/socketdev/version.py index 3df1a32..fdaa99e 100644 --- a/socketdev/version.py +++ b/socketdev/version.py @@ -1 +1 @@ -__version__ = "3.0.22" +__version__ = "3.0.23" diff --git a/socketdev/webhooks/__init__.py b/socketdev/webhooks/__init__.py new file mode 100644 index 0000000..59e6864 --- /dev/null +++ b/socketdev/webhooks/__init__.py @@ -0,0 +1,125 @@ +import logging +import json +from urllib.parse import urlencode + +log = logging.getLogger("socketdev") + + +class Webhooks: + def __init__(self, api): + self.api = api + + def list(self, org_slug: str, **query_params) -> dict: + """ + List webhooks. + + Args: + org_slug: Organization slug + **query_params: Optional query parameters + + Returns: + dict containing list of webhooks + """ + path = f"orgs/{org_slug}/webhooks" + if query_params: + path += "?" 
+ urlencode(query_params) + + response = self.api.do_request(path=path) + + if response.status_code == 200: + return response.json() + + log.error(f"Error listing webhooks: {response.status_code}") + log.error(response.text) + return {} + + def create(self, org_slug: str, **kwargs) -> dict: + """ + Create a new webhook. + + Args: + org_slug: Organization slug + **kwargs: Webhook configuration parameters + + Returns: + dict containing the created webhook + """ + path = f"orgs/{org_slug}/webhooks" + payload = json.dumps(kwargs) if kwargs else "{}" + + response = self.api.do_request(path=path, method="POST", payload=payload) + + if response.status_code in (200, 201): + return response.json() + + log.error(f"Error creating webhook: {response.status_code}") + log.error(response.text) + return {} + + def get(self, org_slug: str, webhook_id: str) -> dict: + """ + Get a specific webhook. + + Args: + org_slug: Organization slug + webhook_id: Webhook ID + + Returns: + dict containing webhook details + """ + path = f"orgs/{org_slug}/webhooks/{webhook_id}" + + response = self.api.do_request(path=path) + + if response.status_code == 200: + return response.json() + + log.error(f"Error getting webhook: {response.status_code}") + log.error(response.text) + return {} + + def update(self, org_slug: str, webhook_id: str, **kwargs) -> dict: + """ + Update a webhook. + + Args: + org_slug: Organization slug + webhook_id: Webhook ID + **kwargs: Webhook configuration parameters to update + + Returns: + dict containing the updated webhook + """ + path = f"orgs/{org_slug}/webhooks/{webhook_id}" + payload = json.dumps(kwargs) if kwargs else "{}" + + response = self.api.do_request(path=path, method="PUT", payload=payload) + + if response.status_code == 200: + return response.json() + + log.error(f"Error updating webhook: {response.status_code}") + log.error(response.text) + return {} + + def delete(self, org_slug: str, webhook_id: str) -> dict: + """ + Delete a webhook. 
+ + Args: + org_slug: Organization slug + webhook_id: Webhook ID + + Returns: + dict containing the deletion response + """ + path = f"orgs/{org_slug}/webhooks/{webhook_id}" + + response = self.api.do_request(path=path, method="DELETE") + + if response.status_code == 200: + return response.json() + + log.error(f"Error deleting webhook: {response.status_code}") + log.error(response.text) + return {}