7171from pycti .utils .opencti_stix2_utils import OpenCTIStix2Utils
7272
7373
74+ def build_request_headers (token : str , custom_headers : str , app_logger ):
75+ headers_dict = {
76+ "User-Agent" : "pycti/" + __version__ ,
77+ "Authorization" : "Bearer " + token ,
78+ }
79+ # Build and add custom headers
80+ if custom_headers is not None :
81+ for header_pair in custom_headers .strip ().split (";" ):
82+ if header_pair : # Skip empty header pairs
83+ try :
84+ key , value = header_pair .split (":" , 1 )
85+ headers_dict [key .strip ()] = value .strip ()
86+ except ValueError :
87+ app_logger .warning (
88+ "Ignored invalid header pair" , {"header_pair" : header_pair }
89+ )
90+ return headers_dict
91+
92+
7493class File :
7594 def __init__ (self , name , data , mime = "text/plain" ):
7695 self .name = name
@@ -99,24 +118,28 @@ class OpenCTIApiClient:
99118 ```
100119 :param json_logging: format the logs as json if set to True
101120 :type json_logging: bool, optional
121+ :param bundle_send_to_queue: if bundle will be sent to queue
122+ :type bundle_send_to_queue: bool, optional
102123 :param cert: If String, file path to pem file. If Tuple, a ('path_to_cert.crt', 'path_to_key.key') pair representing the certificate and the key.
103124 :type cert: str, tuple, optional
104- :param auth: Add a AuthBase class with custom authentication for you OpenCTI infrastructure.
105- :type auth: requests.auth.AuthBase, optional
125+ :param custom_headers: Add custom headers to use with the graphql queries
126+ :type custom_headers: str, optional must in the format header01:value;header02:value
127+ :param perform_health_check: if client init must check the api access
128+ :type perform_health_check: bool, optional
106129 """
107130
108131 def __init__ (
109132 self ,
110133 url : str ,
111134 token : str ,
112- log_level = "info" ,
135+ log_level : str = "info" ,
113136 ssl_verify : Union [bool , str ] = False ,
114137 proxies : Union [Dict [str , str ], None ] = None ,
115- json_logging = False ,
116- bundle_send_to_queue = True ,
138+ json_logging : bool = False ,
139+ bundle_send_to_queue : bool = True ,
117140 cert : Union [str , Tuple [str , str ], None ] = None ,
118- auth = None ,
119- perform_health_check = True ,
141+ custom_headers : str = None ,
142+ perform_health_check : bool = True ,
120143 ):
121144 """Constructor method"""
122145
@@ -138,17 +161,10 @@ def __init__(
138161 # Define API
139162 self .api_token = token
140163 self .api_url = url + "/graphql"
141- self .request_headers = {
142- "User-Agent" : "pycti/" + __version__ ,
143- "Authorization" : "Bearer " + token ,
144- }
145-
146- if auth is not None :
147- self .session = requests .session ()
148- self .session .auth = auth
149- else :
150- self .session = requests .session ()
151-
164+ self .request_headers = build_request_headers (
165+ token , custom_headers , self .app_logger
166+ )
167+ self .session = requests .session ()
152168 # Define the dependencies
153169 self .work = OpenCTIApiWork (self )
154170 self .playbook = OpenCTIApiPlaybook (self )
0 commit comments