Skip to content
214 changes: 183 additions & 31 deletions src/verifai/features/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -964,19 +964,26 @@ def distance(self, valueA, valueB):
else:
return self.distanceMetric(valueA, valueB)

@cached_property
def fixedDomains(self):
@staticmethod
def _timeExpandDomain(domain, timeBound):
return domain

def fixedDomains(self, timeBound):
"""Return the fixed-length Domains associated with this feature."""
timeExpandedDomain = self._timeExpandDomain(self.domain, timeBound)

if not self.lengthDomain:
return self.domain
domains = {}
for length in self.lengthDomain:
length = length[0]
domains[length] = Array(self.domain, (length,))
domains = timeExpandedDomain
else:
domains = {}
for length in self.lengthDomain:
length = length[0]
domains[length] = Array(timeExpandedDomain, (length,))

return domains

def __repr__(self):
rep = f'Feature({self.domain}'
rep = f'{self.__class__.__name__}({self.domain}'
if self.distribution is not None:
rep += f', distribution={self.distribution}'
if self.lengthDomain is not None:
Expand All @@ -987,6 +994,11 @@ def __repr__(self):
rep += f', distanceMetric={self.distanceMetric}'
return rep + ')'

class TimeSeriesFeature(Feature):
@staticmethod
def _timeExpandDomain(domain, timeBound):
return Array(domain, (timeBound,))

### Feature spaces

class FeatureSpace:
Expand All @@ -1001,32 +1013,46 @@ class FeatureSpace:
})
"""

def __init__(self, features, distanceMetric=None):
def __init__(self, features, distanceMetric=None, timeBound=0):
self.namedFeatures = tuple(sorted(features.items(), key=lambda i: i[0]))
self.featureNamed = OrderedDict(self.namedFeatures)
self.features = tuple(self.featureNamed.values())
self.makePoint = namedtuple('SpacePoint', self.featureNamed.keys())

self.staticFeatureNamed = OrderedDict({name: feat for name, feat in self.featureNamed.items()
if not isinstance(feat, TimeSeriesFeature)})
self.dynamicFeatureNamed = OrderedDict({name: feat for name, feat in self.featureNamed.items()
if isinstance(feat, TimeSeriesFeature)})

# self.makePoint = namedtuple('SpacePoint', self.featureNamed)
self.makeStaticPoint = namedtuple('StaticSpacePoint', self.staticFeatureNamed)
self.makeDynamicPoint = namedtuple('DynamicSpacePoint', self.dynamicFeatureNamed)

self.distanceMetric = distanceMetric
self.timeBound = timeBound

if len(self.dynamicFeatureNamed) > 0 and self.timeBound == 0:
raise RuntimeError("FeatureSpace which includes TimeSeriesFeature has no timeBound.")

@cached_property
def domains(self):
"""Return the domain or domains associated with this space.
"""Return the expanded domain or domains associated with this space.

Returns a pair consisting of the Domain of all lengths of feature
lists, plus a dict mapping each (flattened) point in that Domain to the
corresponding Domain of other features. If the FeatureSpace has no
feature lists, then returns (None, dom) where dom is the fixed Domain
of all features.
of all features. If any Features are TimeSeriesFeatures then they are
expanded to the a max of timeBound.
"""
fixedDomains = {}
lengthDomains = {}
variableDomains = {}
for name, feature in self.namedFeatures:
if feature.lengthDomain:
lengthDomains[name] = feature.lengthDomain
variableDomains[name] = feature.fixedDomains
variableDomains[name] = feature.fixedDomains(self.timeBound)
else:
fixedDomains[name] = feature.domain
fixedDomains[name] = feature._timeExpandDomain(feature.domain, self.timeBound)
if len(lengthDomains) == 0:
return (None, Struct(fixedDomains))
lengthDomain = Struct(lengthDomains)
Expand Down Expand Up @@ -1056,16 +1082,19 @@ def flatten(self, point, fixedDimension=False):
"""Flatten a point in this space. See Domain.flatten.

If fixedDimension is True, the point is flattened out as if all feature
lists had their maximum lengths, with None as a placeholder. This means
that all points in the space will flatten to the same length.
lists had their maximum lengths and time steps, with None as a placeholder.
This means that all points in the space will flatten to the same length.
"""
from verifai.samplers.feature_sampler import Sample
assert isinstance(point, Sample)

flattened = []
for feature, value in zip(self.features, point):
for feature, value in zip(self.staticFeatureNamed.values(), point.staticSample):
domain = feature.domain
if feature.lengthDomain:
length = len(value)
flattened.append(length)
fixedDomain = feature.fixedDomains[length]
fixedDomain = feature.fixedDomains(self.timeBound)[length]
fixedDomain.flattenOnto(value, flattened)
if fixedDimension: # add padding to maximum length
sizePerElt = domain.flattenedDimension
Expand All @@ -1074,6 +1103,39 @@ def flatten(self, point, fixedDimension=False):
flattened.append(None)
else:
domain.flattenOnto(value, flattened)

flattened.append(len(point.dynamicSampleHistory))

for feature_i, feature in enumerate(self.dynamicFeatureNamed.values()):
if feature.lengthDomain:
length = len(value)
else:
length = None

flattened.append(length)

for dynamic_point in point.dynamicSampleHistory:
value = dynamic_point[feature_i]
domain = feature.domain

if length is None:
domain.flattenOnto(value, flattened)
else:
fixedDomain = feature.fixedDomains(self.timeBound)[length]
fixedDomain.flattenOnto(value, flattened)
if fixedDimension:
sizePerElt = domain.flattenedDimension
needed = (feature.maxLength - length) * sizePerElt
for i in range(needed):
flattened.append(None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pull this code out into a helper function (can be local, doesn't need to be a method on the class) and use it in the loop for static features too? Most of the code is identical.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(We can leave this to later, it's not that important)


if fixedDimension:
needed = (self.space.timeBound - len(point.dynamicSampleHistory)) * length * sizePerElt
flattened += [None for _ in range(needed)]

flattened_point = tuple(flattened)
if fixedDimension:
assert len(flattened_point) == self.fixedFlattenedDimension
return tuple(flattened)

@cached_property
Expand All @@ -1083,13 +1145,15 @@ def fixedFlattenedDimension(self):
Also an upper bound on the length of the vector returned by flatten
by default, when fixedDimension=False."""
dim = 0
dim += 1 # Timesteps
for feature in self.features:
domain = feature.domain
timeMult = self.timeBound if isinstance(feature, TimeSeriesFeature) else 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A thought probably best left for later: maybe we should move all the flattening logic into new Feature._flatten and Feature._flattenedDimension methods. Then this kind of case split could be handled by overrides in TimeSeriesFeature instead of needing to have special cases here.

if feature.lengthDomain:
dim += 1 # dimension storing length of the feature list
dim += feature.maxLength * domain.flattenedDimension
dim += timeMult * feature.maxLength * domain.flattenedDimension
else:
dim += domain.flattenedDimension
dim += timeMult * domain.flattenedDimension
return dim

def meaningOfFlatCoordinate(self, index, pointName='point'):
Expand All @@ -1100,33 +1164,61 @@ def meaningOfFlatCoordinate(self, index, pointName='point'):
have different meaning depending on the lengths of feature lists.
"""
assert 0 <= index < self.fixedFlattenedDimension
for name, feature in self.namedFeatures:
for name, feature in self.staticFeatureNamed.items():
domain = feature.domain
if feature.lengthDomain:
if index == 0:
return f'len({pointName}.{name})'
return f'len({pointName}.staticSample.{name})'
else:
index -= 1
elem = index // domain.flattenedDimension
if elem < feature.maxLength:
subIndex = index % domain.flattenedDimension
subPoint = f'{pointName}.{name}[{elem}]'
subPoint = f'{pointName}.staticSample.{name}[{elem}]'
return domain.meaningOfFlatCoordinate(subIndex,
pointName=subPoint)
index -= feature.maxLength * domain.flattenedDimension
else:
if index < domain.flattenedDimension:
subPoint = f'{pointName}.{name}'
subPoint = f'{pointName}.staticSample.{name}'
return domain.meaningOfFlatCoordinate(index,
pointName=subPoint)
index -= domain.flattenedDimension

if index == 0:
return f'len({pointName}.dynamicSampleHistory)'
index -= 1

for name, feature in self.dynamicFeatureNamed.items():
domain = feature.domain
for time_i in range(self.timeBound):
if feature.lengthDomain:
if index == 0:
return f'len({pointName}.dynamicSampleHistory[{time_i}].{name})'
else:
index -= 1
elem = index // domain.flattenedDimension
if elem < feature.maxLength:
subIndex = index % domain.flattenedDimension
subPoint = f'{pointName}.dynamicSampleHistory[{time_i}].{name}[{elem}]'
return domain.meaningOfFlatCoordinate(subIndex,
pointName=subPoint)
index -= feature.maxLength * domain.flattenedDimension
else:
if index < domain.flattenedDimension:
subPoint = f'{pointName}.dynamicSampleHistory[{time_i}].{name}'
return domain.meaningOfFlatCoordinate(index,
pointName=subPoint)
index -= domain.flattenedDimension

raise RuntimeError('impossible index arithmetic')

def pandasIndexForFlatCoordinate(self, index):
"""Pandas index of a coordinate of a flattened point in this space.

See meaningOfFlatCoordinate, and Domain.pandasIndexForFlatCoordinate.
"""
#TODO Update
assert 0 <= index < self.fixedFlattenedDimension
for name, feature in self.namedFeatures:
domain = feature.domain
Expand Down Expand Up @@ -1154,7 +1246,7 @@ def coordinateIsNumerical(self, index):
See meaningOfFlatCoordinate, and Domain.coordinateIsNumerical.
"""
assert 0 <= index < self.fixedFlattenedDimension
for name, feature in self.namedFeatures:
for name, feature in self.staticFeatureNamed.items():
domain = feature.domain
if feature.lengthDomain:
if index == 0:
Expand All @@ -1170,26 +1262,86 @@ def coordinateIsNumerical(self, index):
if index < domain.flattenedDimension:
return domain.coordinateIsNumerical(index)
index -= domain.flattenedDimension

if index == 0:
return True
index -= 1

for name, feature in self.dynamicFeatureNamed.items():
domain = feature.domain
for time_i in range(self.timeBound):
if feature.lengthDomain:
if index == 0:
return True
else:
index -= 1
elem = index // domain.flattenedDimension
if elem < feature.maxLength:
subIndex = index % domain.flattenedDimension
return domain.coordinateIsNumerical(subIndex)
index -= feature.maxLength * domain.flattenedDimension
else:
if index < domain.flattenedDimension:
return domain.coordinateIsNumerical(index)
index -= domain.flattenedDimension

raise RuntimeError('impossible index arithmetic')

def unflatten(self, coords, fixedDimension=False):
"""Unflatten a tuple of coordinates to a point in this space."""
values = []
from verifai.samplers.feature_sampler import LateFeatureSample

staticValues = []
iterator = iter(coords)
for feature in self.features:

for feature in self.staticFeatureNamed.values():
domain = feature.domain
if feature.lengthDomain:
length = next(iterator)
fixedDomain = feature.fixedDomains[length]
values.append(fixedDomain.unflattenIterator(iterator))
fixedDomain = feature.fixedDomains(self.timeBound)[length]
staticValues.append(fixedDomain.unflattenIterator(iterator))
if fixedDimension: # consume padding
sizePerElt = domain.flattenedDimension
needed = (feature.maxLength - length) * sizePerElt
for i in range(needed):
next(iterator)
else:
values.append(domain.unflattenIterator(iterator))
return self.makePoint(*values)
staticValues.append(domain.unflattenIterator(iterator))

staticSample = self.makeStaticPoint(*staticValues)

timeSteps = next(iterator)

dynamicValuesList = [[] for _ in range(timeSteps)]

for feature in self.dynamicFeatureNamed.values():
domain = feature.domain
length = next(iterator)

for time_i in range(timeSteps):


if length is None:
dynamicValuesList[time_i].append(domain.unflattenIterator(iterator))
else:
fixedDomain = feature.fixedDomains(self.timeBound)[length]
dynamicValuesList[time_i].append(fixedDomain.unflattenIterator(iterator))
if fixedDimension: # consume padding
sizePerElt = domain.flattenedDimension
needed = (feature.maxLength - length) * sizePerElt
for i in range(needed):
next(iterator)

if fixedDimension:
needed = (self.space.timeBound - timeSteps) * length * sizePerElt
for i in range(needed):
next(iterator)

dynamicSampleList = [self.makeDynamicPoint(*dynamicValues) for dynamicValues in dynamicValuesList]

updateCallback = lambda rho: None

return LateFeatureSample(space=None, staticSample=staticSample, dynamicSampleList=dynamicSampleList, updateCallback=updateCallback)

def __repr__(self):
rep = f'FeatureSpace({self.featureNamed}'
Expand Down
26 changes: 14 additions & 12 deletions src/verifai/samplers/domain_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,24 @@ def update(self, sample, info, rho):
"""
pass

def nextSample(self, feedback=None):
"""Generate the next sample, given feedback from the last sample.

This exists only for backwards-compatibility. It has been replaced by
the getSample and update APIs.
"""
if self.last_sample is not None:
self.update(self.last_sample, self.last_info, feedback)
self.last_sample, self.last_info = self.getSample()
return self.last_sample
# TODO: Deprecate
# def nextSample(self, feedback=None):
# """Generate the next sample, given feedback from the last sample.

# This exists only for backwards-compatibility. It has been replaced by
# the getSample and update APIs.
# """
# if self.last_sample is not None:
# self.update(self.last_sample, self.last_info, feedback)
# self.last_sample, self.last_info = self.getSample()
# return self.last_sample

def __iter__(self):
try:
feedback = None
while True:
feedback = yield self.nextSample(feedback)
sample, info = self.getSample()
rho = yield sample
self.update(sample, info, rho)
except TerminationException:
return

Expand Down
Loading