3
3
from multiprocessing import Process , Pipe
4
4
import time
5
5
from select import select
6
+ from logging import Logger , getLogger
6
7
7
8
8
9
POSTGRESQL_CHANNEL_NAME = "casbin_role_watcher"
9
10
10
11
11
12
def casbin_subscription (
12
13
process_conn : Pipe ,
14
+ logger : Logger ,
13
15
host : str ,
14
16
user : str ,
15
17
password : str ,
@@ -20,7 +22,7 @@ def casbin_subscription(
20
22
sslmode : Optional [str ] = None ,
21
23
sslrootcert : Optional [str ] = None ,
22
24
sslcert : Optional [str ] = None ,
23
- sslkey : Optional [str ] = None
25
+ sslkey : Optional [str ] = None ,
24
26
):
25
27
# delay connecting to postgresql (postgresql connection failure)
26
28
time .sleep (delay )
@@ -39,14 +41,14 @@ def casbin_subscription(
39
41
conn .set_isolation_level (extensions .ISOLATION_LEVEL_AUTOCOMMIT )
40
42
curs = conn .cursor ()
41
43
curs .execute (f"LISTEN { channel_name } ;" )
42
- print ("Waiting for casbin policy update" )
44
+ logger . debug ("Waiting for casbin policy update" )
43
45
while True and not curs .closed :
44
46
if not select ([conn ], [], [], 5 ) == ([], [], []):
45
- print ("Casbin policy update identified.." )
47
+ logger . debug ("Casbin policy update identified.." )
46
48
conn .poll ()
47
49
while conn .notifies :
48
50
notify = conn .notifies .pop (0 )
49
- print (f"Notify: { notify .payload } " )
51
+ logger . debug (f"Notify: { notify .payload } " )
50
52
process_conn .send (notify .payload )
51
53
52
54
@@ -63,7 +65,8 @@ def __init__(
63
65
sslmode : Optional [str ] = None ,
64
66
sslrootcert : Optional [str ] = None ,
65
67
sslcert : Optional [str ] = None ,
66
- sslkey : Optional [str ] = None
68
+ sslkey : Optional [str ] = None ,
69
+ logger : Optional [Logger ] = None ,
67
70
):
68
71
self .update_callback = None
69
72
self .parent_conn = None
@@ -77,6 +80,9 @@ def __init__(
77
80
self .sslrootcert = sslrootcert
78
81
self .sslcert = sslcert
79
82
self .sslkey = sslkey
83
+ if logger is None :
84
+ logger = getLogger ()
85
+ self .logger = logger
80
86
self .subscribed_process = self .create_subscriber_process (start_process )
81
87
82
88
def create_subscriber_process (
@@ -91,6 +97,7 @@ def create_subscriber_process(
91
97
target = casbin_subscription ,
92
98
args = (
93
99
child_conn ,
100
+ self .logger ,
94
101
self .host ,
95
102
self .user ,
96
103
self .password ,
@@ -101,7 +108,7 @@ def create_subscriber_process(
101
108
self .sslmode ,
102
109
self .sslrootcert ,
103
110
self .sslcert ,
104
- self .sslkey
111
+ self .sslkey ,
105
112
),
106
113
daemon = True ,
107
114
)
@@ -110,7 +117,7 @@ def create_subscriber_process(
110
117
return p
111
118
112
119
def set_update_callback (self , fn_name : Callable ):
113
- print ( "runtime is set update callback" , fn_name )
120
+ self . logger . debug ( f "runtime is set update callback { fn_name } " )
114
121
self .update_callback = fn_name
115
122
116
123
def update (self ):
@@ -138,10 +145,10 @@ def should_reload(self):
138
145
try :
139
146
if self .parent_conn .poll (None ):
140
147
message = self .parent_conn .recv ()
141
- print (f"message:{ message } " )
148
+ self . logger . debug (f"message:{ message } " )
142
149
return True
143
150
except EOFError :
144
- print (
151
+ self . logger . warning (
145
152
"Child casbin-watcher subscribe process has stopped, "
146
153
"attempting to recreate the process in 10 seconds..."
147
154
)
0 commit comments