Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Oct 31, 2024
1 parent 95e936d commit 4712ccd
Showing 1 changed file with 29 additions and 27 deletions.
56 changes: 29 additions & 27 deletions packages/sql-clickhouse/src/ClickhouseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ export interface ClickhouseClient extends Client.SqlClient {
}
readonly withClickhouseSettings: {
(
settings: Clickhouse.BaseQueryParams["clickhouse_settings"]
settings: NonNullable<Clickhouse.BaseQueryParams["clickhouse_settings"]>
): <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
<A, E, R>(
effect: Effect.Effect<A, E, R>,
settings: Clickhouse.BaseQueryParams["clickhouse_settings"]
settings: NonNullable<Clickhouse.BaseQueryParams["clickhouse_settings"]>
): Effect.Effect<A, E, R>
}
}
Expand Down Expand Up @@ -243,34 +243,36 @@ export const make = (
readonly values: Clickhouse.InsertValues<Readable, T>
readonly format?: Clickhouse.DataFormat
}) {
return FiberRef.getWith(currentClickhouseSettings, (settings_) =>
FiberRef.getWith(currentQueryId, (queryId_) =>
Effect.async<Clickhouse.InsertResult, SqlError>((resume) => {
const queryId = queryId_ ?? Crypto.randomUUID()
const settings = settings_ ?? {}
const controller = new AbortController()
client.insert({
format: "JSONEachRow",
...options,
abort_signal: controller.signal,
query_id: queryId,
clickhouse_settings: settings
}).then(
(result) =>
resume(Effect.succeed(result)),
(cause) => resume(Effect.fail(new SqlError({ cause, message: "Failed to insert data" })))
)
return Effect.suspend(() => {
controller.abort()
return Effect.promise(() => client.command({ query: `KILL QUERY WHERE query_id = '${queryId}'` }))
})
})))
return Effect.withFiberRuntime<Clickhouse.InsertResult, SqlError>((fiber) =>
Effect.async((resume) => {
const queryId = fiber.getFiberRef(currentQueryId) ?? Crypto.randomUUID()
const settings = fiber.getFiberRef(currentClickhouseSettings)
const controller = new AbortController()
client.insert({
format: "JSONEachRow",
...options,
abort_signal: controller.signal,
query_id: queryId,
clickhouse_settings: settings
}).then(
(result) => resume(Effect.succeed(result)),
(cause) => resume(Effect.fail(new SqlError({ cause, message: "Failed to insert data" })))
)
return Effect.suspend(() => {
controller.abort()
return Effect.promise(() => client.command({ query: `KILL QUERY WHERE query_id = '${queryId}'` }))
})
})
)
},
withQueryId: dual(2, <A, E, R>(effect: Effect.Effect<A, E, R>, queryId: string) =>
Effect.locally(effect, currentQueryId, queryId)),
withClickhouseSettings: dual(
2,
<A, E, R>(effect: Effect.Effect<A, E, R>, settings: Clickhouse.BaseQueryParams["clickhouse_settings"]) =>
<A, E, R>(
effect: Effect.Effect<A, E, R>,
settings: NonNullable<Clickhouse.BaseQueryParams["clickhouse_settings"]>
) =>
Effect.locally(effect, currentClickhouseSettings, settings)
)
}
Expand Down Expand Up @@ -300,10 +302,10 @@ export const currentQueryId: FiberRef.FiberRef<string | undefined> = globalValue
* @since 1.0.0
*/
export const currentClickhouseSettings: FiberRef.FiberRef<
Clickhouse.BaseQueryParams["clickhouse_settings"] | undefined
NonNullable<Clickhouse.BaseQueryParams["clickhouse_settings"]>
> = globalValue(
"@effect/sql-clickhouse/ClickhouseClient/currentClickhouseSettings",
() => FiberRef.unsafeMake<Clickhouse.BaseQueryParams["clickhouse_settings"] | undefined>(undefined)
() => FiberRef.unsafeMake({})
)

/**
Expand Down

0 comments on commit 4712ccd

Please sign in to comment.