Audit Logging - GCP Pub/Sub
Prerequisites
Before configuring the Audit log Sink, please complete the following steps in Google Cloud:
- Create a PubSub topic and take note of its topic name, for example, "test-auditlog"
- If you wish to enable customer-managed encryption keys (CMEK), do so
- Record the GCP Project ID that owns the topic
- Set up a service account in the same project that trusts the Temporal internal service account to let Temporal write information to your account. Follow the instructions in the Temporal Cloud UI; there are two ways to set up this service account:
- Input the service account ID, GCP project ID and PubSub topic name
- Follow the instructions, manually set up a new service account
- Use the Terraform template to create the service account
- Input the service account ID, GCP project ID and PubSub topic name
Temporal Cloud UI
- In the Cloud UI, navigate to Settings → Integration Page → Audit Log, and confirm that you see Pub/Sub as a sink option
- Configure the Audit Log
- Choose Pub/Sub as Sink type
- Provide the following information
- Service account ID: [from Prerequisite 3]
- GCP Project ID: [from Prerequisite 2]
- Pub/Sub topic name: [from Prerequisite 1]
- Once you have filled in the necessary values, please click “Create” to configure the Audit Log
- Please check back in a few minutes to make sure everything is set up successfully
More information
More details are available in our public-facing documentation: https://docs.temporal.io/cloud/audit-logging
Example of consuming an Audit Log
The following Go code is an example of consuming Audit Logs from a PubSub stream
package main
import (
"fmt"
"io/ioutil"
"os"
"github.com/gogo/protobuf/jsonpb"
// TODO: change path to your generated proto
export "generated/exported_workflow"
"go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
// TODO: change path to temporal repo
ossserialization "go.temporal.io/server/common/persistence/serialization"
)
// extractWorkflowHistoriesFromFile reads filename, decodes its contents as an
// export.ExportedWorkflows message (the blob is tagged ENCODING_TYPE_PROTO3),
// and returns the workflows it contains.
//
// An error is returned if the file cannot be read, if decoding fails, or if
// any decoded entry is nil or has a nil History. Underlying errors are wrapped
// with %w so callers can inspect them with errors.Is / errors.As.
func extractWorkflowHistoriesFromFile(filename string) ([]*export.Workflow, error) {
	data, err := ioutil.ReadFile(filename)
	if err != nil {
		return nil, fmt.Errorf("error reading from file: %w", err)
	}

	blob := &common.DataBlob{
		EncodingType: enumspb.ENCODING_TYPE_PROTO3,
		Data:         data,
	}
	result := &export.ExportedWorkflows{}
	if err := ossserialization.ProtoDecodeBlob(blob, result); err != nil {
		return nil, fmt.Errorf("failed to decode file: %w", err)
	}

	workflows := result.Workflows
	for i, workflow := range workflows {
		// Guard the entry itself as well as its history so a nil element
		// yields an error instead of a nil-pointer panic.
		if workflow == nil || workflow.History == nil {
			return nil, fmt.Errorf("history is nil for workflow at index %d", i)
		}
	}
	return workflows, nil
}
func printWorkflow(workflow *export.Workflow) {
// Pretty print the workflow
marshaler := jsonpb.Marshaler{
Indent: "\t",
EmitDefaults: true,
}
Export Feature (User Copy)
9
str, err := marshaler.MarshalToString(workflow.History)
if err != nil {
fmt.Println("error print workflow history: ", err)
os.Exit(1) }
print(str) }
// printAllWorkflows calls printWorkflow on each element of workflowHistories,
// in slice order.
func printAllWorkflows(workflowHistories []*export.Workflow) {
	for i := 0; i < len(workflowHistories); i++ {
		printWorkflow(workflowHistories[i])
	}
}
func printWorkflowHistory(workflowID string, workflowHistories []*export.Workflow) {
if workflowID == "" {
fmt.Println("invalid workflow ID")
os.Exit(1) }
for _, workflow := range workflowHistories {
if workflow.History.Events[0].GetWorkflowExecutionStartedEventAttributes().WorkflowId
== workflowID {
fmt.Println("Printing workflow history for workflow ID: ", workflowID)
printWorkflow(workflow)
} }
fmt.Println("No workflow found with workflow ID: ", workflowID)
}
// main deserializes exported workflow histories from the file named by the
// first command-line argument, then prompts on standard input for what to
// print: every workflow (option 1) or the history of one workflow ID
// (option 2). Any invalid input or failure prints a message and exits with
// status 1.
func main() {
	if len(os.Args) < 2 {
		fmt.Println("Please provide a path to a file")
		os.Exit(1)
	}
	filename := os.Args[1]

	fmt.Println("Deserializing export workflow history from file: ", filename)
	workflowHistories, err := extractWorkflowHistoriesFromFile(filename)
	if err != nil {
		fmt.Println("error extracting workflow histories: ", err)
		os.Exit(1)
	}
	fmt.Println("Successfully deserialized workflow histories")
	fmt.Println("Total number of workflow histories: ", len(workflowHistories))

	fmt.Println("Choose an option:")
	fmt.Println("1. Print out all the workflows")
	fmt.Println("2. Print out the workflow history of a specific workflow. Enter the workflow ID:")

	var option int
	fmt.Print("Enter your choice: ")
	// Scanln consumes the line terminator along with the value, so the next
	// read starts on a fresh line regardless of platform line endings.
	if _, err = fmt.Scanln(&option); err != nil {
		fmt.Println("invalid input.")
		os.Exit(1)
	}

	switch option {
	case 1:
		printAllWorkflows(workflowHistories)
	case 2:
		fmt.Println("Please provide a workflow ID:")
		var workflowID string
		if _, err = fmt.Scanln(&workflowID); err != nil {
			fmt.Println("invalid input for workflow ID.")
			os.Exit(1)
		}
		printWorkflowHistory(workflowID, workflowHistories)
	default:
		fmt.Println("only options 1 and 2 are supported.")
		os.Exit(1)
	}
}