diff --git a/README.md b/README.md
index d3b190487..8e5a31393 100644
--- a/README.md
+++ b/README.md
@@ -46,15 +46,8 @@ Spark Compatibility
| Crossdata Version | Spark Version |
|-------------------|:--------------|
-| 1.7.X | 1.6.X |
-| 1.6.X | 1.6.X |
-| 1.5.X | 1.6.X |
-| 1.4.X | 1.6.X |
-| 1.3.X | 1.6.X |
-| 1.2.X | 1.5.X |
-| 1.1.X | 1.5.X |
-| 1.0.X | 1.5.X |
-
+| 1.3.X - 1.9.X | 1.6.X |
+| 1.0.X - 1.2.X | 1.5.X |
Documentation
=============
diff --git a/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala b/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala
index abb29e775..4a2a64547 100644
--- a/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala
+++ b/common/src/main/scala/com/stratio/crossdata/common/serializers/RowSerializer.scala
@@ -16,6 +16,7 @@
package com.stratio.crossdata.common.serializers
import java.sql.Timestamp
+import java.math.{BigDecimal => JBigDecimal}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
@@ -50,6 +51,7 @@ case class RowSerializer(providedSchema: StructType) extends Serializer[Row] {
case (_: DecimalType, v: JNumber) =>
v match {
case JInt(v) => Decimal(v.toString)
+ case JLong(v) => Decimal(v.toString)
case JDecimal(v) => Decimal(v)
case JDouble(v) => Decimal(v)
}
@@ -94,9 +96,16 @@ case class RowSerializer(providedSchema: StructType) extends Serializer[Row] {
case (FloatType, v: Float) => JDouble(v)
case (DoubleType, v: Double) => JDouble(v)
case (LongType, v: Long) => JInt(v)
+ case (_: DecimalType, v: JBigDecimal) => JDecimal(BigDecimal(v))
+ case (_: DecimalType, v: BigDecimal) => JDecimal(v)
case (_: DecimalType, v: Decimal) => JDecimal(v.toBigDecimal)
+ case (_: DecimalType, v: String) => JDecimal(BigDecimal(v))
case (_: DecimalType, v: Double) => JDecimal(BigDecimal(v))
case (_: DecimalType, v: Float) => JDecimal(BigDecimal(v))
+ case (_: DecimalType, v: Long) => JDecimal(BigDecimal(v))
+ case (_: DecimalType, v: Int) => JDecimal(BigDecimal(v))
case (ByteType, v: Byte) => JInt(v.toInt)
case (BinaryType, v: Array[Byte]) => JString(new String(v))
case (BooleanType, v: Boolean) => JBool(v)
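
The widened `DecimalType` handling above accepts a cell arriving as a Spark `Decimal`, a Scala or Java `BigDecimal`, a `String`, a `Long` or an `Int`, and serializes all of them as a JSON decimal. A minimal round-trip sketch of that behaviour (assuming json4s-jackson on the classpath; `DefaultFormats` stands in here for the project's `json4sJacksonFormats`):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.stratio.crossdata.common.serializers.RowSerializer

object DecimalRoundTrip extends App {
  val schema = StructType(Seq(StructField("amount", DecimalType(10, 1))))
  implicit val formats: Formats = DefaultFormats + RowSerializer(schema)

  // Every representation below should serialize to the same JSON number
  // and extract back to Decimal(32.0).
  Seq[Any](Decimal(32.0), BigDecimal(32.0), new java.math.BigDecimal("32.0"), "32.0", 32L, 32)
    .foreach { value =>
      val json = compact(Extraction.decompose(Row(value)))
      val recovered = parse(json).extract[Row]
      println(s"${value.getClass.getSimpleName} -> $json -> ${recovered.get(0)}")
    }
}
```
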
diff --git a/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala b/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala
index cbf5018e1..ea0c406d8 100644
--- a/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala
+++ b/common/src/test/scala/com/stratio/crossdata/common/serializers/RowSerializerSpec.scala
@@ -117,8 +117,15 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer
it should " be able to recover Double values when their schema type is misleading" in {
- val schema = StructType(List(StructField("decimaldouble", DecimalType(10,1),true)))
- val row = Row.fromSeq(Array(32.1))
+ val row = Row.fromSeq(
+ Array(32.0, 32.0F, BigDecimal(32.0), new java.math.BigDecimal(32.0), "32.0", 32L, 32)
+ )
+
+ val schema = StructType(
+ (0 until row.size).map { idx =>
+ StructField(s"decimaldouble$idx", DecimalType(10, 1), nullable = true)
+ }.toList
+ )
val formats = json4sJacksonFormats + new RowSerializer(schema)
@@ -126,7 +133,9 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer
val extracted = parse(serialized, false).extract[Row](formats, implicitly[Manifest[Row]])
inside(extracted) {
- case r: Row => r.get(0) shouldBe Decimal(32.1)
+ case r: Row => r.toSeq.foreach { cellValue =>
+ cellValue shouldBe Decimal(32.0)
+ }
}
}
diff --git a/core/pom.xml b/core/pom.xml
index ea790d65a..5367edd4a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -131,6 +131,16 @@
<resource>reference.conf</resource>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.curator</pattern>
+ <shadedPattern>shaded.org.apache.curator</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.zookeeper</pattern>
+ <shadedPattern>shaded.org.apache.zookeeper</shadedPattern>
+ </relocation>
+ </relocations>
@@ -145,4 +155,3 @@
-
diff --git a/core/src/main/scala/org/apache/spark/sql/crossdata/catalog/persistent/DerbyCatalog.scala b/core/src/main/scala/org/apache/spark/sql/crossdata/catalog/persistent/DerbyCatalog.scala
index cc5fdabca..a360910ac 100644
--- a/core/src/main/scala/org/apache/spark/sql/crossdata/catalog/persistent/DerbyCatalog.scala
+++ b/core/src/main/scala/org/apache/spark/sql/crossdata/catalog/persistent/DerbyCatalog.scala
@@ -18,12 +18,11 @@ package org.apache.spark.sql.crossdata.catalog.persistent
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import com.stratio.crossdata.util.using
-import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.crossdata.CrossdataVersion
-import org.apache.spark.sql.crossdata.catalog.{IndexIdentifierNormalized, TableIdentifierNormalized, StringNormalized, XDCatalog, persistent}
+import org.apache.spark.sql.crossdata.catalog._
import scala.annotation.tailrec
-import scala.util.Try
// TODO refactor SQL catalog implementations
object DerbyCatalog {
diff --git a/core/src/main/scala/org/apache/spark/sql/crossdata/catalog/persistent/PersistentCatalogWithCache.scala b/core/src/main/scala/org/apache/spark/sql/crossdata/catalog/persistent/PersistentCatalogWithCache.scala
index 036b82fc8..88deb4cd5 100644
--- a/core/src/main/scala/org/apache/spark/sql/crossdata/catalog/persistent/PersistentCatalogWithCache.scala
+++ b/core/src/main/scala/org/apache/spark/sql/crossdata/catalog/persistent/PersistentCatalogWithCache.scala
@@ -78,8 +78,8 @@ abstract class PersistentCatalogWithCache(catalystConf: CatalystConf) extends XD
throw new UnsupportedOperationException(msg)
} else {
logInfo(s"Persisting view ${viewIdentifier.unquotedString}")
- viewCache.put(viewIdentifier, plan)
persistViewMetadata(viewIdentifier, sqlText)
+ viewCache.put(viewIdentifier, plan)
}
}
@@ -91,8 +91,8 @@ abstract class PersistentCatalogWithCache(catalystConf: CatalystConf) extends XD
throw new UnsupportedOperationException(s"The table $tableIdentifier already exists")
} else {
logInfo(s"Persisting table ${crossdataTable.tableIdentifier.table}")
- tableCache.put(tableIdentifier, table)
persistTableMetadata(crossdataTable.copy(schema = Option(table.schema)))
+ tableCache.put(tableIdentifier, table)
}
}
@@ -105,8 +105,8 @@ abstract class PersistentCatalogWithCache(catalystConf: CatalystConf) extends XD
throw new UnsupportedOperationException(s"The index $indexIdentifier already exists")
} else {
logInfo(s"Persisting index ${crossdataIndex.indexIdentifier}")
- indexCache.put(crossdataIndex.tableIdentifier, crossdataIndex)
persistIndexMetadata(crossdataIndex)
+ indexCache.put(crossdataIndex.tableIdentifier, crossdataIndex)
}
}
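
All three reorderings in this file apply the same rule: persist first, cache second, so an exception thrown while persisting metadata can no longer leave the in-memory cache holding an entry that was never written to the persistent catalog. A stripped-down sketch of the invariant (names here are illustrative, not the actual `XDCatalog` API):

```scala
import scala.collection.mutable

// Illustrative write-through store: persistence runs before caching, so a
// failed persist leaves the cache untouched and both layers stay consistent.
class WriteThroughStore[K, V](persist: (K, V) => Unit) {
  private val cache = mutable.Map.empty[K, V]

  def save(key: K, value: V): Unit = {
    persist(key, value)   // may throw; nothing has been cached yet
    cache.put(key, value) // reached only once persistence has succeeded
  }

  def lookup(key: K): Option[V] = cache.get(key)
}
```
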
diff --git a/docker/catalog-config.sh b/docker/catalog-config.sh
index 138891280..fae238e3d 100644
--- a/docker/catalog-config.sh
+++ b/docker/catalog-config.sh
@@ -1,35 +1,34 @@
#!/bin/bash -xe
+
+CATALOG_CLASS_PREFIX="org.apache.spark.sql.crossdata.catalog.persistent"
+
function jdbcCatalog() {
- crossdata_core_catalog_jdbc_driver=${1:-3306}
- crossdata_core_catalog_jdbc_url=$2
- crossdata_core_catalog_jdbc_name=$3
- crossdata_core_catalog_jdbc_user=$4
- crossdata_core_catalog_jdbc_pass=$5
+ export crossdata_core_catalog_jdbc_driver=$1
+ export crossdata_core_catalog_jdbc_url=$2
+ export crossdata_core_catalog_jdbc_name=$3
+ export crossdata_core_catalog_jdbc_user=$4
+ export crossdata_core_catalog_jdbc_pass=$5
}
function zookeeperCatalog() {
- crossdata_core_catalog_zookeeper_connectionString=${1:-localhost:2181}
- crossdata_core_catalog_zookeeper_connectionTimeout=${2:-15000}
- crossdata_core_catalog_zookeeper_sessionTimeout=${3:-60000}
- crossdata_core_catalog_zookeeper_retryAttempts=${4:-5}
- crossdata_core_catalog_zookeeper_retryInterval=${5:-10000}
+ export crossdata_core_catalog_zookeeper_connectionString=${1:-localhost:2181}
+ export crossdata_core_catalog_zookeeper_connectionTimeout=${2:-15000}
+ export crossdata_core_catalog_zookeeper_sessionTimeout=${3:-60000}
+ export crossdata_core_catalog_zookeeper_retryAttempts=${4:-5}
+ export crossdata_core_catalog_zookeeper_retryInterval=${5:-10000}
}
-if [$# > 0 ]; then
-if [ "x$1x" != "xx" ]; then
- crossdata_core_catalog_class="\"org.apache.spark.sql.crossdata.catalog.persistent.$1Catalog\""
- if [ "$1" == "MySQL" ]; then
- jdbcCatalog "org.mariadb.jdbc.Driver" ${XD_CATALOG_HOST} ${XD_CATALOG_DB_NAME} ${XD_CATALOG_DB_USER} ${XD_CATALOG_DB_PASS}
- fi
- if [ "$1" == "PostgreSQL" ]; then
- jdbcCatalog "org.postgresql.Driver" ${XD_CATALOG_HOST} ${XD_CATALOG_DB_NAME} ${XD_CATALOG_DB_USER} ${XD_CATALOG_DB_PASS}
- fi
- if [ "$1" == "Zookeeper" ]; then
- zookeeperCatalog ${XD_CATALOG_ZOOKEEPER_CONNECTION_STRING} ${XD_CATALOG_ZOOKEEPER_CONNECTION_TIMEOUT} ${XD_CATALOG_ZOOKEEPER_SESSION_TIMEOUT} ${XD_CATALOG_ZOOKEEPER_RETRY_ATTEMPS} ${XD_CATALOG_ZOOKEEPER_RETRY_INTERVAL}
- fi
- if [ "x$2x" != "xx" ]; then
- crossdata_core_catalog_prefix=${2:-crossdataCluster}
- fi
+export crossdata_core_catalog_class="${CATALOG_CLASS_PREFIX}.$1Catalog"
+if [ "$1" == "MySQL" ]; then
+ jdbcCatalog "org.mariadb.jdbc.Driver" ${XD_CATALOG_HOST} ${XD_CATALOG_DB_NAME} ${XD_CATALOG_DB_USER} ${XD_CATALOG_DB_PASS}
+fi
+if [ "$1" == "PostgreSQL" ]; then
+ jdbcCatalog "org.postgresql.Driver" ${XD_CATALOG_HOST} ${XD_CATALOG_DB_NAME} ${XD_CATALOG_DB_USER} ${XD_CATALOG_DB_PASS}
+fi
+if [ "$1" == "Zookeeper" ]; then
+ zookeeperCatalog ${XD_CATALOG_ZOOKEEPER_CONNECTION_STRING} ${XD_CATALOG_ZOOKEEPER_CONNECTION_TIMEOUT} ${XD_CATALOG_ZOOKEEPER_SESSION_TIMEOUT} ${XD_CATALOG_ZOOKEEPER_RETRY_ATTEMPS} ${XD_CATALOG_ZOOKEEPER_RETRY_INTERVAL}
+fi
+if [ "x$2x" != "xx" ]; then
+ export crossdata_core_catalog_prefix=${2:-crossdataCluster}
fi
-fi
\ No newline at end of file
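
The recurring change in these docker scripts is the added `export`: a plain shell assignment is visible only to the script itself, while an exported variable reaches the environment of the crossdata-server JVM, where names such as `crossdata_core_catalog_jdbc_driver` can override Typesafe Config entries. A hypothetical sketch of such an environment-to-config bridge (the underscore-to-dot mapping below is an assumption for illustration, not Crossdata's actual loader):

```scala
import com.typesafe.config.{Config, ConfigFactory}
import scala.collection.JavaConverters._

object EnvOverrides extends App {
  // Collect exported variables such as crossdata_core_catalog_jdbc_driver and
  // expose them under config paths like crossdata.core.catalog.jdbc.driver.
  val overrides: Config = ConfigFactory.parseMap(
    sys.env.collect {
      case (key, value) if key.startsWith("crossdata_") => key.replace('_', '.') -> value
    }.asJava
  )
  val config = overrides.withFallback(ConfigFactory.load())
  println(config.hasPath("crossdata.core.catalog.jdbc.driver"))
}
```
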
diff --git a/docker/crossdata-config.sh b/docker/crossdata-config.sh
index 1e348979e..e68e03d17 100644
--- a/docker/crossdata-config.sh
+++ b/docker/crossdata-config.sh
@@ -21,12 +21,12 @@ function setDriverConfig() {
}
function standaloneConfig() {
- AKKAIP="akka.tcp://CrossdataServerCluster@${DOCKER_HOST}:13420"
+ export AKKAIP="akka.tcp://CrossdataServerCluster@${DOCKER_HOST}:13420"
#TODO: Test instead of XD_SEED : CROSSDATA_SERVER_AKKA_CLUSTER_SEED_NODES
if [ -z "$XD_SEED" ]; then
export CROSSDATA_SERVER_AKKA_CLUSTER_SEED_NODES=${AKKAIP}
else
- SEED_IP="akka.tcp://CrossdataServerCluster@${XD_SEED}:13420"
+ export SEED_IP="akka.tcp://CrossdataServerCluster@${XD_SEED}:13420"
export CROSSDATA_SERVER_AKKA_CLUSTER_SEED_NODES=${SEED_IP},${AKKAIP}
# TODO: Study whether it is worth of making hazelcast nodes available when auto discovery is disabled.
# If so, find a better way of editing hazelcast.xml. The method commented below is as flimsy as it gets.
@@ -43,9 +43,9 @@ function standaloneConfig() {
export CROSSDATA_SERVER_AKKA_REMOTE_NETTY_TCP_BIND_HOSTNAME=${DOCKER_HOST}
if [ -z "$XD_SEED" ]; then
- crossdata_driver_config_cluster_hosts="\[${DOCKER_HOST}:13420\]"
+ export crossdata_driver_config_cluster_hosts="\[${DOCKER_HOST}:13420\]"
else
- crossdata_driver_config_cluster_hosts="\[${DOCKER_HOST}:13420, ${XD_SEED}\]"
+ export crossdata_driver_config_cluster_hosts="\[${DOCKER_HOST}:13420, ${XD_SEED}\]"
fi
}
@@ -54,8 +54,8 @@ function marathonConfig() {
####################################################
#Memory
####################################################
- RAM_AVAIL=$(echo $MARATHON_APP_RESOURCE_MEM | cut -d "." -f1)
- CROSSDATA_JAVA_OPT="-Xmx${RAM_AVAIL}m -Xms${RAM_AVAIL}m"
+ export RAM_AVAIL=$(echo $MARATHON_APP_RESOURCE_MEM | cut -d "." -f1)
+ export CROSSDATA_JAVA_OPT="-Xmx${RAM_AVAIL}m -Xms${RAM_AVAIL}m"
sed -i "s|# CROSSDATA_LIB|#CROSSDATA_JAVA_OPTS\nCROSSDATA_JAVA_OPTS=\"${CROSSDATA_JAVA_OPT}\"\n# CROSSDATA_LIB|" /etc/sds/crossdata/server/crossdata-env.sh
#Spark UI port
@@ -63,43 +63,22 @@ function marathonConfig() {
########################################################################################################
- #If XD_EXTERNAL_IP and MARATHON_APP_LABEL_HAPROXY_1_PORT are not specified assume we are working in HTTP mode
- #Scenary: HAProxy exposing Akka http port, and creating an internal cluster using netty and autodiscovery through Zookeeper
+ # Working in HTTP mode
+ # Scenario: HAProxy exposing the Akka HTTP port, and creating an internal cluster using Netty and
+ # autodiscovery through Zookeeper
########################################################################################################
- if [ -z ${XD_EXTERNAL_IP} ] && [ -z ${MARATHON_APP_LABEL_HAPROXY_1_PORT} ]; then
- setCrossdataDir ${HOST} ${PORT_13420}
- setCrossdataBindHost ${HOST} ${PORT_13420}
- setHazelcastConfig ${HOST} ${PORT_5701}
- setDriverConfig ${HOST} ${PORT_13420}
- # CROSSDATA_SERVER_CONFIG_HTTP_SERVER_PORT is set with the port provided by Marathon-LB
- export CROSSDATA_SERVER_CONFIG_HTTP_SERVER_PORT=$PORT_13422
- else
- #Scenary: HAProxy exposing the akka netty port with the external IP. Supported only for one instance of Crossdata
- if [ -z ${XD_EXTERNAL_IP} ] || [ -z ${MARATHON_APP_LABEL_HAPROXY_1_PORT} ]; then
- echo "ERROR: Env var XD_EXTERNAL_IP and label HAPROXY_1_PORT must be provided together using Marathon&Haproxy in TCP mode" 1>&2
- exit 1 # terminate and indicate error
- else
- #Hostname and port of haproxy
- setCrossdataDir ${XD_EXTERNAL_IP} ${MARATHON_APP_LABEL_HAPROXY_1_PORT}
- #Bind address for local
- setCrossdataBindHost ${DOCKER_HOST} ${PORT_13420}
- #Driver
- setDriverConfig ${XD_EXTERNAL_IP} ${MARATHON_APP_LABEL_HAPROXY_1_PORT}
- fi
- # When using ClusterClient External IP, the hosts-files get updated in order to keep a consistent
- # binding address in AKKA.
- NAMEADDR="$(hostname -i)"
- if [ -n "$HAPROXY_SERVER_INTERNAL_ADDRESS" ]; then
- NAMEADDR=$HAPROXY_SERVER_INTERNAL_ADDRESS
- fi
- echo -e "$NAMEADDR\t$XD_EXTERNAL_IP" >> /etc/hosts
- fi
+ setCrossdataDir ${HOST} ${PORT_13420}
+ setCrossdataBindHost ${HOST} ${PORT_13420}
+ setHazelcastConfig ${HOST} ${PORT_5701}
+ setDriverConfig ${HOST} ${PORT_13420}
+ # CROSSDATA_SERVER_CONFIG_HTTP_SERVER_PORT is set with the port provided by Marathon-LB
+ export CROSSDATA_SERVER_CONFIG_HTTP_SERVER_PORT=$PORT_13422
+
}
####################################################
## Main
####################################################
-
if [ -z ${MARATHON_APP_ID} ]; then
standaloneConfig
else
diff --git a/docker/docker-entrypoint.sh b/docker/docker-entrypoint.sh
index 6d0fa0c13..af3bfe52b 100755
--- a/docker/docker-entrypoint.sh
+++ b/docker/docker-entrypoint.sh
@@ -2,15 +2,16 @@
DOCKER_HOST="hostname -f"
if [[ "$(hostname -f)" =~ \. ]]; then
- DOCKER_HOST="$(hostname -f)"
+ export DOCKER_HOST="$(hostname -f)"
else
- DOCKER_HOST="$(hostname -i)"
+ export DOCKER_HOST="$(hostname -i)"
fi
####################################################
## XD Catalog
####################################################
-source catalog-config.sh $XD_CATALOG $XD_CATALOG_PREFIX
+CROSSDATA_CATALOG=${XD_CATALOG:-Derby}
+source catalog-config.sh $CROSSDATA_CATALOG $XD_CATALOG_PREFIX
####################################################
@@ -23,7 +24,7 @@ source catalog-config.sh $XD_CATALOG $XD_CATALOG_PREFIX
####################################################
## Crossdata Config
####################################################
-source crossdata-config.sh
+source crossdata-config.sh $1
case "$SERVER_MODE" in
"shell") # This mode will launch a crossdata shell instead of a crossdata server
if [ "$SHELL_SERVERADDR" -a "$SHELL_SERVERPORT" ]; then
diff --git a/docker/shell-config.sh b/docker/shell-config.sh
index bae2bffcd..023d3d6a3 100644
--- a/docker/shell-config.sh
+++ b/docker/shell-config.sh
@@ -1,30 +1,30 @@
#!/bin/bash -xe
-CROSSDATA_DRIVER_CONFIG_HTTP_SERVER_HOST=$SHELL_SERVERADDR
-CROSSDATA_DRIVER_CONFIG_HTTP_SERVER_PORT=$SHELL_SERVERPORT
+export CROSSDATA_DRIVER_CONFIG_HTTP_SERVER_HOST=$SHELL_SERVERADDR
+export CROSSDATA_DRIVER_CONFIG_HTTP_SERVER_PORT=$SHELL_SERVERPORT
# Prepare options string from docker environment settings
-OPTIONS=""
+export OPTIONS=""
if [ -n "$SHELL_USER" ]; then
- OPTIONS="$OPTIONS --user $SHELL_USER"
+ export OPTIONS="$OPTIONS --user $SHELL_USER"
fi;
if [ "$CONNECTION_TIMEOUT" ]; then
- OPTIONS="$OPTIONS --timeout $CONNECTION_TIMEOUT"
+ export OPTIONS="$OPTIONS --timeout $CONNECTION_TIMEOUT"
fi;
-OPTIONS="$OPTIONS --http"
+export OPTIONS="$OPTIONS --http"
if [ "$SHELL_MODE" == "async" ]; then
- OPTIONS="$OPTIONS --async"
+ export OPTIONS="$OPTIONS --async"
fi;
if [ "$SHELL_CERTIFICATE_PATH" -a "$SHELL_CERTIFICATE_PASSWORD" -a "$SHELL_TRUSTSTORE_PATH" -a "$SHELL_TRUSTSTORE_PASSWORD" ]; then
- CROSSDATA_DRIVER_AKKA_HTTP_SSL_ENABLE="true"
- CROSSDATA_DRIVER_AKKA_HTTP_SSL_KEYSTORE=$SHELL_CERTIFICATE_PATH
- CROSSDATA_DRIVER_AKKA_HTTP_SSL_KEYSTORE_PASSWORD=$SHELL_CERTIFICATE_PASSWORD
- CROSSDATA_DRIVER_AKKA_HTTP_SSL_TRUSTSTORE=$SHELL_TRUSTSTORE_PATH
- CROSSDATA_DRIVER_AKKA_HTTP_SSL_TRUSTSTORE_PASSWORD=$SHELL_TRUSTSTORE_PASSWORD
+ export CROSSDATA_DRIVER_AKKA_HTTP_SSL_ENABLE="true"
+ export CROSSDATA_DRIVER_AKKA_HTTP_SSL_KEYSTORE=$SHELL_CERTIFICATE_PATH
+ export CROSSDATA_DRIVER_AKKA_HTTP_SSL_KEYSTORE_PASSWORD=$SHELL_CERTIFICATE_PASSWORD
+ export CROSSDATA_DRIVER_AKKA_HTTP_SSL_TRUSTSTORE=$SHELL_TRUSTSTORE_PATH
+ export CROSSDATA_DRIVER_AKKA_HTTP_SSL_TRUSTSTORE_PASSWORD=$SHELL_TRUSTSTORE_PASSWORD
fi
\ No newline at end of file
diff --git a/docker/streaming-config.sh b/docker/streaming-config.sh
index b3fa93dad..2fa06ed58 100644
--- a/docker/streaming-config.sh
+++ b/docker/streaming-config.sh
@@ -1,7 +1,7 @@
#!/bin/bash -xe
sed -i "s|//crossdata-core.streaming*|crossdata-core.streaming|" /etc/sds/crossdata/server/core-application.conf
-crossdata_core_catalog_zookeeper_connectionString=$1
-crossdata_core_streaming_receiver_zk_connection=$2
-crossdata_core_streaming_receiver_kafka_connection=$3
-crossdata_core_streaming_spark_master=$4
\ No newline at end of file
+export crossdata_core_catalog_zookeeper_connectionString=$1
+export crossdata_core_streaming_receiver_zk_connection=$2
+export crossdata_core_streaming_receiver_kafka_connection=$3
+export crossdata_core_streaming_spark_master=$4
\ No newline at end of file
diff --git a/events.csv/._SUCCESS.crc b/events.csv/._SUCCESS.crc
new file mode 100644
index 000000000..3b7b04493
Binary files /dev/null and b/events.csv/._SUCCESS.crc differ
diff --git a/events.csv/.part-00000.crc b/events.csv/.part-00000.crc
new file mode 100644
index 000000000..9c46cabe4
Binary files /dev/null and b/events.csv/.part-00000.crc differ
diff --git a/events.csv/_SUCCESS b/events.csv/_SUCCESS
new file mode 100644
index 000000000..e69de29bb
diff --git a/events.csv/part-00000 b/events.csv/part-00000
new file mode 100644
index 000000000..653ed6980
--- /dev/null
+++ b/events.csv/part-00000
@@ -0,0 +1,11 @@
+ident,money
+5,15.2
+1,11.2
+8,18.2
+0,10.2
+2,12.2
+4,14.2
+7,17.2
+6,16.2
+9,19.2
+3,13.2
diff --git a/scripts/dockerGen/Dockerfile b/scripts/dockerGen/Dockerfile
new file mode 100644
index 000000000..1b8bc909b
--- /dev/null
+++ b/scripts/dockerGen/Dockerfile
@@ -0,0 +1,24 @@
+FROM qa.stratio.com/stratio/ubuntu-base:16.04
+MAINTAINER Stratio Crossdata team "crossdata@stratio.com"
+
+# USAGE: docker build --build-arg PKG=<package-version> -t <image-name> .
+# USAGE Example: docker build --build-arg PKG=1.8.0-RC2-SNAPSHOT -t crossdata-enterprise .
+
+COPY dockerfiles/* /
+
+VOLUME /usr/lib/mesos
+
+ARG PKG
+
+RUN wget -q "http://apt.repository.stratio.com/pool/trusty/1.7/main/stratio-release_1.0.0_all.deb" \
+ && dpkg -i stratio-release_1.0.0_all.deb \
+ && rm -rf stratio-release_1.0.0_all.deb \
+ && apt-get update \
+ && ./dependencyfix.sh stratio-crossdata-mesosphere-scala211-${PKG}.all.deb \
+ && dpkg -i stratio-crossdata-mesosphere-scala211-${PKG}.all.deb
+
+ENTRYPOINT ["/docker-entrypoint.sh"]
+ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk-amd64
+
+CMD tail -f /var/log/*
+
diff --git a/scripts/dockerGen/commons.sh b/scripts/dockerGen/commons.sh
new file mode 100644
index 000000000..acb861f52
--- /dev/null
+++ b/scripts/dockerGen/commons.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+function changedir() {
+ cd $1
+ echo "Current dir: $PWD"
+}
+
+#Get Crossdata version from parent pom
+tmp1=`grep -m2 "<version>" ../../pom.xml | tail -n1`
+tmp2=${tmp1//<version>}
+tmp3=${tmp2/<\/version>/}
+XD_VERSION=${tmp3// }
diff --git a/scripts/dockerGen/dockerGen.sh b/scripts/dockerGen/dockerGen.sh
new file mode 100755
index 000000000..f6318c68f
--- /dev/null
+++ b/scripts/dockerGen/dockerGen.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+
+. commons.sh
+
+echo " >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "
+echo " >>> NOTE: .deb PACKAGE MUST BE UP-TO-DATE! >>> "
+echo " >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "
+echo " "
+
+echo " >>> Executing script for generation Crossdata $XD_VERSION Docker"
+echo " "
+
+debFile="../../dist/target/2.11/stratio-crossdata-mesosphere-scala211-$XD_VERSION.all.deb"
+
+if [[ ! -f $debFile ]]; then
+ echo "$debFile must be created previously"
+ exit 1
+fi
+
+mkdir dockerfiles
+
+echo " >>> Copying $debFile"
+cp $debFile dockerfiles
+
+echo " >>> Copying docker scripts"
+cp ../../docker/* dockerfiles
+
+echo " >>> Building Crossdata $XD_VERSION Docker"
+docker build --build-arg PKG=$XD_VERSION -t crossdata-enterprise:$XD_VERSION .
+echo " >>> Crossdata $XD_VERSION Docker generated"
+
+echo " >>> Cleaning some stuff..."
+rm -rf dockerfiles
+
+echo " >>> Start Crossdata Docker with the Gosec security Manager: docker run [OPTIONS] crossdata-enterprise[:TAG] [COMMAND]"
+echo " >>> Start Crossdata Docker with the default security Manager: docker run [OPTIONS] crossdata-enterprise[:TAG] [COMMAND] skipSecManager"
diff --git a/server/src/main/scala/com/stratio/crossdata/server/CrossdataServer.scala b/server/src/main/scala/com/stratio/crossdata/server/CrossdataServer.scala
index 517752338..19f61b514 100644
--- a/server/src/main/scala/com/stratio/crossdata/server/CrossdataServer.scala
+++ b/server/src/main/scala/com/stratio/crossdata/server/CrossdataServer.scala
@@ -22,18 +22,18 @@ import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import akka.actor.{ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.client.ClusterClientReceptionist
+import akka.cluster.pubsub.DistributedPubSub
+import akka.cluster.pubsub.DistributedPubSubMediator.Put
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.{Http, HttpsConnectionContext}
import akka.routing.{DefaultResizer, RoundRobinPool}
import akka.stream.{ActorMaterializer, TLSClientAuth}
-import akka.cluster.pubsub.DistributedPubSub
-import akka.cluster.pubsub.DistributedPubSubMediator.Put
import com.stratio.crossdata.common.security.KeyStoreUtils
import com.stratio.crossdata.common.util.akka.keepalive.KeepAliveMaster
import com.stratio.crossdata.server.actors.{ResourceManagerActor, ServerActor}
import com.stratio.crossdata.server.config.ServerConfig
import com.stratio.crossdata.server.discovery.{ServiceDiscoveryConfigHelper => SDCH, ServiceDiscoveryHelper => SDH}
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.ConfigFactory
import org.apache.log4j.Logger
import org.apache.spark.sql.crossdata
import org.apache.spark.sql.crossdata.session.{BasicSessionProvider, HazelcastSessionProvider}
diff --git a/server/src/main/scala/com/stratio/crossdata/server/ServiceDiscoveryProvider.scala b/server/src/main/scala/com/stratio/crossdata/server/ServiceDiscoveryProvider.scala
index d67f22952..177843516 100644
--- a/server/src/main/scala/com/stratio/crossdata/server/ServiceDiscoveryProvider.scala
+++ b/server/src/main/scala/com/stratio/crossdata/server/ServiceDiscoveryProvider.scala
@@ -18,17 +18,18 @@ package com.stratio.crossdata.server
import java.util.UUID
import java.util.concurrent.TimeUnit
-import akka.actor.{ActorSystem, Address}
+import akka.actor.{ActorSystem, Address, Cancellable}
import akka.cluster.Cluster
import com.hazelcast.config.{XmlConfigBuilder, Config => HzConfig}
import com.stratio.crossdata.server.discovery.{ServiceDiscoveryConfigHelper => SDCH, ServiceDiscoveryHelper => SDH}
-import com.typesafe.config.ConfigValueFactory
+import com.typesafe.config.{Config, ConfigValueFactory}
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.utils.ZKPaths
import org.apache.log4j.Logger
import org.apache.spark.sql.crossdata.session.{HazelcastSessionProvider, XDSessionProvider}
+import org.apache.zookeeper.data.Stat
import scala.collection.JavaConversions._
import scala.concurrent.duration.FiniteDuration
@@ -45,14 +46,17 @@ trait ServiceDiscoveryProvider {
protected val hzConfig: HzConfig = new XmlConfigBuilder().build()
protected[crossdata] var sessionProviderOpt: Option[XDSessionProvider] = None //TODO Remove [crossdata]
+ /**
+ * Gets the public address of this node according to the initial configuration of the server cluster.
+ *
+ */
private def getLocalSeed: String =
s"${Try(serverConfig.getString("akka.remote.netty.tcp.hostname")).getOrElse("127.0.0.1")}:${Try(serverConfig.getInt("akka.remote.netty.tcp.port")).getOrElse("13420")}"
- private def getLocalSeed(xdCluster: Cluster): String = {
- val selfAddress = xdCluster.selfAddress
- s"${selfAddress.host.getOrElse("127.0.0.1")}:${selfAddress.port.getOrElse("13420")}"
- }
-
+ /**
+ * Gets the public address of this member according to the initial configuration of the service provider.
+ *
+ */
private def getLocalMember: String = {
val defaultAddr = "127.0.0.1"
val defaultPort = "5701"
@@ -63,13 +67,12 @@ trait ServiceDiscoveryProvider {
} getOrElse s"$defaultAddr:$defaultPort"
}
- private def getLocalMember(hsp: HazelcastSessionProvider): String = {
- val selfAddress = hsp.gelLocalMember.getAddress
- s"${selfAddress.getHost}:${selfAddress.getPort}"
- }
-
-
- protected def startServiceDiscovery(sdch: SDCH) = {
+ /**
+ * Starts the subscription to the Crossdata cluster using the initial configuration
+ * and the information stored in the remote service discovery server.
+ *
+ */
+ protected def startServiceDiscovery(sdch: SDCH): SDH = {
// Start ZK connection
val curatorClient = startDiscoveryClient(sdch)
@@ -85,10 +88,8 @@ trait ServiceDiscoveryProvider {
}
/**
- * Create and start the curator client
+ * Create and start the curator client.
*
- * @param sdConfig
- * @return
*/
private def startDiscoveryClient(sdConfig: SDCH): CuratorFramework = {
val curatorClient = CuratorFrameworkFactory.newClient(
@@ -102,13 +103,11 @@ trait ServiceDiscoveryProvider {
}
/**
- * Waiting to try to be the leader???
+ * Non-blocking call to acquire cluster leadership. At any given moment there is exactly one cluster leader,
+ * which updates the list of current cluster members periodically (every 300 seconds by default).
*
- * @param dClient
- * @param sdc
- * @return
*/
- private def requestClusterLeadership(dClient: CuratorFramework, sdc: SDCH) = {
+ private def requestClusterLeadership(dClient: CuratorFramework, sdc: SDCH): Future[Unit] = {
val cLeaderPath = sdc.getOrElse(SDCH.ClusterLeaderPath, SDCH.DefaultClusterLeaderPath)
logger.debug(s"Service discovery - cluster leadership path: $cLeaderPath")
@@ -139,15 +138,12 @@ trait ServiceDiscoveryProvider {
leadershipFuture
}
-
/**
- * Trying to get who is the leader???
+ * Waits until subscription leadership is acquired in order to register this node as one of the seeds
+ * and join the cluster.
*
- * @param dClient
- * @param sdc
- * @return
*/
- private def requestSubscriptionLeadership(dClient: CuratorFramework, sdc: SDCH) = {
+ private def requestSubscriptionLeadership(dClient: CuratorFramework, sdc: SDCH): Try[LeaderLatch] = {
val sLeaderPath = sdc.getOrElse(SDCH.SubscriptionPath, SDCH.DefaultSubscriptionPath)
@@ -171,7 +167,12 @@ trait ServiceDiscoveryProvider {
}
}
- private def generateFinalConfig(dClient: CuratorFramework, sdc: SDCH) = {
+ /**
+ * Generates the contact points in the config from the contents of the remote service discovery server,
+ * adding this node to the contact points before building the final config.
+ *
+ */
+ private def generateFinalConfig(dClient: CuratorFramework, sdc: SDCH): (Config, HzConfig) = {
val pathForSeeds = sdc.getOrElse(SDCH.SeedsPath, SDCH.DefaultSeedsPath)
logger.debug(s"Service Discovery - seeds path: $pathForSeeds")
@@ -182,16 +183,18 @@ trait ServiceDiscoveryProvider {
val localSeed = getLocalSeed
ZKPaths.mkdirs(dClient.getZookeeperClient.getZooKeeper, pathForSeeds)
val currentSeeds = new String(dClient.getData.forPath(pathForSeeds))
- val newSeeds = (Set(localSeed) ++ currentSeeds.split(",").toSet).map(m => m.trim).filter(_.nonEmpty)
+ val newSeeds = (localSeed +: currentSeeds.split(",")).map(m => m.trim).filter(_.nonEmpty).distinct
dClient.setData.forPath(pathForSeeds, newSeeds.mkString(",").getBytes)
+ logger.info(s"Service discovery config - Cluster seeds: ${newSeeds.mkString(",")}")
+
val protocol = s"akka.${
if (Try(serverConfig.getBoolean("akka.remote.netty.ssl.enable-ssl")).getOrElse(false)) "ssl." else ""
}tcp"
val modifiedAkkaConfig = serverConfig.withValue(
"akka.cluster.seed-nodes",
- ConfigValueFactory.fromIterable(newSeeds.map { s =>
+ ConfigValueFactory.fromIterable(newSeeds.toSeq.map { s =>
val hostPort = s.split(":")
new Address(protocol,
serverConfig.getString("config.cluster.name"),
@@ -206,11 +209,13 @@ trait ServiceDiscoveryProvider {
val currentMembers = new String(dClient.getData.forPath(pathForMembers))
val newMembers = (if (localMember.split(":").head != "127.0.0.1") {
- currentMembers.split(",").toSet + localMember
+ localMember +: currentMembers.split(",")
} else {
- Set(localMember)
+ Array(localMember)
}).map(m => m.trim).filter(_.nonEmpty)
+ logger.info(s"Service discovery config - Provider members: ${newMembers.mkString(",")}")
+
dClient.setData.forPath(pathForMembers, newMembers.mkString(",").getBytes)
val modifiedHzConfig = hzConfig.setNetworkConfig(
hzConfig.getNetworkConfig.setJoin(
@@ -221,24 +226,26 @@ trait ServiceDiscoveryProvider {
(modifiedAkkaConfig, modifiedHzConfig)
}
- private def updateClusterMembers(h: SDH, hsp: HazelcastSessionProvider) = {
-
- val pathForMembers = h.sdch.getOrElse(SDCH.ProviderPath, SDCH.DefaultProviderPath)
- ZKPaths.mkdirs(h.curatorClient.getZookeeperClient.getZooKeeper, pathForMembers)
-
- val updatedMembers = Set(getLocalMember(hsp)) ++ sessionProviderOpt.map {
- case hzSP: HazelcastSessionProvider =>
- hzSP.getHzMembers.to[Set].map { m =>
- s"${m.getAddress.getHost}:${m.getAddress.getPort}"
- }
- case _ => Set.empty
- }.getOrElse(Set.empty)
+ /**
+ * Creates a scheduled task (every 300 seconds by default) that updates the cluster members
+ * on the remote service discovery server.
+ *
+ */
+ protected def updateServiceDiscovery(xCluster: Cluster, hsp: HazelcastSessionProvider, s: SDH, aSystem: ActorSystem): Cancellable = {
+ val delay = new FiniteDuration(
+ s.sdch.getOrElse(SDCH.ClusterDelayPath, SDCH.DefaultClusterDelay.toString).toLong, TimeUnit.SECONDS)
- logger.info(s"Updating members: ${updatedMembers.mkString(",")}")
- h.curatorClient.setData.forPath(pathForMembers, updatedMembers.mkString(",").getBytes)
+ import scala.concurrent.ExecutionContext.Implicits.global
+ aSystem.scheduler.schedule(delay, delay)(updateSeeds(xCluster, hsp, s))
}
- private def updateSeeds(xCluster: Cluster, hsp: HazelcastSessionProvider, h: SDH) = {
+ /**
+ * Acquires the subscription leadership (to avoid race conditions with new members
+ * joining the cluster at the same time) and triggers the methods that update the contact points for
+ * the members of the Crossdata cluster and the members of the service provider.
+ *
+ */
+ private def updateSeeds(xCluster: Cluster, hsp: HazelcastSessionProvider, h: SDH): Unit = {
val sll = new LeaderLatch(h.curatorClient, h.sdch.getOrElse(SDCH.SubscriptionPath, SDCH.DefaultSubscriptionPath))
sll.start
sll.await
@@ -247,22 +254,46 @@ trait ServiceDiscoveryProvider {
sll.close
}
- private def updateClusterSeeds(xCluster: Cluster, h: SDH) = {
- val currentSeeds = xCluster.state.members.filter(_.roles.contains("server")).map(
- m => s"${m.address.host.getOrElse("127.0.0.1")}:${m.address.port.getOrElse("13420")}") + getLocalSeed(xCluster)
+ /**
+ * Overwrites the cluster seeds on the remote service discovery server according to the cluster state.
+ *
+ */
+ private def updateClusterSeeds(xCluster: Cluster, h: SDH): Seq[String] = {
+ val currentSeeds: Set[String] = xCluster.state.members.filter(_.roles.contains("server")).map(
+ m => s"${m.address.host.getOrElse("127.0.0.1")}:${m.address.port.getOrElse("13420")}")
+ val newSeeds: Seq[String] = (getLocalSeed +: currentSeeds.toSeq).distinct
val pathForSeeds = h.sdch.getOrElse(SDCH.SeedsPath, SDCH.DefaultSeedsPath)
ZKPaths.mkdirs(h.curatorClient.getZookeeperClient.getZooKeeper, pathForSeeds)
- logger.info(s"Updating seeds: ${currentSeeds.mkString(",")}")
- h.curatorClient.setData.forPath(pathForSeeds, currentSeeds.mkString(",").getBytes)
- currentSeeds
+ val newSeedsStr = newSeeds.mkString(",")
+ logger.info(s"Updating seeds: $newSeedsStr")
+ h.curatorClient.setData.forPath(pathForSeeds, newSeedsStr.getBytes)
+ newSeeds
}
- protected def updateServiceDiscovery(xCluster: Cluster, hsp: HazelcastSessionProvider, s: SDH, aSystem: ActorSystem) = {
- val delay = new FiniteDuration(
- s.sdch.getOrElse(SDCH.ClusterDelayPath, SDCH.DefaultClusterDelay.toString).toLong, TimeUnit.SECONDS)
+ /**
+ * Overwrites the service provider members on the remote service discovery server according to
+ * the current members.
+ *
+ */
+ private def updateClusterMembers(h: SDH, hsp: HazelcastSessionProvider): Seq[String] = {
- import scala.concurrent.ExecutionContext.Implicits.global
- aSystem.scheduler.schedule(delay, delay)(updateSeeds(xCluster, hsp, s))
+ val pathForMembers = h.sdch.getOrElse(SDCH.ProviderPath, SDCH.DefaultProviderPath)
+ ZKPaths.mkdirs(h.curatorClient.getZookeeperClient.getZooKeeper, pathForMembers)
+
+ val updatedMembers = getLocalMember +: sessionProviderOpt.map {
+ case hzSP: HazelcastSessionProvider =>
+ hzSP.getHzMembers.to[Seq].map { m =>
+ s"${m.getAddress.getHost}:${m.getAddress.getPort}"
+ }
+ case _ => Seq.empty
+ }.getOrElse(Seq.empty)
+
+ val newMembers = updatedMembers.distinct
+ val newMembersStr = newMembers.mkString(",")
+
+ logger.info(s"Updating members: $newMembersStr")
+ h.curatorClient.setData.forPath(pathForMembers, newMembersStr.getBytes)
+ newMembers
}
}
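
The seed and member bookkeeping above repeatedly merges the local address into a comma-separated list stored in ZooKeeper, keeping the local entry first and dropping blanks and duplicates. A small self-contained sketch of that merge, mirroring the `Seq`-plus-`distinct` approach used by `updateClusterSeeds` (the stored string format is taken from the code above):

```scala
object SeedMerge extends App {
  // Prepend the local seed so it always survives, trim entries, drop blanks,
  // and de-duplicate while preserving order.
  def mergeSeeds(localSeed: String, stored: String): Seq[String] =
    (localSeed +: stored.split(",").toList).map(_.trim).filter(_.nonEmpty).distinct

  println(mergeSeeds("10.0.0.1:13420", "10.0.0.2:13420, 10.0.0.1:13420, "))
  // List(10.0.0.1:13420, 10.0.0.2:13420)
}
```
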