Workers
The HubHubThe Go service that owns background processing, integrations, and the admin API. Sibling to Core.’s async backbone is RiverRiverThe Go background-job queue Hub uses. Jobs are rows in Postgres, so there is no separate broker to run. — a Go job queue whose durable store is Postgres itself. If you are coming from .NET, the closest analog is Hangfire: jobs serialise, land in a table, and a worker pool drains them.
The River model in one paragraph
RiverRiverThe Go background-job queue Hub uses. Jobs are rows in Postgres, so there is no separate broker to run. defines a job as a Go struct with JSON-encodable args. A worker is a type that implements river.Worker[Args] with a Work(ctx, job) method. You register workers on a river.Client, start the client, and it polls the river_job table with SELECT ... FOR UPDATE SKIP LOCKED, hands rows to goroutines, runs Work, and acks.
Enqueue from anywhere that can open a transaction on the same Postgres:
client.Insert(ctx, service.FeedbackEmbeddingArgs{ FeedbackRecordID: id }, nil)From TypeScriptTypeScriptJavaScript with a static type system. Every HiveCFM Node service, the frontend, and the dev hub are written in it. core, the same shape works — insert a row whose kind matches the worker’s args and args is the JSON body. Both sides see the same queue.
What each worker does
Source: hivecfm-hub/internal/workers/.
| File | Worker | Purpose |
|---|---|---|
feedback_embedding.go | FeedbackEmbeddingWorker | Fetches a feedback record, calls the embedding client (OpenAI-compatible), writes a 768-d vector to the embeddings table. Emits metrics on duration and failure class. |
sentiment_analysis.go | SentimentAnalysisWorker | Runs sentiment classification on a feedback record’s text and updates the record’s metadata with sentiment + confidence. |
webhook_dispatch.go | WebhookDispatchWorker | Delivers one event to one webhook endpoint. Signs the payload, handles retries with exponential backoff, records last_status on the webhook row. |
webhook_dispatch_test.go | — | Test harness for the dispatch worker. Not a worker itself. |
Each worker lives in ~50–200 lines, exposes a NewXWorker(...) constructor that takes every dependency explicitly, and delegates to the relevant internal/service/... type for the actual business logic.
Anatomy of a worker
Using FeedbackEmbeddingWorker as the example:
type FeedbackEmbeddingWorker struct {
river.WorkerDefaults[service.FeedbackEmbeddingArgs]
embeddingService feedbackEmbeddingService
embeddingClient service.EmbeddingClient
docPrefix string
metrics observability.EmbeddingMetrics
}
func (w *FeedbackEmbeddingWorker) Work(ctx context.Context, job *river.Job[service.FeedbackEmbeddingArgs]) error {
// 1. Load the record
// 2. Call the embedding client
// 3. Write to the embeddings table
// 4. Emit metrics
}Four things worth noting:
river.WorkerDefaults[Args]— embeds sensible defaults (retry policy, timeout). Override only what you need.- Minimal interfaces. The worker depends on a trimmed interface (
feedbackEmbeddingService) rather than the concrete service, so tests can inject a stub. - Errors return
error. A non-nil return causes River to retry with backoff; ariver.JobCancelErroraborts the job permanently. - Metrics are first-class. Every worker takes an
observability.*Metricsvalue and emits duration + outcome. See DevOps / Observability.
Retry and timeout policy
- Default retries — 25 attempts with exponential backoff, capped at 1 hour between retries. Applies unless the worker overrides
Timeout()/NextRetry(). - Webhook dispatch overrides retry to match the webhook spec (e.g. abort after N days of failures, mark the endpoint unhealthy).
- Timeout per attempt — configured per worker; CPU-bound workers have longer budgets.
Registering workers
hivecfm-hub/cmd/api/app.go builds the RiverRiverThe Go background-job queue Hub uses. Jobs are rows in Postgres, so there is no separate broker to run. client at startup, calls river.AddWorker(workers, NewFeedbackEmbeddingWorker(...)) for each worker type, then client.Start(ctx). Adding a new worker is three steps:
- Define the job args struct (typically in
internal/service/<feature>_job_args.go). - Implement
river.Worker[Args]ininternal/workers/<feature>.go. - Register it in
cmd/api/app.go.
No magic, no reflection, no auto-discovery.
Do not block in Work on an unbounded external call. Set a context deadline and let River retry. A worker that hangs forever holds a row-level lock and starves other jobs on that queue.
Enqueuing from core
TypeScriptTypeScriptJavaScript with a static type system. Every HiveCFM Node service, the frontend, and the dev hub are written in it. core currently enqueues through a thin wrapper around pg that writes directly to river_job. The kind column must match the Go-side worker’s registration. If you add a worker on the HubHubThe Go service that owns background processing, integrations, and the admin API. Sibling to Core., add a matching enqueue helper on the core side — the kind string is the contract.
Read next
- Search — the synchronous Hub surface (not a worker, but colocated).
- DevOps / Observability — how worker metrics reach a dashboard.