diff --git a/.env.dev.defaults b/.env.dev.defaults index 2b37bbc354..a4dac6d29a 100644 --- a/.env.dev.defaults +++ b/.env.dev.defaults @@ -73,7 +73,7 @@ GEOSERVER_HOST=localhost ################################################################################ # PostgreSQL -DB_VERSION=pg17-postgis3.5.1 +DB_VERSION=pg17-postgis3.6.1-timescale2.24.0 DB_DATA_VOLUME_NAME=db-data-17 POSTGRES_PORT=5432 POSTGRES_USER=postgres @@ -163,6 +163,15 @@ MONITORENV_BATCH_SIZE=30 ################################################################################ # KAFKA -MONITORENV_KAFKA_AIS_ENABLED=false -MONITORENV_KAFKA_AIS_SERVERS=localhost:9092 +MONITORENV_KAFKA_AIS_ENABLED=true +MONITORENV_KAFKA_AIS_PRODUCER_ENABLED=false +MONITORENV_KAFKA_AIS_SERVERS=localhost:9093 MONITORENV_KAFKA_AIS_GROUP_ID=test +MONITORENV_KAFKA_AIS_TRUSTSTORE=${PWD}/infra/kafka/certs/monitorenv/monitorenv-truststore.jks +MONITORENV_KAFKA_AIS_KEYSTORE=${PWD}/infra/kafka/certs/monitorenv/monitorenv.p12 +MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD=changeit +MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD=changeit +MONITORENV_KAFKA_AIS_KEY_PASSWORD=changeit +MONITORENV_KAFKA_AIS_BATCH_SIZE=100 +MONITORENV_KAFKA_AIS_TOPIC=monitorenv.ais.position +MONITORENV_KAFKA_AIS_TIMEOUT=30000 diff --git a/.env.infra.example b/.env.infra.example index 904376954e..cd08d47f8f 100644 --- a/.env.infra.example +++ b/.env.infra.example @@ -160,3 +160,11 @@ MONITORENV_BATCH_SIZE=100 MONITORENV_KAFKA_AIS_ENABLED= MONITORENV_KAFKA_AIS_SERVERS= MONITORENV_KAFKA_AIS_GROUP_ID= +MONITORENV_KAFKA_AIS_TRUSTSTORE= +MONITORENV_KAFKA_AIS_KEYSTORE= +MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD= +MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD= +MONITORENV_KAFKA_AIS_KEY_PASSWORD= +MONITORENV_KAFKA_AIS_BATCH_SIZE=100 +MONITORENV_KAFKA_AIS_TOPIC=monitorenv.ais.position +MONITORENV_KAFKA_AIS_TIMEOUT=30000 diff --git a/.env.test.defaults b/.env.test.defaults index 85c109009d..207b6fab99 100644 --- a/.env.test.defaults +++ b/.env.test.defaults @@ -61,7 +61,7 @@ MONITORENV_JVM_OPTIONS= ################################################################################ # PostgreSQL -DB_VERSION=pg17-postgis3.5.1 +DB_VERSION=pg17-postgis3.6.1-timescale2.24.0 DB_DATA_VOLUME_NAME=db-data-17 POSTGRES_PORT=5432 POSTGRES_USER=postgres @@ -157,6 +157,15 @@ MONITORENV_BATCH_SIZE=100 ################################################################################ # KAFKA -MONITORENV_KAFKA_AIS_ENABLED=true -MONITORENV_KAFKA_AIS_SERVERS=localhost:9092 -MONITORENV_KAFKA_AIS_GROUP_ID=test +MONITORENV_KAFKA_AIS_ENABLED= +MONITORENV_KAFKA_AIS_PRODUCER_ENABLED= +MONITORENV_KAFKA_AIS_SERVERS= +MONITORENV_KAFKA_AIS_GROUP_ID= +MONITORENV_KAFKA_AIS_TRUSTSTORE= +MONITORENV_KAFKA_AIS_KEYSTORE= +MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD= +MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD= +MONITORENV_KAFKA_AIS_KEY_PASSWORD= +MONITORENV_KAFKA_AIS_BATCH_SIZE=100 +MONITORENV_KAFKA_AIS_TOPIC=monitorenv.ais.position +MONITORENV_KAFKA_AIS_TIMEOUT=30000 diff --git a/Makefile b/Makefile index 0da4ca9438..90601cef1f 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ BACKEND_CONFIGURATION_FOLDER=$(shell pwd)/infra/configurations/backend/ HOST_MIGRATIONS_FOLDER=$(shell pwd)/backend/src/main/resources/db/migration PIPELINE_TEST_ENV_FILE=$(shell pwd)/pipeline/.env.test +PWD=$(shell pwd) ifneq (,$(wildcard .env)) include .env diff --git a/backend/build.gradle.kts b/backend/build.gradle.kts index e8bc518af1..b00836cae5 100644 --- a/backend/build.gradle.kts +++ b/backend/build.gradle.kts @@ -61,7 +61,7 @@ tasks.withType { } val ktorVersion = "3.3.2" -val testcontainersVersion = "1.21.3" +val testcontainersVersion = "1.21.4" val sentryVersion = "8.26.0" val flywayVersion = "11.17.0" @@ -137,6 +137,7 @@ dependencies { testImplementation("org.testcontainers:testcontainers:$testcontainersVersion") testImplementation("org.testcontainers:postgresql:$testcontainersVersion") testImplementation("org.testcontainers:junit-jupiter:$testcontainersVersion") + testImplementation("org.testcontainers:kafka:$testcontainersVersion") testImplementation("io.ktor:ktor-client-mock:$ktorVersion") testImplementation("jakarta.servlet:jakarta.servlet-api:6.1.0") testImplementation("com.squareup.okhttp3:mockwebserver:5.3.0") diff --git a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/config/KafkaAISConsumerConfig.kt b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/config/KafkaAISConsumerConfig.kt index f8069c0406..6737a59c93 100644 --- a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/config/KafkaAISConsumerConfig.kt +++ b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/config/KafkaAISConsumerConfig.kt @@ -1,6 +1,6 @@ package fr.gouv.cacem.monitorenv.config -import org.apache.kafka.clients.consumer.ConsumerConfig +import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload import org.apache.kafka.common.serialization.StringDeserializer import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.autoconfigure.kafka.KafkaProperties @@ -10,6 +10,7 @@ import org.springframework.kafka.annotation.EnableKafka import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.support.serializer.JsonDeserializer @EnableKafka @Configuration @@ -18,31 +19,17 @@ class KafkaAISConsumerConfig( val kafkaProperties: KafkaProperties, ) { @Bean - fun consumerFactory(): ConsumerFactory { - val props: MutableMap = HashMap() - props.put( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - kafkaProperties.bootstrapServers, + fun consumerFactory(): ConsumerFactory = + DefaultKafkaConsumerFactory( + kafkaProperties.buildConsumerProperties(), + StringDeserializer(), + JsonDeserializer().apply { addTrustedPackages("*") }, ) - props.put( - ConsumerConfig.GROUP_ID_CONFIG, - kafkaProperties.consumer.groupId, - ) - props.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer::class.java, - ) - props.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer::class.java, - ) - return DefaultKafkaConsumerFactory(props) - } @Bean - fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory { + fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory { val factory = - ConcurrentKafkaListenerContainerFactory() + ConcurrentKafkaListenerContainerFactory() factory.consumerFactory = consumerFactory() return factory } diff --git a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/config/KafkaAISProducerConfig.kt b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/config/KafkaAISProducerConfig.kt new file mode 100644 index 0000000000..c5d154eaf0 --- /dev/null +++ b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/config/KafkaAISProducerConfig.kt @@ -0,0 +1,38 @@ +package fr.gouv.cacem.monitorenv.config + +import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.autoconfigure.kafka.KafkaProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonSerializer + +/** + * ⚠ For dev and testing purpose only ⚠. Do not use it for production + */ +@Configuration +@ConditionalOnProperty( + value = ["monitorenv.kafka.ais.enabled", "monitorenv.kafka.ais.producer.enabled"], + havingValue = "true", +) +class KafkaAISProducerConfig( + private val kafkaProperties: KafkaProperties, +) { + @Bean + fun producerFactory(): ProducerFactory { + val props = kafkaProperties.buildProducerProperties() + + return DefaultKafkaProducerFactory( + props, + { StringSerializer() }, + { JsonSerializer() }, + ) + } + + @Bean + fun kafkaTemplate(): KafkaTemplate = KafkaTemplate(producerFactory()) +} diff --git a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/config/KafkaAISProperties.kt b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/config/KafkaAISProperties.kt new file mode 100644 index 0000000000..ebc61260ad --- /dev/null +++ b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/config/KafkaAISProperties.kt @@ -0,0 +1,12 @@ +package fr.gouv.cacem.monitorenv.config + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.stereotype.Component + +@Component +@ConfigurationProperties(prefix = "monitorenv.kafka.ais") +class KafkaAISProperties( + var topic: String = "monitorenv.ais.position", + var timeout: Long = 30000L, + var batchSize: Int = 100, +) diff --git a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/model/AISPositionModel.kt b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/model/AISPositionModel.kt new file mode 100644 index 0000000000..6b21e6a5b8 --- /dev/null +++ b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/model/AISPositionModel.kt @@ -0,0 +1,48 @@ +package fr.gouv.cacem.monitorenv.infrastructure.database.model + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import com.fasterxml.jackson.databind.annotation.JsonSerialize +import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload +import jakarta.persistence.Embeddable +import jakarta.persistence.EmbeddedId +import jakarta.persistence.Entity +import jakarta.persistence.Table +import org.locationtech.jts.geom.Geometry +import org.locationtech.jts.io.WKTReader +import org.n52.jackson.datatype.jts.GeometryDeserializer +import org.n52.jackson.datatype.jts.GeometrySerializer +import java.io.Serializable +import java.time.ZonedDateTime +import kotlin.math.roundToInt + +@Entity +@Table(name = "ais_positions") +data class AISPositionModel( + @EmbeddedId + val id: AISPositionPK, + @JsonSerialize(using = GeometrySerializer::class) + @JsonDeserialize(contentUsing = GeometryDeserializer::class) + val coord: Geometry?, + val status: String?, + val course: Short?, + val heading: Short?, + val speed: Short?, +) { + companion object { + fun toAISPositionModel(aisPosition: AISPayload): AISPositionModel = + AISPositionModel( + id = AISPositionPK(mmsi = aisPosition.mmsi, ts = aisPosition.ts), + coord = aisPosition.coord.let { WKTReader().read(it) }, + status = aisPosition.status, + course = aisPosition.course?.let { (it * 100).roundToInt().toShort() }, + speed = aisPosition.speed?.let { (it * 100).roundToInt().toShort() }, + heading = aisPosition.heading?.let { (it * 100).roundToInt().toShort() }, + ) + } +} + +@Embeddable +data class AISPositionPK( + val ts: ZonedDateTime?, + val mmsi: Int?, +) : Serializable diff --git a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/JpaAISPositionRepository.kt b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/JpaAISPositionRepository.kt new file mode 100644 index 0000000000..9ba43fc0cd --- /dev/null +++ b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/JpaAISPositionRepository.kt @@ -0,0 +1,26 @@ +package fr.gouv.cacem.monitorenv.infrastructure.database.repositories + +import fr.gouv.cacem.monitorenv.infrastructure.database.model.AISPositionModel +import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.interfaces.IDBAISPositionRepository +import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload +import jakarta.transaction.Transactional +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Repository + +@Repository +class JpaAISPositionRepository( + private val dbAISPositionRepository: IDBAISPositionRepository, +) { + private val logger: Logger = LoggerFactory.getLogger(JpaAISPositionRepository::class.java) + + @Transactional + fun save(aisPosition: AISPayload) { + dbAISPositionRepository.save(AISPositionModel.toAISPositionModel(aisPosition)) + } + + @Transactional + fun saveAll(aisPositions: List) { + dbAISPositionRepository.saveAll(aisPositions.map { AISPositionModel.toAISPositionModel(it) }) + } +} diff --git a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/interfaces/IDBAISPositionRepository.kt b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/interfaces/IDBAISPositionRepository.kt new file mode 100644 index 0000000000..bca99331ce --- /dev/null +++ b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/interfaces/IDBAISPositionRepository.kt @@ -0,0 +1,7 @@ +package fr.gouv.cacem.monitorenv.infrastructure.database.repositories.interfaces + +import fr.gouv.cacem.monitorenv.infrastructure.database.model.AISPositionModel +import fr.gouv.cacem.monitorenv.infrastructure.database.model.AISPositionPK +import org.springframework.data.jpa.repository.JpaRepository + +interface IDBAISPositionRepository : JpaRepository diff --git a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListener.kt b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListener.kt index 047133a5ff..500d3fba31 100644 --- a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListener.kt +++ b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListener.kt @@ -1,20 +1,67 @@ package fr.gouv.cacem.monitorenv.infrastructure.kafka +import fr.gouv.cacem.monitorenv.config.KafkaAISProperties +import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.JpaAISPositionRepository +import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload +import jakarta.annotation.PostConstruct +import jakarta.annotation.PreDestroy import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.kafka.annotation.KafkaListener import org.springframework.stereotype.Component +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit +import kotlin.concurrent.thread @Component @ConditionalOnProperty(value = ["monitorenv.kafka.ais.enabled"], havingValue = "true") -class AISListener { +class AISListener( + private val jpaAISPositionRepository: JpaAISPositionRepository, + private val kafkaAISProperties: KafkaAISProperties, +) { private val logger = LoggerFactory.getLogger(AISListener::class.java) - val messages = mutableListOf() + private val queue = LinkedBlockingQueue() - @KafkaListener(topics = ["ais"]) - fun listenAIS(message: String) { - messages.add(message) - logger.info("AIS received message: $message") + @KafkaListener(topics = ["\${monitorenv.kafka.ais.topic:monitorenv.ais.position}"]) + fun listenAIS(payload: AISPayload) { + queue.put(payload) + } + + @PostConstruct + fun startBatching() { + thread(isDaemon = true, name = "ais-batch-thread") { + val batchAisPayloadToSave = mutableListOf() + + while (!Thread.currentThread().isInterrupted) { + try { + val first = queue.take() + batchAisPayloadToSave.add(first) + + val deadline = System.currentTimeMillis() + kafkaAISProperties.timeout + + while (batchAisPayloadToSave.size < kafkaAISProperties.batchSize) { + val remainingTime = deadline - System.currentTimeMillis() + if (remainingTime <= 0) break + + val aisPayload = queue.poll(remainingTime, TimeUnit.MILLISECONDS) + if (aisPayload != null) { + batchAisPayloadToSave.add(aisPayload) + } + } + + jpaAISPositionRepository.saveAll(batchAisPayloadToSave) + } catch (ex: Exception) { + logger.error("Could not save AIS batch", ex) + } finally { + batchAisPayloadToSave.clear() + } + } + } + } + + @PreDestroy + fun shutdown() { + logger.info("Shutting down AISListener batch thread") } } diff --git a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISProducer.kt b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISProducer.kt new file mode 100644 index 0000000000..98124477a8 --- /dev/null +++ b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISProducer.kt @@ -0,0 +1,57 @@ +package fr.gouv.cacem.monitorenv.infrastructure.kafka + +import fr.gouv.cacem.monitorenv.config.KafkaAISProperties +import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Component +import java.time.ZonedDateTime +import java.util.UUID +import kotlin.random.Random + +/** + * ⚠ For dev and testing purpose only ⚠. Do not use it for production. + */ +@Component +@ConditionalOnProperty( + value = ["monitorenv.kafka.ais.enabled", "monitorenv.kafka.ais.producer.enabled"], + havingValue = "true", +) +class AISProducer( + private val kafkaTemplate: KafkaTemplate, + private val kafkaAISProperties: KafkaAISProperties, +) { + private val logger = LoggerFactory.getLogger(AISProducer::class.java) + + companion object { + fun generateRandomPoint(): String { + val longitude: Double = Random.nextDouble() * 360 - 180 // -180 à 180 + val latitude: Double = Random.nextDouble() * 180 - 90 // -90 à 90 + return "POINT($longitude $latitude)" + } + } + + @Scheduled(fixedRate = 15000) + fun sendMessage() { + try { + logger.info("Sending AIS positions...") + kafkaTemplate.send( + kafkaAISProperties.topic, + AISPayload( + mmsi = Random.nextInt(0, 999999999), + coord = generateRandomPoint(), + status = UUID.randomUUID().toString(), + course = Random.nextDouble(), + heading = Random.nextDouble(), + speed = Random.nextDouble(), + ts = ZonedDateTime.now(), + ), + ) + } catch (ex: Exception) { + logger.error(ex.message) + throw ex + } + } +} diff --git a/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/adapters/AISPayload.kt b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/adapters/AISPayload.kt new file mode 100644 index 0000000000..31bc1c2dbd --- /dev/null +++ b/backend/src/main/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/adapters/AISPayload.kt @@ -0,0 +1,13 @@ +package fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters + +import java.time.ZonedDateTime + +class AISPayload( + val mmsi: Int?, + val coord: String?, + val status: String?, + val course: Double?, + val heading: Double?, + val speed: Double?, + val ts: ZonedDateTime?, +) diff --git a/backend/src/main/resources/application.yml b/backend/src/main/resources/application.yml index cb31d66541..9241aabcba 100644 --- a/backend/src/main/resources/application.yml +++ b/backend/src/main/resources/application.yml @@ -11,49 +11,55 @@ host: ip: ${MONITORENV_URL} monitorenv: - kafka: - ais: - enabled: ${MONITORENV_KAFKA_AIS_ENABLED} - ajp: - port: 8000 - version: ${VERSION} - sentry: - enabled: ${MONITORENV_SENTRY_ENABLED} - environment: ${ENVIRONMENT} - dsn: ${SENTRY_DSN} - oidc: - enabled: ${MONITORENV_OIDC_ENABLED} - bypassEmailDomainsFilter: ${MONITORENV_OIDC_BYPASS_DOMAINS_FILTER:false} - clientId: ${MONITORENV_OIDC_CLIENT_ID:} - clientSecret: ${MONITORENV_OIDC_CLIENT_SECRET:} - redirectUri: ${MONITORENV_OIDC_REDIRECT_URI:} - loginUrl: ${MONITORENV_OIDC_LOGIN_URL} - successUrl: ${MONITORENV_OIDC_SUCCESS_URL} - errorUrl: ${MONITORENV_OIDC_ERROR_URL} - authorizedEmailDomains: ${MONITORENV_OIDC_AUTHORIZED_EMAIL_DOMAINS:} - issuerUri: ${MONITORENV_OIDC_ISSUER_URI:} - issuerUriExternal: ${MONITORENV_OIDC_ISSUER_URI_EXTERNAL} - authorizationUri: ${MONITORENV_OIDC_AUTHORIZATION_URI:} - tokenUri: ${MONITORENV_OIDC_TOKEN_URI:} - userInfoUri: ${MONITORENV_OIDC_USER_INFO_URI:} - jwkSetUri: ${MONITORENV_OIDC_JWK_SET_URI:} - proxyUrl: ${MONITORENV_OIDC_PROXY_URL:} - api: - protected: - paths: /bff/v1/* - super-user-paths: /bff/v1/missions/*,/bff/v1/reportings/*,/bff/v1/semaphores/*,/bff/v1/stations/* - public-paths: /api/v1/authorization/management/* - api-key: ${MONITORENV_API_KEY} - brief: - templatePath: /template_export_brief.docx - tmpDocxPath: tmp_brief.docx - tmpOdtPath: tmp_brief.odt - ext: - id: ${MONITORENV_EXT_ID} - password: ${MONITORENV_EXT_PASSWORD} - legicem: - id: ${MONITORENV_LEGICEM_ID} - password: ${MONITORENV_LEGICEM_PASSWORD} + kafka: + ais: + batch-size: ${MONITORENV_KAFKA_AIS_BATCH_SIZE} + topic: ${MONITORENV_KAFKA_AIS_TOPIC} + timeout: ${MONITORENV_KAFKA_AIS_TIMEOUT} + enabled: ${MONITORENV_KAFKA_AIS_ENABLED} + producer: + enabled: ${MONITORENV_KAFKA_AIS_PRODUCER_ENABLED:false} + + ajp: + port: 8000 + version: ${VERSION} + sentry: + enabled: ${MONITORENV_SENTRY_ENABLED} + environment: ${ENVIRONMENT} + dsn: ${SENTRY_DSN} + oidc: + enabled: ${MONITORENV_OIDC_ENABLED} + bypassEmailDomainsFilter: ${MONITORENV_OIDC_BYPASS_DOMAINS_FILTER:false} + clientId: ${MONITORENV_OIDC_CLIENT_ID:} + clientSecret: ${MONITORENV_OIDC_CLIENT_SECRET:} + redirectUri: ${MONITORENV_OIDC_REDIRECT_URI:} + loginUrl: ${MONITORENV_OIDC_LOGIN_URL} + successUrl: ${MONITORENV_OIDC_SUCCESS_URL} + errorUrl: ${MONITORENV_OIDC_ERROR_URL} + authorizedEmailDomains: ${MONITORENV_OIDC_AUTHORIZED_EMAIL_DOMAINS:} + issuerUri: ${MONITORENV_OIDC_ISSUER_URI:} + issuerUriExternal: ${MONITORENV_OIDC_ISSUER_URI_EXTERNAL} + authorizationUri: ${MONITORENV_OIDC_AUTHORIZATION_URI:} + tokenUri: ${MONITORENV_OIDC_TOKEN_URI:} + userInfoUri: ${MONITORENV_OIDC_USER_INFO_URI:} + jwkSetUri: ${MONITORENV_OIDC_JWK_SET_URI:} + proxyUrl: ${MONITORENV_OIDC_PROXY_URL:} + api: + protected: + paths: /bff/v1/* + super-user-paths: /bff/v1/missions/*,/bff/v1/reportings/*,/bff/v1/semaphores/*,/bff/v1/stations/* + public-paths: /api/v1/authorization/management/* + api-key: ${MONITORENV_API_KEY} + brief: + templatePath: /template_export_brief.docx + tmpDocxPath: tmp_brief.docx + tmpOdtPath: tmp_brief.odt + ext: + id: ${MONITORENV_EXT_ID} + password: ${MONITORENV_EXT_PASSWORD} + legicem: + id: ${MONITORENV_LEGICEM_ID} + password: ${MONITORENV_LEGICEM_PASSWORD} spring: jmx: @@ -87,6 +93,18 @@ spring: group-id: ${MONITORENV_KAFKA_AIS_GROUP_ID:test} auto-offset-reset: latest + properties: + security.protocol: SSL + + ssl.keystore.location: ${MONITORENV_KAFKA_AIS_KEYSTORE} + ssl.keystore.password: ${MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD} + ssl.key.password: ${MONITORENV_KAFKA_AIS_KEY_PASSWORD} + ssl.keystore.type: PKCS12 + + ssl.truststore.location: ${MONITORENV_KAFKA_AIS_TRUSTSTORE} + ssl.truststore.password: ${MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD} + ssl.truststore.type: JKS + management: endpoints: web: diff --git a/backend/src/main/resources/db/migration/internal/V0.202__create_table_ais_positions.sql b/backend/src/main/resources/db/migration/internal/V0.202__create_table_ais_positions.sql new file mode 100644 index 0000000000..82983c349b --- /dev/null +++ b/backend/src/main/resources/db/migration/internal/V0.202__create_table_ais_positions.sql @@ -0,0 +1,16 @@ +CREATE EXTENSION IF NOT EXISTS timescaledb; + +CREATE TABLE public.ais_positions +( + id SERIAL, + mmsi INT, + coord GEOMETRY, + status TEXT, + course SMALLINT, + heading SMALLINT, + speed SMALLINT, + ts TIMESTAMPTZ, + PRIMARY KEY (mmsi, ts) +); + +SELECT create_hypertable('ais_positions', by_range('ts', INTERVAL '1 day'), if_not_exists => TRUE); diff --git a/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/AbstractDBTests.kt b/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/AbstractDBTests.kt index b2163836a7..95a9b1ee61 100644 --- a/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/AbstractDBTests.kt +++ b/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/AbstractDBTests.kt @@ -2,7 +2,6 @@ package fr.gouv.cacem.monitorenv.infrastructure.database.repositories import fr.gouv.cacem.monitorenv.MonitorEnvApplication import org.springframework.boot.test.context.SpringBootTest -import org.springframework.kafka.test.context.EmbeddedKafka import org.springframework.test.context.DynamicPropertyRegistry import org.springframework.test.context.DynamicPropertySource import org.testcontainers.containers.GenericContainer @@ -14,7 +13,6 @@ import org.testcontainers.junit.jupiter.Testcontainers import java.time.Duration import java.time.temporal.ChronoUnit -@EmbeddedKafka(topics = ["ais"]) @Testcontainers @SpringBootTest( classes = [MonitorEnvApplication::class], @@ -25,7 +23,7 @@ abstract class AbstractDBTests { @JvmStatic val container: GenericContainer = GenericContainer( - "ghcr.io/mtes-mct/monitorenv/monitorenv-database:pg17-postgis3.5.1", + "ghcr.io/mtes-mct/monitorenv/monitorenv-database:pg17-postgis3.6.1-timescale2.24.0", ).apply { withExposedPorts(5432) withEnv("POSTGRES_DB", "testdb") diff --git a/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/AbstractKafkaTests.kt b/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/AbstractKafkaTests.kt new file mode 100644 index 0000000000..63717ea68b --- /dev/null +++ b/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/database/repositories/AbstractKafkaTests.kt @@ -0,0 +1,55 @@ +package fr.gouv.cacem.monitorenv.infrastructure.database.repositories + +import fr.gouv.cacem.monitorenv.MonitorEnvApplication +import org.junit.jupiter.api.BeforeEach +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.kafka.config.KafkaListenerEndpointRegistry +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import org.testcontainers.containers.wait.strategy.Wait +import org.testcontainers.junit.jupiter.Container +import org.testcontainers.junit.jupiter.Testcontainers +import org.testcontainers.kafka.ConfluentKafkaContainer + +@SpringBootTest( + classes = [MonitorEnvApplication::class], + properties = ["monitorenv.scheduling.enabled=false"], +) +@Testcontainers +abstract class AbstractKafkaTests : AbstractDBTests() { + @Autowired + lateinit var registry: KafkaListenerEndpointRegistry + + @BeforeEach + fun setUp() { + registry.listenerContainers.forEach { container -> + ContainerTestUtils.waitForAssignment( + container, + container.containerProperties.topics?.size ?: 1, + ) + } + } + + companion object { + @JvmStatic + @Container + val kafka = + ConfluentKafkaContainer("confluentinc/cp-kafka:8.1.0").apply { + withExposedPorts(9092) + waitingFor( + Wait.forLogMessage(".*Kafka Server started.*\\s", 2), + ) + start() + } + + @JvmStatic + @DynamicPropertySource + fun props(reg: DynamicPropertyRegistry) { + reg.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers) + reg.add("monitorenv.kafka.ais.enabled", { true }) + reg.add("monitorenv.kafka.ais.producer.enabled", { true }) + } + } +} diff --git a/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListenerITests.kt b/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListenerITests.kt index 2f2f3cad68..ca41f185a5 100644 --- a/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListenerITests.kt +++ b/backend/src/test/kotlin/fr/gouv/cacem/monitorenv/infrastructure/kafka/AISListenerITests.kt @@ -1,34 +1,66 @@ package fr.gouv.cacem.monitorenv.infrastructure.kafka -import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.AbstractDBTests +import fr.gouv.cacem.monitorenv.config.KafkaAISProperties +import fr.gouv.cacem.monitorenv.infrastructure.database.model.AISPositionPK +import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.AbstractKafkaTests +import fr.gouv.cacem.monitorenv.infrastructure.database.repositories.interfaces.IDBAISPositionRepository +import fr.gouv.cacem.monitorenv.infrastructure.kafka.adapters.AISPayload +import jakarta.transaction.Transactional import org.assertj.core.api.Assertions.assertThat import org.awaitility.Awaitility import org.junit.jupiter.api.Test +import org.locationtech.jts.geom.Point +import org.locationtech.jts.io.WKTReader import org.springframework.beans.factory.annotation.Autowired +import org.springframework.data.repository.findByIdOrNull import org.springframework.kafka.core.KafkaTemplate +import java.time.ZonedDateTime import java.util.concurrent.TimeUnit -class AISListenerITests : AbstractDBTests() { +class AISListenerITests : AbstractKafkaTests() { @Autowired - lateinit var kafkaTemplate: KafkaTemplate + lateinit var kafkaTemplate: KafkaTemplate @Autowired - lateinit var aisListener: AISListener + lateinit var dbAISPositionRepository: IDBAISPositionRepository + @Autowired + lateinit var kafkaAISProperties: KafkaAISProperties + + @Transactional @Test - fun `listenAIS should return message that comes from AIS topic`() { + fun `listenAIS should save AISPosition that comes from AIS topic`() { // Given - val message = "Hello world" + val coord = "POINT(-2.7335 47.6078)" + val mmsi = 1234567890 + val ts = ZonedDateTime.parse("2025-01-01T00:00:00.00Z") + val aisPosition = + AISPayload( + mmsi = mmsi, + coord = coord, + status = "status", + course = 12.12, + heading = 10.12, + speed = 10.12, + ts = ts, + ) - // When - kafkaTemplate.send("ais", message) + kafkaTemplate.send(kafkaAISProperties.topic, aisPosition).get(10, TimeUnit.SECONDS) - // Then Awaitility .await() - .atMost(5, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .atMost(kafkaAISProperties.timeout, TimeUnit.SECONDS) .untilAsserted { - assertThat(aisListener.messages).contains(message) + val saved = dbAISPositionRepository.findByIdOrNull(AISPositionPK(mmsi = mmsi, ts = ts)) + assertThat(saved).isNotNull() + assertThat(saved?.id?.mmsi).isEqualTo(aisPosition.mmsi) + assertThat(saved?.id?.ts).isEqualTo(aisPosition.ts) + assertThat(saved?.coord).isEqualTo(WKTReader().read(coord) as Point) + assertThat(saved?.status).isEqualTo(aisPosition.status) + assertThat(saved?.course).isEqualTo(1212) + assertThat(saved?.heading).isEqualTo(1012) + assertThat(saved?.speed).isEqualTo(1012) } } } diff --git a/backend/src/test/resources/application.yml b/backend/src/test/resources/application.yml index 3b7e1eb85f..c183923e4a 100644 --- a/backend/src/test/resources/application.yml +++ b/backend/src/test/resources/application.yml @@ -1,13 +1,14 @@ monitorenv: - kafka: - ais: - enabled: true oidc: enabled: false brief: templatePath: /template_export_brief.docx tmpDocxPath: tmp_brief.docx tmpOdtPath: tmp_brief.odt + kafka: + ais: + batch-size: 1 + timeout: 5000 logging: level: @@ -21,5 +22,7 @@ spring: flyway: locations: classpath:/db/migration,classpath:/db/testdata kafka: + bootstrap-servers: localhost:9092 consumer: group-id: test + auto-offset-reset: latest diff --git a/datascience/tests/conftest.py b/datascience/tests/conftest.py index 3d91004734..c8fbda6af2 100644 --- a/datascience/tests/conftest.py +++ b/datascience/tests/conftest.py @@ -1,24 +1,22 @@ +import docker import itertools import os -import re -from dataclasses import dataclass, field -from pathlib import Path -from time import sleep -from typing import List - -import docker import pytest -from dotenv import dotenv_values -from pytest import MonkeyPatch -from sqlalchemy import text - +import re from config import ( HOST_MIGRATIONS_FOLDER, LOCAL_MIGRATIONS_FOLDER, ROOT_DIRECTORY, TEST_DATA_LOCATION, ) +from dataclasses import dataclass, field +from dotenv import dotenv_values +from pathlib import Path +from pytest import MonkeyPatch +from sqlalchemy import text from src.db_config import create_engine +from time import sleep +from typing import List local_migrations_folders = [ Path(LOCAL_MIGRATIONS_FOLDER) / "internal", @@ -45,6 +43,7 @@ test_data_scripts_folder = TEST_DATA_LOCATION / Path("remote_database") + ################################## Handle migrations ################################## @@ -132,7 +131,7 @@ def start_remote_database_container( client = create_docker_client print("Starting database container") remote_database_container = client.containers.run( - "ghcr.io/mtes-mct/monitorenv/monitorenv-database:pg17-postgis3.5.1", + "ghcr.io/mtes-mct/monitorenv/monitorenv-database:pg17-postgis3.6.1-timescale2.24.0", environment={ "POSTGRES_PASSWORD": os.environ["MONITORENV_REMOTE_DB_PWD"], "POSTGRES_USER": os.environ["MONITORENV_REMOTE_DB_USER"], diff --git a/docker-compose-test.yml b/docker-compose-test.yml index 3b0c89a991..24bf8f2304 100644 --- a/docker-compose-test.yml +++ b/docker-compose-test.yml @@ -1,28 +1,28 @@ services: db: command: - - "-c" + - "-c" - "shared_buffers=4096MB" - - "-c" + - "-c" - "work_mem=5006kB" - - "-c" + - "-c" - "maintenance_work_mem=1955MB" - - "-c" + - "-c" - "effective_io_concurrency=200" - - "-c" + - "-c" - "max_worker_processes=19" - - "-c" + - "-c" - "max_parallel_workers_per_gather=4" - - "-c" + - "-c" - "max_parallel_workers=8" - - "-c" + - "-c" - "wal_buffers=16MB" - - "-c" + - "-c" - "max_wal_size=1GB" - - "-c" - - "min_wal_size=512MB" - "-c" - - "shared_preload_libraries=pg_stat_statements" + - "min_wal_size=512MB" + - "-c" + - "shared_preload_libraries=pg_stat_statements,timescaledb" - "-c" - "pg_stat_statements.max=10000" - "-c" @@ -40,4 +40,4 @@ services: - "-c" - "max_locks_per_transaction=128" - "-c" - - "jit=off" \ No newline at end of file + - "jit=off" diff --git a/docker-compose.yml b/docker-compose.yml index e3bd6cffec..0e9afeb5df 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,8 +35,17 @@ services: - MONITORENV_EXT_ID=${MONITORENV_EXT_ID} - MONITORENV_EXT_PASSWORD=${MONITORENV_EXT_PASSWORD} - MONITORENV_KAFKA_AIS_ENABLED=${MONITORENV_KAFKA_AIS_ENABLED} + - MONITORENV_KAFKA_AIS_PRODUCER_ENABLED=${MONITORENV_KAFKA_AIS_PRODUCER_ENABLED} - MONITORENV_KAFKA_AIS_SERVERS=${MONITORENV_KAFKA_AIS_SERVERS} - MONITORENV_KAFKA_AIS_GROUP_ID=${MONITORENV_KAFKA_AIS_GROUP_ID} + - MONITORENV_KAFKA_AIS_KEYSTORE=${MONITORENV_KAFKA_AIS_KEYSTORE} + - MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD=${MONITORENV_KAFKA_AIS_KEYSTORE_PASSWORD} + - MONITORENV_KAFKA_AIS_KEY_PASSWORD=${MONITORENV_KAFKA_AIS_KEY_PASSWORD} + - MONITORENV_KAFKA_AIS_TRUSTSTORE=${MONITORENV_KAFKA_AIS_TRUSTSTORE} + - MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD=${MONITORENV_KAFKA_AIS_TRUSTSTORE_PASSWORD} + - MONITORENV_KAFKA_AIS_BATCH_SIZE=${MONITORENV_KAFKA_AIS_BATCH_SIZE} + - MONITORENV_KAFKA_AIS_TOPIC=${MONITORENV_KAFKA_AIS_TOPIC} + - MONITORENV_KAFKA_AIS_TIMEOUT=${MONITORENV_KAFKA_AIS_TIMEOUT} - MONITORENV_OIDC_ENABLED=${MONITORENV_OIDC_ENABLED} - MONITORENV_OIDC_LOGIN_URL=${MONITORENV_OIDC_LOGIN_URL} - MONITORENV_OIDC_SUCCESS_URL=${MONITORENV_OIDC_SUCCESS_URL} @@ -196,40 +205,38 @@ services: retries: 30 kafka: + container_name: kafka profiles: [ dev, test ] image: confluentinc/cp-kafka:8.1.0 ports: - - "9092:9092" + - "9093:9093" + volumes: + - ./infra/kafka/certs/kafka/:/etc/kafka/secrets environment: KAFKA_PROCESS_ROLES: broker,controller KAFKA_NODE_ID: 1 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "SSL:SSL,CONTROLLER:SSL" + KAFKA_LISTENERS: SSL://0.0.0.0:9093,CONTROLLER://0.0.0.0:9092 + KAFKA_ADVERTISED_LISTENERS: SSL://localhost:9093 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_NUM_PARTITIONS: 1 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_BROKER_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - command: > - bash -c " - /etc/confluent/docker/run & - - echo 'Attente du broker Kafka...'; - until kafka-topics --bootstrap-server localhost:9092 --list >/dev/null 2>&1; do - sleep 2; - done; - - echo 'Kafka est prêt — début envoi.'; - i=0; - while true; do - echo \"auto message $i\" | kafka-console-producer --bootstrap-server localhost:9092 --topic ais; - echo \"Message envoyé: auto message $i\"; - i=\$((i+1)); - sleep 15; - done - " + KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL + KAFKA_SSL_KEYSTORE_TYPE: PKCS12 + KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.p12 + KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka-truststore.jks + KAFKA_SSL_KEYSTORE_FILENAME: kafka.p12 + KAFKA_SSL_KEYSTORE_PASSWORD: changeit + KAFKA_SSL_TRUSTSTORE_FILENAME: kafka-truststore.jks + KAFKA_SSL_TRUSTSTORE_PASSWORD: changeit + KAFKA_SSL_KEY_CREDENTIALS: kafka.keypass + KAFKA_SSL_KEYSTORE_CREDENTIALS: kafka.keypass + KAFKA_SSL_TRUSTSTORE_CREDENTIALS: kafka.keypass + KAFKA_SSL_CLIENT_AUTH: required networks: backend_network: diff --git a/infra/kafka/certs/monitorenv/.gitkeep b/infra/kafka/certs/monitorenv/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/infra/kafka/scripts/create_certificate.sh b/infra/kafka/scripts/create_certificate.sh new file mode 100755 index 0000000000..d5ffabb158 --- /dev/null +++ b/infra/kafka/scripts/create_certificate.sh @@ -0,0 +1,34 @@ +# 1. Create CA +openssl genrsa -out ../certs/kafka/ca.key 4096 +openssl req -x509 -new -key ../certs/kafka/ca.key -days 3650 -out ../certs/kafka/ca.crt -subj "/CN=MyKafkaCA" + +# 2. Server key + CSR +openssl genrsa -out ../certs/kafka/kafka.key 4096 +openssl req -new -key ../certs/kafka/kafka.key -out ../certs/kafka/kafka.csr -subj "/CN=localhost" + +# 3. Sign server cert with CA +openssl x509 -req -in ../certs/kafka/kafka.csr -CA ../certs/kafka/ca.crt -CAkey ../certs/kafka/ca.key -CAcreateserial \ + -out ../certs/kafka/kafka.crt -days 3650 + +# 4. Client key + CSR +openssl genrsa -out ../certs/monitorenv/monitorenv.key 4096 +openssl req -new -key ../certs/monitorenv/monitorenv.key -out ../certs/monitorenv/monitorenv.csr -subj "/CN=monitorenv" + +openssl x509 -req -in ../certs/monitorenv/monitorenv.csr -CA ../certs/kafka/ca.crt -CAkey ../certs/kafka/ca.key -CAcreateserial \ + -out ../certs/monitorenv/monitorenv.crt -days 3650 + +# 5. PKCS12 keystores +openssl pkcs12 -export -in ../certs/kafka/kafka.crt -inkey ../certs/kafka/kafka.key -out ../certs/kafka/kafka.p12 -name kafka -password pass:changeit +openssl pkcs12 -export -in ../certs/monitorenv/monitorenv.crt -inkey ../certs/monitorenv/monitorenv.key -out ../certs/monitorenv/monitorenv.p12 -name monitorenv -password pass:changeit + +# 6. Truststores +keytool -import -trustcacerts -alias CARoot -file ../certs/kafka/ca.crt -keystore ../certs/kafka/kafka-truststore.jks -storepass changeit -noprompt +keytool -import -trustcacerts -alias CARoot -file ../certs/kafka/ca.crt -keystore ../certs/monitorenv/monitorenv-truststore.jks -storepass changeit -noprompt + +# 7. Copy Client truststore and keystore +cp ../certs/monitorenv/monitorenv-truststore.jks ../certs/kafka/monitorenv-truststore.jks +cp ../certs/monitorenv/monitorenv.p12 ../certs/kafka/monitorenv.p12 + +# 8. Create keypass that contains password +touch ../certs/kafka/kafka.keypass +echo "changeit" > ../certs/kafka/kafka.keypass diff --git a/pipeline/tests/conftest.py b/pipeline/tests/conftest.py index c8ccbbe808..4478c972c3 100644 --- a/pipeline/tests/conftest.py +++ b/pipeline/tests/conftest.py @@ -1,25 +1,23 @@ +import docker import itertools import os -import re -from dataclasses import dataclass, field -from pathlib import Path -from time import sleep -from typing import List - -import docker import pytest -from dotenv import get_key -from prefect.testing.utilities import prefect_test_harness -from pytest import MonkeyPatch -from sqlalchemy import text - +import re from config import ( DOTENV_PATH, HOST_MIGRATIONS_FOLDER, LOCAL_MIGRATIONS_FOLDER, TEST_DATA_LOCATION, ) +from dataclasses import dataclass, field +from dotenv import get_key +from pathlib import Path +from prefect.testing.utilities import prefect_test_harness +from pytest import MonkeyPatch +from sqlalchemy import text from src.db_config import create_engine +from time import sleep +from typing import List local_migrations_folders = [ Path(LOCAL_MIGRATIONS_FOLDER) / "internal", @@ -46,6 +44,7 @@ test_data_scripts_folder = TEST_DATA_LOCATION / Path("remote_database") + ################################## Handle migrations ################################## @@ -144,7 +143,7 @@ def start_remote_database_container( client = create_docker_client print("Starting database container") remote_database_container = client.containers.run( - "ghcr.io/mtes-mct/monitorenv/monitorenv-database:pg17-postgis3.5.1", + "ghcr.io/mtes-mct/monitorenv/monitorenv-database:pg17-postgis3.6.1-timescale2.24.0", environment={ "POSTGRES_PASSWORD": get_key( DOTENV_PATH, "MONITORENV_REMOTE_DB_PWD"