Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions gleam.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ gleam = ">= 1.0.0"
# links = [{ title = "Website", href = "https://gleam.run" }]
#
# For a full reference of all the available options, you can have a look at
# https://gleam.run/writing-gleam/gleam-toml/.
# https://gleam.run/writing-gleam/gleam-toml/.

[dependencies]
gleam_stdlib = "~> 0.34 or ~> 1.0"
nessie = "~> 0.1 or ~> 1.0"
# nessie = "~> 0.1 or ~> 1.0"
nessie = { git = "git@github.com:bchase/nessie.git", ref="upgrade-stdlib-removed-dynamic-from" }

gleam_otp = "~> 0.10 or ~> 1.0"
gleam_erlang = "~> 0.25 or ~> 1.0"

Expand Down
12 changes: 6 additions & 6 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
# You typically do not need to edit this file

packages = [
{ name = "gleam_erlang", version = "0.25.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "054D571A7092D2A9727B3E5D183B7507DAB0DA41556EC9133606F09C15497373" },
{ name = "gleam_otp", version = "0.10.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "0B04FE915ACECE539B317F9652CAADBBC0F000184D586AAAF2D94C100945D72B" },
{ name = "gleam_stdlib", version = "0.36.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "C0D14D807FEC6F8A08A7C9EF8DFDE6AE5C10E40E21325B2B29365965D82EB3D4" },
{ name = "gleeunit", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "72CDC3D3F719478F26C4E2C5FED3E657AC81EC14A47D2D2DEBB8693CA3220C3B" },
{ name = "nessie", version = "0.1.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "nessie", source = "hex", outer_checksum = "FA7DFF0F802CC75666D88A514B521012E83CC3A1C6785ED691563306AD9EB485" },
{ name = "gleam_erlang", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "1124AD3AA21143E5AF0FC5CF3D9529F6DB8CA03E43A55711B60B6B7B3874375C" },
{ name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" },
{ name = "gleam_stdlib", version = "0.65.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "7C69C71D8C493AE11A5184828A77110EB05A7786EBF8B25B36A72F879C3EE107" },
{ name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" },
{ name = "nessie", version = "0.1.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], source = "git", repo = "git@github.com:bchase/nessie.git", commit = "5ed7bf1f011f3c012419305bb5ef29ce74a73635" },
]

[requirements]
gleam_erlang = { version = "~> 0.25 or ~> 1.0" }
gleam_otp = { version = "~> 0.10 or ~> 1.0" }
gleam_stdlib = { version = "~> 0.34 or ~> 1.0" }
gleeunit = { version = "~> 1.0" }
nessie = { version = "~> 0.1 or ~> 1.0" }
nessie = { git = "git@github.com:bchase/nessie.git", ref = "upgrade-stdlib-removed-dynamic-from" }
256 changes: 139 additions & 117 deletions src/nessie_cluster.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

import gleam/erlang/atom.{type Atom}
import gleam/erlang/node.{type ConnectError, type Node}
import gleam/erlang/process.{type Subject, type Timer}
import gleam/function
import gleam/erlang/process.{type Subject, type Timer, type Name}
import gleam/io
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/otp/actor
import gleam/otp/supervision
import gleam/result
import gleam/string
import nessie
Expand Down Expand Up @@ -61,7 +61,7 @@ pub type DnsQuery {
/// use the `with_*` functions to configure it.
pub opaque type DnsCluster {
DnsCluster(
name: Atom,
name: Name(Message),
query: DnsQuery,
interval_millis: Option(Int),
logger: Logger,
Expand Down Expand Up @@ -95,7 +95,7 @@ type DnsClusterState {
/// `with_query`.
pub fn new() -> DnsCluster {
DnsCluster(
name: atom.create_from_string("nessie_cluster"),
name: process.new_name("nessie_cluster"),
query: Ignore,
interval_millis: Some(5000),
logger: default_logger("[nessie_cluster]"),
Expand All @@ -106,7 +106,7 @@ pub fn new() -> DnsCluster {
/// Use a custom name for the process.
///
/// The default `nessie_cluster` name is typically sufficient.
pub fn with_name(for cluster: DnsCluster, using name: Atom) -> DnsCluster {
pub fn with_name(for cluster: DnsCluster, using name: Name(Message)) -> DnsCluster {
DnsCluster(..cluster, name: name)
}

Expand Down Expand Up @@ -172,10 +172,10 @@ pub opaque type Message {
pub fn discover_nodes(
on subject: Subject(Message),
timeout_millis timeout: Option(Int),
) -> Result(#(List(Node), List(NodeConnectError)), process.CallError(_)) {
) -> Result(#(List(Node), List(NodeConnectError)), Nil) {
case timeout {
Some(timeout) ->
process.try_call(
try_call(
subject,
fn(client) { DiscoverNodes(Some(client), True) },
timeout,
Expand All @@ -193,8 +193,20 @@ pub fn discover_nodes(
pub fn stop(
subject: Subject(Message),
timeout: Int,
) -> Result(Nil, process.CallError(_)) {
process.try_call(subject, Stop, timeout)
) -> Result(Nil, Nil) {
try_call(subject, Stop, timeout)
}

fn try_call(
subject: Subject(Message),
message: fn(Subject(a)) -> Message,
timeout: Int,
) -> Result(a, Nil) {
let client = process.new_subject()

process.send(subject, message(client))

process.receive(client, timeout)
}

/// Returns a boolean indicating whether DNS discovery has
Expand All @@ -205,8 +217,15 @@ pub fn stop(
pub fn has_ran(
subject: Subject(Message),
timeout: Int,
) -> Result(Bool, process.CallError(_)) {
process.try_call(subject, HasRan, timeout)
) -> Result(Bool, Nil) {
try_call(subject, HasRan, timeout)
}

/// Calls `start` from within a `ChildSpecification`
pub fn supervised(
cluster: DnsCluster,
) -> supervision.ChildSpecification(Subject(Message)) {
supervision.worker(fn() { start(cluster) })
}

/// Starts an actor which will periodically poll DNS for
Expand All @@ -215,118 +234,121 @@ pub fn has_ran(
/// If the cluster's query is `Ignore`, the actor will start
/// successfully, but will not perform any DNS lookups or
/// attempt to connect to any nodes.
pub fn start_spec(
pub fn start(
cluster: DnsCluster,
parent_subject: Option(Subject(Subject(Message))),
) -> Result(Subject(Message), actor.StartError) {
actor.start_spec(spec(cluster, parent_subject))
) -> Result(actor.Started(Subject(Message)), actor.StartError) {
case init(cluster) {
Error(err) ->
Error(actor.InitFailed(err))

Ok(state) ->
state
|> actor.new
|> actor.named(cluster.name)
|> actor.on_message(handle_message)
|> actor.start
}
}

fn spec(cluster: DnsCluster, parent_subject: Option(Subject(Subject(Message)))) {
actor.Spec(
init_timeout: 10_000,
init: fn() {
let basename_result =
node.self()
|> node.to_atom()
|> cluster.resolver.basename()
case basename_result {
Ok(basename) -> {
let _ = process.register(process.self(), cluster.name)
let state =
DnsClusterState(
cluster: cluster,
basename: basename,
poll_timer: None,
self: process.new_subject(),
has_ran: False,
)
case cluster.query, cluster.interval_millis {
_, None -> Nil
Ignore, _ -> Nil
DnsQuery(_), _ ->
process.send(state.self, DiscoverNodes(None, False))
}
option.map(parent_subject, process.send(_, state.self))
let selector =
process.selecting(
process.new_selector(),
state.self,
function.identity,
)
actor.Ready(state: state, selector: selector)
}
Error(_) -> actor.Failed("Failed to get node basename")
fn init(
cluster: DnsCluster,
) -> Result(DnsClusterState, String) {
let basename_result =
node.self()
|> node.name()
|> cluster.resolver.basename()

case basename_result {
Ok(basename) -> {
let state =
DnsClusterState(
cluster: cluster,
basename: basename,
poll_timer: None,
self: process.new_subject(),
has_ran: False,
)

case cluster.query, cluster.interval_millis {
_, None -> Nil
Ignore, _ -> Nil
DnsQuery(_), _ ->
process.send(state.self, DiscoverNodes(None, False))
}
},
loop: fn(msg: Message, state: DnsClusterState) {
case msg, state.cluster.query {
Stop(client), _ -> {
option.map(state.poll_timer, process.cancel_timer)
let _ = process.unregister(state.cluster.name)
process.send(client, Nil)
state.cluster.logger("warn", "DNS cluster stopped.")
actor.Stop(process.Normal)
}
HasRan(client), _ -> {
process.send(client, state.has_ran)
actor.Continue(state: state, selector: None)
}
DiscoverNodes(maybe_client, manual), DnsQuery(query) -> {
let cluster = state.cluster

let errors =
do_discover_nodes(
cluster.resolver,
cluster.logger,
state.basename,
query,
)

let state = case cluster.interval_millis, maybe_client, manual {
// If there is an available client, send it a response.
_, Some(client), _ -> {
let connected_nodes = cluster.resolver.list_nodes()
actor.send(client, #(connected_nodes, errors))
state
}
// If no client and manual call, skip timer reset
_, _, True -> state
// If no interval is set, skip timer reset
None, _, _ -> state
// Finally we are confident this is not a manual invocation AND we have an interval
Some(interval_millis), _, _ ->
DnsClusterState(
..state,
poll_timer: Some(process.send_after(
state.self,
interval_millis,
DiscoverNodes(None, False),
)),
)
}

let state = DnsClusterState(..state, has_ran: True)
actor.Continue(state: state, selector: None)
}

DiscoverNodes(maybe_client, _), Ignore -> {
state.cluster.logger(
"warn",
"DNS cluster is set to ignore, will not discover or connect to nodes.",
Ok(state)
}

Error(_) -> Error("Failed to get node basename")
}
}

fn handle_message(state: DnsClusterState, msg: Message) -> actor.Next(DnsClusterState, Message) {
case msg, state.cluster.query {
Stop(client), _ -> {
option.map(state.poll_timer, process.cancel_timer)
// let _ = process.unregister(state.cluster.name)
process.send(client, Nil)
state.cluster.logger("warn", "DNS cluster stopped.")
actor.stop()
}
HasRan(client), _ -> {
process.send(client, state.has_ran)
actor.continue(state)
}
DiscoverNodes(maybe_client, manual), DnsQuery(query) -> {
let cluster = state.cluster

let errors =
do_discover_nodes(
cluster.resolver,
cluster.logger,
state.basename,
query,
)

let state = case cluster.interval_millis, maybe_client, manual {
// If there is an available client, send it a response.
_, Some(client), _ -> {
let connected_nodes = cluster.resolver.list_nodes()
actor.send(client, #(connected_nodes, errors))
state
}
// If no client and manual call, skip timer reset
_, _, True -> state
// If no interval is set, skip timer reset
None, _, _ -> state
// Finally we are confident this is not a manual invocation AND we have an interval
Some(interval_millis), _, _ ->
DnsClusterState(
..state,
poll_timer: Some(process.send_after(
state.self,
interval_millis,
DiscoverNodes(None, False),
)),
)
case maybe_client {
Some(client) -> {
let nodes = state.cluster.resolver.list_nodes()
process.send(client, #(nodes, []))
}
None -> Nil
}
actor.Continue(state: state, selector: None)
}

let state = DnsClusterState(..state, has_ran: True)
actor.continue(state)
}

DiscoverNodes(maybe_client, _), Ignore -> {
state.cluster.logger(
"warn",
"DNS cluster is set to ignore, will not discover or connect to nodes.",
)
case maybe_client {
Some(client) -> {
let nodes = state.cluster.resolver.list_nodes()
process.send(client, #(nodes, []))
}
None -> Nil
}
},
)
actor.continue(state)
}
}
}

/// Returns the default resolver which will query for A and AAAA
Expand Down Expand Up @@ -386,15 +408,15 @@ fn do_discover_nodes(
query: String,
) -> List(NodeConnectError) {
let node_names =
list.map(resolver.list_nodes(), fn(n) { atom.to_string(node.to_atom(n)) })
list.map(resolver.list_nodes(), fn(n) { atom.to_string(node.name(n)) })
let peer_ips = resolver.lookup(query)

let #(_, errors) =
peer_ips
|> list.map(fn(ip) { basename <> "@" <> ip })
|> list.filter(fn(node_name) { !list.contains(node_names, node_name) })
|> list.map(fn(node_name) {
let atom_node_name = atom.create_from_string(node_name)
let atom_node_name = atom.create(node_name)

case resolver.connect_node(atom_node_name) {
Ok(_) -> {
Expand Down
Loading