Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ lazy val managementClusterBootstrap = pekkoModule("management-cluster-bootstrap"
libraryDependencies := Dependencies.managementClusterBootstrap,
mimaPreviousArtifactsSet)
.dependsOn(management)
.dependsOn(managementPki)

lazy val leaseKubernetes = pekkoModule("lease-kubernetes")
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ pekko.management {

# Max amount of jitter to be added on retries
probe-interval-jitter = 0.2

http-client {
# set this to your HTTPS certificate path if you want to setup a HTTPS trust store
ca-path = ""
# the TLS version to use when connecting to contact points
tls-version = "TLSv1.2"
}
}

join-decider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ final class ClusterBootstrapSettings(config: Config, log: LoggingAdapter) {
object contactPoint {
private val contactPointConfig = bootConfig.getConfig("contact-point")

object httpClient {
private val httpClientConfig = contactPointConfig.getConfig("http-client")

val caPath: String = httpClientConfig.getString("ca-path")
val tlsVersion: String = httpClientConfig.getString("tls-version")
}

val fallbackPort: Int =
contactPointConfig
.optDefinedValue("fallback-port")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
package org.apache.pekko.management.cluster.bootstrap.internal

import java.time.LocalDateTime
import java.security.{ KeyStore, SecureRandom }
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeoutException
import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager }
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.Actor
import pekko.actor.ActorLogging
Expand All @@ -29,7 +32,9 @@ import pekko.actor.Timers
import pekko.annotation.InternalApi
import pekko.cluster.Cluster
import pekko.discovery.ServiceDiscovery.ResolvedTarget
import pekko.http.scaladsl.ConnectionContext
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.HttpsConnectionContext
import pekko.http.scaladsl.model.HttpResponse
import pekko.http.scaladsl.model.StatusCodes
import pekko.http.scaladsl.model.Uri
Expand All @@ -41,6 +46,7 @@ import pekko.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol
import pekko.management.cluster.bootstrap.contactpoint.{ ClusterBootstrapRequests, HttpBootstrapJsonProtocol }
import pekko.pattern.after
import pekko.pattern.pipe
import pekko.pki.kubernetes.PemManagersProvider

@InternalApi
private[bootstrap] object HttpContactPointBootstrap {
Expand All @@ -56,6 +62,26 @@ private[bootstrap] object HttpContactPointBootstrap {

private case object ProbeTick extends DeadLetterSuppression
private val ProbingTimerKey = "probing-key"

def generateSSLContext(settings: ClusterBootstrapSettings): SSLContext = {
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
val keyStore = KeyStore.getInstance("PKCS12")
keyStore.load(null)
factory.init(keyStore, Array.empty)
val km: Array[KeyManager] = factory.getKeyManagers
val caPath = settings.contactPoint.httpClient.caPath.trim
val tm: Array[TrustManager] = if (caPath.isEmpty) {
Array.empty
} else {
val certificates = PemManagersProvider.loadCertificates(caPath)
PemManagersProvider.buildTrustManagers(certificates)
}
val tlsVersion = settings.contactPoint.httpClient.tlsVersion.trim
val random: SecureRandom = new SecureRandom
val sslContext = SSLContext.getInstance(tlsVersion)
sslContext.init(km, tm, random)
sslContext
}
}

/**
Expand Down Expand Up @@ -88,7 +114,12 @@ private[bootstrap] class HttpContactPointBootstrap(
}

private implicit val sys: ActorSystem = context.system

private lazy val clientSslContext: HttpsConnectionContext =
ConnectionContext.httpsClient(HttpContactPointBootstrap.generateSSLContext(settings))

private val http = Http()

private val connectionPoolWithoutRetries = ConnectionPoolSettings(context.system).withMaxRetries(0)
import context.dispatcher

Expand All @@ -111,7 +142,13 @@ private[bootstrap] class HttpContactPointBootstrap(
override def receive = {
case ProbeTick =>
log.debug("Probing [{}] for seed nodes...", probeRequest.uri)
val reply = http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries).flatMap(handleResponse)
val reply = if (probeRequest.uri.scheme == "https") {
http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries,
connectionContext = clientSslContext)
} else {
http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries)
}.flatMap(handleResponse)

val afterTimeout = after(settings.contactPoint.probingFailureTimeout, context.system.scheduler)(replyTimeout)
Future.firstCompletedOf(List(reply, afterTimeout)).pipeTo(self)

Expand Down
18 changes: 18 additions & 0 deletions management-cluster-bootstrap/src/test/files/ca.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwptaW5p
a3ViZUNBMB4XDTE3MTIxMjEzMzY1MVoXDTI3MTIxMDEzMzY1MVowFTETMBEGA1UE
AxMKbWluaWt1YmVDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMrk
QcE8e2L3Rnm8K51y1Y4CHWwx4XwD0SqPwGq9nnygFaBsibIIrex89Im4f73QaqR5
h87ypi0dyqlaTZdleZN7Q4hNSpWF1t/zSGanm7QOSl76FlTAFNm/eVNamfuGRf1x
OYWGWRwdct3Six5K+R/qHh6oJ9XDli9LuV4vxHTDB/mr/2Xgyz1MDrIdRDYpiqev
3HNJqnfXFT3eGWXk4ENZsc+I/R5LbSXA+cSQd9xrkrBhbreHLk99pif7eAKwVKNZ
Rcsp9QBgMOUAoFgk+sU6YeVrasXIF1R4BB7g+LpqpM3F6jqmD79j2mREMIU3kjEQ
eXMqi1W31i9ug1VxwTUCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgKkMB0GA1UdJQQW
MBQGCCsGAQUFBwMCBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3
DQEBCwUAA4IBAQAyRchLY4Jhu1EBFlhYebGLrEO/twZCu2NQyM0by5XUoXApJeqf
S00Q7A67CRcQlbtRAH5vqhpCxutlKc26dF5Y1MmJmkGT7WIjujV0UIF/jJDnmwKK
+DRQl1UgA1e4WS6XOwUaSo9ltgJQ+GJfgkg3Xs3pzjjIpX94eF4V9ArJ8npRVO+w
cCxE01P+Nm9U5H24QnlY+1IxNeszitm34SGiRy6SqoKSfYQoNyQadG9KVybs4FAs
7aeYAB10I7FLFt4+Ji93zZjnWcKXjv59vz7NBDPtCsaXhJ82983GsfV2z+WQ3kRZ
R2XVTsdz8yu0rgmyewxVKH7Roo5Ts+qpZFbi
-----END CERTIFICATE-----
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,23 @@
package org.apache.pekko.management.cluster.bootstrap.internal

import java.util.concurrent.atomic.AtomicReference

import org.apache.pekko
import pekko.actor.{ ActorRef, ActorSystem, Props }
import pekko.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import pekko.discovery.{ Lookup, MockDiscovery }
import pekko.http.scaladsl.model.Uri
import com.typesafe.config.ConfigFactory
import pekko.management.cluster.bootstrap.internal.BootstrapCoordinator.Protocol.InitiateBootstrapping
import pekko.management.cluster.bootstrap.{ ClusterBootstrapSettings, LowestAddressJoinDecider }
import org.scalatest.concurrent.Eventually
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class BootstrapCoordinatorSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eventually {
val serviceName = "bootstrap-coordinator-test-service"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,82 @@

package org.apache.pekko.management.cluster.bootstrap.internal

import java.nio.file.NoSuchFileException

import org.apache.pekko
import pekko.actor.ActorPath
import pekko.actor.{ ActorPath, ActorSystem }
import pekko.event.Logging
import pekko.management.cluster.bootstrap.ClusterBootstrapSettings
import pekko.http.scaladsl.model.Uri.Host
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class HttpContactPointBootstrapSpec extends AnyWordSpec with Matchers {

"HttpContactPointBootstrap" should {
"use a safe name when connecting over IPv6" in {
val name = HttpContactPointBootstrap.name(Host("[fe80::1013:2070:258a:c662]"), 443)
ActorPath.isValidPathElement(name) should be(true)
}
"generate SSLContext with default config" in {
val sys = ActorSystem("HttpContactPointBootstrapSpec")
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
try {
val settings = new ClusterBootstrapSettings(sys.settings.config, log)
HttpContactPointBootstrap.generateSSLContext(settings) should not be null
} finally {
sys.terminate()
}
}
"generate SSLContext with cert" in {
val sys = ActorSystem("HttpContactPointBootstrapSpec")
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
try {
val cfg = ConfigFactory.parseString("""
pekko.management.cluster.bootstrap.contact-point.http-client {
ca-path = "management-cluster-bootstrap/src/test/files/ca.crt"
}""").withFallback(sys.settings.config)
val settings = new ClusterBootstrapSettings(cfg, log)
HttpContactPointBootstrap.generateSSLContext(settings) should not be null
} finally {
sys.terminate()
}
}
"fail to generate SSLContext with missing cert" in {
val sys = ActorSystem("HttpContactPointBootstrapSpec")
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
try {
val cfg = ConfigFactory.parseString("""
pekko.management.cluster.bootstrap.contact-point.http-client {
ca-path = "management-cluster-bootstrap/src/test/files/non-existent.crt"
}""").withFallback(sys.settings.config)
val settings = new ClusterBootstrapSettings(cfg, log)
intercept[NoSuchFileException] {
HttpContactPointBootstrap.generateSSLContext(settings)
}
} finally {
sys.terminate()
}
}
"fail to generate SSLContext with bad tls-version" in {
val sys = ActorSystem("HttpContactPointBootstrapSpec")
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
try {
val cfg = ConfigFactory.parseString("""
pekko.management.cluster.bootstrap.contact-point.http-client {
ca-path = "management-cluster-bootstrap/src/test/files/ca.crt"
tls-version = "BAD_VERSION"
}""").withFallback(sys.settings.config)
val settings = new ClusterBootstrapSettings(cfg, log)
val noSuchAlgorithmException = intercept[java.security.NoSuchAlgorithmException] {
HttpContactPointBootstrap.generateSSLContext(settings)
}
noSuchAlgorithmException.getMessage.contains("BAD_VERSION") should be(true)
} finally {
sys.terminate()
}
}

}
}
Loading