
Commit ffc6bc1

feat: massStoreRunAsynchronous()
Even though commit d915473 introduced socket-level TCP keepalive support into the server's implementation, this was observed multiple times to not be enough to **deterministically** fix the issue of the `CodeChecker store` client hanging indefinitely when the server takes a long time processing the to-be-stored data. The underlying reasons are not entirely clear and the issue only pops up sporadically, but we did observe a few similar scenarios (such as multi-million report storage from analysing LLVM and then storing between datacentres) where it reproduces almost reliably. The symptoms (even with a configured `keepalive`) generally include the server never being notified about the client's disconnect, while the client process hangs on a low-level `read(4, ...)` system call, waiting for the Thrift response of `massStoreRun()` from the HTTP socket. Even if the server finishes the storage processing "in time" and sends the Thrift reply, it never reaches the client, so the client never exits the wait and keeps either the terminal or, worse, a CI script occupied, blocking execution.

This is the "more proper solution" foreshadowed in commit 15af7d8.

Implemented the server-side logic to spawn a `MassStoreRun` task and return its token, giving the `massStoreRunAsynchronous()` API call full force.

Implemented the client-side logic to use the new `task_client` module and the same logic as `CodeChecker cmd serverside-tasks --await --token TOKEN...` to poll the server for the task's completion and status.
1 parent d42164b commit ffc6bc1
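
In practice, the new client-side flow boils down to "submit, get a token, poll". The following condensed sketch is derived from the `store.py` changes below; helper names such as `setup_task_client()` and `await_task_termination()` come from this commit, while the exact signatures and the full set of `SubmittedRunOptions` fields may differ:

```python
# Condensed sketch of the asynchronous store flow introduced by this commit.
# Anything not visible in the diff below should be treated as an assumption.
from codechecker_api.codeCheckerDBAccess_v6.ttypes import SubmittedRunOptions

from codechecker_client import client as libclient
from codechecker_client.task_client import await_task_termination


def store_and_wait(client, protocol, host, port, b64zip, run_name, LOG):
    # The server only enqueues the processing as a MassStoreRun task and
    # immediately returns a task token instead of blocking until completion.
    token = client.massStoreRunAsynchronous(
        b64zip,
        SubmittedRunOptions(runName=run_name))  # tag, version, etc. as needed

    # Poll the task API until the task terminates, the same way
    # `CodeChecker cmd serverside-tasks --await --token TOKEN` does.
    task_client = libclient.setup_task_client(protocol, host, port)
    status = await_task_termination(LOG, token, task_api_client=task_client)
    return status == "COMPLETED"
```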

File tree

8 files changed: +159 -103 lines changed


docs/web/user_guide.md

+13
@@ -480,6 +480,19 @@ optional arguments:
                         is given, the longest match will be removed. You may
                         also use Unix shell-like wildcards (e.g.
                         '/*/jsmith/').
+  --detach              Runs `store` in fire-and-forget mode: exit immediately
+                        once the server accepted the analysis reports for
+                        storing, without waiting for the server-side data
+                        processing to conclude. Doing this is generally not
+                        recommended, as the client will never be notified of
+                        potential processing failures, and there is no easy way
+                        to wait for the successfully stored results to become
+                        available server-side for potential further processing
+                        (e.g., `CodeChecker cmd diff`). However, using
+                        '--detach' can significantly speed up large-scale
+                        monitoring analyses where access to the results by a
+                        tool is not a goal, such as in the case of non-gating
+                        CI systems.
   --config CONFIG_FILE  Allow the configuration from an explicit configuration
                         file. The values configured in the config file will
                         overwrite the values set in the command line.
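
For a non-gating CI pipeline, the intended pattern is to store with `--detach`, keep the task token that `store` prints to stdout, and only await the server-side processing in a later, optional stage. A hypothetical sketch; only the flags shown in this commit (`--detach`, `--name`, `--token`, `--await`) are taken from the diffs, server/product connection flags are omitted for brevity:

```python
# Hypothetical CI helper around the new '--detach' mode; illustrative only.
import subprocess


def detached_store(report_dir: str, run_name: str) -> str:
    """Submit reports without blocking; return the server-side task token."""
    result = subprocess.run(
        ["CodeChecker", "store", report_dir, "--name", run_name, "--detach"],
        check=True, capture_output=True, text=True)
    # With '--detach', the task token is printed to stdout (see store.py).
    return result.stdout.strip().splitlines()[-1]


def await_store(token: str) -> None:
    """Optional later stage: block until the server finished processing."""
    subprocess.run(
        ["CodeChecker", "cmd", "serverside-tasks", "--token", token,
         "--await"],
        check=True)
```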

web/api/codechecker_api_shared.thrift

+2 -2
@@ -34,13 +34,13 @@ enum ErrorCode {
   API_MISMATCH = 5,
 
   // REMOVED IN API v6.59 (CodeChecker v6.25.0)!
-  // Previously sent by report_server.thrif/codeCheckerDBAccess::massStoreRun()
+  // Previously sent by report_server.thrift/codeCheckerDBAccess::massStoreRun()
   // when the client uploaded a source file which contained errors, such as
   // review status source-code-comment errors.
   /* SOURCE_FILE = 6, */ // Never reuse the value of the enum constant!
 
   // REMOVED IN API v6.59 (CodeChecker v6.25.0)!
-  // Previously sent by report_server.thrif/codeCheckerDBAccess::massStoreRun()
+  // Previously sent by report_server.thrift/codeCheckerDBAccess::massStoreRun()
   // when the client uploaded a report with annotations that had invalid types.
   /* REPORT_FORMAT = 7, */ // Never reuse the value of the enum constant!
 }

web/client/codechecker_client/cmd/store.py

+68 -61
@@ -31,7 +31,8 @@
 from threading import Timer
 from typing import Dict, Iterable, List, Set, Tuple
 
-from codechecker_api.codeCheckerDBAccess_v6.ttypes import StoreLimitKind
+from codechecker_api.codeCheckerDBAccess_v6.ttypes import \
+    StoreLimitKind, SubmittedRunOptions
 
 from codechecker_report_converter import twodim
 from codechecker_report_converter.report import Report, report_file, \
@@ -50,8 +51,8 @@ def assemble_blame_info(_, __) -> int:
         """
         raise NotImplementedError()
 
-from codechecker_client import client as libclient
-from codechecker_client import product
+from codechecker_client import client as libclient, product
+from codechecker_client.task_client import await_task_termination
 from codechecker_common import arg, logger, cmd_config
 from codechecker_common.checker_labels import CheckerLabels
 from codechecker_common.compatibility.multiprocessing import Pool
@@ -256,6 +257,24 @@ def add_arguments_to_parser(parser):
                              "match will be removed. You may also use Unix "
                              "shell-like wildcards (e.g. '/*/jsmith/').")
 
+    parser.add_argument("--detach",
+                        dest="detach",
+                        default=argparse.SUPPRESS,
+                        action="store_true",
+                        required=False,
+                        help="""
+Runs `store` in fire-and-forget mode: exit immediately once the server accepted
+the analysis reports for storing, without waiting for the server-side data
+processing to conclude.
+Doing this is generally not recommended, as the client will never be notified
+of potential processing failures, and there is no easy way to wait for the
+successfully stored results to become available server-side for potential
+further processing (e.g., `CodeChecker cmd diff`).
+However, using '--detach' can significantly speed up large-scale monitoring
+analyses where access to the results by a tool is not a goal, such as in the
+case of non-gating CI systems.
+""")
+
     cmd_config.add_option(parser)
 
     parser.add_argument('-f', '--force',
@@ -718,7 +737,7 @@ def get_analysis_statistics(inputs, limits):
     return statistics_files if has_failed_zip else []
 
 
-def storing_analysis_statistics(client, inputs, run_name):
+def store_analysis_statistics(client, inputs, run_name):
     """
     Collects and stores analysis statistics information on the server.
     """
@@ -933,68 +952,56 @@ def main(args):
 
         description = args.description if 'description' in args else None
 
-        LOG.info("Storing results to the server...")
-
-        try:
-            with _timeout_watchdog(timedelta(hours=1),
-                                   signal.SIGUSR1):
-                client.massStoreRun(args.name,
-                                    args.tag if 'tag' in args else None,
-                                    str(context.version),
-                                    b64zip,
-                                    'force' in args,
-                                    trim_path_prefixes,
-                                    description)
-        except WatchdogError as we:
-            LOG.warning("%s", str(we))
-
-            # Showing parts of the exception stack is important here.
-            # We **WANT** to see that the timeout happened during a wait on
-            # Thrift reading from the TCP connection (something deep in the
-            # Python library code at "sock.recv_into").
-            import traceback
-            _, _, tb = sys.exc_info()
-            frames = traceback.extract_tb(tb)
-            first, last = frames[0], frames[-2]
-            formatted_frames = traceback.format_list([first, last])
-            fmt_first, fmt_last = formatted_frames[0], formatted_frames[1]
-            LOG.info("Timeout was triggered during:\n%s", fmt_first)
-            LOG.info("Timeout interrupted this low-level operation:\n%s",
-                     fmt_last)
-
-            LOG.error("Timeout!"
-                      "\n\tThe server's reply did not arrive after "
-                      "%d seconds (%s) elapsed since the server-side "
-                      "processing began."
-                      "\n\n\tThis does *NOT* mean that there was an issue "
-                      "with the run you were storing!"
-                      "\n\tThe server might still be processing the results..."
-                      "\n\tHowever, it is more likely that the "
-                      "server had already finished, but the client did not "
-                      "receive a response."
-                      "\n\tUsually, this is caused by the underlying TCP "
-                      "connection failing to signal a low-level disconnect."
-                      "\n\tClients potentially hanging indefinitely in these "
-                      "scenarios is an unfortunate and known issue."
-                      "\n\t\tSee http://github.com/Ericsson/codechecker/"
-                      "issues/3672 for details!"
-                      "\n\n\tThis error here is a temporary measure to ensure "
-                      "an infinite hang is replaced with a well-explained "
-                      "timeout."
-                      "\n\tA more proper solution will be implemented in a "
-                      "subsequent version of CodeChecker.",
-                      we.timeout.total_seconds(), str(we.timeout))
-            sys.exit(1)
+        LOG.info("Storing results to the server ...")
+        task_token: str = client.massStoreRunAsynchronous(
+            b64zip,
+            SubmittedRunOptions(
+                runName=args.name,
+                tag=args.tag if "tag" in args else None,
+                version=str(context.version),
+                force="force" in args,
+                trimPathPrefixes=trim_path_prefixes,
+                description=description)
+        )
+        LOG.info("Reports submitted to the server for processing.")
 
-        # Storing analysis statistics if the server allows them.
         if client.allowsStoringAnalysisStatistics():
-            storing_analysis_statistics(client, args.input, args.name)
-
-        LOG.info("Storage finished successfully.")
+            store_analysis_statistics(client, args.input, args.name)
+
+        if "detach" in args:
+            LOG.warning("Exiting the 'store' subcommand as '--detach' was "
+                        "specified: not waiting for the result of the store "
+                        "operation.\n"
+                        "The server might not have finished processing "
+                        "everything at this point, so do NOT rely on querying "
+                        "the results just yet!\n"
+                        "To await the completion of the processing later, "
+                        "you can execute:\n\n"
+                        "\tCodeChecker cmd serverside-tasks --token %s "
+                        "--await",
+                        task_token)
+            # Print the token to stdout as well, so scripts can use "--detach"
+            # meaningfully.
+            print(task_token)
+            return
+
+        task_client = libclient.setup_task_client(protocol, host, port)
+        task_status: str = await_task_termination(LOG, task_token,
+                                                  task_api_client=task_client)
+
+        if task_status == "COMPLETED":
+            LOG.info("Storing the reports finished successfully.")
+        else:
+            LOG.error("Storing the reports failed! "
+                      "The job terminated in status '%s'. "
+                      "The comments associated with the failure are:\n\n%s",
+                      task_status,
+                      task_client.getTaskInfo(task_token).comments)
+            sys.exit(1)
     except Exception as ex:
         import traceback
         traceback.print_exc()
-        LOG.info("Storage failed: %s", str(ex))
+        LOG.error("Storing the reports failed: %s", str(ex))
         sys.exit(1)
     finally:
         os.close(zip_file_handle)
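
The waiting itself is delegated to `await_task_termination()`. Conceptually it is a token-based polling loop along the lines of the sketch below; this is illustrative only, not the actual implementation, and the terminal status names other than "COMPLETED" are assumptions:

```python
# Illustrative polling skeleton; the real await_task_termination() in
# codechecker_client.task_client also handles connection errors, logging and
# back-off. Status names besides "COMPLETED" are assumed for the example.
import time
from typing import Callable, FrozenSet

TERMINAL_STATUSES: FrozenSet[str] = frozenset(
    {"COMPLETED", "FAILED", "CANCELLED"})


def poll_until_terminal(get_status: Callable[[], str],
                        poll_interval: float = 5.0) -> str:
    """Return the first terminal status reported for the awaited task."""
    while True:
        status = get_status()  # e.g., derived from getTaskInfo(token)
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_interval)
```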

web/client/codechecker_client/helpers/results.py

+6 -4
@@ -182,10 +182,12 @@ def massStoreRun(self, name, tag, version, zipdir, force,
         pass
 
     @thrift_client_call
-    def massStoreRunAsynchronous(self, zipfile_blob: str,
-                                 store_opts: ttypes.SubmittedRunOptions) \
-            -> str:
-        pass
+    def massStoreRunAsynchronous(
+        self,
+        zipfile_blob: str,
+        store_opts: ttypes.SubmittedRunOptions
+    ) -> str:
+        raise NotImplementedError("Should have called Thrift code!")
 
     @thrift_client_call
     def allowsStoringAnalysisStatistics(self):
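
Replacing the `pass` body with a `raise` makes the stub fail loudly if the `@thrift_client_call` decorator ever fails to intercept the call, instead of silently returning `None`. As a rough illustration of the forwarding pattern (not CodeChecker's actual decorator, which also deals with transport setup and error translation):

```python
# Simplified stand-in for a "forwarding" decorator: the decorated stub's body
# never runs, because the call is dispatched to the generated Thrift client.
# The 'self.client' attribute is an assumption for this illustration.
import functools


def thrift_client_call(func):
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        return getattr(self.client, func.__name__)(*args, **kwargs)
    return wrapper
```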

web/client/codechecker_client/task_client.py

+3 -2
@@ -363,6 +363,7 @@ def _transform_product_ids_to_endpoints(
         product.id: product.endpoint
         for product
         in get_product_api().getProducts(None, None)}
+    ti["productEndpoint"] = product_id_to_endpoint[ti["productId"]]
     del ti["productId"]
 
 
@@ -445,7 +446,7 @@ def get_product_api() -> ThriftProductHelper:
            ti["taskKind"],
            ti["summary"],
            ti["status"],
-           ti["productEndpoint"] or "",
+           ti.get("productEndpoint", ""),
            ti["actorUsername"] or "",
            ti["enqueuedAt"] or "",
            ti["startedAt"] or "",
@@ -529,7 +530,7 @@ def get_product_api() -> ThriftProductHelper:
     ti = task_info_for_print[0]
     product_line = \
         f" - Product: {ti['productEndpoint']}\n" \
-        if ti["productEndpoint"] else ""
+        if "productEndpoint" in ti else ""
     user_line = f" - User: {ti['actorUsername']}\n" \
         if ti["actorUsername"] else ""
     cancel_line = " - Cancelled by administrators!\n" \

web/server/codechecker_server/api/mass_store_run.py

+25 -6
@@ -21,7 +21,8 @@
 import sqlalchemy
 import tempfile
 import time
-from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, \
+    cast
 import zipfile
 import zlib
 
@@ -54,7 +55,7 @@
 from ..metadata import checker_is_unavailable, MetadataInfoParser
 from ..product import Product as ServerProduct
 from ..session_manager import SessionManager
-from ..task_executors.abstract_task import AbstractTask
+from ..task_executors.abstract_task import AbstractTask, TaskCancelHonoured
 from ..task_executors.task_manager import TaskManager
 from .thrift_enum_helper import report_extended_data_type_str
 
@@ -591,7 +592,12 @@ def _implementation(self, tm: TaskManager):
                                  f"'{self._product.endpoint}' is in "
                                  "a bad shape!")
 
-        m = MassStoreRun(self.data_path / "store_zip",
+        def __cancel_if_needed():
+            if tm.should_cancel(self):
+                raise TaskCancelHonoured(self)
+
+        m = MassStoreRun(__cancel_if_needed,
+                         self.data_path / "store_zip",
                          self._package_context,
                          tm.configuration_database_session_factory,
                          self._product,
@@ -615,9 +621,8 @@ class MassStoreRun:
     # This is the place where complex implementation logic must go, but be
     # careful, there is no way to communicate with the user's client anymore!
 
-    # TODO: Poll the task manager at regular points for a cancel signal!
-
     def __init__(self,
+                 graceful_cancel: Callable[[], None],
                  zip_dir: Path,
                  package_context,
                  config_db,
@@ -641,6 +646,7 @@ def __init__(self,
         self.__config_db = config_db
         self.__package_context = package_context
         self.__product = product
+        self.__graceful_cancel_if_requested = graceful_cancel
 
         self.__mips: Dict[str, MetadataInfoParser] = {}
         self.__analysis_info: Dict[str, AnalysisInfo] = {}
@@ -668,10 +674,10 @@ def __store_source_files(
         filename_to_hash: Dict[str, str]
     ) -> Dict[str, int]:
         """ Storing file contents from plist. """
-
         file_path_to_id = {}
 
         for file_name, file_hash in filename_to_hash.items():
+            self.__graceful_cancel_if_requested()
             source_file_path = path_for_fake_root(file_name, str(source_root))
             LOG.debug("Storing source file: %s", source_file_path)
             trimmed_file_path = trim_path_prefixes(
@@ -721,6 +727,7 @@ def __add_blame_info(
         with DBSession(self.__product.session_factory) as session:
             for subdir, _, files in os.walk(blame_root):
                 for f in files:
+                    self.__graceful_cancel_if_requested()
                     blame_file = Path(subdir) / f
                     file_path = f"/{str(blame_file.relative_to(blame_root))}"
                     blame_info, remote_url, tracking_branch = \
@@ -1502,6 +1509,7 @@ def get_skip_handler(
                 LOG.debug("Parsing input file '%s'", f)
 
                 report_file_path = os.path.join(root_dir_path, f)
+                self.__graceful_cancel_if_requested()
                 self.__process_report_file(
                     report_file_path, session, run_id,
                     file_path_to_id, run_history_time,
@@ -1604,6 +1612,7 @@ def store(self,
               original_zip_size: int,
               time_spent_on_task_preparation: float):
         """Store run results to the server."""
+        self.__graceful_cancel_if_requested()
         start_time = time.time()
 
         try:
@@ -1627,12 +1636,14 @@ def store(self,
 
             with StepLog(self._name, "Parse 'metadata.json's"):
                 for root_dir_path, _, _ in os.walk(report_dir):
+                    self.__graceful_cancel_if_requested()
                     metadata_file_path = os.path.join(
                         root_dir_path, 'metadata.json')
 
                     self.__mips[root_dir_path] = \
                         MetadataInfoParser(metadata_file_path)
 
+            self.__graceful_cancel_if_requested()
             with StepLog(self._name,
                          "Store look-up ID for checkers in 'metadata.json'"):
                 checkers_in_metadata = {
@@ -1656,10 +1667,16 @@ def store(self,
                     session, report_dir, source_root, run_id,
                     file_path_to_id, run_history_time)
 
+                self.__graceful_cancel_if_requested()
                 session.commit()
                 self.__load_report_ids_for_reports_with_fake_checkers(
                     session)
 
+                # The task should not be cancelled after this point, as the
+                # "main" bulk of the modifications to the database had already
+                # been committed, and the user would be left with potentially
+                # a bunch of "fake checkers" visible in the database.
+
                 if self.__reports_with_fake_checkers:
                     with StepLog(
                             self._name,
@@ -1720,6 +1737,8 @@ def store(self,
                 LOG.error("Database error! Storing reports to the "
                           "database failed: %s", ex)
                 raise
+            except TaskCancelHonoured:
+                raise
             except Exception as ex:
                 LOG.error("Failed to store results: %s", ex)
                 import traceback
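
The changes above follow a cooperative-cancellation pattern: the task wrapper builds a zero-argument callback that raises `TaskCancelHonoured` when the task manager has flagged the task, and the long-running implementation invokes it at safe points, never after the main database commit. A stripped-down sketch of the same pattern, with stand-in names rather than the CodeChecker API:

```python
# Generic shape of the cancellation hook used above; names are stand-ins.
from typing import Callable, Iterable


class CancelRequested(Exception):
    """Raised when a pending cancellation request is honoured mid-task."""


def make_cancel_check(should_cancel: Callable[[], bool]) -> Callable[[], None]:
    def check() -> None:
        if should_cancel():
            raise CancelRequested()
    return check


def process_items(items: Iterable[str],
                  cancel_if_requested: Callable[[], None]) -> None:
    for item in items:
        cancel_if_requested()  # safe point: nothing committed yet for 'item'
        ...  # expensive per-item work would go here
```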
