Skip to content

Commit e30fcc2

Browse files
committed
feat: massStoreRunAsynchronous()
Even though commit d915473 introduced a socket-level TCP keepalive support into the server's implementation, this was observed multiple times to not be enough to **deterministically** fix the issues with the `CodeChecker store` client hanging indefinitely when the server takes a long time processing the to-be-stored data. The underlying reasons are not entirely clear and the issue only pops up sporadically, but we did observe a few similar scenarios (such as multi-million report storage from analysing LLVM and then storing between datacentres) where it almost reliably reproduces. The symptoms (even with a configured `keepalive`) generally include the server not becoming notified about the client's disconnect, while the client process is hung on a low-level system call `read(4, ...)`, trying to get the Thrift response of `massStoreRun()` from the HTTP socket. Even if the server finished the storage processing "in time" and sent the Thrift reply, it never reaches the client, which means it never exits from the waiting, which means it keeps either the terminal or, worse, a CI script occupied, blocking execution. This is the "more proper solution" foreshadowed in commit 15af7d8. Implemented the server-side logic to spawn a `MassStoreRun` task and return its token, giving the `massStoreRunAsynchronous()` API call full force. Implemented the client-side logic to use the new `task_client` module and the same logic as `CodeChecker cmd serverside-tasks --await --token TOKEN...` to poll the server for the task's completion and status.
1 parent d42164b commit e30fcc2

File tree

8 files changed

+160
-103
lines changed

8 files changed

+160
-103
lines changed

docs/web/user_guide.md

+13
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,19 @@ optional arguments:
480480
is given, the longest match will be removed. You may
481481
also use Unix shell-like wildcards (e.g.
482482
'/*/jsmith/').
483+
--detach Runs `store` in fire-and-forget mode: exit immediately
484+
once the server accepted the analysis reports for
485+
storing, without waiting for the server-side data
486+
processing to conclude. Doing this is generally not
487+
recommended, as the client will never be notified of
488+
potential processing failures, and there is no easy way
489+
to wait for the successfully stored results to become
490+
available server-side for potential further processing
491+
(e.g., `CodeChecker cmd diff`). However, using
492+
'--detach' can significantly speed up large-scale
493+
monitoring analyses where access to the results by a
494+
tool is not a goal, such as in the case of non-gating
495+
CI systems.
483496
--config CONFIG_FILE Allow the configuration from an explicit configuration
484497
file. The values configured in the config file will
485498
overwrite the values set in the command line.

web/api/codechecker_api_shared.thrift

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ enum ErrorCode {
3434
API_MISMATCH = 5,
3535

3636
// REMOVED IN API v6.59 (CodeChecker v6.25.0)!
37-
// Previously sent by report_server.thrif/codeCheckerDBAccess::massStoreRun()
37+
// Previously sent by report_server.thrift/codeCheckerDBAccess::massStoreRun()
3838
// when the client uploaded a source file which contained errors, such as
3939
// review status source-code-comment errors.
4040
/* SOURCE_FILE = 6, */ // Never reuse the value of the enum constant!
4141

4242
// REMOVED IN API v6.59 (CodeChecker v6.25.0)!
43-
// Previously sent by report_server.thrif/codeCheckerDBAccess::massStoreRun()
43+
// Previously sent by report_server.thrift/codeCheckerDBAccess::massStoreRun()
4444
// when the client uploaded a report with annotations that had invalid types.
4545
/* REPORT_FORMAT = 7, */ // Never reuse the value of the enum constant!
4646
}

web/client/codechecker_client/cmd/store.py

+68-61
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
from threading import Timer
3232
from typing import Dict, Iterable, List, Set, Tuple
3333

34-
from codechecker_api.codeCheckerDBAccess_v6.ttypes import StoreLimitKind
34+
from codechecker_api.codeCheckerDBAccess_v6.ttypes import \
35+
StoreLimitKind, SubmittedRunOptions
3536

3637
from codechecker_report_converter import twodim
3738
from codechecker_report_converter.report import Report, report_file, \
@@ -50,8 +51,8 @@ def assemble_blame_info(_, __) -> int:
5051
"""
5152
raise NotImplementedError()
5253

53-
from codechecker_client import client as libclient
54-
from codechecker_client import product
54+
from codechecker_client import client as libclient, product
55+
from codechecker_client.task_client import await_task_termination
5556
from codechecker_common import arg, logger, cmd_config
5657
from codechecker_common.checker_labels import CheckerLabels
5758
from codechecker_common.compatibility.multiprocessing import Pool
@@ -256,6 +257,24 @@ def add_arguments_to_parser(parser):
256257
"match will be removed. You may also use Unix "
257258
"shell-like wildcards (e.g. '/*/jsmith/').")
258259

260+
parser.add_argument("--detach",
261+
dest="detach",
262+
default=argparse.SUPPRESS,
263+
action="store_true",
264+
required=False,
265+
help="""
266+
Runs `store` in fire-and-forget mode: exit immediately once the server accepted
267+
the analysis reports for storing, without waiting for the server-side data
268+
processing to conclude.
269+
Doing this is generally not recommended, as the client will never be notified
270+
of potential processing failures, and there is no easy way to wait for the
271+
successfully stored results to become available server-side for potential
272+
further processing (e.g., `CodeChecker cmd diff`).
273+
However, using '--detach' can significantly speed up large-scale monitoring
274+
analyses where access to the results by a tool is not a goal, such as in the
275+
case of non-gating CI systems.
276+
""")
277+
259278
cmd_config.add_option(parser)
260279

261280
parser.add_argument('-f', '--force',
@@ -718,7 +737,7 @@ def get_analysis_statistics(inputs, limits):
718737
return statistics_files if has_failed_zip else []
719738

720739

721-
def storing_analysis_statistics(client, inputs, run_name):
740+
def store_analysis_statistics(client, inputs, run_name):
722741
"""
723742
Collects and stores analysis statistics information on the server.
724743
"""
@@ -933,68 +952,56 @@ def main(args):
933952

934953
description = args.description if 'description' in args else None
935954

936-
LOG.info("Storing results to the server...")
937-
938-
try:
939-
with _timeout_watchdog(timedelta(hours=1),
940-
signal.SIGUSR1):
941-
client.massStoreRun(args.name,
942-
args.tag if 'tag' in args else None,
943-
str(context.version),
944-
b64zip,
945-
'force' in args,
946-
trim_path_prefixes,
947-
description)
948-
except WatchdogError as we:
949-
LOG.warning("%s", str(we))
950-
951-
# Showing parts of the exception stack is important here.
952-
# We **WANT** to see that the timeout happened during a wait on
953-
# Thrift reading from the TCP connection (something deep in the
954-
# Python library code at "sock.recv_into").
955-
import traceback
956-
_, _, tb = sys.exc_info()
957-
frames = traceback.extract_tb(tb)
958-
first, last = frames[0], frames[-2]
959-
formatted_frames = traceback.format_list([first, last])
960-
fmt_first, fmt_last = formatted_frames[0], formatted_frames[1]
961-
LOG.info("Timeout was triggered during:\n%s", fmt_first)
962-
LOG.info("Timeout interrupted this low-level operation:\n%s",
963-
fmt_last)
964-
965-
LOG.error("Timeout!"
966-
"\n\tThe server's reply did not arrive after "
967-
"%d seconds (%s) elapsed since the server-side "
968-
"processing began."
969-
"\n\n\tThis does *NOT* mean that there was an issue "
970-
"with the run you were storing!"
971-
"\n\tThe server might still be processing the results..."
972-
"\n\tHowever, it is more likely that the "
973-
"server had already finished, but the client did not "
974-
"receive a response."
975-
"\n\tUsually, this is caused by the underlying TCP "
976-
"connection failing to signal a low-level disconnect."
977-
"\n\tClients potentially hanging indefinitely in these "
978-
"scenarios is an unfortunate and known issue."
979-
"\n\t\tSee http://github.com/Ericsson/codechecker/"
980-
"issues/3672 for details!"
981-
"\n\n\tThis error here is a temporary measure to ensure "
982-
"an infinite hang is replaced with a well-explained "
983-
"timeout."
984-
"\n\tA more proper solution will be implemented in a "
985-
"subsequent version of CodeChecker.",
986-
we.timeout.total_seconds(), str(we.timeout))
987-
sys.exit(1)
955+
LOG.info("Storing results to the server ...")
956+
task_token: str = client.massStoreRunAsynchronous(
957+
b64zip,
958+
SubmittedRunOptions(
959+
runName=args.name,
960+
tag=args.tag if "tag" in args else None,
961+
version=str(context.version),
962+
force="force" in args,
963+
trimPathPrefixes=trim_path_prefixes,
964+
description=description)
965+
)
966+
LOG.info("Reports submitted to the server for processing.")
988967

989-
# Storing analysis statistics if the server allows them.
990968
if client.allowsStoringAnalysisStatistics():
991-
storing_analysis_statistics(client, args.input, args.name)
992-
993-
LOG.info("Storage finished successfully.")
969+
store_analysis_statistics(client, args.input, args.name)
970+
971+
if "detach" in args:
972+
LOG.warning("Exiting the 'store' subcommand as '--detach' was "
973+
"specified: not waiting for the result of the store "
974+
"operation.\n"
975+
"The server might not have finished processing "
976+
"everything at this point, so do NOT rely on querying "
977+
"the results just yet!\n"
978+
"To await the completion of the processing later, "
979+
"you can execute:\n\n"
980+
"\tCodeChecker cmd serverside-tasks --token %s "
981+
"--await",
982+
task_token)
983+
# Print the token to stdout as well, so scripts can use "--detach"
984+
# meaningfully.
985+
print(task_token)
986+
return
987+
988+
task_client = libclient.setup_task_client(protocol, host, port)
989+
task_status: str = await_task_termination(LOG, task_token,
990+
task_api_client=task_client)
991+
992+
if task_status == "COMPLETED":
993+
LOG.info("Storing the reports finished successfully.")
994+
else:
995+
LOG.error("Storing the reports failed! "
996+
"The job terminated in status '%s'. "
997+
"The comments associated with the failure are:\n\n%s",
998+
task_status,
999+
task_client.getTaskInfo(task_token).comments)
1000+
sys.exit(1)
9941001
except Exception as ex:
9951002
import traceback
9961003
traceback.print_exc()
997-
LOG.info("Storage failed: %s", str(ex))
1004+
LOG.error("Storing the reports failed: %s", str(ex))
9981005
sys.exit(1)
9991006
finally:
10001007
os.close(zip_file_handle)

web/client/codechecker_client/helpers/results.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,12 @@ def massStoreRun(self, name, tag, version, zipdir, force,
182182
pass
183183

184184
@thrift_client_call
185-
def massStoreRunAsynchronous(self, zipfile_blob: str,
186-
store_opts: ttypes.SubmittedRunOptions) \
187-
-> str:
188-
pass
185+
def massStoreRunAsynchronous(
186+
self,
187+
zipfile_blob: str,
188+
store_opts: ttypes.SubmittedRunOptions
189+
) -> str:
190+
raise NotImplementedError("Should have called Thrift code!")
189191

190192
@thrift_client_call
191193
def allowsStoringAnalysisStatistics(self):

web/client/codechecker_client/task_client.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ def _transform_product_ids_to_endpoints(
363363
product.id: product.endpoint
364364
for product
365365
in get_product_api().getProducts(None, None)}
366+
ti["productEndpoint"] = product_id_to_endpoint[ti["productId"]]
366367
del ti["productId"]
367368

368369

@@ -445,7 +446,7 @@ def get_product_api() -> ThriftProductHelper:
445446
ti["taskKind"],
446447
ti["summary"],
447448
ti["status"],
448-
ti["productEndpoint"] or "",
449+
ti.get("productEndpoint", ""),
449450
ti["actorUsername"] or "",
450451
ti["enqueuedAt"] or "",
451452
ti["startedAt"] or "",
@@ -529,7 +530,7 @@ def get_product_api() -> ThriftProductHelper:
529530
ti = task_info_for_print[0]
530531
product_line = \
531532
f" - Product: {ti['productEndpoint']}\n" \
532-
if ti["productEndpoint"] else ""
533+
if "productEndpoint" in ti else ""
533534
user_line = f" - User: {ti['actorUsername']}\n" \
534535
if ti["actorUsername"] else ""
535536
cancel_line = " - Cancelled by administrators!\n" \

web/server/codechecker_server/api/mass_store_run.py

+26-6
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
import sqlalchemy
2222
import tempfile
2323
import time
24-
from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast
24+
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, \
25+
cast
2526
import zipfile
2627
import zlib
2728

@@ -54,7 +55,7 @@
5455
from ..metadata import checker_is_unavailable, MetadataInfoParser
5556
from ..product import Product as ServerProduct
5657
from ..session_manager import SessionManager
57-
from ..task_executors.abstract_task import AbstractTask
58+
from ..task_executors.abstract_task import AbstractTask, TaskCancelHonoured
5859
from ..task_executors.task_manager import TaskManager
5960
from .thrift_enum_helper import report_extended_data_type_str
6061

@@ -591,7 +592,13 @@ def _implementation(self, tm: TaskManager):
591592
f"'{self._product.endpoint}' is in "
592593
"a bad shape!")
593594

594-
m = MassStoreRun(self.data_path / "store_zip",
595+
def __cancel_if_needed():
596+
tm.heartbeat(self)
597+
if tm.should_cancel(self):
598+
raise TaskCancelHonoured(self)
599+
600+
m = MassStoreRun(__cancel_if_needed,
601+
self.data_path / "store_zip",
595602
self._package_context,
596603
tm.configuration_database_session_factory,
597604
self._product,
@@ -615,9 +622,8 @@ class MassStoreRun:
615622
# This is the place where complex implementation logic must go, but be
616623
# careful, there is no way to communicate with the user's client anymore!
617624

618-
# TODO: Poll the task manager at regular points for a cancel signal!
619-
620625
def __init__(self,
626+
graceful_cancel: Callable[[], None],
621627
zip_dir: Path,
622628
package_context,
623629
config_db,
@@ -641,6 +647,7 @@ def __init__(self,
641647
self.__config_db = config_db
642648
self.__package_context = package_context
643649
self.__product = product
650+
self.__graceful_cancel_if_requested = graceful_cancel
644651

645652
self.__mips: Dict[str, MetadataInfoParser] = {}
646653
self.__analysis_info: Dict[str, AnalysisInfo] = {}
@@ -668,10 +675,10 @@ def __store_source_files(
668675
filename_to_hash: Dict[str, str]
669676
) -> Dict[str, int]:
670677
""" Storing file contents from plist. """
671-
672678
file_path_to_id = {}
673679

674680
for file_name, file_hash in filename_to_hash.items():
681+
self.__graceful_cancel_if_requested()
675682
source_file_path = path_for_fake_root(file_name, str(source_root))
676683
LOG.debug("Storing source file: %s", source_file_path)
677684
trimmed_file_path = trim_path_prefixes(
@@ -721,6 +728,7 @@ def __add_blame_info(
721728
with DBSession(self.__product.session_factory) as session:
722729
for subdir, _, files in os.walk(blame_root):
723730
for f in files:
731+
self.__graceful_cancel_if_requested()
724732
blame_file = Path(subdir) / f
725733
file_path = f"/{str(blame_file.relative_to(blame_root))}"
726734
blame_info, remote_url, tracking_branch = \
@@ -1502,6 +1510,7 @@ def get_skip_handler(
15021510
LOG.debug("Parsing input file '%s'", f)
15031511

15041512
report_file_path = os.path.join(root_dir_path, f)
1513+
self.__graceful_cancel_if_requested()
15051514
self.__process_report_file(
15061515
report_file_path, session, run_id,
15071516
file_path_to_id, run_history_time,
@@ -1604,6 +1613,7 @@ def store(self,
16041613
original_zip_size: int,
16051614
time_spent_on_task_preparation: float):
16061615
"""Store run results to the server."""
1616+
self.__graceful_cancel_if_requested()
16071617
start_time = time.time()
16081618

16091619
try:
@@ -1627,12 +1637,14 @@ def store(self,
16271637

16281638
with StepLog(self._name, "Parse 'metadata.json's"):
16291639
for root_dir_path, _, _ in os.walk(report_dir):
1640+
self.__graceful_cancel_if_requested()
16301641
metadata_file_path = os.path.join(
16311642
root_dir_path, 'metadata.json')
16321643

16331644
self.__mips[root_dir_path] = \
16341645
MetadataInfoParser(metadata_file_path)
16351646

1647+
self.__graceful_cancel_if_requested()
16361648
with StepLog(self._name,
16371649
"Store look-up ID for checkers in 'metadata.json'"):
16381650
checkers_in_metadata = {
@@ -1656,10 +1668,16 @@ def store(self,
16561668
session, report_dir, source_root, run_id,
16571669
file_path_to_id, run_history_time)
16581670

1671+
self.__graceful_cancel_if_requested()
16591672
session.commit()
16601673
self.__load_report_ids_for_reports_with_fake_checkers(
16611674
session)
16621675

1676+
# The task should not be cancelled after this point, as the
1677+
# "main" bulk of the modifications to the database had already
1678+
# been committed, and the user would be left with potentially
1679+
# a bunch of "fake checkers" visible in the database.
1680+
16631681
if self.__reports_with_fake_checkers:
16641682
with StepLog(
16651683
self._name,
@@ -1720,6 +1738,8 @@ def store(self,
17201738
LOG.error("Database error! Storing reports to the "
17211739
"database failed: %s", ex)
17221740
raise
1741+
except TaskCancelHonoured:
1742+
raise
17231743
except Exception as ex:
17241744
LOG.error("Failed to store results: %s", ex)
17251745
import traceback

0 commit comments

Comments
 (0)