An example about configuring PubSub BigQuery Subscription with Pulumi

BigQuery Subscription

It’s hard to inspect messages that were published to a topic, because the application has usually processed and acknowledged them before you get a chance to look. The usual workaround is to create a separate test subscription so the messages are replicated to it, and then pull messages from that test subscription. However, the Google PubSub UI doesn’t provide any way to pull a specific message by ID. The Google Cloud Console is also frustrating to use: it is slow to load, and you often have to pull several times before the message you need shows up.

Google offers BigQuery Subscription as a solution to that issue. It also provides long-term storage for your messages, so you can troubleshoot and run complex queries later. In this post, I’m going to show a sample BigQuery Subscription workflow with Pulumi.

Configure BigQuery Dataset and Table

First, you need to create a BigQuery Dataset and a BigQuery Table following the schema defined here. You can do it manually in the UI or via Pulumi.

BigQuery Dataset

import * as gcp from '@pulumi/gcp';

// Dataset ID shared by the dataset and the table below
const pubsubDatasetId = `pubsub`;

export const pubsubDataset = new gcp.bigquery.Dataset(
  `my-dataset`,
  { datasetId: pubsubDatasetId }
);

BigQuery Table (a bit messy since the schema has to be defined as a JSON string)

export const messageTable = new gcp.bigquery.Table(
  `my-table`,
  {
    datasetId: pubsubDatasetId,
    tableId: `message-values`,
    // set to true if you don't want other people to accidentally delete it
    deletionProtection: true,
    schema: `
  [
    {
      "name": "data",
      "type": "STRING",
      "mode": "NULLABLE",
      "description": "The message body"
    },
    {
      "name": "subscription_name",
      "type": "STRING",
      "mode": "NULLABLE",
      "description": ""
    },
    {
      "name": "message_id",
      "type": "STRING",
      "mode": "NULLABLE",
      "description": ""
    },
    {
      "name": "publish_time",
      "type": "TIMESTAMP",
      "mode": "NULLABLE",
      "description": ""
    },
    {
      "name": "attributes",
      "type": "STRING",
      "mode": "NULLABLE",
      "description": "Message attributes as JSON string"
    }
  ]
  `,
  },
  {
    dependsOn: [pubsubDataset],
  }
);
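
If the hand-written JSON string feels error-prone, one option is to describe the columns as a typed array and let `JSON.stringify` produce the string. This is only a sketch: the `SchemaField` interface and the `messageTableFields` and `messageTableSchema` names are made up for illustration, and the columns simply mirror the table above.

```typescript
// Minimal description of one BigQuery column, mirroring the JSON schema format.
// The unions only cover the values used by this table.
interface SchemaField {
  name: string;
  type: 'STRING' | 'TIMESTAMP';
  mode: 'NULLABLE' | 'REQUIRED';
  description: string;
}

// Same five columns as the hand-written schema string.
const messageTableFields: SchemaField[] = [
  { name: 'data', type: 'STRING', mode: 'NULLABLE', description: 'The message body' },
  { name: 'subscription_name', type: 'STRING', mode: 'NULLABLE', description: '' },
  { name: 'message_id', type: 'STRING', mode: 'NULLABLE', description: '' },
  { name: 'publish_time', type: 'TIMESTAMP', mode: 'NULLABLE', description: '' },
  { name: 'attributes', type: 'STRING', mode: 'NULLABLE', description: 'Message attributes as JSON string' },
];

// JSON.stringify yields a string that can be passed as the table's `schema` property.
const messageTableSchema: string = JSON.stringify(messageTableFields, null, 2);
```

With this in place, `schema: messageTableSchema` could replace the inline template string, and the compiler would catch a misspelled `type` or `mode`.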

Configure BigQuery Subscription

Creating a BigQuery Subscription is quite simple with Google Cloud Console. Remember to enable the Write metadata option so that PubSub also writes subscription_name, message_id, publish_time and attributes to the matching columns defined in the schema above.

To automate it via Pulumi

import * as pulumi from '@pulumi/pulumi';

export const bigquerySubscription = new gcp.pubsub.Subscription(
  `my-big-query-subscription`,
  {
    topic: 'my-topic',
    bigqueryConfig: {
      table: pulumi.interpolate`${messageTable.project}:${messageTable.datasetId}.${messageTable.tableId}`,
      writeMetadata: true,
    }
  },
  {
    dependsOn: [messageTable],
  }
);
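
One thing that is easy to miss: PubSub writes to the table through its own Google-managed service account, so that account needs permission on BigQuery (the BigQuery Data Editor role, as far as I know). The helper below is a minimal sketch that only builds the role and member strings; `pubsubBigQueryGrant` and `IamGrant` are hypothetical names, and the project number is a placeholder you would supply yourself.

```typescript
// Role/member pair describing the IAM grant PubSub needs to write into BigQuery.
interface IamGrant {
  role: string;
  member: string;
}

// Builds the grant for the PubSub service account of the given project.
// Assumption: the account follows the usual
// service-<project number>@gcp-sa-pubsub.iam.gserviceaccount.com pattern.
function pubsubBigQueryGrant(projectNumber: string): IamGrant {
  return {
    role: 'roles/bigquery.dataEditor',
    member: `serviceAccount:service-${projectNumber}@gcp-sa-pubsub.iam.gserviceaccount.com`,
  };
}

// Example with a placeholder project number.
const grant = pubsubBigQueryGrant('123456789012');
```

In a Pulumi program, the resulting `role` and `member` could be passed to a `gcp.projects.IAMMember` resource together with your project ID, and the subscription could list that resource in its `dependsOn`.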

Application Logging

Of course, you also need to make sure that you log enough information to debug later. At the very least, your application should log the messageId returned by Google PubSub after publishing the message, for example:

import { PubSub } from '@google-cloud/pubsub';

const pubSubClient = new PubSub({});
const topic = pubSubClient.topic('my-topic');

// `logger` is assumed to be your application's own logger instance
const publishMessage = async (body: any): Promise<void> => {
  const data = JSON.stringify(body);
  const dataBuffer = Buffer.from(data);

  try {
    const messageId = await topic.publishMessage({ data: dataBuffer });
    logger.info('Message published to Google PubSub', { messageId });
  } catch (error) {
    logger.error('Failed to publish message', { error });
  }
};

Query data from BigQuery

Once you have the messageId, querying BigQuery is straightforward:

SELECT * FROM `my-project.pubsub.message-values`
WHERE message_id='5627870046321012'
LIMIT 1
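
If you run this lookup from code rather than from the console, it is safer to pass the message ID as a named query parameter than to splice it into the SQL. The sketch below only builds the options object; `buildMessageLookup` and `QueryOptions` are hypothetical names, and the table name is assumed to come from trusted configuration because table names cannot be parameterized.

```typescript
// Query text plus named parameters, in the shape BigQuery clients commonly accept.
interface QueryOptions {
  query: string;
  params: Record<string, string>;
}

// Builds a lookup for a single message by its PubSub message ID.
// `table` must be a trusted, fully qualified table name; it is quoted with backticks.
function buildMessageLookup(table: string, messageId: string): QueryOptions {
  return {
    query: `SELECT * FROM \`${table}\` WHERE message_id = @messageId LIMIT 1`,
    params: { messageId },
  };
}

const lookup = buildMessageLookup('my-project.pubsub.message-values', '5627870046321012');
```

An object like `lookup` should be usable with the `query` method of the `@google-cloud/bigquery` client, which accepts `query` and `params` for named parameters.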

If you want to run more complex queries, such as filtering based on the attributes or the content of the message, use the BigQuery JSON functions. For example, if you always include the clientId in the message body, you can filter by a specific client like this:

SELECT * FROM `my-project.pubsub.message-values`
WHERE JSON_VALUE(data, "$.clientId") = 'a-client-id'
LIMIT 1
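
Since both data and attributes are stored as strings, rows fetched from this table still need to be decoded on the client side. Here is a minimal sketch, assuming the message body is always valid JSON; the `MessageRow` interface and the `decodeRow` helper are illustrative names, not part of any library.

```typescript
// Subset of the table's columns that this helper needs (all columns are NULLABLE).
interface MessageRow {
  data: string | null;
  attributes: string | null;
  message_id: string | null;
}

interface DecodedMessage {
  messageId: string | null;
  body: unknown;
  attributes: Record<string, string>;
}

// Parses the JSON strings stored in `data` and `attributes`.
// JSON.parse throws if a column does not contain valid JSON.
function decodeRow(row: MessageRow): DecodedMessage {
  return {
    messageId: row.message_id,
    body: row.data === null ? null : JSON.parse(row.data),
    attributes: row.attributes === null ? {} : JSON.parse(row.attributes),
  };
}

// Example row; the attribute shown here is made up.
const decoded = decodeRow({
  data: '{"clientId":"a-client-id"}',
  attributes: '{"source":"api"}',
  message_id: '5627870046321012',
});
```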