Skip to content

Upgraded to cassandra driver 4.0 #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<groupId>com.contrastsecurity</groupId>
<artifactId>cassandra-migration</artifactId>
<name>Cassandra Migration</name>
<version>0.9-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
<description>
Database migration tool for Cassandra
</description>
Expand Down Expand Up @@ -82,6 +82,17 @@
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.14.0</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.14.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down Expand Up @@ -175,6 +186,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@
import com.contrastsecurity.cassandra.migration.config.ScriptsLocations;
import com.contrastsecurity.cassandra.migration.dao.SchemaVersionDAO;
import com.contrastsecurity.cassandra.migration.info.MigrationInfoService;
import com.contrastsecurity.cassandra.migration.info.MigrationVersion;
import com.contrastsecurity.cassandra.migration.logging.Log;
import com.contrastsecurity.cassandra.migration.logging.LogFactory;
import com.contrastsecurity.cassandra.migration.resolver.CompositeMigrationResolver;
import com.contrastsecurity.cassandra.migration.resolver.MigrationResolver;
import com.contrastsecurity.cassandra.migration.utils.StringUtils;
import com.contrastsecurity.cassandra.migration.utils.VersionPrinter;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import java.util.List;
import java.net.InetSocketAddress;
import java.util.Collection;

public class CassandraMigration {

Expand All @@ -29,12 +31,19 @@ public class CassandraMigration {
private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
private Keyspace keyspace;
private MigrationConfigs configs;
private CqlSession session;

public CassandraMigration() {
this.keyspace = new Keyspace();
this.configs = new MigrationConfigs();
}

public CassandraMigration(CqlSession session, MigrationConfigs configs) {
this.session = session;
this.keyspace = configs.getKeyspace();
this.configs = configs;
}

public ClassLoader getClassLoader() {
return classLoader;
}
Expand Down Expand Up @@ -66,11 +75,11 @@ private MigrationResolver createMigrationResolver() {

public int migrate() {
return execute(new Action<Integer>() {
public Integer execute(Session session) {
new Initialize().run(session, keyspace, MigrationVersion.CURRENT.getTable());
public Integer execute(CqlSession session) {
new Initialize().run(session, keyspace);

MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable());
SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace);
Migrate migrate = new Migrate(migrationResolver, configs.getTarget(), schemaVersionDAO, session,
keyspace.getCluster().getUsername(), configs.isAllowOutOfOrder());

Expand All @@ -81,9 +90,9 @@ public Integer execute(Session session) {

public MigrationInfoService info() {
return execute(new Action<MigrationInfoService>() {
public MigrationInfoService execute(Session session) {
public MigrationInfoService execute(CqlSession session) {
MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable());
SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace);
MigrationInfoService migrationInfoService =
new MigrationInfoService(migrationResolver, schemaVersionDAO, configs.getTarget(), false, true);
migrationInfoService.refresh();
Expand All @@ -94,21 +103,21 @@ public MigrationInfoService execute(Session session) {
}

public void validate() {
String validationError = execute(new Action<String>() {
@Override
public String execute(Session session) {
MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDao = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable());
Validate validate = new Validate(migrationResolver, schemaVersionDao, configs.getTarget(), true, false);
return validate.run();
}
});
if (validationError != null) {
throw new CassandraMigrationException("Validation failed. " + validationError);
}
String validationError = execute(new Action<String>() {
@Override
public String execute(CqlSession session) {
MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDao = new SchemaVersionDAO(session, keyspace);
Validate validate = new Validate(migrationResolver, schemaVersionDao, configs.getTarget(), true, false);
return validate.run();
}
});

if (validationError != null) {
throw new CassandraMigrationException("Validation failed. " + validationError);
}
}

public void baseline() {
//TODO
throw new NotImplementedException();
Expand All @@ -119,11 +128,11 @@ private String getConnectionInfo(Metadata metadata) {
sb.append("Connected to cluster: ");
sb.append(metadata.getClusterName());
sb.append("\n");
for (Host host : metadata.getAllHosts()) {
for (Node node : metadata.getNodes().values()) {
sb.append("Data center: ");
sb.append(host.getDatacenter());
sb.append(node.getDatacenter());
sb.append("; Host: ");
sb.append(host.getAddress());
sb.append(node.getBroadcastAddress());
}
return sb.toString();
}
Expand All @@ -132,38 +141,44 @@ <T> T execute(Action<T> action) {
T result;

VersionPrinter.printVersion(classLoader);

com.datastax.driver.core.Cluster cluster = null;
Session session = null;
try {
if (null == keyspace)
throw new IllegalArgumentException("Unable to establish Cassandra session. Keyspace is not configured.");

if (null == keyspace.getCluster())
throw new IllegalArgumentException("Unable to establish Cassandra session. Cluster is not configured.");

com.datastax.driver.core.Cluster.Builder builder = new com.datastax.driver.core.Cluster.Builder();
builder.addContactPoints(keyspace.getCluster().getContactpoints()).withPort(keyspace.getCluster().getPort());
if (null != keyspace.getCluster().getUsername() && !keyspace.getCluster().getUsername().trim().isEmpty()) {
if (null != keyspace.getCluster().getPassword() && !keyspace.getCluster().getPassword().trim().isEmpty()) {
builder.withCredentials(keyspace.getCluster().getUsername(),
keyspace.getCluster().getPassword());
} else {
throw new IllegalArgumentException("Password must be provided with username.");

if (session == null) {

if (null == keyspace.getCluster())
throw new IllegalArgumentException("Unable to establish Cassandra session. Cluster is not configured.");

CqlSessionBuilder cqlSessionBuilder = new CqlSessionBuilder()
.withKeyspace(keyspace.getName());
if (null != keyspace.getCluster().getUsername() && !keyspace.getCluster().getUsername().trim().isEmpty()) {
if (null != keyspace.getCluster().getPassword() && !keyspace.getCluster().getPassword().trim().isEmpty()) {
cqlSessionBuilder.withAuthCredentials(keyspace.getCluster().getUsername(),
keyspace.getCluster().getPassword());
} else {
throw new IllegalArgumentException("Password must be provided with username.");
}
}
if (StringUtils.hasText(keyspace.getCluster().getLocalDatacenter())) {
cqlSessionBuilder.withLocalDatacenter(keyspace.getCluster().getLocalDatacenter());
}
for (String contactPoint : keyspace.getCluster().getContactpoints()) {
cqlSessionBuilder.addContactPoint(new InetSocketAddress(contactPoint, keyspace.getCluster().getPort()));
}
session = cqlSessionBuilder.build();
}
cluster = builder.build();

Metadata metadata = cluster.getMetadata();
Metadata metadata = session.getMetadata();
LOG.info(getConnectionInfo(metadata));

session = cluster.newSession();
if (null == keyspace.getName() || keyspace.getName().trim().length() == 0)
throw new IllegalArgumentException("Keyspace not specified.");
List<KeyspaceMetadata> keyspaces = metadata.getKeyspaces();
Collection<KeyspaceMetadata> keyspaces = metadata.getKeyspaces().values();
boolean keyspaceExists = false;
for (KeyspaceMetadata keyspaceMetadata : keyspaces) {
if (keyspaceMetadata.getName().equalsIgnoreCase(keyspace.getName()))
if (keyspaceMetadata.getName().asInternal().equalsIgnoreCase(keyspace.getName()))
keyspaceExists = true;
}
if (keyspaceExists)
Expand All @@ -176,20 +191,14 @@ <T> T execute(Action<T> action) {
if (null != session && !session.isClosed())
try {
session.close();
} catch(Exception e) {
} catch (Exception e) {
LOG.warn("Error closing Cassandra session");
}
if (null != cluster && !cluster.isClosed())
try {
cluster.close();
} catch(Exception e) {
LOG.warn("Error closing Cassandra cluster");
}
}
return result;
}

interface Action<T> {
T execute(Session session);
T execute(CqlSession session);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import com.contrastsecurity.cassandra.migration.config.Keyspace;
import com.contrastsecurity.cassandra.migration.dao.SchemaVersionDAO;
import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;

public class Initialize {

public void run(Session session, Keyspace keyspace, String migrationVersionTableName) {
SchemaVersionDAO dao = new SchemaVersionDAO(session, keyspace, migrationVersionTableName);
public void run(CqlSession session, Keyspace keyspace) {
SchemaVersionDAO dao = new SchemaVersionDAO(session, keyspace);
dao.createTablesIfNotExist();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@
import com.contrastsecurity.cassandra.migration.resolver.MigrationResolver;
import com.contrastsecurity.cassandra.migration.utils.StopWatch;
import com.contrastsecurity.cassandra.migration.utils.TimeFormat;
import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;

public class Migrate {
private static final Log LOG = LogFactory.getLog(Migrate.class);

private final MigrationVersion target;
private final SchemaVersionDAO schemaVersionDAO;
private final MigrationResolver migrationResolver;
private final Session session;
private final CqlSession session;
private final String user;
private final boolean allowOutOfOrder;

public Migrate(MigrationResolver migrationResolver, MigrationVersion target, SchemaVersionDAO schemaVersionDAO,
Session session, String user, boolean allowOutOfOrder) {
CqlSession session, String user, boolean allowOutOfOrder) {
this.migrationResolver = migrationResolver;
this.schemaVersionDAO = schemaVersionDAO;
this.session = session;
Expand All @@ -36,6 +36,7 @@ public int run() {
stopWatch.start();

int migrationSuccessCount = 0;
boolean firstTimeMigration = schemaVersionDAO.versionNotFound();
while (true) {
final boolean firstRun = migrationSuccessCount == 0;

Expand Down Expand Up @@ -76,11 +77,18 @@ public int run() {
MigrationInfo[] pendingMigrations = infoService.pending();

if (pendingMigrations.length == 0) {
if (infoService.current() != null) {
if (firstTimeMigration) {
schemaVersionDAO.addMigrationVersion(infoService.current().getVersion().getVersion());
} else if (migrationSuccessCount > 0){
schemaVersionDAO.updateMigrationVersion(infoService.current().getVersion().getVersion());
}
}
break;
}

boolean isOutOfOrder = pendingMigrations[0].getVersion().compareTo(currentSchemaVersion) < 0;
MigrationVersion mv = applyMigration(pendingMigrations[0], isOutOfOrder);
MigrationVersion mv = applyMigration(pendingMigrations[0], isOutOfOrder, firstTimeMigration);
if(mv == null) {
//no more migrations
break;
Expand All @@ -96,11 +104,10 @@ public int run() {
return migrationSuccessCount;
}

private MigrationVersion applyMigration(final MigrationInfo migration, boolean isOutOfOrder) {
private MigrationVersion applyMigration(final MigrationInfo migration, boolean isOutOfOrder, boolean firstTimeMigration) {
MigrationVersion version = migration.getVersion();
LOG.info("Migrating keyspace " + schemaVersionDAO.getKeyspace().getName() + " to version " + version + " - " + migration.getDescription() +
(isOutOfOrder ? " (out of order)" : ""));

StopWatch stopWatch = new StopWatch();
stopWatch.start();

Expand All @@ -114,16 +121,21 @@ private MigrationVersion applyMigration(final MigrationInfo migration, boolean i
LOG.debug("Successfully completed and committed migration of keyspace " +
schemaVersionDAO.getKeyspace().getName() + " to version " + version);
} catch (CassandraMigrationException e) {
String failedMsg = "Migration of keyspace " + schemaVersionDAO.getKeyspace().getName() +
" to version " + version + " failed!";

LOG.error(failedMsg + " Please restore backups and roll back database and code!");

stopWatch.stop();
int executionTime = (int) stopWatch.getTotalTimeMillis();
if (firstTimeMigration) {
AppliedMigration appliedMigration = new AppliedMigration(version, migration.getDescription(),
migration.getType(), migration.getScript(), migration.getChecksum(), user, executionTime, true, firstTimeMigration);
schemaVersionDAO.addAppliedMigration(appliedMigration);
LOG.error("Failed applying migration but since migration is being run first time it will be ignored", e);
return version;
}
AppliedMigration appliedMigration = new AppliedMigration(version, migration.getDescription(),
migration.getType(), migration.getScript(), migration.getChecksum(), user, executionTime, false);
schemaVersionDAO.addAppliedMigration(appliedMigration);
String failedMsg = "Migration of keyspace " + schemaVersionDAO.getKeyspace().getName() +
" to version " + version + " failed!";
LOG.error(failedMsg + " Please restore backups and roll back database and code!");
throw e;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.contrastsecurity.cassandra.migration.api;

import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;

public interface JavaMigration {
void migrate(Session session) throws Exception;
void migrate(CqlSession session) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public String getDescription() {
private int port = 9042;
private String username;
private String password;
private String localDatacenter;

public Cluster() {
String contactpointsP = System.getProperty(ClusterProperty.CONTACTPOINTS.getName());
Expand Down Expand Up @@ -80,4 +81,12 @@ public String getPassword() {
public void setPassword(String password) {
this.password = password;
}

public String getLocalDatacenter() {
return localDatacenter;
}

public void setLocalDatacenter(String localDatacenter) {
this.localDatacenter = localDatacenter;
}
}
Loading