Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scanpipe/pipelines/deploy_to_develop.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ def map_python(self):
symbols.
"""
d2d.map_python_pyx_to_binaries(project=self.project, logger=self.log)
d2d.map_python_protobuf_files(project=self.project, logger=self.log)

def match_directories_to_purldb(self):
"""Match selected directories in PurlDB."""
Expand Down
47 changes: 47 additions & 0 deletions scanpipe/pipes/d2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -2372,3 +2372,50 @@ def map_python_pyx_to_binaries(project, logger=None):
to_resource=matching_elf,
map_type="python_pyx_match",
)


def map_python_protobuf_files(project, logger=None):
"""Map protobuf-generated .py/.pyi files to their source .proto files."""
from_resources = (
project.codebaseresources.files().from_codebase().filter(extension=".proto")
)
to_resources = (
project.codebaseresources.files()
.to_codebase()
.has_no_relation()
.filter(extension__in=[".py", ".pyi"])
)
to_resources_count = to_resources.count()
from_resources_count = from_resources.count()
if not from_resources_count:
return
if not to_resources_count:
return
proto_index = {}
for proto_resource in from_resources:
base_name = proto_resource.name.replace(".proto", "")
proto_index[base_name] = proto_resource
mapped_count = 0
for to_resource in to_resources:
base_name = _extract_protobuf_base_name(to_resource.name)
if base_name and base_name in proto_index:
from_resource = proto_index[base_name]
pipes.make_relation(
from_resource=from_resource,
to_resource=to_resource,
map_type="protobuf_mapping",
extra_data={"protobuf_base_name": base_name},
)
mapped_count += 1


def _extract_protobuf_base_name(filename):
"""Extract the base name from a protobuf-generated filename."""
import re

name_without_ext = filename.rsplit(".", 1)[0]
protobuf_pattern = r"^(.+)_pb[23]$"
match = re.match(protobuf_pattern, name_without_ext)
if match:
return match.group(1)
return None
3 changes: 2 additions & 1 deletion scanpipe/pipes/d2d_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ class EcosystemConfig:
),
"Python": EcosystemConfig(
ecosystem_option="Python",
source_symbol_extensions=[".pyx", ".pxd"],
source_symbol_extensions=[".pyx", ".pxd", ".py", ".pyi"],
matchable_resource_extensions=[".py", ".pyi"],
),
}

Expand Down
95 changes: 95 additions & 0 deletions scanpipe/tests/pipes/test_d2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -2155,3 +2155,98 @@ def test_scanpipe_d2d_load_ecosystem_config(self):
expected_extra_data = json.load(f)

self.assertEqual(expected_extra_data, asdict(pipeline.ecosystem_config))

def test_scanpipe_pipes_d2d_extract_protobuf_base_name(self):
"""Test the protobuf base name extraction function."""
test_cases = [
("command_request_pb2.py", "command_request"),
("connection_request_pb2.pyi", "connection_request"),
("response_pb2.py", "response"),
("user_pb3.py", "user"),
("data_pb2.pyi", "data"),
("regular_file.py", None),
("not_protobuf.pyi", None),
("pb2_standalone.py", None),
]
for filename, expected in test_cases:
with self.subTest(filename=filename):
result = d2d._extract_protobuf_base_name(filename)
self.assertEqual(expected, result)

def test_scanpipe_pipes_d2d_map_python_protobuf_files(self):
"""Test protobuf file mapping functionality."""
from1 = make_resource_file(
self.project1,
path="from/valkey_glide-2.0.1/glide-core/src/protobuf/command_request.proto",
)
from2 = make_resource_file(
self.project1,
path="from/valkey_glide-2.0.1/glide-core/src/protobuf/connection_request.proto",
)
from3 = make_resource_file(
self.project1,
path="from/valkey_glide-2.0.1/glide-core/src/protobuf/response.proto",
)
to1 = make_resource_file(
self.project1,
path="to/glide/protobuf/command_request_pb2.py",
)
to2 = make_resource_file(
self.project1,
path="to/glide/protobuf/command_request_pb2.pyi",
)
to3 = make_resource_file(
self.project1,
path="to/glide/protobuf/connection_request_pb2.py",
)
to4 = make_resource_file(
self.project1,
path="to/glide/protobuf/connection_request_pb2.pyi",
)
to5 = make_resource_file(
self.project1,
path="to/glide/protobuf/response_pb2.py",
)
to6 = make_resource_file(
self.project1,
path="to/glide/protobuf/response_pb2.pyi",
)
d2d.map_python_protobuf_files(self.project1)
relations = self.project1.codebaserelations.filter(map_type="protobuf_mapping")
self.assertEqual(6, relations.count())
expected_mappings = [
(from1, to1, "command_request"),
(from1, to2, "command_request"),
(from2, to3, "connection_request"),
(from2, to4, "connection_request"),
(from3, to5, "response"),
(from3, to6, "response"),
]
for from_resource, to_resource, expected_base_name in expected_mappings:
relation = relations.filter(
from_resource=from_resource, to_resource=to_resource
).first()
self.assertIsNotNone(relation)
self.assertEqual(
expected_base_name, relation.extra_data["protobuf_base_name"]
)

def test_scanpipe_pipes_d2d_map_python_protobuf_files_no_proto_files(self):
"""Test protobuf mapping when no .proto files exist."""
make_resource_file(
self.project1,
path="to/glide/protobuf/command_request_pb2.py",
)
d2d.map_python_protobuf_files(self.project1)
relations = self.project1.codebaserelations.filter(map_type="protobuf_mapping")
self.assertEqual(0, relations.count())

def test_scanpipe_pipes_d2d_map_python_protobuf_files_no_py_files(self):
"""Test protobuf mapping when no .py/.pyi files exist."""
make_resource_file(
self.project1,
path="from/valkey_glide-2.0.1/glide-core/src/protobuf/command_request.proto",
)
d2d.map_python_protobuf_files(self.project1)
relations = self.project1.codebaserelations.filter(map_type="protobuf_mapping")
self.assertEqual(0, relations.count())