Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions .env.dev.defaults
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ GEOSERVER_HOST=localhost
################################################################################
# PostgreSQL

DB_VERSION=pg17-postgis3.5.1
DB_VERSION=pg17-postgis3.6.1-timescale2.24.0
DB_DATA_VOLUME_NAME=db-data-17
POSTGRES_PORT=5432
POSTGRES_USER=postgres
Expand Down Expand Up @@ -163,6 +163,15 @@ MONITORENV_BATCH_SIZE=30
################################################################################
# KAFKA

MONITORENV_KAFKA_AIS_ENABLED=false
MONITORENV_KAFKA_AIS_SERVERS=localhost:9092
MONITORENV_KAFKA_AIS_ENABLED=true
MONITORENV_KAFKA_AIS_PRODUCER_ENABLED=false
MONITORENV_KAFKA_AIS_SERVERS=localhost:9093
MONITORENV_KAFKA_AIS_GROUP_ID=test
MONITORENV_KAFKA_AIS_TRUSTSTORE=${PWD}/infra/kafka/certs/monitorenv/monitorenv-truststore.jks
MONITORENV_KAFKA_AIS_KEYSTORE=${PWD}/infra/kafka/certs/monitorenv/monitorenv.p12
MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD=changeit
MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD=changeit
MONITORENV_KAFKA_AIS_KEY_PASSWORD=changeit
MONITORENV_KAFKA_AIS_BATCH_SIZE=100
MONITORENV_KAFKA_AIS_TOPIC=monitorenv.ais.position
MONITORENV_KAFKA_AIS_TIMEOUT=30000
8 changes: 8 additions & 0 deletions .env.infra.example
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,11 @@ MONITORENV_BATCH_SIZE=100
MONITORENV_KAFKA_AIS_ENABLED=
MONITORENV_KAFKA_AIS_SERVERS=
MONITORENV_KAFKA_AIS_GROUP_ID=
MONITORENV_KAFKA_AIS_TRUSTSTORE=
MONITORENV_KAFKA_AIS_KEYSTORE=
MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD=
MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD=
MONITORENV_KAFKA_AIS_KEY_PASSWORD=
MONITORENV_KAFKA_AIS_BATCH_SIZE=100
MONITORENV_KAFKA_AIS_TOPIC=monitorenv.ais.position
MONITORENV_KAFKA_AIS_TIMEOUT=30000
17 changes: 13 additions & 4 deletions .env.test.defaults
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ MONITORENV_JVM_OPTIONS=
################################################################################
# PostgreSQL

DB_VERSION=pg17-postgis3.5.1
DB_VERSION=pg17-postgis3.6.1-timescale2.24.0
DB_DATA_VOLUME_NAME=db-data-17
POSTGRES_PORT=5432
POSTGRES_USER=postgres
Expand Down Expand Up @@ -157,6 +157,15 @@ MONITORENV_BATCH_SIZE=100
################################################################################
# KAFKA

MONITORENV_KAFKA_AIS_ENABLED=true
MONITORENV_KAFKA_AIS_SERVERS=localhost:9092
MONITORENV_KAFKA_AIS_GROUP_ID=test
MONITORENV_KAFKA_AIS_ENABLED=
MONITORENV_KAFKA_AIS_PRODUCER_ENABLED=
MONITORENV_KAFKA_AIS_SERVERS=
MONITORENV_KAFKA_AIS_GROUP_ID=
MONITORENV_KAFKA_AIS_TRUSTSTORE=
MONITORENV_KAFKA_AIS_KEYSTORE=
MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD=
MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD=
MONITORENV_KAFKA_AIS_KEY_PASSWORD=
MONITORENV_KAFKA_AIS_BATCH_SIZE=100
MONITORENV_KAFKA_AIS_TOPIC=monitorenv.ais.position
MONITORENV_KAFKA_AIS_TIMEOUT=30000
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
BACKEND_CONFIGURATION_FOLDER=$(shell pwd)/infra/configurations/backend/
HOST_MIGRATIONS_FOLDER=$(shell pwd)/backend/src/main/resources/db/migration
PIPELINE_TEST_ENV_FILE=$(shell pwd)/pipeline/.env.test
PWD=$(shell pwd)

ifneq (,$(wildcard .env))
include .env
Expand Down
3 changes: 2 additions & 1 deletion backend/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ tasks.withType<KotlinCompile> {
}

val ktorVersion = "3.3.2"
val testcontainersVersion = "1.21.3"
val testcontainersVersion = "1.21.4"
val sentryVersion = "8.26.0"
val flywayVersion = "11.17.0"

Expand Down Expand Up @@ -137,6 +137,7 @@ dependencies {
testImplementation("org.testcontainers:testcontainers:$testcontainersVersion")
testImplementation("org.testcontainers:postgresql:$testcontainersVersion")
testImplementation("org.testcontainers:junit-jupiter:$testcontainersVersion")
testImplementation("org.testcontainers:kafka:$testcontainersVersion")
testImplementation("io.ktor:ktor-client-mock:$ktorVersion")
testImplementation("jakarta.servlet:jakarta.servlet-api:6.1.0")
testImplementation("com.squareup.okhttp3:mockwebserver:5.3.0")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package fr.gouv.cacem.monitorenv.config

import org.apache.kafka.clients.consumer.ConsumerConfig
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
Expand All @@ -10,6 +10,7 @@ import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

@EnableKafka
@Configuration
class KafkaAISConsumerConfig(
    val kafkaProperties: KafkaProperties,
) {
    /**
     * Builds the consumer factory for AIS messages.
     *
     * Keys are plain strings; values are JSON payloads deserialized into [AISPayload].
     * Connection/security settings (bootstrap servers, group id, SSL stores) come from
     * the standard Spring `spring.kafka.*` properties via [KafkaProperties].
     */
    @Bean
    fun consumerFactory(): ConsumerFactory<String?, AISPayload?> =
        DefaultKafkaConsumerFactory(
            kafkaProperties.buildConsumerProperties(),
            StringDeserializer(),
            // NOTE(review): trusting all packages ("*") lets a malicious producer pick the
            // target type via type headers. Prefer JsonDeserializer(AISPayload::class.java)
            // or trusting only the fr.gouv.cacem.monitorenv packages — confirm before prod use.
            JsonDeserializer<AISPayload>().apply { addTrustedPackages("*") },
        )

    /**
     * Listener container factory used by `@KafkaListener` methods consuming AIS payloads.
     */
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String?, AISPayload?> {
        val factory =
            ConcurrentKafkaListenerContainerFactory<String?, AISPayload?>()
        factory.consumerFactory = consumerFactory()
        return factory
    }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package fr.gouv.cacem.monitorenv.config

import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer

/**
 * ⚠ For dev and testing purposes only ⚠ — never enable in production.
 *
 * Provides a JSON-serializing Kafka producer for [AISPayload] messages, active only when
 * BOTH `monitorenv.kafka.ais.enabled` and `monitorenv.kafka.ais.producer.enabled` are "true".
 */
@Configuration
@ConditionalOnProperty(
    value = ["monitorenv.kafka.ais.enabled", "monitorenv.kafka.ais.producer.enabled"],
    havingValue = "true",
)
class KafkaAISProducerConfig(
    private val kafkaProperties: KafkaProperties,
) {
    /** Producer factory: String keys, JSON-serialized [AISPayload] values. */
    @Bean
    fun producerFactory(): ProducerFactory<String, AISPayload> =
        DefaultKafkaProducerFactory(
            // Standard spring.kafka.* settings (servers, SSL stores) resolved by Spring Boot.
            kafkaProperties.buildProducerProperties(),
            { StringSerializer() },
            { JsonSerializer() },
        )

    /** Template used by test/dev code to publish AIS positions. */
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, AISPayload> = KafkaTemplate(producerFactory())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package fr.gouv.cacem.monitorenv.config

import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.stereotype.Component

/**
 * Binds the `monitorenv.kafka.ais.*` configuration keys (topic, batching tuning).
 *
 * Defaults match the values shipped in the .env files so the application still starts
 * when the optional keys are absent.
 */
@Component
@ConfigurationProperties(prefix = "monitorenv.kafka.ais")
class KafkaAISProperties(
    // Kafka topic AIS positions are consumed from.
    var topic: String = "monitorenv.ais.position",
    // Max time (ms) a partially-filled batch waits before being flushed to the database.
    var timeout: Long = 30000L,
    // Max number of AIS positions persisted in one saveAll() call.
    var batchSize: Int = 100,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package fr.gouv.cacem.monitorenv.infrastructure.database.model

import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
import jakarta.persistence.Embeddable
import jakarta.persistence.EmbeddedId
import jakarta.persistence.Entity
import jakarta.persistence.Table
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.io.WKTReader
import org.n52.jackson.datatype.jts.GeometryDeserializer
import org.n52.jackson.datatype.jts.GeometrySerializer
import java.io.Serializable
import java.time.ZonedDateTime
import kotlin.math.roundToInt

/**
 * JPA entity for the `ais_positions` table.
 *
 * Course, heading and speed are stored as hundredths (value * 100) in SMALLINT columns.
 *
 * NOTE(review): for a course/heading expressed in degrees, values above 327.67°
 * overflow Short (e.g. 359.9 * 100 = 35990 > Short.MAX_VALUE = 32767) and toShort()
 * wraps to a negative number — confirm the payload range or widen the column to Int.
 */
@Entity
@Table(name = "ais_positions")
data class AISPositionModel(
    @EmbeddedId
    val id: AISPositionPK,
    // Geometry parsed from the payload's WKT string (e.g. "POINT(lon lat)").
    @JsonSerialize(using = GeometrySerializer::class)
    @JsonDeserialize(contentUsing = GeometryDeserializer::class)
    val coord: Geometry?,
    val status: String?,
    val course: Short?,
    val heading: Short?,
    val speed: Short?,
) {
    companion object {
        /**
         * Maps a Kafka [AISPayload] to its persistence model.
         * Fractional values are scaled by 100 and truncated to Short (see overflow note above).
         */
        fun toAISPositionModel(aisPosition: AISPayload): AISPositionModel =
            AISPositionModel(
                id = AISPositionPK(mmsi = aisPosition.mmsi, ts = aisPosition.ts),
                // NOTE(review): a new WKTReader is allocated per message; hoisting it would
                // avoid churn — WKTReader is not thread-safe, so keep it per-call or per-thread.
                coord = aisPosition.coord.let { WKTReader().read(it) },
                status = aisPosition.status,
                course = aisPosition.course?.let { (it * 100).roundToInt().toShort() },
                speed = aisPosition.speed?.let { (it * 100).roundToInt().toShort() },
                heading = aisPosition.heading?.let { (it * 100).roundToInt().toShort() },
            )
    }
}

/**
 * Composite primary key of `ais_positions`: one row per (mmsi, timestamp) pair.
 * Serializable is required by JPA for embedded ids.
 */
@Embeddable
data class AISPositionPK(
    // Position timestamp reported by the AIS message.
    val ts: ZonedDateTime?,
    // Vessel MMSI identifier.
    val mmsi: Int?,
) : Serializable
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package fr.gouv.cacem.monitorenv.infrastructure.database.repositories

import fr.gouv.cacem.monitorenv.infrastructure.database.model.AISPositionModel
import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.interfaces.IDBAISPositionRepository
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
import jakarta.transaction.Transactional
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Repository

/**
 * Transactional persistence gateway for AIS positions.
 *
 * Converts incoming Kafka [AISPayload]s to [AISPositionModel] rows and delegates
 * storage to the Spring Data repository.
 */
@Repository
class JpaAISPositionRepository(
    private val dbAISPositionRepository: IDBAISPositionRepository,
) {
    private val logger: Logger = LoggerFactory.getLogger(JpaAISPositionRepository::class.java)

    /** Persists a single AIS position. */
    @Transactional
    fun save(aisPosition: AISPayload) {
        val model = AISPositionModel.toAISPositionModel(aisPosition)
        dbAISPositionRepository.save(model)
    }

    /** Persists a batch of AIS positions in one transaction. */
    @Transactional
    fun saveAll(aisPositions: List<AISPayload>) {
        val models = aisPositions.map(AISPositionModel::toAISPositionModel)
        dbAISPositionRepository.saveAll(models)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package fr.gouv.cacem.monitorenv.infrastructure.database.repositories.interfaces

import fr.gouv.cacem.monitorenv.infrastructure.database.model.AISPositionModel
import fr.gouv.cacem.monitorenv.infrastructure.database.model.AISPositionPK
import org.springframework.data.jpa.repository.JpaRepository

interface IDBAISPositionRepository : JpaRepository<AISPositionModel, AISPositionPK>
Original file line number Diff line number Diff line change
@@ -1,20 +1,67 @@
package fr.gouv.cacem.monitorenv.infrastructure.kafka

import fr.gouv.cacem.monitorenv.config.KafkaAISProperties
import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.JpaAISPositionRepository
import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload
import jakarta.annotation.PostConstruct
import jakarta.annotation.PreDestroy
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

@Component
@ConditionalOnProperty(value = ["monitorenv.kafka.ais.enabled"], havingValue = "true")
class AISListener(
    private val jpaAISPositionRepository: JpaAISPositionRepository,
    private val kafkaAISProperties: KafkaAISProperties,
) {
    private val logger = LoggerFactory.getLogger(AISListener::class.java)

    // Unbounded hand-off queue between the Kafka listener thread and the batching thread.
    private val queue = LinkedBlockingQueue<AISPayload>()

    // Kept so @PreDestroy can actually interrupt the batching loop (the original only logged).
    private var batchThread: Thread? = null

    /** Kafka entry point: enqueue the payload; persistence happens on the batch thread. */
    @KafkaListener(topics = ["\${monitorenv.kafka.ais.topic:monitorenv.ais.position}"])
    fun listenAIS(payload: AISPayload) {
        queue.put(payload)
    }

    /**
     * Starts a daemon thread that drains the queue into batches of at most
     * [KafkaAISProperties.batchSize] items, flushing early after [KafkaAISProperties.timeout] ms.
     */
    @PostConstruct
    fun startBatching() {
        batchThread =
            thread(isDaemon = true, name = "ais-batch-thread") {
                val batchAisPayloadToSave = mutableListOf<AISPayload>()

                while (!Thread.currentThread().isInterrupted) {
                    try {
                        // Block until at least one message is available.
                        batchAisPayloadToSave.add(queue.take())

                        val deadline = System.currentTimeMillis() + kafkaAISProperties.timeout

                        // Fill the batch until it is full or the deadline passes.
                        while (batchAisPayloadToSave.size < kafkaAISProperties.batchSize) {
                            val remainingTime = deadline - System.currentTimeMillis()
                            if (remainingTime <= 0) break

                            queue.poll(remainingTime, TimeUnit.MILLISECONDS)
                                ?.let(batchAisPayloadToSave::add)
                        }

                        jpaAISPositionRepository.saveAll(batchAisPayloadToSave)
                    } catch (ex: InterruptedException) {
                        // BUGFIX: previously swallowed by catch(Exception), which cleared the
                        // interrupt flag and made the loop unstoppable. Restore it so the
                        // while condition terminates the thread.
                        Thread.currentThread().interrupt()
                    } catch (ex: Exception) {
                        // Best-effort: a failed flush must not kill the batching loop.
                        logger.error("Could not save AIS batch", ex)
                    } finally {
                        batchAisPayloadToSave.clear()
                    }
                }
            }
    }

    /** Stops the batching thread on context shutdown (in-flight queue items may be lost). */
    @PreDestroy
    fun shutdown() {
        logger.info("Shutting down AISListener batch thread")
        batchThread?.interrupt()
    }
}
Loading
Loading