2626Currently Dulwich supports only the `get` operation
2727
2828"""
29+ import locale
30+ import logging
2931import os
3032import shlex
3133import shutil
3234import subprocess # nosec B404
3335import sys
34- from typing import Any , Dict , List , Optional , Tuple , Union
35- from urllib .parse import urlparse
36+ from typing import TYPE_CHECKING , Any , Dict , Iterable , List , Optional , Tuple , Union
37+ from urllib .parse import urlparse , urlunparse
3638
3739from dulwich .config import StackedConfig
3840from dulwich .credentials import urlmatch_credential_sections
41+ from funcy import cached_property
42+
43+ from scmrepo .exceptions import SCMError
44+
45+ if TYPE_CHECKING :
46+ from dulwich .config import ConfigDict
47+
48+ logger = logging .getLogger (__name__ )
3949
4050SectionLike = Union [bytes , str , Tuple [Union [bytes , str ], ...]]
4151
4252
43- class CredentialNotFoundError (Exception ):
53+ class CredentialNotFoundError (SCMError ):
4454 """Error occurred while retrieving credentials/no credentials available."""
4555
4656
@@ -60,10 +70,11 @@ def __init__(self, command: str):
6070 if self ._command [0 ] == "!" :
6171 # On Windows this will only work in git-bash and/or WSL2
6272 self ._run_kwargs ["shell" ] = True
73+ self ._encoding = locale .getpreferredencoding ()
6374
64- def _prepare_command (self ) -> Union [str , List [str ]]:
75+ def _prepare_command (self , action : Optional [ str ] = None ) -> Union [str , List [str ]]:
6576 if self ._command [0 ] == "!" :
66- return self ._command [1 :]
77+ return self ._command [1 :] + ( f" { action } " if action else "" )
6778
6879 if sys .platform != "win32" :
6980 argv = shlex .split (self ._command )
@@ -72,6 +83,8 @@ def _prepare_command(self) -> Union[str, List[str]]:
7283 # join arguments when providing a list, so we can just split
7384 # using whitespace.
7485 argv = self ._command .split ()
86+ if action :
87+ argv .append (action )
7588
7689 if os .path .isabs (argv [0 ]):
7790 return argv
@@ -91,77 +104,84 @@ def _prepare_command(self) -> Union[str, List[str]]:
91104
92105 def get (
93106 self ,
94- * ,
95- protocol : Optional [str ] = None ,
96- hostname : Optional [str ] = None ,
97- port : Optional [int ] = None ,
98- username : Optional [str ] = None ,
99- ) -> Tuple [bytes , bytes ]:
100- cmd = self ._prepare_command ()
101- if isinstance (cmd , str ):
102- cmd += " get"
103- else :
104- cmd .append ("get" )
105-
106- helper_input = []
107- if protocol :
108- helper_input .append (f"protocol={ protocol } " )
109- if hostname :
110- helper_input .append (
111- f"host={ hostname } { ':' + str (port ) if port is not None else '' } "
112- )
113- if username :
114- helper_input .append (f"username={ username } " )
115-
116- if not helper_input :
107+ ** kwargs ,
108+ ) -> "Credential" :
109+ if kwargs .get ("protocol" , kwargs .get ("hostname" )) is None :
117110 raise ValueError ("One of protocol, hostname must be provided" )
118-
111+ cmd = self ._prepare_command ("get" )
112+ helper_input = [f"{ key } ={ value } " for key , value in kwargs .items ()]
119113 helper_input .append ("" )
120114
121115 try :
122116 res = subprocess .run ( # type: ignore # nosec B603 # breaks on 3.6
123117 cmd ,
124118 check = True ,
125119 capture_output = True ,
126- input = os .linesep .join (helper_input ).encode ("ascii" ),
120+ input = os .linesep .join (helper_input ),
121+ encoding = self ._encoding ,
127122 ** self ._run_kwargs ,
128123 )
129124 except subprocess .CalledProcessError as exc :
130125 raise CredentialNotFoundError (exc .stderr ) from exc
131126 except FileNotFoundError as exc :
132127 raise CredentialNotFoundError ("Helper not found" ) from exc
128+ if res .stderr :
129+ logger .debug (res .stderr )
133130
134131 credentials = {}
135132 for line in res .stdout .strip ().splitlines ():
136133 try :
137- key , value = line .split (b "=" )
134+ key , value = line .split ("=" )
138135 credentials [key ] = value
139136 except ValueError :
140137 continue
138+ return Credential (** credentials )
141139
142- if not all (
143- (
144- credentials ,
145- b"username" in credentials ,
146- b"password" in credentials ,
147- )
148- ):
149- raise CredentialNotFoundError ("Could not get credentials from helper" )
150-
151- return credentials [b"username" ], credentials [b"password" ]
152-
153- def store (self , * args , ** kwargs ):
140+ def store (self , ** kwargs ):
154141 """Store the credential, if applicable to the helper"""
155- raise NotImplementedError
142+ cmd = self ._prepare_command ("store" )
143+ helper_input = [f"{ key } ={ value } " for key , value in kwargs .items ()]
144+ helper_input .append ("" )
145+
146+ try :
147+ res = subprocess .run ( # type: ignore # nosec B603 # pylint: disable=W1510
148+ cmd ,
149+ capture_output = True ,
150+ input = os .linesep .join (helper_input ),
151+ encoding = self ._encoding ,
152+ ** self ._run_kwargs ,
153+ )
154+ if res .stderr :
155+ logger .debug (res .stderr )
156+ except FileNotFoundError :
157+ logger .debug ("Helper not found" , exc_info = True )
156158
157- def erase (self , * args , * *kwargs ):
159+ def erase (self , ** kwargs ):
158160 """Remove a matching credential, if any, from the helper’s storage"""
159- raise NotImplementedError
161+ cmd = self ._prepare_command ("erase" )
162+ helper_input = [f"{ key } ={ value } " for key , value in kwargs .items ()]
163+ helper_input .append ("" )
164+
165+ try :
166+ res = subprocess .run ( # type: ignore # nosec B603 # pylint: disable=W1510
167+ cmd ,
168+ capture_output = True ,
169+ input = os .linesep .join (helper_input ),
170+ encoding = self ._encoding ,
171+ ** self ._run_kwargs ,
172+ )
173+ if res .stderr :
174+ logger .debug (res .stderr )
175+ except FileNotFoundError :
176+ logger .debug ("Helper not found" , exc_info = True )
160177
161178
162- def get_matching_helper_commands (base_url : str , config ):
179+ def get_matching_helper_commands (
180+ base_url : str , config : Optional [Union ["ConfigDict" , "StackedConfig" ]] = None
181+ ):
182+ config = config or StackedConfig .default ()
163183 if isinstance (config , StackedConfig ):
164- backends = config .backends
184+ backends : Iterable [ "ConfigDict" ] = config .backends
165185 else :
166186 backends = [config ]
167187
@@ -177,19 +197,112 @@ def get_matching_helper_commands(base_url: str, config):
177197 yield command .decode (conf .encoding or sys .getdefaultencoding ())
178198
179199
180- def get_credentials_from_helper ( base_url : str , config ) -> Tuple [ bytes , bytes ] :
181- """Retrieves credentials for the given url from git credential helpers"""
200+ class Credential :
201+ """Git credentials, equivalent to CGit git- credential API.
182202
183- for command in get_matching_helper_commands (base_url , config ):
184- helper = CredentialHelper (command )
185- parsed = urlparse (base_url )
186- try :
187- return helper .get (
188- protocol = parsed .scheme ,
189- hostname = parsed .hostname ,
190- port = parsed .port ,
191- username = parsed .username ,
192- )
193- except CredentialNotFoundError :
194- continue
195- raise CredentialNotFoundError
203+ Usage:
204+
205+ 1. Generate a credential based on context
206+
207+ >>> generated = Credential(url="https://github.com/dtrifiro/aprivaterepo")
208+
209+ 2. Ask git-credential to give username/password for this context
210+
211+ >>> credential = generated.fill()
212+
213+ 3. Use the credential from (2) in Git operation
214+ 4. If the operation in (3) was successful, approve it for re-use in subsequent
215+ operations
216+
217+ >>> credential.approve()
218+
219+ See also:
220+ https://git-scm.com/docs/git-credential#_typical_use_of_git_credential
221+ https://github.com/git/git/blob/master/credential.h
222+
223+ """
224+
225+ def __init__ (
226+ self ,
227+ * ,
228+ protocol : Optional [str ] = None ,
229+ host : Optional [str ] = None , # host with optional ":<port>" included
230+ path : Optional [str ] = None ,
231+ username : Optional [str ] = None ,
232+ password : Optional [str ] = None ,
233+ password_expiry_utc : Optional [int ] = None ,
234+ url : Optional [str ] = None ,
235+ ):
236+ self .protocol = protocol
237+ self .host = host
238+ self .path = path
239+ self .username = username
240+ self .password = password
241+ self .password_expiry_utc = password_expiry_utc
242+ self ._approved = False
243+ if url :
244+ parsed = urlparse (url )
245+ self .protocol = self .protocol or parsed .scheme
246+ if not self .protocol :
247+ raise ValueError ("protocol must be specified when using URL" )
248+ port = f":{ parsed .port } " if parsed .port is not None else ""
249+ hostname = parsed .hostname or ""
250+ self .host = self .host or f"{ hostname } { port } "
251+ self .username = self .username or parsed .username
252+ self .password = self .password or parsed .password
253+
254+ @property
255+ def url (self ) -> str :
256+ if self .username or self .password :
257+ username = self .username or ""
258+ password = self .password or ""
259+ netloc = f"{ username } :{ password } @{ self .host } "
260+ else :
261+ netloc = self .host or ""
262+ return urlunparse ((self .protocol or "" , netloc , self .path or "" , "" , "" , "" ))
263+
264+ @property
265+ def _helper_kwargs (self ) -> Dict [str , str ]:
266+ kwargs = {}
267+ for attr in (
268+ "protocol" ,
269+ "host" ,
270+ "path" ,
271+ "username" ,
272+ "password" ,
273+ "password_expiry_utc" ,
274+ ):
275+ value = getattr (self , attr )
276+ if value is not None :
277+ kwargs [attr ] = str (value )
278+ return kwargs
279+
280+ @cached_property
281+ def helpers (self ) -> List ["CredentialHelper" ]:
282+ url = self .url
283+ return [
284+ CredentialHelper (command ) for command in get_matching_helper_commands (url )
285+ ]
286+
287+ def fill (self ) -> "Credential" :
288+ """Return a new credential with filled username and password."""
289+ for helper in self .helpers :
290+ try :
291+ return helper .get (** self ._helper_kwargs )
292+ except CredentialNotFoundError :
293+ continue
294+ raise CredentialNotFoundError (f"No available credentials for '{ self .url } '" )
295+
296+ def approve (self ):
297+ """Store this credential in available helpers."""
298+ if self ._approved or not (self .username and self .password ):
299+ return
300+ for helper in self .helpers :
301+ helper .store (** self ._helper_kwargs )
302+ self ._approved = True
303+
304+ def reject (self ):
305+ """Remove this credential from available helpers."""
306+ for helper in self .helpers :
307+ helper .erase (** self ._helper_kwargs )
308+ self ._approved = False
0 commit comments