Replace Rabbits with Elephants: Use PostgreSQL as Message Queue

In one of my recent projects, I needed a message queue to handle background jobs across multiple services. But here’s the thing — this was the first version of my service, and I wanted to keep my Kubernetes cluster simple. I didn’t want to introduce another stateful set like Redis, RabbitMQ, or NATS just yet.

I already had PostgreSQL in my stack, so I thought: why not try to use the database I already have?

To my surprise, PostgreSQL held up remarkably well as a message queue. In this post, I’ll explain exactly how I built the queue using PostgreSQL, walk through real-world Go code, compare it with traditional message queues like NATS and RabbitMQ, and share the lessons I learned — including where this approach works well, and where it falls short.

System Architecture Overview

Let’s look at the overall flow of the system:

  • Producers: These are parts of your app that insert jobs into a PostgreSQL table.
  • Consumers: Separate goroutines or services that continuously poll PostgreSQL, safely locking and processing jobs.
  • Database: PostgreSQL is used both for persistent storage and as a pseudo message broker.

Benefits of this setup:

  • No need for additional infrastructure like Redis or RabbitMQ.
  • All state is centralized in PostgreSQL, which simplifies backups and deployment.
  • You can use raw SQL or ORMs for full control and flexibility.

Of course, there are trade-offs — and we’ll get to those later.

Step 1: Designing the Jobs Table

To start, I created a table in PostgreSQL that stores each job:

CREATE TABLE jobs (
  id SERIAL PRIMARY KEY,
  payload JSONB NOT NULL,
  status TEXT DEFAULT 'pending',
  created_at TIMESTAMP DEFAULT now(),
  locked_at TIMESTAMP,
  processed_at TIMESTAMP,
  retry_count INT DEFAULT 0
);
  • payload holds job-specific data (as JSON).
  • status tracks job state (e.g., 'pending', 'processing', 'done', 'failed').
  • locked_at and processed_at help with retries and monitoring.
  • retry_count is useful for exponential backoff logic.

You can extend this table with fields like error_message, expires_at, or worker_id for more advanced needs.
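
The Go examples below scan rows into a Job struct. Here is a minimal sketch that lines up with this schema, assuming the pgx driver used throughout this post (nullable columns map to pointer types):

import "time"

// Job mirrors a row in the jobs table.
type Job struct {
    ID          int
    Payload     []byte     // raw JSONB payload
    Status      string
    CreatedAt   time.Time
    LockedAt    *time.Time // nil until a worker claims the job
    ProcessedAt *time.Time // nil until the job finishes
    RetryCount  int
}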

Step 2: Enqueuing Jobs in Go

Here’s how a producer (e.g., a REST handler) inserts a new job into the queue:

func EnqueueJob(db *pgxpool.Pool, payload map[string]interface{}) error {
    jsonPayload, err := json.Marshal(payload)
    if err != nil {
        return err
    }
    _, err = db.Exec(context.Background(),
        "INSERT INTO jobs (payload) VALUES ($1)", jsonPayload)
    return err
}

It’s fast, safe, and supports all PostgreSQL features like transactions and constraints.
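
A nice side effect of keeping the queue in the same database: you can enqueue a job in the same transaction as the business write that triggered it, so the job exists if and only if that write committed. Here is a sketch of the pattern; the users table and the send_welcome_email task are placeholders for illustration:

// EnqueueWithUser creates a user row and its follow-up job atomically.
// The users table is hypothetical; adapt the first INSERT to your own schema.
func EnqueueWithUser(db *pgxpool.Pool, email string) error {
    ctx := context.Background()

    payload, err := json.Marshal(map[string]interface{}{
        "task": "send_welcome_email",
        "to":   email,
    })
    if err != nil {
        return err
    }

    tx, err := db.Begin(ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx)

    if _, err := tx.Exec(ctx, "INSERT INTO users (email) VALUES ($1)", email); err != nil {
        return err
    }
    if _, err := tx.Exec(ctx, "INSERT INTO jobs (payload) VALUES ($1)", payload); err != nil {
        return err
    }
    return tx.Commit(ctx)
}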

Step 3: Consuming Jobs Safely with SELECT ... FOR UPDATE SKIP LOCKED

This is where PostgreSQL shines. We can ensure that multiple consumers don’t pick the same job:

func ConsumeJob(db *pgxpool.Pool) (*Job, error) {
    tx, err := db.Begin(context.Background())
    if err != nil {
        return nil, err
    }
    defer tx.Rollback(context.Background())

    row := tx.QueryRow(context.Background(), `
        WITH job AS (
            SELECT * FROM jobs
            WHERE status = 'pending'
            ORDER BY created_at
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        UPDATE jobs
        SET status = 'processing', locked_at = now()
        FROM job
        WHERE jobs.id = job.id
        RETURNING jobs.*;
    `)

    var job Job
    if err := row.Scan(&job.ID, &job.Payload, &job.Status, &job.CreatedAt, &job.LockedAt, &job.ProcessedAt, &job.RetryCount); err != nil {
        return nil, err
    }

    if err := tx.Commit(context.Background()); err != nil {
        return nil, err
    }
    return &job, nil
}

Key concepts:

  • FOR UPDATE SKIP LOCKED: prevents race conditions and ensures only one worker grabs a job.
  • Wrapping it in a transaction keeps the lock and the status change atomic: if the worker crashes before committing, the job returns to 'pending' automatically.

Step 4: Completing or Failing Jobs

Once a job is processed successfully:

func CompleteJob(db *pgxpool.Pool, jobID int) error {
    _, err := db.Exec(context.Background(),
        "UPDATE jobs SET status = 'done', processed_at = now() WHERE id = $1", jobID)
    return err
}

For failures, you might do:

func FailJob(db *pgxpool.Pool, jobID int, errMsg string) error {
    // Storing errMsg assumes you added the optional error_message column mentioned in Step 1.
    _, err := db.Exec(context.Background(),
        `UPDATE jobs SET status = 'failed', retry_count = retry_count + 1, error_message = $2
         WHERE id = $1`, jobID, errMsg)
    return err
}
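
Tying the three functions together, a worker is just a loop that claims a job, runs it, and records the outcome. Here is a rough sketch, assuming the functions above; handleJob stands in for your actual processing logic, and pgx.ErrNoRows from ConsumeJob simply means the queue is empty right now:

import (
    "context"
    "errors"
    "log"
    "time"

    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"
)

// RunWorker polls the queue until ctx is cancelled.
func RunWorker(ctx context.Context, db *pgxpool.Pool, handleJob func(*Job) error) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        job, err := ConsumeJob(db)
        if errors.Is(err, pgx.ErrNoRows) {
            time.Sleep(time.Second) // queue is empty, back off before polling again
            continue
        }
        if err != nil {
            log.Printf("consume failed: %v", err)
            time.Sleep(time.Second)
            continue
        }

        if err := handleJob(job); err != nil {
            _ = FailJob(db, job.ID, err.Error())
            continue
        }
        _ = CompleteJob(db, job.ID)
    }
}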

Comparing PostgreSQL with RabbitMQ and NATS

Here’s how this PostgreSQL-based queue stacks up:

| Feature | PostgreSQL | NATS / RabbitMQ |
| --- | --- | --- |
| Setup Complexity | Very Low (reuse existing DB) | Medium to High |
| Message Throughput | Moderate (100s-1k/sec with tuning) | Very High (10k+/sec) |
| Persistence | Built-in with ACID guarantees | RabbitMQ: persistent queues; NATS: JetStream optional |
| Retry Logic | Manual (custom logic) | Built-in |
| Delayed Jobs | Via a scheduled_at column | Built-in (plugins or delay queues) |
| Fanout / PubSub | Poor (LISTEN/NOTIFY is limited) | Excellent |
| Observability | Easy via SQL queries and logs | Built-in dashboards and CLI tools |
| Use Case Fit | Light async jobs, internal services | High-volume event streaming |

Performance and Monitoring Tips

You can improve performance by:

  • Indexing status and created_at fields
  • Batching job consumption (see the sketch after this list)
  • Vacuum tuning and table partitioning for large queues
  • Running periodic job cleanup for done/failed jobs
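
For the batching point above, the same FOR UPDATE SKIP LOCKED pattern can claim several jobs per round trip. Here is a sketch that extends ConsumeJob from Step 3; the column order in Scan matches the jobs table from Step 1:

// ConsumeBatch claims up to n pending jobs in one statement.
// A single UPDATE runs in its own transaction, so no explicit Begin/Commit is needed here.
func ConsumeBatch(db *pgxpool.Pool, n int) ([]Job, error) {
    rows, err := db.Query(context.Background(), `
        WITH batch AS (
            SELECT id FROM jobs
            WHERE status = 'pending'
            ORDER BY created_at
            FOR UPDATE SKIP LOCKED
            LIMIT $1
        )
        UPDATE jobs
        SET status = 'processing', locked_at = now()
        FROM batch
        WHERE jobs.id = batch.id
        RETURNING jobs.*;
    `, n)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var jobs []Job
    for rows.Next() {
        var j Job
        if err := rows.Scan(&j.ID, &j.Payload, &j.Status, &j.CreatedAt, &j.LockedAt, &j.ProcessedAt, &j.RetryCount); err != nil {
            return nil, err
        }
        jobs = append(jobs, j)
    }
    return jobs, rows.Err()
}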

To monitor the queue:

SELECT status, COUNT(*) FROM jobs GROUP BY status;

Export this as a Prometheus metric to monitor queue depth and failures.
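
As a sketch using the prometheus/client_golang library (the jobs_in_queue metric name is just an example), a small collector could run that query on a ticker and expose one gauge per status:

import (
    "context"

    "github.com/jackc/pgx/v4/pgxpool"
    "github.com/prometheus/client_golang/prometheus"
)

var jobsByStatus = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{Name: "jobs_in_queue", Help: "Number of jobs per status."},
    []string{"status"},
)

func init() {
    prometheus.MustRegister(jobsByStatus)
}

// UpdateQueueMetrics refreshes the gauge from the jobs table; call it on a ticker.
func UpdateQueueMetrics(ctx context.Context, db *pgxpool.Pool) error {
    rows, err := db.Query(ctx, "SELECT status, COUNT(*) FROM jobs GROUP BY status")
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var status string
        var count int64
        if err := rows.Scan(&status, &count); err != nil {
            return err
        }
        jobsByStatus.WithLabelValues(status).Set(float64(count))
    }
    return rows.Err()
}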

Using pgmq — A PostgreSQL Message Queue Library

If you want to avoid reinventing the wheel, check out pgmq — a lightweight, battle-tested message queue built on top of PostgreSQL. It provides a clean API for producing and consuming messages with features like retries, acknowledgments, and concurrency control, all while leveraging PostgreSQL’s SKIP LOCKED mechanism under the hood.

Why consider pgmq?

  • Ready-made queue abstraction: No need to write raw SQL or worry about job locking.
  • Retry and failure handling: Built-in support for retry policies and dead-letter queues.
  • Concurrency safe: Designed for multiple workers to process jobs safely.
  • Minimal dependencies: Works with your existing PostgreSQL instance.

Here’s a tiny Go example of publishing and consuming messages with pgmq:

import (
    "context"
    "github.com/vgarvardt/pgmq"
    "github.com/jackc/pgx/v4/pgxpool"
    "log"
)

func main() {
    ctx := context.Background()
    dbpool, err := pgxpool.Connect(ctx, "postgres://user:pass@localhost/dbname")
    if err != nil {
        log.Fatal(err)
    }
    defer dbpool.Close()

    queue := pgmq.NewQueue(dbpool, "my_jobs")

    // Publish a message
    err = queue.Publish(ctx, []byte(`{"task":"send_email","to":"user@example.com"}`))
    if err != nil {
        log.Fatal(err)
    }

    // Consume a batch of up to 10 messages
    messages, err := queue.Consume(ctx, 10)
    if err != nil {
        log.Fatal(err)
    }

    for _, msg := range messages {
        log.Printf("Processing message: %s\n", string(msg.Body))
        // Do your job...

        // Acknowledge so the message is not redelivered
        msg.Ack(ctx)
    }
}

If you want a solid, community-supported PostgreSQL queue with much of the heavy lifting done for you, pgmq is a great place to start.

What I Learned

  1. PostgreSQL is underrated — it’s versatile and powerful when used wisely.
  2. SELECT FOR UPDATE SKIP LOCKED is gold — it enables safe parallel processing.
  3. Monitoring and cleanup are essential. PostgreSQL won’t delete old jobs for you.
  4. Not a silver bullet — don’t use this if you need fan-out, low-latency pub/sub, or streaming.

Final Thoughts

Would I do it again? Absolutely — in the right context. If you're building internal tools, backend jobs, or apps that don’t need millisecond latency or massive scale, PostgreSQL can save you a ton of infra overhead.

And you get to stay in one stack — no extra services, no message broker configs, and full SQL power.