Skip to content

Commit

Permalink
handle and expression check contraints
Browse files Browse the repository at this point in the history
  • Loading branch information
rpiazza committed Dec 13, 2024
1 parent fab7199 commit 2b55b4d
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any

from sqlalchemy import Boolean, Float, Integer, String, Text, create_engine
from sqlalchemy import Boolean, CheckConstraint, Float, Integer, String, Text, create_engine
from sqlalchemy_utils import create_database, database_exists, drop_database

from stix2.base import (
Expand Down Expand Up @@ -122,9 +122,12 @@ def determine_stix_type(stix_object):
def array_allowed():
return False

@staticmethod
def create_regex_constraint_expression(column, pattern):
pass
def create_regex_constraint_expression(self, column_name, pattern):
return CheckConstraint(self.create_regex_constraint_clause(column_name, pattern))

def create_regex_constraint_and_expression(self, clause1, clause2):
return (CheckConstraint("((" + self.create_regex_constraint_clause(clause1[0], clause1[1]) + ") AND (" +
self.create_regex_constraint_clause(clause2[0], clause2[1]) + "))"))

def process_value_for_insert(self, stix_type, value):
sql_type = stix_type.determine_sql_type(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ def determine_sql_type_for_timestamp_property(): # noqa: F811
def array_allowed():
return False

@staticmethod
def create_regex_constraint_expression(column_name, pattern):
return CheckConstraint(f"{column_name} REGEXP {pattern}")
def create_regex_constraint_clause(self, column_name, pattern):
return f"{column_name} REGEXP {pattern}"


Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,5 @@ def determine_sql_type_for_timestamp_property(): # noqa: F811
def array_allowed():
return True

@staticmethod
def create_regex_constraint_expression(column_name, pattern):
return CheckConstraint(f"{column_name} ~ {pattern}")
def create_regex_constraint_clause(self, column_name, pattern):
return f"{column_name} ~ {pattern}"
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ def determine_sql_type_for_timestamp_property(): # noqa: F811
def array_allowed():
return False

@staticmethod
def create_regex_constraint_expression(column_name, pattern):
def create_regex_constraint_expression(self, column_name, pattern):
return None

def create_regex_constraint_and_expression(self, clause1, clause2):
return None

@staticmethod
Expand Down
12 changes: 6 additions & 6 deletions stix2/datastore/relational_db/table_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def create_ref_table(metadata, db_backend, specifics, table_name, foreign_key_na
def create_hashes_table(name, metadata, db_backend, schema_name, table_name, key_type=Text, level=1):
columns = list()
# special case, perhaps because its a single embedded object with hashes, and not a list of embedded object
# making the parent table's primary key does seem to worl
# making the parent table's primary key does seem to work

columns.append(
Column(
Expand Down Expand Up @@ -720,13 +720,13 @@ def ref_column(name, specifics, db_backend, auth_type=0):
types = "|".join(specifics)
if auth_type == 0:
reg_ex = f"'^({types})" + "--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$'" # noqa: F811
constraint = db_backend.create_regex_constraint_expression(name, reg_ex)
constraint = db_backend.create_regex_constraint_expression(name, reg_ex)
else:
reg_ex = "'--[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$')"
# constraint = \
# CheckConstraint(db_backend.create_regex_constraint_expression(f"NOT({name}", f"'^({types})'") + " AND " +
# db_backend.create_regex_constraint_expression(name, reg_ex))
return Column(name, db_backend.determine_sql_type_for_reference_property()) # , constraint)
constraint = \
db_backend.create_regex_constraint_and_expression((f"NOT({name}", f"'^({types})'"),
(name, reg_ex))
return Column(name, db_backend.determine_sql_type_for_reference_property(), constraint) # , constraint)
else:
return Column(
name,
Expand Down

0 comments on commit 2b55b4d

Please sign in to comment.