-
-
Notifications
You must be signed in to change notification settings - Fork 298
/
Copy pathsample-connect.ts
69 lines (64 loc) · 2.7 KB
/
sample-connect.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import * as PodsRpc from "@effect/cluster-node/PodsRpc"
import type * as ShardingServiceRpc from "@effect/cluster-node/ShardingServiceRpc"
import * as ShardManagerClientRpc from "@effect/cluster-node/ShardManagerClientRpc"
import * as StorageFile from "@effect/cluster-node/StorageFile"
import * as Serialization from "@effect/cluster/Serialization"
import * as Sharding from "@effect/cluster/Sharding"
import * as ShardingConfig from "@effect/cluster/ShardingConfig"
import { HttpClient, HttpClientRequest } from "@effect/platform"
import { NodeHttpClient, NodeRuntime } from "@effect/platform-node"
import { RpcResolver } from "@effect/rpc"
import { HttpRpcResolver } from "@effect/rpc-http"
import { Effect, Layer, Logger, LogLevel, Ref } from "effect"
import { CounterEntity, GetCurrent, Increment } from "./sample-common.js"
/**
 * Endless client loop that exercises `CounterEntity` through the sharding
 * messenger.
 *
 * Each iteration takes the next sequence number from a `Ref`, maps it onto one
 * of ten entity ids (`entity-0` … `entity-9`), fires an `Increment` without
 * waiting for a reply, asks for the current value with `GetCurrent`, logs the
 * answer and then sleeps before the next round.
 */
const counterTraffic = Effect.gen(function*() {
  const counterMessenger = yield* Sharding.messenger(CounterEntity)
  const sequence = yield* Ref.make(0)

  for (;;) {
    // Read the current sequence number and bump it for the next iteration.
    const seq = yield* Ref.getAndUpdate(sequence, (n) => n + 1)
    const target = `entity-${seq % 10}`

    // Fire-and-forget increment; the reply (if any) is discarded.
    const increment = new Increment({ messageId: `increment-${seq}` })
    yield* counterMessenger.sendDiscard(target)(increment)

    // Request/response read of the counter value.
    const query = new GetCurrent({ messageId: `get-count-${seq}` })
    const current = yield* counterMessenger.send(target)(query)

    yield* Effect.logInfo(`Counter ${target} is now: ${current}`)
    // Bare number duration — presumably milliseconds per Effect's duration input; confirm.
    yield* Effect.sleep(200)
  }
})

/**
 * Pods transport layer: for every pod address, builds an HTTP-backed RPC
 * client rooted at `http://<host>:<port>/api/rest`. Non-OK HTTP statuses are
 * filtered out by `HttpClient.filterStatusOk`.
 */
const podsLayer = Layer.unwrapEffect(Effect.gen(function*() {
  const http = yield* HttpClient.HttpClient
  return PodsRpc.podsRpc<never>((podAddress) => {
    const podClient = http.pipe(
      HttpClient.filterStatusOk,
      HttpClient.mapRequest(
        HttpClientRequest.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
      )
    )
    return HttpRpcResolver.make<ShardingServiceRpc.ShardingServiceRpc>(podClient).pipe(
      RpcResolver.toClient
    )
  })
}))

/**
 * Shard-manager client layer: builds an HTTP-backed RPC client whose requests
 * are prefixed with the shard manager URI handed in by
 * `ShardManagerClientRpc.shardManagerClientRpc`.
 */
const shardManagerLayer = Layer.unwrapEffect(Effect.gen(function*() {
  const http = yield* HttpClient.HttpClient
  return ShardManagerClientRpc.shardManagerClientRpc((shardManagerUri) => {
    const managerClient = http.pipe(
      HttpClient.filterStatusOk,
      HttpClient.mapRequest(HttpClientRequest.prependUrl(shardManagerUri))
    )
    return HttpRpcResolver.make<ShardingServiceRpc.ShardingServiceRpc>(managerClient).pipe(
      RpcResolver.toClient
    )
  })
}))

/**
 * Fully wired application layer: runs the counter traffic loop as a
 * discard-only layer and supplies its dependencies in order — sharding,
 * file storage, pods transport, shard-manager client, sharding config
 * (`shardingPort: 54322`), JSON serialization and the Undici HTTP client.
 */
const liveLayer = counterTraffic.pipe(
  Layer.effectDiscard,
  Layer.provide(Sharding.live),
  Layer.provide(StorageFile.storageFile),
  Layer.provide(podsLayer),
  Layer.provide(shardManagerLayer),
  Layer.provide(ShardingConfig.withDefaults({ shardingPort: 54322 })),
  Layer.provide(Serialization.json),
  Layer.provide(NodeHttpClient.layerUndici)
)
// Entry point: launch the layer, enable every log level, log the full cause
// of any failure, and hand the resulting effect to the Node runtime.
NodeRuntime.runMain(
  Layer.launch(liveLayer).pipe(
    Logger.withMinimumLogLevel(LogLevel.All),
    Effect.tapErrorCause(Effect.logError)
  )
)