Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: Add ingestion from Kafka samples and tests #2009

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions samples/createTopicWithAwsMskIngestion.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With AWS MSK Ingestion
// description: Creates a new topic, with AWS MSK ingestion enabled.
// usage: node createTopicWithAwsMskIngestion.js <topic-name> <cluster-arn> <msk-topic> <role-arn> <gcp-service-account>

// [START pubsub_create_topic_with_aws_msk_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const clusterArn = 'arn:aws:kafka:...';
// const mskTopic = 'YOUR_MSK_TOPIC';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithAwsMskIngestion(
topicNameOrId,
clusterArn,
mskTopic,
awsRoleArn,
gcpServiceAccount
) {
// Creates a new topic with AWS MSK ingestion.
await pubSubClient.createTopic({
name: topicNameOrId,
ingestionDataSourceSettings: {
awsMsk: {
clusterArn,
topic: mskTopic,
awsRoleArn,
gcpServiceAccount,
},
},
});
console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
}
// [END pubsub_create_topic_with_aws_msk_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
clusterArn = 'arn:aws:kafka:...',
mskTopic = 'YOUR_MSK_TOPIC',
roleArn = 'arn:aws:iam:...',
gcpServiceAccount = 'ingestion-account@...'
) {
createTopicWithAwsMskIngestion(
topicNameOrId,
clusterArn,
mskTopic,
roleArn,
gcpServiceAccount
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
107 changes: 107 additions & 0 deletions samples/createTopicWithAzureEventHubsIngestion.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With Azure Event Hubs Ingestion
// description: Creates a new topic, with Azure Event Hubs ingestion enabled.
// usage: node createTopicWithAzureEventHubsIngestion.js <topic-name> <cluster-arn> <msk-topic> <role-arn> <gcp-service-account>

// [START pubsub_create_topic_with_azure_event_hubs_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const resourceGroup = 'YOUR_RESOURCE_GROUP';
// const namespace = 'YOUR_NAMESPACE';
// const eventHub = 'YOUR_EVENT_HUB';
// const clientId = 'YOUR_CLIENT_ID';
// const tenantId = 'YOUR_TENANT_ID';
// const subscriptionId = 'YOUR_SUBSCRIPTION_ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithAzureEventHubsIngestion(
topicNameOrId,
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount
) {
// Creates a new topic with Azure Event Hubs ingestion.
await pubSubClient.createTopic({
name: topicNameOrId,
ingestionDataSourceSettings: {
azureEventHubs: {
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount,
},
},
});
console.log(
`Topic ${topicNameOrId} created with Azure Event Hubs ingestion.`
);
}
// [END pubsub_create_topic_with_azure_event_hubs_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
resourceGroup = 'YOUR_RESOURCE_GROUP',
namespace = 'YOUR_NAMESPACE',
eventHub = 'YOUR_EVENT_HUB',
clientId = 'YOUR_CLIENT_ID',
tenantId = 'YOUR_TENANT_ID',
subscriptionId = 'YOUR_SUBSCRIPTION_ID',
gcpServiceAccount = 'ingestion-account@...'
) {
createTopicWithAzureEventHubsIngestion(
topicNameOrId,
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
95 changes: 95 additions & 0 deletions samples/createTopicWithConfluentCloudIngestion.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on topics with
* the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Create Topic With Confluent Cloud Ingestion
// description: Creates a new topic, with Confluent Cloud ingestion enabled.
// usage: node createTopicWithConfluentCloudIngestion.js <topic-name> <bootstrap-server> <cluster-id> <confluent-topic> <identity-pool-id> <gcp-service-account>

// [START pubsub_create_topic_with_confluent_cloud_ingestion]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bootstrapServer = 'url:port';
// const clusterId = 'YOUR_CLUSTER_ID';
// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
// const identityPoolId = 'pool-ID';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithConfluentCloudIngestion(
topicNameOrId,
bootstrapServer,
clusterId,
confluentTopic,
identityPoolId,
gcpServiceAccount
) {
// Creates a new topic with Confluent Cloud ingestion.
await pubSubClient.createTopic({
name: topicNameOrId,
ingestionDataSourceSettings: {
confluentCloud: {
bootstrapServer,
clusterId,
topic: confluentTopic,
identityPoolId,
gcpServiceAccount,
},
},
});
console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
}
// [END pubsub_create_topic_with_confluent_cloud_ingestion]

function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
bootstrapServer = 'url:port',
clusterId = 'YOUR_CLUSTER_ID',
confluentTopic = 'YOUR_CONFLUENT_TOPIC',
identityPoolId = 'pool-ID',
gcpServiceAccount = 'ingestion-account@...'
) {
createTopicWithConfluentCloudIngestion(
topicNameOrId,
bootstrapServer,
clusterId,
confluentTopic,
identityPoolId,
gcpServiceAccount
).catch(err => {
console.error(err.message);
process.exitCode = 1;
});
}

main(...process.argv.slice(2));
52 changes: 52 additions & 0 deletions samples/system-test/topics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,58 @@ describe('topics', () => {
}
});

it('should create a topic with aws msk ingestion', async () => {
const testId = 'create-aws-msk-ingestion';
const name = topicName(testId);
const clusterArn = 'arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1';
const mskTopic = 'fake-msk-topic';
const roleArn = 'arn:aws:iam::111111111111:role/fake-role-name';
const gcpServiceAccount = '[email protected]'

const output = execSync(
`${commandFor('createTopicWithAwsMskIngestion')} ${name} ${clusterArn} ${mskTopic} ${roleArn} ${gcpServiceAccount}`);
assert.include(output, `Topic ${name} created with AWS MSK ingestion.`)
const [topics] = await pubsub.getTopics();
const exists = topics.some(t => t.name === fullTopicName(name));
assert.ok(exists, 'Topic was created');
});

it('should create a topic with confluent cloud ingestion', async () => {
const testId = 'create-confluent-cloud-ingestion';
const name = topicName(testId);
const bootstrapServer = 'fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092';
const clusterId = 'fake-cluster-id';
const confluentTopic = 'fake-confluent-topic';
const identityPoolId = 'fake-pool-id';
const gcpServiceAccount = '[email protected]'

const output = execSync(
`${commandFor('createTopicWithConfluentCloudIngestion')} ${name} ${bootstrapServer} ${clusterId} ${confluentTopic} ${identityPoolId} ${gcpServiceAccount}`);
assert.include(output, `Topic ${name} created with Confluent Cloud ingestion.`)
const [topics] = await pubsub.getTopics();
const exists = topics.some(t => t.name === fullTopicName(name));
assert.ok(exists, 'Topic was created');
});

it('should create a topic with azure event hubs ingestion', async () => {
const testId = 'create-azure-event-hubs-ingestion';
const name = topicName(testId);
const resourceGroup = 'fake-resource-group';
const namespace = 'fake-namespace';
const eventHub = 'fake-event-hub';
const clientId = 'fake-client-id';
const tenantId = 'fake-tenant-id';
const subscriptionId = 'fake-subscription-id';
const gcpServiceAccount = '[email protected]'

const output = execSync(
`${commandFor('createTopicWithAzureEventHubsIngestion')} ${name} ${resourceGroup} ${namespace} ${eventHub} ${clientId} ${tenantId} ${subscriptionId} ${gcpServiceAccount}`);
assert.include(output, `Topic ${name} created with Azure Event Hubs ingestion.`)
const [topics] = await pubsub.getTopics();
const exists = topics.some(t => t.name === fullTopicName(name));
assert.ok(exists, 'Topic was created');
});

it('should update a topic with kinesis integration', async () => {
const pair = await createPair('update-kinesis');
const output = execSync(
Expand Down
Loading
Loading