Signed-off-by: teodordelibasic-db <[email protected]>
0695a36 to 74b5211
    pub(crate) workspace_id: String,
    pub(crate) tls_config: Arc<dyn TlsConfig>,
nit: I've seen Claude put this pub(crate) in my own code before. I don't really know what its purpose is, but I don't think it's necessary.
It's necessary. pub(crate) means "visible within this crate but not to external consumers." These fields were previously private (only accessible within lib.rs), but StreamBuilder lives in a different module (builder/stream_builder.rs) and needs to read self.sdk.workspace_id, self.sdk.tls_config, etc. to construct OAuthHeadersProvider and ZerobusArrowStream. Without pub(crate), the builder can't access them. The same applies to get_or_create_channel_zerobus_client() and ZerobusStream::new_stream().
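To make the visibility rule concrete, here is a minimal sketch. The `sdk` and `builder` modules, the `Sdk` struct, and the `secret` field are hypothetical stand-ins for `lib.rs`, `builder/stream_builder.rs`, and `ZerobusSdk`; they are not the SDK's actual code.

```rust
// Hypothetical miniature of the layout discussed above: `Sdk` stands in for
// ZerobusSdk and `builder` for builder/stream_builder.rs.
mod sdk {
    pub struct Sdk {
        // Visible anywhere in this crate, hidden from downstream crates.
        pub(crate) workspace_id: String,
        // Private: readable only inside `mod sdk`.
        secret: String,
    }

    impl Sdk {
        pub fn new(workspace_id: &str, secret: &str) -> Self {
            Sdk {
                workspace_id: workspace_id.to_string(),
                secret: secret.to_string(),
            }
        }

        // Private fields stay reachable through methods defined in this module.
        pub fn secret_len(&self) -> usize {
            self.secret.len()
        }
    }
}

mod builder {
    use super::sdk::Sdk;

    // A sibling module can read the `pub(crate)` field directly.
    // Reading `sdk.secret` here would be rejected as a private-field access.
    pub fn describe(sdk: &Sdk) -> String {
        format!("workspace={}", sdk.workspace_id)
    }
}
```

Dropping `pub(crate)` from `workspace_id` would make `builder::describe` fail to compile, which is the situation the reply describes for `StreamBuilder`.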
    pub fn stream_builder(
        &self,
        table_name: impl Into<String>,
    ) -> StreamBuilder<'_, NoFormat, NoAuth> {
        StreamBuilder::new(self, table_name)
    }
Is there any specific reason why the table name is a method parameter and not another builder method .table() ?
Eh, not really, except that I don't expect a stream to ever not need a table name. That said, it would probably be better to have it as a builder method as well.
Yeah, it's a central part of ingestion. Still, I'd say it's worth verifying the contract with Vicky.
danilonajkov-db
left a comment
Should we mark the old methods as deprecated?
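For reference, Rust's built-in `#[deprecated]` attribute is the usual way to do this. The sketch below uses a hypothetical, heavily simplified `Sdk` type whose methods return a `String` instead of a stream; only the attribute usage is the point. The `since = "1.1.0"` value matches what the PR description states, while the `note` text is an assumption.

```rust
pub struct Sdk;

impl Sdk {
    /// New entry point (hypothetical simplified signature).
    pub fn stream_builder(&self, table_name: impl Into<String>) -> String {
        table_name.into()
    }

    /// Old entry point, kept for compatibility but flagged by the compiler.
    #[deprecated(since = "1.1.0", note = "use `stream_builder()` instead")]
    pub fn create_stream(&self, table_name: &str) -> String {
        self.stream_builder(table_name)
    }
}

/// Callers that still need the old path can silence the lint locally.
#[allow(deprecated)]
pub fn legacy_caller(sdk: &Sdk) -> String {
    sdk.create_stream("catalog.schema.table")
}
```

Without the `#[allow(deprecated)]`, every call to `create_stream` produces a compiler warning carrying the note, which gives users a migration hint without breaking their build.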
    }

    /// Select compiled protobuf record format.
    pub fn compiled_proto(
maybe just .proto() (just personal preference, sounds more straightforward to me)
    @@ -0,0 +1,665 @@
    //! Typestate builder for creating Zerobus ingestion streams.
Some validations from an LLM that would be good to add:
- No runtime assertion that descriptor_proto.is_some() when building CompiledProto. If a future change lets descriptor_proto be cleared independently of the typestate, this silently constructs a broken stream. Today it's safe because the only writers to descriptor_proto are .json() (sets None) and .compiled_proto(desc) (sets Some(desc)), and each pins the typestate accordingly, but that invariant is a code convention, not a compiler guarantee. Compare to the Arrow path, which does .expect("Arrow format guarantees schema is set") (stream_builder.rs:919) for the analogous invariant.
- No check that self.auth.is_some() before resolve_headers_provider; it relies on the HasAuth typestate. The .expect("HasAuth guarantees auth is set") at stream_builder.rs:852 is the runtime fallback if the type system is ever bypassed.
It might also be worth adding an is_valid_config method inside ZerobusStream::new_stream() that checks all invariants, so that a customer can't end up with an inconsistent configuration while the migration from the old stream creation API to the new one is in progress.
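A rough sketch of what such a check could look like follows. Everything here is hypothetical: `ResolvedConfig`, `ConfigError`, and the field layout are simplified stand-ins rather than the SDK's types, and treating a JSON stream that carries a descriptor as an error is an assumption, not something the PR specifies.

```rust
/// Hypothetical, simplified stand-ins for the builder's resolved state.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RecordType {
    Json,
    Proto,
}

#[derive(Debug, PartialEq)]
pub enum ConfigError {
    MissingAuth,
    MissingDescriptor,
    UnexpectedDescriptor,
}

pub struct ResolvedConfig {
    pub record_type: RecordType,
    pub descriptor_proto: Option<Vec<u8>>,
    pub has_auth: bool,
}

/// Checks at runtime the invariants the typestate is meant to guarantee.
pub fn is_valid_config(cfg: &ResolvedConfig) -> Result<(), ConfigError> {
    if !cfg.has_auth {
        return Err(ConfigError::MissingAuth);
    }
    match (cfg.record_type, cfg.descriptor_proto.is_some()) {
        // Proto streams need a descriptor to interpret payloads.
        (RecordType::Proto, false) => Err(ConfigError::MissingDescriptor),
        // Assumption: a descriptor on a JSON stream signals a mixed-up config.
        (RecordType::Json, true) => Err(ConfigError::UnexpectedDescriptor),
        _ => Ok(()),
    }
}
```

Returning a `Result` instead of calling `.expect(...)` would let both the builder and the deprecated constructors surface the same error to callers rather than panicking.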
What changes are proposed in this pull request?
This PR adds a typestate `StreamBuilder` API to the Rust SDK that replaces the existing `create_stream*` family of methods with a fluent, compile-time-safe builder.

The builder enforces a strict configuration order: auth -> format -> config -> build. Invalid combinations (building without auth, building without format, calling gRPC-only setters on an Arrow stream) are compile errors, not runtime errors.
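The typestate idea can be sketched in a few dozen lines. This is a simplified illustration, not the PR's code: the marker names `NoAuth`, `HasAuth`, `NoFormat`, `Json`, and the `StreamFormat` trait come from the description, but the `oauth` method name, the fields, the `NAME` constant, and a synchronous `build()` that returns a `String` are all assumptions made to keep the example self-contained.

```rust
use std::marker::PhantomData;

// Marker types: zero-sized, they exist only at the type level.
pub struct NoAuth;
pub struct HasAuth;
pub struct NoFormat;
pub struct Json;

mod sealed {
    // Not nameable outside this crate, so downstream code cannot implement
    // `StreamFormat` for its own types.
    pub trait Sealed {}
    impl Sealed for super::Json {}
}

pub trait StreamFormat: sealed::Sealed {
    const NAME: &'static str; // hypothetical, used only for the demo output
}
impl StreamFormat for Json {
    const NAME: &'static str = "json";
}

#[must_use = "a builder does nothing until `build()` is called"]
pub struct StreamBuilder<F, A> {
    table_name: String,
    client_id: Option<String>,
    _state: PhantomData<(F, A)>,
}

impl StreamBuilder<NoFormat, NoAuth> {
    pub fn new(table_name: impl Into<String>) -> Self {
        StreamBuilder { table_name: table_name.into(), client_id: None, _state: PhantomData }
    }

    // Auth comes first: this method exists only in the `NoAuth` state.
    pub fn oauth(self, client_id: &str) -> StreamBuilder<NoFormat, HasAuth> {
        StreamBuilder {
            table_name: self.table_name,
            client_id: Some(client_id.to_string()),
            _state: PhantomData,
        }
    }
}

impl StreamBuilder<NoFormat, HasAuth> {
    // A format can be chosen only once auth is set.
    pub fn json(self) -> StreamBuilder<Json, HasAuth> {
        StreamBuilder { table_name: self.table_name, client_id: self.client_id, _state: PhantomData }
    }
}

impl<F: StreamFormat> StreamBuilder<F, HasAuth> {
    // `build` exists only when both a real format and auth are present.
    pub fn build(self) -> String {
        let client = self.client_id.expect("HasAuth guarantees auth is set");
        format!("{}:{}:{}", self.table_name, F::NAME, client)
    }
}
```

With these definitions, `StreamBuilder::new("t").oauth("id").json().build()` compiles, while `StreamBuilder::new("t").json()` or `StreamBuilder::new("t").oauth("id").build()` are rejected because the method does not exist for that state.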
Motivation:
- The `create_stream(TableProperties, client_id, client_secret, Option<StreamConfigurationOptions>)` API is a wide parameter list where format is just a field inside `StreamConfigurationOptions`. Adding or removing fields is a breaking change.

Changes:
New files:
- `rust/sdk/src/builder/stream_builder.rs`: the `StreamBuilder<F, A>` typestate builder with format/auth marker generics, all setter methods, three format-specific `build()` impls, and 12 unit tests
- `rust/sdk/src/builder/stream_format.rs`: marker types (`NoFormat`, `Json`, `CompiledProto`, `Arrow`, `NoAuth`, `HasAuth`), sealed traits (`StreamFormat`, `GrpcFormat`), and trait impls

Modified:
- `rust/sdk/src/lib.rs`: added `stream_builder()` entry point on `ZerobusSdk`, deprecated all four `create_stream*`/`create_arrow_stream*` methods with `since = "1.1.0"`, changed `workspace_id`/`tls_config`/`get_or_create_channel_zerobus_client`/`ZerobusStream::new_stream` to `pub(crate)` so the builder can access SDK internals, refactored `recreate_stream` and `recreate_arrow_stream` to call internal constructors directly instead of routing through deprecated methods, fixed crate-level quick-start doc example
- `rust/sdk/src/builder/mod.rs`: added module declarations and re-exports for stream builder and format types
- `rust/sdk/src/headers_provider.rs`: added `NoOpHeadersProvider` for local dev / testing / sidecar proxy scenarios (used by `stream_builder(...).no_auth()`)
- `rust/examples/json/single/src/main.rs`: migrated to builder API
- `rust/examples/json/batch/src/main.rs`: migrated to builder API
- `rust/examples/proto/single/src/main.rs`: migrated to builder API
- `rust/examples/proto/batch/src/main.rs`: migrated to builder API
- `rust/CHANGELOG.md`: added v1.1.0 release notes with new features, deprecations, and API changes
- `rust/README.md`: rewrote stream creation section, custom auth example, callbacks example, recovery example, API reference, and architecture diagram to use the new builder API

Design decisions:
- `build()` forces `record_type` to match the typestate regardless of what `.options()` set, as a safety belt.
- `.options(StreamConfigurationOptions)` is kept as a migration bridge for users transitioning from the old API. It replaces all prior setter values and is documented as such.
- The sealed `StreamFormat`/`GrpcFormat` traits prevent external implementations, preserving forward compatibility.
- `#[must_use]` on `StreamBuilder` warns if a builder is created but never built.

How is this tested?
12 unit tests in `stream_builder.rs`:
- `json_oauth_builder_compiles`: verifies JSON + OAuth builder chain compiles
- `compiled_proto_headers_provider_compiles`: verifies Proto + custom headers provider chain compiles
- `config_setters_chain`: verifies all shared and gRPC-specific setters chain correctly
- `options_replaces_config`: verifies `.options()` replaces prior setter values
- `setter_after_options_mutates_replacement`: verifies setters after `.options()` modify the replacement config
- `default_config_without_setters`: verifies sensible defaults when no setters are called
- `options_cannot_override_record_type`: verifies `.options()` with wrong `record_type` is overridden at build time
- `no_auth_shortcut_compiles`: verifies `no_auth()` convenience compiles
- `debug_impl_works`: verifies Debug output includes builder state
- `noop_headers_provider_returns_empty`: verifies `NoOpHeadersProvider` returns empty headers
- `resolve_headers_provider_with_custom_provider`: verifies custom provider resolution returns expected headers
- `resolve_headers_provider_with_oauth`: verifies OAuth provider resolution constructs without panic

Additionally:
- `cargo test -p databricks-zerobus-ingest-sdk` passes (81 tests, including all pre-existing tests)
- `cargo check`: invalid builder chains (calling `.json()` before auth, calling `.build()` on `NoFormat`) fail to compile with missing-method errors

Follow-up items
These are scoped out of this PR and tracked for subsequent work.
1. Introduce a shared internal stream-open abstraction (e.g., `StreamOpenRequest` or `ResolvedStreamConfig`) that all stream creation paths target: the builder, the deprecated methods, the recreate methods, and the FFI/JNI/PyO3/NAPI wrapper bridges. This is the critical architectural step before removing deprecated methods in 2.0, because without it wrapper SDKs will need a second migration.
2. Add integration tests for builder-based stream creation in `rust/tests/src` against the mock gRPC server. The current integration suite exercises stream lifecycle through the old constructors but does not yet cover the `stream_builder()` path.
3. Add compile-fail tests (via `trybuild`) to verify that invalid typestate combinations produce compiler errors. Currently correctness is enforced by the type system but not explicitly tested.
4. Typed stream returns: `build()` currently returns `ZerobusStream` for both JSON and Proto, so sending the wrong payload format is still a runtime error. A future version could return `ZerobusJsonStream`/`ZerobusProtoStream` (similar to what Java already has) to make format mismatches a compile error. Candidate for 2.0.
5. Extract shared config fields (`recovery`, `recovery_timeout_ms`, `recovery_backoff_ms`, `recovery_retries`, `server_lack_of_ack_timeout_ms`, `flush_timeout_ms`) into a `SharedStreamConfig` struct that both `StreamConfigurationOptions` and `ArrowStreamConfigurationOptions` compose. Currently each shared setter dual-writes to both config structs.
6. Add idiomatic stream builder APIs to wrapper SDKs (Python, Java, Go, TypeScript), each targeting the shared internal abstraction from item 1. These should not attempt to mirror the Rust typestate; each language should use its own idiomatic pattern (kwargs in Python, traditional builder in Java, functional options in Go, options object in TypeScript).
7. Remove the `.options()` escape hatch and deprecated `create_stream*` methods in 2.0, after wrapper SDKs are migrated.
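As an illustration of the `SharedStreamConfig` extraction mentioned in the follow-up items, here is one possible shape. The six field names come from the list above; their types, the `GrpcStreamConfig`/`ArrowStreamConfig` stand-ins, the format-specific fields, and the `HasSharedConfig` trait are all hypothetical.

```rust
/// Fields common to both stream kinds (names from the PR, types assumed).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct SharedStreamConfig {
    pub recovery: bool,
    pub recovery_timeout_ms: u64,
    pub recovery_backoff_ms: u64,
    pub recovery_retries: u32,
    pub server_lack_of_ack_timeout_ms: u64,
    pub flush_timeout_ms: u64,
}

/// Hypothetical stand-in for StreamConfigurationOptions.
#[derive(Debug, Clone, Default)]
pub struct GrpcStreamConfig {
    pub shared: SharedStreamConfig,
    pub grpc_only_setting: usize, // placeholder for gRPC-specific options
}

/// Hypothetical stand-in for ArrowStreamConfigurationOptions.
#[derive(Debug, Clone, Default)]
pub struct ArrowStreamConfig {
    pub shared: SharedStreamConfig,
    pub arrow_only_setting: usize, // placeholder for Arrow-specific options
}

/// Gives generic code one place to reach the shared fields.
pub trait HasSharedConfig {
    fn shared_mut(&mut self) -> &mut SharedStreamConfig;
}

impl HasSharedConfig for GrpcStreamConfig {
    fn shared_mut(&mut self) -> &mut SharedStreamConfig {
        &mut self.shared
    }
}

impl HasSharedConfig for ArrowStreamConfig {
    fn shared_mut(&mut self) -> &mut SharedStreamConfig {
        &mut self.shared
    }
}

/// A shared setter now writes once instead of dual-writing to two structs.
pub fn set_recovery_retries<C: HasSharedConfig>(cfg: &mut C, retries: u32) {
    cfg.shared_mut().recovery_retries = retries;
}
```

Whether composition by field (as here) or some other arrangement fits the real config structs depends on their public API, which this sketch does not try to preserve.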