Implementing NATS JetStream as Queues in Golang
Dive into implementing NATS JetStream as queues in Golang with this guide. Learn essential concepts and techniques for building scalable distributed systems.
Join the DZone community and get the full member experience.
Join For FreeIn recent years, the increasing demand for efficient and scalable distributed systems has driven the development and adoption of various message queuing solutions. These solutions enable the decoupling of components within distributed architectures, ensuring fault tolerance and load balancing.
Recently, we faced the challenge of selecting a message queue system for a new project in our microservice architecture. After conducting extensive research and evaluation, we chose NATS JetStream. This decision led us to explore the integration of NATS JetStream with Golang, which ultimately served as the basis for this article.
This article aims to comprehensively understand implementing NATS JetStream as a queue in Golang, covering fundamental concepts, methodologies, and advanced techniques.
Fundamentals of Message Queue Systems
First, we need to understand message queue systems. At their core, these systems are designed to facilitate communication between distributed components within a software architecture. They enable the exchange of messages containing information or commands between different components in a decoupled manner. This decoupling allows for enhanced fault tolerance, scalability, and maintainability, as components can function independently without direct knowledge of one another.
Message queue systems typically implement the publish-subscribe pattern or point-to-point communication, ensuring that messages are delivered to the appropriate recipients. Key aspects of any message queue system include message persistence, guaranteed delivery, and the ability to handle varying workloads.
History of NATS
Today, NATS JetStream is an excellent implementation of a message queue system, but this was not the case several years ago. It has evolved significantly since its inception, transitioning from a simple publish-subscribe system to a more feature-rich, resilient, and scalable messaging solution. This journey consists of three main stages: NATS as a Pub/Sub system, NATS Streaming, and NATS JetStream.
NATS as a Pub/Sub System
NATS was created in 2010. It was designed as a lightweight, high-performance publish-subscribe system in its early days. As a result, it was well-suited for real-time message passing, offering low-latency communication and ease of use. However, it lacked essential features like message persistence, durability, and guaranteed delivery. These limitations made NATS less suitable for scenarios that required reliable message delivery and storage.
NATS Streaming
NATS Streaming was introduced to address the original NATS system's limitations. This new iteration brought features such as message persistence, at-least-once delivery guarantees, and support for message replay. While NATS Streaming significantly improved the reliability and capability of NATS, it had some drawbacks:
- Inability to remove acknowledged messages from the system
- The lack of "Nak" (not acknowledge) support for messages
- Clients being restricted to only receiving pushed messages without the option to pull messages
- A lack of horizontal scalability.
The drawbacks led the community to create a new implementation of NATS that has a built-in mechanism called "JetStream."
NATS JetStream
NATS JetStream combines the strengths of both NATS and NATS Streaming while addressing their weaknesses. The main features of NATS JetStream include:
- Streams, consumers, and messages: Enhances the messaging solution for better organization and management of message flows
- Push and pull subscriptions: Allows clients to consume messages at their own pace
- NAck (not acknowledge) with optional delay: Provides better message handling control
- Event streaming capabilities: Supports event-driven architectures, similar to Apache Kafka
- Durability, replication, and clustering: Ensures message persistence, high availability, and load balancing
- Full integration with the NATS ecosystem: Simplifies deployment and management
Let us consider NATS JetStream basics concepts before we use it in a Golang app.
NATS JetStream: Theoretical Background
As previously mentioned, NATS JetStream is built upon the foundation of three primary concepts: streams, consumers, and messages. In essence, streams serve as channels that store a sequence of messages, facilitating the organization and management of message flows. Furthermore, each stream can have multiple subjects, allowing for the categorization of messages based on their content or purpose. Therefore, streams can be considered queues when viewing NATS JetStream as a message queue system.
Each stream (or queue) is composed of messages. These messages contain the information or commands to be exchanged between distributed components. Publishers send messages to streams, and recipients subsequently consume them.
These recipients are known as "consumers." They are responsible for reading and processing messages from subjects. Consumers can be considered subscribers in the publish-subscribe pattern. They receive and act upon messages that match their configured subjects. Consumers can also acknowledge the receipt and processing of messages, which helps manage message delivery and ensures that messages are successfully consumed.
We are now well-equipped to delve into the integration with NATS JetStream. The following sections will explore the practical aspects of implementing NATS JetStream publishers and consumers using the Golang programming language.
Sending Messages With NATS JetStream in Golang
In this section, we will first explore the implementation of NATS JetStream publishers in Golang before moving on to consumers.
First, connect to the NATS JetStream server using the "nats.go" library. To do that, import the NATS package in a Golang file. Then, use the nats.Connect()
function to establish a connection, specifying the appropriate server address and options.
import (
"github.com/nats-io/nats.go"
)
func connectToNATS() (*nats.Conn, error) {
nc, err := nats.Connect("nats://nats:4222")
if err != nil {
return nil, fmt.Errorf("nats connect: %w", err)
}
return nc, nil
}
With a connection in place, the next step is to create a JetStream context, which can be done using the nats.JetStream()
function. This context will interact with the JetStream system, allowing for the creation of streams and the publishing of messages.
func natsJetStream(nc *nats.Conn) (nats.JetStreamContext, error) {
jsCtx, err := nc.JetStream()
if err != nil {
return nil, fmt.Errorf("jetstream: %w", err)
}
return jsCtx, nil
}
Once the JetStream context is acquired, it is essential to ensure that the desired stream exists or create one if necessary. The jsCtx.AddStream()
method can specify the stream's configuration, including its name, subjects, storage type, and retention policy. It is important to note that stream creation is typically an administrative task and may only be required in some publisher implementations.
func createStream(ctx context.Context, jsCtx nats.JetStreamContext) (*nats.StreamInfo, error) {
stream, err := jsCtx.AddStream(&nats.StreamConfig{
Name: "test_stream",
Subjects: []string{"subject.1", "subject.2", "subject.N"},
Retention: nats.InterestPolicy, // remove acked messages
Discard: nats.DiscardOld, // when the stream is full, discard old messages
MaxAge: 7 * 24 * time.Hour, // max age of stored messages is 7 days
Storage: nats.FileStorage, // type of message storage
MaxMsgsPerSubject: 100_000_000, // max stored messages per subject
MaxMsgSize: 4 << 20, // max single message size is 4 MB
NoAck: false, // we need the "ack" system for the message queue system
}, nats.Context(ctx))
if err != nil {
return nil, fmt.Errorf("add stream: %w", err)
}
return stream, nil
}
With the stream in place, publishing messages to NATS JetStream is straightforward. The nats.Publish()
method can be called on the JetStream context, providing the subject and payload for the message.
func publishMsg(nc *nats.Conn, subject string, payload []byte) error {
err := nc.Publish(subject, payload)
if err != nil {
return fmt.Errorf("publish: %w", err)
}
return nil
}
We have successfully published a message to NATS JetStream, where it is stored in the specified stream. It is time to implement a consumer to receive and process the message.
Consuming Messages With NATS JetStream in Golang Using Pull Subscriptions
This section will discuss the process of consuming messages from NATS JetStream using the Golang programming language and pulling subscriptions, providing a comprehensive understanding of the necessary steps.
First, ensure a connection to the NATS JetStream server is established using the "nats.go" library, as described in the previous chapter. Then, with a connection in place, create a JetStream context using the nats.JetStream()
function, allowing you to interact with the JetStream system and consume messages from streams.
Next, it is crucial to configure and create a consumer for the desired stream. The jsCtx.AddConsumer()
method can specify the consumer's configuration, such as its name, durable name, and acknowledgment policy.
func createConsumer(ctx context.Context, jsCtx nats.JetStreamContext, consumerGroupName, streamName string) (*nats.ConsumerInfo, error) {
consumer, err := jsCtx.AddConsumer(streamName, &nats.ConsumerConfig{
Durable: consumerGroupName, // durable name is the same as consumer group name
DeliverPolicy: nats.DeliverAllPolicy, // deliver all messages, even if they were sent before the consumer was created
AckPolicy: nats.AckExplicitPolicy, // ack messages manually
AckWait: 5 * time.Second, // wait for ack for 5 seconds
MaxAckPending: -1, // unlimited number of pending acks
}, nats.Context(ctx))
if err != nil {
return nil, fmt.Errorf("add consumer: %w", err)
}
return consumer, nil
}
Once the consumer is created and configured, the next step is subscribing to the relevant subject in the stream. For the scope of this article, we will focus on pull subscriptions, which allow for more direct control over message delivery.
func subscribe(ctx context.Context, js nats.JetStreamContext, subject, consumerGroupName, streamName string) (*nats.Subscription, error) {
pullSub, err := js.PullSubscribe(
subject,
consumerGroupName,
nats.ManualAck(), // ack messages manually
nats.Bind(streamName, consumerGroupName), // bind consumer to the stream
nats.Context(ctx), // use context to cancel the subscription
)
if err != nil {
return nil, fmt.Errorf("pull subscribe: %w", err)
}
return pullSub, nil
}
To consume messages, use the Fetch()
method on the Subscription()
object. This method will return the following available message from the stream, allowing an application to process it accordingly. It is crucial to acknowledge the message using the msg.Ack()
method; otherwise, it will be redelivered continuously.
func fetchOne(ctx context.Context, pullSub *nats.Subscription) (*nats.Msg, error) {
msgs, err := pullSub.Fetch(1, nats.Context(ctx))
if err != nil {
return nil, fmt.Errorf("fetch: %w", err)
}
if len(msgs) == 0 {
return nil, errors.New("no messages")
}
return msgs[0], nil
}
We considered the essential way of integration with NATS JetStream. In the following sections, we will explore some advanced features of NATS JetStream to enhance the system's capabilities further.
Delving Deeper: Advanced NATS JetStream Features in Golang
As we continue our exploration of NATS JetStream integration with Golang, we must understand some advanced concepts that can improve the reliability and durability of distributed systems.
Handling NATS Reconnections in Golang
In real-world scenarios, network disruptions and server failures can lead to connection losses between the client and the NATS server. Therefore, it is crucial to handle reconnections gracefully. The "nats.go" library provides built-in reconnection support, allowing you to specify a custom reconnection logic. Configuring these options allows a Golang application to recover from connection losses and ensure continued operation.
func SetReconnectionHandler(nc *nats.Conn) {
nc.SetReconnectHandler(func(nc *nats.Conn) {
// Update the connection in the publisher and consumers
...
})
}
Stream and Consumer Durability
Durability is an essential aspect of ensuring message persistence and reliable delivery. In NATS JetStream, both streams and consumers can be made durable by specifying a durable name when creating them. This feature ensures that their state is preserved across restarts or failures, allowing messages to be stored reliably and consumers to resume processing messages from where they left off.
To create a durable stream, provide a name in the stream configuration. Also, for a durable consumer, specify a durable name in the consumer configuration. The steps configure NATS JetStream to store the state of these entities, making them resilient to failures and ensuring reliable message delivery.
...
stream, err := jsCtx.AddStream(&nats.StreamConfig{
...
Name: "test_stream",
Subjects: []string{"subject.1", "subject.2", "subject.N"},
MaxAge: 7 * 24 * time.Hour, // max age of stored messages is 7 days
Storage: nats.FileStorage, // type of message storage
...
}, nats.Context(ctx))
...
consumer, err := jsCtx.AddConsumer(streamName, &nats.ConsumerConfig{
Durable: consumerGroupName, // durable name is the same as consumer group name
...
}, nats.Context(ctx))
...
Incorporating these advanced concepts into NATS JetStream implementation with Golang allows us to create a more resilient and reliable messaging system for our distributed architecture.
Conclusion
In conclusion, NATS JetStream offers a robust, scalable, and feature-rich message queue solution for distributed systems. By integrating it with Golang, developers can harness its capabilities to create resilient, high-performance applications. This article provides a comprehensive understanding of essential concepts, methodologies, and advanced techniques for implementing NATS JetStream as queues in Golang. By delving into the theoretical background, connecting to NATS JetStream, publishing and consuming messages, and exploring advanced features like reconnections and durability, developers can efficiently build and maintain reliable distributed systems that effectively address modern architectural challenges.
Opinions expressed by DZone contributors are their own.
Comments