|
2 | 2 |
|
3 | 3 | __author__ = "Mario Rojas"
|
4 | 4 | __license__ = "BSD 3-clause"
|
5 |
| -__version__ = "1.2.0" |
| 5 | +__version__ = "1.3.0" |
6 | 6 | __maintainer__ = "Mario Rojas"
|
7 | 7 | __status__ = "Production"
|
8 | 8 |
|
|
11 | 11 | import os
|
12 | 12 | import re
|
13 | 13 | import threading
|
| 14 | +import time |
| 15 | +from threading import Semaphore |
14 | 16 |
|
15 | 17 | from dotenv import load_dotenv
|
16 | 18 |
|
|
21 | 23 | from scripts.helpers import cve_trends
|
22 | 24 |
|
23 | 25 | load_dotenv()
|
| 26 | +Throttle_msg = "" |
24 | 27 |
|
25 | 28 | # argparse setup
|
26 | 29 | parser = argparse.ArgumentParser(description="CVE Prioritizer", epilog='Happy Patching',
|
|
32 | 35 | required=False, metavar='')
|
33 | 36 | parser.add_argument('-n', '--cvss', type=float, help='CVSS threshold (Default 6.0)', default=6.0, metavar='')
|
34 | 37 | parser.add_argument('-o', '--output', type=str, help='Output filename', required=False, metavar='')
|
35 |
| -parser.add_argument('-t', '--threads', type=str, help='Number of concurrent threads', required=False, metavar='') |
| 38 | +parser.add_argument('-t', '--threads', type=int, help='Number of concurrent threads', required=False, metavar='', |
| 39 | + default=100) |
36 | 40 | parser.add_argument('-v', '--verbose', help='Verbose mode', action='store_true')
|
37 | 41 | parser.add_argument('-l', '--list', help='Space separated list of CVEs', nargs='+', required=False, metavar='')
|
38 | 42 |
|
|
46 | 50 | header = SIMPLE_HEADER
|
47 | 51 | epss_threshold = args.epss
|
48 | 52 | cvss_threshold = args.cvss
|
| 53 | + sem = Semaphore(args.threads) |
49 | 54 |
|
50 | 55 | # Temporal lists
|
51 | 56 | cve_list = []
|
|
64 | 69 | elif args.list:
|
65 | 70 | cve_list = args.list
|
66 | 71 | if not os.getenv('NIST_API'):
|
67 |
| - print(LOGO + 'Warning: Using this tool without specifying a NIST API may result in errors' |
68 |
| - + '\n\n' + header) |
| 72 | + if len(cve_list) > 75: |
| 73 | + Throttle_msg = "Large number of CVEs detected, requests will be throttle to avoid API issues" |
| 74 | + print(LOGO + Throttle_msg + '\n' |
| 75 | + + 'Warning: Using this tool without specifying a NIST API may result in errors' + '\n\n' + header) |
69 | 76 | else:
|
70 | 77 | print(LOGO + header)
|
71 | 78 | elif args.file:
|
72 | 79 | cve_list = [line.rstrip() for line in args.file]
|
73 | 80 | if not os.getenv('NIST_API'):
|
74 |
| - print(LOGO + 'Warning: Using this tool without specifying a NIST API may result in errors' |
75 |
| - + '\n\n' + header) |
| 81 | + if len(cve_list) > 75: |
| 82 | + Throttle_msg = "Large number of CVEs detected, requests will be throttle to avoid API issues" |
| 83 | + print(LOGO + Throttle_msg + '\n' |
| 84 | + + 'Warning: Using this tool without specifying a NIST API may result in errors' + '\n\n' + header) |
76 | 85 | else:
|
77 | 86 | print(LOGO + header)
|
78 | 87 | elif args.demo:
|
|
94 | 103 | output_file.write("cve_id,priority,epss,cvss,cvss_version,cvss_severity,cisa_kev"+"\n")
|
95 | 104 |
|
96 | 105 | for cve in cve_list:
|
| 106 | + throttle = 1 |
| 107 | + if len(cve_list) > 75 and not os.getenv('NIST_API'): |
| 108 | + throttle = 6 |
97 | 109 | if not re.match("(CVE|cve-\d{4}-\d+$)", cve):
|
98 | 110 | print(f"{cve} Error: CVEs should be provided in the standard format CVE-0000-0000*")
|
99 | 111 | else:
|
| 112 | + sem.acquire() |
100 | 113 | t = threading.Thread(target=worker, args=(cve.upper().strip(), cvss_threshold, epss_threshold, args.verbose,
|
101 |
| - args.output)) |
| 114 | + sem, args.output)) |
102 | 115 | threads.append(t)
|
103 | 116 | t.start()
|
| 117 | + time.sleep(throttle) |
104 | 118 |
|
105 | 119 | for t in threads:
|
106 | 120 | t.join()
|
0 commit comments