Skip to main content

Audit Logging - GCP Pub/Sub

Prerequisites

Before configuring the Audit log Sink, please complete the following steps in Google Cloud:

  1. Create a PubSub topic and take note of its topic name, for example, "test- auditlog"
    1. If you wish to enable customer-managed encryption keys (CMEK), do so
  2. Record the GCP Project ID that owns the topic
  3. Set up a service account in the same project that trusts the Temporal internal service account to let Temporal write information to your account. Follow the instructions in the Temporal Cloud UI, there are two ways to set up this service account:
    1. Input the service account ID, GCP project ID and PubSub topic name
      1. Follow the instructions, manually set up a new service account
      2. Use the Terraform template to create the service account

Temporal Cloud UI

Temporal Cloud UI Setup for Audit Logging with GCP Pub/Sub

  1. In the Cloud UI, navigate to the Settings → Integration Page → Audit Log, confirm that you see Pub/Sub as a sink option
  2. Configure the Audit Log
    1. Choose Pub as Sink type
    2. Provide the following information
      1. Service account ID: [from Prerequisite 3]
      2. GCP Project ID: [from Prerequisite 2]
      3. Pub/Sub topic name: [from Prerequisite 1]
  3. Once you have filled in the necessary values, please click on “Create” to get Audit Log Configured
  4. Please check back in few minutes to make sure everything set up successfully

More information

More details available in our public-facing documentation: https://docs.temporal.io/cloud/audit-logging

Example of consuming an Audit Log

The following Go code is an example of consuming Audit Logs from a PubSub stream

package main
import (
"fmt"
"io/ioutil"
"os"
"github.com/gogo/protobuf/jsonpb"
// TODO: change path to your generated proto
export "generated/exported_workflow"
"go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
// TODO: change path to temporal repo
ossserialization "go.temporal.io/server/common/persistence/serialization"
)
func extractWorkflowHistoriesFromFile(filename string) ([]*export.Workflow, error) {
bytes, err := ioutil.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("error reading from file: %v", err)
}
blob := &common.DataBlob{
EncodingType: enumspb.ENCODING_TYPE_PROTO3,
Data: bytes,
}
result := &export.ExportedWorkflows{}
err = ossserialization.ProtoDecodeBlob(blob, result)
if err != nil {
return nil, fmt.Errorf("failed to decode file: %w", err)
}
workflows := result.Workflows
for _, workflow := range workflows {
history := workflow.History
if history == nil {
return nil, fmt.Errorf("history is nil")
}
}
return workflows, nil
}
func printWorkflow(workflow *export.Workflow) {
// Pretty print the workflow
marshaler := jsonpb.Marshaler{
Indent: "\t",
EmitDefaults: true,
}
Export Feature (User Copy)
9
str, err := marshaler.MarshalToString(workflow.History)
if err != nil {
fmt.Println("error print workflow history: ", err)
os.Exit(1) }
print(str) }
func printAllWorkflows(workflowHistories []*export.Workflow) {
for _, workflow := range workflowHistories {
printWorkflow(workflow)
}
}
func printWorkflowHistory(workflowID string, workflowHistories []*export.Workflow) {
if workflowID == "" {
fmt.Println("invalid workflow ID")
os.Exit(1) }
for _, workflow := range workflowHistories {
if workflow.History.Events[0].GetWorkflowExecutionStartedEventAttributes().WorkflowId
== workflowID {
fmt.Println("Printing workflow history for workflow ID: ", workflowID)
printWorkflow(workflow)
} }
fmt.Println("No workflow found with workflow ID: ", workflowID)
}
func main() {
if len(os.Args) < 2 {
fmt.Println("Please provide a path to a file")
os.Exit(1) }
filename := os.Args[1]
fmt.Println("Deserializing export workflow history from file: ", filename)
workflowHistories, err := extractWorkflowHistoriesFromFile(filename)
if err != nil {
fmt.Println("error extracting workflow histories: ", err)
os.Exit(1)
}
fmt.Println("Successfully deserialized workflow histories")
fmt.Println("Total number of workflow histories: ", len(workflowHistories))
fmt.Println("Choose an option:")
fmt.Println("1. Print out all the workflows")
fmt.Println("2. Print out the workflow hisotry of a specific workflow. Enter the workflow ID:")
var option int
fmt.Print("Enter your choice: ")
_, err = fmt.Scanf("%d", &option)
if err != nil {
fmt.Println("invalid input.")
os.Exit(1) }
switch option {
case 1:
printAllWorkflows(workflowHistories)
case 2:
fmt.Println("Please provide a workflow ID:")
var workflowID string
_, err = fmt.Scanf("%s", &workflowID)
if err != nil {
fmt.Println("invalid input for workflow ID.")
os.Exit(1) }
printWorkflowHistory(workflowID, workflowHistories)
default:
fmt.Println("only options 1 and 2 are supported.")
os.Exit(1) }
}