Skip to content

Commit

Permalink
Rework chbench snapshot generation (#5330)
Browse files Browse the repository at this point in the history
This PR introduces a number of changes around how we generate snapshots
from chbench, for use with the ingest tests. First, our snapshot
generation code was never 100% complete, as it never wrote out the
source_offsets (I had created it by hand and it was part of the S3
archive). Snapshot_view_states now captures the offsets for each source.

Second, offsets.json hardcoded the mapping from view name to topic name. Instead of
duplicating this information, add code to Peeker to archive the
configuration used to generate the snapshot. Included in this
configuration file are the sources that comprise each view (currently
expected to be a 1 to 1 mapping for our ingest benchmarks).

Third, the TOML config file for Peeker is interpreted by Peeker (it
substitutes environment variables) and cannot be directly loaded by
other programming languages. To fix this, I added a method to Peeker to
write out the computed version of the configuration (and loaded it into
TOML to validate the config).

Fourth, the TOML config file for Peeker could not be serialized due to
sleep_ms being a Duration field and not being the last field in the
struct. I moved this field to the end of each struct where mentioned,
but this created a second problem. Our deserializer converted a single
string into a Duration object, which is serialized as a table, thereby
breaking symmetry in serialize / deserialize. I removed the custom
deserialization so that we can load the serialized file using the same
deserialization logic.

I think we can write a SQL query / some recursive logic to
programmatically get the source names for each view, but this current PR
works for saving snapshots and archiving the Peeker config file.
  • Loading branch information
cirego authored Jan 16, 2021
1 parent fade91d commit 76228ce
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 75 deletions.
10 changes: 6 additions & 4 deletions demo/chbench/mzcompose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ services:
command: ${PEEKER_CMD:---queries q01,q02,q17}
volumes:
- ./peeker-config:/etc/peeker
- type: bind
source: ${MZ_CHBENCH_SNAPSHOT:-/tmp}
target: /snapshot
test-correctness:
# NOTE: we really don't want to include depends_on, it causes dependencies to be restarted
mzbuild: test-correctness
Expand Down Expand Up @@ -380,10 +383,6 @@ mzworkflows:
- step: run
service: mzutil
command: snapshot_view_states.py
# Output summary information about the topics
- step: run
service: kafka-util
command: summarize_topics.py

# Run a workflow to measure the ingest performance of Materialize. Assumes that you have
# already called setup-ingest-benchmark and the cluster is still running
Expand All @@ -401,6 +400,9 @@ mzworkflows:
# Take a snapshot of topic schemas and contents
generate-snapshot:
steps:
- step: run
service: peeker
command: --write-config /snapshot/config.toml
- step: workflow
workflow: setup-ingest-benchmark
- step: run
Expand Down
1 change: 1 addition & 0 deletions misc/mzutil/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
psycopg2-binary==2.8.6
toml==0.10.2
31 changes: 31 additions & 0 deletions misc/mzutil/scripts/snapshot_view_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import argparse
import os
import sys
import typing

import psycopg2 # type: ignore
Expand All @@ -34,6 +35,16 @@ def view_names(
yield row[0]


def source_names(
    conn: psycopg2.extensions.connection,
) -> typing.Generator[str, None, None]:
    """Yield the name of every source currently known to Materialize."""
    with conn.cursor() as cursor:
        cursor.execute("SELECT source_name FROM mz_source_info")
        # Each result row is a 1-tuple; unpack it directly.
        for (name,) in cursor:
            yield name


def snapshot_materialize_views(args: argparse.Namespace) -> None:
"""Record the current table status of all views installed in Materialize."""

Expand All @@ -46,6 +57,25 @@ def snapshot_materialize_views(args: argparse.Namespace) -> None:
cursor.copy_expert(query, outfile)


def snapshot_source_offsets(args: argparse.Namespace) -> None:
    """Record the current topic offset of all sources installed in Materialize.

    Writes one ``<source>.offset`` file per source into ``args.snapshot_dir``,
    containing the source's current offset as a decimal string. Exits with
    status 1 if any source unexpectedly reports more than one offset row.
    """

    with psycopg2.connect(f"postgresql://{args.host}:{args.port}/materialize") as conn:
        with conn.cursor() as cursor:
            # Fetch every source's offset in a single round trip rather than
            # issuing one query per source (avoids the N+1 query pattern).
            cursor.execute(
                "SELECT source_name, mz_source_info.offset as offset FROM mz_source_info"
            )
            offsets: typing.Dict[str, typing.List[int]] = {}
            for (source, offset) in cursor:
                offsets.setdefault(source, []).append(offset)

        for source, values in offsets.items():
            # Each source is expected to map to exactly one offset row.
            if len(values) != 1:
                print(f"ERROR: Expected one row for {source}: {values}")
                sys.exit(1)

            offset_file = os.path.join(args.snapshot_dir, f"{source}.offset")
            with open(offset_file, "w") as outfile:
                outfile.write(f"{values[0]}")


def main() -> None:
"""Parse arguments and snapshot materialized views."""

Expand All @@ -67,6 +97,7 @@ def main() -> None:

args = parser.parse_args()
snapshot_materialize_views(args)
snapshot_source_offsets(args)


if __name__ == "__main__":
Expand Down
67 changes: 59 additions & 8 deletions misc/mzutil/scripts/wait_for_view_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import sys
import pathlib
import time
import toml
import typing

import psycopg2 # type: ignore
Expand Down Expand Up @@ -57,15 +58,23 @@ def view_contents(cursor: psycopg2.extensions.cursor, view: str, timestamp: int)
return stream.getvalue().strip()


class SourceInfo:
    """Container class containing information about a source.

    Attributes:
        topic_name: full name of the topic backing the source
            (namespace prefix + source name).
        offset: the topic offset captured by the snapshot.
    """

    def __init__(self, topic_name: str, offset: int):
        self.topic_name = topic_name
        self.offset = offset

    def __repr__(self) -> str:
        # Aid debugging: error paths in this script print these objects.
        return f"SourceInfo(topic_name={self.topic_name!r}, offset={self.offset!r})"


def source_at_offset(
cursor: psycopg2.extensions.cursor, source_name: str, desired_offset: int
cursor: psycopg2.extensions.cursor, source_info: SourceInfo
) -> typing.Union[None, int]:
"""Return the mz timestamp from a source if it has reached the desired offset."""
query = (
'SELECT timestamp FROM mz_source_info WHERE source_name = %s and "offset" = %s'
)
try:
cursor.execute(query, (source_name, desired_offset))
cursor.execute(query, (source_info.topic_name, source_info.offset))
if cursor.rowcount > 1:
print("ERROR: More than one row returned when querying source offsets:")
for row in cursor:
Expand All @@ -86,14 +95,58 @@ def wait_for_materialize_views(args: argparse.Namespace) -> None:
start_time = time.monotonic()

# Create a dictionary mapping view names (as calculated from the filename) to expected contents
view_snapshots = {
view_snapshots: typing.Dict[str, str] = {
p.stem: p.read_text().strip()
for p in pathlib.Path(args.snapshot_dir).glob("*.sql")
}

source_offsets: typing.Dict[str, int] = {
p.stem: int(p.read_text().strip())
for p in pathlib.Path(args.snapshot_dir).glob("*.offset")
}

# Create a dictionary mapping view names to source name and offset
with open(os.path.join(args.snapshot_dir, "offsets.json")) as fd:
source_offsets = json.load(fd)
view_sources: typing.Dict[str, SourceInfo] = {}
with open(os.path.join(args.snapshot_dir, "config.toml")) as fd:
conf = toml.load(fd)

if len(conf["sources"]) != 1:
print(f"ERROR: Expected just one source block: {conf['sources']}")
sys.exit(1)

source_info = conf["sources"][0]
topic_prefix: str = source_info["topic_namespace"]
source_names: typing.List[str] = source_info["names"]

for query_info in conf["queries"]:

# Ignore views not in this snapshot (they likely have multiple sources...)
view: str = query_info["name"]
if view not in view_snapshots:
continue

sources: typing.List[str] = query_info["sources"]
if len(query_info["sources"]) != 1:
print(
f"ERROR: Expected just one source for view {view}: {query_info['sources']}"
)
sys.exit(1)

source_name: str = query_info["sources"][0]
if source_name not in source_name:
print(
f"ERROR: No matching source {source_name} for view {view}: {source_names}"
)
sys.exit(1)

topic_name = f"{topic_prefix}{source_name}"
if topic_name not in source_offsets:
print(
f"ERROR: Missing offset information for source {topic_name}: {source_offsets}"
)
sys.exit(1)

view_sources[view] = SourceInfo(topic_name, source_offsets[topic_name])

with psycopg2.connect(f"postgresql://{args.host}:{args.port}/materialize") as conn:
installed_views = set(view_names(conn))
Expand All @@ -118,9 +171,7 @@ def wait_for_materialize_views(args: argparse.Namespace) -> None:

# Determine if the source is at the desired offset and identify the
# mz_logical_timestamp associated with the offset
desired_offset = source_offsets[view]["offset"]
source_name = source_offsets[view]["topic"]
timestamp = source_at_offset(cursor, source_name, desired_offset)
timestamp = source_at_offset(cursor, view_sources[view])
if not timestamp:
continue

Expand Down
Loading

0 comments on commit 76228ce

Please sign in to comment.