Kubernetes does a great job running workloads. If you ask for three replicas, it keeps three running. If a pod crashes, it restarts it. If a node fails, it reschedules the pod. The control plane always works to match your desired state.
However, Kubernetes doesn’t know what’s happening inside your app. It can’t see if your image processing queue has 500 items waiting, or if your video encoding pipeline suddenly gets overloaded at 2 a.m. on Black Friday. It simply runs what you’ve told it to run.
Operators are built to bridge the gap between what Kubernetes knows and what your application actually needs.
I built one from scratch. Not to ship a product, but to understand how the internals work. The result is scaledjob-operator: an operator that watches a custom resource called ScaledJob, reads the depth of a Redis queue, and automatically creates Kubernetes Jobs to drain it. Queue fills up, Jobs get created. Queue empties, no new Jobs are created, and the running ones finish naturally.
In this article, I’ll explain what I built, why I built it, and the three biggest surprises I found along the way.
Why queue-based autoscaling specifically
Kubernetes usually uses the Horizontal Pod Autoscaler (HPA) for autoscaling. HPA adjusts Deployments based on CPU or memory usage, which works well for web servers where traffic matches resource use.
But many real workloads don’t work that way. Tasks like image resizing, video encoding, email sending, and payment processing are queue-based. Work comes in batches and can be unpredictable. For these, the best scaling signal isn’t CPU; it’s the queue depth, or how many messages are waiting to be processed.
KEDA is a solid solution for this in production. It’s powerful and well-maintained. But KEDA’s codebase is big and has many layers, so reading it doesn’t quickly show you how the operator pattern works behind the scenes.
Building a simpler version from scratch really helps you learn. That’s why I chose to do it myself.
What scaledjob-operator does
You create a ScaledJob resource that looks like this:
apiVersion: batch.miracle.dev/v1
kind: ScaledJob
metadata:
  name: image-processor
  namespace: production
spec:
  queueName: image-resize-queue
  redisAddress: redis.production.svc.cluster.local:6379
  threshold: 10
  minReplicas: 0
  maxReplicas: 20
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: worker
              image: myapp/image-worker:v1.2.0
The operator keeps an eye on this resource. Every time it reconciles, it checks the Redis queue depth, decides how many Jobs should be running, creates more if needed, and updates the resource’s status with the latest info:
status:
  queueDepth: 47
  activeJobs: 4
  desiredJobs: 5
  lastScaleTime: "2024-01-15T14:32:00Z"
  conditions:
    - type: Ready
      status: "True"
      reason: Reconciled
    - type: QueueConnected
      status: "True"
      reason: Connected
The scaling math is straightforward: divide the queue depth by the threshold and round up. For example, if the depth is 47 and the threshold is 10, you get ceil(47/10) = 5. Then, make sure the number stays between minReplicas and maxReplicas.
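Here is a minimal sketch of calculateDesiredJobs. It assumes a trimmed-down spec with int32 Threshold, MinReplicas, and MaxReplicas fields. Those field names are assumptions, and the real types live in api/v1/scaledjob_types.go. Treating a threshold below 1 as 1 is also an assumption, added only so the sketch never divides by zero.

```go
package main

import "fmt"

// ScaledJobSpec is a hypothetical, trimmed-down stand-in for the operator's
// spec type: only the fields the scaling math needs.
type ScaledJobSpec struct {
	Threshold   int32
	MinReplicas int32
	MaxReplicas int32
}

// calculateDesiredJobs returns ceil(depth / threshold), clamped to
// [MinReplicas, MaxReplicas]. It is pure: no I/O, no API calls, no Redis.
func calculateDesiredJobs(depth int64, spec ScaledJobSpec) int32 {
	threshold := int64(spec.Threshold)
	if threshold < 1 {
		threshold = 1 // assumption: guard against division by zero
	}
	if depth < 0 {
		depth = 0 // assumption: a negative depth is treated as an empty queue
	}

	// Integer ceiling division: divide, then round up if anything is left over.
	desired := depth / threshold
	if depth%threshold != 0 {
		desired++
	}

	// Clamp to the configured bounds (the max wins if min > max).
	if desired < int64(spec.MinReplicas) {
		desired = int64(spec.MinReplicas)
	}
	if desired > int64(spec.MaxReplicas) {
		desired = int64(spec.MaxReplicas)
	}
	return int32(desired)
}

func main() {
	spec := ScaledJobSpec{Threshold: 10, MinReplicas: 0, MaxReplicas: 20}
	fmt.Println(calculateDesiredJobs(47, spec))  // 5
	fmt.Println(calculateDesiredJobs(0, spec))   // 0
	fmt.Println(calculateDesiredJobs(500, spec)) // 20 (clamped to MaxReplicas)
}
```

Dividing first and adding one only when there is a remainder gives the same result as (depth + threshold - 1) / threshold. It also avoids any risk of overflow for very large depths.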
I used integer ceiling division rather than math.Ceil to avoid floating-point arithmetic in the hot path. The table-driven test below pins down the edge cases:
func TestCalculateDesiredJobs(t *testing.T) {
	// spec(threshold, minReplicas, maxReplicas) is a small test helper (elided)
	tests := []struct {
		name     string
		depth    int64
		spec     batchv1.ScaledJobSpec
		expected int32
	}{
		{"zero depth returns 0", 0, spec(10, 0, 5), 0},
		{"depth below threshold returns 1", 5, spec(10, 0, 5), 1},
		{"depth above max is clamped", 1000, spec(10, 0, 5), 5},
		{"min replicas floor applies at zero depth", 0, spec(10, 2, 5), 2},
	}
	// ...
}
The function under test is pure: it has no side effects, makes no API calls, and never reads from Redis. Keeping it pure is what made it so easy to test.
The three things that surprised me
1. Idempotency is not optional; it changes how you think
Before building this, I only understood idempotency in theory. After building the operator, I realized why it’s absolutely necessary.
The reconcile function runs all the time. It runs when you update a ScaledJob, but also when a Job it owns changes state, on the periodic requeue every 30 seconds, and when the operator restarts. Your function can’t assume why it was called. If it tries to guess “what changed?” it will eventually get it wrong.
The best way to approach this is to treat every reconcile as a fresh look at the current state.
func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Fetch the current state of the ScaledJob
	var sj batchv1.ScaledJob
	if err := r.Get(ctx, req.NamespacedName, &sj); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Read queue depth right now
	depth, err := r.QueueFactory.ForAddress(sj.Spec.RedisAddress).Depth(ctx, sj.Spec.QueueName)
	if err != nil {
		// Mark the QueueConnected condition (omitted here), then requeue sooner than normal
		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
	}

	// List candidate Jobs by label in this namespace
	var jobList k8sbatchv1.JobList
	if err := r.List(ctx, &jobList,
		client.InNamespace(sj.Namespace),
		client.MatchingLabels{labelOwner: sj.Name},
	); err != nil {
		return ctrl.Result{}, err
	}

	// Count what's actually running right now (only Jobs this ScaledJob controls)
	active := countActiveJobs(controllerOwnedJobs(jobList, &sj))

	// Calculate what should be running given current depth
	desired := calculateDesiredJobs(depth, sj.Spec)

	// Close the gap: create Jobs until active reaches desired
	for active < desired {
		job := buildJob(&sj)
		if err := ctrl.SetControllerReference(&sj, job, r.Scheme); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, job); err != nil {
			return ctrl.Result{}, err
		}
		active++
	}

	// Always update status, even when nothing changed
	sj.Status.QueueDepth = depth
	sj.Status.ActiveJobs = active
	sj.Status.DesiredJobs = desired
	if err := r.Status().Update(ctx, &sj); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
Notice that the reconciler never checks if the queue depth changed since last time. It doesn’t need to. Instead, it asks, “What is the queue depth right now, and how many Jobs should there be?” This approach always gives the right answer, no matter how often the function runs.
This also means the operator doesn’t delete running Jobs when the queue is empty. It just stops creating new ones. The running Jobs finish on their own. This isn’t being lazy; it’s the right way to handle batch workloads. Stopping a worker in the middle of a task to save resources is usually a bad idea.
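That rule reduces to a tiny pure helper. The jobsToCreate function below is hypothetical, since the project’s Reconcile uses a loop instead, but it captures the same idea. It creates only the missing Jobs and treats a surplus as nothing to do rather than something to delete. Running it twice against the same state shows why repeated reconciles are harmless.

```go
package main

import "fmt"

// jobsToCreate is a hypothetical helper mirroring the "close the gap" loop:
// it only ever creates, so it returns 0 (never a negative count that would
// imply deleting Jobs) when active already meets or exceeds desired.
func jobsToCreate(active, desired int32) int32 {
	if active >= desired {
		return 0
	}
	return desired - active
}

func main() {
	active, desired := int32(4), int32(5)

	// First pass: one Job is missing, so one gets created.
	n := jobsToCreate(active, desired)
	active += n
	fmt.Println(n) // 1

	// Second pass over the same observed state: nothing left to do.
	fmt.Println(jobsToCreate(active, desired)) // 0

	// Queue drained (desired drops to 0): nothing is created and running Jobs are left alone.
	fmt.Println(jobsToCreate(active, 0)) // 0
}
```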
2. Interface-driven design is essential, not just a style choice
I used to write interfaces in Go out of habit. Building this operator showed me why they’re truly needed here, not just a nice extra.
The reconciler has to read from Redis. The simple way is to import the Redis client and call it directly. That works, but then you can’t test the reconciler without a real Redis instance. Tests become slow, depend on the environment, and are fragile.
The alternative is to define what you need as an interface:
// Package queue abstracts queue depth reads behind an interface so the
// reconciler can be tested without a real Redis instance.
package queue

import "context"

// Client reports how many items are waiting in a named queue.
type Client interface {
	Depth(ctx context.Context, queueName string) (int64, error)
}

// Factory returns a Client for a given queue address.
type Factory interface {
	ForAddress(address string) Client
}
In production, the interface is implemented with a real Redis client:
type RedisClient struct {
	client *redis.Client
}

// Depth returns the length of the Redis list backing the queue (LLEN).
func (c *RedisClient) Depth(ctx context.Context, queueName string) (int64, error) {
	return c.client.LLen(ctx, queueName).Result()
}
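The production Factory isn’t shown here. Reconcile calls ForAddress on every pass, so one reasonable design is to build a client once per address and reuse it. That is an assumption, not necessarily what the project does. Constructing a fresh redis.Client every 30 seconds would likely leak connections unless each one is closed. The NewClient hook and the staticClient type below are hypothetical stand-ins so the sketch runs without Redis.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Client mirrors the queue.Client interface.
type Client interface {
	Depth(ctx context.Context, queueName string) (int64, error)
}

// CachingFactory is a hypothetical Factory that builds at most one Client
// per address and reuses it on later reconciles. NewClient is an assumed
// constructor hook; in production it would wrap a *redis.Client.
type CachingFactory struct {
	NewClient func(address string) Client

	mu      sync.Mutex
	clients map[string]Client
}

// ForAddress returns the cached Client for address, creating it on first use.
func (f *CachingFactory) ForAddress(address string) Client {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.clients == nil {
		f.clients = make(map[string]Client)
	}
	if c, ok := f.clients[address]; ok {
		return c
	}
	c := f.NewClient(address)
	f.clients[address] = c
	return c
}

// staticClient is a stand-in Client so the example runs without Redis.
type staticClient struct{ depth int64 }

func (s staticClient) Depth(context.Context, string) (int64, error) { return s.depth, nil }

func main() {
	built := 0
	f := &CachingFactory{NewClient: func(string) Client {
		built++
		return staticClient{depth: 47}
	}}

	// Two reconciles against the same address share one client.
	f.ForAddress("redis:6379")
	depth, _ := f.ForAddress("redis:6379").Depth(context.Background(), "image-resize-queue")
	fmt.Println(built, depth) // 1 47
}
```

A real implementation would also need a way to close cached clients when the operator shuts down. That part is left out here.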
For tests, you use a fake client that returns whatever values you need:
type FakeClient struct {
	DepthValue int64
	Err        error
}

func (f *FakeClient) Depth(_ context.Context, _ string) (int64, error) {
	return f.DepthValue, f.Err
}
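The test helper below refers to queue.FakeFactory, whose definition isn’t shown. A plausible minimal version fits in a few lines, assuming it ignores the address and returns one preconfigured Client. The compile-time assertions catch any drift between the fakes and the interfaces.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Client interface {
	Depth(ctx context.Context, queueName string) (int64, error)
}

type Factory interface {
	ForAddress(address string) Client
}

// FakeClient returns canned values.
type FakeClient struct {
	DepthValue int64
	Err        error
}

func (f *FakeClient) Depth(_ context.Context, _ string) (int64, error) {
	return f.DepthValue, f.Err
}

// FakeFactory is an assumed shape for the test helper: it ignores the
// address and always hands back the same preconfigured Client.
type FakeFactory struct {
	Client Client
}

func (f *FakeFactory) ForAddress(_ string) Client { return f.Client }

// Compile-time checks that the fakes satisfy the interfaces.
var (
	_ Client  = (*FakeClient)(nil)
	_ Factory = (*FakeFactory)(nil)
)

func main() {
	var up Factory = &FakeFactory{Client: &FakeClient{DepthValue: 47}}
	depth, err := up.ForAddress("ignored:6379").Depth(context.Background(), "image-resize-queue")
	fmt.Println(depth, err) // 47 <nil>

	var down Factory = &FakeFactory{Client: &FakeClient{Err: errors.New("connection refused")}}
	_, err = down.ForAddress("ignored:6379").Depth(context.Background(), "image-resize-queue")
	fmt.Println(err) // connection refused
}
```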
The reconciler receives a Factory when it’s constructed and never touches the Redis package directly. It doesn’t know whether it’s talking to a real Redis or a fake; it just calls Depth() and uses the result.
This makes tests fast and predictable:
newReconciler := func(depth int64, queueErr error) *ScaledJobReconciler {
	return &ScaledJobReconciler{
		Client: k8sClient,
		Scheme: k8sClient.Scheme(),
		QueueFactory: &queue.FakeFactory{
			Client: &queue.FakeClient{DepthValue: depth, Err: queueErr},
		},
		Clock:    metav1.Now,
		Recorder: eventRecorder,
	}
}

It("queue depth 0 creates no Jobs when minReplicas is 0", func() {
	_, err := newReconciler(0, nil).Reconcile(ctx, reconcile.Request{NamespacedName: namespacedName})
	Expect(err).NotTo(HaveOccurred())
	Expect(listOwned()).To(BeEmpty())
})

It("Redis error requeues with shorter interval", func() {
	result, err := newReconciler(0, fmt.Errorf("connection refused")).Reconcile(ctx, ...)
	Expect(err).NotTo(HaveOccurred())
	Expect(result.RequeueAfter).To(BeNumerically("<", 30*time.Second))
})
This design also makes it easy to swap out the backend later. To support SQS, you would implement the queue.Client interface (plus a matching Factory) for SQS, and the reconciler itself wouldn’t need any changes.
3. Owner references are worth more than they look
When the operator creates a Job, it sets an owner reference pointing back to the ScaledJob that created it:
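func buildJob(sj *batchv1.ScaledJob) *k8sbatchv1.Job {
	return &k8sbatchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: sj.Name + "-",
			Namespace:    sj.Namespace,
			Labels:       map[string]string{labelOwner: sj.Name},
		},
		// DeepCopy so the new Job never shares pointers or slices with the cached ScaledJob
		Spec: *sj.Spec.JobTemplate.Spec.DeepCopy(),
	}
}

// In the reconcile loop, after building the job:
if err := ctrl.SetControllerReference(&sj, job, r.Scheme); err != nil {
	return ctrl.Result{}, err
}
if err := r.Create(ctx, job); err != nil {
	return ctrl.Result{}, err
}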
It’s a single function call, but it makes a big difference.
When you delete a ScaledJob, Kubernetes automatically deletes all the Jobs it owns. You don’t need finalizers, cleanup code in the reconciler, or background processes to watch for deletions. The garbage collector takes care of everything.
Without owner references, deleting a ScaledJob would leave orphaned Jobs running in the cluster. In a busy cluster with many ScaledJobs from different teams, these orphans pile up: they consume resources and are hard to trace back to their source.
Owner references also fix a trickier issue: stale Jobs from a ScaledJob that was deleted and then recreated with the same name. If you delete a ScaledJob called image-processor and make a new one with that name, old Jobs might still be running if they haven’t finished. Filtering by label alone would count those old Jobs as part of the new ScaledJob. The solution is to filter by both label and owner UID:
func controllerOwnedJobs(jobs k8sbatchv1.JobList, owner *batchv1.ScaledJob) k8sbatchv1.JobList {
	owned := k8sbatchv1.JobList{
		Items: make([]k8sbatchv1.Job, 0, len(jobs.Items)),
	}
	for _, job := range jobs.Items {
		controllerRef := metav1.GetControllerOf(&job)
		if controllerRef != nil && controllerRef.UID == owner.UID {
			owned.Items = append(owned.Items, job)
		}
	}
	return owned
}
The test for this was one of the more satisfying ones to write:
It("same-label Jobs without matching controller owner are ignored", func() {
	// Create a Job with the right label but no ownerReference
	staleJob := &k8sbatchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{labelOwner: resourceName},
		},
		// ...
	}
	Expect(k8sClient.Create(ctx, staleJob)).To(Succeed())

	// depth=10 → desired=1. The stale Job must not count as active.
	_, err := newReconciler(10, nil).Reconcile(ctx, ...)
	Expect(err).NotTo(HaveOccurred())

	// Should have created 1 new Job, not 0
	Expect(listOwned()).To(HaveLen(2)) // stale + 1 new

	updated := &batchv1.ScaledJob{}
	Expect(k8sClient.Get(ctx, namespacedName, updated)).To(Succeed())
	Expect(updated.Status.ActiveJobs).To(Equal(int32(1)))
})
How the pieces fit together
Here’s how a request moves through the system from start to finish:
You create a ScaledJob
    ↓
Kubernetes stores it (CRD is registered, API server accepts it)
    ↓
Operator gets a watch event
    ↓
Reconciler runs:
    1. Fetch the ScaledJob
    2. Call QueueClient.Depth(queueName) → Redis LLEN → integer
    3. List Jobs by label, filter by owner UID → active count
    4. calculateDesiredJobs(depth, spec) → desired count
    5. Create (desired - active) new Jobs, each with ownerReference
    6. Write queueDepth, activeJobs, desiredJobs to Status
    7. Set QueueConnected and Ready conditions
    ↓
Requeue after 30 seconds → repeat forever
Each step in this flow lines up with a specific file in the codebase:
scaledjob-operator/
├── api/v1/scaledjob_types.go # CRD Go types — Spec, Status, conditions
├── internal/
│ ├── controller/
│ │ ├── scaledjob_controller.go # Reconcile() — the core loop
│ │ ├── scaling.go # calculateDesiredJobs() — pure function
│ │ ├── jobs.go # buildJob(), countActiveJobs(), controllerOwnedJobs()
│ │ └── conditions.go # setCondition(), findCondition()
│ ├── queue/
│ │ ├── interface.go # QueueClient and Factory interfaces
│ │ ├── redis.go # Real Redis implementation
│ │ └── fake.go # Fake for tests
│ └── metrics/
│ └── metrics.go # Prometheus gauges and counters
Observability from the start
One thing I made a point of was adding Prometheus metrics before calling the project finished. It’s easy to put observability off and then forget about it entirely.
The operator exposes five metrics:
var labelNames = []string{"namespace", "scaledjob", "queue"}

var (
	QueueDepth      *prometheus.GaugeVec   // queue items observed on last reconcile
	ActiveJobs      *prometheus.GaugeVec   // non-terminal Jobs currently running
	DesiredJobs     *prometheus.GaugeVec   // what the controller wants running
	ReconcileTotal  *prometheus.CounterVec // total reconcile attempts
	ReconcileErrors *prometheus.CounterVec // reconcile attempts that errored
)
These five metrics answer the key questions for an on-call engineer: Is the queue draining? Are workers running? Is the operator healthy?
The operator also sends a few Kubernetes Events for state changes. For example, RedisUnreachable is sent when the queue can’t be reached, QueueConnected is sent on recovery, and CreatedJobs is sent when new Jobs are created. These events are only sent once per transition, not every reconcile. You don’t want to flood the event stream every 30 seconds just to say everything is okay.
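One way to get once-per-transition behavior is to compare the QueueConnected condition from the previous reconcile with the current result. The eventForTransition helper below is a hypothetical sketch of that rule, and the project’s actual logic may differ. It returns an Event reason only when the connection state flips, and otherwise stays quiet.

```go
package main

import "fmt"

// eventForTransition is a hypothetical helper. prevStatus is the previous
// QueueConnected condition status ("True", "False", or "" if never set);
// reachable is whether Redis answered on this reconcile. It returns the
// Event reason to emit, or "" when nothing changed.
func eventForTransition(prevStatus string, reachable bool) string {
	switch {
	case !reachable && prevStatus != "False":
		return "RedisUnreachable" // healthy (or unknown) -> down
	case reachable && prevStatus == "False":
		return "QueueConnected" // down -> recovered
	default:
		return "" // no transition: don't flood the event stream
	}
}

func main() {
	prev := ""
	for _, reachable := range []bool{true, true, false, false, true, true} {
		if reason := eventForTransition(prev, reachable); reason != "" {
			fmt.Println("emit", reason)
		}
		if reachable {
			prev = "True"
		} else {
			prev = "False"
		}
	}
	// prints:
	// emit RedisUnreachable
	// emit QueueConnected
}
```

In the operator, a non-empty reason would be handed to the EventRecorder from k8s.io/client-go/tools/record, for example r.Recorder.Event(&sj, corev1.EventTypeWarning, reason, message) for RedisUnreachable and corev1.EventTypeNormal for QueueConnected. CreatedJobs would be emitted separately whenever the creation loop actually creates something.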
Testing without a cluster
The controller tests use envtest, which starts a real kube-apiserver and etcd as local processes, so no cluster is required. Combined with the fake queue client, every test scenario runs in milliseconds:
It("Redis error sets QueueConnected=False and requeues sooner", func() {
	// First reconcile succeeds so the condition exists
	_, err := newReconciler(0, nil).Reconcile(ctx, reconcile.Request{...})
	Expect(err).NotTo(HaveOccurred())

	// Second reconcile simulates Redis going down
	result, err := newReconciler(0, fmt.Errorf("connection refused")).Reconcile(ctx, ...)
	Expect(err).NotTo(HaveOccurred())
	Expect(result.RequeueAfter).To(BeNumerically("<", 30*time.Second))

	// Condition should reflect the failure
	updated := &batchv1.ScaledJob{}
	Expect(k8sClient.Get(ctx, namespacedName, updated)).To(Succeed())
	queueCond := apimeta.FindStatusCondition(updated.Status.Conditions, "QueueConnected")
	Expect(queueCond).NotTo(BeNil())
	Expect(queueCond.Status).To(Equal(metav1.ConditionFalse))
})
There’s also an end-to-end test that runs on a real Kind cluster. It deploys Redis, pushes 30 messages onto a queue, applies a ScaledJob with a threshold of 10, and waits for 3 Jobs to appear. Then it deletes the ScaledJob and checks that all Jobs are cleaned up. This test gives me real confidence that the whole system works together.
What I’d tell someone starting their first operator
Before you write any code, describe the reconcile function in plain language: not pseudocode, but real sentences. For example: “When this function runs, it will do X, then Y, then Z.” If you can’t explain it clearly, you probably don’t understand the flow well enough to build it yet.
Keep business logic out of the reconciler itself. In this project, the scaling calculation is a pure function in its own file. Job construction is in its own function, and condition helpers are separate too. The reconciler just brings these pieces together, without containing their logic.
Test the failure cases first. For example, what happens if Redis goes down, the ScaledJob is deleted during a reconcile, or a Job has the right label but the wrong owner? These edge cases are where operators can fail quietly in production. If you don’t test them during development, you’ll end up fixing them at 2 a.m.
You can find the full project at github.com/am-miracle/scaledjob-operator. The README has local setup instructions and explains every part. If you’re working with Kubernetes and want to really understand operators, building a simple one from scratch is the fastest way to learn. KEDA’s documentation will make a lot more sense afterward.