In one of my recent projects, I needed a message queue to handle background jobs across multiple services. But here's the thing: this was the first version of my service, and I wanted to keep my Kubernetes cluster simple. I didn't want to introduce another StatefulSet like Redis, RabbitMQ, or NATS just yet.
I already had PostgreSQL in my stack, so I thought: why not try to use the database I already have?
To my surprise, PostgreSQL held up remarkably well as a message queue. In this post, I’ll explain exactly how I built the queue using PostgreSQL, walk through real-world Go code, compare it with traditional message queues like NATS and RabbitMQ, and share the lessons I learned — including where this approach works well, and where it falls short.
System Architecture Overview
Let’s look at the overall flow of the system:
- Producers: These are parts of your app that insert jobs into a PostgreSQL table.
- Consumers: Separate goroutines or services that continuously poll PostgreSQL, safely locking and processing jobs.
- Database: PostgreSQL is used both for persistent storage and as a pseudo message broker.
Benefits of this setup:
- No need for additional infrastructure like Redis or RabbitMQ.
- All state is centralized in PostgreSQL, which simplifies backups and deployment.
- You can use raw SQL or ORMs for full control and flexibility.
Of course, there are trade-offs — and we’ll get to those later.
Step 1: Designing the Jobs Table
To start, I created a table in PostgreSQL that stores each job:
CREATE TABLE jobs (
    id SERIAL PRIMARY KEY,
    payload JSONB NOT NULL,
    status TEXT DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT now(),
    locked_at TIMESTAMP,
    processed_at TIMESTAMP,
    retry_count INT DEFAULT 0
);
- `payload` holds job-specific data (as JSON).
- `status` tracks job state (e.g., 'pending', 'processing', 'done', 'failed').
- `locked_at` and `processed_at` help with retries and monitoring.
- `retry_count` is useful for exponential backoff logic.
You can extend this table with fields like `error_message`, `expires_at`, or `worker_id` for more advanced needs.
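For reference, here's one way the matching Go struct could look. This is a sketch rather than code from the original project: the nullable columns are mapped to pointers so they scan cleanly with pgx, the payload is kept as raw bytes, and the time package is assumed to be imported.
// Job mirrors one row of the jobs table. Nullable columns are pointers
// so pgx can scan NULL values without extra wrapper types.
type Job struct {
	ID          int
	Payload     []byte     // raw JSONB payload
	Status      string
	CreatedAt   time.Time
	LockedAt    *time.Time // NULL until a worker claims the job
	ProcessedAt *time.Time // NULL until the job finishes
	RetryCount  int
}
The pointer fields let the Scan call in Step 3 handle NULLs directly.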
Step 2: Enqueuing Jobs in Go
Here’s how a producer (e.g., a REST handler) inserts a new job into the queue:
func EnqueueJob(db *pgxpool.Pool, payload map[string]interface{}) error {
	jsonPayload, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	_, err = db.Exec(context.Background(),
		"INSERT INTO jobs (payload) VALUES ($1)", jsonPayload)
	return err
}
It’s fast, safe, and supports all PostgreSQL features like transactions and constraints.
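To make the producer side concrete, here's a minimal, hypothetical HTTP handler that enqueues an email job. The route shape and payload fields are my own assumptions for illustration, and it assumes the standard net/http and encoding/json imports.
// HandleSendEmail enqueues a hypothetical "send_email" job from a POST body.
func HandleSendEmail(db *pgxpool.Pool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var body struct {
			To      string `json:"to"`
			Subject string `json:"subject"`
		}
		if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
			http.Error(w, "invalid JSON", http.StatusBadRequest)
			return
		}
		payload := map[string]interface{}{
			"task":    "send_email",
			"to":      body.To,
			"subject": body.Subject,
		}
		if err := EnqueueJob(db, payload); err != nil {
			http.Error(w, "failed to enqueue job", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	}
}
Returning 202 Accepted reflects that the work is queued, not yet done.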
Step 3: Consuming Jobs Safely with SELECT ... FOR UPDATE SKIP LOCKED
This is where PostgreSQL shines. We can ensure that multiple consumers don’t pick the same job:
func ConsumeJob(db *pgxpool.Pool) (*Job, error) {
	tx, err := db.Begin(context.Background())
	if err != nil {
		return nil, err
	}
	defer tx.Rollback(context.Background())

	row := tx.QueryRow(context.Background(), `
		WITH job AS (
			SELECT * FROM jobs
			WHERE status = 'pending'
			ORDER BY created_at
			FOR UPDATE SKIP LOCKED
			LIMIT 1
		)
		UPDATE jobs
		SET status = 'processing', locked_at = now()
		FROM job
		WHERE jobs.id = job.id
		RETURNING jobs.*;
	`)

	var job Job
	// If no pending job is available, Scan returns pgx.ErrNoRows.
	if err := row.Scan(&job.ID, &job.Payload, &job.Status, &job.CreatedAt, &job.LockedAt, &job.ProcessedAt, &job.RetryCount); err != nil {
		return nil, err
	}
	return &job, tx.Commit(context.Background())
}
Key concepts:
- `FOR UPDATE SKIP LOCKED`: prevents race conditions by skipping rows already locked by other workers, so only one worker grabs a given job.
- Wrapping the claim in a transaction ensures the row lock and the status update commit together, keeping job state consistent.
Step 4: Completing or Failing Jobs
Once a job is processed successfully:
func CompleteJob(db *pgxpool.Pool, jobID int) error {
	_, err := db.Exec(context.Background(),
		"UPDATE jobs SET status = 'done', processed_at = now() WHERE id = $1", jobID)
	return err
}
For failures, you might do:
func FailJob(db *pgxpool.Pool, jobID int, errMsg string) error {
	// errMsg is a good candidate for an error_message column if you add one (see Step 1);
	// here we just log it alongside the failure.
	log.Printf("job %d failed: %s", jobID, errMsg)
	_, err := db.Exec(context.Background(),
		"UPDATE jobs SET status = 'failed', retry_count = retry_count + 1 WHERE id = $1", jobID)
	return err
}
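Putting the pieces together, a consumer is essentially a polling loop around these functions. Here's a minimal sketch; the two-second poll interval and the processJob placeholder are assumptions for illustration, not part of the original service.
// RunWorker polls for jobs until ctx is cancelled.
// The poll interval and processJob are illustrative placeholders.
func RunWorker(ctx context.Context, db *pgxpool.Pool) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		job, err := ConsumeJob(db)
		if err != nil {
			// pgx.ErrNoRows simply means the queue is empty; back off briefly.
			time.Sleep(2 * time.Second)
			continue
		}

		if err := processJob(ctx, job); err != nil {
			_ = FailJob(db, job.ID, err.Error())
		} else {
			_ = CompleteJob(db, job.ID)
		}
	}
}

// processJob is a stand-in for your actual business logic.
func processJob(ctx context.Context, job *Job) error {
	log.Printf("processing job %d: %s", job.ID, string(job.Payload))
	return nil
}
If you want retries with exponential backoff, FailJob could set the status back to 'pending' and use retry_count to compute a delay, for example via a scheduled_at column that the consumer query also filters on.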
Comparing PostgreSQL with RabbitMQ and NATS
Here’s how this PostgreSQL-based queue stacks up:
| Feature | PostgreSQL | NATS / RabbitMQ |
| --- | --- | --- |
| Setup complexity | Very low (reuse existing DB) | Medium to high |
| Message throughput | Moderate (hundreds to ~1k/sec with tuning) | Very high (10k+/sec) |
| Persistence | Built-in, with ACID guarantees | RabbitMQ: persistent queues; NATS: optional via JetStream |
| Retry logic | Manual (custom logic) | Built-in |
| Delayed jobs | Via a scheduled_at column | Built-in (plugins or delay queues) |
| Fanout / pub-sub | Poor (LISTEN/NOTIFY is limited) | Excellent |
| Observability | Easy via SQL queries and logs | Built-in dashboards and CLI tools |
| Use-case fit | Light async jobs, internal services | High-volume event streaming |
Performance and Monitoring Tips
You can improve performance by:
- Indexing the `status` and `created_at` fields
- Batching job consumption
- Vacuum tuning and table partitioning for large queues
- Running periodic job cleanup for `done`/`failed` jobs (see the sketch after this list)
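As a concrete example of the cleanup point, here's a small maintenance sketch; keying retention off processed_at and expressing it in days is my own choice for illustration.
// CleanupOldJobs deletes finished jobs older than the given number of days.
// Run it periodically, e.g. from a time.Ticker or a cron job.
func CleanupOldJobs(ctx context.Context, db *pgxpool.Pool, olderThanDays int) (int64, error) {
	tag, err := db.Exec(ctx, `
		DELETE FROM jobs
		WHERE status IN ('done', 'failed')
		  AND processed_at < now() - make_interval(days => $1)`,
		olderThanDays)
	if err != nil {
		return 0, err
	}
	return tag.RowsAffected(), nil
}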
To monitor the queue:
SELECT status, COUNT(*) FROM jobs GROUP BY status;
Export this as a Prometheus metric to monitor queue depth and failures.
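For example, with github.com/prometheus/client_golang you could refresh a gauge from that query on a timer; the metric name and 15-second interval below are assumptions for illustration.
var jobsByStatus = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "queue_jobs_total", // illustrative metric name
		Help: "Number of jobs in the queue, by status.",
	},
	[]string{"status"},
)

func init() {
	prometheus.MustRegister(jobsByStatus)
}

// CollectQueueMetrics refreshes the gauge from PostgreSQL until ctx is cancelled.
func CollectQueueMetrics(ctx context.Context, db *pgxpool.Pool) {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()
	for {
		rows, err := db.Query(ctx, "SELECT status, COUNT(*) FROM jobs GROUP BY status")
		if err == nil {
			jobsByStatus.Reset()
			for rows.Next() {
				var status string
				var count int64
				if rows.Scan(&status, &count) == nil {
					jobsByStatus.WithLabelValues(status).Set(float64(count))
				}
			}
			rows.Close()
		}
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}
Serve the metrics with promhttp.Handler() and alert when the 'pending' or 'failed' counts keep growing.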
Using pgmq — A PostgreSQL Message Queue Library
If you want to avoid reinventing the wheel, check out pgmq, a lightweight, battle-tested message queue built on top of PostgreSQL. It provides a clean API for producing and consuming messages with features like retries, acknowledgments, and concurrency control, all while leveraging PostgreSQL's `SKIP LOCKED` mechanism under the hood.
Why consider pgmq?
- Ready-made queue abstraction: No need to write raw SQL or worry about job locking.
- Retry and failure handling: Built-in support for retry policies and dead-letter queues.
- Concurrency safe: Designed for multiple workers to process jobs safely.
- Minimal dependencies: Works with your existing PostgreSQL instance.
Here's a tiny Go example of publishing and consuming messages with `pgmq`:
import (
	"context"
	"log"

	"github.com/jackc/pgx/v4/pgxpool"
	"github.com/vgarvardt/pgmq"
)

func main() {
	ctx := context.Background()

	dbpool, err := pgxpool.Connect(ctx, "postgres://user:pass@localhost/dbname")
	if err != nil {
		log.Fatal(err)
	}
	defer dbpool.Close()

	queue := pgmq.NewQueue(dbpool, "my_jobs")

	if err := queue.Publish(ctx, []byte(`{"task":"send_email","to":"user@example.com"}`)); err != nil {
		log.Fatal(err)
	}

	messages, err := queue.Consume(ctx, 10)
	if err != nil {
		log.Fatal(err)
	}

	for _, msg := range messages {
		log.Printf("Processing message: %s\n", string(msg.Body))
		// Do your job...

		// Acknowledge so the message isn't redelivered.
		msg.Ack(ctx)
	}
}
If you want a solid, community-supported PostgreSQL queue with much of the heavy lifting done for you, `pgmq` is a great place to start.
What I Learned
- PostgreSQL is underrated — it’s versatile and powerful when used wisely.
- SELECT FOR UPDATE SKIP LOCKED is gold — it enables safe parallel processing.
- Monitoring and cleanup are essential. PostgreSQL won’t delete old jobs for you.
- Not a silver bullet — don’t use this if you need fan-out, low-latency pub/sub, or streaming.
Final Thoughts
Would I do it again? Absolutely — in the right context. If you're building internal tools, backend jobs, or apps that don’t need millisecond latency or massive scale, PostgreSQL can save you a ton of infra overhead.
And you get to stay in one stack — no extra services, no message broker configs, and full SQL power.