Publish Messages to a Topic
Publishing a message to a topic with the Go CDK takes two steps:
- Open a topic with the Pub/Sub provider of your choice (once per topic).
- Send messages on the topic.
Opening a Topic
The first step in publishing messages to a topic is to instantiate a
portable *pubsub.Topic
for your service.
The easiest way to do so is to use pubsub.OpenTopic
and a service-specific URL
pointing to the topic, making sure you “blank import” the driver package to
link it in.
import (
"context"
"fmt"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/<driver>"
)
...
ctx := context.Background()
topic, err := pubsub.OpenTopic(ctx, "<driver-url>")
if err != nil {
return fmt.Errorf("could not open topic: %v", err)
}
defer topic.Shutdown(ctx)
// topic is a *pubsub.Topic; see usage below
...
See Concepts: URLs for general background and the guide below for URL usage for each supported service.
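The scheme of the URL is what ties it to a driver, so a wrong or missing blank import is a common reason a URL fails to open. The sketch below maps the schemes used in this guide to the driver packages that appear in its import blocks. The helper driverFor is hypothetical and is not part of the Go CDK; it only illustrates the scheme-to-package relationship.

```go
package main

import (
	"fmt"
	"net/url"
)

// driverPackages maps each URL scheme used in this guide to the driver
// package that has to be blank-imported for that scheme.
var driverPackages = map[string]string{
	"gcppubsub": "gocloud.dev/pubsub/gcppubsub",
	"awssns":    "gocloud.dev/pubsub/awssnssqs",
	"awssqs":    "gocloud.dev/pubsub/awssnssqs",
	"azuresb":   "gocloud.dev/pubsub/azuresb",
	"rabbit":    "gocloud.dev/pubsub/rabbitpubsub",
	"nats":      "gocloud.dev/pubsub/natspubsub",
	"kafka":     "gocloud.dev/pubsub/kafkapubsub",
	"mem":       "gocloud.dev/pubsub/mempubsub",
}

// driverFor is a hypothetical helper: it returns the driver package that
// matches the scheme of rawURL, or an error if the scheme is not in the table.
func driverFor(rawURL string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", fmt.Errorf("parse %q: %w", rawURL, err)
	}
	pkg, ok := driverPackages[u.Scheme]
	if !ok {
		return "", fmt.Errorf("no driver known for scheme %q", u.Scheme)
	}
	return pkg, nil
}

func main() {
	for _, raw := range []string{
		"mem://topicA",
		"kafka://my-topic",
		"gcppubsub://projects/myproject/topics/mytopic",
	} {
		pkg, err := driverFor(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> import _ %q\n", raw, pkg)
	}
}
```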
Alternatively, if you need fine-grained
control over the connection settings, you can call the constructor function in
the driver package directly (like gcppubsub.OpenTopic).
import "gocloud.dev/pubsub/<driver>"
...
topic, err := <driver>.OpenTopic(...)
...
You may find the wire
package useful for managing your initialization code
when switching between different backing services.
See the guide below for constructor usage for each supported service.
Sending Messages on a Topic
Sending a message on a Topic looks like this:
err := topic.Send(ctx, &pubsub.Message{
Body: []byte("Hello, World!\n"),
// Metadata is optional and can be nil.
Metadata: map[string]string{
// These are examples of metadata.
// There is nothing special about the key names.
"language": "en",
"importance": "high",
},
})
if err != nil {
return err
}
Note that the semantics of message delivery can vary by backing service.
Other Usage Samples
Supported Pub/Sub Services
Google Cloud Pub/Sub
The Go CDK can publish to a Google Cloud Pub/Sub topic. The URLs use the project ID and the topic ID.
pubsub.OpenTopic will use Application Default Credentials; if you have
authenticated via gcloud auth application-default login, it will use those
credentials. See Application Default Credentials to learn about authentication
alternatives, including using environment variables.
import (
"context"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/gcppubsub"
)
topic, err := pubsub.OpenTopic(ctx, "gcppubsub://projects/myproject/topics/mytopic")
if err != nil {
return err
}
defer topic.Shutdown(ctx)
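Google Cloud Pub/Sub Constructor
The gcppubsub.OpenTopic constructor opens a Cloud Pub/Sub topic. You
must first obtain GCP credentials, create a gRPC connection to Cloud Pub/Sub,
and construct a PublisherClient from that connection. (The gRPC connection can
be reused among topics.) The example below uses the gcppubsub.OpenTopicByPath
variant, which takes the full topic path.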
import (
"context"
"gocloud.dev/gcp"
"gocloud.dev/pubsub/gcppubsub"
)
// Your GCP credentials.
// See https://cloud.google.com/docs/authentication/production
// for more info on alternatives.
creds, err := gcp.DefaultCredentials(ctx)
if err != nil {
return err
}
// Open a gRPC connection to the GCP Pub/Sub API.
conn, cleanup, err := gcppubsub.Dial(ctx, creds.TokenSource)
if err != nil {
return err
}
defer cleanup()
// Construct a PublisherClient using the connection.
pubClient, err := gcppubsub.PublisherClient(ctx, conn)
if err != nil {
return err
}
defer pubClient.Close()
// Construct a *pubsub.Topic.
topic, err := gcppubsub.OpenTopicByPath(pubClient, "projects/myprojectID/topics/example-topic", nil)
if err != nil {
return err
}
defer topic.Shutdown(ctx)
Amazon Simple Notification Service
The Go CDK can publish to an Amazon Simple Notification Service (SNS)
topic. SNS URLs in the Go CDK use the Amazon Resource Name (ARN) to identify
the topic. You should specify the region
query parameter to ensure your
application connects to the correct region.
pubsub.OpenTopic
will create a default AWS Session with the
SharedConfigEnable
option enabled; if you have authenticated with the AWS CLI,
it will use those credentials. See AWS Session to learn about authentication
alternatives, including using environment variables.
import (
"context"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
)
const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic"
// Note the 3 slashes; ARNs have multiple colons and therefore aren't valid
// as hostnames in the URL.
topic, err := pubsub.OpenTopic(ctx, "awssns:///"+topicARN+"?region=us-east-2")
if err != nil {
return err
}
defer topic.Shutdown(ctx)
SNS messages are restricted to UTF-8 clean payloads. If your application
sends a message that contains non-UTF-8 bytes, then the Go CDK will
automatically Base64 encode the message and add a base64encoded
message
attribute. When subscribing to messages on the topic through the Go CDK,
these will be automatically Base64 decoded, but if you are
receiving messages from a topic in a program that does not use the Go CDK,
you may need to manually Base64 decode the message payload.
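For a consumer that does not use the Go CDK, the manual decoding step might look like the sketch below. It is an illustration under assumptions, not the driver's code: the use of standard Base64 and the choice to treat the mere presence of the base64encoded attribute as the signal are both assumptions, and encodeBody and decodeBody are hypothetical names.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"unicode/utf8"
)

// base64Attr is the message attribute name mentioned in the text above.
const base64Attr = "base64encoded"

// encodeBody returns body unchanged (as a string) when it is valid UTF-8.
// Otherwise it returns the Base64 form and true. Standard Base64 is an
// assumption of this sketch.
func encodeBody(body []byte) (payload string, encoded bool) {
	if utf8.Valid(body) {
		return string(body), false
	}
	return base64.StdEncoding.EncodeToString(body), true
}

// decodeBody reverses encodeBody for a consumer that sees the raw payload
// and its attributes. Checking only for the attribute's presence is an
// assumption of this sketch.
func decodeBody(payload string, attrs map[string]string) ([]byte, error) {
	if _, ok := attrs[base64Attr]; !ok {
		return []byte(payload), nil
	}
	return base64.StdEncoding.DecodeString(payload)
}

func main() {
	raw := []byte{0xff, 0xfe, 0x00, 0x01} // not valid UTF-8
	payload, encoded := encodeBody(raw)

	attrs := map[string]string{}
	if encoded {
		attrs[base64Attr] = "true"
	}

	got, err := decodeBody(payload, attrs)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("encoded=%v roundtrip=%v\n", encoded, string(got) == string(raw))
}
```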
Amazon SNS Constructor
The awssnssqs.OpenSNSTopic
constructor opens an SNS topic. You must first
create an AWS session with the same region as your topic:
import (
"context"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"gocloud.dev/pubsub/awssnssqs"
)
// Establish an AWS session.
// See https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more info.
// The region must match the region for the SNS topic "mytopic".
sess, err := session.NewSession(&aws.Config{
Region: aws.String("us-east-2"),
})
if err != nil {
return err
}
// Create a *pubsub.Topic.
const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic"
topic := awssnssqs.OpenSNSTopic(ctx, sess, topicARN, nil)
defer topic.Shutdown(ctx)
Amazon Simple Queue Service
The Go CDK can publish to an Amazon Simple Queue Service (SQS)
topic. SQS URLs closely resemble the queue URL, except the leading
https:// is replaced with awssqs://. You can specify the region
query parameter to ensure your application connects to the correct region, but
otherwise pubsub.OpenTopic will use the region found in the environment
variables or your AWS CLI configuration.
import (
"context"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
)
// https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html
const queueURL = "sqs.us-east-2.amazonaws.com/123456789012/myqueue"
topic, err := pubsub.OpenTopic(ctx, "awssqs://"+queueURL+"?region=us-east-2")
if err != nil {
return err
}
defer topic.Shutdown(ctx)
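If you already hold the full https:// queue URL (for example, the one the constructor below takes), the scheme swap described above can be sketched with the standard library. sqsTopicURL is a hypothetical helper, not a Go CDK function, and rejecting anything that does not start with https:// is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// sqsTopicURL is a hypothetical helper that turns an SQS queue URL into an
// awssqs URL by replacing the leading "https://" with "awssqs://".
// An empty region leaves the query parameter off.
func sqsTopicURL(queueURL, region string) (string, error) {
	const prefix = "https://"
	if !strings.HasPrefix(queueURL, prefix) {
		return "", fmt.Errorf("queue URL %q does not start with %s", queueURL, prefix)
	}
	out := "awssqs://" + strings.TrimPrefix(queueURL, prefix)
	if region != "" {
		q := url.Values{}
		q.Set("region", region)
		out += "?" + q.Encode()
	}
	return out, nil
}

func main() {
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue"
	topicURL, err := sqsTopicURL(queueURL, "us-east-2")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(topicURL)
}
```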
SQS messages are restricted to UTF-8 clean payloads. If your application
sends a message that contains non-UTF-8 bytes, then the Go CDK will
automatically Base64 encode the message and add a base64encoded
message
attribute. When subscribing to messages on the topic through the Go CDK,
these will be automatically Base64 decoded, but if you are
receiving messages from a topic in a program that does not use the Go CDK,
you may need to manually Base64 decode the message payload.
Amazon SQS Constructor
The awssnssqs.OpenSQSTopic
constructor opens an SQS topic. You must first
create an AWS session with the same region as your topic:
import (
"context"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"gocloud.dev/pubsub/awssnssqs"
)
// Establish an AWS session.
// See https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more info.
// The region must match the region for the SQS queue "myqueue".
sess, err := session.NewSession(&aws.Config{
Region: aws.String("us-east-2"),
})
if err != nil {
return err
}
// Create a *pubsub.Topic.
const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue"
topic := awssnssqs.OpenSQSTopic(ctx, sess, queueURL, nil)
defer topic.Shutdown(ctx)
Azure Service Bus
The Go CDK can publish to an Azure Service Bus topic.
The URL for publishing is the topic name. pubsub.OpenTopic
will use the
environment variable SERVICEBUS_CONNECTION_STRING
to obtain the Service Bus
connection string. The connection string can be obtained
from the Azure portal.
import (
"context"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/azuresb"
)
// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// This URL will open the topic "mytopic" using a connection string
// from the environment variable SERVICEBUS_CONNECTION_STRING.
topic, err := pubsub.OpenTopic(ctx, "azuresb://mytopic")
if err != nil {
return err
}
defer topic.Shutdown(ctx)
Azure Service Bus Constructor
The azuresb.OpenTopic constructor opens an Azure Service Bus topic. You
must first connect to the topic using the Azure Service Bus library and
then pass it to azuresb.OpenTopic. There are also helper functions in the
azuresb package to make this easier.
import (
"context"
"log"
"os"
"gocloud.dev/pubsub/azuresb"
)
// Change these as needed for your application.
connString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
topicName := "test-topic"
if connString == "" {
log.Fatal("Service Bus ConnectionString is not set")
}
// Connect to Azure Service Bus for the given topic.
sbClient, err := azuresb.NewClientFromConnectionString(connString, nil)
if err != nil {
return err
}
sbSender, err := azuresb.NewSender(sbClient, topicName, nil)
if err != nil {
return err
}
defer sbSender.Close(ctx)
// Construct a *pubsub.Topic.
topic, err := azuresb.OpenTopic(ctx, sbSender, nil)
if err != nil {
return err
}
defer topic.Shutdown(ctx)
RabbitMQ
The Go CDK can publish to an AMQP 0.9.1 fanout exchange, the dialect of
AMQP spoken by RabbitMQ. A RabbitMQ URL only includes the exchange name.
The RabbitMQ server is discovered from the RABBIT_SERVER_URL environment
variable (which is something like amqp://guest:guest@localhost:5672/).
import (
"context"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/rabbitpubsub"
)
// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// This URL will Dial the RabbitMQ server at the URL in the environment
// variable RABBIT_SERVER_URL and open the exchange "myexchange".
topic, err := pubsub.OpenTopic(ctx, "rabbit://myexchange")
if err != nil {
return err
}
defer topic.Shutdown(ctx)
RabbitMQ Constructor
The rabbitpubsub.OpenTopic
constructor opens a RabbitMQ exchange. You
must first create an *amqp.Connection
to your RabbitMQ instance.
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
"gocloud.dev/pubsub/rabbitpubsub"
)
rabbitConn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
return err
}
defer rabbitConn.Close()
topic := rabbitpubsub.OpenTopic(rabbitConn, "myexchange", nil)
defer topic.Shutdown(ctx)
NATS
The Go CDK can publish to a NATS subject. A NATS URL only includes the
subject name. The NATS server is discovered from the NATS_SERVER_URL
environment variable (which is something like nats://nats.example.com).
import (
"context"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/natspubsub"
)
// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// This URL will Dial the NATS server at the URL in the environment variable
// NATS_SERVER_URL and send messages with subject "example.mysubject".
topic, err := pubsub.OpenTopic(ctx, "nats://example.mysubject")
if err != nil {
return err
}
defer topic.Shutdown(ctx)
Because NATS does not natively support metadata, messages sent to NATS will be encoded with gob.
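To get a feel for what gob encoding a body together with its metadata involves, here is a minimal round trip using the standard library. The envelope struct and its field names are hypothetical; the exact layout the NATS driver puts on the wire is not described here, so treat this as an illustration of the technique rather than a compatible encoder.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// envelope is a hypothetical container pairing a message body with its
// metadata. It is not the driver's actual wire type.
type envelope struct {
	Body     []byte
	Metadata map[string]string
}

// encode serializes an envelope with gob.
func encode(e envelope) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(e); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode reverses encode.
func decode(data []byte) (envelope, error) {
	var e envelope
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&e)
	return e, err
}

func main() {
	in := envelope{
		Body:     []byte("Hello, World!\n"),
		Metadata: map[string]string{"language": "en", "importance": "high"},
	}
	data, err := encode(in)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	out, err := decode(data)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("body=%q language=%q\n", out.Body, out.Metadata["language"])
}
```

One practical consequence is that a subscriber reading the subject without the Go CDK should expect gob-encoded bytes rather than the bare message body.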
NATS Constructor
The natspubsub.OpenTopic constructor opens a NATS subject as a topic. You
must first create a *nats.Conn to your NATS instance.
import (
"context"
"github.com/nats-io/nats.go"
"gocloud.dev/pubsub/natspubsub"
)
natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
return err
}
defer natsConn.Close()
topic, err := natspubsub.OpenTopic(natsConn, "example.mysubject", nil)
if err != nil {
return err
}
defer topic.Shutdown(ctx)
Kafka
The Go CDK can publish to a Kafka cluster. A Kafka URL only includes the
topic name. The brokers in the Kafka cluster are discovered from the
KAFKA_BROKERS environment variable (which is a comma-delimited list of
hosts, something like 1.2.3.4:9092,5.6.7.8:9092).
import (
"context"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/kafkapubsub"
)
// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
// The host + path are the topic name to send to.
// The set of brokers must be in an environment variable KAFKA_BROKERS.
topic, err := pubsub.OpenTopic(ctx, "kafka://my-topic")
if err != nil {
return err
}
defer topic.Shutdown(ctx)
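Because the URL form depends entirely on KAFKA_BROKERS, a quick preflight check of that variable can save a confusing startup failure. The sketch below splits the comma-delimited list with the standard library. parseBrokers is a hypothetical helper; trimming spaces and dropping empty entries are choices made here, and the driver's own parsing may be stricter or looser.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// parseBrokers splits a comma-delimited broker list, trims surrounding
// spaces, and drops empty entries. It returns nil for an empty input.
func parseBrokers(s string) []string {
	var out []string
	for _, part := range strings.Split(s, ",") {
		if p := strings.TrimSpace(part); p != "" {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	brokers := parseBrokers(os.Getenv("KAFKA_BROKERS"))
	if len(brokers) == 0 {
		fmt.Println("KAFKA_BROKERS is empty or unset")
		return
	}
	fmt.Println("brokers:", brokers)
}
```

The resulting slice has the same shape as the addrs argument that the constructor takes.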
Kafka Constructor
The kafkapubsub.OpenTopic constructor opens a Kafka topic to publish
messages to. Depending on your Kafka cluster configuration (see
auto.create.topics.enable), you may need to provision the topic beforehand.
In addition to the list of brokers, you’ll need a *sarama.Config, which
exposes many knobs that can affect performance and semantics; review and set
them carefully. kafkapubsub.MinimalConfig provides a minimal config to get
you started.
import (
"context"
"gocloud.dev/pubsub/kafkapubsub"
)
// The set of brokers in the Kafka cluster.
addrs := []string{"1.2.3.4:9092"}
// The Kafka client configuration to use.
config := kafkapubsub.MinimalConfig()
// Construct a *pubsub.Topic.
topic, err := kafkapubsub.OpenTopic(addrs, config, "my-topic", nil)
if err != nil {
return err
}
defer topic.Shutdown(ctx)
In-Memory
The Go CDK includes an in-memory Pub/Sub provider useful for local testing.
The names in mem://
URLs are a process-wide namespace, so subscriptions to
the same name will receive messages posted to that topic. This is detailed
more in the subscription guide.
import (
"context"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/mempubsub"
)
topic, err := pubsub.OpenTopic(ctx, "mem://topicA")
if err != nil {
return err
}
defer topic.Shutdown(ctx)
In-Memory Constructor
To create an in-memory Pub/Sub topic, use the mempubsub.NewTopic
function. You can use the returned topic to create in-memory
subscriptions, as detailed in the subscription guide.
import (
"context"
"gocloud.dev/pubsub/mempubsub"
)
topic := mempubsub.NewTopic()
defer topic.Shutdown(ctx)