Dynamic Kubernetes Informers

How we updated our Kubernetes integration at FireHydrant.

By Robert Ross on 8/28/2019

In the past I’ve written about how to use informers in Kubernetes for particular resources, but what if you need to be able to receive events for any Kubernetes resource dynamically? Well, there’s a client-go package for that too. At FireHydrant, we recently updated our Kubernetes integration to watch changes for any resource you configure and I wanted to write down how we made it at a high level.

Events

We have a change log feature in our product, and for a long time we’ve supported receiving change events from Kubernetes for all of the workload API resources (Deployments, ReplicaSets, etc.). But we recently had an incident ourselves where it would have been great to see changes to other resources such as nodes and certificates. We could have baked that into the Go code we had already built (what’s another few lines of code to watch a different resource type?), but we decided that making it configurable would be better for us, and for others!

Basically, we wanted to be able to have a configuration file that looked like this:

                                
watch:
  - namespace: "*"
    environment: development
    resources:
      - resource: deployments.v1.apps
        updateOn: ["spec", "metadata", "status"]
        includeDiff: true
        skipServiceCreate: false
      - resource: configmaps
      - resource: mutatingwebhookconfigurations.v1beta1.admissionregistration.k8s.io
        updateOn: ["spec", "metadata"]
                                
                        

This allows us to watch anything in our Kubernetes cluster for events, but how do we go from this to actually watching for these events?
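As a rough sketch of how a file like this might map onto Go, here are some types that mirror the YAML keys. The type names, field names, and the informerNamespace helper are hypothetical and not taken from our integration; the sketch also assumes that "*" means "all namespaces", which Kubernetes itself represents as the empty string (the value of NamespaceAll). The struct tags are what a YAML decoder would read; no decoder is used here, so the example runs with the standard library alone.

```go
package main

import "fmt"

// WatchConfig mirrors the YAML above. Type and field names are hypothetical;
// only the YAML keys come from the example config.
type WatchConfig struct {
	Watch []WatchEntry `yaml:"watch"`
}

// WatchEntry describes one namespace/environment pair and its resources.
type WatchEntry struct {
	Namespace   string          `yaml:"namespace"`
	Environment string          `yaml:"environment"`
	Resources   []ResourceEntry `yaml:"resources"`
}

// ResourceEntry describes a single resource type to watch.
type ResourceEntry struct {
	Resource          string   `yaml:"resource"`
	UpdateOn          []string `yaml:"updateOn"`
	IncludeDiff       bool     `yaml:"includeDiff"`
	SkipServiceCreate bool     `yaml:"skipServiceCreate"`
}

// informerNamespace translates the config's "*" wildcard (an assumption about
// its meaning) into the empty string, which Kubernetes treats as "all namespaces".
func informerNamespace(ns string) string {
	if ns == "*" {
		return ""
	}
	return ns
}

func main() {
	// Built by hand here; a real program would decode this from the file.
	cfg := WatchConfig{Watch: []WatchEntry{{
		Namespace:   "*",
		Environment: "development",
		Resources: []ResourceEntry{
			{Resource: "deployments.v1.apps", UpdateOn: []string{"spec", "metadata", "status"}, IncludeDiff: true},
			{Resource: "configmaps"},
		},
	}}}

	for _, w := range cfg.Watch {
		fmt.Printf("namespace=%q resources=%d\n", informerNamespace(w.Namespace), len(w.Resources))
	}
}
```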

Dynamic Informers

In the client-go package there’s a subfolder called “dynamic” that contains all of the primitives we need to make this requirement a reality. The package relies on just the rest.Config configuration object, which is a bit different than using other informers which use a clientset type. Let’s get started by writing some simple code to get a rest.Config type that we can use in our dynamic informer.

                                
package main

import (
	"os"

	"github.com/sirupsen/logrus"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := restConfig()
	if err != nil {
		logrus.WithError(err).Fatal("could not get config")
	}

	// Log the API server address so cfg is used; Go rejects unused variables.
	logrus.WithField("host", cfg.Host).Info("loaded REST config")
}

// restConfig prefers the kubeconfig file named by KUBECONFIG and falls back
// to the in-cluster configuration when the variable is unset.
func restConfig() (*rest.Config, error) {
	if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig != "" {
		return clientcmd.BuildConfigFromFlags("", kubeconfig)
	}
	return rest.InClusterConfig()
}
                                
                        

In this example, if you have a KUBECONFIG environment variable set, we build the REST configuration from that file. Otherwise we use InClusterConfig(), which is great if you’re running this project in the Kubernetes cluster you actually want to receive events for.

Configuring a Dynamic Informer

Next we have to do a few things to actually get to the point of generating an informer that can receive updates for any resource type.

  1. Create a Dynamic client from our REST config

  2. Create a DynamicInformer factory from the client

  3. Generate a GroupVersionResource type that we give our factory to generate an informer (this is important, it’s what indicates which resource we actually want updates for).

  4. Give the GVR to our factory and receive an informer.

                                
package main

import (
	"os"

	"github.com/sirupsen/logrus"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := restConfig()
	if err != nil {
		logrus.WithError(err).Fatal("could not get config")
	}

	// Grab a dynamic interface that we can create informers from
	dc, err := dynamic.NewForConfig(cfg)
	if err != nil {
		logrus.WithError(err).Fatal("could not generate dynamic client for config")
	}

	// Create a factory object that we can say "hey, I need to watch this resource"
	// and it will give us back an informer for it
	f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dc, 0, v1.NamespaceAll, nil)

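	// Retrieve a "GroupVersionResource" type that we need when generating our informer from our dynamic factory.
	// ParseResourceArg returns a nil GVR when the argument is not in "resource.version.group" form.
	gvr, _ := schema.ParseResourceArg("deployments.v1.apps")
	if gvr == nil {
		logrus.Fatal("could not parse resource argument")
	}

	// Finally, create our informer for deployments!
	i := f.ForResource(*gvr)
	_ = i // we start using the informer in the next section
}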
                                
                        

Now we have a GenericInformer interface returned from our factory, woot! This means we have the ability to start watching for events on it and act on them. Let’s do that next.
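The resource strings we pass around ("deployments.v1.apps" and friends) follow a resource.version.group layout. To make that format concrete, here is a standard-library sketch of the splitting logic. It is a simplified illustration, not the apimachinery implementation: the gvr struct is a hypothetical stand-in for schema.GroupVersionResource, and parseResourceArg is my own helper. It also shows why a bare name such as "configmaps" needs special handling, since it carries no version or group to split out.

```go
package main

import (
	"fmt"
	"strings"
)

// gvr is a hypothetical stand-in for schema.GroupVersionResource.
type gvr struct {
	Group, Version, Resource string
}

// parseResourceArg splits "resource.version.group" into its parts.
// Everything after the second dot is the group, so dotted groups such as
// "admissionregistration.k8s.io" stay intact. It returns false when the
// argument has fewer than two dots.
func parseResourceArg(arg string) (gvr, bool) {
	if strings.Count(arg, ".") < 2 {
		return gvr{}, false
	}
	parts := strings.SplitN(arg, ".", 3)
	return gvr{Resource: parts[0], Version: parts[1], Group: parts[2]}, true
}

func main() {
	args := []string{
		"deployments.v1.apps",
		"mutatingwebhookconfigurations.v1beta1.admissionregistration.k8s.io",
		"configmaps",
	}
	for _, arg := range args {
		parsed, ok := parseResourceArg(arg)
		fmt.Printf("%s -> ok=%v %+v\n", arg, ok, parsed)
	}
}
```

In the real code, a failed parse shows up as a nil *GroupVersionResource, so it is worth checking for nil before dereferencing it.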

Handling events from an informer

It’s best practice to push the events an informer gives you onto a queue and let another piece of code (usually called a controller) handle them. Kubernetes provides all of the packages to do this. For starters, however, we’re going to skip this to keep this tutorial relatively brief.
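To give a feel for the queue pattern without pulling in more packages, here is a minimal sketch that uses a buffered channel as the queue. This is a simplification under my own assumptions: the event type and the process function are hypothetical, and a real controller would more likely use the workqueue package from client-go (k8s.io/client-go/util/workqueue), which adds rate limiting and retries.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical, minimal record of what an informer reported.
type event struct {
	Kind string // "add", "update", or "delete"
	Key  string // namespace/name of the object
}

// process drains the queue until it is closed and returns a description
// of every event it handled, in order.
func process(queue <-chan event) []string {
	var handled []string
	for ev := range queue {
		handled = append(handled, ev.Kind+" "+ev.Key)
	}
	return handled
}

func main() {
	queue := make(chan event, 16)

	// The "controller": a separate goroutine that works through the queue.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, line := range process(queue) {
			fmt.Println("handled:", line)
		}
	}()

	// The informer's handler functions would only enqueue and return quickly.
	queue <- event{Kind: "add", Key: "default/web"}
	queue <- event{Kind: "update", Key: "default/web"}
	close(queue)

	wg.Wait()
}
```

The point of the split is that handler functions stay fast: they record what happened and return, while the slower work happens elsewhere.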

Next, let’s make a simple function that handles the events that will come from our informer and start the watcher.

                                
func startWatching(stopCh <-chan struct{}, s cache.SharedIndexInformer) {
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			logrus.Info("received add event!")
		},
		UpdateFunc: func(oldObj, obj interface{}) {
			logrus.Info("received update event!")
		},
		DeleteFunc: func(obj interface{}) {
			logrus.Info("received delete event!")
		},
	}

	s.AddEventHandler(handlers)
	s.Run(stopCh)
}
                                
                        

Next, let’s hook up this function to our informer that we generated above:

                                
// ... continuing from our GVR creation
i := f.ForResource(*gvr)
stopCh := make(chan struct{})
go startWatching(stopCh, i.Informer())
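// signal.Notify does not block when sending, so the channel needs a buffer.
// os.Kill (SIGKILL) cannot be caught, so we listen for SIGTERM instead
// (this needs the "syscall" import).
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)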
<-sigCh
close(stopCh)
                                
                        

First, we create a channel that we can close to indicate the informer should stop, and then pass that channel to our startWatching function, which we run in a goroutine.

Lastly, we create a signal channel that we can be notified on when our program receives a signal so we can gracefully stop our informer from any more processing.

Here’s our main.go in full at this point of the tutorial:

                                
package main

import (
	"os"
	"os/signal"
	"syscall"

	"github.com/sirupsen/logrus"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := restConfig()
	if err != nil {
		logrus.WithError(err).Fatal("could not get config")
	}

	// Grab a dynamic interface that we can create informers from
	dc, err := dynamic.NewForConfig(cfg)
	if err != nil {
		logrus.WithError(err).Fatal("could not generate dynamic client for config")
	}

	// Create a factory object that we can say "hey, I need to watch this resource"
	// and it will give us back an informer for it
	f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dc, 0, v1.NamespaceAll, nil)

	// Retrieve a "GroupVersionResource" type that we need when generating our informer from our dynamic factory.
	// ParseResourceArg returns a nil GVR when the argument is not in "resource.version.group" form.
	gvr, _ := schema.ParseResourceArg("deployments.v1.apps")
	if gvr == nil {
		logrus.Fatal("could not parse resource argument")
	}

	// Finally, create our informer for deployments!
	i := f.ForResource(*gvr)

	stopCh := make(chan struct{})
	go startWatching(stopCh, i.Informer())

	// signal.Notify does not block when sending, so the channel needs a buffer.
	// os.Kill (SIGKILL) cannot be caught, so we listen for SIGTERM instead.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

	<-sigCh
	close(stopCh)
}

// restConfig prefers the kubeconfig file named by KUBECONFIG and falls back
// to the in-cluster configuration when the variable is unset.
func restConfig() (*rest.Config, error) {
	if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig != "" {
		return clientcmd.BuildConfigFromFlags("", kubeconfig)
	}
	return rest.InClusterConfig()
}

func startWatching(stopCh <-chan struct{}, s cache.SharedIndexInformer) {
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			logrus.Info("received add event!")
		},
		UpdateFunc: func(oldObj, obj interface{}) {
			logrus.Info("received update event!")
		},
		DeleteFunc: func(obj interface{}) {
			logrus.Info("received delete event!")
		},
	}

	s.AddEventHandler(handlers)
	s.Run(stopCh)
}
                                
                        

Trying It Out

Locally I run Minikube to build these types of projects. So in my terminal I run:

                                
minikube start
export KUBECONFIG=~/.kube/config
go run main.go
                                
                        

Then, voila!

                                
➜  dynamic-informers go run main.go
INFO[0000] received add event!
                                
                        

Success! We’re receiving events from a local minikube cluster.

Getting more useful information

There is one downside to using dynamic informers: when objects arrive at our handler functions, they are of type unstructured.Unstructured, which comes from the k8s.io/apimachinery/pkg/apis/meta/v1/unstructured package, rather than a typed struct such as a Deployment.

This type is fine, however, for retrieving simple details like name, namespace, labels, annotations, etc., which were all we needed at FireHydrant to build our change event watcher. Let’s convert our event objects and print some more interesting details:

                                
func startWatching(stopCh <-chan struct{}, s cache.SharedIndexInformer) {
	handlers := cache.ResourceEventHandlerFuncs{
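		AddFunc: func(obj interface{}) {
			// Dynamic informers deliver *unstructured.Unstructured values
			// (import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured").
			u, ok := obj.(*unstructured.Unstructured)
			if !ok {
				logrus.Warn("received an unexpected object type")
				return
			}

			logrus.WithFields(logrus.Fields{
				"name":      u.GetName(),
				"namespace": u.GetNamespace(),
				"labels":    u.GetLabels(),
			}).Info("received add event!")
		},
		UpdateFunc: func(oldObj, obj interface{}) {
			logrus.Info("received update event!")
		},
		DeleteFunc: func(obj interface{}) {
			logrus.Info("received delete event!")
		},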
	}

	s.AddEventHandler(handlers)
	s.Run(stopCh)
}
                                
                        

This type assertion turns the interface{} we receive in the function parameter into a concrete *unstructured.Unstructured. From here we have all of the same methods that the Object interface from the meta/v1 package (imported as v1 in our code) defines.
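For anything beyond metadata, the Unstructured type exposes the decoded object as a map[string]interface{} in its Object field, and the unstructured package ships helpers such as NestedString for walking it. To show the idea without the Kubernetes dependencies, here is a standard-library sketch of that kind of lookup. The nestedString helper below is my own simplified version (it reports only found or not found), and the sample object is made up for illustration.

```go
package main

import "fmt"

// nestedString walks a decoded JSON-style object along fields and returns
// the string stored there. found is false if any step is missing or if the
// final value is not a string.
func nestedString(obj map[string]interface{}, fields ...string) (value string, found bool) {
	var cur interface{} = obj
	for _, f := range fields {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return "", false
		}
		cur, ok = m[f]
		if !ok {
			return "", false
		}
	}
	s, ok := cur.(string)
	return s, ok
}

func main() {
	// Shaped like the content of an unstructured Deployment (made-up values).
	obj := map[string]interface{}{
		"kind": "Deployment",
		"metadata": map[string]interface{}{
			"name":      "web",
			"namespace": "default",
		},
		"spec": map[string]interface{}{
			"strategy": map[string]interface{}{"type": "RollingUpdate"},
		},
	}

	strategy, ok := nestedString(obj, "spec", "strategy", "type")
	fmt.Println("strategy:", strategy, "found:", ok)

	_, ok = nestedString(obj, "spec", "paused")
	fmt.Println("paused found:", ok)
}
```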

Closing Up

I hope this guide was helpful. Dynamic informers were exactly what we needed to build our updated integration for Kubernetes but there weren’t that many resources available explaining how to hook them up.

I’ve posted this blog’s final result on our GitHub organization! Check it out here: https://github.com/firehydrant/blog-dynamic-informers

