Publish Messages to a Topic

Publishing a message to a topic with the Go CDK takes two steps:

  1. Open the topic with the Pub/Sub provider of your choice (once per topic).
  2. Send messages on the topic.

The second step is the same across all providers because the first step creates a value of the portable *pubsub.Topic type. Publishing looks like this:

err := topic.Send(ctx, &pubsub.Message{
	Body: []byte("Hello, World!\n"),
	Metadata: map[string]string{
		// These are examples of metadata.
		// There is nothing special about the key names.
		"language":   "en",
		"importance": "high",
	},
})
if err != nil {
	return err
}

Note that the semantics of message delivery can vary by provider.

The rest of this guide will discuss how to accomplish the first step: opening a topic for your chosen Pub/Sub provider.

Constructors versus URL openers🔗

The easiest way to open a topic is using pubsub.OpenTopic and a URL pointing to the topic, making sure you “blank import” the driver package to link it in. See Concepts: URLs for more details. If you need fine-grained control over the connection settings, you can call the constructor function in the driver package directly (like gcppubsub.OpenTopic). This guide will show how to use both forms for each pub/sub provider.

Amazon Simple Notification Service🔗

The Go CDK can publish to an Amazon Simple Notification Service (SNS) topic. SNS URLs in the Go CDK use the Amazon Resource Name (ARN) to identify the topic. You can specify the region query parameter to ensure your application connects to the correct region, but otherwise pubsub.OpenTopic will use the region found in the environment variables or your AWS CLI configuration.

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/awssnssqs"
)

const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic"
topic, err := pubsub.OpenTopic(ctx, "awssns://"+topicARN+"?region=us-east-2")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

SNS messages are restricted to UTF-8 clean payloads. If your application sends a message that contains non-UTF-8 bytes, then the Go CDK will automatically Base64 encode the message and add a base64encoded message attribute. When subscribing to messages on the topic through the Go CDK, these will be automatically Base64 decoded, but if you are receiving messages from a topic in a program that does not use the Go CDK, you may need to manually Base64 decode the message payload.

Amazon Simple Notification Service Constructor🔗

The awssnssqs.OpenSNSTopic constructor opens an SNS topic. You must first create an AWS session with the same region as your topic:

import (
	"context"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"gocloud.dev/pubsub/awssnssqs"
)

// Establish an AWS session.
// See https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more info.
// The region must match the region for the SNS topic "mytopic".
sess, err := session.NewSession(&aws.Config{
	Region: aws.String("us-east-2"),
})
if err != nil {
	return err
}

// Create a *pubsub.Topic.
const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic"
topic := awssnssqs.OpenSNSTopic(ctx, sess, topicARN, nil)
defer topic.Shutdown(ctx)

Amazon Simple Notification Service🔗

The Go CDK can publish to an Amazon Simple Queue Service (SQS) topic. SQS URLs closely resemble the the queue URL, except the leading https:// is replaced with awssqs://. You can specify the region query parameter to ensure your application connects to the correct region, but otherwise pubsub.OpenTopic will use the region found in the environment variables or your AWS CLI configuration.

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/awssnssqs"
)

// https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html
const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue"
topic, err := pubsub.OpenTopic(ctx, "awssqs://"+queueURL+"?region=us-east-2")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

SQS messages are restricted to UTF-8 clean payloads. If your application sends a message that contains non-UTF-8 bytes, then the Go CDK will automatically Base64 encode the message and add a base64encoded message attribute. When subscribing to messages on the topic through the Go CDK, these will be automatically Base64 decoded, but if you are receiving messages from a topic in a program that does not use the Go CDK, you may need to manually Base64 decode the message payload.

Amazon Simple Queue Service Constructor🔗

The awssnssqs.OpenSQSTopic constructor opens an SQS topic. You must first create an AWS session with the same region as your topic:

import (
	"context"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"gocloud.dev/pubsub/awssnssqs"
)

// Establish an AWS session.
// See https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more info.
// The region must match the region for the SQS queue "myqueue".
sess, err := session.NewSession(&aws.Config{
	Region: aws.String("us-east-2"),
})
if err != nil {
	return err
}

// Create a *pubsub.Topic.
const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue"
topic := awssnssqs.OpenSQSTopic(ctx, sess, queueURL, nil)
defer topic.Shutdown(ctx)

Google Cloud Pub/Sub🔗

The Go CDK can publish to a Google Cloud Pub/Sub topic. The URLs use the project ID and the topic ID. pubsub.OpenTopic will use Application Default Credentials.

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/gcppubsub"
)

topic, err := pubsub.OpenTopic(ctx, "gcppubsub://projects/myproject/topics/mytopic")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Google Cloud Pub/Sub Constructor🔗

The gcppubsub.OpenTopic constructor opens a Cloud Pub/Sub topic. You must first obtain GCP credentials and then create a gRPC connection to Cloud Pub/Sub. (This gRPC connection can be reused among topics.)

import (
	"context"

	"gocloud.dev/gcp"
	"gocloud.dev/pubsub/gcppubsub"
)

// Your GCP credentials.
// See https://cloud.google.com/docs/authentication/production
// for more info on alternatives.
creds, err := gcp.DefaultCredentials(ctx)
if err != nil {
	return err
}
// Open a gRPC connection to the GCP Pub/Sub API.
conn, cleanup, err := gcppubsub.Dial(ctx, creds.TokenSource)
if err != nil {
	return err
}
defer cleanup()

// Construct a PublisherClient using the connection.
pubClient, err := gcppubsub.PublisherClient(ctx, conn)
if err != nil {
	return err
}
defer pubClient.Close()

// Construct a *pubsub.Topic.
topic, err := gcppubsub.OpenTopicByPath(pubClient, "projects/myprojectID/topics/example-topic", nil)
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Azure Service Bus🔗

The Go CDK can publish to an Azure Service Bus topic over AMQP 1.0. The URL for publishing is the topic name. pubsub.OpenTopic will use the environment variable SERVICEBUS_CONNECTION_STRING to obtain the Service Bus connection string. The connection string can be obtained from the Azure portal.

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/azuresb"
)

// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// This URL will open the topic "mytopic" using a connection string
// from the environment variable SERVICEBUS_CONNECTION_STRING.
topic, err := pubsub.OpenTopic(ctx, "azuresb://mytopic")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Azure Service Bus Constructor🔗

The azuresb.OpenTopic constructor opens an Azure Service Bus topic. You must first connect to the topic using the Azure Service Bus library and then pass it to azuresb.OpenTopic. There are also helper functions in the azuresb package to make this easier.

import (
	"context"
	"os"

	"gocloud.dev/pubsub/azuresb"
)

// Change these as needed for your application.
connString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
topicName := "test-topic"

if connString == "" {
	log.Fatal("Service Bus ConnectionString is not set")
}

// Connect to Azure Service Bus for the given topic.
busNamespace, err := azuresb.NewNamespaceFromConnectionString(connString)
if err != nil {
	return err
}
busTopic, err := azuresb.NewTopic(busNamespace, topicName, nil)
if err != nil {
	return err
}
defer busTopic.Close(ctx)

// Construct a *pubsub.Topic.
topic, err := azuresb.OpenTopic(ctx, busTopic, nil)
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

RabbitMQ🔗

The Go CDK can publish to an AMQP 0.9.1 fanout exchange, the dialect of AMQP spoken by RabbitMQ. A RabbitMQ URL only includes the exchange name. The RabbitMQ’s server is discovered from the RABBIT_SERVER_URL environment variable (which is something like amqp://guest:guest@localhost:5672/).

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/rabbitpubsub"
)

// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// This URL will Dial the RabbitMQ server at the URL in the environment
// variable RABBIT_SERVER_URL and open the exchange "myexchange".
topic, err := pubsub.OpenTopic(ctx, "rabbit://myexchange")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

RabbitMQ Constructor🔗

The rabbitpubsub.OpenTopic constructor opens a RabbitMQ exchange. You must first create an *amqp.Connection to your RabbitMQ instance.

import (
	"context"

	"github.com/streadway/amqp"
	"gocloud.dev/pubsub/rabbitpubsub"
)

rabbitConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
	return err
}
defer rabbitConn.Close()
topic := rabbitpubsub.OpenTopic(rabbitConn, "myexchange", nil)
defer topic.Shutdown(ctx)

NATS🔗

The Go CDK can publish to a NATS subject. A NATS URL only includes the subject name. The NATS server is discovered from the NATS_SERVER_URL environment variable (which is something like nats://nats.example.com).

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/natspubsub"
)

// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// This URL will Dial the NATS server at the URL in the environment variable
// NATS_SERVER_URL and send messages with subject "example.mysubject".
topic, err := pubsub.OpenTopic(ctx, "nats://example.mysubject")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Because NATS does not natively support metadata, messages sent to NATS will be encoded with gob.

NATS Constructor🔗

The natspubsub.OpenTopic constructor opens a NATS subject as a topic. You must first create an *nats.Conn to your NATS instance.

import (
	"context"

	"github.com/nats-io/nats.go"
	"gocloud.dev/pubsub/natspubsub"
)

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	return err
}
defer natsConn.Close()

topic, err := natspubsub.OpenTopic(natsConn, "example.mysubject", nil)
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Kafka🔗

The Go CDK can publish to a Kafka cluster. A Kafka URL only includes the topic name. The brokers in the Kafka cluster are discovered from the KAFKA_BROKERS environment variable (which is a comma-delimited list of hosts, something like 1.2.3.4:9092,5.6.7.8:9092).

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/kafkapubsub"
)

// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// The host + path are the topic name to send to.
// The set of brokers must be in an environment variable KAFKA_BROKERS.
topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

Kafka Constructor🔗

The kafkapubsub.OpenTopic constructor opens a Kafka topic to publish messages to. Depending on your Kafka cluster configuration (see auto.create.topics.enable), you may need to provision the topic beforehand.

In addition to the list of brokers, you’ll need a *sarama.Config, which exposes many knobs that can affect performance and semantics; review and set them carefully. kafkapubsub.MinimalConfig provides a minimal config to get you started.

import (
	"context"

	"gocloud.dev/pubsub/kafkapubsub"
)

// The set of brokers in the Kafka cluster.
addrs := []string{"1.2.3.4:9092"}
// The Kafka client configuration to use.
config := kafkapubsub.MinimalConfig()

// Construct a *pubsub.Topic.
topic, err := kafkapubsub.OpenTopic(addrs, config, "my-topic", nil)
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

In-Memory🔗

The Go CDK includes an in-memory Pub/Sub provider useful for local testing. The names in mem:// URLs are a process-wide namespace, so subscriptions to the same name will receive messages posted to that topic. This is detailed more in the subscription guide.

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/mempubsub"
)

topic, err := pubsub.OpenTopic(ctx, "mem://topicA")
if err != nil {
	return err
}
defer topic.Shutdown(ctx)

In-Memory Constructor🔗

To create an in-memory Pub/Sub topic, use the mempubsub.NewTopic function. You can use the returned topic to create in-memory subscriptions, as detailed in the subscription guide.

import (
	"context"

	"gocloud.dev/pubsub/mempubsub"
)

topic := mempubsub.NewTopic()
defer topic.Shutdown(ctx)