Skip to content

Commit

Permalink
Update Capirca to generate unique field-set terms by eliminating duplicates, and then replace the removed terms with the original ones within the policy match condition.
Browse files Browse the repository at this point in the history

PiperOrigin-RevId: 718441286
  • Loading branch information
Capirca Team committed Jan 22, 2025
1 parent deabaf2 commit 49fb654
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 14 deletions.
100 changes: 86 additions & 14 deletions capirca/lib/arista_tp.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,11 +727,10 @@ def _GenPrefixFieldset(self, direction, name, pfxs, ex_pfxs, af):
field_list += (" " * 6) + "%s\n" % p
for p in ex_pfxs:
field_list += (" " * 6) + "except %s\n" % p

fieldset_hdr = ("field-set " + af + " prefix " + direction + "-" +
("%s" % name) + "\n")
fieldset_name = "%s-%s" % (direction, name)
fieldset_hdr = ("field-set " + af + " prefix " + fieldset_name + "\n")
field_set = fieldset_hdr + field_list
return field_set
return fieldset_name, field_set

def _TranslatePolicy(self, pol, exp_info):
self.arista_traffic_policies = []
Expand All @@ -753,7 +752,8 @@ def _TranslatePolicy(self, pol, exp_info):

term_names = set()
new_terms = [] # list of generated terms
policy_field_sets = [] # list of generated field-sets
# Dictionary of generated field-sets with field-set name used as the key.
policy_field_sets = dict()
policy_counters = set() # set of the counters in the policy

# default to mixed policies
Expand Down Expand Up @@ -904,10 +904,10 @@ def _TranslatePolicy(self, pol, exp_info):

# if there are no addresses to match, don't generate a field-set
if (src_addr or src_addr_ex) and (src_addr_ex or field_set):
fs = self._GenPrefixFieldset(
name, fs = self._GenPrefixFieldset(
"src", term.name, src_addr, src_addr_ex, af_map_txt[ft]
)
policy_field_sets.append(fs)
policy_field_sets[name] = fs

dst_addr, dst_addr_ex = self._MinimizePrefixes(
term.GetAddressOfVersion("destination_address", self._AF_MAP[ft]),
Expand All @@ -917,10 +917,10 @@ def _TranslatePolicy(self, pol, exp_info):
)

if (dst_addr or dst_addr_ex) and (dst_addr_ex or field_set):
fs = self._GenPrefixFieldset(
name, fs = self._GenPrefixFieldset(
"dst", term.name, dst_addr, dst_addr_ex, af_map_txt[ft]
)
policy_field_sets.append(fs)
policy_field_sets[name] = fs

# generate the unique list of named counters
if term.counter:
Expand All @@ -934,6 +934,63 @@ def _TranslatePolicy(self, pol, exp_info):
(header, filter_name, filter_type, new_terms, policy_counters,
policy_field_sets))

@staticmethod
def _remove_duplicate_field_sets(field_sets):
"""Removes duplicate field-sets and maps duplicate names to the original names.
Args:
field_sets (dict): A dictionary where keys are field-set names and values
are the field-set content as strings.
Returns:
tuple: A tuple containing:
- A list of unique field-set content.
- A dictionary mapping duplicate field-set names to original names.
Example:
If a policy has field-sets with identical prefixes, the function will
return only the unique field-sets.
!
field-set ipv6 prefix src-ipv6-term-1
2001:4860:4860::8888/128
2001:4860:4860::8844/128
!
field-set ipv6 prefix src-ipv6-term-2
2001:4860:4860::8888/128
2001:4860:4860::8844/128
!
Result:
!
field-set ipv6 prefix src-ipv6-term-1
2001:4860:4860::8888/128
2001:4860:4860::8844/128
!
"""

# Key: field-set name (str), Value: Set() of prefixes.
unique_field_sets = dict()
# Key: duplicate field-set name, Value: Existing field-set name.
field_sets_to_rename = dict()

for curr_name, field_set_pfxs in field_sets.items():
# Extract the prefixes from the field-set (lines excluding the header).
current_prefixes = tuple(
line.strip() for line in field_set_pfxs.splitlines()[1:]
)

for existing_name, existing_prefixes in unique_field_sets.items():
if current_prefixes == existing_prefixes:
# Found a duplicate; Mark the field-set for reanming.
field_sets_to_rename[curr_name] = existing_name
break
else:
# No duplicate found; add to unique field-sets.
unique_field_sets[curr_name] = current_prefixes

return (
[field_sets[name] for name in unique_field_sets.keys()],
field_sets_to_rename,
)

def __str__(self):
config = Config()

Expand All @@ -948,22 +1005,37 @@ def __str__(self):
# add the header information
config.Append("", "traffic-policies")

# duplicate field-sets to be renamed in the traffic policy.
field_sets_to_rename = dict()
if field_sets:
for fs in field_sets:
config.Append(" ", fs)
config.Append(" ", "!")
# Remove duplicate field-sets if any.
unique_field_sets, field_sets_to_rename = (
self._remove_duplicate_field_sets(field_sets)
)
for fs in unique_field_sets:
config.Append(INDENT_STR, fs)
config.Append(INDENT_STR, "!")

config.Append(" ", "no traffic-policy %s" % filter_name)
config.Append(" ", "traffic-policy %s" % filter_name)
config.Append(INDENT_STR, "no traffic-policy %s" % filter_name)
config.Append(INDENT_STR, "traffic-policy %s" % filter_name)

# if there are counters, export the list of counters
if counters:
str_counters = " ".join(counters)
config.Append(" ", "counter %s" % str_counters)

# regex pattern for matching duplicate field-sets names in the policy.
rename_pattern = re.compile("|".join(field_sets_to_rename.keys()))
for term in terms:
term_str = str(term)
if term_str:
if field_sets_to_rename:
# Rename all duplicate field-set references in the policy.
term_str = rename_pattern.sub(
lambda match, rename_field_sets=field_sets_to_rename:
rename_field_sets[match.group()],
term_str,
)
config.Append("", term_str, verbatim=True)

return str(config) + "\n"
87 changes: 87 additions & 0 deletions tests/lib/arista_tp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,20 @@
action:: accept
}
"""
# Two source-address terms referencing the same network definition; the
# generated src field-sets contain identical prefixes, so they exercise
# field-set de-duplication (see testDuplicateFieldSets).
SRC_FIELD_SET_MIXED_1 = """
term FS_TERM_1 {
source-address:: INTERNAL
protocol:: tcp
action:: accept
}
"""
SRC_FIELD_SET_MIXED_2 = """
term FS_TERM_2 {
source-address:: INTERNAL
protocol:: tcp
action:: accept
}
"""

DST_FIELD_SET_INET = """
term FS_INET {
Expand All @@ -486,6 +500,20 @@
action:: accept
}
"""
# Two destination-address terms referencing the same network definition; the
# generated dst field-sets contain identical prefixes, so they exercise
# field-set de-duplication (see testDuplicateFieldSets).
DST_FIELD_SET_MIXED_3 = """
term FS_TERM_3 {
destination-address:: INTERNAL
protocol:: tcp
action:: accept
}
"""
DST_FIELD_SET_MIXED_4 = """
term FS_TERM_4 {
destination-address:: INTERNAL
protocol:: tcp
action:: accept
}
"""

# this term should not have the logging element rendered
LOGGING_ACCEPT = """
Expand Down Expand Up @@ -884,6 +912,65 @@ def testUsingFieldSetMixed(self):
self.assertIn("field-set ipv6 prefix dst-ipv6-good-term-3", output)
self.assertIn("destination prefix field-set dst-ipv6-good-term-3", output)

def testDuplicateFieldSets(self):
  """Verifies duplicate field-sets are removed and references are renamed.

  All four terms resolve INTERNAL to the same address list, so every
  generated field-set has identical prefixes; only the first (src-FS_TERM_1
  per address family) should be emitted and all other field-set references
  in the policy should be renamed to it.
  """
  addr_list = [
      nacaddr.IP("192.168." + str(octet) + ".64/27") for octet in range(256)
  ]
  addr_list.extend(
      nacaddr.IPv6("2001:db8:1010:" + str(octet) + "::64/64", strict=False)
      for octet in range(256)
  )
  self.naming.GetNetAddr.return_value = addr_list
  self.naming.GetServiceByProto.return_value = ["25"]

  atp = arista_tp.AristaTrafficPolicy(
      policy.ParsePolicy(
          GOOD_FIELD_SET_HEADER
          + SRC_FIELD_SET_MIXED_1
          + SRC_FIELD_SET_MIXED_2
          + DST_FIELD_SET_MIXED_3
          + DST_FIELD_SET_MIXED_4,
          self.naming,
      ),
      EXP_INFO,
  )

  output = str(atp)
  # Assertion for term matches: every term must still be rendered.
  self.assertIn("match FS_TERM_1 ipv4", output)
  self.assertIn("match ipv6-FS_TERM_1 ipv6", output)
  self.assertIn("match FS_TERM_2 ipv4", output)
  self.assertIn("match ipv6-FS_TERM_2 ipv6", output)
  self.assertIn("match FS_TERM_3 ipv4", output)
  self.assertIn("match ipv6-FS_TERM_3 ipv6", output)
  self.assertIn("match FS_TERM_4 ipv4", output)
  self.assertIn("match ipv6-FS_TERM_4 ipv6", output)

  # Assertion for unique field sets: only the first field-set per address
  # family is emitted.
  self.assertIn("field-set ipv4 prefix src-FS_TERM_1", output)
  self.assertIn("field-set ipv6 prefix src-ipv6-FS_TERM_1", output)

  # Assertion for the absence of duplicate field-set definitions. FS_TERM_3
  # and FS_TERM_4 are destination terms, so their field-sets are dst-*.
  self.assertNotIn("field-set ipv4 prefix src-FS_TERM_2", output)
  self.assertNotIn("field-set ipv6 prefix src-ipv6-FS_TERM_2", output)
  self.assertNotIn("field-set ipv4 prefix dst-FS_TERM_3", output)
  self.assertNotIn("field-set ipv6 prefix dst-ipv6-FS_TERM_3", output)
  self.assertNotIn("field-set ipv4 prefix dst-FS_TERM_4", output)
  self.assertNotIn("field-set ipv6 prefix dst-ipv6-FS_TERM_4", output)

  # Assertion for the absence of duplicate field-set references in the
  # traffic-policy: all references must point at the retained field-set.
  self.assertIn("source prefix field-set src-FS_TERM_1", output)
  self.assertIn("source prefix field-set src-ipv6-FS_TERM_1", output)
  self.assertNotIn("field-set src-FS_TERM_2", output)
  self.assertNotIn("field-set src-ipv6-FS_TERM_2", output)
  self.assertNotIn("field-set dst-FS_TERM_3", output)
  self.assertNotIn("field-set dst-ipv6-FS_TERM_3", output)
  self.assertNotIn("field-set dst-FS_TERM_4", output)
  self.assertNotIn("field-set dst-ipv6-FS_TERM_4", output)

def testTermTypeIndexKeys(self):
# ensure an _INET entry for each _TERM_TYPE entry
self.assertCountEqual(
Expand Down

0 comments on commit 49fb654

Please sign in to comment.