Skip to content
18 changes: 10 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
name := "sqsmock"

version := "0.3.2"
version := "0.3.3-SNAPSHOT"

organization := "io.findify"

scalaVersion := "2.11.8"
scalaVersion := "2.12.3"

val akkaVersion = "2.4.9"
val akkaVersion = "2.5.1"

crossScalaVersions := Seq("2.12.3", "2.11.8")

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
"org.scala-lang.modules" %% "scala-xml" % "1.0.5",
"com.typesafe.akka" %% "akka-http" % "10.0.6",
"org.scala-lang.modules" %% "scala-xml" % "1.0.6",
"joda-time" % "joda-time" % "2.9.4",
"org.scalatest" %% "scalatest" % "3.0.0" % "test",
"com.amazonaws" % "aws-java-sdk-sqs" % "1.11.32" % "test"
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"com.amazonaws" % "aws-java-sdk-sqs" % "1.11.126" % "test"
)

licenses += ("MIT", url("https://opensource.org/licenses/MIT"))

bintrayOrganization := Some("findify")

parallelExecution in Test := false
parallelExecution in Test := false
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 0.13.10
sbt.version = 0.13.15
28 changes: 20 additions & 8 deletions src/main/scala/io/findify/sqsmock/SQSBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,31 @@ import scala.collection.mutable
class SQSBackend(account:Long, port:Int, system:ActorSystem) {
val log = Logger(this.getClass, "sqs_backend")
val queueCache = mutable.Map[String, QueueCache]()
val createQueueWorker = new CreateQueueWorker(account, port, queueCache, system)
val sendMessageWorker = new SendMessageWorker(account, queueCache, system)
val receiveMessageWorker = new ReceiveMessageWorker(account, queueCache, system)

val createQueueWorker = new CreateQueueWorker(account, port, queueCache, system)
val sendMessageWorker = new SendMessageWorker(account, queueCache, system)
val receiveMessageWorker = new ReceiveMessageWorker(account, queueCache, system)
val sendMessageBatchWorker = new SendMessageBatchWorker(account, queueCache, system)
val deleteMessageWorker = new DeleteMessageWorker(account, queueCache, system)
val deleteMessageWorker = new DeleteMessageWorker(account, queueCache, system)
val listQueuesWorker = new ListQueuesWorker(account, queueCache, system)
val getQueueUrlWorker = new GetQueueUrlWorker(account, queueCache, system)
val getQueueAttributesWorker = new GetQueueAttributesWorker(account, queueCache, system)
val deleteMessageBatchWorker = new DeleteMessageBatchWorker(account, queueCache, system)
val deleteQueueUrlWorker = new DeleteQueueUrlWorker(account, queueCache, system)

def process(fields:Map[String,String]) = {
log.debug(s"processing request for fields $fields")
fields.get("Action") match {
case Some("SendMessage") => sendMessageWorker.process(fields)
case Some("SendMessage") => sendMessageWorker.process(fields)
case Some("SendMessageBatch") => sendMessageBatchWorker.process(fields)
case Some("ReceiveMessage") => receiveMessageWorker.process(fields)
case Some("CreateQueue") => createQueueWorker.process(fields)
case Some("DeleteMessage") => deleteMessageWorker.process(fields)
case Some("ReceiveMessage") => receiveMessageWorker.process(fields)
case Some("CreateQueue") => createQueueWorker.process(fields)
case Some("DeleteMessage") => deleteMessageWorker.process(fields)
case Some("ListQueues") => listQueuesWorker.process(fields)
case Some("GetQueueUrl") => getQueueUrlWorker.process(fields)
case Some("GetQueueAttributes") => getQueueAttributesWorker.process(fields)
case Some("DeleteMessageBatch") => deleteMessageBatchWorker.process(fields)
case Some("DeleteQueue") => deleteQueueUrlWorker.process(fields)
case _ => HttpResponse(StatusCodes.BadRequest, entity = ErrorResponse("Sender", "InvalidParameterValue", "operation not supported").toXML.toString())
}
}
Expand Down
32 changes: 18 additions & 14 deletions src/main/scala/io/findify/sqsmock/SQSService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,24 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.collection.JavaConversions._

object SQSService {
def main(args: Array[String]) {
val sqs = new SQSService(8001, 1)
sqs.start()
sqs.block
}

val config = ConfigFactory.parseMap(Map("akka.http.parsing.illegal-header-warnings" -> "off"))
}

/**
* Created by shutty on 3/29/16.
*/
class SQSService(port:Int, account:Int = 1) {
val config = ConfigFactory.parseMap(Map("akka.http.parsing.illegal-header-warnings" -> "off"))
implicit val system = ActorSystem.create("sqsmock", config)
def start():Unit = {
class SQSService(port:Int, account:Int = 1)(implicit system: ActorSystem = ActorSystem.create("sqsmock", SQSService.config)) {

private var bind: Http.ServerBinding = _

def start() = {
val log = Logger(system.getClass, "sqs_client")
implicit val mat = ActorMaterializer()
val http = Http(system)
Expand All @@ -48,17 +59,10 @@ class SQSService(port:Int, account:Int = 1) {
}
}
}
Await.result(http.bindAndHandle(route, "localhost", 8001), Duration.Inf)
bind = Await.result(http.bindAndHandle(route, "localhost", port), Duration.Inf)
bind
}

def shutdown():Unit = Await.result(system.terminate(), Duration.Inf)
def stop():Unit = Await.result(system.terminate(), Duration.Inf)
def block():Unit = Await.result(system.whenTerminated, Duration.Inf)
}

object SQSService {
def main(args: Array[String]) {
val sqs = new SQSService(8001, 1)
sqs.start()
sqs.block
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.findify.sqsmock.actions

import akka.actor.ActorSystem
import akka.event.slf4j.Logger
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import io.findify.sqsmock.messages.{DeleteMessageBatchResponse, ErrorResponse, SendMessageBatchResponse}
import io.findify.sqsmock.model.{DeleteMessageBatchEntry, MessageBatchEntry, QueueCache}

import scala.collection.mutable

/**
* Handle DeleteQueue request.
* @since May 14 2017.
*/
class DeleteMessageBatchWorker(account:Long, queues:mutable.Map[String,QueueCache], system:ActorSystem) extends Worker {

val log = Logger(this.getClass, "delete_message_batch_worker")

val fieldFormat = """DeleteMessageBatchRequestEntry\.([0-9]+)\.([0-9A-Za-z\.]+)""".r

def process(fields:Map[String,String]) = {
val result = for (
queueUrl <- fields.get("QueueUrl");
queue <- queues.get(queueUrl)
) yield {
val deletedMsgs = fields
.filter(_._1.startsWith("DeleteMessageBatchRequestEntry"))
.flatMap { case (key,value) => key match {
case fieldFormat(index, name) => Some((index.toInt, name, value))
case _ => None
}}
.groupBy(_._1)
.values.toList
.flatMap(DeleteMessageBatchEntry(_))
.filter { entry =>
log.debug(s"deleting message ${entry.id} from queue")
queue.delete(entry.receiptHandle)
}

HttpResponse(StatusCodes.OK, entity = DeleteMessageBatchResponse(deletedMsgs).toXML.toString())
}
result.getOrElse {
log.warn("cannot send message: possibly, some request parameter is missing")
HttpResponse(StatusCodes.BadRequest, entity = ErrorResponse("Sender", "InvalidParameterValue", "oops").toXML.toString())
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.findify.sqsmock.actions

import akka.actor.ActorSystem
import akka.event.slf4j.Logger
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import io.findify.sqsmock.messages.{DeleteQueueResponse, ErrorResponse, GetQueueUrlResponse}
import io.findify.sqsmock.model.{QueueCache, Queues}

import scala.collection.mutable

/**
* Worker to respond to DeleteQueue requests.
* @since May 14 2017
*
* @param account
* @param queues
* @param system
*/
class DeleteQueueUrlWorker(account:Long, queues:mutable.Map[String,QueueCache], system:ActorSystem) extends Worker {

val log = Logger(this.getClass, "get_queue_url_worker")
def process(fields:Map[String,String]) = {
val result = for (
queueUrl <- fields.get("QueueUrl")
) yield {
log.debug(s"Deleting queue '$queueUrl'")
queues.remove(queueUrl)
HttpResponse(StatusCodes.OK, entity = DeleteQueueResponse.toXML.toString)
}

result.getOrElse{
log.warn("cannot send message: possibly, some request parameter is missing")
HttpResponse(StatusCodes.BadRequest, entity = ErrorResponse("Sender", "InvalidParameterValue", "oops").toXML.toString())
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.findify.sqsmock.actions

import akka.actor.ActorSystem
import akka.event.slf4j.Logger
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import io.findify.sqsmock.messages.{ErrorResponse, GetQueueAttributesResponse, GetQueueUrlResponse}
import io.findify.sqsmock.model.{QueueCache, Queues}

import scala.collection.mutable

/**
* Worker to respond to GetQueueUrl requests.
* @since May 14 2017
*
* @param account
* @param queues
* @param system
*/
class GetQueueAttributesWorker(account:Long, queues:mutable.Map[String,QueueCache], system:ActorSystem) extends Worker {

val log = Logger(this.getClass, "get_queue_url_worker")
def process(fields:Map[String,String]) = {
// TODO Implement completely. See http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_GetQueueAttributes.html

log.warn("Not implemented completely. Returning static result.")
HttpResponse(StatusCodes.OK, entity = GetQueueAttributesResponse(Map.empty).toXML.toString())
}
}
52 changes: 52 additions & 0 deletions src/main/scala/io/findify/sqsmock/actions/GetQueueUrlWorker.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.findify.sqsmock.actions

import akka.actor.ActorSystem
import akka.event.slf4j.Logger
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import io.findify.sqsmock.messages.{ErrorResponse, GetQueueUrlResponse, ListQueuesResponse, ReceiveMessageResponse}
import io.findify.sqsmock.model.{QueueCache, Queues}

import scala.collection.mutable

/**
* Worker to respond to GetQueueUrl requests.
* @since May 14 2017
*
* @param account
* @param queues
* @param system
*/
class GetQueueUrlWorker(account:Long, queues:mutable.Map[String,QueueCache], system:ActorSystem) extends Worker {

val log = Logger(this.getClass, "get_queue_url_worker")
def process(fields:Map[String,String]) = {
val result = for (
queueName <- fields.get("QueueName")
) yield {
// optional field
val accountId = fields.get("QueueOwnerAWSAccountId")
val validAccount = accountId.map(_ == s"$account").getOrElse(true)

if (validAccount) {
// filter queue on name
Queues.queueWithName(queues.keys, queueName) match {
case Some(url) =>
log.debug(s"Found url $url for queue name $queueName")
HttpResponse(StatusCodes.OK, entity = GetQueueUrlResponse(Option(url)).toXML.toString())
case None =>
log.debug(s"No url found for queue name $queueName")
HttpResponse(StatusCodes.BadRequest, entity = ErrorResponse("Sender", "AWS.SimpleQueueService.NonExistentQueue", s"No queue with name '$queueName'").toXML.toString())
}
} else {
// return empty list
log.debug(s"Given account ${accountId.get} does not match current account $account")
HttpResponse(StatusCodes.OK, entity = GetQueueUrlResponse(None).toXML.toString())
}
}

result.getOrElse{
log.warn("cannot send message: possibly, some request parameter is missing")
HttpResponse(StatusCodes.BadRequest, entity = ErrorResponse("Sender", "InvalidParameterValue", "oops").toXML.toString())
}
}
}
31 changes: 31 additions & 0 deletions src/main/scala/io/findify/sqsmock/actions/ListQueuesWorker.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.findify.sqsmock.actions

import akka.actor.ActorSystem
import akka.event.slf4j.Logger
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import io.findify.sqsmock.messages.ListQueuesResponse
import io.findify.sqsmock.model.{QueueCache, Queues}

import scala.collection.mutable

/**
* Worker to respond to ListQueue requests.
* @since May 13 2017
*
* @param account
* @param queues
* @param system
*/
class ListQueuesWorker(account:Long, queues:mutable.Map[String,QueueCache], system:ActorSystem) extends Worker {

val log = Logger(this.getClass, "list_queues_worker")
def process(fields:Map[String,String]) = {
// get queues with prefix or all queues
val queueNames = fields.get("QueueNamePrefix").map(
prefix => Queues.queuesWithPrefix(queues.keys, prefix)
).getOrElse(queues.keys.toList)

log.debug("listing queues: {}", queueNames.mkString(","))
HttpResponse(StatusCodes.OK, entity = ListQueuesResponse(queueNames).toXML.toString)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.findify.sqsmock.messages

import io.findify.sqsmock.model.{DeleteMessageBatchEntry, MessageBatchEntry}

/**
* Response to DeleteMessageBatch request.
* @see http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html
* @since May 14 2017
* @param entries List of deleted messages
*/
case class DeleteMessageBatchResponse(entries:List[DeleteMessageBatchEntry]) extends Response {
def toXML =
<DeleteMessageBatchResponse>
<DeleteMessageBatchResult>
{
entries.map { entry =>
<DeleteMessageBatchResultEntry>
<Id>{ entry.id }</Id>
</DeleteMessageBatchResultEntry>
}
}
</DeleteMessageBatchResult>
<ResponseMetadata>
<RequestId>{uuid}</RequestId>
</ResponseMetadata>
</DeleteMessageBatchResponse>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.findify.sqsmock.messages

/**
* Response to DeleteQueue request.
* @see http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteQueue.html
* @since May 14 2017.
*/
case object DeleteQueueResponse extends Response {
def toXML =
<DeleteQueueResponse>
<ResponseMetadata>
<RequestId>{uuid}</RequestId>
</ResponseMetadata>
</DeleteQueueResponse>
}
Loading