Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add bigquery subscription type support #476

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions apis/pubsub/v1alpha1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type SubscriptionParameters struct {
// +optional
PushConfig *PushConfig `json:"pushConfig,omitempty"`

// BigqueryConfig is a parameter which configures bigquery delivery.
// +optional
BigqueryConfig *BigqueryConfig `json:"bigqueryConfig,omitempty"`
Feggah marked this conversation as resolved.
Show resolved Hide resolved

// RetainAckedMessages is a message which indicates whether to retain acknowledged
// messages. If true, then messages are not expunged from the
// subscription's backlog, even if they are acknowledged, until they
Expand Down Expand Up @@ -143,6 +147,29 @@ type PushConfig struct {
PushEndpoint string `json:"pushEndpoint,omitempty"`
}

// BigqueryConfig contains configuration for a bigquery delivery endpoint.
type BigqueryConfig struct {
// Bigquery table to deliver messages to.
Table string `json:"table,omitempty"`
Comment on lines +152 to +153
Copy link
Collaborator

@Feggah Feggah Oct 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make the Table field required (when bigqueryConfig is specified) you need to remove the omitempty JSON tag.

Suggested change
// Bigquery table to deliver messages to.
Table string `json:"table,omitempty"`
// Bigquery table to deliver messages to.
// Must be in the format {projectId}.{datasetId}.{tableId}
Table string `json:"table"`


// When enabled, the topic schema will be used when writing to BigQuery. Else,
// tes the message bytes to a column called data in BigQuery.
// +optional
UseTopicSchema *bool `json:"useTopicSchema,omitempty"`

// When enabled, the metadata of each message is written to additional columns in
// the BigQuery table. Else, the metadata is not written to the BigQuery table.
// https://cloud.google.com/pubsub/docs/bigquery?hl=ru#write-metadata
// +optional
WriteMetadata *bool `json:"writeMetadata,omitempty"`

// When enabled along with the "Use topic schema" option, any field that is present in
// the topic schema but not in the BigQuery schema will be dropped. Else, messages with extra fields are not written
// and remain in the subscription backlog.
// +optional
DropUnknownFields *bool `json:"dropUnknownFields,omitempty"`
}

// OidcToken contains information needed for generating an OpenID Connect token
type OidcToken struct {
// Audience is the "audience" to be used when generating OIDC token.
Expand Down
35 changes: 35 additions & 0 deletions apis/pubsub/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions package/crds/pubsub.gcp.crossplane.io_subscriptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,31 @@ spec:
value of 10 seconds is used.
format: int64
type: integer
bigqueryConfig:
description: BigqueryConfig is a parameter which configures bigquery
delivery.
properties:
dropUnknownFields:
description: When enabled along with the "Use topic schema"
option, any field that is present in the topic schema but
not in the BigQuery schema will be dropped. Else, messages
with extra fields are not written and remain in the subscription
backlog.
type: boolean
table:
description: Bigquery table to deliver messages to.
type: string
useTopicSchema:
description: When enabled, the topic schema will be used when
writing to BigQuery. Else, tes the message bytes to a column
called data in BigQuery.
type: boolean
writeMetadata:
description: When enabled, the metadata of each message is
written to additional columns in the BigQuery table. Else,
the metadata is not written to the BigQuery table. https://cloud.google.com/pubsub/docs/bigquery?hl=ru#write-metadata
type: boolean
type: object
deadLetterPolicy:
description: DeadLetterPolicy is the policy that specifies the
conditions for dead lettering messages in this subscription.
Expand Down
30 changes: 29 additions & 1 deletion pkg/clients/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"

"github.com/crossplane-contrib/provider-gcp/apis/pubsub/v1alpha1"
gcp "github.com/crossplane-contrib/provider-gcp/pkg/clients"
"github.com/crossplane-contrib/provider-gcp/pkg/clients/topic"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -53,6 +54,7 @@ func GenerateSubscription(projectID, name string, p v1alpha1.SubscriptionParamet
setDeadLetterPolicy(projectID, p, s)
setExpirationPolicy(p, s)
setPushConfig(p, s)
setBigqueryConfig(p, s)
setRetryPolicy(p, s)

return s
Expand Down Expand Up @@ -85,6 +87,18 @@ func setPushConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) {
}
}

// setBigqueryConfig sets BigqueryConfig of subscription based on SubscriptionParameters.
func setBigqueryConfig(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) {
if p.BigqueryConfig != nil {
s.BigqueryConfig = &pubsub.BigQueryConfig{
Table: p.BigqueryConfig.Table,
UseTopicSchema: gcp.BoolValue(p.BigqueryConfig.UseTopicSchema),
WriteMetadata: gcp.BoolValue(p.BigqueryConfig.WriteMetadata),
DropUnknownFields: gcp.BoolValue(p.BigqueryConfig.DropUnknownFields),
}
}
}

// setExpirationPolicy sets ExpirationPolicy of subscription based on SubscriptionParameters.
func setExpirationPolicy(p v1alpha1.SubscriptionParameters, s *pubsub.Subscription) {
if p.ExpirationPolicy != nil {
Expand Down Expand Up @@ -169,6 +183,15 @@ func LateInitialize(p *v1alpha1.SubscriptionParameters, s pubsub.Subscription) {
}
}

if p.BigqueryConfig == nil && s.BigqueryConfig != nil {
p.BigqueryConfig = &v1alpha1.BigqueryConfig{
Table: s.BigqueryConfig.Table,
DropUnknownFields: gcp.BoolPtr(s.BigqueryConfig.DropUnknownFields),
UseTopicSchema: gcp.BoolPtr(s.BigqueryConfig.UseTopicSchema),
WriteMetadata: gcp.BoolPtr(s.BigqueryConfig.WriteMetadata),
}
}

if p.RetryPolicy == nil && s.RetryPolicy != nil {
p.RetryPolicy = &v1alpha1.RetryPolicy{
MaximumBackoff: s.RetryPolicy.MaximumBackoff,
Expand All @@ -195,7 +218,7 @@ func IsUpToDate(projectID string, p v1alpha1.SubscriptionParameters, s pubsub.Su
// GenerateUpdateRequest produces an UpdateSubscriptionRequest with the difference
// between SubscriptionParameters and Subscription.
// enableMessageOrdering, deadLetterPolicy, topic are not mutable
func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pubsub.Subscription) *pubsub.UpdateSubscriptionRequest {
func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pubsub.Subscription) *pubsub.UpdateSubscriptionRequest { // nolint:gocyclo
observed := &v1alpha1.SubscriptionParameters{}
LateInitialize(observed, s)

Expand Down Expand Up @@ -245,6 +268,11 @@ func GenerateUpdateRequest(name string, p v1alpha1.SubscriptionParameters, s pub
setPushConfig(p, us.Subscription)
}

if !cmp.Equal(p.BigqueryConfig, observed.BigqueryConfig) {
mask = append(mask, "bigqueryConfig")
setBigqueryConfig(p, us.Subscription)
}

if !cmp.Equal(p.RetryPolicy, observed.RetryPolicy) {
mask = append(mask, "retryPolicy")
setRetryPolicy(p, us.Subscription)
Expand Down
107 changes: 106 additions & 1 deletion pkg/clients/subscription/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
pubsub "google.golang.org/api/pubsub/v1"

"github.com/crossplane-contrib/provider-gcp/apis/pubsub/v1alpha1"
gcp "github.com/crossplane-contrib/provider-gcp/pkg/clients"
)

const (
Expand Down Expand Up @@ -53,6 +54,12 @@ func params() *v1alpha1.SubscriptionParameters {
},
PushEndpoint: "example.com",
},
BigqueryConfig: &v1alpha1.BigqueryConfig{
Table: "projects/my-project/subscriptions/my-bigquery-subscription",
UseTopicSchema: gcp.BoolPtr(true),
WriteMetadata: gcp.BoolPtr(true),
DropUnknownFields: gcp.BoolPtr(true),
},
RetryPolicy: &v1alpha1.RetryPolicy{
MaximumBackoff: "100s",
MinimumBackoff: "15s",
Expand Down Expand Up @@ -86,6 +93,12 @@ func subscription() *pubsub.Subscription {
},
PushEndpoint: "example.com",
},
BigqueryConfig: &pubsub.BigQueryConfig{
Table: "projects/my-project/subscriptions/my-bigquery-subscription",
UseTopicSchema: true,
WriteMetadata: true,
DropUnknownFields: true,
},
RetryPolicy: &pubsub.RetryPolicy{
MaximumBackoff: "100s",
MinimumBackoff: "15s",
Expand Down Expand Up @@ -167,6 +180,98 @@ func TestLateInitialize(t *testing.T) {
},
out: params(),
},
"Minimal": {
args: args{
obs: pubsub.Subscription{
Name: name,
Topic: topicNameExternal,
},
param: &v1alpha1.SubscriptionParameters{
AckDeadlineSeconds: 15,
DeadLetterPolicy: &v1alpha1.DeadLetterPolicy{
DeadLetterTopic: topicName,
MaxDeliveryAttempts: 5,
},
Detached: true,
EnableMessageOrdering: true,
ExpirationPolicy: &v1alpha1.ExpirationPolicy{TTL: "1296000s"},
Topic: topicName,
},
},
out: &v1alpha1.SubscriptionParameters{
AckDeadlineSeconds: 15,
DeadLetterPolicy: &v1alpha1.DeadLetterPolicy{
DeadLetterTopic: topicName,
MaxDeliveryAttempts: 5,
},
Detached: true,
EnableMessageOrdering: true,
ExpirationPolicy: &v1alpha1.ExpirationPolicy{TTL: "1296000s"},
Topic: topicName,
BigqueryConfig: nil,
PushConfig: nil,
},
},
"PushConfig": {
args: args{
obs: pubsub.Subscription{
Name: name,
Topic: topicNameExternal,
PushConfig: &pubsub.PushConfig{
PushEndpoint: "example.com",
},
},
param: &v1alpha1.SubscriptionParameters{
PushConfig: &v1alpha1.PushConfig{
Attributes: map[string]string{"attribute": "my-attribute"},
OidcToken: &v1alpha1.OidcToken{
Audience: "my-audience",
},
PushEndpoint: "example.com",
},
},
},
out: &v1alpha1.SubscriptionParameters{
PushConfig: &v1alpha1.PushConfig{
Attributes: map[string]string{"attribute": "my-attribute"},
OidcToken: &v1alpha1.OidcToken{
Audience: "my-audience",
ServiceAccountEmail: "",
},
PushEndpoint: "example.com",
},
Topic: topicNameExternal,
BigqueryConfig: nil,
},
},
"BigqueryConfig": {
args: args{
obs: pubsub.Subscription{
Name: name,
Topic: topicNameExternal,
BigqueryConfig: &pubsub.BigQueryConfig{
Table: "projects/my-project/subscriptions/my-bigquery-subscription",
DropUnknownFields: true,
},
},
param: &v1alpha1.SubscriptionParameters{
BigqueryConfig: &v1alpha1.BigqueryConfig{
Table: "projects/my-project/subscriptions/my-bigquery-subscription",
UseTopicSchema: gcp.BoolPtr(true),
},
},
},
out: &v1alpha1.SubscriptionParameters{
BigqueryConfig: &v1alpha1.BigqueryConfig{
Table: "projects/my-project/subscriptions/my-bigquery-subscription",
UseTopicSchema: gcp.BoolPtr(true),
WriteMetadata: nil,
DropUnknownFields: nil,
},
Topic: topicNameExternal,
PushConfig: nil,
},
},
}

for name, tc := range cases {
Expand Down Expand Up @@ -243,7 +348,7 @@ func TestGenerateUpdateRequest(t *testing.T) {
},
result: &pubsub.UpdateSubscriptionRequest{
Subscription: mutableSubscription,
UpdateMask: "ackDeadlineSeconds,detached,filter,labels,messageRetentionDuration,retainAckedMessages,expirationPolicy,pushConfig,retryPolicy",
UpdateMask: "ackDeadlineSeconds,detached,filter,labels,messageRetentionDuration,retainAckedMessages,expirationPolicy,pushConfig,bigqueryConfig,retryPolicy",
},
},
}
Expand Down