flusso

flusso keeps an OpenSearch index in sync with Postgres, driven by declarative config — no cron job, no nightly reindex, no hand-rolled reindex script.

The whole model: two files

A deployment is two kinds of file. That’s the mental model.

| File | One per | Holds | Reference |
| --- | --- | --- | --- |
| flusso.toml | deployment | where data comes from, where it goes, which indexes to build | Configuring a deployment |
| *.schema.yml | index | what one search document looks like: its table, fields, and the related tables that fold in | Authoring schemas |

Every field declares its type from a fixed set that bridges a Postgres column and an OpenSearch mapping. A schema is therefore self-describing: flusso derives the full index mapping — and validates the config — without touching a database.

Change a user or one of their orders, and flusso rebuilds the whole users document and re-emits it. It works out which documents a changed row affects, reassembles each, and writes it by a deterministic id — no instructions about what to update.

This manual

  • Getting started — run flusso in three commands, and what Postgres and OpenSearch need first.
  • Authoring schemas — the *.schema.yml format: field types, objects, maps, joins, aggregates, geo, filters, soft-delete, validation.
  • Configuring a deployment — the flusso.toml format, every source and sink option, secrets and { env = "VAR" } references, the FLUSSO_* flag overrides, the index prefix, and logging/telemetry.
  • Querying — the typed query-side client, flusso-query, and its #[derive(FlussoDocument)].
  • Deploying — Docker recipes: smallest image, baking or compiling a flusso.lock, scoped .dockerignore.

The source, the contributor architecture tour, the Helm chart, and the runnable dev/ example all live in the repository on GitHub.

Getting started

Run flusso against the bundled dev stack in three commands, then point it at your own Postgres and OpenSearch.

Quickstart

The dev/ directory is a complete, runnable example — a docker-compose stack (Postgres wired for logical replication, OpenSearch, Dashboards, Prometheus, Grafana), seeded data, and a matching config. With just installed (cargo install just --locked):

just up        # bring the whole stack up and wait for it to be healthy
just check     # validate the config + schemas against the database
just run       # backfill OpenSearch, then follow live changes (serves /status + /metrics)

Then, in another terminal, make changes and watch them stream through:

just psql                                            # make some changes
curl -s localhost:9200/users/_search?pretty          # see them land in OpenSearch
just status                                          # live pipeline status

Run just on its own to see every recipe. The full walk-through — resetting state, inspecting the slot, OpenSearch Dashboards on :5601 — lives in the dev/ README.

ℹ️ Info — No just? Every recipe is a thin wrapper; the raw cargo run -- … and docker compose … commands are in the justfile.

The CLI

Three subcommands. Every flag also reads a FLUSSO_* env var (the flag wins when both are set) — handy for containers.

| Command | Does | Database? |
| --- | --- | --- |
| flusso build | Compile config + schemas into one portable flusso.lock. No secrets baked in ({ env = "VAR" } refs carry through). | no |
| flusso run | Stream changes through the engine. Like cargo run: with a flusso.toml present it recompiles + rewrites flusso.lock, then runs; with no config it loads the existing lock; --locked runs the lock as-is. Credentials resolve here, at run time. | yes |
| flusso check | Validate and print the fully-typed mapping. --offline skips the database; without it, declared types are also confirmed against live columns. | optional |

flusso --help
flusso build  --config flusso.toml -o flusso.lock   # build the portable artifact
flusso check  --config flusso.toml                  # validate (+ check vs database)
flusso check  --config flusso.toml --offline        # validate without a database
flusso run                                          # run the compiled flusso.lock
flusso run    --config flusso.toml                  # compile from source and run
flusso run    --skip-backfill                       # resume live capture only

Logging honors RUST_LOG (default info); FLUSSO_LOG_FORMAT=json for structured logs. Set the standard OTEL_EXPORTER_OTLP_ENDPOINT and traces export there too. Every environment variable flusso reads — secrets, the FLUSSO_* flags, telemetry — is collected in Configuring a deployment.

Requirements

flusso doesn’t own Postgres or OpenSearch — it’s a guest in both. A few things have to be true before it runs. The dev/ stack sets all of this up; below is what to replicate against your own infrastructure. Full per-source/per-sink options are in Configuring a deployment.

Postgres (the source)

| Requirement | Detail |
| --- | --- |
| PG 14+, wal_level = logical | A restart-required setting. max_wal_senders / max_replication_slots high enough for flusso plus any other consumers. |
| A publication | Covers every table any index reads: root tables and every table a join or aggregate pulls from. flusso manages it when the role is privileged enough (see below); otherwise it logs the exact SQL. |
| A replication slot | flusso always creates it on first connect (needs only REPLICATION). |
| Row identity | A primary key (usual case) or an explicit REPLICA IDENTITY on every replicated table. Keyless tables are skipped in backfill and error on a live change. |
| A role with REPLICATION + SELECT | Enough to stream and create the slot. Managing the publication needs more (see below). |

ℹ️ Info — managing the publication. flusso derives the table set from the schema and creates/extends the publication itself, exactly as it does the slot — if the source role can. Creating or extending a publication needs table ownership plus CREATE on the database (or superuser), a stronger grant than the read-only role above. When the role can’t, flusso doesn’t fail: it logs the exact CREATE PUBLICATION / ALTER PUBLICATION … ADD TABLE to run, and flusso check prints the same. Set [source] manage_publication = false (or FLUSSO_MANAGE_PUBLICATION=false) to manage it yourself.

⚠️ Warning — Postgres retains WAL until flusso confirms it. A flusso that’s down for a long time means WAL piling up on the server. Drop the slot when you retire a deployment.

OpenSearch (the sink)

| Requirement | Detail |
| --- | --- |
| OpenSearch 2.x | Also speaks Elasticsearch 7.x on the query side via flusso-query. |
| A reachable HTTP(S) endpoint | The sink url. Optional HTTP Basic auth (username / password); tls_verify defaults to true. Turn it off only for self-signed dev clusters. |
| A user that can manage and write flusso’s indexes | Plus the small hidden flusso_meta index where seeded state is recorded. |

💡 Did you know — flusso owns the index lifecycle. It derives a strict typed mapping per schema and names each index from a hash of that schema (users_<hash>), so a structural change rolls onto a fresh index and re-seeds instead of fighting a mismatched one. The plain logical name (users) is kept as an alias on the current index, queryable without knowing the hash.
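The schema-hashed naming described above can be sketched in a few lines. This is an illustration only: the hash function (SHA-256 over key-sorted JSON), the 8-character truncation, and the helper name `physical_index_name` are assumptions, not flusso’s actual algorithm.

```python
import hashlib
import json


def physical_index_name(logical_name: str, schema: dict, hash_len: int = 8) -> str:
    """Derive a schema-dependent index name of the form `users_<hash>`.

    Assumption: SHA-256 over key-sorted, compact JSON, truncated to
    `hash_len` hex characters. The real hashing scheme may differ.
    """
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{logical_name}_{digest[:hash_len]}"


# Two versions of a hypothetical schema: v2 adds a field (a structural change).
v1 = {"table": "users", "fields": [{"keyword": "email"}]}
v2 = {"table": "users", "fields": [{"keyword": "email"}, {"text": "bio"}]}

# Each version maps to its own physical index; the alias `users` would be
# pointed at whichever one is current.
print(physical_index_name("users", v1))
print(physical_index_name("users", v2))
```

Because the name is a pure function of the schema, an unchanged schema always resolves to the same index, while any structural edit lands on a fresh one.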

Deploying it

  • Container image — the Dockerfile builds a registry-ready, config-less image (mount a config or bake your own flusso.lock). Its demo target bakes the dev config in, which is what just demo runs. See Deploying for the recipes.
  • Kubernetes — the Helm chart deploys flusso as a single instance (one replication slot → replicas: 1) with config via ConfigMap, secrets via env, a Service, and an optional Prometheus ServiceMonitor.

Authoring schemas

One *.schema.yml file describes one search document — its root table, its fields, and how related tables fold in. This guide is the full reference for that format. For the deployment side (flusso.toml), see Configuration.

Quick reference

Fields are written type-first: - <type>: <name>, with siblings the type allows.

fields:
  - keyword: email          # scalar; document key `email`
    required: true
  - has_many: orders        # join; folds a related table in
    table: orders
    foreign_key: user_id
    primary_key: id
    fields: [ { double: total } ]
  - count: orderCount       # aggregate; rolls a related table up
    table: orders
    foreign_key: user_id

| Need | Type keys | Jump to |
| --- | --- | --- |
| A scalar column | text, identifier, keyword, enum, uuid, boolean, short, integer, long, float, double, decimal, date, timestamp, binary, json, custom | Types |
| Same-row sub-object | object | Objects |
| Dynamic-key object (translations, …) | map | Maps |
| A geographic point | geo | Geo points |
| Fold in a related table | belongs_to, has_one, has_many, many_to_many | Joins |
| Roll a related table up | count, sum, avg, min, max, ids | Aggregates |
| A fixed value | constant | Fields |
| Treat rows as deleted | top-level soft_delete | soft_delete |
| Index only a subset of rows | top-level filters | Root filters |

ℹ️ Info — Two JSON Schemas are the machine-readable source of truth for the file formats: config.schema.json and index.schema.yml. Point an editor at them for completion and inline validation.


*.schema.yml

Each schema file describes one search document: the root table it is built from, the fields it contains, and how related tables fold in.

Top-level keys

| Key | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| version | int | yes | | Schema format version. Only 1 is supported. |
| table | Postgres identifier | yes | | The root table the document is built from. |
| schema | Postgres identifier | no | public | The database schema the root table lives in. |
| primary_key | Postgres identifier | no | | The root table’s primary-key column. Used to derive the document id and to resolve which documents a related-row change affects. Relations and reverse-resolution require it. |
| doc_id | string | no | | Not supported yet; setting it is a hard error. The document _id is always derived from primary_key. (A non-pk _id needs the id value at delete time, which the pk-keyed tombstone path can’t supply; tracked as a follow-up.) |
| soft_delete | object | no | | Treat rows as deleted based on a column/field rather than a physical DELETE. See below. |
| filters | list | no | | Root filters: only root rows matching every filter become documents. See below. |
| fields | list | yes | | The document’s fields. See Fields. |

version: 1
table: users
schema: public
primary_key: id

soft_delete

When soft_delete is set, a row matching the soft-delete condition emits a tombstone (a delete to the sink) instead of an upsert. Key it off either a mapped field or a raw column, and optionally narrow it with when filters.

# Off a column: users.deleted = true → delete.
soft_delete:
  column: deleted

# Off a mapped field, narrowed to a subset of rows.
soft_delete:
  field: status
  when:
    - { column: archived, op: eq, value: true }

| Key | Required | Description |
| --- | --- | --- |
| column or field | exactly one | The column (Postgres identifier) or mapped field (field name) signalling deletion. |
| when | no | A list of filters; the soft-delete applies only to matching rows. |

Root filters

When only a subset of a table should be an index, a top-level filters list (same filter forms as joins use) scopes which root rows become documents:

version: 1
table: item
primary_key: id
filters:
  - { column: item_type, op: eq, value: serialized }
  - { column: archived_at, op: is_null }

A row outside the set never produces a document; a row that leaves the set (an UPDATE that stops matching) emits a tombstone on its next rebuild, exactly like soft_delete — both fold into the document query’s WHERE, so “no row came back” means “this document should not exist”. A row that enters the set upserts. Backfill walks the whole root table and lets the same predicate decide, so filtered-out rows cost a no-op delete during seeding.
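The upsert-or-tombstone decision above can be modelled as a small function. This is a sketch under stated assumptions: in flusso the predicate is folded into the document query’s WHERE, whereas here it is evaluated in Python purely for illustration, and the names `in_scope` and `sync_action` are hypothetical.

```python
from typing import Any, Callable, Mapping, Optional

Row = Mapping[str, Any]


def in_scope(row: Row) -> bool:
    """The two root filters from the example: serialized and not archived."""
    return row["item_type"] == "serialized" and row["archived_at"] is None


def sync_action(row: Optional[Row], predicate: Callable[[Row], bool] = in_scope) -> str:
    """Decide what to send to the sink for one root key.

    `row` is None when the root row no longer exists. "No row came back"
    and "row is outside the filtered set" both mean the document should
    not exist, so both produce a delete.
    """
    if row is not None and predicate(row):
        return "upsert"
    return "delete"


print(sync_action({"item_type": "serialized", "archived_at": None}))
print(sync_action({"item_type": "serialized", "archived_at": "2024-01-01"}))
```

During backfill the same function would run for every root row, which is why filtered-out rows cost a no-op delete.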


Fields

fields is a list. Each item is written type-first: a single type key whose value is the document key, plus the siblings that type allows.

fields:
  - keyword: email        # a `keyword` scalar; document key `email`
    required: true
  - text: bio             # analyzed full text
    required: false
  - integer: age
    required: false

The type key is one of:

  • a scalar type: text, identifier, keyword, enum, uuid, boolean, short, integer, long, float, double, decimal, date, timestamp, binary, json (see Types), or custom (an explicit Postgres/OpenSearch pair);
  • geo: a geographic point (see Geo points);
  • object: a same-row sub-object (see Objects);
  • map: a dynamic-key object (see Maps);
  • belongs_to / has_one / has_many / many_to_many: a related table folded in (see Joins);
  • count / sum / avg / min / max / ids: a rollup over a related table (see Aggregates);
  • constant: a fixed value.

There is exactly one type key per field. Which siblings a field accepts depends on that type:

| Sibling | Applies to | Description |
| --- | --- | --- |
| required | scalar, geo, map, to-one join | true forces a scalar/geo/map leaf non-null (nullable otherwise); on a to-one join (belongs_to/has_one) it maps the object non-null (see Joins). Rejected on to-many joins and aggregates (their nullability is structural). |
| column | scalar, geo, belongs_to | The source column. For a belongs_to, this table’s column pointing at the related row. Defaults to the document key when omitted. |
| options | types with a mapping | Extra OpenSearch mapping properties merged beside the derived type (e.g. analyzer, format, scaling_factor). |
| transforms | scalar | Value transforms to apply. See Transforms. |
| default | scalar | Value to coalesce a null column to. Must be a scalar (string, number, bool, or date); an array/object/binary default is a hard error. |
| postgres / opensearch | custom | The Postgres types accepted and the OpenSearch type emitted. |
| lat / lon | geo | The two coordinate columns (two-column form). |
| values | map | Mandatory. The leaf type shared by every value (a leaf kind: text/keyword/number/date). See Maps. |
| fields | object, joins | The nested projection. |
| table, primary_key, column/foreign_key/through, order_by, filters, limit | joins | Which key sibling applies depends on the verb. See Joins. |
| table, column, value_type, element_type, foreign_key, through, filters | aggregates | See Aggregates. |
| value | constant | The fixed value (null/absent renders as JSON null). |

fields:
  # column source, renamed + transformed + defaulted
  - keyword: email
    column: email_address
    required: false
    transforms: [lowercase, trim]
    default: "unknown@example.com"

Objects

An object nests sibling columns of the same row under one document key, without reading a related table. It renders as an OpenSearch object and is never null; its members declare their own types.

- object: address
  fields:
    - keyword: street
      column: address_street
      required: true
    - keyword: city
      column: address_city
      required: true
    - keyword: zip
      column: address_zip
      required: false

{ "address": { "street": …, "city": …, "zip": … } }, all from one row.

An object differs from a to-one join (belongs_to/has_one): the join reads a related table by key, an object stays put on the current row. Optional options pass extra properties to the object mapping.

Maps

A map is a dynamic-key object over a json/jsonb column: the keys are runtime-determined, but every value shares one leaf type. The motivating case is translations — {"en": "…", "it": "…"} with an open-ended language set, where declaring each language as its own field would defeat the purpose.

- map: title
  values: text          # the leaf type of every value (required)
  required: true

values must be a leaf kind: text/identifier (text), keyword/enum/uuid (keyword), a numeric, or date/timestamp. boolean, binary, json, geo, and custom are rejected. The field maps to an OpenSearch object with dynamic: true (injected automatically; an explicit dynamic in options wins), so unmapped keys are accepted and stay searchable rather than rejected by the index’s dynamic: strict. column defaults to the document key.
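The leaf-kind rule for values can be expressed as a lookup. This is a minimal sketch: the table name `LEAF_KIND` and the function `map_leaf_kind` are hypothetical, and treating "a numeric" as the six numeric scalar types (short through decimal) is an assumption.

```python
# Declared `values` type -> leaf kind (text / keyword / number / date).
LEAF_KIND = {
    "text": "text", "identifier": "text",
    "keyword": "keyword", "enum": "keyword", "uuid": "keyword",
    # Assumption: "a numeric" covers every numeric scalar type.
    "short": "number", "integer": "number", "long": "number",
    "float": "number", "double": "number", "decimal": "number",
    "date": "date", "timestamp": "date",
}


def map_leaf_kind(values_type: str) -> str:
    """Return the leaf kind for a map's `values`, or raise if it is rejected."""
    try:
        return LEAF_KIND[values_type]
    except KeyError:
        # boolean, binary, json, geo, custom (and anything unknown) end up here.
        raise ValueError(f"`{values_type}` is not a valid map leaf type") from None


print(map_leaf_kind("text"))
print(map_leaf_kind("uuid"))
```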

On the query side a map gets a typed handle (flusso-query): .key("it") returns a fully-typed leaf of the declared kind, and a text map also offers a cross-key .search(..) with per-key preference. See Querying.

Types

A scalar field declares its type from a fixed set. Each type bridges a Postgres column type and an OpenSearch mapping type, so the schema describes the document fully — flusso derives the index mapping (and validates a config) without a database. Shorthand fields and columns with no type default to keyword.

| type | Postgres | OpenSearch | Notes |
| --- | --- | --- | --- |
| text | text, varchar | text | Analyzed natural-language full text (descriptions, bios); the default analyzed type. Plain tokenize + accent/case fold. |
| identifier | text, varchar | text | Analyzed identifier-like short text (names, SKUs, codes, statuses); splits on punctuation/case so C-01234 is found by C01234, c-01234, or 01234. |
| keyword | text, varchar | keyword | Exact, aggregatable. |
| enum | text, varchar, PG enum | keyword | A closed string set stored as text, indexed exactly. |
| uuid | uuid | keyword | |
| boolean | boolean | boolean | |
| short | smallint / int2 | short | |
| integer | integer / int4 | integer | |
| long | bigint / int8 | long | |
| float | real / float4 | float | |
| double | double precision / float8 | double | |
| decimal | numeric / money | double | Lossy; use a custom scaled_float when exactness matters. |
| date | date | date | |
| timestamp | timestamp(tz), time | date | |
| binary | bytea | binary | |
| json | json, jsonb | object | |

(A geographic point is a geo field, not a scalar type — see Geo points.)

For anything the named types don’t cover, declare a custom field with the OpenSearch type and the Postgres types it accepts:

- custom: price
  postgres: [numeric]
  opensearch: scaled_float
  required: false
  options: { scaling_factor: 100 }

options carries any extra OpenSearch mapping properties (analyzers, formats, …) merged beside the derived type. Objects, joins, aggregates, and geo points carry their own type keys rather than a scalar type; their shape is structural.
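To make "derives the index mapping without a database" concrete, here is a rough sketch that turns scalar and custom fields into OpenSearch mapping properties using only the type table above. It is an illustration, not flusso’s implementation: the function name `derive_properties` is hypothetical, fields are assumed to be already parsed into dicts, and the sketch deliberately ignores nullability, non-scalar fields, and any default analyzers or subfields a sink may attach.

```python
from typing import Any, Dict, List

# Scalar type key -> OpenSearch mapping type, copied from the table above.
OPENSEARCH_TYPE = {
    "text": "text", "identifier": "text", "keyword": "keyword", "enum": "keyword",
    "uuid": "keyword", "boolean": "boolean", "short": "short", "integer": "integer",
    "long": "long", "float": "float", "double": "double", "decimal": "double",
    "date": "date", "timestamp": "date", "binary": "binary", "json": "object",
}


def derive_properties(fields: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Build mapping `properties` for scalar and custom fields only."""
    properties: Dict[str, Dict[str, Any]] = {}
    for field in fields:
        if "custom" in field:
            name, os_type = field["custom"], field["opensearch"]
        else:
            type_key = next((k for k in field if k in OPENSEARCH_TYPE), None)
            if type_key is None:
                continue  # joins, aggregates, geo, ...: out of scope for this sketch
            name, os_type = field[type_key], OPENSEARCH_TYPE[type_key]
        # `options` are merged beside the derived type.
        properties[name] = {"type": os_type, **field.get("options", {})}
    return properties


fields = [
    {"keyword": "email", "required": True},
    {"decimal": "total"},
    {"custom": "price", "postgres": ["numeric"], "opensearch": "scaled_float",
     "options": {"scaling_factor": 100}},
]
print(derive_properties(fields))
```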

Production-ready defaults. The OpenSearch sink does not emit your text/keyword fields bare. By default it attaches a strong analyzer and a set of subfields (keyword, keyword_lowercase, text) so search, exact filtering, and case-insensitive sort all work out of the box — see Index analysis & subfields. Anything in options overrides the default for that field.

text vs identifier

Both are analyzed (full-text searchable) text fields; they differ only in the analyzer:

  • text — natural language (descriptions, bios, comments). The default analyzed type; tokenizes on word boundaries with accent/case folding.
  • identifier — short structured strings (names, SKUs, codes, statuses). Splits on punctuation/case so C-01234 is found by C01234, c-01234, or 01234.
fields:
  - text: bio             # natural-language analyzer + default subfields
    required: false
  - identifier: sku       # punctuation/case-splitting analyzer + default subfields
    required: false

(Use keyword instead for exact match, sort, or aggregation rather than full-text search.) Both apply only to scalar column fields, and an explicit analyzer in options always wins over the type’s default. The analyzers themselves are documented in Index analysis & subfields.

Geo points

A geo field is a geographic point → OpenSearch geo_point. Two forms:

Two columns — a latitude and a longitude column assembled into a point. A missing coordinate makes the whole point null (never {lat: null, lon: null}, which OpenSearch rejects):

- geo: location
  lat: latitude
  lon: longitude
  required: false
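The null rule for the two-column form is simple enough to state as code. flusso assembles the point inside the document query; the Python function below (a hypothetical `assemble_point`) only mirrors the described behavior.

```python
from typing import Dict, Optional


def assemble_point(lat: Optional[float], lon: Optional[float]) -> Optional[Dict[str, float]]:
    """Combine two coordinate columns into a geo_point-shaped value.

    A missing coordinate nulls the whole point, so a half-filled
    {lat: null, lon: ...} object is never produced.
    """
    if lat is None or lon is None:
        return None
    return {"lat": lat, "lon": lon}


print(assemble_point(45.07, 7.69))
print(assemble_point(None, 7.69))
```

Note the explicit `is None` checks: a coordinate of 0.0 is a valid value and must not be treated as missing.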

Single column — a column already holding a geo_point-shaped value: a json/jsonb {"lat": …, "lon": …} or [lon, lat], or a text "lat,lon":

- geo: location
  column: location_json
  required: false

PostGIS geometry and PG-native point aren’t accepted directly (they serialize as WKB / (x,y), which OpenSearch won’t take); expose a generated jsonb/text column in one of the shapes above. The two-column form needs no such column — flusso assembles the point in the document query.

Transforms

A list applied in sequence to a column value before it lands in the document:

| Transform | Effect |
| --- | --- |
| lowercase | Lowercase the string value. |
| trim | Strip leading/trailing whitespace. |

- keyword: email
  required: false
  transforms: [trim, lowercase]
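Applying a transform list "in sequence" amounts to a left-to-right fold over the value. The sketch below assumes transforms operate on strings and that a null value passes through untouched; `TRANSFORMS` and `apply_transforms` are hypothetical names.

```python
from typing import Callable, Dict, List, Optional

# The two documented transforms, as plain string functions.
TRANSFORMS: Dict[str, Callable[[str], str]] = {
    "lowercase": str.lower,
    "trim": str.strip,
}


def apply_transforms(value: Optional[str], names: List[str]) -> Optional[str]:
    """Apply each named transform in list order."""
    if value is None:
        return None  # assumption: nulls are left alone
    for name in names:
        value = TRANSFORMS[name](value)
    return value


print(apply_transforms("  Ada@Example.COM ", ["trim", "lowercase"]))  # ada@example.com
```

For these two transforms the order does not change the result, but the list is still applied strictly left to right.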

Joins

Fold rows from a related table into the document as nested documents. The join’s relationship verb is its type key, and the verb names which table holds the key.

| Type key | The key lives on… | Reads as | Renders as |
| --- | --- | --- | --- |
| belongs_to | this table (column) | “my column points at the related row” | object (nullable; required: true → non-null) |
| has_one | the related table (foreign_key) | “one related row points back at me” | object (nullable; required: true → non-null) |
| has_many | the related table (foreign_key) | “many related rows point back at me” | nested array (never null) |
| many_to_many | a junction table (through) | “we connect through a junction” | nested array (never null) |

The fields — the projection from each related row — and the related table’s primary_key are siblings of the type key. The field reading that primary_key is marked non-null automatically, like the root primary_key.

# My column points at them: embed the user a `created_by` column references.
# `column` defaults to the field name — here, the FK column IS `created_by`.
- belongs_to: created_by
  table: users
  primary_key: id
  fields:
    - keyword: email
      required: true
    - text: name
      required: false

# Their column points at me: fold in the rows holding my key.
- has_many: orders
  table: orders
  foreign_key: user_id
  primary_key: id
  order_by:
    - { column: created_at, direction: desc }
  limit: 5
  fields:
    - integer: id
      required: false
    - double: total
      required: true
    - keyword: status
      required: true

| Key | Type | Required | Description |
| --- | --- | --- | --- |
| (type key) | field name | yes | belongs_to, has_one, has_many, or many_to_many; its value is the document key. |
| table | Postgres identifier | yes | The related table. |
| primary_key | Postgres identifier | yes | The related table’s primary key. The projected field reading it is forced non-null. |
| column | Postgres identifier | belongs_to only | This table’s column pointing at the related row. Defaults to the field name (so belongs_to: created_by reads the created_by column). |
| foreign_key | Postgres identifier | has_one/has_many | The related table’s column pointing back at the parent. |
| through | object | many_to_many | A junction table. |
| required | bool | to-one only | belongs_to/has_one only. true maps the object non-null; omitted means nullable. Rejected on has_many/many_to_many (always a non-null array). |
| filters | list | no | Filters narrowing which related rows are folded in. |
| order_by | list | no | Ordering: a list of { column, direction }, where direction is asc (default) or desc. Not allowed on belongs_to (its target is unique); on has_one it picks which row becomes the object. |
| limit | int ≥ 1 | has_many/many_to_many only | Cap the number of related rows folded in (the to-one verbs imply their own LIMIT 1). |
| fields | list | yes | The fields projected from each related row. |

Key arity rule: a join takes exactly the key sibling its verb implies — column for belongs_to, foreign_key for has_one/has_many, through for many_to_many. Anything else is a load-time error naming the right one.
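The key arity rule can be sketched as a small validator. This is an illustration of the rule as stated, not flusso’s loader: `check_join_keys` is a hypothetical name, the join is assumed to be already parsed into a dict of siblings, and the error wording is made up.

```python
# The one key sibling each verb implies.
EXPECTED_KEY = {
    "belongs_to": "column",
    "has_one": "foreign_key",
    "has_many": "foreign_key",
    "many_to_many": "through",
}
KEY_SIBLINGS = ("column", "foreign_key", "through")


def check_join_keys(verb: str, field: dict) -> None:
    """Raise ValueError if the join carries the wrong (or no) key sibling."""
    expected = EXPECTED_KEY[verb]
    wrong = [k for k in KEY_SIBLINGS if k in field and k != expected]
    if wrong:
        # Name the right sibling in the error, as the rule requires.
        raise ValueError(f"{verb} takes `{expected}`, not `{wrong[0]}`")
    # belongs_to may omit `column` (it defaults to the field name);
    # every other verb must spell its key out.
    if expected not in field and verb != "belongs_to":
        raise ValueError(f"{verb} requires `{expected}`")


check_join_keys("belongs_to", {"table": "users", "primary_key": "id"})
check_join_keys("has_many", {"table": "orders", "foreign_key": "user_id"})
```

A has_many declared with column instead of foreign_key fails with a message pointing at foreign_key, which is the behavior the rule describes.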

A belongs_to target that changes — or is deleted — re-emits every document pointing at it: flusso finds the referrers on the parent table itself (WHERE column = <changed key>), so a deleted target rebuilds those documents with a null object rather than leaving them stale.

The through object (junction table for many-to-many):

| Key | Required | Description |
| --- | --- | --- |
| table | yes | The junction table. |
| left_key | yes | Column joining the junction to the parent. |
| right_key | yes | Column joining the junction to the related table. |

- many_to_many: tags
  table: tags
  through:
    table: post_tags
    left_key: post_id
    right_key: tag_id
  primary_key: id
  fields:
    - keyword: name
      required: true

Aggregates

Reduce rows from a related table to a single value. The operation is the type key: count, sum, avg, min, max, or ids.

A count is always a non-null long and an avg a nullable double, so they take no value_type. A sum/min/max mirrors the aggregated column, so it must declare a column and a value_type.

- count: orderCount
  table: orders
  foreign_key: user_id

- sum: lifetimeValue
  table: orders
  column: total
  value_type: decimal
  foreign_key: user_id
  filters:
    - { column: status, op: eq, value: paid }

| Key | Type | Required | Description |
| --- | --- | --- | --- |
| (type key) | field name | yes | count/sum/avg/min/max/ids; its value is the document key. |
| table | Postgres identifier | yes | The related table. |
| column | Postgres identifier | conditional | The column to reduce. Required for sum/avg/min/max; not used by count/ids. |
| value_type | type name | conditional | The result type. Required for sum/min/max (it mirrors the column); not used by count/avg/ids. |
| element_type | type name | conditional | Required for ids (and only ids): the scalar type of each collected primary key, long or keyword. |
| foreign_key | Postgres identifier | conditional | The aggregated table’s column pointing back at the parent (exactly one of foreign_key xor through). |
| through | object | conditional | Junction table for aggregating across many-to-many. |
| filters | list | no | Filters restricting which rows count. |

ids collects the related table’s primary key into a flat scalar array (it takes no column; the key is always the related table’s PK). OpenSearch has no array type, so the field’s mapping type is just the element type (element_type: long → type: long, keyword → type: keyword); the value is multi-valued. An empty relation yields [], never null, so the field is non-null (project it as a bare Vec<…>, not Option<Vec<…>>).

# one-to-many: orders.user_id points back at this row
- ids: orderIds
  table: orders
  foreign_key: user_id
  element_type: long

# many-to-many: collected straight off the junction's right_key
- ids: tagIds
  table: tags
  through: { table: post_tags, left_key: post_id, right_key: tag_id }
  element_type: long

Filters

Filters narrow which related rows a join or aggregate sees, which rows a soft_delete applies to, and — as the top-level filters key — which root rows become documents at all. Three forms:

Raw SQL — an escape hatch for conditions the structured forms can’t express:

- { raw: "amount > 0 AND currency = 'USD'" }

Null check — no value operand:

- { column: deleted_at, op: is_null }
- { column: confirmed_at, op: is_not_null }

Value comparison — operator plus a value whose shape matches its arity:

- { column: status, op: eq,  value: paid }
- { column: status, op: in,  value: [paid, shipped] }
- { column: total,  op: between, value: [10, 100] }

| op | Value shape |
| --- | --- |
| eq, neq, lt, lte, gt, gte, like, ilike | a single scalar |
| in, not_in | a list |
| between | a list of exactly two values [lower, upper] |
| is_null, is_not_null | (no value) |

A value op with a missing value, a list op given a scalar, or a between with other than two values is a load-time error.
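The three load-time errors named above can be checked with a short function. This is a sketch, not flusso’s validator: `check_filter` is a hypothetical name, filters are assumed to be parsed into dicts, and only the three listed error cases (plus unknown ops) are enforced.

```python
SCALAR_OPS = {"eq", "neq", "lt", "lte", "gt", "gte", "like", "ilike"}
LIST_OPS = {"in", "not_in"}
NULL_OPS = {"is_null", "is_not_null"}


def check_filter(f: dict) -> None:
    """Raise ValueError when a filter's value shape does not match its op."""
    if "raw" in f:
        return  # raw SQL escape hatch: nothing to shape-check here
    op = f.get("op")
    if op in NULL_OPS:
        return  # null checks take no value operand
    if op not in SCALAR_OPS | LIST_OPS | {"between"}:
        raise ValueError(f"unknown op: {op!r}")
    if "value" not in f:
        raise ValueError(f"`{op}` needs a value")
    value = f["value"]
    if op in LIST_OPS and not isinstance(value, list):
        raise ValueError(f"`{op}` needs a list, got a scalar")
    if op == "between" and not (isinstance(value, list) and len(value) == 2):
        raise ValueError("`between` needs exactly two values [lower, upper]")


# The examples from this section all pass.
check_filter({"column": "status", "op": "eq", "value": "paid"})
check_filter({"column": "status", "op": "in", "value": ["paid", "shipped"]})
check_filter({"column": "total", "op": "between", "value": [10, 100]})
check_filter({"column": "deleted_at", "op": "is_null"})
```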


Conventions

Identifiers

Two distinct identifier rules apply depending on what is being named:

| Rule | Applies to | Pattern | Notes |
| --- | --- | --- | --- |
| Postgres identifier | table, column, schema, index, and sink names | ^[a-z_][a-z0-9_]*$, max 63 chars | Lowercased and trimmed on load, matching Postgres’ folding of unquoted identifiers. A name that isn’t a valid identifier this way must be addressed explicitly (e.g. set column:). |
| Field name | the document key a field lands under (field:) | ^[a-zA-Z_][a-zA-Z0-9_]*$, max 63 chars | Case is preserved (field: orderCount stays camelCase in the emitted document). Only trimmed. |

The split is deliberate: the value comes from a Postgres column (lowercase identifier) but lands under a document key you choose (which may be camelCase to suit the search index).
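The two identifier rules translate directly into regular expressions. The sketch below assumes normalization (trim, and lowercase for Postgres identifiers) happens before the pattern check; the function names are hypothetical.

```python
import re

PG_IDENT = re.compile(r"[a-z_][a-z0-9_]*")
FIELD_NAME = re.compile(r"[a-zA-Z_][a-zA-Z0-9_]*")
MAX_LEN = 63


def normalize_pg_identifier(raw: str) -> str:
    """Trim and lowercase, then check the Postgres-identifier pattern."""
    name = raw.strip().lower()
    if len(name) > MAX_LEN or not PG_IDENT.fullmatch(name):
        raise ValueError(f"not a valid Postgres identifier: {raw!r}")
    return name


def normalize_field_name(raw: str) -> str:
    """Trim only (case is preserved), then check the field-name pattern."""
    name = raw.strip()
    if len(name) > MAX_LEN or not FIELD_NAME.fullmatch(name):
        raise ValueError(f"not a valid field name: {raw!r}")
    return name


print(normalize_pg_identifier(" Users "))       # users
print(normalize_field_name(" orderCount "))     # orderCount
```

`fullmatch` is used instead of `^…$` so that a trailing newline can never slip through.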


Validation, in one place

Loading enforces — beyond what the file format itself can express — that:

  • the schema version is supported (only 1);
  • all table/column/schema/index/sink names are valid Postgres identifiers, and field names are valid field-name identifiers;
  • each field has exactly one type key, and only the siblings that type allows;
  • a join carries exactly the key sibling its verb implies — column for belongs_to (defaulting to the field name), foreign_key for has_one/has_many, through for many_to_many — and the to-one verbs take no limit (nor order_by, for belongs_to);
  • an aggregate specifies exactly one of foreign_key or through;
  • sum/avg/min/max aggregates carry a column, and sum/min/max also declare a value_type (it mirrors the column);
  • an ids aggregate declares an element_type (a scalar type) and takes no column or value_type; element_type is rejected on every other op;
  • a geo field gives either lat and lon, or a single column;
  • a between filter has exactly two values, and in/not_in get a list.

A failure at any of these stops the load with a specific error naming the cause. None of it needs a database. When the source is reachable, flusso check additionally confirms each declared type and nullability against the live columns and reports any disagreement.
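As one worked instance of these checks, the aggregate rules from the list above could be enforced roughly as follows. This is a sketch: `check_aggregate` is a hypothetical name, the aggregate is assumed to be parsed into a dict of siblings, and only the rules stated in the list are checked.

```python
AGG_OPS = {"count", "sum", "avg", "min", "max", "ids"}


def check_aggregate(op: str, field: dict) -> None:
    """Raise ValueError if an aggregate breaks one of the documented rules."""
    if op not in AGG_OPS:
        raise ValueError(f"unknown aggregate: {op!r}")
    # Exactly one of foreign_key / through.
    if ("foreign_key" in field) == ("through" in field):
        raise ValueError("specify exactly one of `foreign_key` or `through`")
    if op in {"sum", "avg", "min", "max"} and "column" not in field:
        raise ValueError(f"`{op}` requires a `column`")
    if op in {"sum", "min", "max"} and "value_type" not in field:
        raise ValueError(f"`{op}` requires a `value_type`")
    if op == "ids":
        if "element_type" not in field:
            raise ValueError("`ids` requires an `element_type`")
        if "column" in field or "value_type" in field:
            raise ValueError("`ids` takes no `column` or `value_type`")
    elif "element_type" in field:
        raise ValueError(f"`element_type` is not allowed on `{op}`")


# The aggregates from the examples in this guide pass.
check_aggregate("count", {"table": "orders", "foreign_key": "user_id"})
check_aggregate("sum", {"table": "orders", "column": "total",
                        "value_type": "decimal", "foreign_key": "user_id"})
check_aggregate("ids", {"table": "orders", "foreign_key": "user_id",
                        "element_type": "long"})
```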


A complete example

users.schema.yml:

version: 1
table: users
schema: public
primary_key: id

soft_delete:
  column: deleted

fields:
  - integer: id
    required: false
  - keyword: email
    required: true
    transforms: [lowercase, trim]
  - text: name
    required: false

  - has_many: orders
    table: orders
    foreign_key: user_id
    primary_key: id
    order_by:
      - { column: id, direction: asc }
    fields:
      - integer: id
        required: false
      - double: total
        required: true
      - keyword: status
        required: true

  - count: orderCount
    table: orders
    foreign_key: user_id

A change to a users row — or to any of that user’s orders — rebuilds the whole users document and re-emits it to every sink. Setting users.deleted = true emits a tombstone instead.
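To show the document this schema produces, the sketch below assembles one by hand from in-memory rows. It models only the resulting shape: flusso builds documents through its own document query, and the function name `build_user_document` and the sample rows are hypothetical.

```python
from typing import Any, Dict, List, Optional


def build_user_document(user: Dict[str, Any],
                        orders: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Return the users document, or None when a tombstone should be emitted."""
    if user.get("deleted"):
        return None  # soft_delete: column `deleted` is true -> delete from the sink
    # has_many: rows whose user_id points back at this user, ordered by id asc.
    mine = sorted((o for o in orders if o["user_id"] == user["id"]),
                  key=lambda o: o["id"])
    return {
        "id": user["id"],
        "email": user["email"].lower().strip(),  # transforms: [lowercase, trim]
        "name": user.get("name"),
        "orders": [{"id": o["id"], "total": o["total"], "status": o["status"]}
                   for o in mine],
        "orderCount": len(mine),  # count aggregate over the same relation
    }


user = {"id": 1, "email": "  Ada@Example.com ", "name": "Ada", "deleted": False}
orders = [
    {"id": 12, "user_id": 1, "total": 5.0, "status": "paid"},
    {"id": 10, "user_id": 1, "total": 9.5, "status": "shipped"},
    {"id": 11, "user_id": 2, "total": 1.0, "status": "paid"},
]
print(build_user_document(user, orders))
```

Changing any row in `orders` with user_id 1 changes the output of this function, which is why such a change has to rebuild and re-emit the whole document.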

Configuring a deployment

One flusso.toml file describes a deployment — the source database, the sink destinations, and which indexes to build — plus the environment that feeds it secrets and runtime flags. This guide is the single reference for that file and that environment. (Each index’s own *.schema.yml is covered in schema authoring.)

Quick reference

| Looking for… | Jump to |
| --- | --- |
| Every flusso.toml top-level key | The flusso.toml format |
| Postgres source options | Postgres |
| OpenSearch sink options + defaults | OpenSearch |
| Which subfield to query (exact / full-text / sort) | Index analysis & subfields |
| Stdout sink envelope | Stdout |
| Secrets, { env = "VAR" }, the reserved overrides + precedence | Secrets & connection values |
| The FLUSSO_* flag env vars | CLI flags as env vars |
| Status / metrics / control ports + auth | HTTP surfaces |
| Sharing one cluster across deployments | Index prefix |
| RUST_LOG, OTLP, Prometheus | Logging & telemetry |
| A copy-paste env block | Cheat sheet |

flusso reads env vars for three jobs: filling in config values (the secrets story), setting CLI flags (every flag has a FLUSSO_* twin — CLI flags as env vars), and logging & telemetry (below).

Every key + default

| Key | Where | Default | Purpose |
| --- | --- | --- | --- |
| on_error | top-level / [[index]] | "stop" | item-rejection policy: stop or skip (on_error) |
| prefix | top-level | | prepend to every owned index name (index prefix) |
| type | [source] | | postgres |
| connection_url | [source] | | full URL or parts; DATABASE_URL overrides |
| manage_publication | [source] | true | auto-create/extend the publication |
| type | [sinks.<name>] | | opensearch or stdout |
| url | opensearch sink | | cluster URL; <NAME>_OPENSEARCH_URL overrides |
| username / password | opensearch sink | | HTTP Basic auth |
| tls_verify | opensearch sink | true | verify TLS certs |
| batch_size | opensearch sink | 1000 | docs per bulk chunk |
| max_bytes | opensearch sink | 10 MiB | bytes per bulk chunk |
| timeout_secs | opensearch sink | 30 | HTTP request timeout |
| max_retries | opensearch sink | 3 | transient-failure retries |
| pipeline | opensearch sink | | ingest pipeline applied on index |
| number_of_shards | opensearch sink | 1 | primary shards per index |
| number_of_replicas | opensearch sink | 1 | replica shards per index |
| refresh_interval | opensearch sink | "10s" | steady-state refresh ceiling ("-1" disables) |
| text_analysis | opensearch sink | builtin | analyzer toolkit: builtin or icu |
| auto_subfields | opensearch sink | true | auto subfields on text/keyword |
| pretty | stdout sink | false | pretty JSON instead of NDJSON |
| name / schema / enabled | [[index]] | | logical name / schema path / build on this run |
| public_address / private_address | [server] | 127.0.0.1:9464 / :9465 | HTTP bind addresses (HTTP surfaces) |

ℹ️ Info: schema::load("flusso.toml") is the front door. It reads the config and every schema it references, validates both layers, and returns one fully-validated Config. Schema paths resolve relative to the config file’s directory. Two JSON Schemas are the machine-readable source of truth (config.schema.json and index.schema.yml); point an editor at them for completion.


The flusso.toml format

Top-level table. Only [source] is required.

Key | Required | Description
[source] | yes | The database to read from.
[sinks.<name>] | no | Named destinations. Zero or more; each key is a sink name (a Postgres identifier).
[[index]] | no | The indexes to build. Zero or more array entries.
on_error | no | What to do when a sink rejects a document at the item level: "stop" (default) or "skip". See on_error.
prefix | no | Literal string prepended to every index name flusso owns (indexes, aliases, flusso_meta), so deployments can share one cluster, e.g. prefix = "dev_" → dev_users. Overridable at runtime by --index-prefix / FLUSSO_INDEX_PREFIX. See Index prefix.

[source]

The database documents are read from — one per deployment. type selects the kind:

type | Reference
postgres | Postgres source
[source]
type = "postgres"
connection_url = "postgresql://user:pass@localhost:5432/mydb"

Connection options (full-URL and individual-parts forms, the DATABASE_URL override) and capture behavior live in Sources and Secrets & connection values.

[sinks.<name>]

Named destinations; each key is a sink name (a Postgres identifier) and type selects the kind. Define more than one and flusso fans out — every document is written to all of them. If no sinks are defined, the CLI falls back to a stdout sink.

type | Reference
opensearch | OpenSearch sink
stdout | Stdout sink
[sinks.primary]
type = "opensearch"
url = "https://localhost:9200"
password = { env = "OS_PASSWORD" }

[sinks.audit]
type = "stdout"
pretty = true

Each type’s full option set and behavior is documented in Sinks.

[[index]]

One array entry per index to build.

Key | Type | Required | Description
name | Postgres identifier | yes | The logical index name, the pipeline’s stable identity.
schema | path | yes | Path to the index’s *.schema.yml, relative to the config file. Must end in .yml/.yaml.
enabled | bool | yes | Whether this index is built on this run.
on_error | "stop" or "skip" | no | Override the global on_error for this index. Omitted inherits the global default.
[[index]]
name = "users"
schema = "users.schema.yml"
enabled = true

The *.schema.yml referenced here is documented in schema authoring.

on_error

When a sink accepts a flush but rejects a specific document — a mapping conflict, a value the destination can’t index — on_error decides what happens. It governs only these item-level rejections; a flush-wide failure (the destination unreachable, the whole request refused) always stops the run.

Value | Behavior
"stop" (default) | Stop the run. The batch is left unconfirmed and redelivered on restart, so a persistently-bad document halts sync until the data is fixed or the policy changes. Dropping data is opt-in.
"skip" | Quarantine the document (logged, counted in flusso.documents.quarantined and the /status documents_quarantined) and continue. The rest of the batch is applied and acked; the document never lands until its source row changes again.

A global on_error is the default for every index; override it per index with on_error inside an [[index]] entry. The policy is operational, not part of the document shape, so changing it never triggers a reindex.

on_error = "stop"   # global default

[[index]]
name = "analytics"
schema = "analytics.schema.yml"
enabled = true
on_error = "skip"   # this index tolerates bad rows
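The inheritance and the stop/skip split can be sketched in a few lines of Rust. This is an illustration only: `OnError`, `FlushOutcome`, `effective`, and `settle` are hypothetical names, not flusso's internals, and the sketch models only item-level rejections (a flush-wide failure always stops the run regardless of policy).

```rust
/// What to do when a sink rejects one document in an otherwise accepted flush.
#[derive(Debug, Clone, Copy, PartialEq)]
enum OnError {
    Stop,
    Skip,
}

/// Outcome of one flush under a given policy (hypothetical shape).
#[derive(Debug, PartialEq)]
enum FlushOutcome {
    /// Batch confirmed; `quarantined` documents were rejected and skipped.
    Acked { applied: usize, quarantined: usize },
    /// Batch left unconfirmed, to be redelivered on restart.
    Stopped,
}

/// A per-index value wins; an omitted one inherits the global default.
fn effective(global: OnError, per_index: Option<OnError>) -> OnError {
    per_index.unwrap_or(global)
}

/// `rejected[i]` is true when the sink rejected item `i` at the item level.
fn settle(policy: OnError, rejected: &[bool]) -> FlushOutcome {
    let quarantined = rejected.iter().filter(|r| **r).count();
    if quarantined > 0 && policy == OnError::Stop {
        return FlushOutcome::Stopped;
    }
    FlushOutcome::Acked { applied: rejected.len() - quarantined, quarantined }
}

fn main() {
    // Global "stop", but the analytics index overrides it with "skip".
    let analytics = effective(OnError::Stop, Some(OnError::Skip));
    println!("{:?}", settle(analytics, &[false, true, false]));
}
```

Under "skip" the one rejected document is counted and the other two are acked; under "stop" the same flush would leave the whole batch unconfirmed.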

prefix

See Index prefix for the full behavior, the runtime overrides, and the rules for sharing one OpenSearch cluster across deployments.


The model

One source, many sinks. Source, sink, and the in-process queue are all trait objects, so backends swap without touching the engine. Today that’s Postgres in, OpenSearch (or stdout) out — the only backends documented here.


Sources

A source is where rows come from. There is exactly one per deployment, and one type today.

Postgres

[source]
type = "postgres"
connection_url = "postgresql://user:pass@localhost:5432/mydb"
manage_publication = true   # optional; default true

flusso follows Postgres’ logical replication stream to capture changes, and snapshots tables to seed an index before following live changes.

Key | Type | Default | Description
connection_url | URL / parts | | see below
manage_publication | bool | true | auto-create/extend the publication when privileged; see capture

Connection

connection_url takes one of two shapes.

A full URL — a string or env_or_value. Must match ^(postgresql|postgres)://…:

connection_url = "postgresql://user:pass@localhost:5432/mydb"
connection_url = { env = "DATABASE_URL" }

Individual parts: database is required; the rest default:

Part | Type | Default
host | string | 127.0.0.1
port | int (1–65535) | 5432
user | string | postgres
password | env_or_value | optional
database | string | required
[source.connection_url]
host = "127.0.0.1"
port = 5432
user = "postgres"
password = { env = "PGPASSWORD" }
database = "mydb"
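To see how the two shapes relate, here is a minimal Rust sketch that fills in the documented defaults and renders the parts as a full URL. `ConnectionParts`, `to_url`, and `has_postgres_scheme` are hypothetical names for illustration; whether flusso converts parts to a URL internally is not stated here, and the sketch assumes values that need no percent-encoding.

```rust
/// The individual-parts form of `connection_url`; fields follow the parts table.
struct ConnectionParts {
    host: Option<String>,
    port: Option<u16>, // the documented range is 1-65535
    user: Option<String>,
    password: Option<String>,
    database: String, // the only required part
}

impl ConnectionParts {
    /// Apply the documented defaults and render an equivalent full URL.
    /// Assumption: no value contains characters that need percent-encoding.
    fn to_url(&self) -> String {
        let host = self.host.as_deref().unwrap_or("127.0.0.1");
        let port = self.port.unwrap_or(5432);
        let user = self.user.as_deref().unwrap_or("postgres");
        let auth = match &self.password {
            Some(password) => format!("{user}:{password}"),
            None => user.to_string(),
        };
        format!("postgresql://{auth}@{host}:{port}/{}", self.database)
    }
}

/// The scheme check a full URL must pass: ^(postgresql|postgres)://
fn has_postgres_scheme(url: &str) -> bool {
    url.starts_with("postgresql://") || url.starts_with("postgres://")
}

fn main() {
    let parts = ConnectionParts {
        host: None,
        port: None,
        user: None,
        password: None,
        database: "mydb".to_string(),
    };
    let url = parts.to_url();
    println!("{url} (valid scheme: {})", has_postgres_scheme(&url));
}
```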

Either shape can be overridden by a reserved deployment variable, so the same config travels across environments unedited — see Secrets & connection values for the override and precedence rules.

How it captures changes

  • Logical replication (WAL). flusso consumes a logical replication slot. The slot is created automatically if it does not exist. The publication is also managed automatically: flusso derives the full table set from your schema (root tables plus every table a join or aggregate reads) and creates or extends the publication to cover it — provided the source role can (it must own those tables and hold CREATE on the database, or be superuser). When it can’t, flusso logs the exact CREATE/ALTER PUBLICATION SQL instead and keeps going; flusso check prints the same coverage report. Set manage_publication = false (or FLUSSO_MANAGE_PUBLICATION=false / --manage-publication false) to opt out and manage the publication yourself. Slot and publication names are CLI flags (--slot, --publication, both defaulting to flusso).
  • Backfill. Before live capture, the engine asks each sink whether an index is already seeded and, for those that aren’t, snapshots the root tables to seed them. --skip-backfill resumes live capture only.
  • Requires wal_level = logical on the server. See the dev/ environment for a ready-to-run Postgres configured for this.

Sinks

A sink is where assembled documents land. Each entry under [sinks] is a named destination — the key is a sink name (a Postgres identifier) and type selects the kind:

[sinks.primary]            # name: "primary"
type = "opensearch"
url = "https://localhost:9200"

[sinks.audit]              # name: "audit" — documents go here too (fan-out)
type = "stdout"
pretty = true

OpenSearch

[sinks.primary]
type = "opensearch"
url = "https://localhost:9200"
password = { env = "OS_PASSWORD" }
batch_size = 2000

Writes documents to an OpenSearch cluster via the bulk API — the sink you deploy.

url, username, and password each accept an env_or_value (a literal or a { env = "VAR" } reference resolved at run time), and can also be supplied or overridden per sink via reserved deployment-override variables — naming and precedence in Secrets & connection values.

Key | Type | Default | Description
url | env_or_value | (required) | Base URL of the cluster, e.g. https://search.example.com:9200.
username | env_or_value | | HTTP Basic Auth username.
password | env_or_value | | HTTP Basic Auth password.
tls_verify | bool | true | Verify TLS certificates. Set false only for local development.
batch_size | int ≥ 1 | 1000 | Maximum documents per bulk-request chunk.
max_bytes | int | 10485760 (10 MiB) | Maximum bytes per bulk chunk; within OpenSearch’s recommended 5–15 MB range, well under the 100 MB http.max_content_length default. A single document larger than this is sent on its own.
timeout_secs | int ≥ 1 | 30 | HTTP request timeout, in seconds.
max_retries | int ≥ 0 | 3 | Additional retry attempts on transient failures (exponential backoff).
pipeline | string | | Optional OpenSearch ingest pipeline applied on every index operation.
number_of_shards | int ≥ 1 | 1 | Primary shards for each created index.
number_of_replicas | int ≥ 0 | 1 | Replica shards for each created index.
refresh_interval | string | "10s" | OpenSearch refresh_interval applied to each index after seeding; this is the steady-state visibility ceiling (e.g. "10s", "1s", or "-1" to disable auto-refresh). flusso forces an immediate refresh whenever the pipeline catches up, so this only bounds staleness while a backlog drains (see below).
text_analysis | builtin or icu | builtin | Analysis backend for the flusso_* analyzers (see below). icu requires the analysis-icu plugin on every node.
auto_subfields | bool | true | Auto-enrich text/keyword fields with a good analyzer and subfields. A field’s explicit options always win; set false to emit fields bare.
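How batch_size and max_bytes interact can be sketched as a greedy chunker. This is a minimal illustration under stated assumptions, not the sink's actual code: `chunk` is a hypothetical function, documents are represented only by their serialized sizes, and batch_size is assumed to be at least 1 as the table requires.

```rust
/// Split documents (given by serialized size) into bulk chunks bounded by
/// document count and byte size. Returns the document indices in each chunk.
/// A document larger than `max_bytes` ends up alone in its own chunk.
fn chunk(doc_sizes: &[usize], batch_size: usize, max_bytes: usize) -> Vec<Vec<usize>> {
    let mut chunks: Vec<Vec<usize>> = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_bytes = 0usize;

    for (index, &size) in doc_sizes.iter().enumerate() {
        // Close the open chunk if adding this document would break a limit.
        let full = current.len() >= batch_size || current_bytes + size > max_bytes;
        if !current.is_empty() && full {
            chunks.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current.push(index);
        current_bytes += size;
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}

fn main() {
    // Three small documents and one oversized document, with a 10-byte limit.
    let sizes = [4, 4, 20, 3];
    println!("{:?}", chunk(&sizes, 1000, 10));
}
```

The oversized third document is never dropped or split; it simply travels in a chunk by itself, which matches the max_bytes description.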

How it owns its indexes:

  • Explicit, fully-typed mapping. The sink creates each index up front from the resolved schema mapping, dynamic: strict — field types come from the schema, not OpenSearch’s dynamic guesses, and only configured fields are accepted. An index that already exists is left untouched.
  • Hashed name over generations. The addressable name is {logical}_{hash} (the hash derives from the parsed index schema) — itself a hash alias over a concrete generation index {logical}_{hash}_{gen} that holds the data. A structural schema change moves the hash, so the sink writes a fresh alias + generation (re-seeded from scratch) rather than into the old, mismatched shape. An on-demand reindex builds the next generation behind the same hash alias and repoints atomically when it’s seeded. flusso and the flusso-query client address {logical}_{hash}; the generation detail is documented in the flusso-sinks-opensearch crate.
  • Convenience alias. The bare logical name (users) is also kept as an alias on the current generation, so a human or ad-hoc tool can GET /users/_search without knowing the hash. Best-effort: if it fails (e.g. the cluster already has a real index named like the alias), flusso logs a warning and carries on — correctness never depends on it.
  • Refresh adapts to the backlog. Created with auto-refresh disabled (refresh_interval: -1) for fast bulk seeding; on seeding completion the index is handed the configured refresh_interval (default "10s") — the steady-state visibility ceiling. A flush also forces an immediate refresh whenever the pipeline has caught up (no backlog behind the batch), so search is fresh when traffic is light but bulk indexing stays cheap while a backlog drains. The refresh_interval only bounds staleness during sustained backlog — raise it for more write throughput under load, lower it (toward 1s) for fresher search while behind.
  • Production-ready defaults. Created indexes ship a tuned analysis block and, unless auto_subfields is off, well-shaped text/keyword fields — see Index analysis & subfields.
  • Seeding markers. Seeded state is persisted in a hidden flusso_meta index, so a restart skips a completed backfill instead of redoing it.
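The three names in play for one logical index can be laid out as a small Rust sketch. The function names are hypothetical, and the hash value and the numeric generation are made-up placeholders: how flusso derives the hash and formats the generation is documented in the flusso-sinks-opensearch crate, not here.

```rust
/// The stable, addressable name: {prefix}{logical}_{hash}.
fn hash_alias(prefix: &str, logical: &str, hash: &str) -> String {
    format!("{prefix}{logical}_{hash}")
}

/// The concrete index holding the data: {prefix}{logical}_{hash}_{generation}.
/// Assumption: the generation is rendered as a plain number.
fn generation_index(prefix: &str, logical: &str, hash: &str, generation: u32) -> String {
    format!("{}_{generation}", hash_alias(prefix, logical, hash))
}

/// The best-effort convenience alias: the bare (prefixed) logical name.
fn convenience_alias(prefix: &str, logical: &str) -> String {
    format!("{prefix}{logical}")
}

fn main() {
    // "9f2c1a" and generation 2 are invented values for illustration.
    println!("data lives in:   {}", generation_index("", "users", "9f2c1a", 2));
    println!("clients address: {}", hash_alias("", "users", "9f2c1a"));
    println!("humans can use:  {}", convenience_alias("", "users"));
}
```

A reindex changes only the generation behind the hash alias, while a structural schema change moves the hash and therefore both derived names.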

Index analysis & subfields

The sink creates every index with a good search setup out of the box. flusso owns the index (mapping + analyzers + subfields); your application owns the queries. The notes below say which subfield to target for each job.

Analyzers (always defined, named flusso_*):

Name | What it does
flusso_code | The type: identifier analyzer (and the analyzer on a keyword field’s text subfield). Splits on punctuation, case, and letter↔digit boundaries, then lowercases and folds accents. C-01234 indexes as c-01234, c01234, c, 01234, so it’s found by C01234, c-01234, or 01234, but not by a fuzzy c1234 (add fuzziness on the query side if you want that). Tuned for identifier-like short text.
flusso_text | The default for type: text (natural language). Plain tokenize + lowercase + accent fold, no code-splitting.
flusso_lowercase | A normalizer (single token, no splitting) for case- and accent-insensitive exact match and sort.

With text_analysis = "icu" the folding/tokenizing swaps to the analysis-icu plugin (icu_tokenizer / icu_folding / icu_normalizer) for stronger multilingual handling — proper CJK/Thai segmentation and folding across every script. The plugin must be installed on every node (opensearch-plugin install analysis-icu) or index creation fails; builtin (the default) needs no plugins.

Default subfields (when auto_subfields is on — the default):

Field type | Shape | Query each subfield for…
text | analyzer: flusso_text + .keyword + .keyword_lowercase | the field itself → full-text search; .keyword → exact filter / aggregation / exact sort; .keyword_lowercase → case-insensitive sort & exact lookup.
keyword | .text (flusso_code) + .keyword_lowercase | the field itself → exact term / aggregation; .text → full-text search; .keyword_lowercase → case-insensitive sort.

keyword subfields cap at ignore_above: 256. Other types (long, date, boolean, …) and the object/nested containers are emitted as-is. Any key you set in a field’s options overrides the auto default for that field — e.g. your own analyzer replaces flusso_code, and your own fields replaces the auto subfields wholesale.

Example query against a text field name, precise (all terms must match) and case/punctuation-insensitive:

{ "query": { "match": { "name": { "query": "C-01234", "operator": "and" } } } }

Stdout

[sinks.audit]
type = "stdout"
pretty = true

Writes each operation to standard output as a JSON envelope — for development and piping into jq.

Key | Type | Default | Description
pretty | bool | false | Pretty-print JSON instead of compact one-line NDJSON.

Every envelope carries provenance and bookkeeping so a stream is self-describing: which sink and version produced it (sink, version), when (ts), in what order (seq), the index, the op (upsert / delete) and id, plus a meta summary (top-level field count and serialized byte size). An upsert carries the document; a delete does not.

{"document":{"email":"ada@x.io"},"id":"42","index":"users","meta":{"bytes":20,"fields":1},"op":"upsert","seq":1,"sink":"stdout","ts":"2026-06-03T10:20:30.123Z","version":"0.1.0"}
{"id":"7","index":"users","op":"delete","seq":2,"sink":"stdout","ts":"2026-06-03T10:20:30.124Z","version":"0.1.0"}

Logs go to stderr, so stdout stays a clean data stream.


Secrets & connection values

flusso never bakes a secret into a compiled config. A flusso.lock carries only the names of the variables to read; the real values are read in the environment that runs the pipeline. Compile in CI, run in prod, and the secret never rides along in between.

env_or_value references

Anywhere a secret or connection string is expected in flusso.toml, give either a literal string or a reference to an environment variable:

password = "literal-secret"          # literal — carried as-is
password = { env = "OS_PASSWORD" }   # read from $OS_PASSWORD when the pipeline runs

Either form is accepted wherever this guide says a value is an env_or_value. The variable name is yours. Resolution is deferred to run time — which lets a compiled artifact travel without baking in its secrets. An unset variable fails at run time, not compile time — by design, so the compile step needs no secrets.

Reserved deployment-override variables

A few well-known names act as a deployment override layer: set them and the same flusso.toml works across environments unedited (12-factor). When set, they take priority over the file value (and the override is logged at startup):

Variable | Overrides / fills | Notes
DATABASE_URL | the source connection_url | The source is a singleton, so one well-known name is unambiguous.
<SINK>_OPENSEARCH_URL | a sink’s url | <SINK> is the uppercased sink name: [sinks.primary] → PRIMARY_OPENSEARCH_URL.
<SINK>_OPENSEARCH_USERNAME | a sink’s username | Same naming.
<SINK>_OPENSEARCH_PASSWORD | a sink’s password | Same naming.

The per-sink prefix means several OpenSearch sinks never collide (PRIMARY_…, SECONDARY_…).

Precedence

When more than one source could supply a value, highest wins:

  1. An explicit { env = "X" } reference — names its own source, never overridden by a reserved variable. (If X is unset, that’s an error.)
  2. The reserved variable, if set — overrides a literal in the file and fills a value the file omitted.
  3. The literal value in the config.
  4. Otherwise → an error, for anything required (the source URL, a sink url).

Shortcut: “I asked for a specific variable” beats “the deployment set the well-known one” beats “whatever’s written in the file.”

[source]
type = "postgres"
connection_url = "postgres://localhost/dev"   # $DATABASE_URL wins if set

[sinks.primary]
type = "opensearch"
url = "https://localhost:9200"                 # $PRIMARY_OPENSEARCH_URL wins if set
# username / password omitted → filled from
# $PRIMARY_OPENSEARCH_USERNAME / $PRIMARY_OPENSEARCH_PASSWORD
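The precedence rules can be sketched as one resolver function. This is a minimal illustration under stated assumptions: `FileValue`, `reserved_var`, and `resolve` are hypothetical names, the environment is passed in as a map so the logic is easy to test, and the final error applies only to required values (an omitted optional value such as username would simply stay empty).

```rust
use std::collections::HashMap;

/// How a value is written in flusso.toml: an env reference, a literal, or absent.
enum FileValue {
    EnvRef(String),  // { env = "X" }
    Literal(String), // "..."
    Omitted,
}

/// Reserved variable name for an OpenSearch sink field, e.g. ("primary", "url").
fn reserved_var(sink: &str, field: &str) -> String {
    format!("{}_OPENSEARCH_{}", sink.to_uppercase(), field.to_uppercase())
}

/// Resolve a required value, highest precedence first.
fn resolve(
    file: &FileValue,
    reserved: &str,
    env: &HashMap<String, String>,
) -> Result<String, String> {
    match file {
        // 1. An explicit reference names its own source and is never overridden.
        FileValue::EnvRef(name) => env
            .get(name)
            .cloned()
            .ok_or_else(|| format!("${name} is not set")),
        other => {
            // 2. The reserved variable overrides a literal and fills an omission.
            if let Some(value) = env.get(reserved) {
                return Ok(value.clone());
            }
            match other {
                // 3. The literal written in the config.
                FileValue::Literal(value) => Ok(value.clone()),
                // 4. Nothing supplied a required value.
                _ => Err(format!("no value: set ${reserved} or configure one")),
            }
        }
    }
}

fn main() {
    let mut env = HashMap::new();
    env.insert(reserved_var("primary", "url"), "https://prod:9200".to_string());
    let file = FileValue::Literal("https://localhost:9200".to_string());
    println!("{:?}", resolve(&file, "PRIMARY_OPENSEARCH_URL", &env));
}
```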

CLI flags as env vars

Every flusso flag also reads a FLUSSO_* environment variable. The flag wins when both are set — env is the fallback.

Variable | Flag | Commands
FLUSSO_CONFIG | --config | build, check, run
FLUSSO_OUT | --out | build
FLUSSO_LOCK | --lock | run
FLUSSO_LOCKED | --locked | run
FLUSSO_SLOT | --slot | run
FLUSSO_PUBLICATION | --publication | run, check
FLUSSO_MANAGE_PUBLICATION | --manage-publication | run, check
FLUSSO_SKIP_BACKFILL | --skip-backfill | run
FLUSSO_PRETTY | --pretty | run
FLUSSO_QUEUE_CAPACITY | --queue-capacity | run
FLUSSO_PUBLIC_ADDRESS | --public-address | run
FLUSSO_PRIVATE_ADDRESS | --private-address | run
FLUSSO_ADMIN_USER | --admin-user | run, indexes, reindex
FLUSSO_ADMIN_PASSWORD | --admin-password | run, indexes, reindex
FLUSSO_SERVER | --server | indexes, reindex
FLUSSO_LAG_POLL_SECS | --lag-poll-secs | run
FLUSSO_INDEX_PREFIX | --index-prefix | run
FLUSSO_OFFLINE | --offline | check
FLUSSO_FORMAT | --format | check
FLUSSO_SCHEMA | the schema-kind argument | schema

flusso <cmd> --help shows the matching [env: FLUSSO_…] next to each flag.

The two HTTP surfaces’ bind addresses also fall back to a [server] table in flusso.toml — see HTTP surfaces.


HTTP surfaces

flusso serves two HTTP surfaces; both read the daemon’s live status.

Surface | Default bind | Auth | Endpoints
Public (read-only) | 127.0.0.1:9464 | none | /healthz /readyz /status /metrics
Private (control) | 127.0.0.1:9465 | HTTP Basic | /indexes, /reindex

  • Bind address: --public-address / --private-address, the FLUSSO_PUBLIC_ADDRESS / FLUSSO_PRIVATE_ADDRESS env vars, or a [server] table in flusso.toml. Precedence: flag > env > [server] config > default.
  • Basic-auth credentials: --admin-user / --admin-password (default admin / flusso). Flag/env only, never the config file, since they’re secrets. The indexes / reindex client subcommands reuse them and take --server / FLUSSO_SERVER to address a running server’s private surface.
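The bind-address precedence (flag > env > [server] config > default) is a first-present-wins chain. A minimal Rust sketch, with `bind_address` as a hypothetical helper and the three sources passed in as already-read optional values:

```rust
/// Pick the bind address from the highest-precedence source that is present.
fn bind_address(
    flag: Option<&str>,
    env: Option<&str>,
    config: Option<&str>,
    default: &str,
) -> String {
    flag.or(env).or(config).unwrap_or(default).to_string()
}

fn main() {
    // No flag given, FLUSSO_PUBLIC_ADDRESS set, [server] also set: env wins.
    let addr = bind_address(None, Some("0.0.0.0:9464"), Some("10.0.0.1:9464"), "127.0.0.1:9464");
    println!("{addr}");
}
```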

⚠️ Warning — The default control-surface credentials are admin / flusso. Change them before binding the private surface anywhere but localhost.


Index prefix

--index-prefix / FLUSSO_INDEX_PREFIX (also the prefix key in flusso.toml) prepends a literal string to every index name flusso owns — the indexes, their aliases, and the flusso_meta index. Precedence is flag > env > config > none. Use it to run several deployments against one OpenSearch cluster without collision: set dev_, staging_, nightly_, and each gets its own dev_users / staging_users / … with independent seed state.

  • You include the separator. dev_ → dev_users; dev → devusers.
  • Validated at startup. Lowercase, no characters OpenSearch forbids, and a leading letter or digit (an index name can’t start with _/-/+). A bad prefix fails the run fast.
  • Read side must match. The flusso-query consumer applies the prefix at runtime (Client::index_prefix, typically wired from the same FLUSSO_INDEX_PREFIX) — see querying. The compile-time derive is unaffected, so one consumer binary serves every environment.
  • Changing it re-roots everything. Turning a prefix on (or changing it) points flusso at brand-new names and triggers a full reseed; the old indexes/aliases are left orphaned.
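The startup validation can be approximated with a short check. This is a sketch, not flusso's validator: `validate_prefix` is a hypothetical function, and the forbidden-character list is an assumption taken from OpenSearch's general index-naming restrictions, so the real rules may differ in detail.

```rust
/// Characters assumed to be forbidden in index names (an assumption based on
/// OpenSearch's index-naming restrictions).
const FORBIDDEN: &[char] = &['\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#', ':'];

/// Check a prefix: leading letter or digit, lowercase, no forbidden characters.
fn validate_prefix(prefix: &str) -> Result<(), String> {
    let Some(first) = prefix.chars().next() else {
        return Ok(()); // an empty prefix means "no prefix"
    };
    if !first.is_ascii_alphanumeric() {
        return Err(format!("prefix must start with a letter or digit, got {first:?}"));
    }
    if prefix.chars().any(char::is_uppercase) {
        return Err("prefix must be lowercase".to_string());
    }
    if let Some(bad) = prefix.chars().find(|c| FORBIDDEN.contains(c)) {
        return Err(format!("prefix contains forbidden character {bad:?}"));
    }
    Ok(())
}

fn main() {
    for candidate in ["dev_", "_dev", "Staging_", "nightly:"] {
        println!("{candidate:?}: {:?}", validate_prefix(candidate));
    }
}
```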

Logging & telemetry

Variable | Default | Effect
RUST_LOG | info | Log verbosity / filtering (tracing env filter syntax, e.g. flusso=debug,info).
FLUSSO_LOG_FORMAT | text | Set to json for structured JSON logs (one object per line).
NO_COLOR | unset | Set to anything to disable colored CLI output (also auto-off when not a TTY).
OTEL_EXPORTER_OTLP_ENDPOINT | unset | Base OTLP endpoint. Its presence turns on trace + metric export.
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT | unset | Traces-only endpoint (enables trace export on its own).
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT | unset | Metrics-only endpoint (enables metric export on its own).
OTEL_EXPORTER_OTLP_PROTOCOL | http/protobuf | OTLP transport for both signals: http/protobuf or grpc. Unrecognized values warn and fall back to http/protobuf.
OTEL_EXPORTER_OTLP_TRACES_PROTOCOL | (general) | Per-signal transport override for traces.
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL | (general) | Per-signal transport override for metrics.

With no OTLP endpoint set, the exporters aren’t installed and cost nothing — telemetry is opt-in. When an endpoint is configured, the rest of the standard OTEL_* variables (OTEL_EXPORTER_OTLP_HEADERS, OTEL_SERVICE_NAME, …) are honored by the OpenTelemetry SDK.

Transport and port go together. flusso defaults to OTLP over HTTP/protobuf (conventionally port :4318). Set OTEL_EXPORTER_OTLP_PROTOCOL=grpc to switch to OTLP/gRPC (conventionally :4317 — the OpenTelemetry Collector’s and Jaeger’s default receiver). flusso does not infer the protocol from the port, so the endpoint must match the protocol you choose: pointing the default HTTP exporter at a gRPC :4317 port just loops HTTP export failed: network error. The per-signal *_TRACES_PROTOCOL / *_METRICS_PROTOCOL override the general one.
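The protocol selection described above can be sketched as follows. The names are hypothetical and this is not flusso's telemetry code; in particular, treating an unrecognized per-signal value the same way as an unrecognized general value (warn, then fall back to http/protobuf) is an assumption.

```rust
/// The two OTLP transports named in the table.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Protocol {
    HttpProtobuf,
    Grpc,
}

/// Parse a protocol value. The bool reports whether a warning is due because
/// the value was unrecognized and the default was used instead.
fn parse_protocol(raw: Option<&str>) -> (Protocol, bool) {
    match raw {
        None | Some("http/protobuf") => (Protocol::HttpProtobuf, false),
        Some("grpc") => (Protocol::Grpc, false),
        Some(_) => (Protocol::HttpProtobuf, true),
    }
}

/// A per-signal variable, when set, overrides the general one.
fn signal_protocol(per_signal: Option<&str>, general: Option<&str>) -> Protocol {
    parse_protocol(per_signal.or(general)).0
}

/// The conventional collector port for each transport.
fn conventional_port(protocol: Protocol) -> u16 {
    match protocol {
        Protocol::HttpProtobuf => 4318,
        Protocol::Grpc => 4317,
    }
}

fn main() {
    // General protocol is grpc, but traces are overridden to http/protobuf.
    let traces = signal_protocol(Some("http/protobuf"), Some("grpc"));
    let metrics = signal_protocol(None, Some("grpc"));
    println!("traces:  {traces:?} on :{}", conventional_port(traces));
    println!("metrics: {metrics:?} on :{}", conventional_port(metrics));
}
```

The port is only a convention: the endpoint URL is still whatever you set, so the sketch shows which port to expect for a transport, not a value flusso fills in.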

Prometheus metrics are a separate, pull-based path: served at /metrics on the public surface (default 127.0.0.1:9464), no env var required.

The derive (compile-time)

#[derive(FlussoDocument)] (the flusso-query client) reads FLUSSO_CONFIG at compile time to locate flusso.toml when it can’t be found by walking up from the crate’s CARGO_MANIFEST_DIR. Same name as the CLI flag, consumed by the proc-macro instead of the binary. (You can also point a single struct at a config with #[flusso(config = "…")].)


Compiling

flusso build --config config.toml -o flusso.lock runs all validation and writes the whole validated configuration — every schema inlined — to a single binary artifact (MessagePack). Because schemas are self-describing and secrets are deferred, compiling needs no database and bakes in no secret: { env = … } references travel as references, not values.

flusso run with no --config loads that artifact and resolves the connection and credentials in its own environment; flusso run --config flusso.toml compiles from source and runs that. So a deployment ships one file — no YAML tree, no source checkout — and the same artifact runs anywhere its environment provides the secrets. The Docker shipping recipes are in deploying.


Cheat sheet

# secrets & connections (resolved at run time)
DATABASE_URL=postgres://user:pass@host:5432/db
PRIMARY_OPENSEARCH_URL=https://opensearch:9200
PRIMARY_OPENSEARCH_USERNAME=flusso
PRIMARY_OPENSEARCH_PASSWORD=…           # plus any names you used in { env = "…" }

# CLI flags (flag wins if both set) — see the table above for the full list
FLUSSO_CONFIG=flusso.toml
FLUSSO_SLOT=flusso
FLUSSO_PUBLICATION=flusso
FLUSSO_MANAGE_PUBLICATION=false         # off = never issue publication DDL, only warn
FLUSSO_PUBLIC_ADDRESS=0.0.0.0:9464      # read-only surface (health/status/metrics)
FLUSSO_PRIVATE_ADDRESS=0.0.0.0:9465     # control surface (indexes/reindex), Basic auth
FLUSSO_ADMIN_USER=admin                 # change these before exposing the private port!
FLUSSO_ADMIN_PASSWORD=change-me
FLUSSO_SKIP_BACKFILL=true

# logging & telemetry
RUST_LOG=flusso=debug,info
FLUSSO_LOG_FORMAT=json
OTEL_EXPORTER_OTLP_ENDPOINT=http://collector:4318

A complete example

flusso.toml:

[source]
type = "postgres"
connection_url = { env = "DATABASE_URL" }

[sinks.primary]
type = "opensearch"
url = "https://localhost:9200"
password = { env = "OS_PASSWORD" }

[sinks.audit]
type = "stdout"
pretty = true

[[index]]
name = "users"
schema = "users.schema.yml"
enabled = true

The accompanying users.schema.yml (and the full schema-authoring reference) lives in schema authoring. A change to a users row — or to any related row the schema folds in — rebuilds the whole users document and re-emits it to every sink.

flusso-query — the typed query client

flusso keeps an OpenSearch index in sync with Postgres from a declarative schema (the write side; see README.md). That schema is a contract: it fixes the shape of every document — which fields exist, their types, which are nested arrays, which are scalars. flusso enforces that contract on the write side; flusso-query enforces it on the read side.

flusso-query does not generate the document types for you. You write the document struct by hand and keep full control — its derives, field types, which fields it projects, how doc keys map to Rust names. #[derive(FlussoDocument)] then, at compile time and with no database, does two things against the resolved schema:

  1. Validates that every struct field lines up with the schema — the field exists, its Rust type matches, its nullability matches. A struct that has drifted stops compiling, pointing at the offending field.
  2. Generates the typed query surface from the schema — field handles, get/query, the schema hash — so a service queries the index like a typed function: field names checked at compile time, operators that only exist for the field types that support them, results that deserialize into your struct.

The query surface comes from the full schema, not from the struct — so you can filter or sort on a field even if your struct doesn’t deserialize it. When the schema changes and the index is rebuilt, anything that no longer fits stops compiling at cargo build.

Quick reference

Looking for… | Jump to
A full worked example (structs + query) | A query, from the caller’s seat
Which operators each field type exposes | What the field type buys you
and/or/not, clause vs combinator style, count/ids | Composing queries
Per-query options, compound/scoring, standalone, sort, search-level | Query options and extra query types
Nested arrays: filter by vs filter of (any/all, filter_nested) | Filtering nested collections
Several searches, one round-trip | _msearch
One blended result list across indexes | Combined search
How the macro finds the schema; path for nested structs | Binding to the schema
flusso type → Rust type → handle | flusso types → Rust types
Anything the typed builder can’t express | The escape hatch
Reading a prefixed deployment | Resolving the index name

A query, from the caller’s seat

A service that searches the users index from the schema guide. You write the structs; #[derive(FlussoDocument)] validates them and generates the query surface. The only input is the index name — it finds flusso.toml itself (see Binding to the schema).

The document structs

#![allow(unused)]
fn main() {
use flusso_query::{Client, FlussoDocument};

/// A `users` document — *you* write this. It's a **projection**: it deserializes
/// the fields below and omits the rest of the index (addresses, profile,
/// avgOrderValue, …), which the derive allows. The derive checks every field
/// against the `users` schema and hangs the typed query surface off `User`.
#[derive(Debug, Clone, serde::Deserialize, FlussoDocument)]
#[flusso(index = "users")]                     // ← the only input: which index
pub struct User {
    pub id: i32,                                // primary key (integer) — never null
    pub email: String,                          // keyword, required → never null
    #[serde(rename = "fullName")]
    pub full_name: Option<String>,              // text, not required → nullable
    pub account: Account,                       // an object — always assembled
    pub orders: Vec<Order>,                     // has_many join → nested, never null
    #[serde(rename = "orderCount")]
    pub order_count: i64,                        // count aggregate → long, never null
    #[serde(rename = "lifetimeValue")]
    pub lifetime_value: Option<f64>,            // sum aggregate → nullable
}

/// The `account` object — a same-row sub-object. A nested/object struct validates
/// against its `path` in the same index; it has no entry points of its own.
#[derive(Debug, Clone, serde::Deserialize, FlussoDocument)]
#[flusso(index = "users", path = "account")]
pub struct Account {
    pub tier: String,                           // enum → keyword, required
    pub country: Option<String>,                // keyword, not required
    #[serde(rename = "createdAt")]
    pub created_at: time::OffsetDateTime,       // timestamp, required
}

#[derive(Debug, Clone, serde::Deserialize, FlussoDocument)]
#[flusso(index = "users", path = "orders")]
pub struct Order {
    pub status: String,                         // enum, required
    pub total: Decimal,                         // decimal column (`decimal` feature); or `f64`
    #[serde(rename = "placedAt")]
    pub placed_at: time::OffsetDateTime,        // timestamp, required
    pub items: Vec<Item>,                       // a deeper has_many → nested
}

#[derive(Debug, Clone, serde::Deserialize, FlussoDocument)]
#[flusso(index = "users", path = "orders.items")]
pub struct Item {
    #[serde(rename = "productId")]
    pub product_id: i32,
    pub quantity: i32,
    #[serde(rename = "unitPrice")]
    pub unit_price: Decimal,
}
}

The query

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Transport. Points at OpenSearch, not at flusso — the engine is write-only;
    // reads go straight to the index it maintains.
    let client = Client::connect("https://localhost:9200")?
        .basic_auth("admin", std::env::var("OS_PASSWORD")?);

    // Fetch one document by its id. `id` is typed as the root table's primary
    // key (i32 here), and the result is Option<User> — None when absent.
    let user: Option<User> = User::get(&client, 42).await?;

    // A typed search. Every `User::field()` is a handle that only exposes the
    // operators its mapping supports (see "What the field type buys you"). The
    // handles cover the *whole schema* — so `User::addresses()` filters fine even
    // though this projection never deserializes addresses.
    let page = User::query()
        .filter(User::email().eq("ada@example.com"))      // keyword → exact
        .filter(User::order_count().gte(5))               // long → range
        .filter(User::account().tier().eq("gold"))        // into the object's handles
        .query(User::full_name().matches("ada lovelace")) // text → analyzed match
        .filter(User::orders().any(Order::status().eq("delivered")))   // nested, via your Order struct
        .filter(User::addresses().any(AddressFields::city().eq("Boston"))) // not projected — generated namespace
        .sort(User::order_count().desc())
        .from(0)
        .size(20)
        .send(&client)
        .await?;

    println!("{} total matches", page.total);
    for hit in page.hits {
        // hit.source is a fully-typed User. hit.id and hit.score come from the
        // search envelope, not the document body.
        let u: &User = &hit.source;
        println!("{:.3}  {}  ({} orders)", hit.score, u.email, u.order_count);
        for order in &u.orders {                          // Vec<Order>
            println!("    order — {} — {}", order.total, order.status);
        }
    }

    Ok(())
}

What you can’t write

The compiler refuses the mistakes:

  • User::email().matches(..): matches is a text operator and email is a keyword.
  • User::full_name().eq(..): full_name is analyzed text, so it has no exact eq.
  • User::nmae() — there is no such handle.
  • hit.source.totl — no such field.

And the struct itself can’t drift. Declaring email: i32, or email: Option<String> when the field is required, or a field totl the schema doesn’t have, is a compile error from the derive. The schema has been lifted into the type system, so the compiler checks both your queries and your struct against it.

OS_PASSWORD is just a variable name picked for the example. The full env-var story (secrets, the reserved deployment overrides, FLUSSO_CONFIG) lives in the configuration guide.


What the field type buys you

The handle a schema field generates determines the operators in scope: an operator that doesn’t make sense for a field’s type doesn’t exist on its handle, so the mistake is a compile error rather than a 400 from OpenSearch.

| Handle | Operators |
| --- | --- |
| Keyword | eq, any_of, prefix, wildcard, regexp, fuzzy, exists |
| Text | matches, match_phrase, match_phrase_prefix, matches_fuzzy, any_of (exact, via .keyword), exists; no exact eq (it’s analyzed) |
| Bool | eq, exists, asc/desc |
| Number | eq, any_of, lt, lte, gt, gte, between, exists |
| Date | eq, any_of, lt, lte, gt, gte, between, exists |
| Object<S> | exists (a same-document sub-object: an object field or a to-one join (belongs_to/has_one); S is the enclosing scope). Its sub-fields are flattened, so query them via the child struct’s dotted-path handles (Account::tier()), not through this handle |
| Nested<S, T> | any(q) / all(q) to match parents and lift the child query into scope S; q is a child query built from T’s handles (merging); matching(q) (with .sort/.size) to shape what’s returned (see Filtering nested collections); plus exists |
| Geo | within(Distance::km(12.0), center), within_box, within_polygon, exists; sort with distance_from(center) (nearest-first sugar) or distance_sort(center, order, DistanceUnit) |
| TextMap | key(k) → a Text leaf for one key; search(q) (cross-key, .prefer(key, weight) / .only_preferred()); has_key(k), exists |
| KeywordMap | key(k) → a Keyword leaf for one key; has_key(k), exists; no search (exact-match, use key(..).eq(..)) |
| NumberMap | key(k) → a Number leaf for one key (eq/lt/gte/between/…); has_key(k), exists |
| DateMap | key(k) → a Date leaf for one key (eq/gte/between/…); has_key(k), exists |
| Binary | exists (base64-encoded, not searchable) |
| Json | exists, raw(serde_json::Value) (the untyped fallback) |
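The "operator doesn’t exist on the handle" idea is ordinary Rust: each handle is its own type and implements only the methods that make sense for it. The following is a minimal sketch under that assumption. The Keyword and Text structs here are hypothetical stand-ins, not flusso-query’s real types, and the rendered JSON only illustrates the shape of an OpenSearch term / match clause.

```rust
// Hypothetical miniature of typed handles; not flusso-query's real types.
struct Keyword(&'static str);
struct Text(&'static str);

impl Keyword {
    // Exact match: only keyword handles expose `eq`.
    fn eq(&self, value: &str) -> String {
        format!(r#"{{"term":{{"{}":"{}"}}}}"#, self.0, value)
    }
}

impl Text {
    // Analyzed match: only text handles expose `matches`.
    fn matches(&self, value: &str) -> String {
        format!(r#"{{"match":{{"{}":"{}"}}}}"#, self.0, value)
    }
}

fn main() {
    let email = Keyword("email");
    let full_name = Text("full_name");
    println!("{}", email.eq("ada@example.com"));
    println!("{}", full_name.matches("ada lovelace"));
    // email.matches("ada");  // does not compile: no method `matches` on `Keyword`
    // full_name.eq("Ada");   // does not compile: no method `eq` on `Text`
}
```

Uncommenting either of the last two lines turns the mistake into a build failure, which is the property the table above describes.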

Each operator’s argument is typed by kind, not by one fixed Rust type — a handle accepts any value implementing FlussoValue<kind::…> (which requires serde::Serialize):

  • Numerics are split per type (kind::Byte/Short/Integer/Long/Float/Double/Decimal), and a value is accepted only if it widens into that kind without loss. So a Long field takes any integer, a Double takes f32/f64 or a small int, and a Decimal takes Decimal (decimal feature) or an integer; a float on an integer field, or an i64 on a Short, is a compile error. Bare literals work where lossless: eq(5) on long/integer/double/decimal, eq(5.0) on float/double. A byte/short field needs a typed literal (eq(5i16)), since 5 defaults to i32 (which would narrow).
  • Keyword takes a String/&str or a #[derive(FlussoValue)] keyword enum/newtype (matched against its serde string form).
  • Bool takes a bool or a bool #[derive(FlussoValue)] newtype.
  • Date takes a String/&str ISO-8601 literal or — with the chrono feature — a NaiveDate / NaiveDateTime / DateTime<Utc>.
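One way to picture the "widens without loss" rule is a trait that is implemented only for the lossless (value type, kind) pairs. This is a sketch of the technique, not the crate’s definition: the Widens trait, the Long/Short markers and this Number type are hypothetical names modelled on the bullets above.

```rust
use std::marker::PhantomData;

// Hypothetical kind markers; the real crate names them `kind::Long`, `kind::Short`, ...
struct Long;
struct Short;

/// A value that widens into kind `K` without loss (hypothetical trait).
trait Widens<K> {
    fn widen(self) -> i64;
}

// Any integer up to 64 bits fits a `long` field.
impl Widens<Long> for i16 { fn widen(self) -> i64 { i64::from(self) } }
impl Widens<Long> for i32 { fn widen(self) -> i64 { i64::from(self) } }
impl Widens<Long> for i64 { fn widen(self) -> i64 { self } }
// Only i8 / i16 fit a `short` field; i32 and i64 are deliberately absent.
impl Widens<Short> for i8 { fn widen(self) -> i64 { i64::from(self) } }
impl Widens<Short> for i16 { fn widen(self) -> i64 { i64::from(self) } }

struct Number<K>(&'static str, PhantomData<K>);

impl<K> Number<K> {
    fn new(field: &'static str) -> Self {
        Number(field, PhantomData)
    }
    // Accepts only values with a lossless conversion into this handle's kind.
    fn eq<V: Widens<K>>(&self, value: V) -> String {
        format!("{} == {}", self.0, value.widen())
    }
}

fn main() {
    let order_count = Number::<Long>::new("order_count");
    let rank = Number::<Short>::new("rank");
    println!("{}", order_count.eq(5)); // bare literal falls back to i32, which widens into Long
    println!("{}", rank.eq(5i16));     // Short needs a typed literal
    // rank.eq(5);          // does not compile: i32 would narrow into Short
    // order_count.eq(1.5); // does not compile: no float widens into Long
}
```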

A custom money/quantity type queries with no cast — Order::total().eq(Money(d)) — as long as it’s a FlussoValue of the field’s kind (see below). A value of the wrong kind is a compile error (Order::created_at().gte(5), Order::age().eq(1.5)).

Subfield accessors. flusso’s sink auto-enriches text/keyword fields (auto_subfields, on by default) with exact / sortable / searchable subfields, and the handles expose them — no Keyword::at("code.keyword") string path:

#![allow(unused)]
fn main() {
User::full_name()                      // Text   → analyzed full-text match
User::full_name().keyword()            // Keyword → exact / wildcard / prefix
User::full_name().keyword_lowercase()  // Keyword → case-insensitive match / sort
User::email().text()                   // Text    → full-text over a keyword field
}

(A wildcard belongs on .keyword(), not the analyzed handle, which matches tokens not the whole value.) These accessors exist only when the subfield is actually provisioned — and the derive enforces it at compile time: a text/keyword handle is stamped with subfields only when every OpenSearch sink has auto_subfields on and the field declares no custom fields. Otherwise the handle is …<NoSubfields> and .keyword() / .text() / .keyword_lowercase() (and the Text::any_of / Text::asc sugar built on them) simply don’t exist — calling one is a compile error, not a runtime 400.

Sorting is similar — sort(…) accepts handles whose type is sortable (numbers, dates, keywords, booleans, and text). Text::asc/desc is sugar that sorts via the case-insensitive .keyword_lowercase subfield automatically; reach for User::full_name().keyword().desc() when you want an exact-case sort instead. A geo_point sorts by proximity with User::location().distance_from(center).

A few clauses span more than one field, so they’re free functions: multi_match("ada", [User::full_name(), User::bio()]) runs one analyzed query across several Text fields (weight one with User::full_name().boosted(3.0)).

The typed surface is broad — see Query options and extra query types for the per-query options (boost/case_insensitive/fuzziness/…), the compound/scoring queries (constant_score/dis_max/function_score/boosting), the standalone ones (ids/query_string/simple_query_string/script_score/…), and the search-level controls (min_score/collapse/search_after/highlight/…). What’s left for the raw hatch is knn, geo_shape, span and parent/child queries — types with no corresponding flusso field.

Composing queries

A handle’s operator produces a Query<S> — carrying the scope it was built in. The root document and any flattened object/to-one-join sub-field share the scope Root; only a nested array introduces a fresh scope, tagged with its element type (Query<Order>).

Queries compose with and / or / not within a scope, and the Search builder exposes the bool-query clauses directly:

#![allow(unused)]
fn main() {
// Combinator style.
let q = User::email().eq("ada@example.com")
    .and(User::order_count().gte(5))
    .and(User::orders().any(Order::status().eq("delivered").and(Order::total().gt(0.0))));

User::query().query(q).send(&client).await?;

// Clause style — `filter`/`must_not` don't score, `query`(=must)/`should` do.
User::query()
    .query(User::full_name().matches("ada"))       // scored
    .filter(User::order_count().gte(5))            // filtered, cached, no score
    .must_not(User::email().prefix("test-"))
    .should(User::orders().any(Order::status().eq("delivered")))
    .send(&client)
    .await?;
}

query/filter/must_not/should accept anything that is a Query<Root>, so the two styles mix freely. Which clause to reach for:

| Clause | Scores? | Bool slot | Use for |
| --- | --- | --- | --- |
| query | yes | must | full-text relevance you want ranked |
| filter | no (cached) | filter | exact constraints: terms, ranges, nested any |
| must_not | no | must_not | exclusions |
| should | yes | should | optional boosts; a real constraint with min_should_match |
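The clause-to-slot mapping in the table can be sketched as a builder that files each clause under one key of an OpenSearch bool query. The BoolQuery type below is a hypothetical illustration (clauses are passed as pre-rendered strings to keep it short), not how the Search builder is implemented.

```rust
// Hypothetical sketch: each builder clause lands in one `bool` slot.
#[derive(Default)]
struct BoolQuery {
    must: Vec<String>,
    filter: Vec<String>,
    must_not: Vec<String>,
    should: Vec<String>,
}

impl BoolQuery {
    fn query(mut self, q: &str) -> Self { self.must.push(q.to_string()); self }
    fn filter(mut self, q: &str) -> Self { self.filter.push(q.to_string()); self }
    fn must_not(mut self, q: &str) -> Self { self.must_not.push(q.to_string()); self }
    fn should(mut self, q: &str) -> Self { self.should.push(q.to_string()); self }

    // Render only the slots that received at least one clause.
    fn render(&self) -> String {
        let slots = [
            ("must", &self.must),
            ("filter", &self.filter),
            ("must_not", &self.must_not),
            ("should", &self.should),
        ];
        let parts: Vec<String> = slots
            .into_iter()
            .filter(|(_, clauses)| !clauses.is_empty())
            .map(|(name, clauses)| format!(r#""{}":[{}]"#, name, clauses.join(",")))
            .collect();
        format!(r#"{{"bool":{{{}}}}}"#, parts.join(","))
    }
}

fn main() {
    let body = BoolQuery::default()
        .query(r#"{"match":{"full_name":"ada"}}"#)
        .filter(r#"{"range":{"order_count":{"gte":5}}}"#)
        .must_not(r#"{"prefix":{"email":"test-"}}"#)
        .should(r#"{"term":{"tier":"gold"}}"#)
        .render();
    println!("{body}");
}
```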

A built search can finish as a count instead of a page: .count() sends the same query to _count and returns the number of matching documents (u64) — cheaper than send() when the hits aren’t needed. Sort, from/size, and filter_nested projections are ignored; they never change which documents match.

#![allow(unused)]
fn main() {
let open_orders: u64 = User::query()
    .filter(User::orders().any(Order::status().eq("open")))
    .count(&client)
    .await?;
}

Or as an id page: .ids() runs the same search with _source: false and returns Vec<String> — the matching document ids (root primary keys, stringified), in order, no sources fetched. Sort and from/size apply as in send(); filter_nested projections are dropped. The cheap way to feed another lookup — e.g. search in OpenSearch, then load the full rows from Postgres:

#![allow(unused)]
fn main() {
let user_ids: Vec<String> = User::query()
    .filter(User::orders().any(Order::status().eq("open")))
    .sort(User::order_count().desc())
    .size(100)
    .ids(&client)
    .await?;
}
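Because the ids come back stringified, the follow-up Postgres lookup usually starts by parsing them back into the key type. A minimal sketch, assuming an integer primary key; parse_ids is a hypothetical helper, not part of flusso-query.

```rust
use std::num::ParseIntError;

/// Parse stringified primary keys back into integers, failing on the first bad id.
/// (Hypothetical helper; assumes the root primary key is an integer column.)
fn parse_ids(ids: &[String]) -> Result<Vec<i64>, ParseIntError> {
    ids.iter().map(|id| id.parse::<i64>()).collect()
}

fn main() {
    // Stand-in for the Vec<String> an id-only search would return.
    let ids = vec!["42".to_string(), "7".to_string()];
    match parse_ids(&ids) {
        // The parsed keys keep the search order and can be bound as one array
        // parameter, e.g. for a `WHERE id = ANY($1)` query.
        Ok(keys) => println!("{keys:?}"),
        Err(err) => println!("bad id: {err}"),
    }
}
```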

Query options and extra query types

Each leaf operator returns a small builder that carries that query’s options plus the universal boost(f32) and name(&str) (_name, surfaced in a hit’s matched_queries). With no option set it renders the DSL shorthand; set one and it expands. A builder drops straight into a clause (it’s an AsQuery), so no .build() is needed:

#![allow(unused)]
fn main() {
User::query()
    .should(User::full_name().matches("acme").boost(2.0))         // weighted text
    .should(User::full_name().keyword().wildcard("*acme*").case_insensitive())
    .should(User::full_name().matches("acme").fuzziness(Fuzziness::Auto))  // typo-tolerant
    .min_should_match(1)                                          // make should a real filter
    .filter(User::owner_id().eq(owner_uuid))                      // uuid keyword (feature)
    .filter(User::tier().eq(Tier::Pro))                           // enum keyword
    .sort(User::created_at().desc().missing_first())              // null-aware sort
    .send(&client).await?;
}
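The shorthand-versus-expanded rendering can be pictured with a single term clause. This is a sketch of the idea only: the Term struct is hypothetical, and the two JSON shapes are the standard OpenSearch short and long forms of a term query.

```rust
// Hypothetical builder for a `term` clause; not the crate's real builder type.
struct Term {
    field: String,
    value: String,
    boost: Option<f32>,
}

impl Term {
    fn new(field: &str, value: &str) -> Self {
        Term { field: field.to_string(), value: value.to_string(), boost: None }
    }

    fn boost(mut self, boost: f32) -> Self {
        self.boost = Some(boost);
        self
    }

    fn render(&self) -> String {
        match self.boost {
            // No option set: the DSL shorthand.
            None => format!(r#"{{"term":{{"{}":"{}"}}}}"#, self.field, self.value),
            // An option set: the expanded object form.
            Some(b) => format!(
                r#"{{"term":{{"{}":{{"value":"{}","boost":{}}}}}}}"#,
                self.field, self.value, b
            ),
        }
    }
}

fn main() {
    println!("{}", Term::new("email", "ada@example.com").render());
    println!("{}", Term::new("email", "ada@example.com").boost(1.5).render());
}
```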

The options per query type (all optional; plus the universal boost/name on every builder):

QueryOptions
term / prefix / wildcard / regexpcase_insensitive
prefix / wildcardrewrite
regexpflags, max_determinized_states
fuzzyfuzziness, prefix_length, max_expansions, transpositions
matchesfuzziness, operator, minimum_should_match, prefix_length, analyzer, zero_terms_query, lenient
phrase matchesslop, analyzer
multi_matchtype, operator, fuzziness, tie_breaker, minimum_should_match
rangeformat, time_zone, relation
within (geo)distance_type, validation_method
nested anyscore_mode, ignore_unmapped

The enumerable options are closed enums, not strings — a typo is a compile error, not a 400. operator/default_operator take Operator { And, Or }; fuzziness takes Fuzziness { Auto, AutoBounds(u32, u32), Edits(u32) }; multi_match’s type takes MultiMatchType; zero_terms_query takes ZeroTermsQuery { None, All }; a range relation takes RangeRelation { Intersects, Contains, Within }; function_score’s score_mode/boost_mode take ScoreMode/BoostMode; a nested score_mode takes NestedScoreMode (which, unlike ScoreMode, has None for a filter-only nested clause). Geo’s distance_type/validation_method take DistanceType/ValidationMethod, and a sort’s numeric_type / Sort::script type take NumericType / ScriptSortType. minimum_should_match takes a MinimumShouldMatch (2/.into() for a count, MinimumShouldMatch::percent(75), or ::raw("3<90%") for the combining mini-language). Genuinely open-ended params (analyzer, format, time_zone, unmapped_type, flags) stay String.
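A closed enum makes the wire string a function of the variant, so there is no string for a caller to mistype. The sketch below reuses the Fuzziness variants listed above; the as_param method name is an assumption for illustration, and the output strings follow OpenSearch’s documented fuzziness syntax.

```rust
// Variants as listed above; `as_param` is a hypothetical method name.
enum Fuzziness {
    Auto,
    AutoBounds(u32, u32),
    Edits(u32),
}

impl Fuzziness {
    /// Render the value OpenSearch expects for the `fuzziness` parameter.
    fn as_param(&self) -> String {
        match self {
            Fuzziness::Auto => "AUTO".to_string(),
            Fuzziness::AutoBounds(low, high) => format!("AUTO:{low},{high}"),
            Fuzziness::Edits(n) => n.to_string(),
        }
    }
}

fn main() {
    for f in [Fuzziness::Auto, Fuzziness::AutoBounds(3, 6), Fuzziness::Edits(1)] {
        println!("{}", f.as_param());
    }
}
```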

Calling .or() / .and() on a builder requires the AsQuery trait in scope (use flusso_query::AsQuery;), because the combinators are provided methods on that trait. Composing via the Search clauses (.should()/.filter()/…) needs no import.

Bool / compound & scoring. Search::min_should_match(n) (or Query::min_should_match on an or-group) turns a top-level free-text should group into a real constraint instead of scoring-only. The scoring wrappers are free functions: constant_score(filter), dis_max([..]).tie_breaker(..), boosting(positive, negative, negative_boost), and function_score(query).weight(..)/.weight_when(.., filter)/.boost_mode(..).

Standalone query types (free functions, each an AsQuery): ids([..]) (get-many-by-_id), query_string(..) / simple_query_string(..) / combined_fields(.., [fields]) (user-facing full-text), and the relevance ones script(..), script_score(query, source), distance_feature(..), rank_feature(..), more_like_this([fields], [like]). match_bool_prefix is a Text operator (search-as-you-type).

Sort. .asc()/.desc() on a sortable handle (number, date, keyword, bool, or text — the last routing through .keyword_lowercase) return a Sort builder: chain .missing_first()/.missing_last()/.missing(v), .mode(SortMode::..), .unmapped_type(..)/.numeric_type(..)/.format(..), or .nested(path)/.nested_filtered(path, q). Also Sort::score() (by _score), Sort::script(ScriptSortType, source, order), and Geo::distance_from(center) / Geo::distance_sort(center, order, DistanceUnit) for proximity sorting. A geo radius is a typed Distance (Distance::km(12.0) / ::miles(5.0) / ::meters(800.0) / …), so a malformed "12 km" can’t reach the query.

Search-level controls on the Search builder: min_score, track_total_hits, track_scores, search_after([..]) (deep pagination), collapse(field), post_filter(q), and highlight(Highlight::new().field(..)).

Queries are values; the client appears once

Type::query() takes no client: a Search<T> is a plain, Cloneable value with no lifetime. Build it anywhere — a helper, a struct field, a cached “prepared search” — and hand a &Client to the terminal (send / ids / count, all &self) when it’s time to run. One built query can run many times, against different clients, or with per-call tweaks via clone():

#![allow(unused)]
fn main() {
fn busy_users() -> Search<User> {                      // no client in sight
    User::query().filter(User::order_count().gte(5))
}

let page = busy_users().send(&client).await?;          // run it
let next = busy_users().from(20).send(&client).await?; // tweak a copy
}

Several searches, one round-trip (_msearch)

Independent typed searches — different indexes, different document types — can share one HTTP round-trip. client.msearch(…) takes a tuple of &Search<T> (arity 1–8) and returns one typed SearchResponse per slot, in order:

#![allow(unused)]
fn main() {
let users_q  = User::query().query(User::full_name().matches(&q)).size(10);
let orders_q = Order::query().filter(Order::status().eq("open")).size(5);

let (users, orders) = client.msearch((&users_q, &orders_q)).await?;
}

The “search page with separate sections” primitive: each slot keeps its own query, sort, pagination, and filter_nested projections, and decodes into its own type. A slot-level failure fails the whole call with an error naming the slot (no partial results). For many searches of one type there’s client.msearch_all(&searches) → Vec<SearchResponse<T>>.
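For orientation, the _msearch endpoint takes newline-delimited JSON: one header line naming the index, then one line with the search body, repeated per search. The sketch below builds such a payload by hand. The msearch_body function and the index names are hypothetical, and how flusso-query assembles the request internally is not shown in this guide.

```rust
/// Build an `_msearch` payload: a header line and a body line per search,
/// each terminated by a newline. (Hypothetical helper for illustration.)
fn msearch_body(searches: &[(&str, &str)]) -> String {
    let mut out = String::new();
    for (index, body) in searches {
        out.push_str(&format!(r#"{{"index":"{}"}}"#, index));
        out.push('\n');
        out.push_str(body);
        out.push('\n');
    }
    out
}

fn main() {
    // Index names are placeholders; real ones carry the schema hash.
    let payload = msearch_body(&[
        ("users", r#"{"size":10}"#),
        ("orders", r#"{"size":5}"#),
    ]);
    print!("{payload}");
}
```

The responses come back in the same order as the header/body pairs, which is what lets each slot decode into its own type.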

The other multi-index shape: one query over several indexes, hits ranked together in a single list — the “one search box over everything” primitive. You declare which document types blend by writing an enum with one variant per type:

#![allow(unused)]
fn main() {
/// One item in the storefront's global search.
#[derive(Debug, FlussoMultiDocument)]          // the `derive` feature, like FlussoDocument
enum StoreItem {
    User(User),
    Order(Order),
}

let page = StoreItem::query()                  // MultiSearch<StoreItem> — client-free too
    .query(multi_match("ada", [User::full_name(), Order::customer_name()]))
    .size(20)
    .send(&client)
    .await?;

for hit in page.hits {
    match hit.source {                         // dispatched by the hit's `_index`
        StoreItem::User(u) => render_user(u, hit.score),
        StoreItem::Order(o) => render_order(o, hit.score),
    }
}
}

Root-scope queries compose across document types (Query<Root> carries no document type), and each hit is decoded into the variant that owns its index. The query goes through the stable {INDEX}_{HASH} alias, but OpenSearch reports the concrete index behind it ({INDEX}_{HASH}_{generation}) in each hit — so the crate normalizes that generation suffix before dispatch; you just match on hit.source. A hit from an index no variant claims is an error, not a skip. count(&client) works on the union too.
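The dispatch step can be sketched as a prefix check: a hit belongs to a variant when its concrete _index is that variant’s alias, optionally followed by an underscore and a generation. Everything below is an illustrative assumption (the belongs_to and dispatch functions, and the orders hash); the crate’s actual normalization is not shown in this guide.

```rust
/// True when `concrete` is `alias` itself or `alias` + "_{generation}".
/// (Hypothetical helper; the real normalization may differ.)
fn belongs_to(concrete: &str, alias: &str) -> bool {
    match concrete.strip_prefix(alias) {
        Some("") => true,
        Some(rest) => rest.starts_with('_') && rest.len() > 1,
        None => false,
    }
}

/// Pick the variant that owns a hit's index, or fail if none claims it.
fn dispatch(concrete: &str) -> Result<&'static str, String> {
    // Placeholder targets; real ones would come from each type's INDEX const.
    let targets = [("users_3f2a1b9c", "User"), ("orders_77aa01de", "Order")];
    targets
        .iter()
        .find(|(alias, _)| belongs_to(concrete, alias))
        .map(|(_, variant)| *variant)
        .ok_or_else(|| format!("no variant claims index `{concrete}`"))
}

fn main() {
    println!("{:?}", dispatch("users_3f2a1b9c_2"));
    println!("{:?}", dispatch("orders_77aa01de"));
    println!("{:?}", dispatch("carts_00000000_1")); // an error, not a skip
}
```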

Two semantics to know:

  • A query on a field that exists in only one of the indexes is fine — it just doesn’t match in the others.
  • A sort on such a field is rejected by OpenSearch unless it carries an unmapped_type. Sort the blended list by relevance, or on fields all the union’s indexes share.

The derive validates the enum’s shape (single-field tuple variants, no duplicate payload types) and generates the trait’s two members. Without the derive feature, the impl is two short members written by hand — a TARGETS const listing each variant’s (INDEX, SCHEMA_HASH) and a decode that matches on Type::physical_index().

Building a child filter and merging it into the parent

Because the scope is part of the type, a query is a value you can build, name, store, and reuse. A nested child struct (one whose path ends in a nested array) carries its own field handles tagged with the child scope — so they produce Query<Order>, not Query<Root>:

#![allow(unused)]
fn main() {
// Built from Order's own handles. Reusable — a plain function returning a query:
fn big_delivered() -> Query<Order> {
    Order::status().eq("delivered")
        .and(Order::total().gt(100.0))
}
}

To merge a child filter into a parent, lift it through the nesting that holds it: User::orders().any(child) (or .all(child)) takes a Query<Order> and returns a Query<Root> — a nested clause at the orders path — which composes with parent-scope queries like any other:

#![allow(unused)]
fn main() {
let q = User::email().eq("ada@example.com")
    .and(User::orders().any(big_delivered()));   // Query<Order> → lifted → Query<Root>

User::query().filter(q).send(&client).await?;
}

The scope tag keeps this honest: User::email().and(Order::status().eq(…)) does not compile — you can’t and a Query<Root> with a Query<Order>; the child query has to be lifted through User::orders() first. A child constraint can never be silently applied at the wrong level.

Lifting composes through depth: Order::items().any(Item::quantity().gt(1)) is a Query<Order>, which User::orders().any(…) then lifts the rest of the way to Query<Root>.
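The scope tag is a phantom type parameter, and lifting is a function that changes it. A minimal sketch under that reading: Query, Root, Order and orders_any below are simplified, hypothetical stand-ins, and the string rendering is only there to make the result visible.

```rust
use std::marker::PhantomData;

// Hypothetical scope markers and query type; not flusso-query's definitions.
struct Root;
struct Order;

struct Query<S> {
    dsl: String,
    scope: PhantomData<S>,
}

impl<S> Query<S> {
    fn new(dsl: impl Into<String>) -> Self {
        Query { dsl: dsl.into(), scope: PhantomData }
    }

    // `and` only accepts another query in the same scope `S`.
    fn and(self, other: Query<S>) -> Query<S> {
        Query::new(format!("({} AND {})", self.dsl, other.dsl))
    }
}

// Lifting: wrap a child-scope query in a nested clause, changing its scope tag.
fn orders_any(child: Query<Order>) -> Query<Root> {
    Query::new(format!("nested(orders, {})", child.dsl))
}

fn main() {
    let child: Query<Order> = Query::new("status = delivered");
    let email: Query<Root> = Query::new("email = ada@example.com");
    let q = email.and(orders_any(child));
    println!("{}", q.dsl);
    // email.and(child) would not compile: expected `Query<Root>`, found `Query<Order>`.
}
```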

Optional filters

Callers build queries from optional inputs — request params, form fields — and the if let Some(x) = … { q = q.filter(…) } dance breaks the fluent chain. The primitive that fixes it: Option<Q> is itself a Query, where None contributes nothing in any clause — must_not(None) excludes nothing, and(None) is the identity. So every clause and combinator accepts an optional; you .map the value into the handle:

#![allow(unused)]
fn main() {
User::query()
    .filter(params.email.map(|e| User::email().eq(e)))          // skipped when None
    .filter(params.min_orders.map(|n| User::order_count().gte(n)))
    .send(&client).await?;

// Composes inside and/or too — a None branch just drops out:
let q = User::email().eq("ada@example.com")
    .and(params.tier.map(|t| User::account().tier().eq(t)));    // None → just the email clause
}
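The "Option is itself a query" primitive amounts to one blanket impl. The sketch below shows the technique with hypothetical names (AsClause, this Search struct); the crate’s own trait is AsQuery and its internals may differ.

```rust
// Hypothetical trait: anything that may contribute a clause.
trait AsClause {
    fn clause(self) -> Option<String>;
}

impl AsClause for String {
    fn clause(self) -> Option<String> {
        Some(self)
    }
}

// `None` contributes nothing; `Some(q)` behaves exactly like `q`.
impl<Q: AsClause> AsClause for Option<Q> {
    fn clause(self) -> Option<String> {
        self.and_then(|q| q.clause())
    }
}

#[derive(Default)]
struct Search {
    filters: Vec<String>,
}

impl Search {
    fn filter(mut self, q: impl AsClause) -> Self {
        self.filters.extend(q.clause()); // adds zero or one clause
        self
    }
}

fn main() {
    let email: Option<&str> = None;
    let min_orders: Option<u32> = Some(5);
    let search = Search::default()
        .filter(email.map(|e| format!("email = {e}")))          // skipped
        .filter(min_orders.map(|n| format!("order_count >= {n}")));
    println!("{:?}", search.filters);
}
```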

A named filter_some(value, |v| …) sugar that drops the .map is an obvious follow-on, left out of the first cut.


Filtering nested collections

orders is a nested array, and there are two independent things you might filter — flusso keeps them separate:

  • Filter by nested — choose which users come back, based on their orders. The any/all you’ve already seen: it’s a Query, so it goes in filter/query/etc. A matching user still carries its whole orders array.
  • Filter of nested — shape the orders array each user comes back with, without changing which users return. A separate clause, filter_nested.

They compose: use either alone, or both together (often with the same predicate).

filter_nested — shaping the returned array

#![allow(unused)]
fn main() {
let page = User::query()
    // filter BY: only users with a delivered order
    .filter(User::orders().any(Order::status().eq("delivered")))
    // filter OF: and within each, keep only the delivered orders, newest first, ≤5
    .filter_nested(
        User::orders()
            .matching(Order::status().eq("delivered"))
            .sort(Order::placed_at().desc())
            .size(5),
    )
    .send(&client).await?;

for hit in &page.hits {
    // `source.orders` IS the filtered subset — no extra accessor:
    for order in &hit.source.orders {       // delivered, newest first, ≤ 5
        println!("{} — {}", order.total, order.status);
    }
}
}

User::orders().matching(q) is a nested projection: q is a Query<Order> built from Order’s handles, plus optional .sort(Order::field().desc()), .size(n), .from(n). matching itself is optional — drop it to keep every child but still sort or cap the array.

Because filter_nested does not touch which parents match, a user with no delivered orders still comes back — with orders: []. Pair it with a filter(User::orders().any(…)) when you also want to drop those users.

filter_nested always replaces hit.source.<path> with the matched subset: the client fetches the nested matches and substitutes them for that field before deserializing.

Not yet built: a .keep_source() opt-out that leaves the stored array intact and a typed hit.nested(handle) side accessor. Today filter_nested always replaces the array in source.

Depth

filter_nested shapes one nested level — orders. You can still match on deeper nesting from inside the predicate (Order::items().any(Item::quantity().gt(1))), and the returned orders honor it; returning a filtered items array inside each returned order is a deeper inner-hits case left to the raw hatch for now.


Results

#![allow(unused)]
fn main() {
pub struct SearchResponse<T> {
    pub total: u64,            // total matches (not the page size)
    pub max_score: Option<f32>,
    pub hits: Vec<Hit<T>>,
    pub took: std::time::Duration,
}

pub struct Hit<T> {
    pub id: String,            // the document id (root primary key, stringified)
    pub score: f32,
    pub source: T,             // the fully-typed document
}
}

get returns Option<T>; a search (query() finished with send) returns SearchResponse<T>, where T is the struct you wrote. There is no serde_json::Value in the common path.


Binding to the schema

The macro validates against the resolved mapping — flusso’s IndexMapping: every field with a concrete type, whether it is nullable, its nested children, and the schema hash.

flusso’s schemas are self-describing: every leaf declares its type and whether it’s required, and joins/objects/aggregates have structural types — so the mapping resolves with no database, exactly as flusso build does when it writes flusso.lock. The client reuses that resolution. (See the schema guide for the schema format and the configuration guide for how the index is written.)

The one input: the index name

You never point the macro at a file. You name the index, and the macro finds the schema:

#![allow(unused)]
fn main() {
#[derive(serde::Deserialize, FlussoDocument)]
#[flusso(index = "users")]
pub struct User { /* … */ }
}

At compile time the macro:

  1. Locates flusso.toml by walking up from the consuming crate’s CARGO_MANIFEST_DIR. (Override with #[flusso(index = "users", config = "…")] or the FLUSSO_CONFIG env var — see the configuration guide.)
  2. Selects the [[index]] whose name matches "users" — the reason an index name is required, since one flusso.toml defines several.
  3. Loads that index’s schema: file (resolved relative to flusso.toml) and resolves the IndexMapping in-process — the same resolution flusso build performs. Hermetic: no database, no network.
  4. Tracks flusso.toml and every schema file it read as build inputs (via include_bytes!), so editing the config or a schema retriggers compilation and a drifted struct fails the next build.
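Step 1 above, the upward walk, can be sketched with the standard library alone. find_config is a hypothetical helper that shows the technique; it does not model the config = "…" attribute or the FLUSSO_CONFIG override, and it is not the macro’s actual code.

```rust
use std::path::{Path, PathBuf};

/// Walk from `start` up through its ancestors and return the first
/// `flusso.toml` found. (Hypothetical helper for illustration.)
fn find_config(start: &Path) -> Option<PathBuf> {
    start
        .ancestors()
        .map(|dir| dir.join("flusso.toml"))
        .find(|candidate| candidate.is_file())
}

fn main() {
    // Cargo sets CARGO_MANIFEST_DIR for the crate being compiled; fall back
    // to the current directory when running this sketch outside Cargo.
    let start = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| ".".to_string());
    match find_config(Path::new(&start)) {
        Some(path) => println!("found {}", path.display()),
        None => println!("no flusso.toml at or above {start}"),
    }
}
```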

The resolved mapping’s content hash is the binding’s SCHEMA_HASH — the same hash flusso build writes into flusso.lock and the engine folds into the physical index name, so binding and index are provably the same schema version.

There is no build.rs, no generated .rs file to include!, and no committed mapping artifact to keep in sync — the struct is the only file you maintain.

A nested or object struct names its path

A same-row object, a to-one join, or a nested join is its own struct, validated against a dotted path into the same index — account, orders, orders.items. It declares the same index so the macro resolves the same config, then walks to that path’s children:

#![allow(unused)]
fn main() {
#[flusso(index = "users", path = "orders.items")]
pub struct Item { /* … */ }
}

These contribute field validation and their own field handles — Order::status(), Order::total(), … — producing Query<Order> values you can compose, store, and lift into a parent query (see Building a child filter). Only the root struct gets entry points (get/query) and SCHEMA_HASH.

What the derive expands to

#[derive(FlussoDocument)] on the root User emits (roughly):

#![allow(unused)]
fn main() {
impl User {
    // Entry points.
    pub fn get(client: &Client, id: i32) -> impl Future<Output = Result<Option<User>>>;
    pub fn query() -> Search<User>;   // client-free: a plain, reusable value

    // Field handles — one per *schema* field, carrying its type. These are what
    // the query builder consumes. They exist for every field in the mapping,
    // whether or not `User` projects it.
    pub fn id() -> Number<kind::Integer> { /* … */ }
    pub fn email() -> Keyword { /* … */ }
    pub fn full_name() -> Text { /* … */ }
    pub fn account() -> Object { /* … */ }                  // object/to-one join → `Object<Root>` (scope-only; `.exists()`)
    pub fn addresses() -> Nested<Root, AddressFields> { /* … */ } // not projected — generated namespace
    pub fn orders() -> Nested<Root, Order> { /* … */ }      // projected — `Nested<enclosing scope, your struct>`
    pub fn order_count() -> Number<kind::Long> { /* … */ }
    pub fn lifetime_value() -> Number<kind::Double> { /* … */ }
    pub fn avg_order_value() -> Number<kind::Double> { /* … */ } // not projected by `User`
    pub fn last_order_at() -> Date { /* … */ }                // not projected by `User`
    // …one per schema field.

    /// The physical index this binds to — `get`/`query` use it.
    pub const INDEX: &str = "users_3f2a1b9c…";
    /// The schema hash this binding was generated from (the `INDEX` suffix).
    pub const SCHEMA_HASH: &str = "3f2a1b9c…";
}

// Each nested path has ONE handle namespace whose functions build a `Query` in
// that path's scope. When you wrote a struct for the path, that struct IS the
// namespace — its derive adds the handles, covering the full sub-schema (not just
// the fields it deserializes), producing `Query<Order>`. A `nested` array
// introduces its own scope: `Order`'s handles are tagged `<Order>` (the root and
// flattened objects stay `<Root>`); they must be lifted before joining a root query.
impl Order {
    pub fn status() -> Keyword<Order> { /* … */ }
    pub fn total() -> Number<kind::Decimal, Order> { /* … */ }
    pub fn placed_at() -> Date<Order> { /* … */ }
    pub fn items() -> Nested<Order, Item> { /* … */ }   // deeper nested: enclosing scope `Order`, child `Item`
    // …one per field at the `orders` path.
}

// For a nested path you DIDN'T give a struct, the root derive generates a
// handles-only namespace named `<Path>Fields`, so it's still queryable:
pub struct AddressFields;
impl AddressFields {
    pub fn city() -> Keyword<AddressFields> { /* … */ }       // nested scope, like `Order`
    pub fn postal_code() -> Keyword<AddressFields> { /* … */ }
    // …one per field at the `addresses` path.
}
}

What the derive checks

For each field the struct declares, the macro resolves the matching schema field by its document key — honoring #[serde(rename = "…")] and a container #[serde(rename_all = "…")], so the struct’s serde config and flusso’s validation agree — then checks three things:

| Check | Pass | Compile error |
| --- | --- | --- |
| field exists | the doc key is in the schema | no field `totl` in index `users` (span on the field) |
| type matches | leaf Rust type matches the field’s type | email is `keyword` → expected `String`, found `i32` |
| nullability matches | Option<_> iff the field is nullable | email is required → expected `String`, found `Option<String>` |

The rules that make this full control rather than a straitjacket:

  • Partial projections are allowed. Leaving schema fields off your struct is fine — you only deserialize the subset you declare. Only the three checks above fail.
  • Type matching is by leaf identifier + Option shape. The macro can’t resolve arbitrary type aliases, so it compares the final path segment (String, i32, f64, OffsetDateTime, …) and the Option<_> wrapper against the type table. For an object field it expects a struct, for a nested field a Vec<_>, and defers the inner field checks to that struct’s own FlussoDocument derive.
  • Escape hatches. A field typed serde_json::Value opts out of type checking. #[flusso(skip)] drops a field from validation entirely — for a computed or app-only field not backed by the index (pair it with #[serde(skip)] or #[serde(default)]).
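The "leaf identifier + Option shape" rule can be illustrated on type names written as text. This is a deliberately simplified sketch: the shape function is hypothetical, handles only plain leaf types, and works on strings, whereas a real derive would inspect the parsed type rather than its spelling.

```rust
/// Split a written type into (is_option, leaf identifier), by text only.
/// (Hypothetical helper; handles plain leaf types, not arbitrary generics.)
fn shape(ty: &str) -> (bool, &str) {
    let ty = ty.trim();
    let (is_option, inner) = match ty
        .strip_prefix("Option<")
        .and_then(|rest| rest.strip_suffix('>'))
    {
        Some(inner) => (true, inner.trim()),
        None => (false, ty),
    };
    // Keep only the final path segment: `time::OffsetDateTime` -> `OffsetDateTime`.
    let leaf = inner.rsplit("::").next().unwrap_or(inner);
    (is_option, leaf)
}

fn main() {
    println!("{:?}", shape("String"));
    println!("{:?}", shape("Option<time::OffsetDateTime>"));
    // An alias such as `type Email = String;` is seen only by its own name,
    // which is why aliases cannot be matched against the type table.
    println!("{:?}", shape("Email"));
}
```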

flusso types → Rust types

The type the derive expects for each schema type (the same bridge the schema guide defines, with the Rust side added). Declare something else and it won’t compile (modulo the leaf-identifier rule above).

| flusso type | OpenSearch | Rust type | Field handle |
| --- | --- | --- | --- |
| text | text | String | Text |
| identifier | text | String | Text |
| keyword | keyword | String (or a FlussoValue newtype) | Keyword |
| enum | keyword | String or a #[derive(FlussoValue)] enum | Keyword |
| uuid | keyword | String, or uuid::Uuid (uuid feature) | Keyword |
| boolean | boolean | bool | Bool |
| short | short | i16 | Number<kind::Short> |
| integer | integer | i32 | Number<kind::Integer> |
| long | long | i64 | Number<kind::Long> |
| float | float | f32 | Number<kind::Float> |
| double | double | f64 | Number<kind::Double> |
| decimal | double | Decimal (decimal feature) or f64 | Number<kind::Decimal> |
| date | date | time::Date (feature) | Date |
| timestamp | date | time::OffsetDateTime (feature) | Date |
| binary | binary | String (base64) | Binary |
| json | object | serde_json::Value | Json |
| geo_point | geo_point | GeoPoint ({ lat, lon }) | Geo |
| custom { opensearch } | (given) | matching scalar, else serde_json::Value | by OS type |
| object | object | a struct | Object |
| join belongs_to / has_one | object | Option< a struct > | Object |
| join has_many / many_to_many | nested | Vec< a struct > | Nested<S, T> |

Decimals query without a cast. type: decimal maps to OpenSearch double (lossy in storage) but gets a Number<kind::Decimal> handle, distinct from a true double’s Number<kind::Double> — so it accepts a Decimal or a losslessly-widening integer (eq(5)), but not an f64 (that’s lossy, a compile error). To pass a rust_decimal::Decimal, enable the decimal feature on flusso-query (it re-exports Decimal and implements FlussoValue<kind::Decimal> for it); the document field can be Decimal or f64 (both deserialize from the stored double). Query precision is f64-bound (serde_json has no arbitrary precision) — fine for a double-stored column; if you need exact querying, declare a custom scaled_float (also kind::Decimal) and note this limit.
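The precision caveat is the usual f64 one, and a scaled_float sidesteps it by storing the value multiplied by a scaling factor as an integer. A small sketch of both halves; to_scaled is a hypothetical helper and the factor of 100 (two decimal places) is only an example.

```rust
/// Scale a decimal value to the integer a `scaled_float` mapping would store.
/// (Hypothetical helper; 100.0 means two decimal places.)
fn to_scaled(value: f64, scaling_factor: f64) -> i64 {
    (value * scaling_factor).round() as i64
}

fn main() {
    // f64 cannot represent most decimal fractions exactly.
    println!("{}", 0.1_f64 + 0.2_f64 == 0.3_f64); // false

    // The scaled integers compare exactly.
    let sum = to_scaled(0.1, 100.0) + to_scaled(0.2, 100.0);
    println!("{}", sum == to_scaled(0.3, 100.0)); // true
}
```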

Dates are behind a feature so a caller picks time or chrono (or String for raw ISO-8601); the derive accepts whichever leaf type the chosen feature settles on.

Custom value types — #[derive(FlussoValue)]. A newtype inherits its inner type’s kinds automatically, so struct Money(Decimal) is a decimal value and struct Sku(String) a keyword + text value — no kind tag needed, and each is queryable and rejected exactly where the inner type would be. FlussoValue requires serde::Serialize (a supertrait), so derive it too. An enum has no inner type, so it needs an explicit string kind — #[flusso(keyword)] (default) or #[flusso(text)]; numeric/date tags don’t exist (use a newtype, which inherits). Enum keyword fields stay typed — never #[flusso(skip)] them.

#![allow(unused)]
fn main() {
// Newtype: inherits Decimal's kinds — a `decimal`/`scaled_float` value.
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, FlussoValue)]
struct Money(Decimal);

// Enum: string-valued, so a kind tag is required (keyword default).
#[derive(serde::Serialize, serde::Deserialize, FlussoValue)]
#[serde(rename_all = "camelCase")]
#[flusso(keyword)]
enum Tier { Pro, Enterprise, Free }

#[derive(serde::Deserialize, FlussoDocument)]
#[flusso(index = "customers")]
struct Customer { tier: Tier, balance: Money /* … */ }

Customer::tier().eq(Tier::Pro);              // term against "pro"
Customer::balance().gte(Money(Decimal::ONE)); // a decimal value, no cast
}

uuid::Uuid is a keyword value behind the uuid feature — id / foreign-key fields stay in the struct as Uuid (no #[flusso(skip)], no Keyword::at("…")), and Customer::owner_id().eq(some_uuid) works without .to_string().

Nullability is declared, not guessed

A field is T or Option<T>, and the derive checks it against the resolved mapping. Nullability comes straight from the schema with no database round-trip: a leaf states it with required, and joins, objects, and aggregates carry it structurally. ResolvedField records the resulting nullable: bool; the derive requires nullable: false → T, nullable: true → Option<T>.

| Field source | nullable | Why |
| --- | --- | --- |
| root primary_key column | false | forced non-null: it backs the document id |
| join primary_key field | false | forced non-null, just like the root key |
| leaf column, required: true | false | declared non-null |
| leaf column, required: false | true | nullable by default |
| object | false | always assembled from the same row |
| join belongs_to / has_one (object) | true | there may be no related row |
| join has_many / many_to_many | false | a Vec, empty when there are none, never null |
| aggregate count | false | a non-null long: zero rows is 0, not null |
| aggregate avg | true | a nullable double: null over zero rows |
| aggregate sum / min / max | true | null over zero rows; the result mirrors the column |

required is rejected by the schema on joins and aggregates precisely because their nullability is structural — so there’s nothing for the author to declare.
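The table reads as a pure function from a field's source to its nullability. The sketch below restates it under that assumption. FieldSource and nullable are hypothetical names chosen for illustration, not flusso types; the real derive reads the nullable flag recorded on ResolvedField.

```rust
/// Hypothetical model of where a schema field comes from.
/// These names are illustrative only; they are not flusso's API.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FieldSource {
    RootPrimaryKey,
    JoinPrimaryKey,
    Leaf { required: bool },
    Object,
    JoinToOne,  // belongs_to / has_one
    JoinToMany, // has_many / many_to_many
    AggregateCount,
    AggregateAvg,
    AggregateSumMinMax,
}

/// Mirrors the table row by row.
/// `true` means the struct field must be `Option<T>`, `false` means plain `T`.
fn nullable(source: FieldSource) -> bool {
    match source {
        // Keys back document ids, so they are forced non-null.
        FieldSource::RootPrimaryKey | FieldSource::JoinPrimaryKey => false,
        // A leaf is the only case where the author declares nullability.
        FieldSource::Leaf { required } => !required,
        // Assembled from the same row, so always present.
        FieldSource::Object => false,
        // There may be no related row.
        FieldSource::JoinToOne => true,
        // A Vec: empty when there are none, never null.
        FieldSource::JoinToMany => false,
        // Zero rows count as 0.
        FieldSource::AggregateCount => false,
        // Null over zero rows.
        FieldSource::AggregateAvg | FieldSource::AggregateSumMinMax => true,
    }
}
```

Only the Leaf arm takes an input from the schema author, which is why required is meaningful there and rejected everywhere else.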


The escape hatch

Anything the typed builder can’t express stays reachable, and still deserializes into the typed struct:

let page: SearchResponse<User> = User::query()
    .raw(serde_json::json!({
        "knn": { "embedding": { "vector": [/* … */], "k": 10 } }
    }))
    .send(&client)
    .await?;

raw takes the OpenSearch query DSL verbatim — the pressure-release valve for the few types with no flusso field (knn/vector, geo_shape, span and parent/child queries) and for percolators — without dropping to an untyped client or losing typed results. Most of what used to need it (function_score, script, constant_score, query_string, search_after, …) is now in the typed surface.


Resolving the index name

The physical index carries the hash suffix (users_3f2a1b9c) — exactly what the OpenSearch sink writes — and rotates on a structural schema change.

Because the binding is generated from the schema at compile time, the derive knows that hash and emits it as a const: User::INDEX is the physical name, and get/query use it. So User::query() addresses the right index directly, with the hash hidden from the caller — no convenience alias required.

This is self-correcting: a structural schema change rotates the hash and changes the resolved mapping, so the next cargo build regenerates the binding against the new physical index. (User::INDEX and User::SCHEMA_HASH are exposed for logging, admin, or a hand-built Search.)

The convenience alias (users → current generation) is still worthwhile for clients that don’t recompile against the schema — dynamic/scripting use, dashboards. For a derived binding it’s unnecessary: the compile-time hash is the stable name.

Reading a prefixed deployment

If flusso runs with an index prefix (FLUSSO_INDEX_PREFIX, e.g. dev_ so it writes dev_users_<hash>), tell the client the same prefix:

let client = Client::connect("https://localhost:9200")?
    .index_prefix(std::env::var("FLUSSO_INDEX_PREFIX").unwrap_or_default());

The prefix is applied at runtime, on the transport — the derive still bakes the unprefixed User::INDEX/SCHEMA_HASH, and the client prepends the prefix to every request path. So one compiled consumer binary serves every environment: point it at dev or staging by setting FLUSSO_INDEX_PREFIX, no rebuild. It must match the writer’s prefix exactly, or queries hit an empty (or wrong) index.
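To make the split between the compile-time name and the runtime prefix concrete, here is a small sketch. The helpers baked_index, request_index, and search_path are hypothetical, not flusso-query API. The only facts taken from this section are the name_hash shape of the physical index and that the prefix is prepended per request; /<index>/_search is the standard OpenSearch search endpoint.

```rust
/// Hypothetical: what the derive bakes at compile time, e.g. `users_3f2a1b9c`.
fn baked_index(name: &str, schema_hash: &str) -> String {
    format!("{name}_{schema_hash}")
}

/// Hypothetical: what the client addresses at runtime.
/// An empty prefix leaves the baked name untouched.
fn request_index(prefix: &str, baked: &str) -> String {
    format!("{prefix}{baked}")
}

/// Hypothetical: the request path for a search against that index.
fn search_path(prefix: &str, baked: &str) -> String {
    format!("/{}/_search", request_index(prefix, baked))
}
```

With the prefix dev_ and the baked name users_3f2a1b9c, the request goes to /dev_users_3f2a1b9c/_search. If the writer ran with a different prefix, that index simply would not hold the documents, which is the failure mode described above.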


Out of scope for the first cut

  • Search aggregations (facets, histograms, cardinality). The typed surface is filter/query/sort + typed hits first; aggregations need their own typed result tree, and the raw hatch covers them in the meantime.
  • Writes. flusso owns the index; the client never upserts or deletes.
  • Correlating hits across indexes. Both multi-index shapes ship (one blended result list, and independent searches via _msearch), but matching a hit in one index to a hit in another remains the caller’s job.
  • Scroll pagination. from/size and search_after (deep pagination) ship; a scroll cursor is a follow-on.
  • Generating the document struct. By design — the developer owns the struct.

Where this lands in the workspace

| Crate | Role |
|---|---|
| flusso-query | Runtime: the Client transport, the field-handle/Query/Search builder, SearchResponse. Generic over the developer’s document types. Targets OpenSearch / Elasticsearch (shared DSL). Re-exports the derive behind a derive feature, so callers use flusso_query::FlussoDocument. |
| flusso-query-derive | The #[derive(FlussoDocument)] proc-macro crate. At compile time it discovers flusso.toml, resolves the named index’s IndexMapping from the self-describing schema (no database), validates the annotated struct, and emits the field handles, entry points, and schema hash. Reuses schema-config-toml, schema-index-yaml, and schema-core. |

Both crates sit above schema-core and depend only downward — no dependency on the engine, the sources, or the sinks. They share only the domain model, the one thing the read and write sides must agree on.

Deploying flusso with Docker

Ship flusso as the smallest possible image — without compiling the binary yourself, and without dragging your whole repo into the build context. For the image’s internals (targets, base, non-root user) see the Dockerfile; for Kubernetes see the Helm chart.

Pick a recipe

The key idea: you never compile the binary, only a flusso.lock — see The one idea. Then pick by where the lock gets built:

| Situation | Recipe |
|---|---|
| Already have a flusso.lock, or a flat config | A: bake your own lock (smallest, simplest) |
| Schemas scattered across a monorepo; want a hermetic in-Docker build | B: build the lock inside Docker |
| Want CI to build the lock and ship one file | C: build the lock in CI |
| Keep a flusso-only ignore file off everyone else’s builds | Scoping the .dockerignore |
| Wondering why COPY *.schema.yml won’t do | Why COPY alone can’t do it |

The one idea

Two different “compilations” — conflating them is what makes Docker feel heavy:

  1. The flusso binary — a full Rust build. Our job, published once per release as a registry image. You never compile it. Pull alias2k/flusso:VERSION and build from it.
  2. A flusso.lock: flusso build inlines your flusso.toml and every referenced *.schema.yml into one portable, self-contained file. No DB, no toolchain, no secrets baked in.

ℹ️ Info — The image is published to two registries with identical tags. Use Docker Hub as the primary: alias2k/flusso (docker.io/alias2k/flusso). The ghcr.io/alias2k/flusso mirror is an equivalent drop-in if you prefer GitHub Container Registry.

Replace VERSION with whichever tag suits you:

  • X.Y.Z (e.g. 0.10.0) — an exact, immutable release. Most reproducible; recommended for production.
  • X.Y (e.g. 0.10) — a rolling tag that follows the latest patch on that minor (0.10.1, 0.10.2, …) but never a breaking minor bump.
  • latest — newest stable release. Convenient, not pinned.
  • sha-<short> — the immutable per-commit tag, for tracing or rollback.

So however your schemas are laid out — even scattered across a monorepo next to the services they describe — that layout only has to exist where you run flusso build, never inside the image. Get a lock, ship the lock, run the lock.

ℹ️ Info — Schema paths in flusso.toml resolve relative to the config file’s directory, with no globbing; each [[index]] names its schema = "…" explicitly. That’s the only rule the recipes below respect — when you compile the lock, the referenced files must exist at those paths.
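The resolution rule can be sketched with the standard library alone. This is an illustration of the stated rule, not flusso's implementation; resolve_schema and the example paths are hypothetical.

```rust
use std::path::{Path, PathBuf};

/// Resolve a `schema = "…"` entry against the directory that holds flusso.toml.
/// Hypothetical helper: a sketch of the documented rule, with no globbing.
fn resolve_schema(config_path: &Path, schema: &str) -> PathBuf {
    // `parent()` of a bare file name is the empty path, which joins cleanly.
    let base = config_path.parent().unwrap_or_else(|| Path::new(""));
    // Note: `Path::join` returns `schema` unchanged if it is absolute.
    base.join(schema)
}
```

For a config at /repo/flusso.toml and an entry services/users/users.schema.yml, the file must exist at /repo/services/users/users.schema.yml when the lock is compiled. The working directory of the flusso build invocation plays no part in this sketch, only the location of the config file.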

Recipe A: bake your own lock (smallest, simplest)

If you already have a flusso.lock (see Recipe C) — or you only have a flat config — this is the whole thing. Build from the published image and copy one file in:

# syntax=docker/dockerfile:1
FROM alias2k/flusso:VERSION
COPY flusso.lock /app/flusso.lock
# ENTRYPOINT/CMD are inherited: `flusso run --public-address 0.0.0.0:9464`
# loads /app/flusso.lock by default.

docker build -t myorg/search:1.0 .

The image is the published base + one file. No Rust, no schema layout, a build context of a few KB. Secrets (DATABASE_URL, <SINK>_OPENSEARCH_URL) come from the environment at run time, so the lock is safe to commit and to bake in.

Don’t want to rebuild an image at all? Mount the lock instead:

docker run --rm -e DATABASE_URL=… -e PRIMARY_OPENSEARCH_URL=… \
  -v "$PWD/flusso.lock:/app/flusso.lock" -p 9464:9464 \
  alias2k/flusso:VERSION

Recipe B: build the lock inside Docker

Want the compile to happen inside Docker (hermetic, reproducible), and your schemas are scattered across the repo? Compile the lock in a builder stage, then copy only the lock into the final image. The trick that keeps the build context tiny without enumerating folders is an allowlist ignore file (see Scoping the .dockerignore):

flusso.Dockerfile:

# syntax=docker/dockerfile:1
FROM alias2k/flusso:VERSION AS lock
WORKDIR /src
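# The context is already pruned to flusso.toml + *.schema.yml, with their real
# paths preserved, so the relative schema paths in flusso.toml resolve.
# (Dockerfile comments must sit on their own line; a trailing # is not a comment.)
COPY . .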
RUN flusso build --config flusso.toml --out /app/flusso.lock

FROM alias2k/flusso:VERSION
COPY --from=lock /app/flusso.lock /app/flusso.lock

flusso.Dockerfile.dockerignore:

*
!flusso.toml
!**/*.schema.yml

docker build -f flusso.Dockerfile -t myorg/search:1.0 .

COPY . . is what preserves the scattered directory structure (so the relative schema = "…" paths resolve); the ignore file is what keeps the multi-gigabyte monorepo out of the build context. They’re complementary — neither does the job alone. Add a new schema anywhere in the tree and it just works, no Dockerfile edit.
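As a rough mental model of what the three-line allowlist lets through, consider the predicate below. It is a deliberately simplified sketch for this one ignore file, not a general .dockerignore matcher; in_build_context and the sample paths are hypothetical.

```rust
/// Simplified model of the allowlist:
///   *                  exclude everything
///   !flusso.toml       re-include the flusso.toml at the context root
///   !**/*.schema.yml   re-include schema files at any depth
/// `path` is relative to the build context root and uses `/` separators.
fn in_build_context(path: &str) -> bool {
    // The last path segment is the file name.
    let file_name = path.rsplit('/').next().unwrap_or(path);
    // Only the root-level flusso.toml matches `!flusso.toml`.
    path == "flusso.toml" || file_name.ends_with(".schema.yml")
}
```

Under this model, services/users/users.schema.yml is sent to the builder with its directory intact, while services/users/src/main.rs and a nested services/flusso.toml are not.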

Recipe C: build the lock in CI, ship one file

The lowest-friction option for a monorepo: compile the lock on the host or in CI, where the repo is checked out and every relative path already resolves, then feed the single artifact to Recipe A.

flusso build --config flusso.toml --out flusso.lock   # inlines all scattered schemas

Commit flusso.lock, or publish it as a CI artifact, and the image build never sees a schema file — there’s no pattern to match, no context to prune, no tree to preserve. Just one file. (flusso build needs the flusso binary; in CI, run it from the published image: docker run --rm -v "$PWD:/src" -w /src alias2k/flusso:VERSION build --config flusso.toml --out flusso.lock.)

Scoping the .dockerignore

A root .dockerignore applies to every build in the repo, which you usually don’t want when flusso is one service among many. BuildKit (you’re on it — every recipe here starts with # syntax=docker/dockerfile:1) lets you scope an ignore file to one Dockerfile: place <dockerfile-name>.dockerignore next to it, and it takes precedence over the root .dockerignore for that build only.

flusso.Dockerfile
flusso.Dockerfile.dockerignore     # used only when building flusso.Dockerfile
.dockerignore                      # everyone else's default, untouched

So the allowlist in Recipe B lives in flusso.Dockerfile.dockerignore and affects nothing else in the repo.

Note

Per-Dockerfile ignore files are a BuildKit feature — honored by docker build, docker buildx, and docker compose. A legacy (non-BuildKit) builder silently falls back to the root .dockerignore.

Why COPY alone can’t do it

It’s tempting to skip the ignore file and just COPY the schemas by pattern. That doesn’t work, for two reasons:

  • No recursive glob. Docker’s COPY uses filepath.Match, where * does not cross /. There’s no **, so you can’t express “every *.schema.yml at any depth.”
  • It flattens. When a wildcard matches several files they all land directly in the destination — the source directory structure is not preserved. That breaks the relative schema = "…" paths immediately.

The filtering has to happen at the context layer (.dockerignore), not the COPY layer. COPY . . then preserves the tree; the scoped ignore file keeps the context small. Hence Recipe B.
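Both failure modes can be reproduced in a few lines. The sketch below is not Docker's code: star_match is a minimal matcher that supports only literals and *, following the filepath.Match rule that * never matches /, and flattened_dest models a wildcard copy dropping the source directories. All names and paths are hypothetical.

```rust
/// Match `name` against `pattern`, where `*` matches any run of non-`/` bytes.
fn match_bytes(p: &[u8], n: &[u8]) -> bool {
    match p.split_first() {
        // Pattern exhausted: succeed only if the name is exhausted too.
        None => n.is_empty(),
        Some((&b'*', rest)) => {
            // Let `*` consume 0, 1, 2, … bytes, stopping before any '/'.
            for i in 0..=n.len() {
                if match_bytes(rest, &n[i..]) {
                    return true;
                }
                if i < n.len() && n[i] == b'/' {
                    break;
                }
            }
            false
        }
        // Literal byte: must equal the next byte of the name.
        Some((&c, rest)) => n.first() == Some(&c) && match_bytes(rest, &n[1..]),
    }
}

fn star_match(pattern: &str, name: &str) -> bool {
    match_bytes(pattern.as_bytes(), name.as_bytes())
}

/// Where a wildcard-matched source lands: directly in `dest_dir`,
/// keeping only its file name.
fn flattened_dest(dest_dir: &str, src: &str) -> String {
    let base = src.rsplit('/').next().unwrap_or(src);
    format!("{}/{}", dest_dir.trim_end_matches('/'), base)
}
```

Here *.schema.yml matches users.schema.yml but not services/users/users.schema.yml, and a pattern such as services/*/*.schema.yml reaches exactly one directory level. Even when a pattern does match, the file would land at /src/users.schema.yml rather than under /src/services/users/, so the relative schema path no longer resolves.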