Monitoring Kubernetes Service Topology Changes in Real Time
Stateful, distributed, containerized applications require real-time monitoring of cluster changes. Learn how the EndpointSlices API provides a scalable solution.
Horizontally scalable data stores like Elasticsearch, Cassandra, and CockroachDB distribute their data across multiple nodes using techniques like consistent hashing. As nodes are added or removed, the data is reshuffled to ensure that the load is spread evenly across the new set of nodes.
When such systems are deployed on bare-metal clusters or cloud VMs, database administrators are responsible for adding and removing nodes, planning the changes at times of low load to minimize disruption to production workloads.
In the world of applications running on container-orchestration platforms like Kubernetes, containers can be terminated or restarted due to frequent deployments, scheduling issues, node failures, network issues, and other unexpected changes. In this article, we’ll dive into how applications maintaining a distributed state can keep track of nodes entering and leaving the cluster.
Workload Management
Applications on Kubernetes run as containers inside pods. Since managing individual pods for distributed applications is cumbersome, Kubernetes provides higher-level abstractions that manage pods for you.
- Deployments are suitable for running stateless applications like web servers, where all pods are equal and indistinguishable: no pod has a particular identity, and any pod can be freely replaced.
- StatefulSets are better suited for stateful applications, where pods are expected to have a distinct identity and are not freely interchangeable.
StatefulSets are the appropriate workload management choice for our particular example of a data store application. Additionally, creating a Kubernetes Service exposes the distributed application to clients under a single, stable network name.
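As a point of reference, a minimal sketch of this pairing might look like the following; the names, label, port, and container image are placeholders for illustration, not taken from a real deployment.
apiVersion: v1
kind: Service
metadata:
  name: datastore
  labels:
    app: datastore
spec:
  clusterIP: None   # headless Service: gives each StatefulSet pod a stable DNS name
  selector:
    app: datastore
  ports:
    - name: data
      port: 9000
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: datastore
spec:
  serviceName: datastore
  replicas: 3
  selector:
    matchLabels:
      app: datastore
  template:
    metadata:
      labels:
        app: datastore
    spec:
      containers:
        - name: datastore
          image: your-datastore-image:latest   # placeholder image
          ports:
            - name: data
              containerPort: 9000
With a headless governing Service like this, each pod is reachable at a stable DNS name of the form datastore-0.datastore.<namespace>.svc.cluster.local.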
Tracking Pod Lifecycles Within a Service
You have a StatefulSet up and running with a Service that clients can talk to, which finally brings us to the main problem: tracking pod creation, deletion, and modification in real time.
Kubernetes assigns a stable hostname to each pod in a StatefulSet. A major drawback to using these hostnames to address pods directly is that DNS results are cached, including negative results for lookups made while a pod was unavailable.
Enter the EndpointSlices API, which provides scalable tracking of network endpoints within a Kubernetes cluster. With the appropriate selectors, it can be used to track the lifecycle of pods within a specific Service, as long as the Service has a selector specified.
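For orientation, a control-plane-managed EndpointSlice looks roughly like this (the names and addresses below are illustrative only); note the labels tying the slice back to its Service:
apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
  name: datastore-abc12
  labels:
    kubernetes.io/service-name: datastore
    app: datastore
addressType: IPv4
ports:
  - name: data
    protocol: TCP
    port: 9000
endpoints:
  - addresses:
      - "10.0.1.5"
    conditions:
      ready: true
    targetRef:
      kind: Pod
      name: datastore-0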
Specifically, the application can invoke the List and Watch API endpoints.
First, the application creates a Kubernetes API client.
import (
	...
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	discovery "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	...
)

func main() {
	// Base context, cancelled on SIGINT/SIGTERM (stands in for the base.InitContext helper used in the original code).
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer cancel()

	// Create a Kubernetes API clientset from the in-cluster configuration.
	k8sRestConfig, err := rest.InClusterConfig()
	if err != nil {
		log.Fatalf("error getting in-cluster config: %v", err)
	}
	k8sClient, err := kubernetes.NewForConfig(k8sRestConfig)
	if err != nil {
		log.Fatalf("error creating k8s client: %v", err)
	}
Next, the application invokes the List endpoint to retrieve the currently available backends. Note that the app label is the label used in the Service's selector; this ensures that the List and Watch calls target the EndpointSlices belonging to our specific Service.
	// Label selector identifying our Service's EndpointSlices, e.g. "app=your-app".
	appLabel := "your-service-label"
	podNamespace := "your-service-namespace"

	// List the current service endpoints.
	endpointSlices, err := k8sClient.DiscoveryV1().EndpointSlices(podNamespace).List(ctx, metav1.ListOptions{
		LabelSelector: appLabel,
	})
	if err != nil {
		log.Fatalf("error listing endpoint slices: %v", err)
	}
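As an illustrative sketch, the initial set of ready addresses can then be recorded per slice; the backends map and the readyAddresses helper are hypothetical names introduced here for illustration, not part of the article's code:
	// Record the initial view of ready backend addresses, keyed by EndpointSlice name.
	for _, slice := range endpointSlices.Items {
		backends[slice.Name] = readyAddresses(slice)
	}
Here, backends and readyAddresses could be defined alongside main, for example:
// backends maps EndpointSlice name -> ready backend addresses (illustrative only).
var backends = make(map[string][]string)

// readyAddresses keeps the addresses of endpoints whose Ready condition is true
// or unset (the EndpointSlice API advises consumers to treat unset as ready).
func readyAddresses(slice discovery.EndpointSlice) []string {
	var addrs []string
	for _, ep := range slice.Endpoints {
		if ep.Conditions.Ready == nil || *ep.Conditions.Ready {
			addrs = append(addrs, ep.Addresses...)
		}
	}
	return addrs
}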
Now that the application has the current set of addresses, it can monitor for changes from this state by invoking the Watch API. This is done in a background loop to allow the application to proceed with the rest of its work. The List API provides a checkpoint called ResourceVersion that we can instruct the Watch to start observing events from. The Watch API pushes the results into the ResultChan channel.
	// ResourceVersion is like a checkpoint that we're instructing the Watch to start from.
	resourceVersion := endpointSlices.ResourceVersion
	go func() {
		// Start a watch on the service endpoints.
		watcher, err := k8sClient.DiscoveryV1().EndpointSlices(podNamespace).Watch(ctx, metav1.ListOptions{
			LabelSelector: appLabel,
			// Start watching from the resource version returned by the List call.
			ResourceVersion: resourceVersion,
		})
		if err != nil {
			log.Fatalf("error watching endpoint slices: %v", err)
		}
		defer watcher.Stop()
		// Loop until the context is done or the watch channel is closed.
		for {
			select {
			case <-ctx.Done():
				return
			case event, ok := <-watcher.ResultChan():
				if !ok {
					// The watch was closed (e.g., it expired server-side); a production
					// application would re-establish it here.
					return
				}
				handleWatchEvent(event)
			}
		}
	}()

	// Start the server / do other work
}
The application can then handle the Watch events appropriately.
func handleWatchEvent(event watch.Event) {
	// Cast the event object into an EndpointSlice.
	endpointSlice, ok := event.Object.(*discovery.EndpointSlice)
	if !ok {
		log.Fatalf("unexpected event object")
	}
	// Placeholder to keep this skeleton compiling; the handlers below would use endpointSlice.
	_ = endpointSlice
	// From https://pkg.go.dev/k8s.io/apimachinery/pkg/watch#Event
	// Object is:
	//  * If Type is Added or Modified: the new state of the object.
	//  * If Type is Deleted: the state of the object immediately before deletion.
	//  * If Type is Bookmark: the object (instance of a type being watched) where
	//    only ResourceVersion field is set. On successful restart of watch from a
	//    bookmark resourceVersion, client is guaranteed to not get repeat event
	//    nor miss any events.
	//  * If Type is Error: *api.Status is recommended; other types may make sense
	//    depending on context.
	switch event.Type {
	case watch.Added:
		// Handle Added event
	case watch.Modified:
		// Handle Modified event
	case watch.Deleted:
		// Handle Deleted event
	case watch.Error:
		// Handle Error event
	}
}
It's important to note from the docs that the Added and Modified events send the full new state of the object. Therefore, if the application maintains the endpoint addresses internally, it can simply replace its view of the affected EndpointSlice rather than apply incremental updates.
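For instance, reusing the illustrative backends map and readyAddresses helper from the earlier sketch, the switch in handleWatchEvent could update the stored state like this:
	switch event.Type {
	case watch.Added, watch.Modified:
		// The event object is the complete new state of the slice,
		// so overwrite our entry rather than patching it incrementally.
		backends[endpointSlice.Name] = readyAddresses(*endpointSlice)
	case watch.Deleted:
		delete(backends, endpointSlice.Name)
	case watch.Error:
		log.Printf("watch error: %v", event.Object)
	}
Since the watch runs in a background goroutine while the server does other work, a real application would also guard backends with a mutex or funnel updates through a channel.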
The final step is to ensure that your application has the permissions to call the EndpointSlices API resources:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: server-endpoints-role
rules:
  - apiGroups: ["discovery.k8s.io"]
    resources: ["endpointslices"]
    verbs: ["list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: server-endpoints-rolebinding
subjects:
  - kind: ServiceAccount
    name: server
    namespace: your-service-namespace # the namespace the application runs in
roleRef:
  kind: Role
  name: server-endpoints-role
  apiGroup: rbac.authorization.k8s.io
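The binding assumes the pods run as a ServiceAccount named server in that same namespace; a minimal sketch of that wiring, using the same placeholder names as above, might look like:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: server
  namespace: your-service-namespace
---
# Excerpt from the StatefulSet's pod template:
spec:
  template:
    spec:
      serviceAccountName: server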
Conclusion
The EndpointSlices Kubernetes API provides a scalable, extensible, and convenient way to monitor pod lifecycle events within a Kubernetes Service in real time, enabling stateful distributed applications to redistribute data and load as nodes come and go.