Skip to content

Commit 2869b11

Browse files
committed
Added new endpoints
1 parent ca54c61 commit 2869b11

File tree

4 files changed

+164
-4
lines changed

4 files changed

+164
-4
lines changed

socketdev/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from socketdev.triage import Triage
1717
from socketdev.utils import Utils, IntegrationType, INTEGRATION_TYPES
1818
from socketdev.version import __version__
19+
from socketdev.labels import Labels
1920
from socketdev.log import log
2021

2122

@@ -57,6 +58,7 @@ def __init__(self, token: str, timeout: int = 1200):
5758
self.settings = Settings(self.api)
5859
self.triage = Triage(self.api)
5960
self.utils = Utils()
61+
self.labels = Labels(self.api)
6062

6163
@staticmethod
6264
def set_timeout(timeout: int):

socketdev/historical/__init__.py

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
class Historical:
88
def __init__(self, api):
99
self.api = api
10+
self.snapshots = self.Snapshots(api)
1011

1112
def list(self, org_slug: str, query_params: dict = None) -> dict:
1213
"""Get historical alerts list for an organization.
@@ -15,7 +16,7 @@ def list(self, org_slug: str, query_params: dict = None) -> dict:
1516
org_slug: Organization slug
1617
query_params: Optional dictionary of query parameters
1718
"""
18-
path = f"orgs/{org_slug}/alerts/historical"
19+
path = f"orgs/{org_slug}/historical/alerts"
1920
if query_params:
2021
path += "?" + urlencode(query_params)
2122

@@ -28,13 +29,13 @@ def list(self, org_slug: str, query_params: dict = None) -> dict:
2829
return {}
2930

3031
def trend(self, org_slug: str, query_params: dict = None) -> dict:
31-
"""Get historical alerts trend data for an organization.
32+
"""Get historical alert trends data for an org.
3233
3334
Args:
3435
org_slug: Organization slug
3536
query_params: Optional dictionary of query parameters
3637
"""
37-
path = f"orgs/{org_slug}/alerts/historical/trend"
38+
path = f"orgs/{org_slug}/historical/alerts/trend"
3839
if query_params:
3940
path += "?" + urlencode(query_params)
4041

@@ -45,3 +46,39 @@ def trend(self, org_slug: str, query_params: dict = None) -> dict:
4546
log.error(f"Error getting historical trend: {response.status_code}")
4647
log.error(response.text)
4748
return {}
49+
50+
class Snapshots:
51+
"""Submodule for managing historical snapshots."""
52+
53+
def __init__(self, api):
54+
self.api = api
55+
56+
def create(self, org_slug: str) -> dict:
57+
"""Create a new snapshot for an organization.
58+
59+
Args:
60+
org_slug: Organization slug
61+
data: Dictionary containing snapshot data
62+
"""
63+
path = f"orgs/{org_slug}/historical/snapshots"
64+
response = self.api.do_request(path=path, method="POST")
65+
if response.status_code == 200:
66+
return response.json()
67+
68+
log.error(f"Error creating snapshot: {response.status_code}")
69+
log.error(response.text)
70+
return {}
71+
72+
def list(self, org_slug: str, query_params: dict = None) -> dict:
73+
"""List historical snapshots for an organization."""
74+
path = f"orgs/{org_slug}/historical/snapshots"
75+
if query_params:
76+
path += "?" + urlencode(query_params)
77+
78+
response = self.api.do_request(path=path)
79+
if response.status_code == 200:
80+
return response.json()
81+
82+
log.error(f"Error listing snapshots: {response.status_code}")
83+
log.error(response.text)
84+
return {}

socketdev/labels/__init__.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import json
2+
import logging
3+
from typing import Any
4+
from urllib.parse import urlencode
5+
6+
7+
log = logging.getLogger("socketdev")
8+
9+
class Setting:
10+
def __init__(self, api):
11+
self.api = api
12+
13+
def create_url(self, org_slug: str, label_id: int):
14+
return "orgs/" + org_slug + f"/repos/labels/{label_id}/label-setting"
15+
16+
def get(self, org_slug: str, label_id: int, setting_key: str):
17+
url = self.create_url(org_slug, label_id)
18+
path = f"{url}?setting_key={setting_key}"
19+
response = self.api.do_request(path=path)
20+
if response.status_code == 201:
21+
return response.json()
22+
23+
error_message = response.json().get("error", {}).get("message", "Unknown error")
24+
log.error(f"Error getting label setting {setting_key} for {label_id}: {response.status_code}, message: {error_message}")
25+
return {}
26+
27+
def put(self, org_slug: str, label_id: int, settings: dict[str, dict[str, dict[str, str]]]):
28+
path = self.create_url(org_slug, label_id)
29+
response = self.api.do_request(method="PUT", path=path, payload=json.dumps(settings))
30+
31+
if response.status_code == 201:
32+
return response.json()
33+
34+
error_message = response.json().get("error", {}).get("message", "Unknown error")
35+
log.error(f"Error updating label settings for {label_id}: {response.status_code}, message: {error_message}")
36+
return {}
37+
38+
def delete(self, org_slug: str, label_id: int, settings_key: str):
39+
path = self.create_url(org_slug, label_id)
40+
path += "?setting_key=" + settings_key
41+
response = self.api.do_request(path=path, method="DELETE")
42+
43+
if response.status_code == 201:
44+
return response.json()
45+
46+
error_message = response.json().get("error", {}).get("message", "Unknown error")
47+
log.error(f"Error updating label settings for {label_id}: {response.status_code}, message: {error_message}")
48+
return {}
49+
50+
51+
class Labels:
52+
def __init__(self, api):
53+
self.api = api
54+
self.setting = Setting(api)
55+
56+
def list(self, org_slug: str):
57+
path = f"orgs/" + org_slug + f"/repos/labels"
58+
response = self.api.do_request(path=path)
59+
if response.status_code == 200:
60+
return response.json()
61+
62+
error_message = response.json().get("error", {}).get("message", "Unknown error")
63+
log.error(f"Error getting labels: {response.status_code}, message: {error_message}")
64+
return {}
65+
66+
def post(self, org_slug: str, label_name: str) -> dict:
67+
path = f"orgs/{org_slug}/repos/labels"
68+
payload = json.dumps({"name": label_name})
69+
response = self.api.do_request(path=path, method="POST", payload=payload)
70+
71+
if response.status_code == 201:
72+
result = response.json()
73+
return result
74+
75+
error_message = response.json().get("error", {}).get("message", "Unknown error")
76+
print(f"Failed to create repository label: {response.status_code}, message: {error_message}")
77+
return {}
78+
79+
def get(self, org_slug: str, label_id: str) -> dict:
80+
path = f"orgs/{org_slug}/repos/labels/{label_id}"
81+
response = self.api.do_request(path=path)
82+
if response.status_code == 200:
83+
result = response.json()
84+
return result
85+
86+
error_message = response.json().get("error", {}).get("message", "Unknown error")
87+
print(f"Failed to get repository label: {response.status_code}, message: {error_message}")
88+
return {}
89+
90+
def delete(self, org_slug: str, label_id: str) -> dict:
91+
path = f"orgs/{org_slug}/repos/labels/{label_id}"
92+
response = self.api.do_request(path=path, method="DELETE")
93+
if response.status_code == 200:
94+
return response.json()
95+
96+
error_message = response.json().get("error", {}).get("message", "Unknown error")
97+
log.error(f"Error deleting repository label: {response.status_code}, message: {error_message}")
98+
return {}
99+
100+
101+
def associate(self, org_slug: str, label_id: int, repo_id: str) -> dict:
102+
path = f"orgs/{org_slug}/repos/labels/{label_id}/associate"
103+
payload = json.dumps({"repository_id": repo_id})
104+
response = self.api.do_request(path=path, method="POST", payload=payload)
105+
if response.status_code == 200:
106+
return response.json()
107+
108+
error_message = response.json().get("error", {}).get("message", "Unknown error")
109+
log.error(f"Error associating repository label: {response.status_code}, message: {error_message}")
110+
return {}
111+
112+
def disassociate(self, org_slug: str, label_id: int, repo_id: str) -> dict:
113+
path = f"orgs/{org_slug}/repos/labels/{label_id}/disassociate"
114+
payload = json.dumps({"repository_id": repo_id})
115+
response = self.api.do_request(path=path, method="POST", payload=payload)
116+
if response.status_code == 200:
117+
return response.json()
118+
119+
error_message = response.json().get("error", {}).get("message", "Unknown error")
120+
log.error(f"Error associating repository label: {response.status_code}, message: {error_message}")
121+
return {}

socketdev/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "2.0.21"
1+
__version__ = "2.1.0"

0 commit comments

Comments
 (0)