An example of configuring a PubSub BigQuery Subscription with Pulumi
- BigQuery Subscription
- Configure BigQuery Dataset and Table
- Configure BigQuery Subscription
- Application Logging
- Query data from BigQuery
BigQuery Subscription
It’s hard to view the content of messages published to a topic because the application has usually processed and acknowledged them before you can do anything. The common workaround is to create another test subscription that also receives the messages, then pull messages from that test subscription. However, the Google PubSub UI doesn’t provide any way to pull a specific message by ID, and the Google Cloud Console is a frustrating UI in itself: slow to load, and you often have to pull several times to find the messages you need.
Google offers the BigQuery Subscription as a solution to that issue. It also provides long-term storage for your messages so you can troubleshoot and run complex queries later. In this post, I’m going to show a sample BigQuery Subscription workflow with Pulumi.
Configure BigQuery Dataset and Table
First, you need to create a BigQuery Dataset and a BigQuery Table following the schema defined here. You can do it manually in the UI or via Pulumi
BigQuery Dataset
import * as gcp from "@pulumi/gcp";

const pubsubDatasetId = `pubsub`;
export const pubsubDataset = new gcp.bigquery.Dataset(
  `my-dataset`,
  { datasetId: pubsubDatasetId }
);
BigQuery Table (a bit messy since the schema has to be defined as a JSON string)
export const messageTable = new gcp.bigquery.Table(
  `my-table`,
  {
    datasetId: pubsubDatasetId,
    tableId: `message-values`,
    // if you don't want other people to accidentally delete it, set to true
    deletionProtection: true,
    schema: `
      [
        {
          "name": "data",
          "type": "STRING",
          "mode": "NULLABLE",
          "description": "The message body"
        },
        {
          "name": "subscription_name",
          "type": "STRING",
          "mode": "NULLABLE",
          "description": ""
        },
        {
          "name": "message_id",
          "type": "STRING",
          "mode": "NULLABLE",
          "description": ""
        },
        {
          "name": "publish_time",
          "type": "TIMESTAMP",
          "mode": "NULLABLE",
          "description": ""
        },
        {
          "name": "attributes",
          "type": "STRING",
          "mode": "NULLABLE",
          "description": "Message attributes as JSON string"
        }
      ]
    `,
  },
  {
    dependsOn: [pubsubDataset],
  }
);
Configure BigQuery Subscription
Creating a BigQuery Subscription is quite simple in the Google Cloud Console. Remember to enable the Write metadata option so PubSub populates the metadata columns (subscription_name, message_id, publish_time, attributes) defined in the schema above.
To automate it via Pulumi
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

export const bigquerySubscription = new gcp.pubsub.Subscription(
  `my-big-query-subscription`,
  {
    topic: 'my-topic',
    bigqueryConfig: {
      table: pulumi.interpolate`${messageTable.project}:${messageTable.datasetId}.${messageTable.tableId}`,
      writeMetadata: true,
    },
  },
  {
    dependsOn: [messageTable],
  }
);
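One prerequisite that is easy to miss: the PubSub service agent needs permission to write to the table, otherwise creating the subscription may fail with a permission error. Google’s documentation points to the BigQuery Data Editor role for this. Below is a minimal sketch of a project-level grant in Pulumi; the resource name pubsub-bigquery-writer is my own choice, and you could scope the grant to the dataset instead.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Look up the current project so we can derive the PubSub service agent email
const currentProject = gcp.organizations.getProjectOutput({});

// Grant the PubSub service agent permission to write to BigQuery tables.
// This is a project-level grant; scoping it to the dataset also works.
export const pubsubBigqueryWriter = new gcp.projects.IAMMember(
  `pubsub-bigquery-writer`,
  {
    project: currentProject.apply(p => p.projectId!),
    role: `roles/bigquery.dataEditor`,
    member: pulumi.interpolate`serviceAccount:service-${currentProject.number}@gcp-sa-pubsub.iam.gserviceaccount.com`,
  }
);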
Application Logging
And of course, you need to make sure that you log enough information to debug later. At the very least, your application should log the messageId generated by Google PubSub after publishing the message, for example
import { PubSub } from '@google-cloud/pubsub';

const pubSubClient = new PubSub({});
const topic = pubSubClient.topic('my-topic');

// `logger` is whatever structured logger your application already uses
const publishMessage = async (body: any): Promise<void> => {
  const data = JSON.stringify(body);
  const dataBuffer = Buffer.from(data);
  try {
    // publishMessage resolves with the server-assigned message ID
    const messageId = await topic.publishMessage({ data: dataBuffer });
    logger.info('Message published to Google PubSub', { messageId });
  } catch (error) {
    logger.error('Failed to publish message', { error });
  }
};
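Since the table schema above includes an attributes column, it may also be worth publishing the fields you filter on as message attributes. Here is a small, hypothetical variant of the function above; the clientId attribute is just an illustration.

// Hypothetical variant of publishMessage: attach attributes so they also
// land in the `attributes` column of the BigQuery table
const publishMessageWithAttributes = async (
  body: any,
  clientId: string
): Promise<void> => {
  const dataBuffer = Buffer.from(JSON.stringify(body));
  // Attribute values must be strings
  const messageId = await topic.publishMessage({
    data: dataBuffer,
    attributes: { clientId },
  });
  logger.info('Message published to Google PubSub', { messageId, clientId });
};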
Query data from BigQuery
Once you have the messageId, querying BigQuery is a straightforward task
SELECT * FROM `my-project.pubsub.message-values`
WHERE message_id='5627870046321012'
LIMIT 1
In case you want to perform more complex queries, such as filtering based on the attributes or the content of the message, use BigQuery JSON functions. For example, if you always include the clientId in the message body, you can filter by a specific client like this
SELECT * FROM `my-project.pubsub.message-values`
WHERE JSON_VALUE(data, "$.clientId") = 'a-client-id'
LIMIT 1
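If you find yourself running these lookups often, you can also query from code with the @google-cloud/bigquery client instead of the console. A minimal sketch, assuming Application Default Credentials and the table created above; the function name findMessagesForClient is my own.

import { BigQuery } from '@google-cloud/bigquery';

const bigquery = new BigQuery();

// Parameterized query: @clientId avoids building SQL strings by hand
const findMessagesForClient = async (clientId: string) => {
  const [rows] = await bigquery.query({
    query: `
      SELECT message_id, publish_time, data
      FROM \`my-project.pubsub.message-values\`
      WHERE JSON_VALUE(data, '$.clientId') = @clientId
      LIMIT 10`,
    params: { clientId },
  });
  return rows;
};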