-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
293 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
import uuid | ||
|
||
from app.database import Base | ||
from app.models.event import DistributionLevel | ||
from sqlalchemy import ( | ||
JSON, | ||
Boolean, | ||
Column, | ||
DateTime, | ||
Enum, | ||
ForeignKey, | ||
Integer, | ||
String, | ||
) | ||
from sqlalchemy.dialects.postgresql import UUID | ||
from sqlalchemy.orm import Mapped, mapped_column | ||
|
||
|
||
class Galaxy(Base): | ||
__tablename__ = "galaxies" | ||
|
||
id = Column(Integer, primary_key=True, index=True) | ||
uuid = Column(UUID(as_uuid=True), unique=True, default=uuid.uuid4) | ||
name = Column(String, nullable=False) | ||
type = Column(String, nullable=False) | ||
description = Column(String, nullable=False) | ||
version = Column(Integer, nullable=False) | ||
icon = Column(String, nullable=False) | ||
namespace = Column(String, nullable=False) | ||
enabled = Column(Boolean, nullable=False, default=False) | ||
local_only = Column(Boolean, nullable=False, default=False) | ||
kill_chain_order = Column(JSON, nullable=True, default={}) | ||
default = Column(Boolean, nullable=False, default=False) | ||
org_id = Column(Integer, ForeignKey("organisations.id"), index=True, nullable=False) | ||
orgc_id = Column( | ||
Integer, ForeignKey("organisations.id"), index=True, nullable=False | ||
) | ||
created = Column(DateTime, nullable=False) | ||
modified = Column(DateTime, nullable=False) | ||
distribution: Mapped[DistributionLevel] = mapped_column( | ||
Enum(DistributionLevel, name="distribution_level"), | ||
nullable=False, | ||
default=DistributionLevel.INHERIT_EVENT, | ||
) | ||
# predicates = relationship("TaxonomyPredicate", lazy="subquery") | ||
|
||
|
||
# class TaxonomyPredicate(Base): | ||
# __tablename__ = "taxonomy_predicates" | ||
|
||
# id = Column(Integer, primary_key=True, index=True) | ||
# taxonomy_id = Column( | ||
# Integer, ForeignKey("taxonomies.id"), index=True, nullable=True | ||
# ) | ||
# value = Column(String, nullable=False) | ||
# expanded = Column(String, nullable=False) | ||
# colour = Column(String, nullable=False) | ||
# description = Column(String, nullable=False) | ||
# exclusive = Column(Boolean, nullable=False, default=False) | ||
# numerical_value = Column(Integer, index=True) | ||
|
||
# entries = relationship("TaxonomyEntry", lazy="subquery") | ||
|
||
|
||
# class TaxonomyEntry(Base): | ||
# __tablename__ = "taxonomy_entries" | ||
|
||
# id = Column(Integer, primary_key=True, index=True) | ||
# taxonomy_predicate_id = Column( | ||
# Integer, ForeignKey("taxonomy_predicates.id"), index=True, nullable=True | ||
# ) | ||
# value = Column(String, nullable=False) | ||
# expanded = Column(String, nullable=False) | ||
# colour = Column(String, nullable=False) | ||
# description = Column(String, nullable=False) | ||
# numerical_value = Column(Integer, index=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import json | ||
import os | ||
from datetime import datetime | ||
|
||
from app.models import galaxy as galaxies_models | ||
from app.schemas import galaxy as galaxies_schemas | ||
from app.schemas import user as users_schemas | ||
from fastapi import HTTPException, Query, status | ||
from fastapi_pagination.ext.sqlalchemy import paginate | ||
from sqlalchemy.orm import Session | ||
|
||
|
||
def get_galaxies(db: Session, filter: str = Query(None)) -> galaxies_models.Galaxy: | ||
query = db.query(galaxies_models.Galaxy) | ||
|
||
if filter: | ||
query = query.filter(galaxies_models.Galaxy.namespace.ilike(f"%{filter}%")) | ||
|
||
query = query.order_by(galaxies_models.Galaxy.namespace) | ||
|
||
return paginate( | ||
query, | ||
additional_data={"query": {"filter": filter}}, | ||
) | ||
|
||
|
||
def get_galaxy_by_id(db: Session, galaxy_id: int) -> galaxies_models.Galaxy: | ||
return ( | ||
db.query(galaxies_models.Galaxy) | ||
.filter(galaxies_models.Galaxy.id == galaxy_id) | ||
.first() | ||
) | ||
|
||
|
||
def update_galaxies( | ||
db: Session, user: users_schemas.User | ||
) -> list[galaxies_schemas.Galaxy]: | ||
galaxies = [] | ||
galaxies_dir = "app/submodules/misp-galaxy/galaxies" | ||
|
||
for root, __, files in os.walk(galaxies_dir): | ||
for galaxy_file in files: | ||
if not galaxy_file.endswith(".json"): | ||
continue | ||
|
||
with open(os.path.join(root, galaxy_file)) as f: | ||
galaxy_data = json.load(f) | ||
galaxy = galaxies_models.Galaxy( | ||
name=galaxy_data["name"], | ||
uuid=galaxy_data["uuid"], | ||
namespace=( | ||
galaxy_data["namespace"] | ||
if "namespace" in galaxy_data | ||
else "missing-namespace" | ||
), | ||
version=galaxy_data["version"], | ||
description=galaxy_data["description"], | ||
icon=galaxy_data["icon"], | ||
type=galaxy_data["type"], | ||
kill_chain_order=( | ||
galaxy_data["kill_chain_order"] | ||
if "kill_chain_order" in galaxy_data | ||
else None | ||
), | ||
org_id=user.org_id, | ||
orgc_id=user.org_id, | ||
created=datetime.now(), | ||
modified=datetime.now(), | ||
) | ||
db.add(galaxy) | ||
db.commit() | ||
db.refresh(galaxy) | ||
|
||
galaxies.append(galaxy) | ||
|
||
# db.add(db_entry) | ||
# db.commit() | ||
# db.refresh(db_entry) | ||
|
||
return galaxies | ||
|
||
|
||
def update_galaxy( | ||
db: Session, | ||
galaxy_id: int, | ||
galaxy: galaxies_schemas.GalaxyUpdate, | ||
) -> galaxies_models.Galaxy: | ||
db_galaxy = get_galaxy_by_id(db, galaxy_id=galaxy_id) | ||
|
||
if db_galaxy is None: | ||
raise HTTPException( | ||
status_code=status.HTTP_404_NOT_FOUND, detail="Galaxy not found" | ||
) | ||
|
||
galaxy_patch = galaxy.model_dump(exclude_unset=True) | ||
for key, value in galaxy_patch.items(): | ||
setattr(db_galaxy, key, value) | ||
|
||
db.add(db_galaxy) | ||
db.commit() | ||
db.refresh(db_galaxy) | ||
|
||
return db_galaxy |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
from app.auth.auth import get_current_active_user | ||
from app.dependencies import get_db | ||
from app.repositories import galaxies as galaxies_repository | ||
from app.schemas import galaxy as galaxies_schemas | ||
from app.schemas import user as user_schemas | ||
from fastapi import APIRouter, Depends, HTTPException, Query, Security, status | ||
from fastapi_pagination import Page | ||
from fastapi_pagination.customization import CustomizedPage, UseModelConfig | ||
from sqlalchemy.orm import Session | ||
|
||
router = APIRouter() | ||
|
||
Page = CustomizedPage[ | ||
Page, | ||
UseModelConfig(extra="allow"), | ||
] | ||
|
||
|
||
@router.get("/galaxies/", response_model=Page[galaxies_schemas.Galaxy]) | ||
def get_galaxies( | ||
db: Session = Depends(get_db), | ||
user: user_schemas.User = Security( | ||
get_current_active_user, scopes=["galaxies:read"] | ||
), | ||
filter: str = Query(None), | ||
): | ||
return galaxies_repository.get_galaxies(db, filter=filter) | ||
|
||
|
||
@router.get("/galaxies/{galaxy_id}", response_model=galaxies_schemas.Galaxy) | ||
def get_galaxy_by_id( | ||
galaxy_id: int, | ||
db: Session = Depends(get_db), | ||
user: user_schemas.User = Security( | ||
get_current_active_user, scopes=["galaxies:read"] | ||
), | ||
) -> galaxies_schemas.Galaxy: | ||
db_galaxy = galaxies_repository.get_galaxy_by_id(db, galaxy_id=galaxy_id) | ||
if db_galaxy is None: | ||
raise HTTPException( | ||
status_code=status.HTTP_404_NOT_FOUND, detail="Galaxy not found" | ||
) | ||
return db_galaxy | ||
|
||
|
||
@router.post("/galaxies/update", response_model=list[galaxies_schemas.Galaxy]) | ||
def update_galaxies( | ||
db: Session = Depends(get_db), | ||
user: user_schemas.User = Security( | ||
get_current_active_user, scopes=["galaxies:update"] | ||
), | ||
): | ||
return galaxies_repository.update_galaxies(db, user=user) | ||
|
||
|
||
@router.patch("/galaxies/{galaxy_id}", response_model=galaxies_schemas.Galaxy) | ||
def update_galaxy( | ||
galaxy_id: int, | ||
galaxy: galaxies_schemas.GalaxyUpdate, | ||
db: Session = Depends(get_db), | ||
user: user_schemas.User = Security( | ||
get_current_active_user, scopes=["galaxies:update"] | ||
), | ||
) -> galaxies_schemas.Galaxy: | ||
return galaxies_repository.update_galaxy(db=db, galaxy_id=galaxy_id, galaxy=galaxy) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from datetime import datetime | ||
from typing import Optional | ||
from uuid import UUID | ||
|
||
from app.models.event import DistributionLevel | ||
from pydantic import BaseModel, ConfigDict | ||
|
||
|
||
class GalaxyBase(BaseModel): | ||
uuid: Optional[UUID] = None | ||
name: str | ||
type: str | ||
description: str | ||
version: int | ||
icon: str | ||
namespace: str | ||
enabled: bool | ||
local_only: bool | ||
kill_chain_order: Optional[dict] = {} | ||
default: bool | ||
org_id: int | ||
orgc_id: int | ||
created: datetime | ||
modified: datetime | ||
distribution: DistributionLevel | ||
|
||
|
||
class Galaxy(GalaxyBase): | ||
id: int | ||
model_config = ConfigDict(from_attributes=True) | ||
|
||
|
||
class GalaxyUpdate(BaseModel): | ||
default: Optional[bool] = None | ||
enabled: Optional[bool] = None | ||
local_only: Optional[bool] = None |