21
21
)
22
22
from exasol .analytics .query_handler .query_handler import QueryHandler
23
23
from exasol .analytics .query_handler .result import Continue , Finish
24
+ from exasol .analytics .utils .errors import UninitializedAttributeError
24
25
25
26
26
27
class ResultHandlerReturnValue (enum .Enum ):
@@ -55,23 +56,38 @@ def __init__(
55
56
self ._current_query_handler : Optional [
56
57
QueryHandler [List [SQLStageInputOutput ], SQLStageInputOutput ]
57
58
] = None
58
- self ._current_query_handler_context : Optional [ScopeQueryHandlerContext ] = None
59
+ self ._current_qh_context : Optional [ScopeQueryHandlerContext ] = None
59
60
self ._create_current_query_handler ()
60
61
61
- def _check_is_valid (self ):
62
- if self ._current_query_handler is None :
63
- raise RuntimeError ("No current query handler set." )
64
-
65
62
def get_current_query_handler (
66
63
self ,
67
64
) -> QueryHandler [List [SQLStageInputOutput ], SQLStageInputOutput ]:
68
- self ._check_is_valid ()
69
- return self ._current_query_handler
65
+ value = self ._current_query_handler
66
+ if value is None :
67
+ raise RuntimeError ("No current query handler set." )
68
+ return value
69
+
70
+ @property
71
+ def _checked_current_qh_context (self ) -> ScopeQueryHandlerContext :
72
+ value = self ._current_qh_context
73
+ if value is None :
74
+ raise UninitializedAttributeError (
75
+ "Current query handler context is undefined."
76
+ )
77
+ return value
78
+
79
+ @property
80
+ def _checked_current_stage (self ) -> SQLStage :
81
+ value = self ._current_stage
82
+ if value is None :
83
+ raise UninitializedAttributeError ("Current stage is None." )
84
+ return value
70
85
71
86
def handle_result (
72
87
self , result : Union [Continue , Finish [SQLStageInputOutput ]]
73
88
) -> ResultHandlerReturnValue :
74
- self ._check_is_valid ()
89
+ # check if current query handler is set
90
+ self .get_current_query_handler ()
75
91
if isinstance (result , Finish ):
76
92
return self ._handle_finished_result (result )
77
93
elif isinstance (result , Continue ):
@@ -90,7 +106,7 @@ def _handle_finished_result(
90
106
return self ._try_to_move_to_next_stage ()
91
107
92
108
def _try_to_move_to_next_stage (self ) -> ResultHandlerReturnValue :
93
- self ._current_query_handler_context .release ()
109
+ self ._checked_current_qh_context .release ()
94
110
if self ._is_not_last_stage ():
95
111
self ._move_to_next_stage ()
96
112
return ResultHandlerReturnValue .CONTINUE_PROCESSING
@@ -101,7 +117,7 @@ def _try_to_move_to_next_stage(self) -> ResultHandlerReturnValue:
101
117
def invalidate (self ):
102
118
self ._current_stage = None
103
119
self ._current_query_handler = None
104
- self ._current_query_handler_context = None
120
+ self ._current_qh_context = None
105
121
106
122
def _is_not_last_stage (self ):
107
123
return self ._current_stage_index < len (self ._stages_in_execution_order ) - 1
@@ -113,7 +129,7 @@ def _move_to_next_stage(self):
113
129
114
130
def _create_current_query_handler (self ):
115
131
stage_inputs = self ._stage_inputs_map [self ._current_stage ]
116
- self ._current_query_handler_context = (
132
+ self ._current_qh_context = (
117
133
self ._query_handler_context .get_child_query_handler_context ()
118
134
)
119
135
result_bucketfs_location = self ._result_bucketfs_location .joinpath (
@@ -123,12 +139,12 @@ def _create_current_query_handler(self):
123
139
result_bucketfs_location = result_bucketfs_location ,
124
140
sql_stage_inputs = stage_inputs ,
125
141
)
126
- self ._current_query_handler = self ._current_stage .create_train_query_handler (
127
- stage_input , self ._current_query_handler_context
142
+ self ._current_query_handler = self ._checked_current_stage .create_train_query_handler (
143
+ stage_input , self ._current_qh_context
128
144
)
129
145
130
146
def _add_result_to_successors (self , result : SQLStageInputOutput ):
131
- successors = self ._sql_stage_graph .successors (self ._current_stage )
147
+ successors = self ._sql_stage_graph .successors (self ._checked_current_stage )
132
148
if len (successors ) == 0 :
133
149
raise RuntimeError ("Programming error" )
134
150
self ._add_result_to_inputs_of_successors (result , successors )
@@ -146,7 +162,7 @@ def _add_result_to_reference_counting_bag(
146
162
object_proxies = find_object_proxies (result )
147
163
for object_proxy in object_proxies :
148
164
if object_proxy not in self ._reference_counting_bag :
149
- self ._current_query_handler_context .transfer_object_to (
165
+ self ._checked_current_qh_context .transfer_object_to (
150
166
object_proxy , self ._query_handler_context
151
167
)
152
168
for _ in successors :
@@ -160,7 +176,7 @@ def _transfer_ownership_of_result_to_query_result_handler(self, result):
160
176
object_proxy
161
177
)
162
178
else :
163
- self ._current_query_handler_context .transfer_object_to (
179
+ self ._checked_current_qh_context .transfer_object_to (
164
180
object_proxy , self ._query_handler_context
165
181
)
166
182
0 commit comments