From 4dc7a62e494055300ba8be17f5d0ef342e2a3c4b Mon Sep 17 00:00:00 2001
From: Mike Prieto <mikeprieto@google.com>
Date: Fri, 24 Jan 2025 23:42:23 +0000
Subject: [PATCH 1/2] docs: Add ingestion from Kafka samples and tests

---
 package.json                                  |   2 +-
 samples/createTopicWithAwsMskIngestion.js     |  90 +++++++++++++++
 .../createTopicWithAzureEventHubsIngestion.js | 105 ++++++++++++++++++
 .../createTopicWithConfluentCloudIngestion.js |  95 ++++++++++++++++
 samples/package.json                          |   2 +-
 samples/system-test/topics.test.ts            |  52 +++++++++
 .../createTopicWithAwsMskIngestion.ts         |  86 ++++++++++++++
 .../createTopicWithAzureEventHubsIngestion.ts | 101 +++++++++++++++++
 .../createTopicWithConfluentCloudIngestion.ts |  91 +++++++++++++++
 9 files changed, 622 insertions(+), 2 deletions(-)
 create mode 100644 samples/createTopicWithAwsMskIngestion.js
 create mode 100644 samples/createTopicWithAzureEventHubsIngestion.js
 create mode 100644 samples/createTopicWithConfluentCloudIngestion.js
 create mode 100644 samples/typescript/createTopicWithAwsMskIngestion.ts
 create mode 100644 samples/typescript/createTopicWithAzureEventHubsIngestion.ts
 create mode 100644 samples/typescript/createTopicWithConfluentCloudIngestion.ts

diff --git a/package.json b/package.json
index ad2f60bfa..4e2faeb97 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
 {
   "name": "@google-cloud/pubsub",
   "description": "Cloud Pub/Sub Client Library for Node.js",
-  "version": "4.9.0",
+  "version": "4.10.0",
   "license": "Apache-2.0",
   "author": "Google Inc.",
   "engines": {
diff --git a/samples/createTopicWithAwsMskIngestion.js b/samples/createTopicWithAwsMskIngestion.js
new file mode 100644
index 000000000..047598913
--- /dev/null
+++ b/samples/createTopicWithAwsMskIngestion.js
@@ -0,0 +1,90 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This is a generated sample, using the typeless sample bot. Please
+// look for the source TypeScript sample (.ts) for modifications.
+'use strict';
+
+/**
+ * This sample demonstrates how to perform basic operations on topics with
+ * the Google Cloud Pub/Sub API.
+ *
+ * For more information, see the README.md under /pubsub and the documentation
+ * at https://cloud.google.com/pubsub/docs.
+ */
+
+// sample-metadata:
+//   title: Create Topic With AWS MSK Ingestion
+//   description: Creates a new topic, with AWS MSK ingestion enabled.
+//   usage: node createTopicWithAwsMskIngestion.js <topic-name> <cluster-arn> <msk-topic> <role-arn> <gcp-service-account>
+
+// [START pubsub_create_topic_with_aws_msk_ingestion]
+/**
+ * TODO(developer): Uncomment these variables before running the sample.
+ */
+// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
+// const clusterArn = 'arn:aws:kafka:...';
+// const mskTopic = 'YOUR_MSK_TOPIC';
+// const roleArn = 'arn:aws:iam:...';
+// const gcpServiceAccount = 'ingestion-account@...';
+
+// Imports the Google Cloud client library
+const { PubSub } = require("@google-cloud/pubsub");
+
+// Creates a client; cache this for further use
+const pubSubClient = new PubSub();
+
+async function createTopicWithAwsMskIngestion(
+topicNameOrId,
+clusterArn,
+mskTopic,
+awsRoleArn,
+gcpServiceAccount)
+{
+  // Creates a new topic with AWS MSK ingestion.
+  await pubSubClient.createTopic({
+    name: topicNameOrId,
+    ingestionDataSourceSettings: {
+      awsMsk: {
+        clusterArn,
+        topic: mskTopic,
+        awsRoleArn,
+        gcpServiceAccount
+      }
+    }
+  });
+  console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
+}
+// [END pubsub_create_topic_with_aws_msk_ingestion]
+
+function main(
+topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
+clusterArn = 'arn:aws:kafka:...',
+mskTopic = 'YOUR_MSK_TOPIC',
+roleArn = 'arn:aws:iam:...',
+gcpServiceAccount = 'ingestion-account@...')
+{
+  createTopicWithAwsMskIngestion(
+    topicNameOrId,
+    clusterArn,
+    mskTopic,
+    roleArn,
+    gcpServiceAccount
+  ).catch((err) => {
+    console.error(err.message);
+    process.exitCode = 1;
+  });
+}
+
+main(...process.argv.slice(2));
\ No newline at end of file
diff --git a/samples/createTopicWithAzureEventHubsIngestion.js b/samples/createTopicWithAzureEventHubsIngestion.js
new file mode 100644
index 000000000..3c658445b
--- /dev/null
+++ b/samples/createTopicWithAzureEventHubsIngestion.js
@@ -0,0 +1,105 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This is a generated sample, using the typeless sample bot. Please
+// look for the source TypeScript sample (.ts) for modifications.
+'use strict';
+
+/**
+ * This sample demonstrates how to perform basic operations on topics with
+ * the Google Cloud Pub/Sub API.
+ *
+ * For more information, see the README.md under /pubsub and the documentation
+ * at https://cloud.google.com/pubsub/docs.
+ */
+
+// sample-metadata:
+//   title: Create Topic With Azure Event Hubs Ingestion
+//   description: Creates a new topic, with Azure Event Hubs ingestion enabled.
+//   usage: node createTopicWithAzureEventHubsIngestion.js <topic-name> <cluster-arn> <msk-topic> <role-arn> <gcp-service-account>
+
+// [START pubsub_create_topic_with_azure_event_hubs_ingestion]
+/**
+ * TODO(developer): Uncomment these variables before running the sample.
+ */
+// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
+// const resourceGroup = 'YOUR_RESOURCE_GROUP';
+// const namespace = 'YOUR_NAMESPACE';
+// const eventHub = 'YOUR_EVENT_HUB';
+// const clientId = 'YOUR_CLIENT_ID';
+// const tenantId = 'YOUR_TENANT_ID';
+// const subscriptionId = 'YOUR_SUBSCRIPTION_ID';
+// const gcpServiceAccount = 'ingestion-account@...';
+
+// Imports the Google Cloud client library
+const { PubSub } = require("@google-cloud/pubsub");
+
+// Creates a client; cache this for further use
+const pubSubClient = new PubSub();
+
+async function createTopicWithAzureEventHubsIngestion(
+topicNameOrId,
+resourceGroup,
+namespace,
+eventHub,
+clientId,
+tenantId,
+subscriptionId,
+gcpServiceAccount)
+{
+  // Creates a new topic with Azure Event Hubs ingestion.
+  await pubSubClient.createTopic({
+    name: topicNameOrId,
+    ingestionDataSourceSettings: {
+      azureEventHubs: {
+        resourceGroup,
+        namespace,
+        eventHub,
+        clientId,
+        tenantId,
+        subscriptionId,
+        gcpServiceAccount
+      }
+    }
+  });
+  console.log(`Topic ${topicNameOrId} created with Azure Event Hubs ingestion.`);
+}
+// [END pubsub_create_topic_with_azure_event_hubs_ingestion]
+
+function main(
+topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
+resourceGroup = 'YOUR_RESOURCE_GROUP',
+namespace = 'YOUR_NAMESPACE',
+eventHub = 'YOUR_EVENT_HUB',
+clientId = 'YOUR_CLIENT_ID',
+tenantId = 'YOUR_TENANT_ID',
+subscriptionId = 'YOUR_SUBSCRIPTION_ID',
+gcpServiceAccount = 'ingestion-account@...')
+{
+  createTopicWithAzureEventHubsIngestion(
+    topicNameOrId,
+    resourceGroup,
+    namespace,
+    eventHub,
+    clientId,
+    tenantId,
+    subscriptionId,
+    gcpServiceAccount
+  ).catch((err) => {
+    console.error(err.message);
+    process.exitCode = 1;
+  });
+}
+
+main(...process.argv.slice(2));
\ No newline at end of file
diff --git a/samples/createTopicWithConfluentCloudIngestion.js b/samples/createTopicWithConfluentCloudIngestion.js
new file mode 100644
index 000000000..36b306f07
--- /dev/null
+++ b/samples/createTopicWithConfluentCloudIngestion.js
@@ -0,0 +1,95 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This is a generated sample, using the typeless sample bot. Please
+// look for the source TypeScript sample (.ts) for modifications.
+'use strict';
+
+/**
+ * This sample demonstrates how to perform basic operations on topics with
+ * the Google Cloud Pub/Sub API.
+ *
+ * For more information, see the README.md under /pubsub and the documentation
+ * at https://cloud.google.com/pubsub/docs.
+ */
+
+// sample-metadata:
+//   title: Create Topic With Confluent Cloud Ingestion
+//   description: Creates a new topic, with Confluent Cloud ingestion enabled.
+//   usage: node createTopicWithConfluentCloudIngestion.js <topic-name> <bootstrap-server> <cluster-id> <confluent-topic> <identity-pool-id> <gcp-service-account>
+
+// [START pubsub_create_topic_with_confluent_cloud_ingestion]
+/**
+ * TODO(developer): Uncomment these variables before running the sample.
+ */
+// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
+// const bootstrapServer = 'url:port';
+// const clusterId = 'YOUR_CLUSTER_ID';
+// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
+// const identityPoolId = 'pool-ID';
+// const gcpServiceAccount = 'ingestion-account@...';
+
+// Imports the Google Cloud client library
+const { PubSub } = require("@google-cloud/pubsub");
+
+// Creates a client; cache this for further use
+const pubSubClient = new PubSub();
+
+async function createTopicWithConfluentCloudIngestion(
+topicNameOrId,
+bootstrapServer,
+clusterId,
+confluentTopic,
+identityPoolId,
+gcpServiceAccount)
+{
+  // Creates a new topic with Confluent Cloud ingestion.
+  await pubSubClient.createTopic({
+    name: topicNameOrId,
+    ingestionDataSourceSettings: {
+      confluentCloud: {
+        bootstrapServer,
+        clusterId,
+        topic: confluentTopic,
+        identityPoolId,
+        gcpServiceAccount
+      }
+    }
+  });
+  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
+}
+// [END pubsub_create_topic_with_confluent_cloud_ingestion]
+
+function main(
+topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
+bootstrapServer = 'url:port',
+clusterId = 'YOUR_CLUSTER_ID',
+confluentTopic = 'YOUR_CONFLUENT_TOPIC',
+identityPoolId = 'pool-ID',
+gcpServiceAccount = 'ingestion-account@...')
+{
+  createTopicWithConfluentCloudIngestion(
+    topicNameOrId,
+    bootstrapServer,
+    clusterId,
+    confluentTopic,
+    identityPoolId,
+    gcpServiceAccount
+  ).catch((err) => {
+    console.error(err.message);
+    process.exitCode = 1;
+  });
+}
+
+main(...process.argv.slice(2));
\ No newline at end of file
diff --git a/samples/package.json b/samples/package.json
index 87fcd15ca..0eb5032a6 100644
--- a/samples/package.json
+++ b/samples/package.json
@@ -22,7 +22,7 @@
   },
   "dependencies": {
     "@google-cloud/opentelemetry-cloud-trace-exporter": "^2.0.0",
-    "@google-cloud/pubsub": "^4.9.0",
+    "@google-cloud/pubsub": "^4.10.0",
     "@google-cloud/storage": "^7.11.1",
     "@opentelemetry/api": "^1.6.0",
     "@opentelemetry/resources": "^1.17.0",
diff --git a/samples/system-test/topics.test.ts b/samples/system-test/topics.test.ts
index 54c28ee7f..99edacb48 100644
--- a/samples/system-test/topics.test.ts
+++ b/samples/system-test/topics.test.ts
@@ -154,6 +154,58 @@ describe('topics', () => {
     }
   });
 
+  it('should create a topic with aws msk ingestion', async () => {
+    const testId = 'create-aws-msk-ingestion';
+    const name = topicName(testId);
+    const clusterArn = 'arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1';
+    const mskTopic = 'fake-msk-topic';
+    const roleArn = 'arn:aws:iam::111111111111:role/fake-role-name';
+    const gcpServiceAccount = 'fake-service-account@fake-gcp-project.iam.gserviceaccount.com'
+
+    const output = execSync(
+      `${commandFor('createTopicWithAwsMskIngestion')} ${name} ${clusterArn} ${mskTopic} ${roleArn} ${gcpServiceAccount}`);
+      assert.include(output, `Topic ${name} created with AWS MSK ingestion.`)
+      const [topics] = await pubsub.getTopics();
+      const exists = topics.some(t => t.name === fullTopicName(name));
+      assert.ok(exists, 'Topic was created');
+  });
+
+  it('should create a topic with confluent cloud ingestion', async () => {
+    const testId = 'create-confluent-cloud-ingestion';
+    const name = topicName(testId);
+    const bootstrapServer = 'fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092';
+    const clusterId = 'fake-cluster-id';
+    const confluentTopic = 'fake-confluent-topic';
+    const identityPoolId = 'fake-pool-id';
+    const gcpServiceAccount = 'fake-service-account@fake-gcp-project.iam.gserviceaccount.com'
+
+    const output = execSync(
+      `${commandFor('createTopicWithConfluentCloudIngestion')} ${name} ${bootstrapServer} ${clusterId} ${confluentTopic} ${identityPoolId} ${gcpServiceAccount}`);
+      assert.include(output, `Topic ${name} created with Confluent Cloud ingestion.`)
+      const [topics] = await pubsub.getTopics();
+      const exists = topics.some(t => t.name === fullTopicName(name));
+      assert.ok(exists, 'Topic was created');
+  });
+
+  it('should create a topic with azure event hubs ingestion', async () => {
+    const testId = 'create-azure-event-hubs-ingestion';
+    const name = topicName(testId);
+    const resourceGroup = 'fake-resource-group';
+    const namespace = 'fake-namespace';
+    const eventHub = 'fake-event-hub';
+    const clientId = 'fake-client-id';
+    const tenantId = 'fake-tenant-id';
+    const subscriptionId = 'fake-subscription-id';
+    const gcpServiceAccount = 'fake-service-account@fake-gcp-project.iam.gserviceaccount.com'
+
+    const output = execSync(
+      `${commandFor('createTopicWithAzureEventHubsIngestion')} ${name} ${resourceGroup} ${namespace} ${eventHub} ${clientId} ${tenantId} ${subscriptionId} ${gcpServiceAccount}`);
+      assert.include(output, `Topic ${name} created with Azure Event Hubs ingestion.`)
+      const [topics] = await pubsub.getTopics();
+      const exists = topics.some(t => t.name === fullTopicName(name));
+      assert.ok(exists, 'Topic was created');
+  });
+
   it('should update a topic with kinesis integration', async () => {
     const pair = await createPair('update-kinesis');
     const output = execSync(
diff --git a/samples/typescript/createTopicWithAwsMskIngestion.ts b/samples/typescript/createTopicWithAwsMskIngestion.ts
new file mode 100644
index 000000000..117ddb3c6
--- /dev/null
+++ b/samples/typescript/createTopicWithAwsMskIngestion.ts
@@ -0,0 +1,86 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * This sample demonstrates how to perform basic operations on topics with
+ * the Google Cloud Pub/Sub API.
+ *
+ * For more information, see the README.md under /pubsub and the documentation
+ * at https://cloud.google.com/pubsub/docs.
+ */
+
+// sample-metadata:
+//   title: Create Topic With AWS MSK Ingestion
+//   description: Creates a new topic, with AWS MSK ingestion enabled.
+//   usage: node createTopicWithAwsMskIngestion.js <topic-name> <cluster-arn> <msk-topic> <role-arn> <gcp-service-account>
+
+// [START pubsub_create_topic_with_aws_msk_ingestion]
+/**
+ * TODO(developer): Uncomment these variables before running the sample.
+ */
+// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
+// const clusterArn = 'arn:aws:kafka:...';
+// const mskTopic = 'YOUR_MSK_TOPIC';
+// const roleArn = 'arn:aws:iam:...';
+// const gcpServiceAccount = 'ingestion-account@...';
+
+// Imports the Google Cloud client library
+import {PubSub} from '@google-cloud/pubsub';
+
+// Creates a client; cache this for further use
+const pubSubClient = new PubSub();
+
+async function createTopicWithAwsMskIngestion(
+  topicNameOrId: string,
+  clusterArn: string,
+  mskTopic: string,
+  awsRoleArn: string,
+  gcpServiceAccount: string
+) {
+  // Creates a new topic with AWS MSK ingestion.
+  await pubSubClient.createTopic({
+    name: topicNameOrId,
+    ingestionDataSourceSettings: {
+      awsMsk: {
+        clusterArn,
+        topic: mskTopic,
+        awsRoleArn,
+        gcpServiceAccount,
+      },
+    },
+  });
+  console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
+}
+// [END pubsub_create_topic_with_aws_msk_ingestion]
+
+function main(
+  topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
+  clusterArn = 'arn:aws:kafka:...',
+  mskTopic = 'YOUR_MSK_TOPIC',
+  roleArn = 'arn:aws:iam:...',
+  gcpServiceAccount = 'ingestion-account@...'
+) {
+  createTopicWithAwsMskIngestion(
+    topicNameOrId,
+    clusterArn,
+    mskTopic,
+    roleArn,
+    gcpServiceAccount
+  ).catch(err => {
+    console.error(err.message);
+    process.exitCode = 1;
+  });
+}
+
+main(...process.argv.slice(2));
diff --git a/samples/typescript/createTopicWithAzureEventHubsIngestion.ts b/samples/typescript/createTopicWithAzureEventHubsIngestion.ts
new file mode 100644
index 000000000..eb3537c3e
--- /dev/null
+++ b/samples/typescript/createTopicWithAzureEventHubsIngestion.ts
@@ -0,0 +1,101 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * This sample demonstrates how to perform basic operations on topics with
+ * the Google Cloud Pub/Sub API.
+ *
+ * For more information, see the README.md under /pubsub and the documentation
+ * at https://cloud.google.com/pubsub/docs.
+ */
+
+// sample-metadata:
+//   title: Create Topic With Azure Event Hubs Ingestion
+//   description: Creates a new topic, with Azure Event Hubs ingestion enabled.
+//   usage: node createTopicWithAzureEventHubsIngestion.js <topic-name> <cluster-arn> <msk-topic> <role-arn> <gcp-service-account>
+
+// [START pubsub_create_topic_with_azure_event_hubs_ingestion]
+/**
+ * TODO(developer): Uncomment these variables before running the sample.
+ */
+// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
+// const resourceGroup = 'YOUR_RESOURCE_GROUP';
+// const namespace = 'YOUR_NAMESPACE';
+// const eventHub = 'YOUR_EVENT_HUB';
+// const clientId = 'YOUR_CLIENT_ID';
+// const tenantId = 'YOUR_TENANT_ID';
+// const subscriptionId = 'YOUR_SUBSCRIPTION_ID';
+// const gcpServiceAccount = 'ingestion-account@...';
+
+// Imports the Google Cloud client library
+import {PubSub} from '@google-cloud/pubsub';
+
+// Creates a client; cache this for further use
+const pubSubClient = new PubSub();
+
+async function createTopicWithAzureEventHubsIngestion(
+  topicNameOrId: string,
+  resourceGroup: string,
+  namespace: string,
+  eventHub: string,
+  clientId: string,
+  tenantId: string,
+  subscriptionId: string,
+  gcpServiceAccount: string
+) {
+  // Creates a new topic with Azure Event Hubs ingestion.
+  await pubSubClient.createTopic({
+    name: topicNameOrId,
+    ingestionDataSourceSettings: {
+      azureEventHubs: {
+        resourceGroup,
+        namespace,
+        eventHub,
+        clientId,
+        tenantId,
+        subscriptionId,
+        gcpServiceAccount,
+      },
+    },
+  });
+  console.log(`Topic ${topicNameOrId} created with Azure Event Hubs ingestion.`);
+}
+// [END pubsub_create_topic_with_azure_event_hubs_ingestion]
+
+function main(
+  topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
+  resourceGroup = 'YOUR_RESOURCE_GROUP',
+  namespace = 'YOUR_NAMESPACE',
+  eventHub = 'YOUR_EVENT_HUB',
+  clientId = 'YOUR_CLIENT_ID',
+  tenantId = 'YOUR_TENANT_ID',
+  subscriptionId = 'YOUR_SUBSCRIPTION_ID',
+  gcpServiceAccount = 'ingestion-account@...'
+) {
+  createTopicWithAzureEventHubsIngestion(
+    topicNameOrId,
+    resourceGroup,
+    namespace,
+    eventHub,
+    clientId,
+    tenantId,
+    subscriptionId,
+    gcpServiceAccount
+  ).catch(err => {
+    console.error(err.message);
+    process.exitCode = 1;
+  });
+}
+
+main(...process.argv.slice(2));
diff --git a/samples/typescript/createTopicWithConfluentCloudIngestion.ts b/samples/typescript/createTopicWithConfluentCloudIngestion.ts
new file mode 100644
index 000000000..a540dbdad
--- /dev/null
+++ b/samples/typescript/createTopicWithConfluentCloudIngestion.ts
@@ -0,0 +1,91 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * This sample demonstrates how to perform basic operations on topics with
+ * the Google Cloud Pub/Sub API.
+ *
+ * For more information, see the README.md under /pubsub and the documentation
+ * at https://cloud.google.com/pubsub/docs.
+ */
+
+// sample-metadata:
+//   title: Create Topic With Confluent Cloud Ingestion
+//   description: Creates a new topic, with Confluent Cloud ingestion enabled.
+//   usage: node createTopicWithConfluentCloudIngestion.js <topic-name> <bootstrap-server> <cluster-id> <confluent-topic> <identity-pool-id> <gcp-service-account>
+
+// [START pubsub_create_topic_with_confluent_cloud_ingestion]
+/**
+ * TODO(developer): Uncomment these variables before running the sample.
+ */
+// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
+// const bootstrapServer = 'url:port';
+// const clusterId = 'YOUR_CLUSTER_ID';
+// const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
+// const identityPoolId = 'pool-ID';
+// const gcpServiceAccount = 'ingestion-account@...';
+
+// Imports the Google Cloud client library
+import {PubSub} from '@google-cloud/pubsub';
+
+// Creates a client; cache this for further use
+const pubSubClient = new PubSub();
+
+async function createTopicWithConfluentCloudIngestion(
+  topicNameOrId: string,
+  bootstrapServer: string,
+  clusterId: string,
+  confluentTopic: string,
+  identityPoolId: string,
+  gcpServiceAccount: string
+) {
+  // Creates a new topic with Confluent Cloud ingestion.
+  await pubSubClient.createTopic({
+    name: topicNameOrId,
+    ingestionDataSourceSettings: {
+      confluentCloud: {
+        bootstrapServer,
+        clusterId,
+        topic: confluentTopic,
+        identityPoolId,
+        gcpServiceAccount,
+      },
+    },
+  });
+  console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
+}
+// [END pubsub_create_topic_with_confluent_cloud_ingestion]
+
+function main(
+  topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
+  bootstrapServer = 'url:port',
+  clusterId = 'YOUR_CLUSTER_ID',
+  confluentTopic = 'YOUR_CONFLUENT_TOPIC',
+  identityPoolId = 'pool-ID',
+  gcpServiceAccount = 'ingestion-account@...'
+) {
+  createTopicWithConfluentCloudIngestion(
+    topicNameOrId,
+    bootstrapServer,
+    clusterId,
+    confluentTopic,
+    identityPoolId,
+    gcpServiceAccount
+  ).catch(err => {
+    console.error(err.message);
+    process.exitCode = 1;
+  });
+}
+
+main(...process.argv.slice(2));

From f267942b8495d0f4fd4d9bf23983f761d0d82bcd Mon Sep 17 00:00:00 2001
From: Mike Prieto <mikeprieto@google.com>
Date: Fri, 24 Jan 2025 23:45:23 +0000
Subject: [PATCH 2/2] chore: Fix formatting in samples

---
 samples/createTopicWithAwsMskIngestion.js     | 36 ++++++-------
 .../createTopicWithAzureEventHubsIngestion.js | 52 ++++++++++---------
 .../createTopicWithConfluentCloudIngestion.js | 40 +++++++-------
 .../createTopicWithAzureEventHubsIngestion.ts |  4 +-
 4 files changed, 68 insertions(+), 64 deletions(-)

diff --git a/samples/createTopicWithAwsMskIngestion.js b/samples/createTopicWithAwsMskIngestion.js
index 047598913..5020e9be9 100644
--- a/samples/createTopicWithAwsMskIngestion.js
+++ b/samples/createTopicWithAwsMskIngestion.js
@@ -40,18 +40,18 @@
 // const gcpServiceAccount = 'ingestion-account@...';
 
 // Imports the Google Cloud client library
-const { PubSub } = require("@google-cloud/pubsub");
+const {PubSub} = require('@google-cloud/pubsub');
 
 // Creates a client; cache this for further use
 const pubSubClient = new PubSub();
 
 async function createTopicWithAwsMskIngestion(
-topicNameOrId,
-clusterArn,
-mskTopic,
-awsRoleArn,
-gcpServiceAccount)
-{
+  topicNameOrId,
+  clusterArn,
+  mskTopic,
+  awsRoleArn,
+  gcpServiceAccount
+) {
   // Creates a new topic with AWS MSK ingestion.
   await pubSubClient.createTopic({
     name: topicNameOrId,
@@ -60,31 +60,31 @@ gcpServiceAccount)
         clusterArn,
         topic: mskTopic,
         awsRoleArn,
-        gcpServiceAccount
-      }
-    }
+        gcpServiceAccount,
+      },
+    },
   });
   console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
 }
 // [END pubsub_create_topic_with_aws_msk_ingestion]
 
 function main(
-topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
-clusterArn = 'arn:aws:kafka:...',
-mskTopic = 'YOUR_MSK_TOPIC',
-roleArn = 'arn:aws:iam:...',
-gcpServiceAccount = 'ingestion-account@...')
-{
+  topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
+  clusterArn = 'arn:aws:kafka:...',
+  mskTopic = 'YOUR_MSK_TOPIC',
+  roleArn = 'arn:aws:iam:...',
+  gcpServiceAccount = 'ingestion-account@...'
+) {
   createTopicWithAwsMskIngestion(
     topicNameOrId,
     clusterArn,
     mskTopic,
     roleArn,
     gcpServiceAccount
-  ).catch((err) => {
+  ).catch(err => {
     console.error(err.message);
     process.exitCode = 1;
   });
 }
 
-main(...process.argv.slice(2));
\ No newline at end of file
+main(...process.argv.slice(2));
diff --git a/samples/createTopicWithAzureEventHubsIngestion.js b/samples/createTopicWithAzureEventHubsIngestion.js
index 3c658445b..1e4b16a72 100644
--- a/samples/createTopicWithAzureEventHubsIngestion.js
+++ b/samples/createTopicWithAzureEventHubsIngestion.js
@@ -43,21 +43,21 @@
 // const gcpServiceAccount = 'ingestion-account@...';
 
 // Imports the Google Cloud client library
-const { PubSub } = require("@google-cloud/pubsub");
+const {PubSub} = require('@google-cloud/pubsub');
 
 // Creates a client; cache this for further use
 const pubSubClient = new PubSub();
 
 async function createTopicWithAzureEventHubsIngestion(
-topicNameOrId,
-resourceGroup,
-namespace,
-eventHub,
-clientId,
-tenantId,
-subscriptionId,
-gcpServiceAccount)
-{
+  topicNameOrId,
+  resourceGroup,
+  namespace,
+  eventHub,
+  clientId,
+  tenantId,
+  subscriptionId,
+  gcpServiceAccount
+) {
   // Creates a new topic with Azure Event Hubs ingestion.
   await pubSubClient.createTopic({
     name: topicNameOrId,
@@ -69,24 +69,26 @@ gcpServiceAccount)
         clientId,
         tenantId,
         subscriptionId,
-        gcpServiceAccount
-      }
-    }
+        gcpServiceAccount,
+      },
+    },
   });
-  console.log(`Topic ${topicNameOrId} created with Azure Event Hubs ingestion.`);
+  console.log(
+    `Topic ${topicNameOrId} created with Azure Event Hubs ingestion.`
+  );
 }
 // [END pubsub_create_topic_with_azure_event_hubs_ingestion]
 
 function main(
-topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
-resourceGroup = 'YOUR_RESOURCE_GROUP',
-namespace = 'YOUR_NAMESPACE',
-eventHub = 'YOUR_EVENT_HUB',
-clientId = 'YOUR_CLIENT_ID',
-tenantId = 'YOUR_TENANT_ID',
-subscriptionId = 'YOUR_SUBSCRIPTION_ID',
-gcpServiceAccount = 'ingestion-account@...')
-{
+  topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
+  resourceGroup = 'YOUR_RESOURCE_GROUP',
+  namespace = 'YOUR_NAMESPACE',
+  eventHub = 'YOUR_EVENT_HUB',
+  clientId = 'YOUR_CLIENT_ID',
+  tenantId = 'YOUR_TENANT_ID',
+  subscriptionId = 'YOUR_SUBSCRIPTION_ID',
+  gcpServiceAccount = 'ingestion-account@...'
+) {
   createTopicWithAzureEventHubsIngestion(
     topicNameOrId,
     resourceGroup,
@@ -96,10 +98,10 @@ gcpServiceAccount = 'ingestion-account@...')
     tenantId,
     subscriptionId,
     gcpServiceAccount
-  ).catch((err) => {
+  ).catch(err => {
     console.error(err.message);
     process.exitCode = 1;
   });
 }
 
-main(...process.argv.slice(2));
\ No newline at end of file
+main(...process.argv.slice(2));
diff --git a/samples/createTopicWithConfluentCloudIngestion.js b/samples/createTopicWithConfluentCloudIngestion.js
index 36b306f07..d81b53fd3 100644
--- a/samples/createTopicWithConfluentCloudIngestion.js
+++ b/samples/createTopicWithConfluentCloudIngestion.js
@@ -41,19 +41,19 @@
 // const gcpServiceAccount = 'ingestion-account@...';
 
 // Imports the Google Cloud client library
-const { PubSub } = require("@google-cloud/pubsub");
+const {PubSub} = require('@google-cloud/pubsub');
 
 // Creates a client; cache this for further use
 const pubSubClient = new PubSub();
 
 async function createTopicWithConfluentCloudIngestion(
-topicNameOrId,
-bootstrapServer,
-clusterId,
-confluentTopic,
-identityPoolId,
-gcpServiceAccount)
-{
+  topicNameOrId,
+  bootstrapServer,
+  clusterId,
+  confluentTopic,
+  identityPoolId,
+  gcpServiceAccount
+) {
   // Creates a new topic with Confluent Cloud ingestion.
   await pubSubClient.createTopic({
     name: topicNameOrId,
@@ -63,22 +63,22 @@ gcpServiceAccount)
         clusterId,
         topic: confluentTopic,
         identityPoolId,
-        gcpServiceAccount
-      }
-    }
+        gcpServiceAccount,
+      },
+    },
   });
   console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
 }
 // [END pubsub_create_topic_with_confluent_cloud_ingestion]
 
 function main(
-topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
-bootstrapServer = 'url:port',
-clusterId = 'YOUR_CLUSTER_ID',
-confluentTopic = 'YOUR_CONFLUENT_TOPIC',
-identityPoolId = 'pool-ID',
-gcpServiceAccount = 'ingestion-account@...')
-{
+  topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
+  bootstrapServer = 'url:port',
+  clusterId = 'YOUR_CLUSTER_ID',
+  confluentTopic = 'YOUR_CONFLUENT_TOPIC',
+  identityPoolId = 'pool-ID',
+  gcpServiceAccount = 'ingestion-account@...'
+) {
   createTopicWithConfluentCloudIngestion(
     topicNameOrId,
     bootstrapServer,
@@ -86,10 +86,10 @@ gcpServiceAccount = 'ingestion-account@...')
     confluentTopic,
     identityPoolId,
     gcpServiceAccount
-  ).catch((err) => {
+  ).catch(err => {
     console.error(err.message);
     process.exitCode = 1;
   });
 }
 
-main(...process.argv.slice(2));
\ No newline at end of file
+main(...process.argv.slice(2));
diff --git a/samples/typescript/createTopicWithAzureEventHubsIngestion.ts b/samples/typescript/createTopicWithAzureEventHubsIngestion.ts
index eb3537c3e..e49a4bf3d 100644
--- a/samples/typescript/createTopicWithAzureEventHubsIngestion.ts
+++ b/samples/typescript/createTopicWithAzureEventHubsIngestion.ts
@@ -69,7 +69,9 @@ async function createTopicWithAzureEventHubsIngestion(
       },
     },
   });
-  console.log(`Topic ${topicNameOrId} created with Azure Event Hubs ingestion.`);
+  console.log(
+    `Topic ${topicNameOrId} created with Azure Event Hubs ingestion.`
+  );
 }
 // [END pubsub_create_topic_with_azure_event_hubs_ingestion]