Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
b2be019
(fix) Update path to start_postgres_docker.py
skuenzli Mar 1, 2025
5759d75
(feat) Add a database config 'type' that DBOS can switch on to use th…
skuenzli Mar 1, 2025
2af2fe4
Add pymysql driver
skuenzli Mar 1, 2025
d6f227b
(feat) Rough-out creating the system database for MySQL.
skuenzli Mar 1, 2025
326963b
Create the application and system databases
skuenzli Mar 1, 2025
d78ede4
Define a test_mysql module with a simple workflow to drive bootstrapp…
skuenzli Mar 1, 2025
363de06
Remove unused import.
skuenzli Mar 1, 2025
6eea614
Initialize MySQL migrations directory.
skuenzli Mar 1, 2025
aecfb4e
Hack initial workflow execution into MySQL
skuenzli Mar 2, 2025
9ea90c7
Force MySQL (test) config to use the 'dbos' database/schema because M…
skuenzli Mar 2, 2025
ecd2dc1
Provide mysql implementation for recording transaction outputs
skuenzli Mar 2, 2025
956d6c6
Add test_simple_workflow_attempts_counter from test_dbos
skuenzli Mar 2, 2025
172a30a
Expand workflow_uuid column length to support up to a billion child w…
skuenzli Mar 2, 2025
2d37907
Replace use of postgres dialect (pg) with standard sqlalchemy (sa) fo…
skuenzli Mar 2, 2025
d7d4369
Only start notification listener for PostgresQL because MySQL doesn't…
skuenzli Mar 2, 2025
50a725b
Properly qualify the query for the workflow status by workflow id.
skuenzli Mar 2, 2025
70fbc0a
Adjust expected row results for update on conflict to 2
skuenzli Mar 2, 2025
d2c4f63
Expand workflow_uuid column length to support up to a billion child w…
skuenzli Mar 2, 2025
ba7d65a
Fix expression for getting the current txn id in MySQL
skuenzli Mar 2, 2025
41c8fb3
Log failed inserts into transaction output and errors
skuenzli Mar 2, 2025
1fecc63
Port test_dbos::test_exception_workflow to test_mysql
skuenzli Mar 2, 2025
017941a
Port test_dbos::test_temp_workflow to test_mysql
skuenzli Mar 2, 2025
de1030f
Port test_dbos::test_temp_workflow_errors to test_mysql
skuenzli Mar 2, 2025
b738ad2
Port test_dbos::test_recovery_workflow to test_mysql
skuenzli Mar 2, 2025
e7357a3
Port test_dbos::test_recovery_workflow_step to test_mysql
skuenzli Mar 2, 2025
fbac105
Port test_dbos::test_workflow_returns_none to test_mysql
skuenzli Mar 2, 2025
10a0f8c
Port test_dbos::test_recovery_temp_workflow to test_mysql
skuenzli Mar 2, 2025
bf1e149
Port test_dbos::test_recovery_thread to test_mysql
skuenzli Mar 2, 2025
1e1110a
Port test_dbos::test_start_workflow to test_mysql
skuenzli Mar 2, 2025
e11eab1
Port test_dbos test_retrieve_workflow, test_retrieve_workflow_in_work…
skuenzli Mar 2, 2025
28dd567
Extract function to detect foreign key integrity violations and add s…
skuenzli Mar 2, 2025
569e0a1
Extract methods to receive messages to enable adding support for MySQL
skuenzli Mar 2, 2025
cb5e613
Implement basic support for sending and receiving notifications on My…
skuenzli Mar 2, 2025
f5346f2
Port test_dbos::test_send_recv_temp_wf to test_mysql
skuenzli Mar 3, 2025
57f4eff
Extract existing impl to insert events to enable support for MySQL
skuenzli Mar 3, 2025
28251d0
Add support for inserting events to MySQL.
skuenzli Mar 3, 2025
50380ab
Port test_dbos::test_nonserializable_values to test_mysql
skuenzli Mar 3, 2025
1fe60c1
Port test_dbos::test_multi_set_event to test_mysql
skuenzli Mar 3, 2025
87c5dbc
Port test_dbos::test_debug_logging to test_mysql
skuenzli Mar 3, 2025
4c00790
Port test_dbos::test_destroy_semantics to test_mysql
skuenzli Mar 3, 2025
35f4177
Port test_dbos::test_double_decoration to test_mysql
skuenzli Mar 3, 2025
84818fb
Port test_dbos::test_app_version to test_mysql
skuenzli Mar 3, 2025
b222e7e
Port test_dbos::test_recovery_appversion to test_mysql
skuenzli Mar 3, 2025
39c63c3
Add support for MySQL in DB wizard
skuenzli Mar 5, 2025
0773233
Expand MySQL column lengths to handle real-world values
skuenzli Mar 5, 2025
a3e6025
Support enqueuing in MySQL.
skuenzli Mar 10, 2025
1e76fe5
Build out assertions in test_enqueue.
skuenzli Mar 10, 2025
7eb3a12
Add tests that verify workflow status with very verbose input, output…
skuenzli Mar 10, 2025
82355aa
Align use of Text to UnicodeText throughout SystemSchema.
skuenzli Mar 16, 2025
6811a06
Make name of the system database configurable
skuenzli Mar 22, 2025
2d872a0
Make name of the application database configurable
skuenzli Mar 22, 2025
e799ee6
Document the configure schema functions
skuenzli Mar 26, 2025
52e4ec2
Merge pull request #2 from k9securityio/externalize-db_schema-name-fo…
skuenzli Mar 26, 2025
23ad924
Execute MySQL tests in a 'dbos_mysql' database to demonstrate the dat…
skuenzli Mar 26, 2025
5e514c5
Merge branch 'externalize-db_schema-name-for-mysql' into support-mysql8
skuenzli Mar 26, 2025
b3e34aa
Update ApplicationSchema to use workflow_uuid column definition from …
skuenzli Mar 26, 2025
9287eaf
Remove explicit fastapi CLI dependency so that fastapi[standard] can …
skuenzli Jul 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEVELOPING.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ one, run:

```bash
export PGPASSWORD=dbos
python3 dbos/_templates/hello/start_postgres_docker.py
python3 dbos/_templates/dbos-db-starter/start_postgres_docker.py
```

A successful test run results in the following output:
Expand Down
2 changes: 1 addition & 1 deletion alembic.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = dbos/_migrations
script_location = dbos/_migrations/mysql

version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
166 changes: 135 additions & 31 deletions dbos/_app_db.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
from typing import Optional, TypedDict

import sqlalchemy as sa
import sqlalchemy.dialects.mysql as mysql
import sqlalchemy.dialects.postgresql as pg
from sqlalchemy.exc import DBAPIError
from sqlalchemy.orm import Session, sessionmaker

from ._dbos_config import ConfigFile
from ._error import DBOSWorkflowConflictIDError
from ._schemas.application_database import ApplicationSchema
from ._logger import dbos_logger
from ._schemas._mysql import Expressions
from ._schemas.application_database import (
ApplicationSchema,
configure_application_schema_mysql,
)


class TransactionResultInternal(TypedDict):
Expand All @@ -29,37 +35,70 @@ class ApplicationDatabase:

def __init__(self, config: ConfigFile):
self.config = config
self.db_type = config["database"]["type"]

app_db_name = config["database"]["app_db_name"]
if "postgresql" == self.db_type:
# If the application database does not already exist, create it
postgres_db_url = sa.URL.create(
"postgresql+psycopg",
username=config["database"]["username"],
password=config["database"]["password"],
host=config["database"]["hostname"],
port=config["database"]["port"],
database="postgres",
)
postgres_db_engine = sa.create_engine(postgres_db_url)
with postgres_db_engine.connect() as conn:
conn.execution_options(isolation_level="AUTOCOMMIT")
if not conn.execute(
sa.text("SELECT 1 FROM pg_database WHERE datname=:db_name"),
parameters={"db_name": app_db_name},
).scalar():
conn.execute(sa.text(f"CREATE DATABASE {app_db_name}"))
postgres_db_engine.dispose()

# Create a connection pool for the application database
app_db_url = sa.URL.create(
"postgresql+psycopg",
username=config["database"]["username"],
password=config["database"]["password"],
host=config["database"]["hostname"],
port=config["database"]["port"],
database=app_db_name,
)
elif "mysql" == self.db_type:
configure_application_schema_mysql(db_schema_name=app_db_name)
db_url_args = {
"drivername": "mysql+pymysql",
"username": config["database"]["username"],
"password": config["database"]["password"],
"host": config["database"]["hostname"],
"port": config["database"]["port"],
}
mysql_db_url = sa.URL.create(**db_url_args)
engine = sa.create_engine(mysql_db_url)
with engine.connect() as conn:
conn.execution_options(isolation_level="AUTOCOMMIT")
conn.execute(
sa.text(
f"""
CREATE DATABASE IF NOT EXISTS `{app_db_name}`
CHARACTER SET utf8mb4
COLLATE utf8mb4_bin ;
"""
)
)
dbos_logger.info(f"application database exists: {app_db_name}")
engine.dispose()

db_url_args["database"] = app_db_name
app_db_url = sa.URL.create(**db_url_args)
else:
raise RuntimeError(
f"unsupported database type: {config['database']['type']}"
)

# If the application database does not already exist, create it
postgres_db_url = sa.URL.create(
"postgresql+psycopg",
username=config["database"]["username"],
password=config["database"]["password"],
host=config["database"]["hostname"],
port=config["database"]["port"],
database="postgres",
)
postgres_db_engine = sa.create_engine(postgres_db_url)
with postgres_db_engine.connect() as conn:
conn.execution_options(isolation_level="AUTOCOMMIT")
if not conn.execute(
sa.text("SELECT 1 FROM pg_database WHERE datname=:db_name"),
parameters={"db_name": app_db_name},
).scalar():
conn.execute(sa.text(f"CREATE DATABASE {app_db_name}"))
postgres_db_engine.dispose()

# Create a connection pool for the application database
app_db_url = sa.URL.create(
"postgresql+psycopg",
username=config["database"]["username"],
password=config["database"]["password"],
host=config["database"]["hostname"],
port=config["database"]["port"],
database=app_db_name,
)
self.engine = sa.create_engine(
app_db_url, pool_size=20, max_overflow=5, pool_timeout=30
)
Expand All @@ -76,10 +115,22 @@ def __init__(self, config: ConfigFile):
def destroy(self) -> None:
self.engine.dispose()

@staticmethod
def _raise_unsupported_db_type(self):
raise RuntimeError(
f"unsupported database type: {self.db_type} (configured: {self.config['database']['type']})"
)

def record_transaction_output(
session: Session, output: TransactionResultInternal
self, session: Session, output: TransactionResultInternal
) -> None:
if "postgresql" == self.db_type:
self._record_transaction_output_pg(session, output)
elif "mysql" == self.db_type:
self._record_transaction_output_mysql(session, output)
else:
self._raise_unsupported_db_type()

def _record_transaction_output_pg(self, session, output):
try:
session.execute(
pg.insert(ApplicationSchema.transaction_outputs).values(
Expand All @@ -99,7 +150,38 @@ def record_transaction_output(
raise DBOSWorkflowConflictIDError(output["workflow_uuid"])
raise

def _record_transaction_output_mysql(self, session, output):
try:
session.execute(
mysql.insert(ApplicationSchema.transaction_outputs).values(
workflow_uuid=output["workflow_uuid"],
function_id=output["function_id"],
output=output["output"],
error=None,
txn_id=sa.text(
"(SELECT TRX_ID FROM INFORMATION_SCHEMA.INNODB_TRX WHERE TRX_MYSQL_THREAD_ID = CONNECTION_ID())"
),
txn_snapshot=output["txn_snapshot"],
executor_id=(
output["executor_id"] if output["executor_id"] else None
),
)
)
except DBAPIError as dbapi_error:
dbos_logger.warning(
f'error recording transaction output: {output["output"]}; dbapi_error: {dbapi_error}'
)
raise dbapi_error

def record_transaction_error(self, output: TransactionResultInternal) -> None:
if "postgresql" == self.db_type:
self._record_transaction_error_pg(output)
elif "mysql" == self.db_type:
self._record_transaction_error_mysql(output)
else:
self._raise_unsupported_db_type()

def _record_transaction_error_pg(self, output):
try:
with self.engine.begin() as conn:
conn.execute(
Expand All @@ -122,6 +204,28 @@ def record_transaction_error(self, output: TransactionResultInternal) -> None:
raise DBOSWorkflowConflictIDError(output["workflow_uuid"])
raise

def _record_transaction_error_mysql(self, output):
try:
with self.engine.begin() as conn:
conn.execute(
mysql.insert(ApplicationSchema.transaction_outputs).values(
workflow_uuid=output["workflow_uuid"],
function_id=output["function_id"],
output=None,
error=output["error"],
txn_id=sa.text(Expressions.get_current_txid_string),
txn_snapshot=output["txn_snapshot"],
executor_id=(
output["executor_id"] if output["executor_id"] else None
),
)
)
except DBAPIError as dbapi_error:
dbos_logger.warning(
f'error recording transaction error: {output["error"]}; dbapi_error: {dbapi_error}'
)
raise dbapi_error

@staticmethod
def check_transaction_execution(
session: Session, workflow_uuid: str, function_id: int
Expand Down
2 changes: 1 addition & 1 deletion dbos/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ def invoke_tx(*args: Any, **kwargs: Any) -> Any:
assert (
ctx.sql_session is not None
), "Cannot find a database connection"
ApplicationDatabase.record_transaction_output(
dbos._app_db.record_transaction_output(
ctx.sql_session, txn_output
)
break
Expand Down
64 changes: 41 additions & 23 deletions dbos/_db_wizard.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import typer
import yaml
from rich import print
from sqlalchemy import URL, create_engine, text
from sqlalchemy import URL, Engine, create_engine, text

if TYPE_CHECKING:
from ._dbos_config import ConfigFile
Expand Down Expand Up @@ -171,28 +171,46 @@ def _check_docker_installed() -> bool:


def _check_db_connectivity(config: "ConfigFile") -> Optional[Exception]:
postgres_db_url = URL.create(
"postgresql+psycopg",
username=config["database"]["username"],
password=config["database"]["password"],
host=config["database"]["hostname"],
port=config["database"]["port"],
database="postgres",
query={"connect_timeout": "1"},
)
postgres_db_engine = create_engine(postgres_db_url)
try:
with postgres_db_engine.connect() as conn:
val = conn.execute(text("SELECT 1")).scalar()
if val != 1:
dbos_logger.error(
f"Unexpected value returned from database: expected 1, received {val}"
)
return Exception()
except Exception as e:
return e
finally:
postgres_db_engine.dispose()
database_type = config["database"].get("type", "postgresql")

engine: Optional[Engine] = None
if "postgresql" == database_type:
postgres_db_url = URL.create(
"postgresql+psycopg",
username=config["database"]["username"],
password=config["database"]["password"],
host=config["database"]["hostname"],
port=config["database"]["port"],
database="postgres",
query={"connect_timeout": "1"},
)
engine = create_engine(postgres_db_url)
elif "mysql" == database_type:
db_url_args = {
"drivername": "mysql+pymysql",
"username": config["database"]["username"],
"password": config["database"]["password"],
"host": config["database"]["hostname"],
"port": config["database"]["port"],
}
mysql_db_url = URL.create(**db_url_args)
engine = create_engine(mysql_db_url)

if engine:
try:
with engine.connect() as conn:
val = conn.execute(text("SELECT 1")).scalar()
if val != 1:
dbos_logger.error(
f"Unexpected value returned from database: expected 1, received {val}"
)
return Exception()
except Exception as e:
return e
finally:
engine.dispose()
else:
return Exception(f"Could not create engine for {database_type} database")

return None

Expand Down
13 changes: 7 additions & 6 deletions dbos/_dbos.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,13 @@ def _launch(self) -> None:
self._executor.submit(startup_recovery_thread, self, workflow_ids)

# Listen to notifications
notification_listener_thread = threading.Thread(
target=self._sys_db._notification_listener,
daemon=True,
)
notification_listener_thread.start()
self._background_threads.append(notification_listener_thread)
if self._sys_db.is_notification_listener_enabled():
notification_listener_thread = threading.Thread(
target=self._sys_db._notification_listener,
daemon=True,
)
notification_listener_thread.start()
self._background_threads.append(notification_listener_thread)

# Start flush workflow buffers thread
flush_workflow_buffers_thread = threading.Thread(
Expand Down
1 change: 1 addition & 0 deletions dbos/_dbos_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class RuntimeConfig(TypedDict, total=False):


class DatabaseConfig(TypedDict, total=False):
type: str
hostname: str
port: int
username: str
Expand Down
1 change: 1 addition & 0 deletions dbos/_migrations/mysql/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Generic single-database configuration.
Loading
Loading