Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ lazy val discoveryConsul = pekkoModule("discovery-consul")
libraryDependencies := Dependencies.discoveryConsul,
mimaPreviousArtifactsSet)

lazy val discoveryEureka = pekkoModule("discovery-eureka")
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
.settings(
name := "pekko-discovery-eureka",
libraryDependencies := Dependencies.discoveryEureka,
mimaPreviousArtifactsSet)

// gathers all enabled routes and serves them (HTTP or otherwise)
lazy val management = pekkoModule("management")
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
Expand Down
32 changes: 32 additions & 0 deletions discovery-eureka/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# SPDX-License-Identifier: Apache-2.0

######################################################
# Apache Pekko Service Discovery Eureka Config #
######################################################

pekko.discovery {

eureka {
class = org.apache.pekko.discovery.eureka.EurekaServiceDiscovery
# default eureka schema
eureka-schema = "http"
# default eureka host
eureka-host = "127.0.0.1"
# default eureka port
eureka-port = 8761
# default eureka-path
eureka-path = "eureka"
# default discovery service group.
group-name = "DEFAULT_GROUP"
# default eureka registration status page url
status-page-url = ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Roiocam most of these configs are unused - can we remove the unused ones?

# default eureka registration health page url
health-page-url =""
# default eureka registration home page url
home-page-url =""
# default eureka registration service port
service-port = 80
# default eureka renew interval millis
renew-interval = 30000
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pekko.discovery.eureka

object EurekaResponse {
case class Application(name: String, instance: Seq[Instance])
case class Instance(hostName: String, app: String, vipAddress: String, secureVipAddress: String, ipAddr: String,
status: String, port: PortWrapper, securePort: PortWrapper, healthCheckUrl: String, statusPageUrl: String,
homePageUrl: String, appGroupName: String, dataCenterInfo: DataCenterInfo, lastDirtyTimestamp: String)
case class Status()
case class PortWrapper(port: Int, enabled: Boolean)
case class DataCenterInfo(name: String = "MyOwn",
clz: String = "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo")
}

import EurekaResponse._

case class EurekaResponse(application: Application, errorCode: Option[String])
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pekko.discovery.eureka

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import org.apache.pekko.discovery.eureka.EurekaServiceDiscovery.{ pick, targets }
import org.apache.pekko.discovery.eureka.JsonFormat._
import org.apache.pekko.discovery.{ Lookup, ServiceDiscovery }
import org.apache.pekko.event.{ LogSource, Logging }
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.headers._
import org.apache.pekko.http.scaladsl.model.{ HttpRequest, MediaRange, MediaTypes, Uri }
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal

import java.net.InetAddress
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

object EurekaServiceDiscovery {
private[eureka] def pick(
instances: Seq[EurekaResponse.Instance], group: String): Future[Seq[EurekaResponse.Instance]] = {
Future.successful(instances.collect {
case instance if instance.status == "UP" && instance.appGroupName == group => instance
})
}

private[eureka] def targets(instances: Seq[EurekaResponse.Instance]): Seq[ResolvedTarget] = {
instances.map { instance =>
ResolvedTarget(
host = instance.ipAddr,
port = Some(instance.port.port),
address = Try(InetAddress.getByName(instance.ipAddr)).toOption)
}
}
}

class EurekaServiceDiscovery(implicit system: ActorSystem) extends ServiceDiscovery {

import system.dispatcher

private val log = Logging(system, getClass)(LogSource.fromClass)
private val settings = EurekaSettings(system)
private val (schema, host, port, path, group) =
(settings.schema, settings.host, settings.port, settings.path, settings.groupName)
private val http = Http()

override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[ServiceDiscovery.Resolved] = {

val uriPath = Uri.Path.Empty / path / "apps" / lookup.serviceName
val uri = Uri.from(scheme = schema, host = host, port = port).withPath(uriPath)
val request = HttpRequest(uri = uri,
headers = Seq(`Accept-Encoding`(HttpEncodings.gzip), Accept(MediaRange(MediaTypes.`application/json`))))

log.info("Requesting seed nodes by: {}", request.uri)

for {
response <- http.singleRequest(request)
entity <- response.entity.toStrict(resolveTimeout)
response <- {
log.debug("Eureka response: [{}]", entity.data.utf8String)
val unmarshalled = Unmarshal(entity).to[EurekaResponse]
unmarshalled.failed.foreach { _ =>
log.error(
"Failed to unmarshal Eureka response status [{}], entity: [{}], uri: [{}]",
response.status.value,
entity.data.utf8String,
uri)
}
unmarshalled
}
instances <- pick(response.application.instance, group)
} yield Resolved(lookup.serviceName, targets(instances))

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pekko.discovery.eureka

import org.apache.pekko
import pekko.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}
import pekko.annotation.ApiMayChange

@ApiMayChange
final class EurekaSettings(system: ExtendedActorSystem) extends Extension {
private val eurekaConfig = system.settings.config.getConfig("pekko.discovery.eureka")

val schema: String = eurekaConfig.getString("eureka-schema")
val host: String = eurekaConfig.getString("eureka-host")
val port: Int = eurekaConfig.getInt("eureka-port")
val path: String = eurekaConfig.getString("eureka-path")
val groupName: String = eurekaConfig.getString("group-name")
val statusPageUrl: String = eurekaConfig.getString("status-page-url")
val healthCheckUrl: String = eurekaConfig.getString("health-page-url")
val homePageUrl: String = eurekaConfig.getString("home-page-url")
val servicePort: Int = eurekaConfig.getInt("service-port")
val serviceName: String = system.name
val renewInterval: Long = eurekaConfig.getLong("renew-interval")
}

@ApiMayChange
object EurekaSettings extends ExtensionId[EurekaSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): EurekaSettings = super.get(system)

override def get(system: ClassicActorSystemProvider): EurekaSettings = super.get(system)

override def lookup: EurekaSettings.type = EurekaSettings

override def createExtension(system: ExtendedActorSystem): EurekaSettings = new EurekaSettings(system)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.discovery.eureka

import org.apache.pekko.discovery.eureka.EurekaResponse.{ Application, DataCenterInfo, Instance, PortWrapper }
import org.apache.pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json._

object JsonFormat extends SprayJsonSupport with DefaultJsonProtocol {
implicit val portFormat: JsonFormat[PortWrapper] = new JsonFormat[PortWrapper] {

override def read(json: JsValue): PortWrapper = {
json.asJsObject.getFields("$", "@enabled") match {
case Seq(JsNumber(port), JsString(enabled)) => PortWrapper(port.toInt, enabled.toBoolean)
case _ => throw DeserializationException("PortWrapper expected")
}
}

override def write(obj: PortWrapper): JsValue = JsObject(
"$" -> JsNumber(obj.port),
"@enabled" -> JsString(obj.enabled.toString))
}
implicit val dataCenterInfoFormat: JsonFormat[DataCenterInfo] = new JsonFormat[DataCenterInfo] {

override def read(json: JsValue): DataCenterInfo = {
json.asJsObject.getFields("name", "@class") match {
case Seq(JsString(name), JsString(clz)) => DataCenterInfo(name, clz)
case _ => throw DeserializationException("DataCenterInfo expected")
}
}

override def write(obj: DataCenterInfo): JsValue = JsObject(
"name" -> JsString(obj.name),
"@class" -> JsString(obj.clz))
}
implicit val instanceFormat: JsonFormat[Instance] = jsonFormat14(Instance.apply)
implicit val applicationFormat: JsonFormat[Application] = jsonFormat2(Application.apply)
implicit val rootFormat: RootJsonFormat[EurekaResponse] = jsonFormat2(EurekaResponse.apply)
}
Loading