Subscribe to a Topic's Messages

Subscribing to messages on a topic with the Go CDK takes three steps:

  1. Open the subscription with the Pub/Sub provider of your choice (once per subscription).
  2. Receive messages from the topic.
  3. For each message, acknowledge its receipt using the Ack method after completing any work related to the message. This will prevent the message from being redelivered.

The last two steps are the same across all providers because the first step creates a value of the portable *pubsub.Subscription type. A simple subscriber that operates on messages serially looks like this:

// Loop on received messages.
for {
	msg, err := subscription.Receive(ctx)
	if err != nil {
		// Errors from Receive indicate that Receive will no longer succeed.
		log.Printf("Receiving message: %v", err)
		break
	}
	// Do work based on the message, for example:
	fmt.Printf("Got message: %q\n", msg.Body)
	// Messages must always be acknowledged with Ack.
	msg.Ack()
}

If you want your subscriber to operate on the incoming messages concurrently, you can start multiple goroutines:

// Loop on received messages. We can use a channel as a semaphore to limit how
// many goroutines we have active at a time as well as wait on the goroutines
// to finish before exiting.
const maxHandlers = 10
sem := make(chan struct{}, maxHandlers)
recvLoop:
for {
	msg, err := subscription.Receive(ctx)
	if err != nil {
		// Errors from Receive indicate that Receive will no longer succeed.
		log.Printf("Receiving message: %v", err)
		break
	}

	// Wait if there are too many active handle goroutines and acquire the
	// semaphore. If the context is canceled, stop waiting and start shutting
	// down.
	select {
	case sem <- struct{}{}:
	case <-ctx.Done():
		break recvLoop
	}

	// Handle the message in a new goroutine.
	go func() {
		defer func() { <-sem }() // Release the semaphore.
		defer msg.Ack()          // Messages must always be acknowledged with Ack.

		// Do work based on the message, for example:
		fmt.Printf("Got message: %q\n", msg.Body)
	}()
}

// We're no longer receiving messages. Wait to finish handling any
// unacknowledged messages by totally acquiring the semaphore.
for n := 0; n < maxHandlers; n++ {
	sem <- struct{}{}
}

Note that the semantics of message delivery can vary by provider.

The rest of this guide will discuss how to accomplish the first step: opening a subscription for your chosen Pub/Sub provider.

Constructors versus URL openers🔗

The easiest way to open a subscription is using pubsub.OpenSubscription and a URL pointing to the topic, making sure you “blank import” the driver package to link it in. See Concepts: URLs for more details. If you need fine-grained control over the connection settings, you can call the constructor function in the driver package directly (like gcppubsub.OpenSubscription). This guide will show how to use both forms for each pub/sub provider.

Amazon Simple Queueing Service🔗

The Go CDK can subscribe to an Amazon Simple Queueing Service (SQS) topic. SQS URLs closely resemble the the queue URL, except the leading https:// is replaced with awssqs://. You can specify the region query parameter to ensure your application connects to the correct region, but otherwise pubsub.OpenSubscription will use the region found in the environment variables or your AWS CLI configuration.

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/awssnssqs"
)

// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// This URL will open the subscription with the URL
// "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue".
subscription, err := pubsub.OpenSubscription(ctx,
	"awssqs://sqs.us-east-2.amazonaws.com/123456789012/"+
		"myqueue?region=us-east-2")
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

If your messages are being sent to SQS directly, or if they are being delivered via an SNS topic with RawMessageDelivery enabled, set a raw=true query parameter in your URL, or set SubscriberOptions.Raw to true if you’re using the constructors. By default, the subscription will use heuristics to identify whether the message bodies are raw or SNS JSON.

Messages with a base64encoded message attribute will be automatically Base64 decoded before being returned. See the SNS publishing guide or the [SQS publshing guide][] for more details.

Amazon Simple Queueing Service Constructor🔗

The awssnssqs.OpenSubscription constructor opens an SQS queue. You must first create an AWS session with the same region as your topic:

import (
	"context"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"gocloud.dev/pubsub/awssnssqs"
)

// Establish an AWS session.
// See https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more info.
// The region must match the region for "MyQueue".
sess, err := session.NewSession(&aws.Config{
	Region: aws.String("us-east-2"),
})
if err != nil {
	return err
}

// Construct a *pubsub.Subscription.
// https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html
const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
subscription := awssnssqs.OpenSubscription(ctx, sess, queueURL, nil)
defer subscription.Shutdown(ctx)

Google Cloud Pub/Sub🔗

The Go CDK can receive messages from a Google Cloud Pub/Sub subscription. The URLs use the project ID and the subscription ID. pubsub.OpenSubscription will use Application Default Credentials.

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/gcppubsub"
)

subscription, err := pubsub.OpenSubscription(ctx,
	"gcppubsub://projects/my-project/subscriptions/my-subscription")
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

Google Cloud Pub/Sub Constructor🔗

The gcppubsub.OpenSubscription constructor opens a Cloud Pub/Sub subscription. You must first obtain GCP credentials and then create a gRPC connection to Cloud Pub/Sub. (This gRPC connection can be reused among subscriptions.)

import (
	"context"

	"gocloud.dev/gcp"
	"gocloud.dev/pubsub/gcppubsub"
)

// Your GCP credentials.
// See https://cloud.google.com/docs/authentication/production
// for more info on alternatives.
creds, err := gcp.DefaultCredentials(ctx)
if err != nil {
	return err
}

// Open a gRPC connection to the GCP Pub/Sub API.
conn, cleanup, err := gcppubsub.Dial(ctx, creds.TokenSource)
if err != nil {
	return err
}
defer cleanup()

// Construct a SubscriberClient using the connection.
subClient, err := gcppubsub.SubscriberClient(ctx, conn)
if err != nil {
	return err
}
defer subClient.Close()

// Construct a *pubsub.Subscription.
subscription, err := gcppubsub.OpenSubscriptionByPath(
	subClient, "projects/myprojectID/subscriptions/example-subscription", nil)
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

Azure Service Bus🔗

The Go CDK can recieve messages from an Azure Service Bus subscription over AMQP 1.0. The URL for subscribing is the topic name with the subscription name in the subscription query parameter. pubsub.OpenSubscription will use the environment variable SERVICEBUS_CONNECTION_STRING to obtain the Service Bus Connection String you need to copy from the Azure portal.

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/azuresb"
)

// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// This URL will open the subscription "mysubscription" for the topic
// "mytopic" using a connection string from the environment variable
// SERVICEBUS_CONNECTION_STRING.
subscription, err := pubsub.OpenSubscription(ctx,
	"azuresb://mytopic?subscription=mysubscription")
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

Azure Service Bus Constructor🔗

The azuresb.OpenSubscription constructor opens an Azure Service Bus subscription. You must first connect to the topic and subscription using the Azure Service Bus library and then pass the subscription to azuresb.OpenSubscription. There are also helper functions in the azuresb package to make this easier.

import (
	"context"
	"os"

	"gocloud.dev/pubsub/azuresb"
)

// Change these as needed for your application.
serviceBusConnString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
const topicName = "test-topic"
const subscriptionName = "test-subscription"

// Connect to Azure Service Bus for the given subscription.
busNamespace, err := azuresb.NewNamespaceFromConnectionString(serviceBusConnString)
if err != nil {
	return err
}
busTopic, err := azuresb.NewTopic(busNamespace, topicName, nil)
if err != nil {
	return err
}
defer busTopic.Close(ctx)
busSub, err := azuresb.NewSubscription(busTopic, subscriptionName, nil)
if err != nil {
	return err
}
defer busSub.Close(ctx)

// Construct a *pubsub.Subscription.
subscription, err := azuresb.OpenSubscription(ctx,
	busNamespace, busTopic, busSub, nil)
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

RabbitMQ🔗

The Go CDK can receive messages from an AMQP 0.9.1 queue, the dialect of AMQP spoken by RabbitMQ. A RabbitMQ URL only includes the queue name. The RabbitMQ’s server is discovered from the RABBIT_SERVER_URL environment variable (which is something like amqp://guest:guest@localhost:5672/).

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/rabbitpubsub"
)

// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// This URL will Dial the RabbitMQ server at the URL in the environment
// variable RABBIT_SERVER_URL and open the queue "myqueue".
subscription, err := pubsub.OpenSubscription(ctx, "rabbit://myqueue")
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

RabbitMQ Constructor🔗

The rabbitpubsub.OpenSubscription constructor opens a RabbitMQ queue. You must first create an *amqp.Connection to your RabbitMQ instance.

import (
	"context"

	"github.com/streadway/amqp"
	"gocloud.dev/pubsub/rabbitpubsub"
)

rabbitConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
	return err
}
defer rabbitConn.Close()
subscription := rabbitpubsub.OpenSubscription(rabbitConn, "myqueue", nil)
defer subscription.Shutdown(ctx)

NATS🔗

The Go CDK can publish to a NATS subject. A NATS URL only includes the subject name. The NATS server is discovered from the NATS_SERVER_URL environment variable (which is something like nats://nats.example.com).

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/natspubsub"
)

// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// This URL will Dial the NATS server at the URL in the environment variable
// NATS_SERVER_URL and receive messages with subject "example.mysubject".
subscription, err := pubsub.OpenSubscription(ctx, "nats://example.mysubject")
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

NATS guarantees at-most-once delivery; it will never redeliver a message. Therefore, Message.Ack is a no-op.

To parse messages published via the Go CDK, the NATS driver will first attempt to decode the payload using gob. Failing that, it will return the message payload as the Data with no metadata to accomodate subscribing to messages coming from a source not using the Go CDK.

NATS Constructor🔗

The natspubsub.OpenSubscription constructor opens a NATS subject as a topic. You must first create an *nats.Conn to your NATS instance.

import (
	"context"

	"github.com/nats-io/nats.go"
	"gocloud.dev/pubsub/natspubsub"
)

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	return err
}
defer natsConn.Close()

subscription, err := natspubsub.OpenSubscription(
	natsConn,
	"example.mysubject",
	nil)
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

Kafka🔗

The Go CDK can receive messages from a Kafka cluster. A Kafka URL includes the consumer group name, plus at least one instance of a query parameter specifying the topic to subscribe to. The brokers in the Kafka cluster are discovered from the KAFKA_BROKERS environment variable (which is a comma-delimited list of hosts, something like 1.2.3.4:9092,5.6.7.8:9092).

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/kafkapubsub"
)

// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
// The host + path are used as the consumer group name.
// The "topic" query parameter sets one or more topics to subscribe to.
// The set of brokers must be in an environment variable KAFKA_BROKERS.
subscription, err := pubsub.OpenSubscription(ctx,
	"kafka://my-group?topic=my-topic")
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

Kafka Constructor🔗

The kafkapubsub.OpenSubscription constructor creates a consumer in a consumer group, subscribed to one or more topics.

In addition to the list of brokers, you’ll need a *sarama.Config, which exposes many knobs that can affect performance and semantics; review and set them carefully. kafkapubsub.MinimalConfig provides a minimal config to get you started.

import (
	"context"

	"gocloud.dev/pubsub/kafkapubsub"
)

// The set of brokers in the Kafka cluster.
addrs := []string{"1.2.3.4:9092"}
// The Kafka client configuration to use.
config := kafkapubsub.MinimalConfig()

// Construct a *pubsub.Subscription, joining the consumer group "my-group"
// and receiving messages from "my-topic".
subscription, err := kafkapubsub.OpenSubscription(
	addrs, config, "my-group", []string{"my-topic"}, nil)
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

In-Memory🔗

The Go CDK includes an in-memory Pub/Sub provider useful for local testing. The names in mem:// URLs are a process-wide namespace, so subscriptions to the same name will receive messages posted to that topic. For instance, if you open a topic mem://topicA and open two subscriptions with mem://topicA, you will have two subscriptions to the same topic.

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/mempubsub"
)

// Create a topic.
topic, err := pubsub.OpenTopic(ctx, "mem://topicA")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

// Create a subscription connected to that topic.
subscription, err := pubsub.OpenSubscription(ctx, "mem://topicA")
if err != nil {
	return err
}
defer subscription.Shutdown(ctx)

In-Memory Constructor🔗

To create a subscription to an in-memory Pub/Sub topic, pass the topic you created into the mempubsub.NewSubscription function. You will also need to pass an acknowledgement deadline: once a message is received, if it is not acknowledged after the deadline elapses, then it will be redelivered.

import (
	"context"
	"time"

	"gocloud.dev/pubsub/mempubsub"
)

// Construct a *pubsub.Topic.
topic := mempubsub.NewTopic()
defer topic.Shutdown(ctx)

// Construct a *pubsub.Subscription for the topic.
subscription := mempubsub.NewSubscription(topic, 1*time.Minute /* ack deadline */)
defer subscription.Shutdown(ctx)