Hub Go worker (embeddings)
Core writes to Postgres and enqueues jobs. The Hub (the Go service that owns background processing, integrations, and the admin API; a sibling to Core) consumes those jobs with River, the Go background-job queue the Hub is built around. River stores jobs as rows in Postgres, so there is no separate broker to run. This walkthrough tours hivecfm-hub/internal/workers/feedback_embedding.go, the worker that takes a newly created feedback row, calls out to an embedding model, and writes the vector back. It comes in six slices, and end to end it is a good template for any new River worker you write.
If you come from .NET, the closest mental model is a BackgroundService (an IHostedService) that consumes a work queue. River, however, is opinionated about retries, timeouts, and argument serialization in a way that Hangfire or a hand-rolled consumer is not.
1. Package, imports, and the type definition
hivecfm-hub/internal/workers/feedback_embedding.go, lines 1–34

// Package workers provides River job workers (e.g. webhook delivery, feedback embedding).
package workers

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"time"

	"github.com/google/uuid"
	"github.com/riverqueue/river"

	"github.com/amrhym/hivecfm-hub/internal/huberrors"
	"github.com/amrhym/hivecfm-hub/internal/models"
	"github.com/amrhym/hivecfm-hub/internal/observability"
	"github.com/amrhym/hivecfm-hub/internal/service"
)

// FeedbackEmbeddingWorker generates and stores embeddings for feedback records.
type FeedbackEmbeddingWorker struct {
	river.WorkerDefaults[service.FeedbackEmbeddingArgs]

	embeddingService feedbackEmbeddingService
	embeddingClient  service.EmbeddingClient
	docPrefix        string // model-specific prefix for document embedding
	metrics          observability.EmbeddingMetrics
}

// feedbackEmbeddingService is the minimal interface needed by the worker.
type feedbackEmbeddingService interface {
	GetFeedbackRecord(ctx context.Context, id uuid.UUID) (*models.FeedbackRecord, error)
	SetEmbedding(ctx context.Context, feedbackRecordID uuid.UUID, model string, embedding []float32) error
}

The package comment on line 1 is deliberate: Go’s doc tooling uses it, and workers is an umbrella for every River worker we ship. The imports split cleanly into three groups: the standard library (context, errors, fmt, log/slog, time), external dependencies (uuid, river), and internal packages (service, observability, models, and the typed errors in huberrors).
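FeedbackEmbeddingWorker embeds river.WorkerDefaults[service.FeedbackEmbeddingArgs]. In Go, embedding a type promotes its methods onto the outer struct, which is how you get “default method implementations”. WorkerDefaults supplies defaults for the optional hooks you don’t care about (e.g. NextRetry, Middleware). Those defaults return zero values, which River reads as “no worker-specific override”. It does not supply Work, which every worker must implement itself. You override only what you need; this worker overrides Timeout.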
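The generic parameter [service.FeedbackEmbeddingArgs] is the job-args type, and River uses it for compile-time type safety. The worker’s Work method receives a *river.Job[service.FeedbackEmbeddingArgs], so job.Args is already the concrete struct and needs no type assertions. Registering this worker for any other args type fails to compile. Which worker runs a given job is still decided at runtime, by matching the Kind() string stored with the job.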
The small private interface feedbackEmbeddingService right below is a Go idiom worth copying. Rather than depending on the full service.FeedbackService, we declare exactly the two methods this worker calls. Tests can then pass a tiny fake that implements those two methods and nothing else.
2. Constructor and timeout hook
hivecfm-hub/internal/workers/feedback_embedding.go, lines 36–58

// NewFeedbackEmbeddingWorker creates a worker that fetches the record, calls the embedding client, and stores the result.
// docPrefix is the prefix for document text. Can be empty for some providers.
// metrics may be nil when metrics are disabled.
func NewFeedbackEmbeddingWorker(
	embeddingService feedbackEmbeddingService,
	embeddingClient service.EmbeddingClient,
	docPrefix string,
	metrics observability.EmbeddingMetrics,
) *FeedbackEmbeddingWorker {
	return &FeedbackEmbeddingWorker{
		embeddingService: embeddingService,
		embeddingClient:  embeddingClient,
		docPrefix:        docPrefix,
		metrics:          metrics,
	}
}

const feedbackEmbeddingTimeout = 30 * time.Second

// Timeout limits how long a single embedding job can run.
func (w *FeedbackEmbeddingWorker) Timeout(*river.Job[service.FeedbackEmbeddingArgs]) time.Duration {
	return feedbackEmbeddingTimeout
}

NewFeedbackEmbeddingWorker is the explicit constructor, Go’s substitute for DI-container wiring. Everything the worker needs arrives as an argument and nothing is reached through globals. The fields are only read after construction, so the returned pointer can be shared across River’s worker goroutines, as long as the injected dependencies are themselves safe for concurrent use.
Timeout is a River hook. Returning 30 * time.Second makes River run Work() with a context that carries a 30-second deadline; when it expires, ctx is cancelled. The embedding API is the usual culprit for slow calls, so the cap keeps a stuck job from tying up a worker slot indefinitely. Cancellation only helps if the code inside Work() honors ctx. A context-aware call, such as the embedding client’s request, returns an error once the deadline passes. River records that attempt as failed, and the retry logic below decides whether the job runs again.
3. Work() — load the record, classify errors
hivecfm-hub/internal/workers/feedback_embedding.go, lines 60–84

// Work loads the record, generates or clears the embedding, and persists it.
func (w *FeedbackEmbeddingWorker) Work(ctx context.Context, job *river.Job[service.FeedbackEmbeddingArgs]) error {
	args := job.Args
	start := time.Now()

	record, err := w.embeddingService.GetFeedbackRecord(ctx, args.FeedbackRecordID)
	if err != nil {
		if w.metrics != nil {
			w.metrics.RecordWorkerError(ctx, "get_record_failed")
			w.metrics.RecordEmbeddingOutcome(ctx, "failed_final")
			w.metrics.RecordEmbeddingDuration(ctx, time.Since(start), "failed_final")
		}
		slog.Error("embedding: get record failed",
			"feedback_record_id", args.FeedbackRecordID,
			"error", err,
		)
		// Only suppress retries for not-found; transient DB/network errors should retry.
		if errors.Is(err, huberrors.ErrNotFound) {
			return nil
		}
		return fmt.Errorf("get feedback record: %w", err)
	}

Work(ctx, job) is the method River calls for every attempt. Its first responsibility is loading the FeedbackRecord we were told to embed. args := job.Args unpacks the typed args that were serialized when the job was enqueued; job.Attempt and job.MaxAttempts (used later) come from River’s job envelope.
The interesting part is the error classification. We check errors.Is(err, huberrors.ErrNotFound). If the record has been deleted since the job was enqueued, we return nil, which tells River “the job succeeded, don’t retry”. There is no record to embed, so a retry could never succeed; it would only burn attempts until MaxAttempts ran out.
Any other error (connection refused, timeout, serialization) is returned wrapped, so River schedules a retry. Note the metrics calls: they run before the not-found check. Every load failure is therefore recorded as a worker error with a failed_final outcome, including a transient one that River will retry. These metrics close the observability loop for the dashboard that tracks embedding health.
slog.Error logs with structured fields. Those feedback_record_id and error keys become first-class labels in our log search, which matters when you are chasing one specific failing job out of thousands.
4. Generate the embedding, retry-aware error handling
hivecfm-hub/internal/workers/feedback_embedding.go, lines 86–118

	text := service.BuildEmbeddingInput(record.FieldLabel, record.ValueText, w.docPrefix)
	if text == "" {
		return w.handleEmptyText(ctx, args.FeedbackRecordID, args.Model, record, start)
	}

	embedding, err := w.embeddingClient.CreateEmbedding(ctx, text)
	if err != nil {
		isLastAttempt := job.Attempt >= job.MaxAttempts
		if w.metrics != nil {
			w.metrics.RecordWorkerError(ctx, "embedding_api_failed")
			if isLastAttempt {
				w.metrics.RecordEmbeddingOutcome(ctx, "failed_final")
				w.metrics.RecordEmbeddingDuration(ctx, time.Since(start), "failed_final")
			} else {
				w.metrics.RecordEmbeddingOutcome(ctx, "retry")
				w.metrics.RecordEmbeddingDuration(ctx, time.Since(start), "retry")
			}
		}
		if isLastAttempt {
			slog.Error("embedding: API failed (final attempt)",
				"feedback_record_id", args.FeedbackRecordID,
				"error", err,
			)
			// Return error so River marks the job as failed; otherwise these records never get embeddings and don't show as failed in River UI.
			return fmt.Errorf("embedding API (final attempt): %w", err)
		}
		return fmt.Errorf("embedding API: %w", err)
	}

BuildEmbeddingInput composes the text we actually send to the model: it concatenates the field label, the value, and any provider-specific prefix. If the result is empty (e.g. the user submitted a blank text answer), we short-circuit into handleEmptyText rather than spending a model call on whitespace.
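The real BuildEmbeddingInput lives in the service package and is not shown in this walkthrough. The sketch below is a hypothetical reimplementation under three assumptions. First, the value is trimmed, and an empty value yields an empty string, which is what routes the worker into handleEmptyText. Second, the label is optional. Third, the output format is prefix + label + ": " + value. The prefix is whatever docPrefix the configured model expects; the E5 family of embedding models, for example, expects documents to start with "passage: ".

```go
package main

import (
	"fmt"
	"strings"
)

// buildEmbeddingInput is a hypothetical stand-in for service.BuildEmbeddingInput.
// Assumptions: whitespace-only values count as empty, and the label is optional.
func buildEmbeddingInput(label, value, docPrefix string) string {
	value = strings.TrimSpace(value)
	if value == "" {
		return "" // nothing worth a model call
	}
	label = strings.TrimSpace(label)
	if label == "" {
		return docPrefix + value
	}
	return docPrefix + label + ": " + value
}

func main() {
	fmt.Printf("%q\n", buildEmbeddingInput("Comment", "  faster checkout ", "passage: ")) // "passage: Comment: faster checkout"
	fmt.Printf("%q\n", buildEmbeddingInput("Comment", "   ", "passage: "))                // ""
}
```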
The embedding client call is the expensive one. When it fails, we classify the failure with isLastAttempt := job.Attempt >= job.MaxAttempts:
- Not the last attempt: we return a wrapped error and let River retry with its configured backoff. Metrics record a retry outcome, so the dashboard shows a transient flap rather than a permanent failure. Nothing is logged on this path.
- Last attempt: we log at error level, record failed_final, and still return an error. The comment on that return matters. An error on the final attempt moves the job to River’s discarded state, where it shows up in River UI and operational dashboards. Returning nil would mark the job completed, and the record would silently never get an embedding.
This pattern, making the retry-versus-terminal decision explicit around each external call, is one you will see repeated in every worker in the Hub.
5. Persist the embedding and record success
hivecfm-hub/internal/workers/feedback_embedding.go, lines 120–146

	err = w.embeddingService.SetEmbedding(ctx, args.FeedbackRecordID, args.Model, embedding)
	if err != nil {
		if w.metrics != nil {
			w.metrics.RecordWorkerError(ctx, "update_failed")
			w.metrics.RecordEmbeddingOutcome(ctx, "failed_final")
			w.metrics.RecordEmbeddingDuration(ctx, time.Since(start), "failed_final")
		}
		slog.Error("embedding: set embedding failed",
			"feedback_record_id", args.FeedbackRecordID,
			"error", err,
		)
		return fmt.Errorf("set feedback record embedding: %w", err)
	}

	slog.Info("embedding: stored",
		"feedback_record_id", args.FeedbackRecordID,
	)
	if w.metrics != nil {
		w.metrics.RecordEmbeddingOutcome(ctx, "success")
		w.metrics.RecordEmbeddingDuration(ctx, time.Since(start), "success")
	}
	return nil
}

Once we have a vector, SetEmbedding stores it on the FeedbackRecord row (in our case, via UPDATE feedback_records SET embedding = $1, embedding_model = $2 with pgvector, a Postgres extension that adds a vector column type for similarity search and powers our AI survey insights). If that write fails, there is no retry-versus-terminal nuance: a DB write failure is always treated as a retry candidate. The vector itself is not carried over, though. The next attempt calls the embedding API again, so a persistent write failure costs one model call per attempt. Metrics record failed_final here without checking isLastAttempt: the outcome of this attempt is final, and River’s retry will start a fresh attempt.
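SetEmbedding’s body is not part of this walkthrough, so how the []float32 reaches Postgres is an assumption here. One dependency-free option is pgvector’s text input format, a bracketed comma-separated list such as '[1,2,3]'. vectorLiteral and vectorParam below are hypothetical helpers: the first builds that literal, and the second maps a nil slice to SQL NULL, which is what clearing an embedding needs.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// vectorLiteral formats v in pgvector's text input format, e.g. "[1,0.5,-2]".
func vectorLiteral(v []float32) string {
	parts := make([]string, len(v))
	for i, f := range v {
		// bitSize 32 prints the shortest representation that round-trips as float32.
		parts[i] = strconv.FormatFloat(float64(f), 'f', -1, 32)
	}
	return "[" + strings.Join(parts, ",") + "]"
}

// vectorParam is a hypothetical query argument for a statement such as
// UPDATE feedback_records SET embedding = $1::vector ... ; nil becomes SQL NULL.
func vectorParam(v []float32) any {
	if v == nil {
		return nil
	}
	return vectorLiteral(v)
}

func main() {
	fmt.Println(vectorLiteral([]float32{1, 0.5, -2})) // [1,0.5,-2]
	fmt.Println(vectorParam(nil) == nil)               // true
}
```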
On success, we emit an info-level log and two metrics: the outcome (success) and the duration bucket. The duration metric is what feeds the p50/p95 latency panels — make sure new workers record both or the dashboards have holes.
Returning nil tells River the job is done. The row is now ready for any downstream consumer that queries by vector similarity (see the pgvector page for the search side).
6. The empty-text helper
hivecfm-hub/internal/workers/feedback_embedding.go, lines 148–195

// handleEmptyText clears the embedding for text fields when value_text is empty, or records skip for non-text.
func (w *FeedbackEmbeddingWorker) handleEmptyText(
	ctx context.Context,
	feedbackRecordID uuid.UUID,
	model string,
	record *models.FeedbackRecord,
	start time.Time,
) error {
	if record.FieldType == models.FieldTypeText {
		err := w.embeddingService.SetEmbedding(ctx, feedbackRecordID, model, nil)
		if err != nil {
			if w.metrics != nil {
				w.metrics.RecordWorkerError(ctx, "update_failed")
				w.metrics.RecordEmbeddingOutcome(ctx, "failed_final")
				w.metrics.RecordEmbeddingDuration(ctx, time.Since(start), "failed_final")
			}
			slog.Error("embedding: clear failed",
				"feedback_record_id", feedbackRecordID,
				"error", err,
			)
			return fmt.Errorf("clear feedback record embedding: %w", err)
		}
		if w.metrics != nil {
			w.metrics.RecordEmbeddingOutcome(ctx, "success")
			w.metrics.RecordEmbeddingDuration(ctx, time.Since(start), "success")
		}
		slog.Info("embedding: cleared (empty value_text)",
			"feedback_record_id", feedbackRecordID,
		)
		return nil
	}

	if w.metrics != nil {
		w.metrics.RecordEmbeddingOutcome(ctx, "skipped")
		w.metrics.RecordEmbeddingDuration(ctx, time.Since(start), "skipped")
	}
	slog.Info("embedding: skipped (no value_text)",
		"feedback_record_id", feedbackRecordID,
	)
	return nil
}

handleEmptyText handles the “nothing to embed” case. There are two sub-cases:
- The field is a text field that is now empty. We call SetEmbedding with nil to clear any previously stored vector. This keeps the index consistent; otherwise a user who deleted a comment would leave a stale embedding pointing at content that no longer exists.
- The field is not a text type. We skip it entirely, record a skipped outcome, and log once. Non-text fields (ratings, checkboxes) legitimately have no embedding; they end up here because they share the same pipeline event envelope.
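Note the symmetry with Work(): every exit point records exactly one outcome metric, paired with a duration. Every exit except the non-final embedding-API failure also writes exactly one log line. The one-outcome-per-exit invariant is what the SRE team depends on when reading the embedding health dashboard.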
What to read next
- Survey response persistence — the Core side that enqueues these jobs.
- Hub workers — the broader catalog of Hub workers and how they share infrastructure.
- pgvector & similarity search — the read path for the embeddings this worker writes.