Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ lazy val managementClusterBootstrap = pekkoModule("management-cluster-bootstrap"
libraryDependencies := Dependencies.managementClusterBootstrap,
mimaPreviousArtifactsSet)
.dependsOn(management)
.dependsOn(managementPki)

lazy val leaseKubernetes = pekkoModule("lease-kubernetes")
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ pekko.management {

# Max amount of jitter to be added on retries
probe-interval-jitter = 0.2

http-client {
# set this to your HTTPS certificate path if you want to setup a HTTPS trust store
ca-path = ""
# the TLS version to use when connecting to the API server
tls-version = "TLSv1.2"
}
}

join-decider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ final class ClusterBootstrapSettings(config: Config, log: LoggingAdapter) {
object contactPoint {
private val contactPointConfig = bootConfig.getConfig("contact-point")

object httpClient {
private val httpClientConfig = contactPointConfig.getConfig("http-client")

val caPath: String = httpClientConfig.getString("ca-path")
val tlsVersion: String = httpClientConfig.getString("tls-version")
}

val fallbackPort: Int =
contactPointConfig
.optDefinedValue("fallback-port")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
package org.apache.pekko.management.cluster.bootstrap.internal

import java.time.LocalDateTime
import java.security.{ KeyStore, SecureRandom }
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeoutException
import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager }
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.Actor
import pekko.actor.ActorLogging
Expand All @@ -29,7 +32,9 @@ import pekko.actor.Timers
import pekko.annotation.InternalApi
import pekko.cluster.Cluster
import pekko.discovery.ServiceDiscovery.ResolvedTarget
import pekko.http.scaladsl.ConnectionContext
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.HttpsConnectionContext
import pekko.http.scaladsl.model.HttpResponse
import pekko.http.scaladsl.model.StatusCodes
import pekko.http.scaladsl.model.Uri
Expand All @@ -41,6 +46,7 @@ import pekko.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol
import pekko.management.cluster.bootstrap.contactpoint.{ ClusterBootstrapRequests, HttpBootstrapJsonProtocol }
import pekko.pattern.after
import pekko.pattern.pipe
import pekko.pki.kubernetes.PemManagersProvider

@InternalApi
private[bootstrap] object HttpContactPointBootstrap {
Expand All @@ -56,6 +62,26 @@ private[bootstrap] object HttpContactPointBootstrap {

private case object ProbeTick extends DeadLetterSuppression
private val ProbingTimerKey = "probing-key"

def generateSSLContext(settings: ClusterBootstrapSettings): SSLContext = {
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
val keyStore = KeyStore.getInstance("PKCS12")
keyStore.load(null)
factory.init(keyStore, Array.empty)
val km: Array[KeyManager] = factory.getKeyManagers
val caPath = settings.contactPoint.httpClient.caPath.trim
val tm: Array[TrustManager] = if (caPath.isEmpty) {
Array.empty
} else {
val certificates = PemManagersProvider.loadCertificates(caPath)
PemManagersProvider.buildTrustManagers(certificates)
}
val tlsVersion = settings.contactPoint.httpClient.tlsVersion.trim
val random: SecureRandom = new SecureRandom
val sslContext = SSLContext.getInstance(tlsVersion)
sslContext.init(km, tm, random)
sslContext
}
}

/**
Expand Down Expand Up @@ -88,7 +114,12 @@ private[bootstrap] class HttpContactPointBootstrap(
}

private implicit val sys: ActorSystem = context.system

private lazy val clientSslContext: HttpsConnectionContext =
ConnectionContext.httpsClient(HttpContactPointBootstrap.generateSSLContext(settings))

private val http = Http()

private val connectionPoolWithoutRetries = ConnectionPoolSettings(context.system).withMaxRetries(0)
import context.dispatcher

Expand All @@ -111,7 +142,13 @@ private[bootstrap] class HttpContactPointBootstrap(
override def receive = {
case ProbeTick =>
log.debug("Probing [{}] for seed nodes...", probeRequest.uri)
val reply = http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries).flatMap(handleResponse)
val reply = if (probeRequest.uri.scheme == "https") {
http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries,
connectionContext = clientSslContext)
} else {
http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries)
}.flatMap(handleResponse)

val afterTimeout = after(settings.contactPoint.probingFailureTimeout, context.system.scheduler)(replyTimeout)
Future.firstCompletedOf(List(reply, afterTimeout)).pipeTo(self)

Expand Down
18 changes: 18 additions & 0 deletions management-cluster-bootstrap/src/test/files/ca.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwptaW5p
a3ViZUNBMB4XDTE3MTIxMjEzMzY1MVoXDTI3MTIxMDEzMzY1MVowFTETMBEGA1UE
AxMKbWluaWt1YmVDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMrk
QcE8e2L3Rnm8K51y1Y4CHWwx4XwD0SqPwGq9nnygFaBsibIIrex89Im4f73QaqR5
h87ypi0dyqlaTZdleZN7Q4hNSpWF1t/zSGanm7QOSl76FlTAFNm/eVNamfuGRf1x
OYWGWRwdct3Six5K+R/qHh6oJ9XDli9LuV4vxHTDB/mr/2Xgyz1MDrIdRDYpiqev
3HNJqnfXFT3eGWXk4ENZsc+I/R5LbSXA+cSQd9xrkrBhbreHLk99pif7eAKwVKNZ
Rcsp9QBgMOUAoFgk+sU6YeVrasXIF1R4BB7g+LpqpM3F6jqmD79j2mREMIU3kjEQ
eXMqi1W31i9ug1VxwTUCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgKkMB0GA1UdJQQW
MBQGCCsGAQUFBwMCBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3
DQEBCwUAA4IBAQAyRchLY4Jhu1EBFlhYebGLrEO/twZCu2NQyM0by5XUoXApJeqf
S00Q7A67CRcQlbtRAH5vqhpCxutlKc26dF5Y1MmJmkGT7WIjujV0UIF/jJDnmwKK
+DRQl1UgA1e4WS6XOwUaSo9ltgJQ+GJfgkg3Xs3pzjjIpX94eF4V9ArJ8npRVO+w
cCxE01P+Nm9U5H24QnlY+1IxNeszitm34SGiRy6SqoKSfYQoNyQadG9KVybs4FAs
7aeYAB10I7FLFt4+Ji93zZjnWcKXjv59vz7NBDPtCsaXhJ82983GsfV2z+WQ3kRZ
R2XVTsdz8yu0rgmyewxVKH7Roo5Ts+qpZFbi
-----END CERTIFICATE-----
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,23 @@
package org.apache.pekko.management.cluster.bootstrap.internal

import java.util.concurrent.atomic.AtomicReference

import org.apache.pekko
import pekko.actor.{ ActorRef, ActorSystem, Props }
import pekko.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import pekko.discovery.{ Lookup, MockDiscovery }
import pekko.http.scaladsl.model.Uri
import com.typesafe.config.ConfigFactory
import pekko.management.cluster.bootstrap.internal.BootstrapCoordinator.Protocol.InitiateBootstrapping
import pekko.management.cluster.bootstrap.{ ClusterBootstrapSettings, LowestAddressJoinDecider }
import org.scalatest.concurrent.Eventually
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class BootstrapCoordinatorSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eventually {
val serviceName = "bootstrap-coordinator-test-service"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,82 @@

package org.apache.pekko.management.cluster.bootstrap.internal

import java.nio.file.NoSuchFileException

import org.apache.pekko
import pekko.actor.ActorPath
import pekko.actor.{ ActorPath, ActorSystem }
import pekko.event.Logging
import pekko.management.cluster.bootstrap.ClusterBootstrapSettings
import pekko.http.scaladsl.model.Uri.Host
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class HttpContactPointBootstrapSpec extends AnyWordSpec with Matchers {

"HttpContactPointBootstrap" should {
"use a safe name when connecting over IPv6" in {
val name = HttpContactPointBootstrap.name(Host("[fe80::1013:2070:258a:c662]"), 443)
ActorPath.isValidPathElement(name) should be(true)
}
"generate SSLContext with default config" in {
val sys = ActorSystem("HttpContactPointBootstrapSpec")
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
try {
val settings = new ClusterBootstrapSettings(sys.settings.config, log)
HttpContactPointBootstrap.generateSSLContext(settings) should not be null
} finally {
sys.terminate()
}
}
"generate SSLContext with cert" in {
val sys = ActorSystem("HttpContactPointBootstrapSpec")
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
try {
val cfg = ConfigFactory.parseString("""
pekko.management.cluster.bootstrap.contact-point.http-client {
ca-path = "management-cluster-bootstrap/src/test/files/ca.crt"
}""").withFallback(sys.settings.config)
val settings = new ClusterBootstrapSettings(cfg, log)
HttpContactPointBootstrap.generateSSLContext(settings) should not be null
} finally {
sys.terminate()
}
}
"fail to generate SSLContext with missing cert" in {
val sys = ActorSystem("HttpContactPointBootstrapSpec")
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
try {
val cfg = ConfigFactory.parseString("""
pekko.management.cluster.bootstrap.contact-point.http-client {
ca-path = "management-cluster-bootstrap/src/test/files/non-existent.crt"
}""").withFallback(sys.settings.config)
val settings = new ClusterBootstrapSettings(cfg, log)
intercept[NoSuchFileException] {
HttpContactPointBootstrap.generateSSLContext(settings)
}
} finally {
sys.terminate()
}
}
"fail to generate SSLContext with bad tls-version" in {
val sys = ActorSystem("HttpContactPointBootstrapSpec")
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
try {
val cfg = ConfigFactory.parseString("""
pekko.management.cluster.bootstrap.contact-point.http-client {
ca-path = "management-cluster-bootstrap/src/test/files/ca.crt"
tls-version = "BAD_VERSION"
}""").withFallback(sys.settings.config)
val settings = new ClusterBootstrapSettings(cfg, log)
val nsae = intercept[java.security.NoSuchAlgorithmException] {
HttpContactPointBootstrap.generateSSLContext(settings)
}
nsae.getMessage.contains("BAD_VERSION") should be(true)
} finally {
sys.terminate()
}
}

}
}