Skip to content

Commit

Permalink
Use rate limiter for CloudWatch logs
Browse files Browse the repository at this point in the history
  • Loading branch information
orbang committed Jan 31, 2024
1 parent 3e738c5 commit faf3360
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ lazy val CommonsIoVersion = "2.8.0"
lazy val CommonsLangVersion = "3.12.0"
lazy val CloudformationTemplateGeneratorVersion = "3.10.4"
lazy val SclapVersion = "1.5.1"
lazy val SprintVersion = "0.3.2"
lazy val SprintVersion = "0.3.5"
lazy val CirceVersion = "0.14.1"
lazy val SprayJsonVersion = "1.3.6"
lazy val PulsarVersion = "2.10.3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.amazonaws.services.logs.model.LogStream
import com.amazonaws.services.logs.model.OrderBy
import com.amazonaws.services.logs.model.OutputLogEvent
import io.jobial.sprint.util.CatsUtils
import io.jobial.sprint.util.RateLimiter

import java.lang.System.currentTimeMillis
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -99,11 +100,12 @@ trait CloudWatchLogsClient[F[_]] extends AwsClient[F] with CatsUtils[F] {
)(f: FilteredLogEvent => F[T])(implicit awsContext: AwsContext, concurrent: Concurrent[F], timer: Timer[F]): F[Unit] =
forLogEventSequences(group, stream, from, to, filterPattern, delay, seen)(_.map(f).sequence)

def streams(from: Long, groupLimit: Int = 1000, streamLimit: Int = 10)(implicit awsContext: AwsContext, concurrent: Concurrent[F], parallel: Parallel[F]) =
def streams(from: Long, groupLimit: Int = 1000, streamLimit: Int = 10)(implicit awsContext: AwsContext, concurrent: Concurrent[F], parallel: Parallel[F], timer: Timer[F]) =
for {
rateLimiter <- RateLimiter(5)
groups <- describeLogGroups(groupLimit)
groupsAndStreams <- groups.map { g =>
describeLogStreams(g.getLogGroupName, streamLimit).map(s => g.getLogGroupName -> s.take(1))
rateLimiter(describeLogStreams(g.getLogGroupName, streamLimit)).map(s => g.getLogGroupName -> s.take(1))
}.parSequence
r = for {
(group, streams) <- groupsAndStreams if streams.headOption.flatMap(s => Option(s.getLastEventTimestamp).map(_ > from)).getOrElse(false)
Expand All @@ -118,7 +120,7 @@ trait CloudWatchLogsClient[F[_]] extends AwsClient[F] with CatsUtils[F] {
} yield r

def forGroupOrStream[T](groupOrStream: String, stream: Option[String], from: Long, to: Option[Long] = None)(f: (String, Option[String]) => F[T])
(implicit awsContext: AwsContext, concurrent: Concurrent[F], parallel: Parallel[F]) = {
(implicit awsContext: AwsContext, concurrent: Concurrent[F], parallel: Parallel[F], timer: Timer[F]) = {
val streamFilter = stream match {
case Some(stream) =>
stream
Expand Down

0 comments on commit faf3360

Please sign in to comment.