Workers

The Hub's async backbone is River, a Go job queue whose durable store is Postgres itself, so there is no separate broker to run. If you are coming from .NET, the closest analog is Hangfire: jobs serialise, land in a table, and a worker pool drains them.

The River model in one paragraph

River defines a job as a Go struct with JSON-encodable args. A worker is a type that implements river.Worker[Args] with a Work(ctx, job) method. You register workers on a river.Client, start the client, and it polls the river_job table with SELECT ... FOR UPDATE SKIP LOCKED, hands rows to goroutines, runs Work, and acks.

Enqueue from anywhere that can open a transaction on the same Postgres:

client.Insert(ctx, service.FeedbackEmbeddingArgs{ FeedbackRecordID: id }, nil)

From the TypeScript core, the same shape works: insert a row into river_job whose kind matches the worker's args type and whose args column holds the JSON body. Both sides see the same queue.
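To make the cross-language contract concrete, here is a minimal sketch of the two values any producer has to get right: the kind string and the JSON args body. The JSON field name `feedback_record_id`, the kind string `feedback_embedding`, the string ID type, and the helper `rowFor` are assumptions for illustration; check the real args struct in the Hub before relying on them.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FeedbackEmbeddingArgs mirrors the job args used above. The JSON tag and
// the ID type are assumed, not taken from the Hub source.
type FeedbackEmbeddingArgs struct {
	FeedbackRecordID string `json:"feedback_record_id"`
}

// Kind returns the value stored in the kind column. The string here is a
// hypothetical example of that contract.
func (FeedbackEmbeddingArgs) Kind() string { return "feedback_embedding" }

// rowFor (hypothetical helper) returns the kind and the serialised args that
// a producer in any language would write into the job row.
func rowFor(args FeedbackEmbeddingArgs) (kind string, body string, err error) {
	b, err := json.Marshal(args)
	if err != nil {
		return "", "", err
	}
	return args.Kind(), string(b), nil
}

func main() {
	kind, body, err := rowFor(FeedbackEmbeddingArgs{FeedbackRecordID: "rec_123"})
	if err != nil {
		panic(err)
	}
	fmt.Println(kind, body)
}
```

A non-Go producer that writes the same kind and a JSON body with the same field names produces a row the Go worker can decode. Other columns of the job row (queue, attempt limits, and so on) are outside this sketch.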

What each worker does

Source: hivecfm-hub/internal/workers/.

| File | Worker | Purpose |
| --- | --- | --- |
| feedback_embedding.go | FeedbackEmbeddingWorker | Fetches a feedback record, calls the embedding client (OpenAI-compatible), writes a 768-d vector to the embeddings table. Emits metrics on duration and failure class. |
| sentiment_analysis.go | SentimentAnalysisWorker | Runs sentiment classification on a feedback record's text and updates the record's metadata with sentiment + confidence. |
| webhook_dispatch.go | WebhookDispatchWorker | Delivers one event to one webhook endpoint. Signs the payload, handles retries with exponential backoff, records last_status on the webhook row. |
| webhook_dispatch_test.go | (none) | Test harness for the dispatch worker. Not a worker itself. |

Each worker lives in ~50–200 lines, exposes a NewXWorker(...) constructor that takes every dependency explicitly, and delegates to the relevant internal/service/... type for the actual business logic.

Anatomy of a worker

Using FeedbackEmbeddingWorker as the example:

type FeedbackEmbeddingWorker struct {
    river.WorkerDefaults[service.FeedbackEmbeddingArgs]
 
    embeddingService feedbackEmbeddingService
    embeddingClient  service.EmbeddingClient
    docPrefix        string
    metrics          observability.EmbeddingMetrics
}
 
func (w *FeedbackEmbeddingWorker) Work(ctx context.Context, job *river.Job[service.FeedbackEmbeddingArgs]) error {
    // 1. Load the record
    // 2. Call the embedding client
    // 3. Write to the embeddings table
    // 4. Emit metrics
    return nil // steps elided; a non-nil error here would trigger a retry
}

Four things worth noting:

  1. river.WorkerDefaults[Args] — embeds sensible defaults (retry policy, timeout). Override only what you need.
  2. Minimal interfaces. The worker depends on a trimmed interface (feedbackEmbeddingService) rather than the concrete service, so tests can inject a stub.
  3. Failures are reported by returning an error. A non-nil return causes River to retry with backoff; returning a river.JobCancelError (for example via river.JobCancel(err)) aborts the job permanently.
  4. Metrics are first-class. Every worker takes an observability.*Metrics value and emits duration + outcome. See DevOps / Observability.

Retry and timeout policy

  • Default retries: 25 attempts with exponential backoff, capped at 1 hour between retries. Applies unless the worker overrides NextRetry().
  • Webhook dispatch overrides retry to match the webhook spec (e.g. abort after N days of failures, mark the endpoint unhealthy).
  • Timeout per attempt: configured per worker by overriding Timeout(); CPU-bound workers have longer budgets.

Registering workers

hivecfm-hub/cmd/api/app.go builds the River client at startup, calls river.AddWorker(workers, NewFeedbackEmbeddingWorker(...)) for each worker type, then client.Start(ctx). Adding a new worker is three steps:

  1. Define the job args struct (typically in internal/service/<feature>_job_args.go).
  2. Implement river.Worker[Args] in internal/workers/<feature>.go.
  3. Register it in cmd/api/app.go.

No magic, no reflection, no auto-discovery.

Warning: do not block in Work on an unbounded external call. Set a context deadline and let River retry. A worker that hangs forever occupies one of the queue's worker slots and can starve other jobs on that queue.

Enqueuing from core

The TypeScript core currently enqueues through a thin wrapper around pg that writes directly to river_job. The kind column must match the Go-side worker's registration. If you add a worker on the Hub, add a matching enqueue helper on the core side: the kind string is the contract.