diff --git a/build.sbt b/build.sbt index 351e9ef9..c3e76ef5 100644 --- a/build.sbt +++ b/build.sbt @@ -151,6 +151,7 @@ lazy val managementClusterBootstrap = pekkoModule("management-cluster-bootstrap" libraryDependencies := Dependencies.managementClusterBootstrap, mimaPreviousArtifactsSet) .dependsOn(management) + .dependsOn(managementPki) lazy val leaseKubernetes = pekkoModule("lease-kubernetes") .enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin) diff --git a/management-cluster-bootstrap/src/main/resources/reference.conf b/management-cluster-bootstrap/src/main/resources/reference.conf index 5332ae2e..0f4d7844 100644 --- a/management-cluster-bootstrap/src/main/resources/reference.conf +++ b/management-cluster-bootstrap/src/main/resources/reference.conf @@ -134,6 +134,13 @@ pekko.management { # Max amount of jitter to be added on retries probe-interval-jitter = 0.2 + + http-client { + # set this to your HTTPS certificate path if you want to setup a HTTPS trust store + ca-path = "" + # the TLS version to use when connecting to contact points + tls-version = "TLSv1.2" + } } join-decider { diff --git a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala index ca6d47e5..ba77a437 100644 --- a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala +++ b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala @@ -134,6 +134,13 @@ final class ClusterBootstrapSettings(config: Config, log: LoggingAdapter) { object contactPoint { private val contactPointConfig = bootConfig.getConfig("contact-point") + object httpClient { + private val httpClientConfig = contactPointConfig.getConfig("http-client") + + val caPath: String = httpClientConfig.getString("ca-path") + val tlsVersion: String = httpClientConfig.getString("tls-version") + } + val fallbackPort: Int = contactPointConfig .optDefinedValue("fallback-port") diff --git a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala index cea30b9b..f9a2c62a 100644 --- a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala +++ b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala @@ -14,10 +14,13 @@ package org.apache.pekko.management.cluster.bootstrap.internal import java.time.LocalDateTime +import java.security.{ KeyStore, SecureRandom } import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeoutException +import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager } import scala.concurrent.Future import scala.concurrent.duration._ + import org.apache.pekko import pekko.actor.Actor import pekko.actor.ActorLogging @@ -29,7 +32,9 @@ import pekko.actor.Timers import pekko.annotation.InternalApi import pekko.cluster.Cluster import pekko.discovery.ServiceDiscovery.ResolvedTarget +import pekko.http.scaladsl.ConnectionContext import pekko.http.scaladsl.Http +import pekko.http.scaladsl.HttpsConnectionContext import pekko.http.scaladsl.model.HttpResponse import pekko.http.scaladsl.model.StatusCodes import pekko.http.scaladsl.model.Uri @@ -41,6 +46,7 @@ import pekko.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol import pekko.management.cluster.bootstrap.contactpoint.{ ClusterBootstrapRequests, HttpBootstrapJsonProtocol } import pekko.pattern.after import pekko.pattern.pipe +import pekko.pki.kubernetes.PemManagersProvider @InternalApi private[bootstrap] object HttpContactPointBootstrap { @@ -56,6 +62,26 @@ private[bootstrap] object HttpContactPointBootstrap { private case object ProbeTick extends DeadLetterSuppression private val ProbingTimerKey = "probing-key" + + def generateSSLContext(settings: ClusterBootstrapSettings): SSLContext = { + val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + val keyStore = KeyStore.getInstance("PKCS12") + keyStore.load(null) + factory.init(keyStore, Array.empty) + val km: Array[KeyManager] = factory.getKeyManagers + val caPath = settings.contactPoint.httpClient.caPath.trim + val tm: Array[TrustManager] = if (caPath.isEmpty) { + Array.empty + } else { + val certificates = PemManagersProvider.loadCertificates(caPath) + PemManagersProvider.buildTrustManagers(certificates) + } + val tlsVersion = settings.contactPoint.httpClient.tlsVersion.trim + val random: SecureRandom = new SecureRandom + val sslContext = SSLContext.getInstance(tlsVersion) + sslContext.init(km, tm, random) + sslContext + } } /** @@ -88,7 +114,12 @@ private[bootstrap] class HttpContactPointBootstrap( } private implicit val sys: ActorSystem = context.system + + private lazy val clientSslContext: HttpsConnectionContext = + ConnectionContext.httpsClient(HttpContactPointBootstrap.generateSSLContext(settings)) + private val http = Http() + private val connectionPoolWithoutRetries = ConnectionPoolSettings(context.system).withMaxRetries(0) import context.dispatcher @@ -111,7 +142,13 @@ private[bootstrap] class HttpContactPointBootstrap( override def receive = { case ProbeTick => log.debug("Probing [{}] for seed nodes...", probeRequest.uri) - val reply = http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries).flatMap(handleResponse) + val reply = if (probeRequest.uri.scheme == "https") { + http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries, + connectionContext = clientSslContext) + } else { + http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries) + }.flatMap(handleResponse) + val afterTimeout = after(settings.contactPoint.probingFailureTimeout, context.system.scheduler)(replyTimeout) Future.firstCompletedOf(List(reply, afterTimeout)).pipeTo(self) diff --git a/management-cluster-bootstrap/src/test/files/ca.crt b/management-cluster-bootstrap/src/test/files/ca.crt new file mode 100644 index 00000000..7fc98192 --- /dev/null +++ b/management-cluster-bootstrap/src/test/files/ca.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwptaW5p +a3ViZUNBMB4XDTE3MTIxMjEzMzY1MVoXDTI3MTIxMDEzMzY1MVowFTETMBEGA1UE +AxMKbWluaWt1YmVDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMrk +QcE8e2L3Rnm8K51y1Y4CHWwx4XwD0SqPwGq9nnygFaBsibIIrex89Im4f73QaqR5 +h87ypi0dyqlaTZdleZN7Q4hNSpWF1t/zSGanm7QOSl76FlTAFNm/eVNamfuGRf1x +OYWGWRwdct3Six5K+R/qHh6oJ9XDli9LuV4vxHTDB/mr/2Xgyz1MDrIdRDYpiqev +3HNJqnfXFT3eGWXk4ENZsc+I/R5LbSXA+cSQd9xrkrBhbreHLk99pif7eAKwVKNZ +Rcsp9QBgMOUAoFgk+sU6YeVrasXIF1R4BB7g+LpqpM3F6jqmD79j2mREMIU3kjEQ +eXMqi1W31i9ug1VxwTUCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgKkMB0GA1UdJQQW +MBQGCCsGAQUFBwMCBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3 +DQEBCwUAA4IBAQAyRchLY4Jhu1EBFlhYebGLrEO/twZCu2NQyM0by5XUoXApJeqf +S00Q7A67CRcQlbtRAH5vqhpCxutlKc26dF5Y1MmJmkGT7WIjujV0UIF/jJDnmwKK ++DRQl1UgA1e4WS6XOwUaSo9ltgJQ+GJfgkg3Xs3pzjjIpX94eF4V9ArJ8npRVO+w +cCxE01P+Nm9U5H24QnlY+1IxNeszitm34SGiRy6SqoKSfYQoNyQadG9KVybs4FAs +7aeYAB10I7FLFt4+Ji93zZjnWcKXjv59vz7NBDPtCsaXhJ82983GsfV2z+WQ3kRZ +R2XVTsdz8yu0rgmyewxVKH7Roo5Ts+qpZFbi +-----END CERTIFICATE----- diff --git a/management-cluster-bootstrap/src/test/resources/reference.conf b/management-cluster-bootstrap/src/test/resources/application.conf similarity index 100% rename from management-cluster-bootstrap/src/test/resources/reference.conf rename to management-cluster-bootstrap/src/test/resources/application.conf diff --git a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala index 32c72aae..9995694c 100644 --- a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala +++ b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala @@ -14,22 +14,23 @@ package org.apache.pekko.management.cluster.bootstrap.internal import java.util.concurrent.atomic.AtomicReference + import org.apache.pekko import pekko.actor.{ ActorRef, ActorSystem, Props } import pekko.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } import pekko.discovery.{ Lookup, MockDiscovery } import pekko.http.scaladsl.model.Uri -import com.typesafe.config.ConfigFactory import pekko.management.cluster.bootstrap.internal.BootstrapCoordinator.Protocol.InitiateBootstrapping import pekko.management.cluster.bootstrap.{ ClusterBootstrapSettings, LowestAddressJoinDecider } -import org.scalatest.concurrent.Eventually +import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers import org.scalatest.time.{ Millis, Seconds, Span } +import org.scalatest.wordspec.AnyWordSpec import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec class BootstrapCoordinatorSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eventually { val serviceName = "bootstrap-coordinator-test-service" diff --git a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala index ef80e5be..b3f8892d 100644 --- a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala +++ b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala @@ -13,17 +13,82 @@ package org.apache.pekko.management.cluster.bootstrap.internal +import java.nio.file.NoSuchFileException + import org.apache.pekko -import pekko.actor.ActorPath +import pekko.actor.{ ActorPath, ActorSystem } +import pekko.event.Logging +import pekko.management.cluster.bootstrap.ClusterBootstrapSettings import pekko.http.scaladsl.model.Uri.Host +import com.typesafe.config.ConfigFactory import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec class HttpContactPointBootstrapSpec extends AnyWordSpec with Matchers { + "HttpContactPointBootstrap" should { "use a safe name when connecting over IPv6" in { val name = HttpContactPointBootstrap.name(Host("[fe80::1013:2070:258a:c662]"), 443) ActorPath.isValidPathElement(name) should be(true) } + "generate SSLContext with default config" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val settings = new ClusterBootstrapSettings(sys.settings.config, log) + HttpContactPointBootstrap.generateSSLContext(settings) should not be null + } finally { + sys.terminate() + } + } + "generate SSLContext with cert" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val cfg = ConfigFactory.parseString(""" + pekko.management.cluster.bootstrap.contact-point.http-client { + ca-path = "management-cluster-bootstrap/src/test/files/ca.crt" + }""").withFallback(sys.settings.config) + val settings = new ClusterBootstrapSettings(cfg, log) + HttpContactPointBootstrap.generateSSLContext(settings) should not be null + } finally { + sys.terminate() + } + } + "fail to generate SSLContext with missing cert" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val cfg = ConfigFactory.parseString(""" + pekko.management.cluster.bootstrap.contact-point.http-client { + ca-path = "management-cluster-bootstrap/src/test/files/non-existent.crt" + }""").withFallback(sys.settings.config) + val settings = new ClusterBootstrapSettings(cfg, log) + intercept[NoSuchFileException] { + HttpContactPointBootstrap.generateSSLContext(settings) + } + } finally { + sys.terminate() + } + } + "fail to generate SSLContext with bad tls-version" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val cfg = ConfigFactory.parseString(""" + pekko.management.cluster.bootstrap.contact-point.http-client { + ca-path = "management-cluster-bootstrap/src/test/files/ca.crt" + tls-version = "BAD_VERSION" + }""").withFallback(sys.settings.config) + val settings = new ClusterBootstrapSettings(cfg, log) + val noSuchAlgorithmException = intercept[java.security.NoSuchAlgorithmException] { + HttpContactPointBootstrap.generateSSLContext(settings) + } + noSuchAlgorithmException.getMessage.contains("BAD_VERSION") should be(true) + } finally { + sys.terminate() + } + } + } }