Skip to content

Commit

Permalink
Updating gcp generators to prune IPv4 addresses in IPv6 format.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 483795147
  • Loading branch information
Capirca Team committed Oct 25, 2022
1 parent 20577b4 commit d95c786
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 360 deletions.
3 changes: 2 additions & 1 deletion capirca/lib/gce.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def IsDefaultDeny(term):
return True



def GetNextPriority(priority):
"""Get the priority for the next rule."""
return priority
Expand Down Expand Up @@ -247,9 +246,11 @@ def ConvertToDict(self):

saddrs = sorted(self.term.GetAddressOfVersion('source_address', term_af),
key=ipaddress.get_mixed_type_key)
saddrs = gcp.FilterIPv4InIPv6FormatAddrs(saddrs)
daddrs = sorted(
self.term.GetAddressOfVersion('destination_address', term_af),
key=ipaddress.get_mixed_type_key)
daddrs = gcp.FilterIPv4InIPv6FormatAddrs(daddrs)

# If the address got filtered out and is empty due to address family, we
# don't render the term. At this point of term processing, the direction
Expand Down
30 changes: 27 additions & 3 deletions capirca/lib/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Base class for GCP firewalling products.
"""

import ipaddress
import json
import re

Expand Down Expand Up @@ -34,9 +35,7 @@ class Term(aclgenerator.Term):
# https://cloud.google.com/vpc/docs/firewalls#protocols_and_ports
# 'all' is needed for the dedault deny, it should not be used in a pol file.
_ALLOW_PROTO_NAME = frozenset(
['tcp', 'udp', 'icmp', 'esp', 'ah', 'ipip', 'sctp',
'all'
])
['tcp', 'udp', 'icmp', 'esp', 'ah', 'ipip', 'sctp', 'all'])

def _GetPorts(self):
"""Return a port or port range in string format."""
Expand Down Expand Up @@ -165,3 +164,28 @@ def GetIpv6TermName(term_name):
"""

return '%s-%s' % (term_name, 'v6')


def FilterIPv4InIPv6FormatAddrs(addrs):
"""Returns addresses of the appropriate Address Family.
Args:
addrs: list of IP addresses.
Returns:
list of filtered IPs with no IPv4 in IPv6 format addresses.
"""
filtered = []
for addr in addrs:
ipaddr = ipaddress.ip_interface(addr).ip
if isinstance(ipaddr, ipaddress.IPv6Address):
ipv6 = ipaddress.IPv6Address(ipaddr)
# Check if it's an IPv4-mapped or 6to4 address.
if ipv6.ipv4_mapped is not None or ipv6.sixtofour is not None:
continue
# Check if it's an IPv4-compatible address.
if ipv6.packed.hex(
)[:24] == '000000000000000000000000' and not ipv6.is_unspecified:
continue
filtered += [addr]
return filtered
4 changes: 3 additions & 1 deletion capirca/lib/gcp_hf.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def ConvertToDict(self, priority_index):

if self.term.direction == 'EGRESS':
daddrs = self.term.GetAddressOfVersion('destination_address', ip_version)
daddrs = gcp.FilterIPv4InIPv6FormatAddrs(daddrs)

# If the address got filtered out and is empty due to address family, we
# don't render the term. At this point of term processing, the direction
Expand Down Expand Up @@ -267,7 +268,8 @@ def ConvertToDict(self, priority_index):
rules.append(rule)
priority_index += 1
else:
saddrs = self.term.GetAddressOfVersion('source_address', ip_version)
saddrs = gcp.FilterIPv4InIPv6FormatAddrs(
self.term.GetAddressOfVersion('source_address', ip_version))

# If the address got filtered out and is empty due to address family, we
# don't render the term. At this point of term processing, the direction
Expand Down
Loading

0 comments on commit d95c786

Please sign in to comment.