-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdemo.py
109 lines (95 loc) · 3.14 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: goupper
@file: demo.py
@version: v1
@time: 2022/12/13
"""
import threading
import requests
import consul
class HTTPClient(consul.base.HTTPClient):
    """Consul HTTP transport that routes every request through one shared session.

    A single ``requests.Session`` is created lazily and reused by every
    client instance so that TCP connections are multiplexed.  Per the
    original author's note, this works around the Consul error
    "429 Your IP is issuing too many concurrent connections, please rate
    limit your calls".
    """

    # Serialises the one-time creation of the shared session.
    _instance_lock = threading.Lock()
    # Shared session; stays None until the first client is constructed.
    _session = None

    def __init__(self, *args, **kwargs):
        """Initialise the base client and attach the shared session.

        All positional and keyword arguments are forwarded unchanged to
        ``consul.base.HTTPClient.__init__``.
        """
        super(HTTPClient, self).__init__(*args, **kwargs)
        self.session = self._shared_session()

    @staticmethod
    def _shared_session():
        """Return the shared ``requests.Session``, creating it on first use.

        The session is stored on ``HTTPClient`` itself rather than on
        ``self.__class__`` so that subclasses always reuse the same session
        regardless of which class happens to be instantiated first.
        """
        if HTTPClient._session is None:
            with HTTPClient._instance_lock:
                # Re-check under the lock: another thread may have created
                # the session while this one was waiting.
                if HTTPClient._session is None:
                    HTTPClient._session = requests.Session()
        return HTTPClient._session

    @staticmethod
    def response(response):
        """Wrap a ``requests`` response in a ``consul.base.Response``.

        The response encoding is forced to UTF-8 before ``response.text``
        is read.  The wrapper receives the status code, headers, decoded
        text and raw content, in that order.
        """
        response.encoding = 'utf-8'
        return consul.base.Response(
            response.status_code, response.headers, response.text, response.content
        )

    def _request(self, method, callback, path, params, headers, **request_kwargs):
        """Send one request through the shared session and hand the result to *callback*.

        :param method: name of the session method to call
            (``'get'``, ``'put'``, ``'delete'`` or ``'post'``).
        :param callback: callable invoked with the wrapped response; its
            return value is returned to the caller.
        :param path: passed to ``self.uri`` together with *params*.
        :param params: query parameters passed to ``self.uri``.
        :param headers: request headers forwarded to the session call.
        :param request_kwargs: extra keyword arguments for the session call
            (used for ``data`` on put/delete/post).
        """
        uri = self.uri(path, params)
        send = getattr(self.session, method)
        raw_response = send(
            uri,
            headers=headers,
            verify=self.verify,
            cert=self.cert,
            timeout=self.timeout,
            **request_kwargs
        )
        return callback(self.response(raw_response))

    def get(self, callback, path, params=None, headers=None):
        """Issue a GET request and return ``callback(wrapped_response)``."""
        return self._request('get', callback, path, params, headers)

    def put(self, callback, path, params=None, data='', headers=None):
        """Issue a PUT request with body *data* and return ``callback(wrapped_response)``."""
        return self._request('put', callback, path, params, headers, data=data)

    def delete(self, callback, path, params=None, data='', headers=None):
        """Issue a DELETE request with body *data* and return ``callback(wrapped_response)``."""
        return self._request('delete', callback, path, params, headers, data=data)

    def post(self, callback, path, params=None, headers=None, data=''):
        """Issue a POST request with body *data* and return ``callback(wrapped_response)``."""
        return self._request('post', callback, path, params, headers, data=data)
class CustomConsul(consul.Consul):
    """Consul client whose HTTP transport is this module's ``HTTPClient``."""

    @staticmethod
    def http_connect(host, port, scheme, verify=True, cert=None, timeout=None):
        """Build the ``HTTPClient`` used for requests.

        The arguments are passed positionally, in the order received, to
        ``HTTPClient``.
        """
        connection_args = (host, port, scheme, verify, cert, timeout)
        return HTTPClient(*connection_args)
if __name__ == '__main__':
    # Smoke test: write one key through the custom client, read it back,
    # and check that the stored value round-trips.
    client = CustomConsul()
    kv_key = 'test-key'
    client.kv.put(kv_key, 'test put')
    _index, entry = client.kv.get(kv_key)
    # A missing entry or an empty 'Value' is treated as "no value".
    if entry and entry['Value']:
        stored = entry['Value'].decode()
    else:
        stored = None
    assert stored == 'test put'