-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaddonmanager_workers_utility.py
99 lines (86 loc) · 4.39 KB
/
addonmanager_workers_utility.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# SPDX-License-Identifier: LGPL-2.1-or-later
# ***************************************************************************
# * *
# * Copyright (c) 2022-2023 FreeCAD Project Association *
# * *
# * This file is part of FreeCAD. *
# * *
# * FreeCAD is free software: you can redistribute it and/or modify it *
# * under the terms of the GNU Lesser General Public License as *
# * published by the Free Software Foundation, either version 2.1 of the *
# * License, or (at your option) any later version. *
# * *
# * FreeCAD is distributed in the hope that it will be useful, but *
# * WITHOUT ANY WARRANTY; without even the implied warranty of *
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
# * Lesser General Public License for more details. *
# * *
# * You should have received a copy of the GNU Lesser General Public *
# * License along with FreeCAD. If not, see *
# * <https://www.gnu.org/licenses/>. *
# * *
# ***************************************************************************
"""Misc. worker thread classes for the FreeCAD Addon Manager."""
try:
from PySide import QtCore
except ImportError:
try:
from PySide6 import QtCore
except ImportError:
from PySide2 import QtCore
import NetworkManager
import time
import addonmanager_freecad_interface as fci
translate = fci.translate
class ConnectionChecker(QtCore.QThread):
"""A worker thread for checking the connection to GitHub as a proxy for overall
network connectivity. It has two signals: success() and failure(str). The failure
signal contains a translated error message suitable for display to an end user."""
success = QtCore.Signal()
failure = QtCore.Signal(str)
def __init__(self):
QtCore.QThread.__init__(self)
self.done = False
self.request_id = None
self.data = None
def run(self):
"""Not generally called directly: create a new ConnectionChecker object and call start()
on it to spawn a child thread."""
fci.Console.PrintLog("Checking network connection...\n")
url = "https://api.github.com/zen"
self.done = False
NetworkManager.AM_NETWORK_MANAGER.completed.connect(self.connection_data_received)
self.request_id = NetworkManager.AM_NETWORK_MANAGER.submit_unmonitored_get(
url, timeout_ms=10000
)
while not self.done:
if QtCore.QThread.currentThread().isInterruptionRequested():
fci.Console.PrintLog("Connection check cancelled\n")
NetworkManager.AM_NETWORK_MANAGER.abort(self.request_id)
self.disconnect_network_manager()
return
QtCore.QCoreApplication.processEvents()
time.sleep(0.1)
if not self.data:
self.failure.emit(
translate(
"AddonsInstaller",
"Unable to read data from GitHub: check your internet connection and proxy "
"settings and try again.",
)
)
self.disconnect_network_manager()
return
fci.Console.PrintLog(f"GitHub's zen message response: {self.data.decode('utf-8')}\n")
self.disconnect_network_manager()
self.success.emit()
def connection_data_received(self, id: int, status: int, data: QtCore.QByteArray):
if self.request_id is not None and self.request_id == id:
if status == 200:
self.data = data.data()
else:
fci.Console.PrintWarning(f"No data received: status returned was {status}\n")
self.data = None
self.done = True
def disconnect_network_manager(self):
NetworkManager.AM_NETWORK_MANAGER.completed.disconnect(self.connection_data_received)