Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions core/src/main/resources/schema/mysql/mysql-create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,19 @@ CREATE TABLE IF NOT EXISTS snapshot (
meta_ser_manifest TEXT,
meta_payload BLOB,
PRIMARY KEY (persistence_id, sequence_number));

CREATE TABLE IF NOT EXISTS durable_state
(
global_offset BIGINT AUTO_INCREMENT,
persistence_id VARCHAR(255) NOT NULL,
revision BIGINT NOT NULL,
state_payload LONGBLOB NOT NULL,
state_serial_id INTEGER NOT NULL,
state_serial_manifest VARCHAR(255),
tag VARCHAR(255),
state_timestamp BIGINT NOT NULL,
PRIMARY KEY (persistence_id),
UNIQUE KEY state_global_offset_uk (global_offset)
);
CREATE INDEX state_tag_idx on durable_state (tag);
CREATE INDEX state_global_offset_idx on durable_state (global_offset);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] add a newline at end of file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

1 change: 1 addition & 0 deletions core/src/main/resources/schema/mysql/mysql-drop-schema.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP TABLE IF EXISTS event_tag;
DROP TABLE IF EXISTS event_journal;
DROP TABLE IF EXISTS snapshot;
DROP TABLE IF EXISTS durable_state;
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile, PostgresProfile, SQLS
case PostgresProfile => new PostgresSequenceNextValUpdater(profile, durableStateTableCfg)
case SQLServerProfile => new SqlServerSequenceNextValUpdater(profile, durableStateTableCfg)
case OracleProfile => new OracleSequenceNextValUpdater(profile, durableStateTableCfg)
// TODO https://github.com/apache/pekko-persistence-jdbc/issues/174
// case MySQLProfile => new MySQLSequenceNextValUpdater(profile, durableStateTableCfg)
// MySQLProfile uses the AUTO_INCREMENT column feature, so we not extract manually the next `globalOffset` value
case _ => throw new UnsupportedOperationException(s"Unsupported JdbcProfile <$profile> for durableState.")
}

Expand Down Expand Up @@ -87,6 +86,30 @@ import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile, PostgresProfile, SQLS
"""
}

private[jdbc] def replaceDbWithDurableState(row: DurableStateTables.DurableStateRow) = {
sqlu"""REPLACE INTO #${durableStateTableCfg.schemaAndTableName}
(
#${durableStateTableCfg.columnNames.persistenceId},
#${durableStateTableCfg.columnNames.revision},
#${durableStateTableCfg.columnNames.statePayload},
#${durableStateTableCfg.columnNames.stateSerId},
#${durableStateTableCfg.columnNames.stateSerManifest},
#${durableStateTableCfg.columnNames.tag},
#${durableStateTableCfg.columnNames.stateTimestamp}
)
VALUES
(
${row.persistenceId},
${row.revision},
${row.statePayload},
${row.stateSerId},
${row.stateSerManifest},
${row.tag},
#${System.currentTimeMillis()}
)
"""
}

private[jdbc] def getSequenceNextValueExpr() = sequenceNextValUpdater.getSequenceNextValueExpr()

def deleteFromDb(persistenceId: String) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
import scala.util.Try

import slick.jdbc.{ JdbcBackend, JdbcProfile }
import slick.jdbc.{ JdbcBackend, JdbcProfile, MySQLProfile }
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.ExtendedActorSystem
Expand Down Expand Up @@ -101,7 +101,11 @@ class JdbcDurableStateStore[A](
Future
.fromTry(row)
.flatMap { r =>
val action = if (revision == 1) insertDurableState(r) else updateDurableState(r)
val action = profile match {
case _: MySQLProfile => replaceDurableState(r)
case _ if revision == 1 => insertDurableState(r)
case _ => updateDurableState(r)
}
db.run(action)
}
.map { rowsAffected =>
Expand Down Expand Up @@ -249,5 +253,11 @@ class JdbcDurableStateStore[A](
} yield u
}

private def replaceDurableState(row: DurableStateTables.DurableStateRow) = {
import queries._

replaceDbWithDurableState(row)
}

def deleteAllFromDb() = db.run(queries.deleteAllFromDb())
}
7 changes: 3 additions & 4 deletions core/src/test/resources/mysql-application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ jdbc-read-journal {
slick = ${slick}
}

# TODO https://github.com/apache/pekko-persistence-jdbc/issues/174
# the pekko-persistence-jdbc provider in use for durable state store
#jdbc-durable-state-store {
# slick = ${slick}
#}
jdbc-durable-state-store {
slick = ${slick}
}

slick {
profile = "slick.jdbc.MySQLProfile$"
Expand Down
7 changes: 3 additions & 4 deletions core/src/test/resources/mysql-shared-db-application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ jdbc-read-journal {
use-shared-db = "slick"
}

# TODO https://github.com/apache/pekko-persistence-jdbc/issues/174
# the pekko-persistence-jdbc provider in use for durable state store
#jdbc-durable-state-store {
# use-shared-db = "slick"
#}
jdbc-durable-state-store {
use-shared-db = "slick"
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.apache.pekko
import pekko.actor._
import pekko.persistence.jdbc.state.{ MyPayload, OffsetSyntax }
import OffsetSyntax._
import pekko.persistence.jdbc.testkit.internal.{ H2, Oracle, Postgres, SchemaType, SqlServer }
import pekko.persistence.jdbc.testkit.internal.{ H2, MySQL, Oracle, Postgres, SchemaType, SqlServer }
import pekko.persistence.query.{ NoOffset, Offset, Sequence, UpdatedDurableState }
import pekko.stream.scaladsl.Sink
import org.scalatest.time.{ Millis, Seconds, Span }
Expand Down Expand Up @@ -83,9 +83,8 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte
e shouldBe an[org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException]
case Postgres =>
e shouldBe an[org.postgresql.util.PSQLException]
// TODO https://github.com/apache/pekko-persistence-jdbc/issues/174
// case MySQL =>
// e shouldBe an[java.sql.SQLIntegrityConstraintViolationException]
case MySQL =>
e shouldBe an[java.sql.SQLIntegrityConstraintViolationException]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'a' not 'an' before 'j'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for MySQL with an inconsistency with the line below (case Oracle)?
Or both for Oracle and SqlServer (not H2 and Postgres)

case Oracle =>
e shouldBe an[java.sql.SQLIntegrityConstraintViolationException]
case SqlServer =>
Expand Down Expand Up @@ -281,7 +280,7 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte
// trick to complete the future
val f = source
.takeWhile { e =>
m += ((e.persistenceId, e.offset.value))
m += e.persistenceId -> e.offset.value
e.offset.value < 12
}
.runWith(Sink.seq)
Expand Down Expand Up @@ -312,7 +311,7 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte
// trick to complete the future
val f = source
.takeWhile { e =>
m += ((e.persistenceId, e.offset.value))
m += e.persistenceId -> e.offset.value
e.offset.value < 12
}
.runWith(Sink.seq)
Expand Down Expand Up @@ -370,7 +369,7 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte
// trick to complete the future
val f = source
.takeWhile { e =>
m += ((e.persistenceId, e.offset.value))
m += e.persistenceId -> e.offset.value
e.offset.value < 21
}
.runWith(Sink.seq)
Expand Down Expand Up @@ -401,7 +400,7 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte
// trick to complete the future
val f = source
.takeWhile { e =>
m += ((e.persistenceId, e.offset.value))
m += e.persistenceId -> e.offset.value
e.offset.value < 3060
}
.runWith(Sink.seq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.pekko
import pekko.actor._
import pekko.persistence.jdbc.db.SlickDatabase
import pekko.persistence.jdbc.config._
import pekko.persistence.jdbc.testkit.internal.{ H2, Oracle, Postgres, SchemaType, SqlServer }
import pekko.persistence.jdbc.testkit.internal.{ H2, MySQL, Oracle, Postgres, SchemaType, SqlServer }
import pekko.persistence.jdbc.util.DropCreate
import pekko.serialization.SerializationExtension
import pekko.util.Timeout
Expand All @@ -47,13 +47,11 @@ abstract class StateSpecBase(val config: Config, schemaType: SchemaType)
implicit lazy val e: ExecutionContext = system.dispatcher

private[jdbc] def schemaTypeToProfile(s: SchemaType) = s match {
case H2 => slick.jdbc.H2Profile
case Postgres => slick.jdbc.PostgresProfile
// TODO https://github.com/apache/pekko-persistence-jdbc/issues/174
// case MySQL => slick.jdbc.MySQLProfile
case H2 => slick.jdbc.H2Profile
case Postgres => slick.jdbc.PostgresProfile
case MySQL => slick.jdbc.MySQLProfile
case SqlServer => slick.jdbc.SQLServerProfile
case Oracle => slick.jdbc.OracleProfile
case _ => throw new UnsupportedOperationException(s"Unsupported <$s> for durableState.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep this - it causes no harm and is useful if we add a new db type and forget to support it here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a new DBMS is added, isn't it easier to have a non-exhaustive match reported by the IDE (and even some plugins) rather than a case _ that would hide it?

}

val customSerializers = ConfigFactory.parseString("""
Expand Down Expand Up @@ -109,11 +107,10 @@ abstract class StateSpecBase(val config: Config, schemaType: SchemaType)
f(system)
} finally {
system.actorSelection("system/" + "pekko-persistence-jdbc-durable-state-sequence-actor").resolveOne().onComplete {
case Success(actorRef) => {
case Success(actorRef) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a big fan of reformatting existing code that is not related to the PR

system.stop(actorRef)
Thread.sleep(1000)
system.log.debug(s"Is terminated: ${actorRef.isTerminated}")
}
case Failure(_) =>
system.log.warning("system/" + "-persistence-jdbc-durable-state-sequence-actorsomename" + " does not exist")
}
Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/paradox/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

The Apache Pekko Persistence JDBC plugin allows for using JDBC-compliant databases as backend for @extref:[Apache Pekko Persistence](pekko:persistence.html) and @extref:[Apache Pekko Persistence Query](pekko:persistence-query.html).

This plugin supports both **Event Sourcing** and **Durable State** persistence modes, but does not support DurableState under the backend of MySQL.
This plugin supports both **Event Sourcing** and **Durable State** persistence modes.

In EventSourcing mode, pekko-persistence-jdbc writes journal and snapshot entries to a configured JDBC store. It also implements the full pekko-persistence-query API and is therefore very useful for implementing DDD-style application models using Apache Pekko and Scala for creating reactive applications.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new files should use the standard Apache header unless there are methods copied in from existing files - where we should use that file's header

eg https://github.com/apache/pekko-persistence-jdbc/blob/main/project/PekkoCoreDependency.scala#L1

* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.jdbc.integration

import com.typesafe.config.ConfigFactory
import org.apache.pekko.persistence.jdbc.state.scaladsl.{DurableStateStorePluginSpec, DurableStateStoreSchemaPluginSpec}
import slick.jdbc.MySQLProfile

class MySQLDurableStateStorePluginSpec
extends DurableStateStorePluginSpec(ConfigFactory.load("mysql-shared-db-application.conf"), MySQLProfile) {}

class MySQLDurableStateStorePluginSchemaSpec
extends DurableStateStoreSchemaPluginSpec(ConfigFactory.load("mysql-application.conf"),
MySQLProfile) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.jdbc.integration

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateSpec
import org.apache.pekko.persistence.jdbc.testkit.internal.MySQL

class MySQLScalaJdbcDurableStateStoreQueryTest
extends JdbcDurableStateSpec(ConfigFactory.load("mysql-shared-db-application.conf"), MySQL) {
implicit lazy val system: ActorSystem =
ActorSystem("MySQLScalaJdbcDurableStateStoreQueryTest", config.withFallback(customSerializers))
}
Loading