-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathvalidation.py
More file actions
234 lines (187 loc) · 8.8 KB
/
validation.py
File metadata and controls
234 lines (187 loc) · 8.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# validation.py
import logging
import datetime
import base64
import requests
from date_formats import date_formats
import time
from pdf_utils import get_pdf_page_count, is_valid_pdf_base64, is_valid_pdf_url
used_transaction_ids = set() # Shared resource for tracking transaction IDs
def validate_request_data(request_data, txn_id):
# Check if the command is 'managexsign'
command = request_data.get('request', {}).get('command')
if not command or command != "managexsign":
logging.error(f"Invalid or missing command: {command}.")
return {'error': 'Invalid or missing command.', 'status': 400}
# Check if the transaction ID is unique
if txn_id in used_transaction_ids:
logging.error(f"Duplicate transaction ID")
return {'error': 'Duplicate transaction ID', 'status': 400}
else:
used_transaction_ids.add(txn_id) # Add the txn_id to the used set
# Extract timestamp from the request data
timestamp = request_data.get('request', {}).get('timestamp')
if not timestamp:
return {'error': 'Timestamp is missing.', 'status': 400}
# Validate timestamp: Must not be older than 30 seconds
try:
timestamp_dt = datetime.datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
current_time = datetime.datetime.now(datetime.timezone.utc)
time_difference = (current_time - timestamp_dt).total_seconds()
if abs(time_difference) > 30:
return {'error': 'Timestamp is older than 30 seconds.', 'status': 400}
except ValueError:
return {'error': 'Invalid timestamp format.', 'status': 400}
# If all checks pass
return {'success': True}
def extract_and_validate_certificate(request_data):
# Extract CN and SN from the JSON
attributes = request_data.get('request', {}).get('certificate', {}).get('attribute', [])
if attributes is None:
return {'error': 'Certificate attribute is missing.', 'status': 400}
SN = None
for attribute in attributes:
if attribute.startswith("SN="):
SN = attribute.split('=')[1] # Get value after SN=
break # Stop after finding the first SN
# Validate SN
if not SN: # SN must be provided
return {'error': 'SN must be provided.', 'status': 400}
# If everything is valid
return {'success': True, 'SN': SN}
def validate_pdf_data(request_data):
# Extract pdf_base64 and pdf_url
pdf_base64 = request_data.get('request', {}).get('pdf_data')
pdf_url = request_data.get('request', {}).get('pdfurl')
if pdf_base64 and pdf_url:
logging.error("Both pdf_data and pdfurl cannot be provided together.")
return {'error': 'Both pdf_data and pdfurl cannot be provided together.', 'status': 400}
elif pdf_base64:
pdf_data = is_valid_pdf_base64(pdf_base64)
if not pdf_data:
logging.error("Invalid PDF provided in base64 format.")
return {'error': 'Invalid PDF in base64 format.', 'status': 400}
elif pdf_url:
pdf_data = is_valid_pdf_url(pdf_url)
if not pdf_data:
logging.error("Invalid or inaccessible PDF provided in URL.")
return {'error': 'Invalid or inaccessible PDF URL.', 'status': 400}
else:
logging.error("Neither valid pdf_data nor valid pdfurl was provided.")
return {'error': 'Neither valid pdf_data nor valid pdfurl was provided.', 'status': 400}
# If all checks pass, return the valid PDF data
return {'success': True, 'pdf_data': pdf_data}
def is_valid_pdf_base64(pdf_base64):
try:
pdf_data = base64.b64decode(pdf_base64, validate=True)
if pdf_data.startswith(b'%PDF'):
return pdf_data # Valid PDF data
except (base64.binascii.Error, ValueError):
return None # Invalid base64
return None # Not a valid PDF
def is_valid_pdf_url(pdf_url):
try:
response = requests.get(pdf_url, stream=True, timeout=10)
if response.status_code == 200 and response.headers.get('Content-Type') == 'application/pdf':
return response.content # Valid PDF data
except requests.RequestException as e:
logging.error(f"Error accessing PDF URL: {e}")
return None
return None # Not a valid PDF or inaccessible
def validate_and_process_pdf_metadata(request_data):
# Extract dateformat
dateformat = request_data.get('request', {}).get('pdf', {}).get('dateformat', '').strip()
# Extract email
email = request_data.get('request', {}).get('pdf', {}).get('email', '').strip()
# Handle timestamp enabling logic
enabletimestamp = request_data.get('request', {}).get('pdf', {}).get('enabletimestamp', '').lower()
timestamp_url = None
if enabletimestamp == "yes":
timestamp_url = 'http://timestamp.comodoca.com'
# Check if timestamp URL is set and try to contact the timestamp service
if timestamp_url:
try:
response = requests.get(timestamp_url, timeout=10) # 10 seconds timeout
if response.status_code != 200:
logging.error("Time stamping service is not working.")
return {'error': 'Time stamping service is not working.', 'status': 503}
except requests.exceptions.RequestException as e:
logging.error(f"Error connecting to the time stamping service: {e}")
return {'error': 'Time stamping service is not working.', 'status': 503}
# If dateformat is not provided or invalid, default to 'dd-MMM-yyyy HH:mm:ss'
if not dateformat or dateformat not in date_formats:
dateformat = 'dd-MMM-yyyy HH:mm:ss'
# Extract the title
title = request_data.get('request', {}).get('pdf', {}).get('title', 'Untitled').strip()
if not title:
title = "MX_Signer" # Default name if title is missing or empty
# Replace spaces in the title with underscores
title = title.replace(" ", "_")
# Get the current datetime and format it according to the dateformat
signing_datetime = datetime.datetime.now() # Correct usage of datetime.datetime
date_str = signing_datetime.strftime(date_formats[dateformat])
# Return processed data including timestamp_url
return {
'success': True,
'dateformat': dateformat,
'email': email,
'enabletimestamp': enabletimestamp,
'timestamp_url': timestamp_url,
'title': title,
'date_str': date_str
}
def validate_and_process_pdf_page_data(request_data, pdf_data):
page_number = request_data.get('request', {}).get('pdf', {}).get('page', 0)
total_pages = get_pdf_page_count(pdf_data)
logging.info(f"Total Pages in PDF: {total_pages}")
# Validate page number
if page_number is None or page_number == '':
return {'error': 'Please select a page number.', 'status': 400}
try:
page_number = int(page_number)
except ValueError:
return {'error': 'Invalid page number format.', 'status': 400}
if page_number > total_pages:
logging.error("Page Limit Exceeded.")
return {'error': 'Page Limit Exceeded.', 'status': 400}
sigpage = page_number - 1 if page_number > 0 else 0
# Extract coordinates
coordinates = request_data.get('request', {}).get('pdf', {}).get('coordinates', '')
if not coordinates:
coordinates = "490, 212, 567, 239"
try:
coordinates = [int(coord) for coord in coordinates.split(',')]
except ValueError:
return {'error': 'Invalid coordinates format.', 'status': 400}
# Lock PDF settings
lockpdf = request_data.get('request', {}).get('pdf', {}).get('lockpdf', '').strip().lower()
invisible_sign = request_data.get('request', {}).get('pdf', {}).get('invisiablesign', '').strip().lower()
sigflags = 1 if lockpdf == "yes" else 3
sigandcertify = lockpdf == "yes"
sigbutton = invisible_sign != "yes"
signaturebox = None if invisible_sign == "yes" else coordinates
return {
'success': True,
'sigpage': sigpage,
'coordinates': coordinates,
'sigflags': sigflags,
'sigandcertify': sigandcertify,
'sigbutton': sigbutton,
'signaturebox': signaturebox,
'invisible_sign': invisible_sign # Add this key
}
def extract_recipient_and_cert_email(request_data, cn_name):
# Extract recipient name from the request
recipient_name = request_data.get('request', {}).get('pdf', {}).get('recipient', '').strip()
# If recipient name is empty, use cn_name
if not recipient_name:
recipient_name = cn_name
# Extract certemail from the request
certemail = request_data.get('request', {}).get('pdf', {}).get('certemail', '').strip()
# Extract webhook_url from the request
webhook_url = request_data.get('request', {}).get('parameter', {}).get('webhook_url', None)
return {
'recipient_name': recipient_name,
'certemail': certemail,
'webhook_url': webhook_url
}