diff --git a/apis/pubsub/v1alpha1/subscription_types.go b/apis/pubsub/v1alpha1/subscription_types.go index 9172a9081..77e994c8a 100644 --- a/apis/pubsub/v1alpha1/subscription_types.go +++ b/apis/pubsub/v1alpha1/subscription_types.go @@ -87,6 +87,10 @@ type SubscriptionParameters struct { // +optional PushConfig *PushConfig `json:"pushConfig,omitempty"` + // BigqueryConfig is a parameter which configures bigquery delivery. + // +optional + BigqueryConfig *BigqueryConfig `json:"bigqueryConfig,omitempty"` + // RetainAckedMessages is a message which indicates whether to retain acknowledged // messages. If true, then messages are not expunged from the // subscription's backlog, even if they are acknowledged, until they @@ -143,6 +147,29 @@ type PushConfig struct { PushEndpoint string `json:"pushEndpoint,omitempty"` } +// BigqueryConfig contains configuration for a bigquery delivery endpoint. +type BigqueryConfig struct { + // Bigquery table to deliver messages to. + Table string `json:"table,omitempty"` + + // When enabled, the topic schema will be used when writing to BigQuery. Else, + // tes the message bytes to a column called data in BigQuery. + // +optional + UseTopicSchema *bool `json:"useTopicSchema,omitempty"` + + // When enabled, the metadata of each message is written to additional columns in + // the BigQuery table. Else, the metadata is not written to the BigQuery table. + // https://cloud.google.com/pubsub/docs/bigquery?hl=ru#write-metadata + // +optional + WriteMetadata *bool `json:"writeMetadata,omitempty"` + + // When enabled along with the "Use topic schema" option, any field that is present in + // the topic schema but not in the BigQuery schema will be dropped. Else, messages with extra fields are not written + // and remain in the subscription backlog. + // +optional + DropUnknownFields *bool `json:"dropUnknownFields,omitempty"` +} + // OidcToken contains information needed for generating an OpenID Connect token type OidcToken struct { // Audience is the "audience" to be used when generating OIDC token. diff --git a/apis/pubsub/v1alpha1/zz_generated.deepcopy.go b/apis/pubsub/v1alpha1/zz_generated.deepcopy.go index 142f26bd1..e9983ea74 100644 --- a/apis/pubsub/v1alpha1/zz_generated.deepcopy.go +++ b/apis/pubsub/v1alpha1/zz_generated.deepcopy.go @@ -26,6 +26,36 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BigqueryConfig) DeepCopyInto(out *BigqueryConfig) { + *out = *in + if in.UseTopicSchema != nil { + in, out := &in.UseTopicSchema, &out.UseTopicSchema + *out = new(bool) + **out = **in + } + if in.WriteMetadata != nil { + in, out := &in.WriteMetadata, &out.WriteMetadata + *out = new(bool) + **out = **in + } + if in.DropUnknownFields != nil { + in, out := &in.DropUnknownFields, &out.DropUnknownFields + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BigqueryConfig. +func (in *BigqueryConfig) DeepCopy() *BigqueryConfig { + if in == nil { + return nil + } + out := new(BigqueryConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeadLetterPolicy) DeepCopyInto(out *DeadLetterPolicy) { *out = *in @@ -217,6 +247,11 @@ func (in *SubscriptionParameters) DeepCopyInto(out *SubscriptionParameters) { *out = new(PushConfig) (*in).DeepCopyInto(*out) } + if in.BigqueryConfig != nil { + in, out := &in.BigqueryConfig, &out.BigqueryConfig + *out = new(BigqueryConfig) + (*in).DeepCopyInto(*out) + } if in.RetryPolicy != nil { in, out := &in.RetryPolicy, &out.RetryPolicy *out = new(RetryPolicy) diff --git a/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml b/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml index e1f028a61..2be89b8bc 100644 --- a/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml +++ b/package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml @@ -65,6 +65,31 @@ spec: value of 10 seconds is used. format: int64 type: integer + bigqueryConfig: + description: BigqueryConfig is a parameter which configures bigquery + delivery. + properties: + dropUnknownFields: + description: When enabled along with the "Use topic schema" + option, any field that is present in the topic schema but + not in the BigQuery schema will be dropped. Else, messages + with extra fields are not written and remain in the subscription + backlog. + type: boolean + table: + description: Bigquery table to deliver messages to. + type: string + useTopicSchema: + description: When enabled, the topic schema will be used when + writing to BigQuery. Else, tes the message bytes to a column + called data in BigQuery. + type: boolean + writeMetadata: + description: When enabled, the metadata of each message is + written to additional columns in the BigQuery table. Else, + the metadata is not written to the BigQuery table. https://cloud.google.com/pubsub/docs/bigquery?hl=ru#write-metadata + type: boolean + type: object deadLetterPolicy: description: DeadLetterPolicy is the policy that specifies the conditions for dead lettering messages in this subscription. diff --git a/pkg/clients/subscription/subscription.go b/pkg/clients/subscription/subscription.go index 8a546e043..aeca00757 100644 --- a/pkg/clients/subscription/subscription.go +++ b/pkg/clients/subscription/subscription.go @@ -21,6 +21,7 @@ import ( "strings" "github.com/crossplane-contrib/provider-gcp/apis/pubsub/v1alpha1" + gcp "github.com/crossplane-contrib/provider-gcp/pkg/clients" "github.com/crossplane-contrib/provider-gcp/pkg/clients/topic" "github.com/google/go-cmp/cmp" @@ -53,6 +54,7 @@ func GenerateSubscription(projectID, name string, p v1alpha1.SubscriptionParamet setDeadLetterPolicy(projectID, p, s) setExpirationPolicy(p, s) setPushConfig(p, s) + setBigqueryConfig(p, s) setRetryPolicy(p, s) return s @@ -85,6 +87,18 @@ func setPushConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) { } } +// setBigqueryConfig sets BigqueryConfig of subscription based on SubscriptionParameters. +func setBigqueryConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) { + if p.BigqueryConfig != nil { + s.BigqueryConfig = &pubsub.BigQueryConfig{ + Table: p.BigqueryConfig.Table, + UseTopicSchema: gcp.BoolValue(p.BigqueryConfig.UseTopicSchema), + WriteMetadata: gcp.BoolValue(p.BigqueryConfig.WriteMetadata), + DropUnknownFields: gcp.BoolValue(p.BigqueryConfig.DropUnknownFields), + } + } +} + // setExpirationPolicy sets ExpirationPolicy of subscription based on SubscriptionParameters. func setExpirationPolicy(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) { if p.ExpirationPolicy != nil { @@ -169,6 +183,15 @@ func LateInitialize(p *v1alpha1.SubscriptionParameters, s pubsub.Subscription) { } } + if p.BigqueryConfig == nil && s.BigqueryConfig != nil { + p.BigqueryConfig = &v1alpha1.BigqueryConfig{ + Table: s.BigqueryConfig.Table, + DropUnknownFields: gcp.BoolPtr(s.BigqueryConfig.DropUnknownFields), + UseTopicSchema: gcp.BoolPtr(s.BigqueryConfig.UseTopicSchema), + WriteMetadata: gcp.BoolPtr(s.BigqueryConfig.WriteMetadata), + } + } + if p.RetryPolicy == nil && s.RetryPolicy != nil { p.RetryPolicy = &v1alpha1.RetryPolicy{ MaximumBackoff: s.RetryPolicy.MaximumBackoff, @@ -195,7 +218,7 @@ func IsUpToDate(projectID string, p v1alpha1.SubscriptionParameters, s pubsub.Su // GenerateUpdateRequest produces an UpdateSubscriptionRequest with the difference // between SubscriptionParameters and Subscription. // enableMessageOrdering, deadLetterPolicy, topic are not mutable -func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pubsub.Subscription) *pubsub.UpdateSubscriptionRequest { +func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pubsub.Subscription) *pubsub.UpdateSubscriptionRequest { // nolint:gocyclo observed := &v1alpha1.SubscriptionParameters{} LateInitialize(observed, s) @@ -245,6 +268,11 @@ func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pub setPushConfig(p, us.Subscription) } + if !cmp.Equal(p.BigqueryConfig, observed.BigqueryConfig) { + mask = append(mask, "bigqueryConfig") + setBigqueryConfig(p, us.Subscription) + } + if !cmp.Equal(p.RetryPolicy, observed.RetryPolicy) { mask = append(mask, "retryPolicy") setRetryPolicy(p, us.Subscription) diff --git a/pkg/clients/subscription/subscription_test.go b/pkg/clients/subscription/subscription_test.go index 8c9c9b74c..50600e989 100644 --- a/pkg/clients/subscription/subscription_test.go +++ b/pkg/clients/subscription/subscription_test.go @@ -23,6 +23,7 @@ import ( pubsub "google.golang.org/api/pubsub/v1" "github.com/crossplane-contrib/provider-gcp/apis/pubsub/v1alpha1" + gcp "github.com/crossplane-contrib/provider-gcp/pkg/clients" ) const ( @@ -53,6 +54,12 @@ func params() *v1alpha1.SubscriptionParameters { }, PushEndpoint: "example.com", }, + BigqueryConfig: &v1alpha1.BigqueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + UseTopicSchema: gcp.BoolPtr(true), + WriteMetadata: gcp.BoolPtr(true), + DropUnknownFields: gcp.BoolPtr(true), + }, RetryPolicy: &v1alpha1.RetryPolicy{ MaximumBackoff: "100s", MinimumBackoff: "15s", @@ -86,6 +93,12 @@ func subscription() *pubsub.Subscription { }, PushEndpoint: "example.com", }, + BigqueryConfig: &pubsub.BigQueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + UseTopicSchema: true, + WriteMetadata: true, + DropUnknownFields: true, + }, RetryPolicy: &pubsub.RetryPolicy{ MaximumBackoff: "100s", MinimumBackoff: "15s", @@ -167,6 +180,98 @@ func TestLateInitialize(t *testing.T) { }, out: params(), }, + "Minimal": { + args: args{ + obs: pubsub.Subscription{ + Name: name, + Topic: topicNameExternal, + }, + param: &v1alpha1.SubscriptionParameters{ + AckDeadlineSeconds: 15, + DeadLetterPolicy: &v1alpha1.DeadLetterPolicy{ + DeadLetterTopic: topicName, + MaxDeliveryAttempts: 5, + }, + Detached: true, + EnableMessageOrdering: true, + ExpirationPolicy: &v1alpha1.ExpirationPolicy{TTL: "1296000s"}, + Topic: topicName, + }, + }, + out: &v1alpha1.SubscriptionParameters{ + AckDeadlineSeconds: 15, + DeadLetterPolicy: &v1alpha1.DeadLetterPolicy{ + DeadLetterTopic: topicName, + MaxDeliveryAttempts: 5, + }, + Detached: true, + EnableMessageOrdering: true, + ExpirationPolicy: &v1alpha1.ExpirationPolicy{TTL: "1296000s"}, + Topic: topicName, + BigqueryConfig: nil, + PushConfig: nil, + }, + }, + "PushConfig": { + args: args{ + obs: pubsub.Subscription{ + Name: name, + Topic: topicNameExternal, + PushConfig: &pubsub.PushConfig{ + PushEndpoint: "example.com", + }, + }, + param: &v1alpha1.SubscriptionParameters{ + PushConfig: &v1alpha1.PushConfig{ + Attributes: map[string]string{"attribute": "my-attribute"}, + OidcToken: &v1alpha1.OidcToken{ + Audience: "my-audience", + }, + PushEndpoint: "example.com", + }, + }, + }, + out: &v1alpha1.SubscriptionParameters{ + PushConfig: &v1alpha1.PushConfig{ + Attributes: map[string]string{"attribute": "my-attribute"}, + OidcToken: &v1alpha1.OidcToken{ + Audience: "my-audience", + ServiceAccountEmail: "", + }, + PushEndpoint: "example.com", + }, + Topic: topicNameExternal, + BigqueryConfig: nil, + }, + }, + "BigqueryConfig": { + args: args{ + obs: pubsub.Subscription{ + Name: name, + Topic: topicNameExternal, + BigqueryConfig: &pubsub.BigQueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + DropUnknownFields: true, + }, + }, + param: &v1alpha1.SubscriptionParameters{ + BigqueryConfig: &v1alpha1.BigqueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + UseTopicSchema: gcp.BoolPtr(true), + }, + }, + }, + out: &v1alpha1.SubscriptionParameters{ + BigqueryConfig: &v1alpha1.BigqueryConfig{ + Table: "projects/my-project/subscriptions/my-bigquery-subscription", + UseTopicSchema: gcp.BoolPtr(true), + WriteMetadata: nil, + DropUnknownFields: nil, + }, + Topic: topicNameExternal, + PushConfig: nil, + }, + }, } for name, tc := range cases { @@ -243,7 +348,7 @@ func TestGenerateUpdateRequest(t *testing.T) { }, result: &pubsub.UpdateSubscriptionRequest{ Subscription: mutableSubscription, - UpdateMask: "ackDeadlineSeconds,detached,filter,labels,messageRetentionDuration,retainAckedMessages,expirationPolicy,pushConfig,retryPolicy", + UpdateMask: "ackDeadlineSeconds,detached,filter,labels,messageRetentionDuration,retainAckedMessages,expirationPolicy,pushConfig,bigqueryConfig,retryPolicy", }, }, }