diff --git a/pom.xml b/pom.xml index 3579563..9cc790c 100755 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.contrastsecurity cassandra-migration Cassandra Migration - 0.9-SNAPSHOT + 1.0.0-SNAPSHOT Database migration tool for Cassandra @@ -82,6 +82,17 @@ com.datastax.cassandra cassandra-driver-core 3.0.0 + test + + + com.datastax.oss + java-driver-query-builder + 4.14.0 + + + com.datastax.oss + java-driver-core + 4.14.0 junit @@ -175,6 +186,14 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/CassandraMigration.java b/src/main/java/com/contrastsecurity/cassandra/migration/CassandraMigration.java index 043b956..83e4817 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/CassandraMigration.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/CassandraMigration.java @@ -8,19 +8,21 @@ import com.contrastsecurity.cassandra.migration.config.ScriptsLocations; import com.contrastsecurity.cassandra.migration.dao.SchemaVersionDAO; import com.contrastsecurity.cassandra.migration.info.MigrationInfoService; -import com.contrastsecurity.cassandra.migration.info.MigrationVersion; import com.contrastsecurity.cassandra.migration.logging.Log; import com.contrastsecurity.cassandra.migration.logging.LogFactory; import com.contrastsecurity.cassandra.migration.resolver.CompositeMigrationResolver; import com.contrastsecurity.cassandra.migration.resolver.MigrationResolver; +import com.contrastsecurity.cassandra.migration.utils.StringUtils; import com.contrastsecurity.cassandra.migration.utils.VersionPrinter; -import com.datastax.driver.core.Host; -import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.Metadata; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import sun.reflect.generics.reflectiveObjects.NotImplementedException; -import java.util.List; +import java.net.InetSocketAddress; +import java.util.Collection; public class CassandraMigration { @@ -29,12 +31,19 @@ public class CassandraMigration { private ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); private Keyspace keyspace; private MigrationConfigs configs; + private CqlSession session; public CassandraMigration() { this.keyspace = new Keyspace(); this.configs = new MigrationConfigs(); } + public CassandraMigration(CqlSession session, MigrationConfigs configs) { + this.session = session; + this.keyspace = configs.getKeyspace(); + this.configs = configs; + } + public ClassLoader getClassLoader() { return classLoader; } @@ -66,11 +75,11 @@ private MigrationResolver createMigrationResolver() { public int migrate() { return execute(new Action() { - public Integer execute(Session session) { - new Initialize().run(session, keyspace, MigrationVersion.CURRENT.getTable()); + public Integer execute(CqlSession session) { + new Initialize().run(session, keyspace); MigrationResolver migrationResolver = createMigrationResolver(); - SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable()); + SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace); Migrate migrate = new Migrate(migrationResolver, configs.getTarget(), schemaVersionDAO, session, keyspace.getCluster().getUsername(), configs.isAllowOutOfOrder()); @@ -81,9 +90,9 @@ public Integer execute(Session session) { public MigrationInfoService info() { return execute(new Action() { - public MigrationInfoService execute(Session session) { + public MigrationInfoService execute(CqlSession session) { MigrationResolver migrationResolver = createMigrationResolver(); - SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable()); + SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace); MigrationInfoService migrationInfoService = new MigrationInfoService(migrationResolver, schemaVersionDAO, configs.getTarget(), false, true); migrationInfoService.refresh(); @@ -94,21 +103,21 @@ public MigrationInfoService execute(Session session) { } public void validate() { - String validationError = execute(new Action() { - @Override - public String execute(Session session) { - MigrationResolver migrationResolver = createMigrationResolver(); - SchemaVersionDAO schemaVersionDao = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable()); - Validate validate = new Validate(migrationResolver, schemaVersionDao, configs.getTarget(), true, false); - return validate.run(); - } - }); - - if (validationError != null) { - throw new CassandraMigrationException("Validation failed. " + validationError); - } + String validationError = execute(new Action() { + @Override + public String execute(CqlSession session) { + MigrationResolver migrationResolver = createMigrationResolver(); + SchemaVersionDAO schemaVersionDao = new SchemaVersionDAO(session, keyspace); + Validate validate = new Validate(migrationResolver, schemaVersionDao, configs.getTarget(), true, false); + return validate.run(); + } + }); + + if (validationError != null) { + throw new CassandraMigrationException("Validation failed. " + validationError); + } } - + public void baseline() { //TODO throw new NotImplementedException(); @@ -119,11 +128,11 @@ private String getConnectionInfo(Metadata metadata) { sb.append("Connected to cluster: "); sb.append(metadata.getClusterName()); sb.append("\n"); - for (Host host : metadata.getAllHosts()) { + for (Node node : metadata.getNodes().values()) { sb.append("Data center: "); - sb.append(host.getDatacenter()); + sb.append(node.getDatacenter()); sb.append("; Host: "); - sb.append(host.getAddress()); + sb.append(node.getBroadcastAddress()); } return sb.toString(); } @@ -132,38 +141,44 @@ T execute(Action action) { T result; VersionPrinter.printVersion(classLoader); - - com.datastax.driver.core.Cluster cluster = null; - Session session = null; try { if (null == keyspace) throw new IllegalArgumentException("Unable to establish Cassandra session. Keyspace is not configured."); - if (null == keyspace.getCluster()) - throw new IllegalArgumentException("Unable to establish Cassandra session. Cluster is not configured."); - - com.datastax.driver.core.Cluster.Builder builder = new com.datastax.driver.core.Cluster.Builder(); - builder.addContactPoints(keyspace.getCluster().getContactpoints()).withPort(keyspace.getCluster().getPort()); - if (null != keyspace.getCluster().getUsername() && !keyspace.getCluster().getUsername().trim().isEmpty()) { - if (null != keyspace.getCluster().getPassword() && !keyspace.getCluster().getPassword().trim().isEmpty()) { - builder.withCredentials(keyspace.getCluster().getUsername(), - keyspace.getCluster().getPassword()); - } else { - throw new IllegalArgumentException("Password must be provided with username."); + + if (session == null) { + + if (null == keyspace.getCluster()) + throw new IllegalArgumentException("Unable to establish Cassandra session. Cluster is not configured."); + + CqlSessionBuilder cqlSessionBuilder = new CqlSessionBuilder() + .withKeyspace(keyspace.getName()); + if (null != keyspace.getCluster().getUsername() && !keyspace.getCluster().getUsername().trim().isEmpty()) { + if (null != keyspace.getCluster().getPassword() && !keyspace.getCluster().getPassword().trim().isEmpty()) { + cqlSessionBuilder.withAuthCredentials(keyspace.getCluster().getUsername(), + keyspace.getCluster().getPassword()); + } else { + throw new IllegalArgumentException("Password must be provided with username."); + } } + if (StringUtils.hasText(keyspace.getCluster().getLocalDatacenter())) { + cqlSessionBuilder.withLocalDatacenter(keyspace.getCluster().getLocalDatacenter()); + } + for (String contactPoint : keyspace.getCluster().getContactpoints()) { + cqlSessionBuilder.addContactPoint(new InetSocketAddress(contactPoint, keyspace.getCluster().getPort())); + } + session = cqlSessionBuilder.build(); } - cluster = builder.build(); - Metadata metadata = cluster.getMetadata(); + Metadata metadata = session.getMetadata(); LOG.info(getConnectionInfo(metadata)); - session = cluster.newSession(); if (null == keyspace.getName() || keyspace.getName().trim().length() == 0) throw new IllegalArgumentException("Keyspace not specified."); - List keyspaces = metadata.getKeyspaces(); + Collection keyspaces = metadata.getKeyspaces().values(); boolean keyspaceExists = false; for (KeyspaceMetadata keyspaceMetadata : keyspaces) { - if (keyspaceMetadata.getName().equalsIgnoreCase(keyspace.getName())) + if (keyspaceMetadata.getName().asInternal().equalsIgnoreCase(keyspace.getName())) keyspaceExists = true; } if (keyspaceExists) @@ -176,20 +191,14 @@ T execute(Action action) { if (null != session && !session.isClosed()) try { session.close(); - } catch(Exception e) { + } catch (Exception e) { LOG.warn("Error closing Cassandra session"); } - if (null != cluster && !cluster.isClosed()) - try { - cluster.close(); - } catch(Exception e) { - LOG.warn("Error closing Cassandra cluster"); - } } return result; } interface Action { - T execute(Session session); + T execute(CqlSession session); } } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/action/Initialize.java b/src/main/java/com/contrastsecurity/cassandra/migration/action/Initialize.java index 1bf0772..04f1ce5 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/action/Initialize.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/action/Initialize.java @@ -2,12 +2,12 @@ import com.contrastsecurity.cassandra.migration.config.Keyspace; import com.contrastsecurity.cassandra.migration.dao.SchemaVersionDAO; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; public class Initialize { - public void run(Session session, Keyspace keyspace, String migrationVersionTableName) { - SchemaVersionDAO dao = new SchemaVersionDAO(session, keyspace, migrationVersionTableName); + public void run(CqlSession session, Keyspace keyspace) { + SchemaVersionDAO dao = new SchemaVersionDAO(session, keyspace); dao.createTablesIfNotExist(); } } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/action/Migrate.java b/src/main/java/com/contrastsecurity/cassandra/migration/action/Migrate.java index ab41a13..23843d6 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/action/Migrate.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/action/Migrate.java @@ -9,7 +9,7 @@ import com.contrastsecurity.cassandra.migration.resolver.MigrationResolver; import com.contrastsecurity.cassandra.migration.utils.StopWatch; import com.contrastsecurity.cassandra.migration.utils.TimeFormat; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; public class Migrate { private static final Log LOG = LogFactory.getLog(Migrate.class); @@ -17,12 +17,12 @@ public class Migrate { private final MigrationVersion target; private final SchemaVersionDAO schemaVersionDAO; private final MigrationResolver migrationResolver; - private final Session session; + private final CqlSession session; private final String user; private final boolean allowOutOfOrder; public Migrate(MigrationResolver migrationResolver, MigrationVersion target, SchemaVersionDAO schemaVersionDAO, - Session session, String user, boolean allowOutOfOrder) { + CqlSession session, String user, boolean allowOutOfOrder) { this.migrationResolver = migrationResolver; this.schemaVersionDAO = schemaVersionDAO; this.session = session; @@ -36,6 +36,7 @@ public int run() { stopWatch.start(); int migrationSuccessCount = 0; + boolean firstTimeMigration = schemaVersionDAO.versionNotFound(); while (true) { final boolean firstRun = migrationSuccessCount == 0; @@ -76,11 +77,18 @@ public int run() { MigrationInfo[] pendingMigrations = infoService.pending(); if (pendingMigrations.length == 0) { + if (infoService.current() != null) { + if (firstTimeMigration) { + schemaVersionDAO.addMigrationVersion(infoService.current().getVersion().getVersion()); + } else if (migrationSuccessCount > 0){ + schemaVersionDAO.updateMigrationVersion(infoService.current().getVersion().getVersion()); + } + } break; } boolean isOutOfOrder = pendingMigrations[0].getVersion().compareTo(currentSchemaVersion) < 0; - MigrationVersion mv = applyMigration(pendingMigrations[0], isOutOfOrder); + MigrationVersion mv = applyMigration(pendingMigrations[0], isOutOfOrder, firstTimeMigration); if(mv == null) { //no more migrations break; @@ -96,11 +104,10 @@ public int run() { return migrationSuccessCount; } - private MigrationVersion applyMigration(final MigrationInfo migration, boolean isOutOfOrder) { + private MigrationVersion applyMigration(final MigrationInfo migration, boolean isOutOfOrder, boolean firstTimeMigration) { MigrationVersion version = migration.getVersion(); LOG.info("Migrating keyspace " + schemaVersionDAO.getKeyspace().getName() + " to version " + version + " - " + migration.getDescription() + (isOutOfOrder ? " (out of order)" : "")); - StopWatch stopWatch = new StopWatch(); stopWatch.start(); @@ -114,16 +121,21 @@ private MigrationVersion applyMigration(final MigrationInfo migration, boolean i LOG.debug("Successfully completed and committed migration of keyspace " + schemaVersionDAO.getKeyspace().getName() + " to version " + version); } catch (CassandraMigrationException e) { - String failedMsg = "Migration of keyspace " + schemaVersionDAO.getKeyspace().getName() + - " to version " + version + " failed!"; - - LOG.error(failedMsg + " Please restore backups and roll back database and code!"); - stopWatch.stop(); int executionTime = (int) stopWatch.getTotalTimeMillis(); + if (firstTimeMigration) { + AppliedMigration appliedMigration = new AppliedMigration(version, migration.getDescription(), + migration.getType(), migration.getScript(), migration.getChecksum(), user, executionTime, true, firstTimeMigration); + schemaVersionDAO.addAppliedMigration(appliedMigration); + LOG.error("Failed applying migration but since migration is being run first time it will be ignored", e); + return version; + } AppliedMigration appliedMigration = new AppliedMigration(version, migration.getDescription(), migration.getType(), migration.getScript(), migration.getChecksum(), user, executionTime, false); schemaVersionDAO.addAppliedMigration(appliedMigration); + String failedMsg = "Migration of keyspace " + schemaVersionDAO.getKeyspace().getName() + + " to version " + version + " failed!"; + LOG.error(failedMsg + " Please restore backups and roll back database and code!"); throw e; } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/api/JavaMigration.java b/src/main/java/com/contrastsecurity/cassandra/migration/api/JavaMigration.java index 3f6277b..3ab77f3 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/api/JavaMigration.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/api/JavaMigration.java @@ -1,7 +1,7 @@ package com.contrastsecurity.cassandra.migration.api; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; public interface JavaMigration { - void migrate(Session session) throws Exception; + void migrate(CqlSession session) throws Exception; } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/config/Cluster.java b/src/main/java/com/contrastsecurity/cassandra/migration/config/Cluster.java index 89d339b..74065a6 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/config/Cluster.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/config/Cluster.java @@ -30,6 +30,7 @@ public String getDescription() { private int port = 9042; private String username; private String password; + private String localDatacenter; public Cluster() { String contactpointsP = System.getProperty(ClusterProperty.CONTACTPOINTS.getName()); @@ -80,4 +81,12 @@ public String getPassword() { public void setPassword(String password) { this.password = password; } + + public String getLocalDatacenter() { + return localDatacenter; + } + + public void setLocalDatacenter(String localDatacenter) { + this.localDatacenter = localDatacenter; + } } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/config/Keyspace.java b/src/main/java/com/contrastsecurity/cassandra/migration/config/Keyspace.java index 5c14e94..cfb6bf3 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/config/Keyspace.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/config/Keyspace.java @@ -1,13 +1,25 @@ package com.contrastsecurity.cassandra.migration.config; + +import com.contrastsecurity.cassandra.migration.utils.Ensure; + +import static com.contrastsecurity.cassandra.migration.utils.Ensure.notNull; + +/** + * This represents the definition of a key space and is basically + * a builder for the CQL statement that is required to create a keyspace + * before any migration can be executed. + * + * @author Patrick Kranz + */ public class Keyspace { private static final String PROPERTY_PREFIX = "cassandra.migration.keyspace."; public enum KeyspaceProperty { NAME(PROPERTY_PREFIX + "name", "Name of Cassandra keyspace"); - private String name; - private String description; + private final String name; + private final String description; KeyspaceProperty(String name, String description) { this.name = name; @@ -25,6 +37,8 @@ public String getDescription() { private Cluster cluster; private String name; + private boolean durableWrites; + private ReplicationStrategy replicationStrategy; public Keyspace() { cluster = new Cluster(); @@ -41,6 +55,19 @@ public void setCluster(Cluster cluster) { this.cluster = cluster; } + /** + * This creates a new instance of a keyspace using the provided keyspace name. It by default + * uses a {@link SimpleStrategy} for replication and sets durable writes to true. + * These default values can be overwritten by the provided methods. + * + * @param name the name of the keyspace to be used. + */ + public Keyspace(String name) { + this.name = Ensure.notNullOrEmpty(name, "keyspaceName"); + this.replicationStrategy = new SimpleStrategy(); + this.durableWrites = true; + } + public String getName() { return name; } @@ -48,4 +75,28 @@ public String getName() { public void setName(String name) { this.name = name; } + + public Keyspace with(ReplicationStrategy replicationStrategy) { + this.replicationStrategy = notNull(replicationStrategy, "replicationStrategy"); + return this; + } + + public boolean isDurableWrites() { + return durableWrites; + } + + public ReplicationStrategy getReplicationStrategy() { + return replicationStrategy; + } + + public String getCqlStatement() { + StringBuilder builder = new StringBuilder(60); + builder.append("CREATE KEYSPACE IF NOT EXISTS ") + .append(getName()) + .append(" WITH REPLICATION = ") + .append(getReplicationStrategy().createCqlStatement()) + .append(" AND DURABLE_WRITES = ") + .append(Boolean.toString(isDurableWrites())); + return builder.toString(); + } } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/config/MigrationConfigs.java b/src/main/java/com/contrastsecurity/cassandra/migration/config/MigrationConfigs.java index fc477e6..0a41d17 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/config/MigrationConfigs.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/config/MigrationConfigs.java @@ -2,30 +2,9 @@ import com.contrastsecurity.cassandra.migration.info.MigrationVersion; import com.contrastsecurity.cassandra.migration.utils.StringUtils; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; public class MigrationConfigs { - public enum MigrationProperty { - SCRIPTS_ENCODING("cassandra.migration.scripts.encoding", "Encoding for CQL scripts"), - SCRIPTS_LOCATIONS("cassandra.migration.scripts.locations", "Locations of the migration scripts in CSV format"), - ALLOW_OUTOFORDER("cassandra.migration.scripts.allowoutoforder", "Allow out of order migration"), - TARGET_VERSION("cassandra.migration.version.target", "The target version. Migrations with a higher version number will be ignored."); - - private String name; - private String description; - - MigrationProperty(String name, String description) { - this.name = name; - this.description = description; - } - - public String getName() { - return name; - } - - public String getDescription() { - return description; - } - } public MigrationConfigs() { String scriptsEncodingP = System.getProperty(MigrationProperty.SCRIPTS_ENCODING.getName()); @@ -47,6 +26,10 @@ public MigrationConfigs() { } } + private Keyspace keyspace; + + private DefaultConsistencyLevel consistencyLevel = DefaultConsistencyLevel.QUORUM; + /** * The encoding of Cql migration scripts (default: UTF-8) */ @@ -57,6 +40,8 @@ public MigrationConfigs() { */ private String[] scriptsLocations = {"db/migration"}; + private String executionProfile; + /** * Allow out of order migrations (default: false) */ @@ -67,6 +52,20 @@ public MigrationConfigs() { */ private MigrationVersion target = MigrationVersion.LATEST; + private String tablePrefix; + + public MigrationConfigs(Keyspace keyspace) { + this.keyspace = keyspace; + } + + public String getTablePrefix() { + return tablePrefix; + } + + public void setTablePrefix(String tablePrefix) { + this.tablePrefix = tablePrefix; + } + public String getEncoding() { return encoding; } @@ -106,4 +105,65 @@ public MigrationVersion getTarget() { public void setTargetAsString(String target) { this.target = MigrationVersion.fromVersion(target); } + + public String getExecutionProfile() { + return executionProfile; + } + + public void setExecutionProfile(String executionProfile) { + this.executionProfile = executionProfile; + } + + public Keyspace getKeyspace() { + return keyspace; + } + + public void setKeyspace(Keyspace keyspace) { + this.keyspace = keyspace; + } + + public DefaultConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + public void setConsistencyLevel(DefaultConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + } + + /** + * Indicates if the underlying configuration is valid. Currently, a configuration is considered + * valid if a keyspace name or an instance of keyspace is provided. + * + * @return true if the configuration is valid, false otherwise. + */ + public boolean isValid() { + return keyspace != null; + } + + + public enum MigrationProperty { + SCRIPTS_ENCODING("cassandra.migration.scripts.encoding", "Encoding for CQL scripts"), + SCRIPTS_LOCATIONS("cassandra.migration.scripts.locations", "Locations of the migration scripts in CSV format"), + ALLOW_OUTOFORDER("cassandra.migration.scripts.allowoutoforder", "Allow out of order migration"), + TARGET_VERSION("cassandra.migration.version.target", "The target version. Migrations with a higher version number will be ignored."), + EXECUTION_PROFILE("cassandra.migration.execution.profile", "Execution Profile"); + + private String name; + private String description; + + MigrationProperty(String name, String description) { + this.name = name; + this.description = description; + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + } + + } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/config/NetworkStrategy.java b/src/main/java/com/contrastsecurity/cassandra/migration/config/NetworkStrategy.java new file mode 100644 index 0000000..d184e61 --- /dev/null +++ b/src/main/java/com/contrastsecurity/cassandra/migration/config/NetworkStrategy.java @@ -0,0 +1,52 @@ +package com.contrastsecurity.cassandra.migration.config; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.lang.String.join; +import static java.util.Collections.unmodifiableMap; +import static java.util.stream.Collectors.toSet; +import static com.contrastsecurity.cassandra.migration.utils.Ensure.notNullOrEmpty; + +/** + * @author Patrick Kranz + */ +public class NetworkStrategy implements ReplicationStrategy { + private final Map dataCenters = new HashMap<>(); + + @Override + public String getName() { + return "NetworkTopologyStrategy"; + } + + @Override + public String createCqlStatement() { + if (getDataCenters().isEmpty()) { + throw new IllegalStateException("There has to be at least one datacenter in order to use NetworkTopologyStrategy."); + } + + StringBuilder builder = new StringBuilder(); + builder.append("{") + .append("'class':'").append(getName()).append("',"); + + builder.append(join(",", + dataCenters.keySet().stream().map(dc -> "'" + dc + "':" + dataCenters.get(dc)) + .collect(Collectors.toSet()))); + builder.append("}"); + return builder.toString(); + } + + public NetworkStrategy with(String datacenter, int replicationFactor) { + notNullOrEmpty(datacenter, "datacenter"); + if (replicationFactor < 1) { + throw new IllegalArgumentException("Replication Factor must be greater than zero"); + } + dataCenters.put(datacenter, replicationFactor); + return this; + } + + public Map getDataCenters() { + return unmodifiableMap(dataCenters); + } +} diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/config/ReplicationStrategy.java b/src/main/java/com/contrastsecurity/cassandra/migration/config/ReplicationStrategy.java new file mode 100644 index 0000000..2d16385 --- /dev/null +++ b/src/main/java/com/contrastsecurity/cassandra/migration/config/ReplicationStrategy.java @@ -0,0 +1,10 @@ +package com.contrastsecurity.cassandra.migration.config; + +/** + * @author Patrick Kranz + */ +public interface ReplicationStrategy { + String getName(); + String createCqlStatement(); + +} diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/config/SimpleStrategy.java b/src/main/java/com/contrastsecurity/cassandra/migration/config/SimpleStrategy.java new file mode 100644 index 0000000..4e40b03 --- /dev/null +++ b/src/main/java/com/contrastsecurity/cassandra/migration/config/SimpleStrategy.java @@ -0,0 +1,36 @@ +package com.contrastsecurity.cassandra.migration.config; + +/** + * @author Patrick Kranz + */ +public class SimpleStrategy implements ReplicationStrategy { + private final int replicationFactor; + + public SimpleStrategy(int replicationFactor) { + if (replicationFactor < 1) { + throw new IllegalArgumentException("Replication Factor must be greater than zero"); + } + this.replicationFactor = replicationFactor; + } + + public SimpleStrategy() { + this(1); + } + + @Override + public String getName() { + return "SimpleStrategy"; + } + + public int getReplicationFactor() { + return replicationFactor; + } + + @Override + public String createCqlStatement() { + return "{" + + "'class':'" + getName() + "'," + + "'replication_factor':" + getReplicationFactor() + + "}"; + } +} diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/dao/SchemaVersionDAO.java b/src/main/java/com/contrastsecurity/cassandra/migration/dao/SchemaVersionDAO.java index 9027474..57177da 100755 --- a/src/main/java/com/contrastsecurity/cassandra/migration/dao/SchemaVersionDAO.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/dao/SchemaVersionDAO.java @@ -1,43 +1,108 @@ package com.contrastsecurity.cassandra.migration.dao; +import com.contrastsecurity.cassandra.migration.CassandraMigrationException; import com.contrastsecurity.cassandra.migration.config.Keyspace; +import com.contrastsecurity.cassandra.migration.config.MigrationConfigs; import com.contrastsecurity.cassandra.migration.config.MigrationType; import com.contrastsecurity.cassandra.migration.info.AppliedMigration; import com.contrastsecurity.cassandra.migration.info.MigrationVersion; import com.contrastsecurity.cassandra.migration.logging.Log; import com.contrastsecurity.cassandra.migration.logging.LogFactory; import com.contrastsecurity.cassandra.migration.utils.CachePrepareStatement; -import com.datastax.driver.core.*; -import com.datastax.driver.core.exceptions.InvalidQueryException; -import com.datastax.driver.core.querybuilder.QueryBuilder; -import com.datastax.driver.core.querybuilder.Select; +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DriverException; +import com.datastax.oss.driver.api.core.cql.*; +import com.datastax.oss.driver.api.core.metadata.Metadata; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; +import java.util.*; -import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.contrastsecurity.cassandra.migration.utils.Ensure.notNull; +import static java.lang.String.format; public class SchemaVersionDAO { private static final Log LOG = LogFactory.getLog(SchemaVersionDAO.class); private static final String COUNTS_TABLE_NAME_SUFFIX = "_counts"; + private final Keyspace keyspace; + private final String tableName; + private final String keyspaceName; + private final String tableCountName; + private final String tableMigrationVersion = MigrationVersion.TABLE; + private final String executionProfileName; + private final CachePrepareStatement cachePs; + private final CqlSession session; + private ConsistencyLevel consistencyLevel = ConsistencyLevel.QUORUM; - private Session session; - private Keyspace keyspace; - private String tableName; - private CachePrepareStatement cachePs; - private ConsistencyLevel consistencyLevel; + /** + * The name of the table that manages the migration scripts + */ + private static final String SCHEMA_CF = "schema_migration"; + /** + * Statement used to create the table that manages the migrations. + */ + private static final String CREATE_MIGRATION_CF = "CREATE TABLE IF NOT EXISTS %s" + + " (version_rank int, installed_rank int, version text, description text," + + " script text, checksum int, type text, installed_by text, installed_on timestamp, " + + " execution_time int, success boolean, ignored boolean, PRIMARY KEY (version))"; + + /** + * Statement used to create the table that knows the current version + */ + private static final String CREATE_MIGRATION_VERSION = "CREATE TABLE IF NOT EXISTS %s" + + " (version text, PRIMARY KEY (version))"; + + private static final String CREATE_MIGRATION_COUNT_CF = "CREATE TABLE IF NOT EXISTS %s" + + " (name text, count counter, PRIMARY KEY (name))"; + + /** + * Insert statement that logs a migration into the schema_migration table. + */ + private static final String ADD_MIGRATION = "insert into %s" + + "(version_rank, installed_rank, version, description, " + + "type, script, checksum, installed_on, installed_by, execution_time, success, ignored) values" + + "(?, ?, ?, ?, ?, ?, ?, dateOf(now()), ?, ?, ?, ?)"; + private static final String UPDATE_MIGRATION_COUNT = "update %s " + + "set count = count + 1 where name = 'installed_rank'"; + private static final String UPDATE_MIGRATION_VERSION_RANK = "update %s " + + "set version_rank = ? where version = ?"; + private static final String SELECT_COUNT_MIGRATION = "select count from %s " + + "where name = 'installed_rank'"; + private static final String SELECT_MIGRATION = "select version, version_rank from %s"; + private static final String ADD_MIGRATION_VERSION = "insert into %s(version) values(?)"; + private static final String UPDATE_MIGRATION_VERSION = "update %s set version = ?"; - public SchemaVersionDAO(Session session, Keyspace keyspace, String tableName) { - this.session = session; - this.keyspace = keyspace; - this.tableName = tableName; + /** + * The query that retrieves current schema version + */ + private static final String VERSION_QUERY = "select version_rank, installed_rank, version, description, " + + "type, script, checksum, installed_on, installed_by, execution_time, success, ignored from %s"; + + private static final String MIGRATION_VERSION_QUERY = "select version from %s"; + + + public SchemaVersionDAO(CqlSession session, Keyspace keyspace) { + this(session, new MigrationConfigs(keyspace)); + } + + public SchemaVersionDAO(CqlSession session, MigrationConfigs configuration) { + this.session = notNull(session, "session"); + if (!configuration.isValid()) { + throw new IllegalArgumentException("The provided configuration is invalid. Please check if all required values are" + + " available. Current configuration is: " + System.lineSeparator() + configuration); + } + this.keyspace = configuration.getKeyspace(); this.cachePs = new CachePrepareStatement(session); //If running on a single host, don't force ConsistencyLevel.ALL this.consistencyLevel = - session.getCluster().getMetadata().getAllHosts().size() > 1 ? ConsistencyLevel.ALL : ConsistencyLevel.ONE; + session.getMetadata().getNodes().size() > 1 ? ConsistencyLevel.ALL : ConsistencyLevel.ONE; + this.keyspaceName = keyspace.getName(); + this.executionProfileName = configuration.getExecutionProfile(); + this.tableName = createTableName(configuration.getTablePrefix(), SCHEMA_CF); + this.tableCountName = createTableName(configuration.getTablePrefix(), SCHEMA_CF + COUNTS_TABLE_NAME_SUFFIX); + createKeyspaceIfRequired(); + useKeyspace(); + ensureSchemaTable(); } public Keyspace getKeyspace() { @@ -48,90 +113,20 @@ public void createTablesIfNotExist() { if (tablesExist()) { return; } - - Statement statement = new SimpleStatement( - "CREATE TABLE IF NOT EXISTS " + keyspace.getName() + "." + tableName + "(" + - " version_rank int," + - " installed_rank int," + - " version text," + - " description text," + - " script text," + - " checksum int," + - " type text," + - " installed_by text," + - " installed_on timestamp," + - " execution_time int," + - " success boolean," + - " PRIMARY KEY (version)" + - ");"); - statement.setConsistencyLevel(this.consistencyLevel); - session.execute(statement); - - statement = new SimpleStatement( - "CREATE TABLE IF NOT EXISTS " + keyspace.getName() + "." + tableName + COUNTS_TABLE_NAME_SUFFIX + " (" + - " name text," + - " count counter," + - " PRIMARY KEY (name)" + - ");"); - statement.setConsistencyLevel(this.consistencyLevel); - session.execute(statement); - } - - public boolean tablesExist() { - boolean schemaVersionTableExists = false; - boolean schemaVersionCountsTableExists = false; - - Statement schemaVersionStatement = QueryBuilder - .select() - .countAll() - .from(keyspace.getName(), tableName); - - Statement schemaVersionCountsStatement = QueryBuilder - .select() - .countAll() - .from(keyspace.getName(), tableName + COUNTS_TABLE_NAME_SUFFIX); - - schemaVersionStatement.setConsistencyLevel(this.consistencyLevel); - schemaVersionCountsStatement.setConsistencyLevel(this.consistencyLevel); - - try { - ResultSet resultsSchemaVersion = session.execute(schemaVersionStatement); - if (resultsSchemaVersion.one() != null) { - schemaVersionTableExists = true; - } - } catch (InvalidQueryException e) { - LOG.debug("No schema version table found with a name of " + tableName); - } - - try { - ResultSet resultsSchemaVersionCounts = session.execute(schemaVersionCountsStatement); - if (resultsSchemaVersionCounts.one() != null) { - schemaVersionCountsTableExists = true; - } - } catch (InvalidQueryException e) { - LOG.debug("No schema version counts table found with a name of " + tableName + COUNTS_TABLE_NAME_SUFFIX); - } - - return schemaVersionTableExists && schemaVersionCountsTableExists; + createSchemaTable(); } + /** + * Inserts the result of the migration into the migration table + * + * @param appliedMigration the migration that was executed + */ public void addAppliedMigration(AppliedMigration appliedMigration) { createTablesIfNotExist(); - MigrationVersion version = appliedMigration.getVersion(); - int versionRank = calculateVersionRank(version); - PreparedStatement statement = cachePs.prepare( - "INSERT INTO " + keyspace.getName() + "." + tableName + - " (version_rank, installed_rank, version, description, type, script, checksum, installed_on," + - " installed_by, execution_time, success)" + - " VALUES" + - " (?, ?, ?, ?, ?, ?, ?, dateOf(now()), ?, ?, ?);" - ); - - statement.setConsistencyLevel(this.consistencyLevel); - session.execute(statement.bind( - versionRank, + PreparedStatement addMigrationStatement = cachePs.prepare(format(ADD_MIGRATION, getTableName())); + BoundStatement boundStatement = addMigrationStatement.bind(versionRank, calculateInstalledRank(), version.toString(), appliedMigration.getDescription(), @@ -140,11 +135,27 @@ public void addAppliedMigration(AppliedMigration appliedMigration) { appliedMigration.getChecksum(), appliedMigration.getInstalledBy(), appliedMigration.getExecutionTime(), - appliedMigration.isSuccess() - )); + appliedMigration.isSuccess(), + appliedMigration.isIgnored()); + executeStatement(boundStatement, this.consistencyLevel); LOG.debug("Schema version table " + tableName + " successfully updated to reflect changes"); } + public void addMigrationVersion(String version) { + createTablesIfNotExist(); + PreparedStatement addMigrationStatement = cachePs.prepare(format(ADD_MIGRATION_VERSION, getTableMigrationVersion())); + BoundStatement boundStatement = addMigrationStatement.bind(version); + executeStatement(boundStatement, this.consistencyLevel); + LOG.debug("Added schema version"); + } + + public void updateMigrationVersion(String version) { + PreparedStatement updateMigrationVersion = cachePs.prepare(format(UPDATE_MIGRATION_VERSION, getTableMigrationVersion())); + BoundStatement boundStatement = updateMigrationVersion.bind(version); + executeStatement(boundStatement, this.consistencyLevel); + LOG.debug("Updated schema version to " + version); + } + /** * Retrieve the applied migrations from the metadata table. * @@ -154,26 +165,9 @@ public List findAppliedMigrations() { if (!tablesExist()) { return new ArrayList<>(); } - - Select select = QueryBuilder - .select() - .column("version_rank") - .column("installed_rank") - .column("version") - .column("description") - .column("type") - .column("script") - .column("checksum") - .column("installed_on") - .column("installed_by") - .column("execution_time") - .column("success") - .from(keyspace.getName(), tableName); - - select.setConsistencyLevel(this.consistencyLevel); - ResultSet results = session.execute(select); + ResultSet resultSet = executeStatement(format(VERSION_QUERY, getTableName())); List resultsList = new ArrayList<>(); - for (Row row : results) { + for (Row row : resultSet) { resultsList.add(new AppliedMigration( row.getInt("version_rank"), row.getInt("installed_rank"), @@ -182,40 +176,39 @@ public List findAppliedMigrations() { MigrationType.valueOf(row.getString("type")), row.getString("script"), row.isNull("checksum") ? null : row.getInt("checksum"), - row.getTimestamp("installed_on"), + Date.from(row.getInstant("installed_on")), row.getString("installed_by"), row.getInt("execution_time"), - row.getBool("success") + row.getBoolean("success"), + row.getBoolean("ignored") )); } - - //order by version_rank not necessary here as it eventually gets saved in TreeMap that uses natural ordering - return resultsList; } + public boolean versionNotFound() { + if (!tablesExist()) { + return true; + } + ResultSet resultSet = executeStatement(format(MIGRATION_VERSION_QUERY, getTableMigrationVersion())); + return resultSet.all().isEmpty(); + } + + /** * Calculates the installed rank for the new migration to be inserted. * * @return The installed rank. */ private int calculateInstalledRank() { - Statement statement = new SimpleStatement( - "UPDATE " + keyspace.getName() + "." + tableName + COUNTS_TABLE_NAME_SUFFIX + - " SET count = count + 1" + - "WHERE name = 'installed_rank';"); - session.execute(statement); - Select select = QueryBuilder - .select("count") - .from(tableName + COUNTS_TABLE_NAME_SUFFIX); - select.where(eq("name", "installed_rank")); - select.setConsistencyLevel(this.consistencyLevel); - ResultSet result = session.execute(select); + Statement statement = SimpleStatement.newInstance(format(UPDATE_MIGRATION_COUNT, getTableCountName())); + executeStatement(statement, consistencyLevel); + ResultSet result = executeStatement(format(SELECT_COUNT_MIGRATION, getTableCountName())); return (int) result.one().getLong("count"); } - class MigrationMetaHolder { - private int versionRank; + static class MigrationMetaHolder { + private final int versionRank; public MigrationMetaHolder(int versionRank) { this.versionRank = versionRank; @@ -233,13 +226,7 @@ public int getVersionRank() { * @return The rank. */ private int calculateVersionRank(MigrationVersion version) { - Statement statement = QueryBuilder - .select() - .column("version") - .column("version_rank") - .from(keyspace.getName(), tableName); - statement.setConsistencyLevel(this.consistencyLevel); - ResultSet versionRows = session.execute(statement); + ResultSet versionRows = executeStatement(format(SELECT_MIGRATION, getTableName())); List migrationVersions = new ArrayList<>(); HashMap migrationMetaHolders = new HashMap<>(); @@ -252,11 +239,8 @@ private int calculateVersionRank(MigrationVersion version) { Collections.sort(migrationVersions); - BatchStatement batchStatement = new BatchStatement(); - PreparedStatement preparedStatement = cachePs.prepare( - "UPDATE " + keyspace.getName() + "." + tableName + - " SET version_rank = ?" + - " WHERE version = ?;"); + BatchStatement batchStatement = BatchStatement.newInstance(BatchType.LOGGED); + PreparedStatement preparedStatement = cachePs.prepare(format(UPDATE_MIGRATION_VERSION_RANK, tableName)); for (int i = 0; i < migrationVersions.size(); i++) { if (version.compareTo(migrationVersions.get(i)) < 0) { @@ -265,13 +249,92 @@ private int calculateVersionRank(MigrationVersion version) { batchStatement.add(preparedStatement.bind( migrationMetaHolders.get(migrationVersionStr).getVersionRank() + 1, migrationVersionStr)); - batchStatement.setConsistencyLevel(this.consistencyLevel); } return i + 1; } } - session.execute(batchStatement); + executeStatement(batchStatement, consistencyLevel); return migrationVersions.size() + 1; } + private ResultSet executeStatement(String statement) throws DriverException { + return executeStatement(SimpleStatement.newInstance(statement), this.consistencyLevel); + } + + private ResultSet executeStatement(Statement statement, ConsistencyLevel consistencyLevel) throws DriverException { + return session.execute(statement + .setExecutionProfileName(executionProfileName) + .setConsistencyLevel(consistencyLevel)); + } + + private void useKeyspace() { + LOG.info("Changing keyspace of the session to '" + keyspaceName + "'"); + session.execute("USE " + keyspaceName); + } + + private static String createTableName(String tablePrefix, String tableName) { + if (tablePrefix == null || tablePrefix.isEmpty()) { + return tableName; + } + return String.format("%s_%s", tablePrefix, tableName); + } + + private void createKeyspaceIfRequired() { + if (keyspace == null || keyspaceExists()) { + return; + } + try { + executeStatement(this.keyspace.getCqlStatement()); + } catch (DriverException exception) { + throw new CassandraMigrationException(format("Unable to create keyspace %s.", keyspaceName), exception); + } + } + + /** + * Makes sure the schema migration table exists. If it is not available it will be created. + */ + private void ensureSchemaTable() { + if (tablesExist()) { + return; + } + createSchemaTable(); + } + + private boolean tablesExist() { + Metadata metadata = session.getMetadata(); + + return isTableExisting(metadata, tableName) + && isTableExisting(metadata, tableCountName) + && isTableExisting(metadata, tableMigrationVersion); + } + + private boolean isTableExisting(Metadata metadata, String tableName) { + return metadata + .getKeyspace(keyspaceName) + .map(keyspaceMetadata -> keyspaceMetadata.getTable(tableName).isPresent()) + .orElse(false); + } + + private void createSchemaTable() { + executeStatement(format(CREATE_MIGRATION_CF, getTableName())); + executeStatement(format(CREATE_MIGRATION_COUNT_CF, getTableCountName())); + executeStatement(format(CREATE_MIGRATION_VERSION, getTableMigrationVersion())); + } + + + private boolean keyspaceExists() { + return session.getMetadata().getKeyspace(keyspaceName).isPresent(); + } + + public String getTableName() { + return tableName; + } + + public String getTableCountName() { + return tableCountName; + } + + public String getTableMigrationVersion() { + return tableMigrationVersion; + } } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/info/AppliedMigration.java b/src/main/java/com/contrastsecurity/cassandra/migration/info/AppliedMigration.java index 4cbbd25..103dce2 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/info/AppliedMigration.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/info/AppliedMigration.java @@ -78,6 +78,11 @@ public class AppliedMigration implements Comparable { */ private boolean success; + /** + * Flag indicating whether the migration failure can be ignored. + */ + private boolean ignored; + /** * Creates a new applied migration. Only called from the RowMapper. * @@ -96,6 +101,28 @@ public class AppliedMigration implements Comparable { public AppliedMigration(int versionRank, int installedRank, MigrationVersion version, String description, MigrationType type, String script, Integer checksum, Date installedOn, String installedBy, int executionTime, boolean success) { + this(versionRank, installedRank, version, description, type, script, checksum, installedOn, installedBy, executionTime, success, false); + } + + /** + * Creates a new applied migration. Only called from the RowMapper. + * + * @param versionRank The position of this version amongst all others. (For easy order by sorting) + * @param installedRank The order in which this migration was applied amongst all others. (For out of order detection) + * @param version The target version of this migration. + * @param description The description of the migration. + * @param type The type of migration (INIT, CQL, ...) + * @param script The name of the script to execute for this migration, relative to its classpath location. + * @param checksum The checksum of the migration. (Optional) + * @param installedOn The timestamp when this migration was installed. + * @param installedBy The user that installed this migration. + * @param executionTime The execution time (in millis) of this migration. + * @param success Flag indicating whether the migration was successful or not. + * @param ignored Flag indicating whether the migration failure can be ignored. + */ + public AppliedMigration(int versionRank, int installedRank, MigrationVersion version, String description, MigrationType type, + String script, Integer checksum, Date installedOn, + String installedBy, int executionTime, boolean success, boolean ignored) { this.versionRank = versionRank; this.installedRank = installedRank; this.version = version; @@ -107,6 +134,7 @@ public AppliedMigration(int versionRank, int installedRank, MigrationVersion ver this.installedBy = installedBy; this.executionTime = executionTime; this.success = success; + this.ignored = ignored; } /** @@ -123,6 +151,24 @@ public AppliedMigration(int versionRank, int installedRank, MigrationVersion ver */ public AppliedMigration(MigrationVersion version, String description, MigrationType type, String script, Integer checksum, String installedBy, int executionTime, boolean success) { + this(version, description, type, script, checksum, installedBy, executionTime, success, false); + } + + /** + * Creates a new applied migration. + * + * @param version The target version of this migration. + * @param description The description of the migration. + * @param type The type of migration (INIT, CQL, ...) + * @param script The name of the script to execute for this migration, relative to its classpath location. + * @param checksum The checksum of the migration. (Optional) + * @param installedBy The user that installed this migration. + * @param executionTime The execution time (in millis) of this migration. + * @param success Flag indicating whether the migration was successful or not. + * @param ignored Flag indicating whether the migration failure can be ignored. + */ + public AppliedMigration(MigrationVersion version, String description, MigrationType type, String script, + Integer checksum, String installedBy, int executionTime, boolean success, boolean ignored) { this.version = version; this.description = abbreviateDescription(description); this.type = type; @@ -131,6 +177,7 @@ public AppliedMigration(MigrationVersion version, String description, MigrationT this.installedBy = installedBy; this.executionTime = executionTime; this.success = success; + this.ignored = ignored; } /** @@ -246,6 +293,13 @@ public boolean isSuccess() { return success; } + /** + * @return Flag indicating whether the migration failure can be ignored. + */ + public boolean isIgnored() { + return ignored; + } + @SuppressWarnings("SimplifiableIfStatement") @Override public boolean equals(Object o) { @@ -257,6 +311,7 @@ public boolean equals(Object o) { if (executionTime != that.executionTime) return false; if (installedRank != that.installedRank) return false; if (success != that.success) return false; + if (ignored != that.ignored) return false; if (versionRank != that.versionRank) return false; if (checksum != null ? !checksum.equals(that.checksum) : that.checksum != null) return false; if (!description.equals(that.description)) return false; @@ -280,6 +335,7 @@ public int hashCode() { result = 31 * result + (installedBy != null ? installedBy.hashCode() : 0); result = 31 * result + executionTime; result = 31 * result + (success ? 1 : 0); + result = 31 * result + (ignored ? 1 : 0); return result; } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/info/MigrationInfoService.java b/src/main/java/com/contrastsecurity/cassandra/migration/info/MigrationInfoService.java index f7e9a5c..219dea1 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/info/MigrationInfoService.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/info/MigrationInfoService.java @@ -67,7 +67,7 @@ public void refresh() { migrationInfos = mergeAvailableAndAppliedMigrations(availableMigrations, appliedMigrations); - if (MigrationVersion.CURRENT == target) { + if (MigrationVersion.CURRENT == target && current() != null) { target = current().getVersion(); } } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/info/MigrationVersion.java b/src/main/java/com/contrastsecurity/cassandra/migration/info/MigrationVersion.java index f27c47a..d0b2137 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/info/MigrationVersion.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/info/MigrationVersion.java @@ -28,7 +28,7 @@ public class MigrationVersion implements Comparable { public static final MigrationVersion LATEST = new MigrationVersion(BigInteger.valueOf(-1), "<< Latest Version >>"); public static final MigrationVersion CURRENT = new MigrationVersion(BigInteger.valueOf(-2), "<< Current Version >>"); - private static final String TABLE = "cassandra_migration_version"; + public static final String TABLE = "migration_version"; private List versionParts; private String displayText; diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/resolver/MigrationExecutor.java b/src/main/java/com/contrastsecurity/cassandra/migration/resolver/MigrationExecutor.java index d44a8ac..af04940 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/resolver/MigrationExecutor.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/resolver/MigrationExecutor.java @@ -15,11 +15,11 @@ */ package com.contrastsecurity.cassandra.migration.resolver; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; /** * Executes a migration. */ public interface MigrationExecutor { - void execute(Session session); + void execute(CqlSession session); } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/resolver/cql/CqlMigrationExecutor.java b/src/main/java/com/contrastsecurity/cassandra/migration/resolver/cql/CqlMigrationExecutor.java index d831675..0bec6d5 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/resolver/cql/CqlMigrationExecutor.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/resolver/cql/CqlMigrationExecutor.java @@ -3,7 +3,7 @@ import com.contrastsecurity.cassandra.migration.resolver.MigrationExecutor; import com.contrastsecurity.cassandra.migration.script.CqlScript; import com.contrastsecurity.cassandra.migration.utils.scanner.Resource; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; /** * Database migration based on a cql file. @@ -34,7 +34,7 @@ public CqlMigrationExecutor(Resource cqlScriptResource, String encoding) { } @Override - public void execute(Session session) { + public void execute(CqlSession session) { CqlScript cqlScript = new CqlScript(cqlScriptResource, encoding); cqlScript.execute(session); } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/resolver/java/JavaMigrationExecutor.java b/src/main/java/com/contrastsecurity/cassandra/migration/resolver/java/JavaMigrationExecutor.java index f85a913..97df78e 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/resolver/java/JavaMigrationExecutor.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/resolver/java/JavaMigrationExecutor.java @@ -3,7 +3,7 @@ import com.contrastsecurity.cassandra.migration.CassandraMigrationException; import com.contrastsecurity.cassandra.migration.api.JavaMigration; import com.contrastsecurity.cassandra.migration.resolver.MigrationExecutor; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; /** * Adapter for executing migrations implementing JavaMigration. @@ -24,7 +24,7 @@ public JavaMigrationExecutor(JavaMigration javaMigration) { } @Override - public void execute(Session session) { + public void execute(CqlSession session) { try { javaMigration.migrate(session); } catch (Exception e) { diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/script/CqlScript.java b/src/main/java/com/contrastsecurity/cassandra/migration/script/CqlScript.java index 5807604..9ec44ae 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/script/CqlScript.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/script/CqlScript.java @@ -20,7 +20,7 @@ import com.contrastsecurity.cassandra.migration.logging.LogFactory; import com.contrastsecurity.cassandra.migration.utils.StringUtils; import com.contrastsecurity.cassandra.migration.utils.scanner.Resource; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; import java.io.BufferedReader; import java.io.IOException; @@ -89,7 +89,7 @@ public Resource getResource() { * Executes this script against the database. * @param session Cassandra session */ - public void execute(final Session session) { + public void execute(final CqlSession session) { for (String cqlStatement : cqlStatements) { LOG.debug("Executing CQL: " + cqlStatement); session.execute(cqlStatement); diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/utils/CachePrepareStatement.java b/src/main/java/com/contrastsecurity/cassandra/migration/utils/CachePrepareStatement.java index 9241035..8971348 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/utils/CachePrepareStatement.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/utils/CachePrepareStatement.java @@ -2,15 +2,15 @@ import java.util.concurrent.ConcurrentHashMap; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; public class CachePrepareStatement { private ConcurrentHashMap cacheStatement = new ConcurrentHashMap<>(); - private Session session; + private final CqlSession session; - public CachePrepareStatement(Session session) { + public CachePrepareStatement(CqlSession session) { this.session = session; } diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/utils/Ensure.java b/src/main/java/com/contrastsecurity/cassandra/migration/utils/Ensure.java new file mode 100644 index 0000000..13f5b8e --- /dev/null +++ b/src/main/java/com/contrastsecurity/cassandra/migration/utils/Ensure.java @@ -0,0 +1,44 @@ +package com.contrastsecurity.cassandra.migration.utils; + +/** + * @author Patrick Kranz + */ +public final class Ensure { + private Ensure() { + } + + /** + * Checks if the given argument is null and throws an exception with a + * message containing the argument name if that it true. + * + * @param argument the argument to check for null + * @param argumentName the name of the argument to check. + * This is used in the exception message. + * @param the type of the argument + * @return the argument itself + * @throws IllegalArgumentException in case argument is null + */ + public static T notNull(T argument, String argumentName) { + if (argument == null) { + throw new IllegalArgumentException("Argument " + argumentName + " must not be null."); + } + return argument; + } + + /** + * Checks if the given String is null or contains only whitespaces. + * The String is trimmed before the empty check. + * + * @param argument the String to check for null or emptiness + * @param argumentName the name of the argument to check. + * This is used in the exception message. + * @return the String that was given as argument + * @throws IllegalArgumentException in case argument is null or empty + */ + public static String notNullOrEmpty(String argument, String argumentName) { + if (argument == null || argument.trim().isEmpty()) { + throw new IllegalArgumentException("Argument " + argumentName + " must not be null or empty."); + } + return argument; + } +} diff --git a/src/test/java/com/contrastsecurity/cassandra/migration/BaseIT.java b/src/test/java/com/contrastsecurity/cassandra/migration/BaseIT.java index 1a810c9..1310730 100644 --- a/src/test/java/com/contrastsecurity/cassandra/migration/BaseIT.java +++ b/src/test/java/com/contrastsecurity/cassandra/migration/BaseIT.java @@ -1,24 +1,30 @@ package com.contrastsecurity.cassandra.migration; import com.contrastsecurity.cassandra.migration.config.Keyspace; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.SimpleStatement; -import com.datastax.driver.core.Statement; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import java.net.InetSocketAddress; +import java.time.Duration; + public abstract class BaseIT { public static final String CASSANDRA__KEYSPACE = "cassandra_migration_test"; public static final String CASSANDRA_CONTACT_POINT = "localhost"; public static final int CASSANDRA_PORT = 9147; public static final String CASSANDRA_USERNAME = "cassandra"; public static final String CASSANDRA_PASSWORD = "cassandra"; + private static final int REQUEST_TIMEOUT_IN_SECONDS = 30; - private Session session; + private CqlSession session; @BeforeClass public static void beforeSuite() throws Exception { @@ -35,7 +41,7 @@ public static void afterSuite() { @Before public void createKeyspace() { - Statement statement = new SimpleStatement( + Statement statement = SimpleStatement.newInstance( "CREATE KEYSPACE " + CASSANDRA__KEYSPACE + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" ); @@ -44,7 +50,7 @@ public void createKeyspace() { @After public void dropKeyspace() { - Statement statement = new SimpleStatement( + Statement statement = SimpleStatement.newInstance( "DROP KEYSPACE " + CASSANDRA__KEYSPACE + ";" ); getSession(getKeyspace()).execute(statement); @@ -60,19 +66,29 @@ protected Keyspace getKeyspace() { return ks; } - private Session getSession(Keyspace keyspace) { + private CqlSession createSession() { + DriverConfigLoader loader = DriverConfigLoader.programmaticBuilder() + .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(REQUEST_TIMEOUT_IN_SECONDS)) + .withBoolean(DefaultDriverOption.REQUEST_WARN_IF_SET_KEYSPACE, false) + .build(); + return new CqlSessionBuilder() + .addContactPoint(new InetSocketAddress(CASSANDRA_CONTACT_POINT, CASSANDRA_PORT)) + //.withKeyspace(CASSANDRA__KEYSPACE) + .withConfigLoader(loader) + .withLocalDatacenter("datacenter1") + .build(); + } + + + private CqlSession getSession(Keyspace keyspace) { if (session != null && !session.isClosed()) return session; - com.datastax.driver.core.Cluster.Builder builder = new com.datastax.driver.core.Cluster.Builder(); - builder.addContactPoints(CASSANDRA_CONTACT_POINT).withPort(CASSANDRA_PORT); - builder.withCredentials(keyspace.getCluster().getUsername(), keyspace.getCluster().getPassword()); - Cluster cluster = builder.build(); - session = cluster.connect(); + session = createSession(); return session; } - protected Session getSession() { + protected CqlSession getSession() { return session; } } diff --git a/src/test/java/com/contrastsecurity/cassandra/migration/CassandraMigrationIT.java b/src/test/java/com/contrastsecurity/cassandra/migration/CassandraMigrationIT.java index 4505a31..e8bea0d 100644 --- a/src/test/java/com/contrastsecurity/cassandra/migration/CassandraMigrationIT.java +++ b/src/test/java/com/contrastsecurity/cassandra/migration/CassandraMigrationIT.java @@ -4,10 +4,11 @@ import com.contrastsecurity.cassandra.migration.info.MigrationInfo; import com.contrastsecurity.cassandra.migration.info.MigrationInfoDumper; import com.contrastsecurity.cassandra.migration.info.MigrationInfoService; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.querybuilder.QueryBuilder; -import com.datastax.driver.core.querybuilder.Select; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.querybuilder.select.Select; import org.junit.Assert; import org.junit.Test; @@ -15,7 +16,7 @@ import java.io.IOException; import java.io.InputStreamReader; -import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; @@ -40,27 +41,38 @@ public void runApiTest() { assertThat(info.getType().name(), is(MigrationType.JAVA_DRIVER.name())); assertThat(info.getScript().contains(".java"), is(true)); - Select select = QueryBuilder.select().column("value").from("test1"); - select.where(eq("space", "web")).and(eq("key", "facebook")); - ResultSet result = getSession().execute(select); + Select select = QueryBuilder.selectFrom("test1") + .column("value") + .whereColumn("space") + .isEqualTo(literal("web")) + .whereColumn("key") + .isEqualTo(literal("facebook")); + ResultSet result = getSession().execute(select.build()); assertThat(result.one().getString("value"), is("facebook.com")); } else if (info.getVersion().equals("3.0")) { assertThat(info.getDescription(), is("Third")); assertThat(info.getType().name(), is(MigrationType.JAVA_DRIVER.name())); assertThat(info.getScript().contains(".java"), is(true)); - Select select = QueryBuilder.select().column("value").from("test1"); - select.where(eq("space", "web")).and(eq("key", "google")); - ResultSet result = getSession().execute(select); + Select select = QueryBuilder.selectFrom("test1") + .column("value") + .whereColumn("space") + .isEqualTo(literal("web")) + .whereColumn("key") + .isEqualTo(literal("google")); + ResultSet result = getSession().execute(select.build()); assertThat(result.one().getString("value"), is("google.com")); } else if (info.getVersion().equals("2.0.0")) { assertThat(info.getDescription(), is("Second")); assertThat(info.getType().name(), is(MigrationType.CQL.name())); assertThat(info.getScript().contains(".cql"), is(true)); - Select select = QueryBuilder.select().column("title").column("message").from("contents"); - select.where(eq("id", 1)); - Row row = getSession().execute(select).one(); + Select select = QueryBuilder.selectFrom("contents") + .column("title") + .column("message") + .whereColumn("id") + .isEqualTo(literal(1)); + Row row = getSession().execute(select.build()).one(); assertThat(row.getString("title"), is("foo")); assertThat(row.getString("message"), is("bar")); } else if (info.getVersion().equals("1.0.0")) { @@ -68,9 +80,13 @@ public void runApiTest() { assertThat(info.getType().name(), is(MigrationType.CQL.name())); assertThat(info.getScript().contains(".cql"), is(true)); - Select select = QueryBuilder.select().column("value").from("test1"); - select.where(eq("space", "foo")).and(eq("key", "bar")); - ResultSet result = getSession().execute(select); + Select select = QueryBuilder.selectFrom("test1") + .column("value") + .whereColumn("space") + .isEqualTo(literal( "foo")) + .whereColumn("key") + .isEqualTo(literal("bar")); + ResultSet result = getSession().execute(select.build()); assertThat(result.one().getString("value"), is("profit!")); } diff --git a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/DummyAbstractJavaMigration.java b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/DummyAbstractJavaMigration.java index e8d5b7f..9a885e7 100644 --- a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/DummyAbstractJavaMigration.java +++ b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/DummyAbstractJavaMigration.java @@ -16,15 +16,15 @@ package com.contrastsecurity.cassandra.migration.resolver.java.dummy; import com.contrastsecurity.cassandra.migration.api.JavaMigration; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; /** * Test for abstract class support. */ public abstract class DummyAbstractJavaMigration implements JavaMigration { - public final void migrate(Session session) throws Exception { + public final void migrate(CqlSession session) throws Exception { doMigrate(session); } - public abstract void doMigrate(Session session) throws Exception; + public abstract void doMigrate(CqlSession session) throws Exception; } diff --git a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/SabotageEnum.java b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/SabotageEnum.java index 752d9f2..5410943 100644 --- a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/SabotageEnum.java +++ b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/SabotageEnum.java @@ -16,7 +16,7 @@ package com.contrastsecurity.cassandra.migration.resolver.java.dummy; import com.contrastsecurity.cassandra.migration.api.JavaMigration; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; /** * Trips up the ClassPathScanner. See issue 801. @@ -25,7 +25,7 @@ public enum SabotageEnum implements JavaMigration { FAIL { @Override - public void migrate(Session session) throws Exception { + public void migrate(CqlSession session) throws Exception { } } diff --git a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/V2__InterfaceBasedMigration.java b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/V2__InterfaceBasedMigration.java index 732e1ca..2df045f 100644 --- a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/V2__InterfaceBasedMigration.java +++ b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/V2__InterfaceBasedMigration.java @@ -16,13 +16,13 @@ package com.contrastsecurity.cassandra.migration.resolver.java.dummy; import com.contrastsecurity.cassandra.migration.api.JavaMigration; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; /** * Test migration. */ public class V2__InterfaceBasedMigration implements JavaMigration { - public void migrate(Session session) throws Exception { + public void migrate(CqlSession session) throws Exception { //Do nothing. } } diff --git a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/V4__DummyExtendedAbstractJdbcMigration.java b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/V4__DummyExtendedAbstractJdbcMigration.java index 65363a7..7b63d77 100644 --- a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/V4__DummyExtendedAbstractJdbcMigration.java +++ b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/V4__DummyExtendedAbstractJdbcMigration.java @@ -15,14 +15,14 @@ */ package com.contrastsecurity.cassandra.migration.resolver.java.dummy; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; /** * Test class that extends and abstract class instead of implementing JdbcMigration directly. */ public class V4__DummyExtendedAbstractJdbcMigration extends DummyAbstractJavaMigration { @Override - public void doMigrate(Session session) throws Exception { + public void doMigrate(CqlSession session) throws Exception { // DO nothing } } diff --git a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/Version3dot5.java b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/Version3dot5.java index f90ffc0..f66d1d8 100644 --- a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/Version3dot5.java +++ b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/dummy/Version3dot5.java @@ -18,13 +18,13 @@ import com.contrastsecurity.cassandra.migration.api.MigrationChecksumProvider; import com.contrastsecurity.cassandra.migration.api.MigrationInfoProvider; import com.contrastsecurity.cassandra.migration.info.MigrationVersion; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; /** * Test migration. */ public class Version3dot5 extends DummyAbstractJavaMigration implements MigrationInfoProvider, MigrationChecksumProvider { - public void doMigrate(Session session) throws Exception { + public void doMigrate(CqlSession session) throws Exception { //Do nothing. } diff --git a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/error/BrokenJdbcMigration.java b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/error/BrokenJdbcMigration.java index 6ebc284..c8ba039 100644 --- a/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/error/BrokenJdbcMigration.java +++ b/src/test/java/com/contrastsecurity/cassandra/migration/resolver/java/error/BrokenJdbcMigration.java @@ -16,7 +16,7 @@ package com.contrastsecurity.cassandra.migration.resolver.java.error; import com.contrastsecurity.cassandra.migration.api.JavaMigration; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; /** * Test for exception in constructor support. @@ -26,7 +26,7 @@ public BrokenJdbcMigration() { throw new IllegalStateException("Expected!"); } - public final void migrate(Session session) throws Exception { + public final void migrate(CqlSession session) throws Exception { // Do nothing } } diff --git a/src/test/java/migration/integ/java/V3_0_1__Three_zero_one.java b/src/test/java/migration/integ/java/V3_0_1__Three_zero_one.java index 1fc5dd1..8434b01 100644 --- a/src/test/java/migration/integ/java/V3_0_1__Three_zero_one.java +++ b/src/test/java/migration/integ/java/V3_0_1__Three_zero_one.java @@ -1,18 +1,18 @@ package migration.integ.java; import com.contrastsecurity.cassandra.migration.api.JavaMigration; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.querybuilder.Insert; -import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; public class V3_0_1__Three_zero_one implements JavaMigration { @Override - public void migrate(Session session) throws Exception { - Insert insert = QueryBuilder.insertInto("test1"); - insert.value("space", "web"); - insert.value("key", "facebook"); - insert.value("value", "facebook.com"); - - session.execute(insert); + public void migrate(CqlSession session) throws Exception { + session.execute(QueryBuilder.insertInto("test1") + .value("space", literal("web")) + .value("key", literal("facebook")) + .value("value", literal("facebook.com")) + .build()); } } diff --git a/src/test/java/migration/integ/java/V3_0__Third.java b/src/test/java/migration/integ/java/V3_0__Third.java index 3183c7a..a6339a0 100644 --- a/src/test/java/migration/integ/java/V3_0__Third.java +++ b/src/test/java/migration/integ/java/V3_0__Third.java @@ -1,19 +1,18 @@ package migration.integ.java; import com.contrastsecurity.cassandra.migration.api.JavaMigration; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.querybuilder.Insert; -import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; public class V3_0__Third implements JavaMigration { @Override - public void migrate(Session session) throws Exception { - Insert insert = QueryBuilder.insertInto("test1"); - insert.value("space", "web"); - insert.value("key", "google"); - insert.value("value", "google.com"); - - session.execute(insert); + public void migrate(CqlSession session) throws Exception { + session.execute(QueryBuilder.insertInto("test1") + .value("space", literal("web")) + .value("key", literal("google")) + .value("value", literal("google.com")).build()); } }