From 6d0f807348fa2c5fb2df2c0ce0f20cc48a9ebfee Mon Sep 17 00:00:00 2001 From: pmadrigal Date: Sun, 3 Apr 2016 16:25:29 +0200 Subject: [PATCH 1/2] some reading properties fixed --- .../stratio/datasource/mongodb/config/MongodbConfig.scala | 8 ++++---- .../mongodb/partitioner/MongodbPartitioner.scala | 3 ++- .../stratio/datasource/mongodb/reader/MongodbReader.scala | 3 ++- .../datasource/mongodb/writer/MongodbBatchWriter.scala | 3 ++- .../main/scala/com/stratio/datasource/util/Config.scala | 2 +- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala index d95be7b..c1e759e 100644 --- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala +++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala @@ -84,8 +84,8 @@ object MongodbConfig { val DefaultSplitSize = 10 val DefaultSplitKey = "_id" val DefaultConnectionsTime = 120000L - val DefaultCursorBatchSize = 101 - val DefaultBulkBatchSize = 1000 + val DefaultCursorBatchSize = "101" + val DefaultBulkBatchSize = "1000" val DefaultIdAsObjectId = "true" /** @@ -105,7 +105,7 @@ object MongodbConfig { val optionalProperties: List[String] = List(Credentials,SSLOptions, UpdateFields) (properties /: optionalProperties){ - /** We will assume credentials are provided like 'user,database,password;user,database,password;...' */ + /** We will assume credentials are provided like 'user,database,password;...;user,database,password' */ case (properties,Credentials) => parameters.get(Credentials).map{ credentialInput => val credentials = credentialInput.split(";").map(_.split(",")).toList @@ -122,7 +122,7 @@ object MongodbConfig { properties + (SSLOptions -> ssloptions) } getOrElse properties - /** We will assume fields are provided like 'user,database,password...' */ + /** We will assume fields are provided like 'field1,field2,...,fieldN' */ case (properties, UpdateFields) => { parameters.get(UpdateFields).map{ updateInputs => val updateFields = updateInputs.split(",") diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/partitioner/MongodbPartitioner.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/partitioner/MongodbPartitioner.scala index 610d126..6020bcf 100644 --- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/partitioner/MongodbPartitioner.scala +++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/partitioner/MongodbPartitioner.scala @@ -55,7 +55,8 @@ class MongodbPartitioner(config: Config) extends Partitioner[MongodbPartition] { private val connectionsTime = config.get[String](MongodbConfig.ConnectionsTime).map(_.toLong) - private val cursorBatchSize = config.getOrElse[Int](MongodbConfig.CursorBatchSize, MongodbConfig.DefaultCursorBatchSize) + private val cursorBatchSize = + config.getOrElse[String](MongodbConfig.CursorBatchSize, MongodbConfig.DefaultCursorBatchSize).toInt override def computePartitions(): Array[MongodbPartition] = { val mongoClient = MongodbClientFactory.getClient(hosts, credentials, ssloptions, clientOptions) diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala index 0496ba6..2c7952d 100644 --- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala +++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/reader/MongodbReader.scala @@ -45,7 +45,8 @@ class MongodbReader(config: Config, private var dbCursor: Option[MongoCursorBase] = None - private val batchSize = config.getOrElse[Int](MongodbConfig.CursorBatchSize, MongodbConfig.DefaultCursorBatchSize) + private val batchSize = + config.getOrElse[String](MongodbConfig.CursorBatchSize, MongodbConfig.DefaultCursorBatchSize).toInt private val connectionsTime = config.get[String](MongodbConfig.ConnectionsTime).map(_.toLong) diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/writer/MongodbBatchWriter.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/writer/MongodbBatchWriter.scala index 2f8e032..1e6b074 100644 --- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/writer/MongodbBatchWriter.scala +++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/writer/MongodbBatchWriter.scala @@ -29,7 +29,8 @@ class MongodbBatchWriter(config: Config) extends MongodbWriter(config) { private val IdKey = "_id" - private val bulkBatchSize = config.getOrElse[Int](MongodbConfig.BulkBatchSize, MongodbConfig.DefaultBulkBatchSize) + private val bulkBatchSize = + config.getOrElse[String](MongodbConfig.BulkBatchSize, MongodbConfig.DefaultBulkBatchSize).toInt private val pkConfig: Option[Array[String]] = config.get[Array[String]](MongodbConfig.UpdateFields) diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/util/Config.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/util/Config.scala index bb312b5..ee0b609 100644 --- a/spark-mongodb/src/main/scala/com/stratio/datasource/util/Config.scala +++ b/spark-mongodb/src/main/scala/com/stratio/datasource/util/Config.scala @@ -102,7 +102,7 @@ trait Config extends Serializable { val properties: Map[Property, Any] /** Returns the value associated with a key, or a default value if the key is not contained in the configuration object. - + * Generic is required in order to get the correct type of the property, in any case for a type conversion. * @param key Desired property. * @param default Value in case no binding for `key` is found in the map. * @tparam T Result type of the default computation. From 2ec3c04c2599022bdc944e0c2130060443648701 Mon Sep 17 00:00:00 2001 From: pmadrigal Date: Sun, 3 Apr 2016 23:01:47 +0200 Subject: [PATCH 2/2] Property names changed to lower case to avoid issue with CaseInsensitiveMap in ResolvedDataSource class --- CHANGELOG.md | 6 ++- doc/src/site/sphinx/First_Steps.rst | 38 +++++++++---------- .../mongodb/config/MongodbConfig.scala | 38 +++++++++---------- 3 files changed, 43 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 62d93d4..43afb28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog -## 0.11.1 (? 2016) +## 0.11.2 (? 2016) + +* Property names changed to lower case + +## 0.11.1 (March 2016) * Max and Min splitVector bounds for not sharded collections (see doc) * Config parameter renamed idasobjectid -> idAsObjectId diff --git a/doc/src/site/sphinx/First_Steps.rst b/doc/src/site/sphinx/First_Steps.rst index b0ef756..9031961 100644 --- a/doc/src/site/sphinx/First_Steps.rst +++ b/doc/src/site/sphinx/First_Steps.rst @@ -89,47 +89,47 @@ Configuration parameters +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ | collection | "collectionName" | Yes | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| schema_samplingRatio | 1.0 | No | +| schema_samplingratio | 1.0 | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| writeConcern | mongodb.WriteConcern.ACKNOWLEDGED | No | +| writeconcern | mongodb.WriteConcern.ACKNOWLEDGED | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| splitSize | 10 | No | +| splitsize | 10 | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| splitKey | "fieldName" | No | +| splitkey | "fieldName" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| splitKeyType | "dataTypeName" | No | +| splitkeytype | "dataTypeName" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| splitKeyMin | "minvalue" | No | +| splitkeymin | "minvalue" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| splitKeyMax | "maxvalue" | No | +| splitkeymax | "maxvalue" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ | credentials | "user,database,password;user,database,password" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| updateFields | "fieldName,fieldName" | No | +| updatefields | "fieldName,fieldName" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| sslOptions | "/path/keystorefile,keystorepassword,/path/truststorefile,truststorepassword" | No | +| ssloptions | "/path/keystorefile,keystorepassword,/path/truststorefile,truststorepassword" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| readPreference | "nearest" | No | +| readpreference | "nearest" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ | language | "en" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| connectTimeout | "10000" | No | +| connecttimeout | "10000" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| connectionsPerHost | "100" | No | +| connectionsperhost | "100" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| maxWaitTime | "10000" | No | +| maxwaittime | "10000" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| socketTimeout | "1000" | No | +| sockettimeout | "1000" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| threadsAllowedToBlockForConnectionMultiplier | "5" | No | +| threadsallowedtoblockforconnectionmultiplier | "5" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| idAsObjectId | "false" | No | +| idasobjectid | "false" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| connectionsTime | "180000" | No | +| connectionstime | "180000" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| cursorBatchSize | "101" | No | +| cursorbatchsize | "101" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ -| bulkBatchSize | "1000" | No | +| bulkbatchsize | "1000" | No | +-----------------------------------------------+--------------------------------------------------------------------------------+-------------------------+ diff --git a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala index c1e759e..ff28063 100644 --- a/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala +++ b/spark-mongodb/src/main/scala/com/stratio/datasource/mongodb/config/MongodbConfig.scala @@ -29,27 +29,27 @@ object MongodbConfig { val Host = "host" val Database = "database" val Collection = "collection" - val SSLOptions = "sslOptions" - val ReadPreference = "readPreference" - val ConnectTimeout = "connectTimeout" - val ConnectionsPerHost = "connectionsPerHost" - val MaxWaitTime = "maxWaitTime" - val SocketTimeout = "socketTimeout" - val ThreadsAllowedToBlockForConnectionMultiplier = "threadsAllowedToBlockForConnectionMultiplier" - val WriteConcern = "writeConcern" + val SSLOptions = "ssloptions" + val ReadPreference = "readpreference" + val ConnectTimeout = "connecttimeout" + val ConnectionsPerHost = "connectionsperhost" + val MaxWaitTime = "maxwaittime" + val SocketTimeout = "sockettimeout" + val ThreadsAllowedToBlockForConnectionMultiplier = "threadsallowedtoblockforconnectionmultiplier" + val WriteConcern = "writeconcern" val Credentials = "credentials" - val SamplingRatio = "schema_samplingRatio" - val SplitSize = "splitSize" - val SplitKey = "splitKey" - val SplitKeyType = "splitKeyType" - val SplitKeyMin = "splitKeyMin" - val SplitKeyMax = "splitKeyMax" - val UpdateFields = "updateFields" + val SamplingRatio = "schema_samplingratio" + val SplitSize = "splitsize" + val SplitKey = "splitkey" + val SplitKeyType = "splitkeytype" + val SplitKeyMin = "splitkeymin" + val SplitKeyMax = "splitkeymax" + val UpdateFields = "updatefields" val Language = "language" - val ConnectionsTime = "connectionsTime" - val CursorBatchSize = "cursorBatchSize" - val BulkBatchSize = "bulkBatchSize" - val IdAsObjectId = "idAsObjectId" + val ConnectionsTime = "connectionstime" + val CursorBatchSize = "cursorbatchsize" + val BulkBatchSize = "bulkbatchsize" + val IdAsObjectId = "idasobjectid" // List of parameters for mongoClientOptions val ListMongoClientOptions = List(