flusso
flusso keeps an OpenSearch index in sync with Postgres, driven by declarative config — no cron job, no nightly reindex, no hand-rolled reindex script.
The whole model: two files
A deployment is two kinds of file. That’s the mental model.
| File | One per | Holds | Reference |
|---|---|---|---|
| flusso.toml | deployment | where data comes from, where it goes, which indexes to build | Configuring a deployment |
| *.schema.yml | index | what one search document looks like: its table, fields, and the related tables that fold in | Authoring schemas |
Every field declares its type from a fixed set that bridges a Postgres column and an OpenSearch mapping. A schema is therefore self-describing: flusso derives the full index mapping — and validates the config — without touching a database.
Change a user or one of their orders, and flusso rebuilds the whole users document and re-emits it. It works out which documents a changed row affects, reassembles each, and writes it by a deterministic id — no instructions about what to update.
This manual
- Getting started: run flusso in three commands, and what Postgres and OpenSearch need first.
- Authoring schemas: the *.schema.yml format: field types, objects, maps, joins, aggregates, geo, filters, soft-delete, validation.
- Configuring a deployment: the flusso.toml format, every source and sink option, secrets and { env = "VAR" } references, the FLUSSO_* flag overrides, the index prefix, and logging/telemetry.
- Querying: the typed query-side client, flusso-query, and its #[derive(FlussoDocument)].
- Deploying: Docker recipes: smallest image, baking or compiling a flusso.lock, scoped .dockerignore.
The source, the contributor architecture tour, the Helm chart, and the runnable dev/ example all live in the repository on GitHub.
Getting started
Run flusso against the bundled dev stack in three commands, then point it at your own Postgres and OpenSearch.
Quickstart
The dev/ directory is a complete, runnable example — a docker-compose stack (Postgres wired for logical replication, OpenSearch, Dashboards, Prometheus, Grafana), seeded data, and a matching config. With just installed (cargo install just --locked):
just up # bring the whole stack up and wait for it to be healthy
just check # validate the config + schemas against the database
just run # backfill OpenSearch, then follow live changes (serves /status + /metrics)
Then, in another terminal, make changes and watch them stream through:
just psql # make some changes
curl -s localhost:9200/users/_search?pretty # see them land in OpenSearch
just status # live pipeline status
Run just on its own to see every recipe. The full walk-through — resetting state, inspecting the slot, OpenSearch Dashboards on :5601 — lives in the dev/ README.
ℹ️ Info: No just? Every recipe is a thin wrapper; the raw cargo run -- … and docker compose … commands are in the justfile.
The CLI
Three subcommands. Every flag also reads a FLUSSO_* env var (the flag wins when both are set) — handy for containers.
| Command | Does | Database? |
|---|---|---|
| flusso build | Compile config + schemas into one portable flusso.lock. No secrets baked in ({ env = "VAR" } refs carry through). | no |
| flusso run | Stream changes through the engine. Like cargo run: with a flusso.toml present it recompiles + rewrites flusso.lock, then runs; with no config it loads the existing lock; --locked runs the lock as-is. Credentials resolve here, at run time. | yes |
| flusso check | Validate and print the fully-typed mapping. --offline skips the database; without it, declared types are also confirmed against live columns. | optional |
flusso --help
flusso build --config flusso.toml -o flusso.lock # build the portable artifact
flusso check --config flusso.toml # validate (+ check vs database)
flusso check --config flusso.toml --offline # validate without a database
flusso run # run the compiled flusso.lock
flusso run --config flusso.toml # compile from source and run
flusso run --skip-backfill # resume live capture only
Logging honors RUST_LOG (default info); FLUSSO_LOG_FORMAT=json for structured logs. Set the standard OTEL_EXPORTER_OTLP_ENDPOINT and traces export there too. Every environment variable flusso reads — secrets, the FLUSSO_* flags, telemetry — is collected in Configuring a deployment.
Requirements
flusso doesn’t own Postgres or OpenSearch — it’s a guest in both. A few things have to be true before it runs. The dev/ stack sets all of this up; below is what to replicate against your own infrastructure. Full per-source/per-sink options are in Configuring a deployment.
Postgres (the source)
| Requirement | Detail |
|---|---|
| PG 14+, wal_level = logical | A restart-required setting. max_wal_senders / max_replication_slots high enough for flusso plus any other consumers. |
| A publication | Covers every table any index reads — root tables and every table a join or aggregate pulls from. flusso manages it when the role is privileged enough (see below); otherwise it logs the exact SQL. |
| A replication slot | flusso always creates it on first connect (needs only REPLICATION). |
| Row identity | A primary key (usual case) or an explicit REPLICA IDENTITY on every replicated table. Keyless tables are skipped in backfill and error on a live change. |
| A role with REPLICATION + SELECT | Enough to stream and create the slot. Managing the publication needs more; see below. |
ℹ️ Info: managing the publication. flusso derives the table set from the schema and creates/extends the publication itself, exactly as it does the slot, if the source role can. Creating or extending a publication needs table ownership plus CREATE on the database (or superuser), a stronger grant than the read-only role above. When the role can’t, flusso doesn’t fail: it logs the exact CREATE PUBLICATION / ALTER PUBLICATION … ADD TABLE to run, and flusso check prints the same. Set [source] manage_publication = false (or FLUSSO_MANAGE_PUBLICATION=false) to manage it yourself.
⚠️ Warning: Postgres retains WAL until flusso confirms it. If flusso stays down for a long time, WAL accumulates on the server and can eventually fill its disk. Drop the slot when you retire a deployment.
OpenSearch (the sink)
| Requirement | Detail |
|---|---|
| OpenSearch 2.x | Also speaks Elasticsearch 7.x on the query side via flusso-query. |
| A reachable HTTP(S) endpoint | The sink url. Optional HTTP Basic auth (username / password); tls_verify defaults to true — turn it off only for self-signed dev clusters. |
| A user that can manage and write flusso’s indexes | Plus the small hidden flusso_meta index where seeded state is recorded. |
💡 Did you know: flusso owns the index lifecycle. It derives a strict typed mapping per schema and names each index from a hash of that schema (users_<hash>), so a structural change rolls onto a fresh index and re-seeds instead of fighting a mismatched one. The plain logical name (users) is kept as an alias on the current index, queryable without knowing the hash.
Deploying it
- Container image: the Dockerfile builds a registry-ready, config-less image (mount a config or bake your own flusso.lock). Its demo target bakes the dev config in, which is what just demo runs. See Deploying for the recipes.
- Kubernetes: the Helm chart deploys flusso as a single instance (one replication slot → replicas: 1) with config via ConfigMap, secrets via env, a Service, and an optional Prometheus ServiceMonitor.
Authoring schemas
One *.schema.yml file describes one search document: its root table, its fields, and how related tables fold in. This guide is the full reference for that format. For the deployment side (flusso.toml), see Configuring a deployment.
Quick reference
Fields are written type-first: - <type>: <name>, with siblings the type allows.
fields:
  - keyword: email # scalar; document key `email`
    required: true
  - has_many: orders # join; folds a related table in
    table: orders
    foreign_key: user_id
    primary_key: id
    fields: [ { double: total } ]
  - count: orderCount # aggregate; rolls a related table up
    table: orders
    foreign_key: user_id
| Need | Type keys | Jump to |
|---|---|---|
| A scalar column | text identifier keyword enum uuid boolean short integer long float double decimal date timestamp binary json custom | Types |
| Same-row sub-object | object | Objects |
| Dynamic-key object (translations, …) | map | Maps |
| A geographic point | geo | Geo points |
| Fold in a related table | belongs_to has_one has_many many_to_many | Joins |
| Roll a related table up | count sum avg min max ids | Aggregates |
| A fixed value | constant | Fields |
| Treat rows as deleted | top-level soft_delete | soft_delete |
| Index only a subset of rows | top-level filters | Root filters |
ℹ️ Info: Two JSON Schemas are the machine-readable source of truth for the file formats: config.schema.json and index.schema.yml. Point an editor at them for completion and inline validation.
*.schema.yml
Each schema file describes one search document: the root table it is built from, the fields it contains, and how related tables fold in.
Top-level keys
| Key | Type | Required | Default | Description |
|---|---|---|---|---|
| version | int | yes | none | Schema format version. Only 1 is supported. |
| table | Postgres identifier | yes | none | The root table the document is built from. |
| schema | Postgres identifier | no | public | The database schema the root table lives in. |
| primary_key | Postgres identifier | no | none | The root table’s primary-key column. Used to derive the document id and to resolve which documents a related-row change affects. Relations and reverse-resolution require it. |
| doc_id | string | no | none | Not supported yet; setting it is a hard error. The document _id is always derived from primary_key. (A non-pk _id needs the id value at delete time, which the pk-keyed tombstone path can’t supply; tracked as a follow-up.) |
| soft_delete | object | no | none | Treat rows as deleted based on a column/field rather than a physical DELETE. See below. |
| filters | list | no | none | Root filters: only root rows matching every filter become documents. See below. |
| fields | list | yes | none | The document’s fields. See Fields. |
version: 1
table: users
schema: public
primary_key: id
soft_delete
When soft_delete is set, a row matching the soft-delete condition emits a
tombstone (a delete to the sink) instead of an upsert. Key it off either a
mapped field or a raw column, and optionally narrow it with when filters.
# Off a column: users.deleted = true → delete.
soft_delete:
  column: deleted

# Off a mapped field, narrowed to a subset of rows.
soft_delete:
  field: status
  when:
    - { column: archived, op: eq, value: true }
| Key | Required | Description |
|---|---|---|
| column or field | exactly one | The column (Postgres identifier) or mapped field (field name) signalling deletion. |
| when | no | A list of filters; the soft-delete applies only to matching rows. |
Root filters
When only a subset of a table should be an index, a top-level filters list
(same filter forms as joins use) scopes which root rows become
documents:
version: 1
table: item
primary_key: id
filters:
- { column: item_type, op: eq, value: serialized }
- { column: archived_at, op: is_null }
A row outside the set never produces a document; a row that leaves the set (an
UPDATE that stops matching) emits a tombstone on its next rebuild, exactly
like soft_delete — both fold into the document query’s WHERE,
so “no row came back” means “this document should not exist”. A row that enters
the set upserts. Backfill walks the whole root table and lets the same predicate
decide, so filtered-out rows cost a no-op delete during seeding.
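As a further illustration of the top-level filters key, here is a minimal sketch that mixes a list operator with a raw predicate. The table and column names (product, status, price, cost) are hypothetical; only the filter forms themselves come from the Filters section.

```yaml
version: 1
table: product
primary_key: id
filters:
  # Keep only rows whose status is one of these values.
  - { column: status, op: in, value: [active, preorder] }
  # Raw SQL escape hatch: a column-to-column comparison.
  - { raw: "price > cost" }
fields:
  - keyword: status
    required: true
```

Because a root row must match every filter, a product whose status moves outside the list should stop matching and tombstone on its next rebuild, as described above.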
Fields
fields is a list. Each item is written type-first: a single type key
whose value is the document key, plus the siblings that type allows.
fields:
  - keyword: email # a `keyword` scalar; document key `email`
    required: true
  - text: bio # analyzed full text
    required: false
  - integer: age
    required: false
The type key is one of:
- a scalar type: text, identifier, keyword, enum, uuid, boolean, short, integer, long, float, double, decimal, date, timestamp, binary, json (see Types), or custom (an explicit Postgres/OpenSearch pair);
- geo: a geographic point (see Geo points);
- object: a same-row sub-object (see Objects);
- map: a dynamic-key object (see Maps);
- belongs_to / has_one / has_many / many_to_many: a related table folded in (see Joins);
- count / sum / avg / min / max / ids: a rollup over a related table (see Aggregates);
- constant: a fixed value.
There is exactly one type key per field. Which siblings a field accepts depends on that type:
| Sibling | Applies to | Description |
|---|---|---|
| required | scalar, geo, map, to-one join | true forces a scalar/geo/map leaf non-null (nullable otherwise); on a to-one join (belongs_to/has_one) it maps the object non-null (see Joins). Rejected on to-many joins and aggregates (their nullability is structural). |
| column | scalar, geo, belongs_to | The source column; for a belongs_to, this table’s column pointing at the related row. Defaults to the document key when omitted. |
| options | types with a mapping | Extra OpenSearch mapping properties merged beside the derived type (e.g. analyzer, format, scaling_factor). |
| transforms | scalar | Value transforms to apply. See Transforms. |
| default | scalar | Value to coalesce a null column to. Must be a scalar (string, number, bool, or date); an array/object/binary default is a hard error. |
| postgres / opensearch | custom | The Postgres types accepted and the OpenSearch type emitted. |
| lat / lon | geo | The two coordinate columns (two-column form). |
| values | map | Mandatory: the leaf type shared by every value (a leaf kind: text/keyword/number/date). See Maps. |
| fields | object, joins | The nested projection. |
| table, primary_key, column/foreign_key/through, order_by, filters, limit | joins | Which key sibling applies depends on the verb. See Joins. |
| table, column, value_type, element_type, foreign_key, through, filters | aggregates | See Aggregates. |
| value | constant | The fixed value (null/absent renders as JSON null). |
fields:
  # column source, renamed + transformed + defaulted
  - keyword: email
    column: email_address
    required: false
    transforms: [lowercase, trim]
    default: "unknown@example.com"
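The constant type in the sibling table takes only a value. A minimal sketch, assuming a hypothetical document key record_type that every document in the index should carry:

```yaml
fields:
  # Every document gets "record_type": "user", a fixed value
  # rather than something read from a column.
  - constant: record_type
    value: user
```

Per the table above, leaving value out (or setting it to null) should render the key as JSON null.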
Objects
An object nests sibling columns of the same row under one document key,
without reading a related table. It renders as an OpenSearch object and is never
null; its members declare their own types.
- object: address
  fields:
    - keyword: street
      column: address_street
      required: true
    - keyword: city
      column: address_city
      required: true
    - keyword: zip
      column: address_zip
      required: false
→ { "address": { "street": …, "city": …, "zip": … } }, all from one row.
An object differs from a to-one join (belongs_to/has_one): the
join reads a related table by key, an object stays put on the current row.
Optional options pass extra properties to the object mapping.
Maps
A map is a dynamic-key object over a json/jsonb column: the keys are
runtime-determined, but every value shares one leaf type. The motivating case is
translations — {"en": "…", "it": "…"} with an open-ended language set, where
declaring each language as its own field would defeat the purpose.
- map: title
  values: text # the leaf type of every value (required)
  required: true
values must be a leaf kind — text/identifier (text), keyword/enum/
uuid (keyword), a numeric, or date/timestamp. boolean, binary, json,
geo, and custom are rejected. The field maps to an OpenSearch object with
dynamic: true (injected automatically — an explicit dynamic in options
wins), so unmapped keys are accepted and stay searchable rather than rejected by
the index’s dynamic: strict. column defaults to the document key.
On the query side a map gets a typed handle (flusso-query): .key("it")
returns a fully-typed leaf of the declared kind, and a text map also offers a
cross-key .search(..) with per-key preference. See Querying.
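Maps are not limited to translated text. A sketch of a keyword-valued map read from a differently named column; the column name attributes_json and the sample keys are hypothetical:

```yaml
fields:
  # jsonb column holding e.g. {"color": "red", "size": "xl"}; the key set is open-ended.
  - map: attributes
    column: attributes_json # hypothetical; defaults to `attributes` when omitted
    values: keyword         # every value is indexed as an exact keyword
    required: false
```

With values: keyword, each key would be filterable exactly rather than full-text searched.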
Types
A scalar field declares its type from a fixed set. Each type bridges a
Postgres column type and an OpenSearch mapping type, so the schema describes the
document fully — flusso derives the index mapping (and validates a config) without
a database. Shorthand fields and columns with no type default to keyword.
| type | Postgres | OpenSearch | Notes |
|---|---|---|---|
| text | text, varchar | text | Analyzed natural-language full text (descriptions, bios); the default analyzed type. Plain tokenize + accent/case fold. |
| identifier | text, varchar | text | Analyzed identifier-like short text (names, SKUs, codes, statuses); splits on punctuation/case so C-01234 is found by C01234, c-01234, or 01234. |
| keyword | text, varchar | keyword | Exact, aggregatable. |
| enum | text, varchar, PG enum | keyword | A closed string set stored as text, indexed exactly. |
| uuid | uuid | keyword | |
| boolean | boolean | boolean | |
| short | smallint / int2 | short | |
| integer | integer / int4 | integer | |
| long | bigint / int8 | long | |
| float | real / float4 | float | |
| double | double precision / float8 | double | |
| decimal | numeric / money | double | Lossy; use a custom scaled_float when exactness matters. |
| date | date | date | |
| timestamp | timestamp(tz), time | date | |
| binary | bytea | binary | |
| json | json, jsonb | object | |
(A geographic point is a geo field, not a scalar type — see
Geo points.)
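To make the table concrete, a short sketch of several scalar types side by side. The column names are hypothetical; the Postgres-to-OpenSearch pairings in the comments are the ones listed in the table.

```yaml
fields:
  - uuid: id              # uuid → keyword
    required: true
  - enum: status          # text/varchar or a PG enum → keyword
    required: true
  - boolean: verified     # boolean → boolean
    required: false
  - timestamp: created_at # timestamp(tz) → date
    required: true
  - decimal: balance      # numeric → double (lossy, per the Notes column)
    required: false
```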
For anything the named types don’t cover, declare a custom field with the
OpenSearch type and the Postgres types it accepts:
- custom: price
  postgres: [numeric]
  opensearch: scaled_float
  required: false
  options: { scaling_factor: 100 }
options carries any extra OpenSearch mapping properties (analyzers, formats, …)
merged beside the derived type. Objects, joins, aggregates, and geo points carry
their own type keys rather than a scalar type; their shape is structural.
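A second custom pairing, as a sketch only: a Postgres inet column indexed as an OpenSearch ip field. Both type names exist in their respective systems, but the column name is hypothetical, and whether the value serializes into a shape the ip type accepts is an assumption worth confirming with flusso check.

```yaml
fields:
  # Hypothetical: accept a Postgres `inet` column, emit an OpenSearch `ip` mapping.
  - custom: client_ip
    postgres: [inet]
    opensearch: ip
    required: false
```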
Production-ready defaults. The OpenSearch sink does not emit your text/keyword fields bare. By default it attaches a strong analyzer and a set of subfields (keyword, keyword_lowercase, text) so search, exact filtering, and case-insensitive sort all work out of the box; see Index analysis & subfields. Anything in options overrides the default for that field.
text vs identifier
Both are analyzed (full-text searchable) text fields; they differ only in the
analyzer:
- text: natural language (descriptions, bios, comments). The default analyzed type; tokenizes on word boundaries with accent/case folding.
- identifier: short structured strings (names, SKUs, codes, statuses). Splits on punctuation/case so C-01234 is found by C01234, c-01234, or 01234.
fields:
  - text: bio # natural-language analyzer + default subfields
    required: false
  - identifier: sku # punctuation/case-splitting analyzer + default subfields
    required: false
(Use keyword instead for exact match, sort, or aggregation rather than full-text
search.) Both apply only to scalar column fields, and an explicit analyzer in
options always wins over the type’s default. The analyzers themselves are
documented in
Index analysis & subfields.
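Since an explicit analyzer in options wins over the type's default, overriding it might look like the sketch below. It assumes options keys pass through as OpenSearch mapping properties, and uses english, one of OpenSearch's built-in language analyzers.

```yaml
fields:
  - text: bio
    required: false
    options:
      analyzer: english # replaces the default analyzer for this field only
```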
Geo points
A geo field is a geographic point → OpenSearch geo_point. Two forms:
Two columns — a latitude and a longitude column assembled into a point. A
missing coordinate makes the whole point null (never {lat: null, lon: null},
which OpenSearch rejects):
- geo: location
  lat: latitude
  lon: longitude
  required: false
Single column — a column already holding a geo_point-shaped value: a
json/jsonb {"lat": …, "lon": …} or [lon, lat], or a text "lat,lon":
- geo: location
  column: location_json
  required: false
PostGIS geometry and PG-native point aren’t accepted directly (they serialize
as WKB / (x,y), which OpenSearch won’t take); expose a generated jsonb/text
column in one of the shapes above. The two-column form needs no such column —
flusso assembles the point in the document query.
Transforms
A list applied in sequence to a column value before it lands in the document:
| Transform | Effect |
|---|---|
| lowercase | Lowercase the string value. |
| trim | Strip leading/trailing whitespace. |
- keyword: email
  required: false
  transforms: [trim, lowercase]
Joins
Fold rows from a related table into the document as nested documents. The join’s relationship verb is its type key, and the verb names which table holds the key.
| Type key | The key lives on… | Reads as | Renders as |
|---|---|---|---|
| belongs_to | this table (column) | “my column points at the related row” | object (nullable; required: true → non-null) |
| has_one | the related table (foreign_key) | “one related row points back at me” | object (nullable; required: true → non-null) |
| has_many | the related table (foreign_key) | “many related rows point back at me” | nested array (never null) |
| many_to_many | a junction table (through) | “we connect through a junction” | nested array (never null) |
The fields — the projection from each related row — and the related table’s
primary_key are siblings of the type key. The field reading that primary_key
is marked non-null automatically, like the root primary_key.
# My column points at them: embed the user a `created_by` column references.
# `column` defaults to the field name; here, the FK column IS `created_by`.
- belongs_to: created_by
  table: users
  primary_key: id
  fields:
    - keyword: email
      required: true
    - text: name
      required: false

# Their column points at me: fold in the rows holding my key.
- has_many: orders
  table: orders
  foreign_key: user_id
  primary_key: id
  order_by:
    - { column: created_at, direction: desc }
  limit: 5
  fields:
    - integer: id
      required: false
    - double: total
      required: true
    - keyword: status
      required: true
| Key | Type | Required | Description |
|---|---|---|---|
| (type key) | field name | yes | belongs_to, has_one, has_many, or many_to_many; its value is the document key. |
| table | Postgres identifier | yes | The related table. |
| primary_key | Postgres identifier | yes | The related table’s primary key. The projected field reading it is forced non-null. |
| column | Postgres identifier | belongs_to only | This table’s column pointing at the related row. Defaults to the field name (so belongs_to: created_by reads the created_by column). |
| foreign_key | Postgres identifier | has_one/has_many | The related table’s column pointing back at the parent. |
| through | object | many_to_many | A junction table. |
| required | bool | to-one only | belongs_to/has_one only. true maps the object non-null; omitted means nullable. Rejected on has_many/many_to_many (always a non-null array). |
| filters | list | no | Filters narrowing which related rows are folded in. |
| order_by | list | no | Ordering: a list of { column, direction }, where direction is asc (default) or desc. Not allowed on belongs_to (its target is unique); on has_one it picks which row becomes the object. |
| limit | int ≥ 1 | has_many/many_to_many only | Cap the number of related rows folded in (the to-one verbs imply their own LIMIT 1). |
| fields | list | yes | The fields projected from each related row. |
Key arity rule: a join takes exactly the key sibling its verb implies —
column for belongs_to, foreign_key for has_one/has_many, through for
many_to_many. Anything else is a load-time error naming the right one.
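The examples above cover belongs_to and has_many; a has_one follows the same arity rule with foreign_key. A minimal sketch, assuming a hypothetical orders table with user_id and created_at columns:

```yaml
fields:
  # One related row points back at me; `order_by` picks which row becomes the object.
  - has_one: latest_order
    table: orders
    foreign_key: user_id
    primary_key: id
    order_by:
      - { column: created_at, direction: desc }
    fields:
      - integer: id
        required: false
      - keyword: status
        required: true
```

No limit is given because the to-one verbs imply their own. With required omitted, the object stays nullable, which suits a user who has no orders yet.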
A belongs_to target that changes — or is deleted — re-emits every document
pointing at it: flusso finds the referrers on the parent table itself
(WHERE column = <changed key>), so a deleted target rebuilds those documents with
a null object rather than leaving them stale.
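When the foreign-key column and the document key differ, column names the column explicitly. A sketch, assuming a hypothetical posts root table whose author_id column references users.id:

```yaml
fields:
  # The FK column is `author_id`, but the document key should read `author`.
  - belongs_to: author
    table: users
    column: author_id
    primary_key: id
    fields:
      - integer: id
        required: false
      - keyword: email
        required: true
```

The join is left nullable here because, per the paragraph above, deleting the referenced users row rebuilds each referring document with a null author.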
The through object (junction table for many-to-many):
| Key | Required | Description |
|---|---|---|
| table | yes | The junction table. |
| left_key | yes | Column joining the junction to the parent. |
| right_key | yes | Column joining the junction to the related table. |
- many_to_many: tags
  table: tags
  through:
    table: post_tags
    left_key: post_id
    right_key: tag_id
  primary_key: id
  fields:
    - keyword: name
      required: true
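The optional join siblings (filters, order_by, limit) apply to many_to_many as well. The sketch below extends the example; the hidden column is hypothetical, and it assumes filter and order_by columns refer to the related table (tags) rather than the junction, which the text does not state outright.

```yaml
- many_to_many: tags
  table: tags
  through:
    table: post_tags
    left_key: post_id
    right_key: tag_id
  primary_key: id
  filters:
    - { column: hidden, op: eq, value: false } # hypothetical column, assumed to be on `tags`
  order_by:
    - { column: name, direction: asc }
  limit: 10
  fields:
    - keyword: name
      required: true
```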
Aggregates
Reduce rows from a related table to a single value. The operation is the type
key: count, sum, avg, min, max, or ids.
A count is always a non-null long and an avg a nullable double, so they
take no value_type. A sum/min/max mirrors the aggregated column, so it
must declare a column and a value_type.
- count: orderCount
  table: orders
  foreign_key: user_id
- sum: lifetimeValue
  table: orders
  column: total
  value_type: decimal
  foreign_key: user_id
  filters:
    - { column: status, op: eq, value: paid }
| Key | Type | Required | Description |
|---|---|---|---|
| (type key) | field name | yes | count/sum/avg/min/max/ids; its value is the document key. |
| table | Postgres identifier | yes | The related table. |
| column | Postgres identifier | conditional | The column to reduce. Required for sum/avg/min/max; not used by count/ids. |
| value_type | type name | conditional | The result type. Required for sum/min/max (it mirrors the column); not used by count/avg/ids. |
| element_type | type name | conditional | Required for ids (and only ids): the scalar type of each collected primary key, long or keyword. |
| foreign_key | Postgres identifier | conditional | The aggregated table’s column pointing back at the parent (exactly one of foreign_key xor through). |
| through | object | conditional | Junction table for aggregating across many-to-many. |
| filters | list | no | Filters restricting which rows count. |
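The conditional columns in this table are easiest to read against avg and max, which the earlier example does not show. A sketch reusing the orders table; the created_at column and both document keys are hypothetical:

```yaml
fields:
  # avg takes a column but no value_type: the result is always a nullable double.
  - avg: averageOrderTotal
    table: orders
    column: total
    foreign_key: user_id
  # max mirrors the aggregated column, so it must declare a value_type.
  - max: lastOrderAt
    table: orders
    column: created_at
    value_type: timestamp
    foreign_key: user_id
```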
ids — a flat array of a related table’s primary keys
ids collects the related table’s primary key into a flat scalar array (it
takes no column — the key is always the related table’s PK). OpenSearch has no
array type, so the field’s mapping type is just the element type
(element_type: long → type: long, keyword → type: keyword); the value is
multi-valued. An empty relation yields [], never null, so the field is non-null
(project it as a bare Vec<…>, not Option<Vec<…>>).
# one-to-many: orders.user_id points back at this row
- ids: orderIds
  table: orders
  foreign_key: user_id
  element_type: long

# many-to-many: collected straight off the junction's right_key
- ids: tagIds
  table: tags
  through: { table: post_tags, left_key: post_id, right_key: tag_id }
  element_type: long
Filters
Filters narrow which related rows a join or aggregate sees, which rows a
soft_delete applies to, and — as the top-level filters key —
which root rows become documents at all. Three forms:
Raw SQL — an escape hatch for conditions the structured forms can’t express:
- { raw: "amount > 0 AND currency = 'USD'" }
Null check — no value operand:
- { column: deleted_at, op: is_null }
- { column: confirmed_at, op: is_not_null }
Value comparison — operator plus a value whose shape matches its arity:
- { column: status, op: eq, value: paid }
- { column: status, op: in, value: [paid, shipped] }
- { column: total, op: between, value: [10, 100] }
| op | Value shape |
|---|---|
| eq, neq, lt, lte, gt, gte, like, ilike | a single scalar |
| in, not_in | a list |
| between | a list of exactly two values [lower, upper] |
| is_null, is_not_null | (no value) |
A value op with a missing value, a list op given a scalar, or a between with
other than two values is a load-time error.
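Put together on a join, the three value shapes look like the sketch below. The column names are hypothetical, and it assumes the filters in a list combine with AND, as the top-level filters key is documented to do.

```yaml
fields:
  - has_many: openOrders
    table: orders
    foreign_key: user_id
    primary_key: id
    filters:
      - { column: status, op: not_in, value: [cancelled, refunded] } # list op, list value
      - { column: shipped_at, op: is_null }                          # null check, no value
      - { column: total, op: gte, value: 10 }                        # scalar op, single scalar
    fields:
      - integer: id
        required: false
      - double: total
        required: true
```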
Conventions
Identifiers
Two distinct identifier rules apply depending on what is being named:
| Rule | Applies to | Pattern | Notes |
|---|---|---|---|
| Postgres identifier | table, column, schema, index, and sink names | ^[a-z_][a-z0-9_]*$, max 63 chars | Lowercased and trimmed on load, matching Postgres’ folding of unquoted identifiers. A name that isn’t a valid identifier this way must be addressed explicitly (e.g. set column:). |
| Field name | the document key a field lands under (the value of the type key) | ^[a-zA-Z_][a-zA-Z0-9_]*$, max 63 chars | Case is preserved, so count: orderCount stays camelCase in the emitted document. Only trimmed. |
The split is deliberate: the value comes from a Postgres column (lowercase identifier) but lands under a document key you choose (which may be camelCase to suit the search index).
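A short sketch of the two rules working together; the first_name column is hypothetical:

```yaml
fields:
  # Field name rule: the document key keeps its camelCase.
  # Postgres identifier rule: the column is a lowercase identifier.
  - keyword: firstName
    column: first_name
    required: false
  - count: orderCount
    table: orders
    foreign_key: user_id
```

Without the explicit column, the field would default to reading a column named after the document key, so a camelCase key over a snake_case column needs column set.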
Validation, in one place
Loading enforces — beyond what the file format itself can express — that:
- the schema version is supported (only 1);
- all table/column/schema/index/sink names are valid Postgres identifiers, and field names are valid field-name identifiers;
- each field has exactly one type key, and only the siblings that type allows;
- a join carries exactly the key sibling its verb implies: column for belongs_to (defaulting to the field name), foreign_key for has_one/has_many, through for many_to_many; the to-one verbs take no limit (nor order_by, for belongs_to);
- an aggregate specifies exactly one of foreign_key or through;
- sum/avg/min/max aggregates carry a column, and sum/min/max also declare a value_type (it mirrors the column);
- an ids aggregate declares an element_type (a scalar type) and takes no column or value_type; element_type is rejected on every other op;
- a geo field gives either lat and lon, or a single column;
- a between filter has exactly two values, and in/not_in get a list.
A failure at any of these stops the load with a specific error naming the cause.
None of it needs a database. When the source is reachable, flusso check
additionally confirms each declared type and nullability against the live columns
and reports any disagreement.
A complete example
users.schema.yml:
version: 1
table: users
schema: public
primary_key: id
soft_delete:
  column: deleted
fields:
  - integer: id
    required: false
  - keyword: email
    required: true
    transforms: [lowercase, trim]
  - text: name
    required: false
  - has_many: orders
    table: orders
    foreign_key: user_id
    primary_key: id
    order_by:
      - { column: id, direction: asc }
    fields:
      - integer: id
        required: false
      - double: total
        required: true
      - keyword: status
        required: true
  - count: orderCount
    table: orders
    foreign_key: user_id
A change to a users row — or to any of that user’s orders — rebuilds the whole
users document and re-emits it to every sink. Setting users.deleted = true
emits a tombstone instead.
Configuring a deployment
One flusso.toml file describes a deployment — the source database, the sink destinations, and which indexes to build — plus the environment that feeds it secrets and runtime flags. This guide is the single reference for that file and that environment. (Each index’s own *.schema.yml is covered in schema authoring.)
Quick reference
| Looking for… | Jump to |
|---|---|
| Every flusso.toml top-level key | The flusso.toml format |
| Postgres source options | Postgres |
| OpenSearch sink options + defaults | OpenSearch |
| Which subfield to query (exact / full-text / sort) | Index analysis & subfields |
| Stdout sink envelope | Stdout |
| Secrets, { env = "VAR" }, the reserved overrides + precedence | Secrets & connection values |
| The FLUSSO_* flag env vars | CLI flags as env vars |
| Status / metrics / control ports + auth | HTTP surfaces |
| Sharing one cluster across deployments | Index prefix |
| RUST_LOG, OTLP, Prometheus | Logging & telemetry |
| A copy-paste env block | Cheat sheet |
flusso reads env vars for three jobs: filling in config values (the secrets story), setting CLI flags (every flag has a FLUSSO_* twin — CLI flags as env vars), and logging & telemetry (below).
Every key + default
| Key | Where | Default | Purpose |
|---|---|---|---|
| on_error | top-level / [[index]] | "stop" | item-rejection policy: stop or skip (on_error) |
| prefix | top-level | none | prepend to every owned index name (index prefix) |
| type | [source] | none | postgres |
| connection_url | [source] | none | full URL or parts; DATABASE_URL overrides |
| manage_publication | [source] | true | auto-create/extend the publication |
| type | [sinks.<name>] | none | opensearch or stdout |
| url | opensearch sink | none | cluster URL; <NAME>_OPENSEARCH_URL overrides |
| username / password | opensearch sink | none | HTTP Basic auth |
| tls_verify | opensearch sink | true | verify TLS certs |
| batch_size | opensearch sink | 1000 | docs per bulk chunk |
| max_bytes | opensearch sink | 10 MiB | bytes per bulk chunk |
| timeout_secs | opensearch sink | 30 | HTTP request timeout |
| max_retries | opensearch sink | 3 | transient-failure retries |
| pipeline | opensearch sink | none | ingest pipeline applied on index |
| number_of_shards | opensearch sink | 1 | primary shards per index |
| number_of_replicas | opensearch sink | 1 | replica shards per index |
| refresh_interval | opensearch sink | "10s" | steady-state refresh ceiling ("-1" disables) |
| text_analysis | opensearch sink | builtin | analyzer toolkit: builtin or icu |
| auto_subfields | opensearch sink | true | auto subfields on text/keyword |
| pretty | stdout sink | false | pretty JSON instead of NDJSON |
| name / schema / enabled | [[index]] | none | logical name / schema path / build on this run |
| public_address / private_address | [server] | 127.0.0.1:9464 / :9465 | HTTP bind addresses (HTTP surfaces) |
ℹ️ Info —
schema::load("flusso.toml") is the front door: it reads the config and every schema it references, validates both layers, and returns one fully-validated Config. Schema paths resolve relative to the config file’s directory. Two JSON Schemas are the machine-readable source of truth — config.schema.json and index.schema.yml; point an editor at them for completion.
The flusso.toml format
Top-level table. Only [source] is required.
| Key | Required | Description |
|---|---|---|
| [source] | yes | The database to read from. |
| [sinks.<name>] | no | Named destinations. Zero or more; each key is a sink name (a Postgres identifier). |
| [[index]] | no | The indexes to build. Zero or more array entries. |
| on_error | no | What to do when a sink rejects a document at the item level: "stop" (default) or "skip". See on_error. |
| prefix | no | Literal string prepended to every index name flusso owns (indexes, aliases, flusso_meta), so deployments can share one cluster — e.g. prefix = "dev_" → dev_users. Overridable at runtime by --index-prefix / FLUSSO_INDEX_PREFIX. See Index prefix. |
[source]
The database documents are read from — one per deployment. type selects the kind:
| type | Reference |
|---|---|
| postgres | Postgres source |
[source]
type = "postgres"
connection_url = "postgresql://user:pass@localhost:5432/mydb"
Connection options (full-URL and individual-parts forms, the DATABASE_URL override) and
capture behavior live in Sources and
Secrets & connection values.
[sinks.<name>]
Named destinations; each key is a sink name (a Postgres identifier) and type selects the
kind. Define more than one and flusso fans out — every document is written to all of
them. If no sinks are defined, the CLI falls back to a stdout sink.
| type | Reference |
|---|---|
| opensearch | OpenSearch sink |
| stdout | Stdout sink |
[sinks.primary]
type = "opensearch"
url = "https://localhost:9200"
password = { env = "OS_PASSWORD" }
[sinks.audit]
type = "stdout"
pretty = true
Each type’s full option set and behavior is documented in Sinks.
[[index]]
One array entry per index to build.
| Key | Type | Required | Description |
|---|---|---|---|
| name | Postgres identifier | yes | The logical index name — the pipeline’s stable identity. |
| schema | path | yes | Path to the index’s *.schema.yml, relative to the config file. Must end in .yml/.yaml. |
| enabled | bool | yes | Whether this index is built on this run. |
| on_error | "stop" \| "skip" | no | Override the global on_error for this index. Omitted inherits the global default. |
[[index]]
name = "users"
schema = "users.schema.yml"
enabled = true
The *.schema.yml referenced here is documented in schema authoring.
on_error
When a sink accepts a flush but rejects a specific document — a mapping conflict, a value
the destination can’t index — on_error decides what happens. It governs only these
item-level rejections; a flush-wide failure (the destination unreachable, the whole
request refused) always stops the run.
| Value | Behavior |
|---|---|
| "stop" (default) | Stop the run. The batch is left unconfirmed and redelivered on restart, so a persistently-bad document halts sync until the data is fixed or the policy changes. Dropping data is opt-in. |
| "skip" | Quarantine the document (logged, counted in flusso.documents.quarantined and the /status documents_quarantined) and continue. The rest of the batch is applied and acked; the document never lands until its source row changes again. |
A global on_error is the default for every index; override it per index with on_error
inside an [[index]] entry. The policy is operational, not part of the document shape, so
changing it never triggers a reindex.
on_error = "stop" # global default
[[index]]
name = "analytics"
schema = "analytics.schema.yml"
enabled = true
on_error = "skip" # this index tolerates bad rows
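The override rule and the two behaviors can be pictured with a small sketch. It is only an illustration of the rules stated above, not flusso's implementation: the `OnError` and `Outcome` types and both helper functions are hypothetical names invented here.

```rust
/// The two values `on_error` accepts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OnError {
    Stop,
    Skip,
}

/// What happens to the run after one item-level rejection.
#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    /// Leave the batch unconfirmed and halt; it is redelivered on restart.
    HaltRun,
    /// Log and count the document, then carry on with the rest of the batch.
    QuarantineAndContinue,
}

/// Hypothetical helper: a per-index `on_error` wins, then the global key,
/// then the documented default of "stop".
fn effective_policy(global: Option<OnError>, per_index: Option<OnError>) -> OnError {
    per_index.or(global).unwrap_or(OnError::Stop)
}

/// Hypothetical helper: map the policy to the documented behavior.
fn on_item_rejected(policy: OnError) -> Outcome {
    match policy {
        OnError::Stop => Outcome::HaltRun,
        OnError::Skip => Outcome::QuarantineAndContinue,
    }
}
```

With the config above, the `analytics` index would resolve to `Skip` while every other index keeps the global `Stop`.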
prefix
See Index prefix for the full behavior, the runtime overrides, and the rules for sharing one OpenSearch cluster across deployments.
The model
One source, many sinks. Source, sink, and the in-process queue are all trait objects, so backends swap without touching the engine. Today that’s Postgres in, OpenSearch (or stdout) out — the only backends documented here.
Sources
A source is where rows come from. There is exactly one per deployment, and one type today.
Postgres
[source]
type = "postgres"
connection_url = "postgresql://user:pass@localhost:5432/mydb"
manage_publication = true # optional; default true
flusso follows Postgres’ logical replication stream to capture changes, and snapshots tables to seed an index before following live changes.
| Key | Type | Default | |
|---|---|---|---|
| connection_url | URL / parts | — | see below |
| manage_publication | bool | true | auto-create/extend the publication when privileged; see capture |
Connection
connection_url takes one of two shapes.
A full URL — a string or env_or_value. Must match
^(postgresql|postgres)://…:
connection_url = "postgresql://user:pass@localhost:5432/mydb"
connection_url = { env = "DATABASE_URL" }
Individual parts — database is required; the rest default:
| Part | Type | Default | |
|---|---|---|---|
| host | string | 127.0.0.1 | |
| port | int (1–65535) | 5432 | |
| user | string | postgres | |
| password | env_or_value | — | optional |
| database | string | — | required |
[source.connection_url]
host = "127.0.0.1"
port = 5432
user = "postgres"
password = { env = "PGPASSWORD" }
database = "mydb"
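To see how the defaults in the parts table fill in, here is a rough sketch that assembles the equivalent URL. It is an assumption-laden illustration: the `Parts` struct and `to_url` function are hypothetical, the password is taken as already resolved, and percent-encoding of special characters is left out.

```rust
/// Hypothetical mirror of the individual-parts form; only `database` is required.
struct Parts<'a> {
    host: Option<&'a str>,
    port: Option<u16>,
    user: Option<&'a str>,
    password: Option<&'a str>, // assumed already resolved from its env_or_value
    database: &'a str,
}

/// Apply the documented defaults and render a full-URL equivalent.
/// Note: no percent-encoding is done here, so this is a sketch only.
fn to_url(p: &Parts<'_>) -> String {
    let host = p.host.unwrap_or("127.0.0.1");
    let port = p.port.unwrap_or(5432);
    let user = p.user.unwrap_or("postgres");
    let auth = match p.password {
        Some(pw) => format!("{}:{}", user, pw),
        None => user.to_string(),
    };
    format!("postgresql://{}@{}:{}/{}", auth, host, port, p.database)
}
```

Given only `database = "mydb"`, the sketch yields `postgresql://postgres@127.0.0.1:5432/mydb`.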
Either shape can be overridden by a reserved deployment variable, so the same config travels across environments unedited — see Secrets & connection values for the override and precedence rules.
How it captures changes
- Logical replication (WAL). flusso consumes a logical replication slot. The slot is created automatically if it does not exist. The publication is also managed automatically: flusso derives the full table set from your schema (root tables plus every table a join or aggregate reads) and creates or extends the publication to cover it — provided the source role can (it must own those tables and hold CREATE on the database, or be superuser). When it can’t, flusso logs the exact CREATE/ALTER PUBLICATION SQL instead and keeps going; flusso check prints the same coverage report. Set manage_publication = false (or FLUSSO_MANAGE_PUBLICATION=false / --manage-publication false) to opt out and manage the publication yourself. Slot and publication names are CLI flags (--slot, --publication, both defaulting to flusso).
- Backfill. Before live capture, the engine asks each sink whether an index is already seeded and, for those that aren’t, snapshots the root tables to seed them. --skip-backfill resumes live capture only.
- Requires wal_level = logical on the server. See the dev/ environment for a ready-to-run Postgres configured for this.
Sinks
A sink is where assembled documents land. Each entry under [sinks] is a named
destination — the key is a sink name (a Postgres identifier) and type selects the kind:
[sinks.primary] # name: "primary"
type = "opensearch"
url = "https://localhost:9200"
[sinks.audit] # name: "audit" — documents go here too (fan-out)
type = "stdout"
pretty = true
OpenSearch
[sinks.primary]
type = "opensearch"
url = "https://localhost:9200"
password = { env = "OS_PASSWORD" }
batch_size = 2000
Writes documents to an OpenSearch cluster via the bulk API — the sink you deploy.
url, username, and password each accept an env_or_value (a literal
or a { env = "VAR" } reference resolved at run time), and can also be supplied or
overridden per sink via reserved deployment-override variables — naming and precedence in
Secrets & connection values.
| Key | Type | Default | Description |
|---|---|---|---|
| url | env_or_value | — (required) | Base URL of the cluster, e.g. https://search.example.com:9200. |
| username | env_or_value | — | HTTP Basic Auth username. |
| password | env_or_value | — | HTTP Basic Auth password. |
| tls_verify | bool | true | Verify TLS certificates. Set false only for local development. |
| batch_size | int ≥ 1 | 1000 | Maximum documents per bulk-request chunk. |
| max_bytes | int | 10485760 (10 MiB) | Maximum bytes per bulk chunk; within OpenSearch’s recommended 5–15 MB range, well under the 100 MB http.max_content_length default. A single document larger than this is sent on its own. |
| timeout_secs | int ≥ 1 | 30 | HTTP request timeout, in seconds. |
| max_retries | int ≥ 0 | 3 | Additional retry attempts on transient failures (exponential backoff). |
| pipeline | string | — | Optional OpenSearch ingest pipeline applied on every index operation. |
| number_of_shards | int ≥ 1 | 1 | Primary shards for each created index. |
| number_of_replicas | int ≥ 0 | 1 | Replica shards for each created index. |
| refresh_interval | string | "10s" | OpenSearch refresh_interval applied to each index after seeding — the steady-state visibility ceiling (e.g. "10s", "1s", or "-1" to disable auto-refresh). flusso forces an immediate refresh whenever the pipeline catches up, so this only bounds staleness while a backlog drains (see below). |
| text_analysis | builtin \| icu | builtin | Analysis backend for the flusso_* analyzers (see below). icu requires the analysis-icu plugin on every node. |
| auto_subfields | bool | true | Auto-enrich text/keyword fields with a good analyzer and subfields. A field’s explicit options always win; set false to emit fields bare. |
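The interplay of batch_size and max_bytes is easiest to see as a chunking loop. The following is a minimal sketch under assumptions, not the sink's actual code: `plan_chunks` is a hypothetical function that takes each document's serialized size and returns which document indexes would share a bulk chunk.

```rust
/// Group documents into bulk chunks, given each document's serialized size in
/// bytes. Returns the document indexes that make up each chunk.
fn plan_chunks(sizes: &[usize], batch_size: usize, max_bytes: usize) -> Vec<Vec<usize>> {
    assert!(batch_size >= 1, "batch_size must be at least 1");
    let mut chunks = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_bytes = 0usize;

    for (i, &size) in sizes.iter().enumerate() {
        let full = current.len() == batch_size;
        let too_big = current_bytes + size > max_bytes;
        // Close the open chunk before it would exceed either limit.
        if !current.is_empty() && (full || too_big) {
            chunks.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        // An oversized document still goes out, alone in its own chunk.
        current.push(i);
        current_bytes += size;
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}
```

Because a chunk is only closed when it is non-empty, a document larger than max_bytes is never dropped; it simply travels by itself, which matches the max_bytes note in the table.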
How it owns its indexes:
- Explicit, fully-typed mapping. The sink creates each index up front from the resolved schema mapping, dynamic: strict — field types come from the schema, not OpenSearch’s dynamic guesses, and only configured fields are accepted. An index that already exists is left untouched.
- Hashed name over generations. The addressable name is {logical}_{hash} (the hash derives from the parsed index schema) — itself a hash alias over a concrete generation index {logical}_{hash}_{gen} that holds the data. A structural schema change moves the hash, so the sink writes a fresh alias + generation (re-seeded from scratch) rather than into the old, mismatched shape. An on-demand reindex builds the next generation behind the same hash alias and repoints atomically when it’s seeded. flusso and the flusso-query client address {logical}_{hash}; the generation detail is documented in the flusso-sinks-opensearch crate.
- Convenience alias. The bare logical name (users) is also kept as an alias on the current generation, so a human or ad-hoc tool can GET /users/_search without knowing the hash. Best-effort: if it fails (e.g. the cluster already has a real index named like the alias), flusso logs a warning and carries on — correctness never depends on it.
- Refresh adapts to the backlog. Created with auto-refresh disabled (refresh_interval: -1) for fast bulk seeding; on seeding completion the index is handed the configured refresh_interval (default "10s") — the steady-state visibility ceiling. A flush also forces an immediate refresh whenever the pipeline has caught up (no backlog behind the batch), so search is fresh when traffic is light but bulk indexing stays cheap while a backlog drains. The refresh_interval only bounds staleness during sustained backlog — raise it for more write throughput under load, lower it (toward 1s) for fresher search while behind.
- Production-ready defaults. Created indexes ship a tuned analysis block and, unless auto_subfields is off, well-shaped text/keyword fields — see Index analysis & subfields.
- Seeding markers. Seeded state is persisted in a hidden flusso_meta index, so a restart skips a completed backfill instead of redoing it.
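The naming layers described above compose as plain string templates. The sketch below only illustrates that layering under assumptions: the `IndexNames` struct and `index_names` function are hypothetical, the hash is treated as an opaque string, and the generation is assumed to be a small integer (the real format lives in the flusso-sinks-opensearch crate).

```rust
/// The names one logical index is reachable under (hypothetical grouping).
#[derive(Debug, PartialEq, Eq)]
struct IndexNames {
    /// Bare logical name, kept as a best-effort alias for humans and ad-hoc tools.
    convenience_alias: String,
    /// `{logical}_{hash}`: what flusso and flusso-query address.
    hash_alias: String,
    /// `{logical}_{hash}_{gen}`: the concrete index holding the data.
    generation: String,
}

/// Compose the names, with the optional deployment prefix applied to each.
/// `hash` is opaque here; `generation` as `u32` is an assumption.
fn index_names(prefix: &str, logical: &str, hash: &str, generation: u32) -> IndexNames {
    let convenience_alias = format!("{}{}", prefix, logical);
    let hash_alias = format!("{}_{}", convenience_alias, hash);
    let generation = format!("{}_{}", hash_alias, generation);
    IndexNames { convenience_alias, hash_alias, generation }
}
```

A reindex would bump only the last component, so readers addressing the hash alias never see the switch.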
Index analysis & subfields
The sink creates every index with a good search setup out of the box. flusso owns the index (mapping + analyzers + subfields); your application owns the queries. The notes below say which subfield to target for each job.
Analyzers (always defined, named flusso_*):
| Name | What it does |
|---|---|
| flusso_code | The type: identifier analyzer (and the analyzer on a keyword field’s text subfield). Splits on punctuation, case, and letter↔digit boundaries, then lowercases and folds accents. C-01234 indexes as c-01234, c01234, c, 01234, so it’s found by C01234, c-01234, or 01234 — but not by a fuzzy c1234 (add fuzziness on the query side if you want that). Tuned for identifier-like short text. |
| flusso_text | The default for type: text — natural language. Plain tokenize + lowercase + accent fold, no code-splitting. |
| flusso_lowercase | A normalizer (single token, no splitting) for case- and accent-insensitive exact match and sort. |
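To make the flusso_code splitting rules concrete, here is a rough sketch that reproduces the C-01234 example from the table. It is an approximation written for illustration, not the analyzer OpenSearch runs: `code_tokens` is a hypothetical function, it expects a single identifier-like token, and accent folding is omitted.

```rust
/// Character classes used to find split points.
#[derive(PartialEq, Clone, Copy)]
enum Class {
    Lower,
    Upper,
    Digit,
    Other,
}

fn class(c: char) -> Class {
    if c.is_ascii_digit() {
        Class::Digit
    } else if c.is_uppercase() {
        Class::Upper
    } else if c.is_alphabetic() {
        Class::Lower
    } else {
        Class::Other
    }
}

/// Approximate the tokens an identifier yields: the lowercased original, the
/// parts joined together, then each part. Accent folding is not modeled.
fn code_tokens(input: &str) -> Vec<String> {
    let mut parts: Vec<String> = Vec::new();
    let mut current = String::new();
    let mut prev: Option<Class> = None;
    for c in input.chars() {
        let cls = class(c);
        if cls == Class::Other {
            // Punctuation ends the current part and is dropped.
            if !current.is_empty() {
                parts.push(std::mem::take(&mut current));
            }
            prev = None;
            continue;
        }
        let boundary = match (prev, cls) {
            (Some(Class::Lower), Class::Upper) => true, // case change
            (Some(Class::Digit), Class::Lower | Class::Upper) => true, // digit to letter
            (Some(Class::Lower | Class::Upper), Class::Digit) => true, // letter to digit
            _ => false,
        };
        if boundary && !current.is_empty() {
            parts.push(std::mem::take(&mut current));
        }
        current.extend(c.to_lowercase());
        prev = Some(cls);
    }
    if !current.is_empty() {
        parts.push(current);
    }
    let mut candidates = vec![input.to_lowercase(), parts.concat()];
    candidates.extend(parts);
    // Drop duplicates while keeping first-seen order.
    let mut tokens: Vec<String> = Vec::new();
    for t in candidates {
        if !tokens.contains(&t) {
            tokens.push(t);
        }
    }
    tokens
}
```

Since c1234 is not among the tokens produced for C-01234, an exact-token lookup for it misses, which is why the table suggests query-side fuzziness for that case.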
With text_analysis = "icu" the folding/tokenizing swaps to the analysis-icu plugin
(icu_tokenizer / icu_folding / icu_normalizer) for stronger multilingual handling —
proper CJK/Thai segmentation and folding across every script. The plugin must be
installed on every node (opensearch-plugin install analysis-icu) or index creation
fails; builtin (the default) needs no plugins.
Default subfields (when auto_subfields is on — the default):
| Field type | Shape | Query each subfield for… |
|---|---|---|
| text | analyzer: flusso_text + .keyword + .keyword_lowercase | the field itself → full-text search; .keyword → exact filter / aggregation / exact sort; .keyword_lowercase → case-insensitive sort & exact lookup. |
| keyword | .text (flusso_code) + .keyword_lowercase | the field itself → exact term / aggregation; .text → full-text search; .keyword_lowercase → case-insensitive sort. |
keyword subfields cap at ignore_above: 256. Other types (long, date, boolean, …)
and the object/nested containers are emitted as-is. Any key you set in a field’s
options overrides the auto default for that field — e.g. your own analyzer replaces
flusso_code, and your own fields replaces the auto subfields wholesale.
Example query against a text field name, precise (all terms must match) and
case/punctuation-insensitive:
{ "query": { "match": { "name": { "query": "C-01234", "operator": "and" } } } }
Stdout
[sinks.audit]
type = "stdout"
pretty = true
Writes each operation to standard output as a JSON envelope — for development and piping
into jq.
| Key | Type | Default | Description |
|---|---|---|---|
| pretty | bool | false | Pretty-print JSON instead of compact one-line NDJSON. |
Every envelope carries provenance and bookkeeping so a stream is self-describing: which
sink and version produced it (sink, version), when (ts), in what order (seq), the
index, the op (upsert / delete) and id, plus a meta summary (top-level field
count and serialized byte size). An upsert carries the document; a delete does not.
{"document":{"email":"ada@x.io"},"id":"42","index":"users","meta":{"bytes":20,"fields":1},"op":"upsert","seq":1,"sink":"stdout","ts":"2026-06-03T10:20:30.123Z","version":"0.1.0"}
{"id":"7","index":"users","op":"delete","seq":2,"sink":"stdout","ts":"2026-06-03T10:20:30.124Z","version":"0.1.0"}
Logs go to stderr, so stdout stays a clean data stream.
Secrets & connection values
flusso never bakes a secret into a compiled config. A flusso.lock carries only the
names of the variables to read; the real values are read in the environment that runs the
pipeline. Compile in CI, run in prod, and the secret never rides along in between.
env_or_value references
Anywhere a secret or connection string is expected in flusso.toml, give either a literal
string or a reference to an environment variable:
password = "literal-secret" # literal — carried as-is
password = { env = "OS_PASSWORD" } # read from $OS_PASSWORD when the pipeline runs
Either form is accepted wherever this guide says a value is an env_or_value. The variable
name is yours. Resolution is deferred to run time — which lets a
compiled artifact travel without baking in its secrets. An unset variable
fails at run time, not compile time — by design, so the compile step needs no secrets.
Reserved deployment-override variables
A few well-known names act as a deployment override layer: set them and the same
flusso.toml works across environments unedited (12-factor). When set, they take priority
over the file value (and the override is logged at startup):
| Variable | Overrides / fills | Notes |
|---|---|---|
| DATABASE_URL | the source connection_url | The source is a singleton, so one well-known name is unambiguous. |
| <SINK>_OPENSEARCH_URL | a sink’s url | <SINK> is the uppercased sink name — [sinks.primary] → PRIMARY_OPENSEARCH_URL. |
| <SINK>_OPENSEARCH_USERNAME | a sink’s username | Same naming. |
| <SINK>_OPENSEARCH_PASSWORD | a sink’s password | Same naming. |
The per-sink prefix means several OpenSearch sinks never collide (PRIMARY_…,
SECONDARY_…).
Precedence
When more than one source could supply a value, highest wins:
- An explicit { env = "X" } reference — names its own source, never overridden by a reserved variable. (If X is unset, that’s an error.)
- The reserved variable, if set — overrides a literal in the file and fills a value the file omitted.
- The literal value in the config.
- Otherwise → an error, for anything required (the source URL, a sink url).
Shortcut: “I asked for a specific variable” beats “the deployment set the well-known one” beats “whatever’s written in the file.”
[source]
type = "postgres"
connection_url = "postgres://localhost/dev" # $DATABASE_URL wins if set
[sinks.primary]
type = "opensearch"
url = "https://localhost:9200" # $PRIMARY_OPENSEARCH_URL wins if set
# username / password omitted → filled from
# $PRIMARY_OPENSEARCH_USERNAME / $PRIMARY_OPENSEARCH_PASSWORD
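The precedence rules can be written down as one match. This is a sketch of the documented order for a required value, not flusso's resolver: `FileValue`, `reserved_sink_var`, and `resolve_required` are hypothetical names, and the environment is passed in as a map so the example stays self-contained.

```rust
use std::collections::HashMap;

/// What the config file says for one value (hypothetical model).
enum FileValue<'a> {
    Env(&'a str),     // { env = "X" }
    Literal(&'a str), // "literal"
    Omitted,          // key absent
}

/// Reserved per-sink variable name, e.g. ("primary", "url") -> PRIMARY_OPENSEARCH_URL.
fn reserved_sink_var(sink: &str, field: &str) -> String {
    format!("{}_OPENSEARCH_{}", sink.to_uppercase(), field.to_uppercase())
}

/// Resolve a required value following the documented precedence.
fn resolve_required(
    file: &FileValue<'_>,
    reserved: &str,
    env: &HashMap<String, String>,
) -> Result<String, String> {
    match file {
        // 1. An explicit reference names its own source; the reserved name is ignored.
        FileValue::Env(name) => env
            .get(*name)
            .cloned()
            .ok_or_else(|| format!("{} is not set", name)),
        // 2 and 3. The reserved variable beats a literal; otherwise the literal stands.
        FileValue::Literal(value) => Ok(env
            .get(reserved)
            .cloned()
            .unwrap_or_else(|| value.to_string())),
        // 2 and 4. The reserved variable fills an omitted value, or it is an error.
        FileValue::Omitted => env
            .get(reserved)
            .cloned()
            .ok_or_else(|| format!("no value in the file and {} is not set", reserved)),
    }
}
```

An optional value such as a sink username would follow the same order but end in "absent" rather than an error; that variant is left out to keep the sketch short.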
CLI flags as env vars
Every flusso flag also reads a FLUSSO_* environment variable. The flag wins when both
are set — env is the fallback.
| Variable | Flag | Commands |
|---|---|---|
| FLUSSO_CONFIG | --config | build, check, run |
| FLUSSO_OUT | --out | build |
| FLUSSO_LOCK | --lock | run |
| FLUSSO_LOCKED | --locked | run |
| FLUSSO_SLOT | --slot | run |
| FLUSSO_PUBLICATION | --publication | run, check |
| FLUSSO_MANAGE_PUBLICATION | --manage-publication | run, check |
| FLUSSO_SKIP_BACKFILL | --skip-backfill | run |
| FLUSSO_PRETTY | --pretty | run |
| FLUSSO_QUEUE_CAPACITY | --queue-capacity | run |
| FLUSSO_PUBLIC_ADDRESS | --public-address | run |
| FLUSSO_PRIVATE_ADDRESS | --private-address | run |
| FLUSSO_ADMIN_USER | --admin-user | run, indexes, reindex |
| FLUSSO_ADMIN_PASSWORD | --admin-password | run, indexes, reindex |
| FLUSSO_SERVER | --server | indexes, reindex |
| FLUSSO_LAG_POLL_SECS | --lag-poll-secs | run |
| FLUSSO_INDEX_PREFIX | --index-prefix | run |
| FLUSSO_OFFLINE | --offline | check |
| FLUSSO_FORMAT | --format | check |
| FLUSSO_SCHEMA | the schema-kind argument | schema |
flusso <cmd> --help shows the matching [env: FLUSSO_…] next to each flag.
The two HTTP surfaces’ bind addresses also fall back to a [server] table in
flusso.toml — see HTTP surfaces.
HTTP surfaces
flusso serves two HTTP surfaces; both read the daemon’s live status.
| Surface | Default bind | Auth | Endpoints |
|---|---|---|---|
| Public (read-only) | 127.0.0.1:9464 | none | /healthz /readyz /status /metrics |
| Private (control) | 127.0.0.1:9465 | HTTP Basic | /indexes, /reindex |
- Bind address — --public-address / --private-address, the FLUSSO_PUBLIC_ADDRESS / FLUSSO_PRIVATE_ADDRESS env vars, or a [server] table in flusso.toml. Precedence: flag > env > [server] config > default.
- Basic-auth credentials — --admin-user / --admin-password (default admin / flusso). Flag/env only, never the config file — they’re secrets. The indexes / reindex client subcommands reuse them and take --server / FLUSSO_SERVER to address a running server’s private surface.
⚠️ Warning — The default control-surface credentials are
admin/flusso. Change them before binding the private surface anywhere but localhost.
Index prefix
--index-prefix / FLUSSO_INDEX_PREFIX (also the prefix key in flusso.toml) prepends a
literal string to every index name flusso owns — the indexes, their aliases, and the
flusso_meta index. Precedence is flag > env > config > none. Use it to run several
deployments against one OpenSearch cluster without collision: set dev_, staging_,
nightly_, and each gets its own dev_users / staging_users / … with independent seed
state.
- You include the separator. dev_ → dev_users; dev → devusers.
- Validated at startup. Lowercase, no characters OpenSearch forbids, and a leading letter or digit (an index name can’t start with _ / - / +). A bad prefix fails the run fast.
- Read side must match. The flusso-query consumer applies the prefix at runtime (Client::index_prefix, typically wired from the same FLUSSO_INDEX_PREFIX) — see querying. The compile-time derive is unaffected, so one consumer binary serves every environment.
- Changing it re-roots everything. Turning a prefix on (or changing it) points flusso at brand-new names and triggers a full reseed; the old indexes/aliases are left orphaned.
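The startup validation rules listed above can be approximated in a few lines. Treat this as a sketch under assumptions rather than flusso's check: `validate_prefix` is a hypothetical function, and the forbidden-character set is taken from OpenSearch's general index-naming restrictions, so the exact set flusso enforces may differ.

```rust
/// Check a prefix against the documented rules: lowercase, no characters
/// OpenSearch forbids, and a leading letter or digit. An empty prefix means
/// "no prefix" and is accepted.
fn validate_prefix(prefix: &str) -> Result<(), String> {
    // Assumed set, based on OpenSearch index-naming restrictions.
    const FORBIDDEN: &[char] = &['\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#', ':'];

    let first = match prefix.chars().next() {
        Some(c) => c,
        None => return Ok(()),
    };
    if !first.is_alphanumeric() {
        return Err(format!("prefix must start with a letter or digit, got {:?}", first));
    }
    for c in prefix.chars() {
        if c.is_uppercase() {
            return Err(format!("prefix must be lowercase, got {:?}", c));
        }
        if FORBIDDEN.contains(&c) {
            return Err(format!("prefix contains a forbidden character {:?}", c));
        }
    }
    Ok(())
}
```

Running a check like this before any index is touched is what makes a bad prefix fail fast instead of surfacing later as a rejected index creation.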
Logging & telemetry
| Variable | Default | Effect |
|---|---|---|
| RUST_LOG | info | Log verbosity / filtering (tracing env filter syntax, e.g. flusso=debug,info). |
| FLUSSO_LOG_FORMAT | text | Set to json for structured JSON logs (one object per line). |
| NO_COLOR | unset | Set to anything to disable colored CLI output (also auto-off when not a TTY). |
| OTEL_EXPORTER_OTLP_ENDPOINT | unset | Base OTLP endpoint. Its presence turns on trace + metric export. |
| OTEL_EXPORTER_OTLP_TRACES_ENDPOINT | unset | Traces-only endpoint (enables trace export on its own). |
| OTEL_EXPORTER_OTLP_METRICS_ENDPOINT | unset | Metrics-only endpoint (enables metric export on its own). |
| OTEL_EXPORTER_OTLP_PROTOCOL | http/protobuf | OTLP transport for both signals: http/protobuf or grpc. Unrecognized values warn and fall back to http/protobuf. |
| OTEL_EXPORTER_OTLP_TRACES_PROTOCOL | (general) | Per-signal transport override for traces. |
| OTEL_EXPORTER_OTLP_METRICS_PROTOCOL | (general) | Per-signal transport override for metrics. |
With no OTLP endpoint set, the exporters aren’t installed and cost nothing — telemetry is
opt-in. When an endpoint is configured, the rest of the standard OTEL_* variables
(OTEL_EXPORTER_OTLP_HEADERS, OTEL_SERVICE_NAME, …) are honored by the OpenTelemetry SDK.
Transport and port go together. flusso defaults to OTLP over HTTP/protobuf
(conventionally port :4318). Set OTEL_EXPORTER_OTLP_PROTOCOL=grpc to switch to OTLP/gRPC
(conventionally :4317 — the OpenTelemetry Collector’s and Jaeger’s default receiver).
flusso does not infer the protocol from the port, so the endpoint must match the
protocol you choose: pointing the default HTTP exporter at a gRPC :4317 port just loops
HTTP export failed: network error. The per-signal *_TRACES_PROTOCOL /
*_METRICS_PROTOCOL override the general one.
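The protocol selection described here amounts to a short fallback chain. The sketch below is illustrative only: `Protocol`, `parse_protocol`, `resolve_protocol`, and `conventional_port` are hypothetical names, and treating an unrecognized per-signal value the same way as an unrecognized general value is an assumption.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Protocol {
    HttpProtobuf,
    Grpc,
}

/// Parse one protocol value; anything unrecognized warns and falls back.
fn parse_protocol(raw: &str) -> Protocol {
    match raw {
        "grpc" => Protocol::Grpc,
        "http/protobuf" => Protocol::HttpProtobuf,
        other => {
            eprintln!("unrecognized OTLP protocol {:?}, using http/protobuf", other);
            Protocol::HttpProtobuf
        }
    }
}

/// Per-signal override first, then the general variable, then the default.
fn resolve_protocol(per_signal: Option<&str>, general: Option<&str>) -> Protocol {
    per_signal
        .or(general)
        .map(parse_protocol)
        .unwrap_or(Protocol::HttpProtobuf)
}

/// The port each transport conventionally listens on. The exporter does not
/// infer one from the other, so the endpoint has to agree with the protocol.
fn conventional_port(protocol: Protocol) -> u16 {
    match protocol {
        Protocol::HttpProtobuf => 4318,
        Protocol::Grpc => 4317,
    }
}
```

A quick sanity check when export fails: compare the port in the endpoint with `conventional_port` for the protocol that resolves from your environment.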
Prometheus metrics are a separate, pull-based path: served at /metrics on the public surface (default 127.0.0.1:9464), no env var required.
The derive (compile-time)
#[derive(FlussoDocument)] (the flusso-query client) reads FLUSSO_CONFIG
at compile time to locate flusso.toml when it can’t be found by walking up from the
crate’s CARGO_MANIFEST_DIR. Same name as the CLI flag, consumed by the proc-macro instead
of the binary. (You can also point a single struct at a config with
#[flusso(config = "…")].)
Compiling
flusso build --config config.toml -o flusso.lock runs all validation and writes the whole
validated configuration — every schema inlined — to a single binary artifact (MessagePack).
Because schemas are self-describing and secrets are
deferred, compiling needs no database and bakes in
no secret: { env = … } references travel as references, not values.
flusso run with no --config loads that artifact and resolves the connection and
credentials in its own environment; flusso run --config flusso.toml compiles from source
and runs that. So a deployment ships one file — no YAML tree, no source checkout — and the
same artifact runs anywhere its environment provides the secrets. The Docker shipping
recipes are in deploying.
Cheat sheet
# secrets & connections (resolved at run time)
DATABASE_URL=postgres://user:pass@host:5432/db
PRIMARY_OPENSEARCH_URL=https://opensearch:9200
PRIMARY_OPENSEARCH_USERNAME=flusso
PRIMARY_OPENSEARCH_PASSWORD=… # plus any names you used in { env = "…" }
# CLI flags (flag wins if both set) — see the table above for the full list
FLUSSO_CONFIG=flusso.toml
FLUSSO_SLOT=flusso
FLUSSO_PUBLICATION=flusso
FLUSSO_MANAGE_PUBLICATION=false # off = never issue publication DDL, only warn
FLUSSO_PUBLIC_ADDRESS=0.0.0.0:9464 # read-only surface (health/status/metrics)
FLUSSO_PRIVATE_ADDRESS=0.0.0.0:9465 # control surface (indexes/reindex), Basic auth
FLUSSO_ADMIN_USER=admin # change these before exposing the private port!
FLUSSO_ADMIN_PASSWORD=change-me
FLUSSO_SKIP_BACKFILL=true
# logging & telemetry
RUST_LOG=flusso=debug,info
FLUSSO_LOG_FORMAT=json
OTEL_EXPORTER_OTLP_ENDPOINT=http://collector:4318
A complete example
flusso.toml:
[source]
type = "postgres"
connection_url = { env = "DATABASE_URL" }
[sinks.primary]
type = "opensearch"
url = "https://localhost:9200"
password = { env = "OS_PASSWORD" }
[sinks.audit]
type = "stdout"
pretty = true
[[index]]
name = "users"
schema = "users.schema.yml"
enabled = true
The accompanying users.schema.yml (and the full schema-authoring reference) lives in
schema authoring. A change to a users row — or to any related row
the schema folds in — rebuilds the whole users document and re-emits it to every sink.
flusso-query — the typed query client
flusso keeps an OpenSearch index in sync with Postgres from a declarative schema
(the write side; see README.md). That schema is a contract: it
fixes the shape of every document — which fields exist, their types, which are
nested arrays, which are scalars. flusso enforces that contract on the write
side; flusso-query enforces it on the read side.
flusso-query does not generate the document types for you. You write the
document struct by hand and keep full control — its derives, field types, which
fields it projects, how doc keys map to Rust names. #[derive(FlussoDocument)]
then, at compile time and with no database, does two things against the resolved
schema:
- Validates that every struct field lines up with the schema — the field exists, its Rust type matches, its nullability matches. A struct that has drifted stops compiling, pointing at the offending field.
- Generates the typed query surface from the schema — field handles, get/query, the schema hash — so a service queries the index like a typed function: field names checked at compile time, operators that only exist for the field types that support them, results that deserialize into your struct.
The query surface comes from the full schema, not from the struct — so you can
filter or sort on a field even if your struct doesn’t deserialize it. When the
schema changes and the index is rebuilt, anything that no longer fits stops
compiling at cargo build.
Quick reference
| Looking for… | Jump to |
|---|---|
| A full worked example (structs + query) | A query, from the caller’s seat |
| Which operators each field type exposes | What the field type buys you |
| and/or/not, clause vs combinator style, count/ids | Composing queries |
| Per-query options, compound/scoring, standalone, sort, search-level | Query options and extra query types |
| Nested arrays — filter by vs filter of (any/all, filter_nested) | Filtering nested collections |
| Several searches, one round-trip | _msearch |
| One blended result list across indexes | Combined search |
| How the macro finds the schema; path for nested structs | Binding to the schema |
| flusso type → Rust type → handle | flusso types → Rust types |
| Anything the typed builder can’t express | The escape hatch |
| Reading a prefixed deployment | Resolving the index name |
A query, from the caller’s seat
A service that searches the users index from the schema guide. You write
the structs; #[derive(FlussoDocument)] validates them and generates the query
surface. The only input is the index name — it finds flusso.toml itself (see
Binding to the schema).
The document structs
use flusso_query::{Client, FlussoDocument};
/// A `users` document — *you* write this. It's a **projection**: it deserializes
/// the fields below and omits the rest of the index (addresses, profile,
/// avgOrderValue, …), which the derive allows. The derive checks every field
/// against the `users` schema and hangs the typed query surface off `User`.
#[derive(Debug, Clone, serde::Deserialize, FlussoDocument)]
#[flusso(index = "users")] // ← the only input: which index
pub struct User {
pub id: i32, // primary key (integer) — never null
pub email: String, // keyword, required → never null
#[serde(rename = "fullName")]
pub full_name: Option<String>, // text, not required → nullable
pub account: Account, // an object — always assembled
pub orders: Vec<Order>, // has_many join → nested, never null
#[serde(rename = "orderCount")]
pub order_count: i64, // count aggregate → long, never null
#[serde(rename = "lifetimeValue")]
pub lifetime_value: Option<f64>, // sum aggregate → nullable
}
/// The `account` object — a same-row sub-object. A nested/object struct validates
/// against its `path` in the same index; it has no entry points of its own.
#[derive(Debug, Clone, serde::Deserialize, FlussoDocument)]
#[flusso(index = "users", path = "account")]
pub struct Account {
pub tier: String, // enum → keyword, required
pub country: Option<String>, // keyword, not required
#[serde(rename = "createdAt")]
pub created_at: time::OffsetDateTime, // timestamp, required
}
#[derive(Debug, Clone, serde::Deserialize, FlussoDocument)]
#[flusso(index = "users", path = "orders")]
pub struct Order {
pub status: String, // enum, required
pub total: Decimal, // decimal column (`decimal` feature); or `f64`
#[serde(rename = "placedAt")]
pub placed_at: time::OffsetDateTime, // timestamp, required
pub items: Vec<Item>, // a deeper has_many → nested
}
#[derive(Debug, Clone, serde::Deserialize, FlussoDocument)]
#[flusso(index = "users", path = "orders.items")]
pub struct Item {
#[serde(rename = "productId")]
pub product_id: i32,
pub quantity: i32,
#[serde(rename = "unitPrice")]
pub unit_price: Decimal,
}
The query
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Transport. Points at OpenSearch, not at flusso — the engine is write-only;
// reads go straight to the index it maintains.
let client = Client::connect("https://localhost:9200")?
.basic_auth("admin", std::env::var("OS_PASSWORD")?);
// Fetch one document by its id. `id` is typed as the root table's primary
// key (i32 here), and the result is Option<User> — None when absent.
let user: Option<User> = User::get(&client, 42).await?;
// A typed search. Every `User::field()` is a handle that only exposes the
// operators its mapping supports (see "What the field type buys you"). The
// handles cover the *whole schema* — so `User::addresses()` filters fine even
// though this projection never deserializes addresses.
let page = User::query()
.filter(User::email().eq("ada@example.com")) // keyword → exact
.filter(User::order_count().gte(5)) // long → range
.filter(User::account().tier().eq("gold")) // into the object's handles
.query(User::full_name().matches("ada lovelace")) // text → analyzed match
.filter(User::orders().any(Order::status().eq("delivered"))) // nested, via your Order struct
.filter(User::addresses().any(AddressFields::city().eq("Boston"))) // not projected — generated namespace
.sort(User::order_count().desc())
.from(0)
.size(20)
.send(&client)
.await?;
println!("{} total matches", page.total);
for hit in page.hits {
// hit.source is a fully-typed User. hit.id and hit.score come from the
// search envelope, not the document body.
let u: &User = &hit.source;
println!("{:.3} {} ({} orders)", hit.score, u.email, u.order_count);
for order in &u.orders { // Vec<Order>
println!(" order — {} — {}", order.total, order.status);
}
}
Ok(())
}
What you can’t write
The compiler refuses the mistakes:
- User::email().matches(..): matches is a text operator and email is a keyword.
- User::full_name().eq(..): full_name is analyzed text, so it has no exact eq.
- User::nmae(): there is no such handle.
- hit.source.totl: no such field.
And the struct itself can’t drift. Declaring email: i32, or email: Option<String> when the field is required, or a field totl the schema doesn’t
have, is a compile error from the derive. The schema has been lifted into the
type system, so the compiler checks both your queries and your struct against it.
OS_PASSWORD is just a variable name picked for the example. The full env-var story (secrets, the reserved deployment overrides, FLUSSO_CONFIG) lives in the configuration guide.
What the field type buys you
The handle a schema field generates determines the operators in scope: an operator that doesn’t make sense for a field’s type doesn’t exist on its handle, so the mistake is a compile error rather than a 400 from OpenSearch.
| Handle | Operators |
|---|---|
| Keyword | eq, any_of, prefix, wildcard, regexp, fuzzy, exists |
| Text | matches, match_phrase, match_phrase_prefix, matches_fuzzy, any_of (exact, via .keyword), exists; no exact eq (it’s analyzed) |
| Bool | eq, exists, asc/desc |
| Number | eq, any_of, lt, lte, gt, gte, between, exists |
| Date | eq, any_of, lt, lte, gt, gte, between, exists |
| Object<S> | exists (a same-document sub-object: an object field or a to-one join (belongs_to/has_one); S is the enclosing scope). Its sub-fields are flattened, so query them via the child struct’s dotted-path handles (Account::tier()), not through this handle |
| Nested<S, T> | any(q) / all(q) to match parents and lift the child query into scope S, where q is a child query built from T’s handles (merging); matching(q) (with .sort/.size) to shape what’s returned (see Filtering nested collections); plus exists |
| Geo | within(Distance::km(12.0), center), within_box, within_polygon, exists; sort with distance_from(center) (nearest-first sugar) or distance_sort(center, order, DistanceUnit) |
| TextMap | key(k) → a Text leaf for one key; search(q) (cross-key, .prefer(key, weight) / .only_preferred()); has_key(k), exists |
| KeywordMap | key(k) → a Keyword leaf for one key; has_key(k), exists; no search (exact-match, use key(..).eq(..)) |
| NumberMap | key(k) → a Number leaf for one key (eq/lt/gte/between/…); has_key(k), exists |
| DateMap | key(k) → a Date leaf for one key (eq/gte/between/…); has_key(k), exists |
| Binary | exists; base64-encoded, not searchable |
| Json | exists, raw(serde_json::Value); the untyped fallback |
Each operator’s argument is typed by kind, not by one fixed Rust type — a
handle accepts any value implementing FlussoValue<kind::…> (which requires
serde::Serialize):
- Numerics are split per type (kind::Byte/Short/Integer/Long/Float/Double/Decimal), and a value is accepted only if it widens into that kind without loss. So a Long field takes any integer, a Double takes f32/f64 or a small int, and a Decimal takes Decimal (decimal feature) or an integer; but a float on an integer field, or an i64 on a Short, is a compile error. Bare literals work where lossless: eq(5) on long/integer/double/decimal, eq(5.0) on float/double. A byte/short field needs a typed literal (eq(5i16)), since 5 defaults to i32 (which would narrow).
- Keyword takes a String/&str or a #[derive(FlussoValue)] keyword enum/newtype (matched against its serde string form).
- Bool takes a bool or a bool #[derive(FlussoValue)] newtype.
- Date takes a String/&str ISO-8601 literal or, with the chrono feature, a NaiveDate/NaiveDateTime/DateTime<Utc>.
A custom money/quantity type queries with no cast — Order::total().eq(Money(d))
— as long as it’s a FlussoValue of the field’s kind (see below). A value of the
wrong kind is a compile error (Order::created_at().gte(5), Order::age().eq(1.5)).
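The "accepted only if it widens without loss" rule can be modelled in plain Rust. The sketch below is not flusso-query's implementation: the marker kinds, the WidensInto trait (a stand-in for FlussoValue<kind::…>), and the string output are all hypothetical, chosen only to show how a trait implemented per (value type, kind) pair turns a lossy argument into a compile error.

```rust
use std::marker::PhantomData;

// Hypothetical marker kinds, standing in for kind::Short, kind::Long and kind::Double.
pub struct Short;
pub struct Long;
pub struct Double;

// Hypothetical stand-in for FlussoValue<K>: implemented only for the value
// types that widen into kind K without loss.
pub trait WidensInto<K> {
    fn render(&self) -> String;
}

macro_rules! widens {
    ($kind:ty => $($value:ty),+) => {
        $(impl WidensInto<$kind> for $value {
            fn render(&self) -> String {
                self.to_string()
            }
        })+
    };
}

widens!(Short => i8, i16);
widens!(Long => i8, i16, i32, i64);
widens!(Double => i8, i16, i32, f32, f64);

// A numeric field handle tagged with its kind.
pub struct Number<K> {
    path: &'static str,
    _kind: PhantomData<K>,
}

impl<K> Number<K> {
    pub fn at(path: &'static str) -> Self {
        Number { path, _kind: PhantomData }
    }

    // Accepts only values that widen into K; anything else fails to compile.
    pub fn gte<V: WidensInto<K>>(&self, value: V) -> String {
        format!("{} >= {}", self.path, value.render())
    }
}

// Number::<Short>::at("rank").gte(5i64);        // rejected: i64 does not widen into Short
// Number::<Long>::at("order_count").gte(1.5f64); // rejected: a float on an integer field
```

The two commented-out calls are the interesting part: no impl exists for those pairs, so the operator is simply not callable with them.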
Subfield accessors. flusso’s sink auto-enriches text/keyword fields
(auto_subfields, on by default) with exact / sortable / searchable subfields,
and the handles expose them — no Keyword::at("code.keyword") string path:
#![allow(unused)]
fn main() {
User::full_name() // Text → analyzed full-text match
User::full_name().keyword() // Keyword → exact / wildcard / prefix
User::full_name().keyword_lowercase() // Keyword → case-insensitive match / sort
User::email().text() // Text → full-text over a keyword field
}
(A wildcard belongs on .keyword(), not the analyzed handle, which matches
tokens not the whole value.) These accessors exist only when the subfield is
actually provisioned — and the derive enforces it at compile time: a
text/keyword handle is stamped with subfields only when every OpenSearch
sink has auto_subfields on and the field declares no custom fields.
Otherwise the handle is …<NoSubfields> and .keyword() / .text() /
.keyword_lowercase() (and the Text::any_of / Text::asc sugar built on
them) simply don’t exist — calling one is a compile error, not a runtime 400.
Sorting is similar — sort(…) accepts handles whose type is sortable (numbers,
dates, keywords, booleans, and text). Text::asc/desc is sugar that sorts
via the case-insensitive .keyword_lowercase subfield automatically; reach for
User::full_name().keyword().desc() when you want an exact-case sort instead.
A geo_point sorts by proximity with User::location().distance_from(center).
A few clauses span more than one field, so they’re free functions:
multi_match("ada", [User::full_name(), User::bio()]) runs one analyzed query
across several Text fields (weight one with User::full_name().boosted(3.0)).
The typed surface is broad — see Query options and extra query
types for the per-query options
(boost/case_insensitive/fuzziness/…), the compound/scoring queries
(constant_score/dis_max/function_score/boosting), the standalone ones
(ids/query_string/simple_query_string/script_score/…), and the
search-level controls (min_score/collapse/search_after/highlight/…).
What’s left for the raw hatch is knn, geo_shape, span
and parent/child queries — types with no corresponding flusso field.
Composing queries
A handle’s operator produces a Query<S> — carrying the scope it was built
in. The root document and any flattened object/to-one-join sub-field share the
scope Root; only a nested array introduces a fresh scope, tagged with its
element type (Query<Order>).
Queries compose with and / or / not within a scope, and the Search
builder exposes the bool-query clauses directly:
#![allow(unused)]
fn main() {
// Combinator style.
let q = User::email().eq("ada@example.com")
.and(User::order_count().gte(5))
.and(User::orders().any(Order::status().eq("delivered").and(Order::total().gt(0)))); // integer literal: a decimal handle rejects f64
User::query().query(q).send(&client).await?;
// Clause style — `filter`/`must_not` don't score, `query`(=must)/`should` do.
User::query()
.query(User::full_name().matches("ada")) // scored
.filter(User::order_count().gte(5)) // filtered, cached, no score
.must_not(User::email().prefix("test-"))
.should(User::orders().any(Order::status().eq("delivered")))
.send(&client)
.await?;
}
query/filter/must_not/should accept anything that is a Query<Root>, so
the two styles mix freely. Which clause to reach for:
| Clause | Scores? | Bool slot | Use for |
|---|---|---|---|
| query | yes | must | full-text relevance you want ranked |
| filter | no (cached) | filter | exact constraints: terms, ranges, nested any |
| must_not | no | must_not | exclusions |
| should | yes | should | optional boosts; a real constraint with min_should_match |
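To make the clause-to-slot mapping concrete, here is a minimal string-based sketch of the bool query those four clauses correspond to in the OpenSearch DSL. BoolQuery and to_dsl are hypothetical names for illustration; the real builder is typed and its rendering is not shown in this manual.

```rust
// A hypothetical, string-based model: each clause method appends a raw DSL
// fragment to the bool slot named in the table above.
#[derive(Default)]
pub struct BoolQuery {
    must: Vec<String>,
    filter: Vec<String>,
    must_not: Vec<String>,
    should: Vec<String>,
}

impl BoolQuery {
    pub fn query(mut self, clause: &str) -> Self {
        self.must.push(clause.to_string()); // scored
        self
    }

    pub fn filter(mut self, clause: &str) -> Self {
        self.filter.push(clause.to_string()); // not scored
        self
    }

    pub fn must_not(mut self, clause: &str) -> Self {
        self.must_not.push(clause.to_string());
        self
    }

    pub fn should(mut self, clause: &str) -> Self {
        self.should.push(clause.to_string());
        self
    }

    // Render only the non-empty slots, in a fixed order.
    pub fn to_dsl(&self) -> String {
        let slots = [
            ("must", &self.must),
            ("filter", &self.filter),
            ("must_not", &self.must_not),
            ("should", &self.should),
        ];
        let parts: Vec<String> = slots
            .iter()
            .filter(|(_, clauses)| !clauses.is_empty())
            .map(|(name, clauses)| format!("\"{}\":[{}]", name, clauses.join(",")))
            .collect();
        format!("{{\"bool\":{{{}}}}}", parts.join(","))
    }
}
```

A scored match plus an unscored range therefore lands in two different slots of one bool query, which is why the two styles can mix freely.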
A built search can finish as a count instead of a page: .count() sends the
same query to _count and returns the number of matching documents (u64) —
cheaper than send() when the hits aren’t needed. Sort, from/size, and
filter_nested projections are ignored; they never change which documents match.
#![allow(unused)]
fn main() {
let open_orders: u64 = User::query()
.filter(User::orders().any(Order::status().eq("open")))
.count(&client)
.await?;
}
Or as an id page: .ids() runs the same search with _source: false and
returns Vec<String> — the matching document ids (root primary keys,
stringified), in order, no sources fetched. Sort and from/size apply as in
send(); filter_nested projections are dropped. The cheap way to feed another
lookup — e.g. search in OpenSearch, then load the full rows from Postgres:
#![allow(unused)]
fn main() {
let user_ids: Vec<String> = User::query()
.filter(User::orders().any(Order::status().eq("open")))
.sort(User::order_count().desc())
.size(100)
.ids(&client)
.await?;
}
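Because the ids come back stringified, the "then load the full rows from Postgres" step needs them parsed back into the key type first. A small sketch, assuming an i32 primary key as in the User example; parse_ids is a hypothetical helper, not part of flusso-query.

```rust
use std::num::ParseIntError;

// Parse stringified root primary keys back into i32, failing on the first
// id that is not a valid integer.
pub fn parse_ids(ids: &[String]) -> Result<Vec<i32>, ParseIntError> {
    ids.iter().map(|id| id.parse::<i32>()).collect()
}
```

The resulting Vec<i32> keeps the search order, so it can be bound as an array parameter (for example to a WHERE id = ANY($1) clause) and the rows re-ordered to match.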
Query options and extra query types
Each leaf operator returns a small builder that carries that query’s options
plus the universal boost(f32) and name(&str) (_name, surfaced in a hit’s
matched_queries). With no option set it renders the DSL shorthand; set one and
it expands. A builder drops straight into a clause (it’s an AsQuery), so no
.build() is needed:
#![allow(unused)]
fn main() {
User::query()
.should(User::full_name().matches("acme").boost(2.0)) // weighted text
.should(User::full_name().keyword().wildcard("*acme*").case_insensitive())
.should(User::full_name().matches("acme").fuzziness(Fuzziness::Auto)) // typo-tolerant
.min_should_match(1) // make should a real filter
.filter(User::owner_id().eq(owner_uuid)) // uuid keyword (feature)
.filter(User::tier().eq(Tier::Pro)) // enum keyword
.sort(User::created_at().desc().missing_first()) // null-aware sort
.send(&client).await?;
}
The options per query type (all optional; plus the universal boost/name on every builder):
| Query | Options |
|---|---|
| term / prefix / wildcard / regexp | case_insensitive |
| prefix / wildcard | rewrite |
| regexp | flags, max_determinized_states |
| fuzzy | fuzziness, prefix_length, max_expansions, transpositions |
| matches | fuzziness, operator, minimum_should_match, prefix_length, analyzer, zero_terms_query, lenient |
| phrase matches | slop, analyzer |
| multi_match | type, operator, fuzziness, tie_breaker, minimum_should_match |
| range | format, time_zone, relation |
| within (geo) | distance_type, validation_method |
| nested any | score_mode, ignore_unmapped |
The enumerable options are closed enums, not strings — a typo is a compile
error, not a 400. operator/default_operator take Operator { And, Or };
fuzziness takes Fuzziness { Auto, AutoBounds(u32, u32), Edits(u32) };
multi_match’s type takes MultiMatchType; zero_terms_query takes
ZeroTermsQuery { None, All }; a range relation takes RangeRelation { Intersects, Contains, Within }; function_score’s score_mode/boost_mode
take ScoreMode/BoostMode; a nested score_mode takes NestedScoreMode
(which, unlike ScoreMode, has None for a filter-only nested clause). Geo’s
distance_type/validation_method take DistanceType/ValidationMethod, and
a sort’s numeric_type / Sort::script type take NumericType /
ScriptSortType. minimum_should_match takes a MinimumShouldMatch
(2.into() for a count, MinimumShouldMatch::percent(75), or ::raw("3<90%")
for the combining mini-language). Genuinely open-ended params (analyzer,
format, time_zone, unmapped_type, flags) stay String.
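As an illustration of why closed enums rule out malformed option strings, the sketch below renders two of them to the values OpenSearch expects. The enum names and the Fuzziness variants follow the text above; the MinimumShouldMatch variants and both render methods are assumptions made for this example, not the crate's actual internals.

```rust
pub enum Fuzziness {
    Auto,
    AutoBounds(u32, u32),
    Edits(u32),
}

impl Fuzziness {
    // Hypothetical renderer: the only strings that can reach the query are
    // the ones this match produces.
    pub fn render(&self) -> String {
        match self {
            Fuzziness::Auto => "AUTO".to_string(),
            Fuzziness::AutoBounds(low, high) => format!("AUTO:{},{}", low, high),
            Fuzziness::Edits(n) => n.to_string(),
        }
    }
}

// Assumed internal shape; only the constructors are named in the text.
pub enum MinimumShouldMatch {
    Count(i32),
    Percent(i32),
    Raw(String),
}

impl From<i32> for MinimumShouldMatch {
    fn from(count: i32) -> Self {
        MinimumShouldMatch::Count(count)
    }
}

impl MinimumShouldMatch {
    pub fn percent(p: i32) -> Self {
        MinimumShouldMatch::Percent(p)
    }

    pub fn raw(expr: &str) -> Self {
        MinimumShouldMatch::Raw(expr.to_string())
    }

    pub fn render(&self) -> String {
        match self {
            MinimumShouldMatch::Count(n) => n.to_string(),
            MinimumShouldMatch::Percent(p) => format!("{}%", p),
            MinimumShouldMatch::Raw(expr) => expr.clone(),
        }
    }
}
```

Only the raw constructor passes a caller-written string through, which matches the split between closed enums and genuinely open-ended parameters.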
.or()/.and() on a builder need use flusso_query::AsQuery; in scope (the combinators are provided methods on that trait). Composing via the Search clauses (.should()/.filter()/…) needs no import.
Bool / compound & scoring. Search::min_should_match(n) (or
Query::min_should_match on an or-group) turns a top-level free-text should
group into a real constraint instead of scoring-only. The scoring wrappers are
free functions: constant_score(filter), dis_max([..]).tie_breaker(..),
boosting(positive, negative, negative_boost), and
function_score(query).weight(..)/.weight_when(.., filter)/.boost_mode(..).
Standalone query types (free functions, each an AsQuery): ids([..])
(get-many-by-_id), query_string(..) / simple_query_string(..) /
combined_fields(.., [fields]) (user-facing full-text), and the relevance ones
script(..), script_score(query, source), distance_feature(..),
rank_feature(..), more_like_this([fields], [like]). match_bool_prefix is a
Text operator (search-as-you-type).
Sort. .asc()/.desc() on a sortable handle (number, date, keyword, bool,
or text — the last routing through .keyword_lowercase) return a Sort
builder: chain .missing_first()/.missing_last()/.missing(v),
.mode(SortMode::..), .unmapped_type(..)/.numeric_type(..)/.format(..), or
.nested(path)/.nested_filtered(path, q). Also Sort::score() (by _score),
Sort::script(ScriptSortType, source, order), and Geo::distance_from(center)
/ Geo::distance_sort(center, order, DistanceUnit) for proximity sorting. A
geo radius is a typed Distance (Distance::km(12.0) / ::miles(5.0) /
::meters(800.0) / …), so a malformed "12 km" can’t reach the query.
Search-level controls on the Search builder: min_score,
track_total_hits, track_scores, search_after([..]) (deep pagination),
collapse(field), post_filter(q), and highlight(Highlight::new().field(..)).
Queries are values; the client appears once
Type::query() takes no client: a Search<T> is a plain, Cloneable value with
no lifetime. Build it anywhere — a helper, a struct field, a cached “prepared
search” — and hand a &Client to the terminal (send / ids / count, all
&self) when it’s time to run. One built query can run many times, against
different clients, or with per-call tweaks via clone():
#![allow(unused)]
fn main() {
fn busy_users() -> Search<User> { // no client in sight
User::query().filter(User::order_count().gte(5))
}
let page = busy_users().send(&client).await?; // run it
let next = busy_users().from(20).send(&client).await?; // tweak a copy
}
Several searches, one round-trip (_msearch)
Independent typed searches — different indexes, different document types — can
share one HTTP round-trip. client.msearch(…) takes a tuple of &Search<T>
(arity 1–8) and returns one typed SearchResponse per slot, in order:
#![allow(unused)]
fn main() {
let users_q = User::query().query(User::full_name().matches(&q)).size(10);
let orders_q = Order::query().filter(Order::status().eq("open")).size(5);
let (users, orders) = client.msearch((&users_q, &orders_q)).await?;
}
The “search page with separate sections” primitive: each slot keeps its own query,
sort, pagination, and filter_nested projections, and decodes into its own type.
A slot-level failure fails the whole call with an error naming the slot (no partial
results). For many searches of one type there’s
client.msearch_all(&searches) → Vec<SearchResponse<T>>.
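For orientation, the OpenSearch _msearch endpoint takes a newline-delimited body: one header line naming the target, then one line with the search itself, per slot. The sketch below assembles such a body from already-rendered searches. msearch_body is a hypothetical helper, and the index names in any call are placeholders; the client's real request assembly is not documented here.

```rust
// Build an _msearch request body: a header line and a search line per slot,
// each terminated by a newline (including the last one).
pub fn msearch_body(slots: &[(&str, &str)]) -> String {
    let mut body = String::new();
    for (index, search) in slots {
        body.push_str(&format!("{{\"index\":\"{}\"}}\n", index));
        body.push_str(search);
        body.push('\n');
    }
    body
}
```

Responses come back as one entry per slot in the same order, which is what lets each slot decode into its own type.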
One blended result list (combined search)
The other multi-index shape: one query over several indexes, hits ranked together in a single list — the “one search box over everything” primitive. You declare which document types blend by writing an enum with one variant per type:
#![allow(unused)]
fn main() {
/// One item in the storefront's global search.
#[derive(Debug, FlussoMultiDocument)] // the `derive` feature, like FlussoDocument
enum StoreItem {
User(User),
Order(Order),
}
let page = StoreItem::query() // MultiSearch<StoreItem> — client-free too
.query(multi_match("ada", [User::full_name(), Order::customer_name()]))
.size(20)
.send(&client)
.await?;
for hit in page.hits {
match hit.source { // dispatched by the hit's `_index`
StoreItem::User(u) => render_user(u, hit.score),
StoreItem::Order(o) => render_order(o, hit.score),
}
}
}
Root-scope queries compose across document types (Query<Root> carries no
document type), and each hit is decoded into the variant that owns its index.
The query goes through the stable {INDEX}_{HASH} alias, but OpenSearch reports
the concrete index behind it ({INDEX}_{HASH}_{generation}) in each hit — so the
crate normalizes that generation suffix before dispatch; you just match on
hit.source. A hit from an index no variant claims is an error, not a skip.
count(&client) works on the union too.
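The normalization step can be pictured as mapping a hit's concrete index name back to the alias it was queried through. The sketch below is an assumption-laden model, not the crate's code: alias_of is a hypothetical helper, the generation suffix is assumed to be an underscore followed by decimal digits, and the alias values used with it are placeholders.

```rust
// Map the concrete index a hit reports back to the alias that was queried.
// Assumption: a generation suffix is `_` followed by one or more ASCII digits.
pub fn alias_of<'a>(concrete: &str, aliases: &[&'a str]) -> Option<&'a str> {
    aliases.iter().copied().find(|alias| match concrete.strip_prefix(*alias) {
        Some("") => true, // the hit already names the alias itself
        Some(rest) => rest.strip_prefix('_').map_or(false, |generation| {
            !generation.is_empty() && generation.bytes().all(|b| b.is_ascii_digit())
        }),
        None => false,
    })
}
```

A None result corresponds to the "hit from an index no variant claims" case, which the text treats as an error rather than a skip.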
Two semantics to know:
- A query on a field that exists in only one of the indexes is fine — it just doesn’t match in the others.
- A sort on such a field is rejected by OpenSearch unless it carries an
unmapped_type. Sort the blended list by relevance, or on fields all the union’s indexes share.
The derive validates the enum’s shape (single-field tuple variants, no duplicate
payload types) and generates the trait’s two members. Without the derive
feature, the impl is two short members written by hand — a TARGETS const listing
each variant’s (INDEX, SCHEMA_HASH) and a decode that matches on
Type::physical_index().
Building a child filter and merging it into the parent
Because the scope is part of the type, a query is a value you can build, name,
store, and reuse. A nested child struct (one whose path ends in a nested
array) carries its own field handles tagged with the child scope — so they produce
Query<Order>, not Query<Root>:
#![allow(unused)]
fn main() {
// Built from Order's own handles. Reusable — a plain function returning a query:
fn big_delivered() -> Query<Order> {
Order::status().eq("delivered")
.and(Order::total().gt(100)) // integer literal: a decimal handle rejects f64
}
}
To merge a child filter into a parent, lift it through the nesting that holds
it: User::orders().any(child) (or .all(child)) takes a Query<Order> and
returns a Query<Root> — a nested clause at the orders path — which composes
with parent-scope queries like any other:
#![allow(unused)]
fn main() {
let q = User::email().eq("ada@example.com")
.and(User::orders().any(big_delivered())); // Query<Order> → lifted → Query<Root>
User::query().filter(q).send(&client).await?;
}
The scope tag keeps this honest: User::email().and(Order::status().eq(…)) does
not compile — you can’t and a Query<Root> with a Query<Order>; the child
query has to be lifted through User::orders() first. A child constraint can never
be silently applied at the wrong level.
Lifting composes through depth: Order::items().any(Item::quantity().gt(1)) is a
Query<Order>, which User::orders().any(…) then lifts the rest of the way to
Query<Root>.
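The scope tag is an ordinary phantom type parameter. The following self-contained sketch mirrors the names Query, Nested, Root and Order from the text but is only a model under assumptions: queries are plain strings here, and the rendering format is invented for the example.

```rust
use std::marker::PhantomData;

// Scope markers: the root document and one nested element type.
pub struct Root;
pub struct Order;

// A query tagged with the scope it was built in. The body is a plain string;
// only the scope tag matters for this sketch.
pub struct Query<S> {
    pub dsl: String,
    _scope: PhantomData<S>,
}

impl<S> Query<S> {
    pub fn new(dsl: &str) -> Self {
        Query { dsl: dsl.to_string(), _scope: PhantomData }
    }

    // `and` accepts only a query of the same scope S.
    pub fn and(self, other: Query<S>) -> Query<S> {
        Query::new(&format!("({} AND {})", self.dsl, other.dsl))
    }
}

// A nested handle: lifts a child-scope query into the enclosing scope.
pub struct Nested<Parent, Child> {
    path: &'static str,
    _scopes: PhantomData<(Parent, Child)>,
}

impl<Parent, Child> Nested<Parent, Child> {
    pub fn at(path: &'static str) -> Self {
        Nested { path, _scopes: PhantomData }
    }

    pub fn any(&self, child: Query<Child>) -> Query<Parent> {
        Query::new(&format!("nested({}: {})", self.path, child.dsl))
    }
}

// Query::<Root>::new("a").and(Query::<Order>::new("b")); // rejected: mismatched scopes
```

The commented-out line is the mistake the scope tag exists to prevent: the two operands have different type parameters, so `and` does not type-check until the child query is lifted through `any`.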
Optional filters
Callers build queries from optional inputs — request params, form fields — and the
if let Some(x) = … { q = q.filter(…) } dance breaks the fluent chain. The
primitive that fixes it: Option<Q> is itself a Query, where None
contributes nothing in any clause — must_not(None) excludes nothing, and(None)
is the identity. So every clause and combinator accepts an optional; you .map the
value into the handle:
#![allow(unused)]
fn main() {
User::query()
.filter(params.email.map(|e| User::email().eq(e))) // skipped when None
.filter(params.min_orders.map(|n| User::order_count().gte(n)))
.send(&client).await?;
// Composes inside and/or too — a None branch just drops out:
let q = User::email().eq("ada@example.com")
.and(params.tier.map(|t| Account::tier().eq(t))); // None → just the email clause
}
A named filter_some(value, |v| …) sugar that drops the .map is an obvious
follow-on, left out of the first cut.
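One way to picture "Option<Q> is itself a Query" is a conversion trait with a blanket impl for Option. The sketch below is a hypothetical model: IntoClause, the string-bodied Query, and Search::filters are invented for illustration and are not the crate's actual trait or types.

```rust
pub struct Query(pub String);

// Hypothetical conversion trait: anything that may or may not contribute a clause.
pub trait IntoClause {
    fn into_clause(self) -> Option<Query>;
}

impl IntoClause for Query {
    fn into_clause(self) -> Option<Query> {
        Some(self)
    }
}

// The blanket impl that makes optionals first-class: None contributes nothing.
impl<Q: IntoClause> IntoClause for Option<Q> {
    fn into_clause(self) -> Option<Query> {
        self.and_then(|q| q.into_clause())
    }
}

#[derive(Default)]
pub struct Search {
    filters: Vec<String>,
}

impl Search {
    // Accepts a plain query or an optional one; the chain never breaks.
    pub fn filter(mut self, clause: impl IntoClause) -> Self {
        if let Some(Query(dsl)) = clause.into_clause() {
            self.filters.push(dsl);
        }
        self
    }

    pub fn filters(&self) -> &[String] {
        &self.filters
    }
}
```

With this shape a request parameter that is absent simply leaves no trace in the built search, so no if-let branching is needed at the call site.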
Filtering nested collections
orders is a nested array, and there are two independent things you might
filter — flusso keeps them separate:
- Filter by nested: choose which users come back, based on their orders. The any/all you’ve already seen: it’s a Query, so it goes in filter/query/etc. A matching user still carries its whole orders array.
- Filter of nested: shape the orders array each user comes back with, without changing which users return. A separate clause, filter_nested.
They compose: use either alone, or both together (often with the same predicate).
filter_nested — shaping the returned array
#![allow(unused)]
fn main() {
let page = User::query()
// filter BY: only users with a delivered order
.filter(User::orders().any(Order::status().eq("delivered")))
// filter OF: and within each, keep only the delivered orders, newest first, ≤5
.filter_nested(
User::orders()
.matching(Order::status().eq("delivered"))
.sort(Order::placed_at().desc())
.size(5),
)
.send(&client).await?;
for hit in &page.hits {
// `source.orders` IS the filtered subset — no extra accessor:
for order in &hit.source.orders { // delivered, newest first, ≤ 5
println!("{} — {}", order.total, order.status);
}
}
}
User::orders().matching(q) is a nested projection: q is a Query<Order>
built from Order’s handles, plus optional .sort(Order::field().desc()),
.size(n), .from(n). matching itself is optional — drop it to keep every child
but still sort or cap the array.
Because filter_nested does not touch which parents match, a user with no
delivered orders still comes back — with orders: []. Pair it with a
filter(User::orders().any(…)) when you also want to drop those users.
filter_nested always replaces hit.source.<path> with the matched subset:
the client fetches the nested matches and substitutes them for that field before
deserializing.
Not yet built: a .keep_source() opt-out that leaves the stored array intact, and a typed hit.nested(handle) side accessor. Today filter_nested always replaces the array in source.
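The difference between filtering by and filtering of a nested collection is easy to see on in-memory data. This sketch uses plain structs and two hypothetical functions, filter_by and filter_of, to mirror the semantics described above; it says nothing about how the client or OpenSearch evaluate them.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct Order {
    pub status: &'static str,
    pub placed_at: u32, // stand-in for a timestamp
}

#[derive(Clone, Debug, PartialEq)]
pub struct User {
    pub id: i32,
    pub orders: Vec<Order>,
}

// Filter BY nested: keep users with at least one matching order.
// The orders array of a kept user is untouched.
pub fn filter_by(users: &[User], pred: impl Fn(&Order) -> bool) -> Vec<User> {
    users
        .iter()
        .filter(|u| u.orders.iter().any(|o| pred(o)))
        .cloned()
        .collect()
}

// Filter OF nested: keep every user, but shape the orders array each comes
// back with (matching only, newest first, at most `size`).
pub fn filter_of(users: &[User], pred: impl Fn(&Order) -> bool, size: usize) -> Vec<User> {
    users
        .iter()
        .map(|u| {
            let mut orders: Vec<Order> =
                u.orders.iter().filter(|o| pred(*o)).cloned().collect();
            orders.sort_by(|a, b| b.placed_at.cmp(&a.placed_at));
            orders.truncate(size);
            User { id: u.id, orders }
        })
        .collect()
}
```

Note how filter_of returns a user with an empty orders array rather than dropping it, which is the behaviour the text describes for a user with no delivered orders.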
Depth
filter_nested shapes one nested level — orders. You can still match on deeper
nesting from inside the predicate
(Order::items().any(Item::quantity().gt(1))), and the returned orders honor it;
returning a filtered items array inside each returned order is a deeper
inner-hits case left to the raw hatch for now.
Results
#![allow(unused)]
fn main() {
pub struct SearchResponse<T> {
pub total: u64, // total matches (not the page size)
pub max_score: Option<f32>,
pub hits: Vec<Hit<T>>,
pub took: std::time::Duration,
}
pub struct Hit<T> {
pub id: String, // the document id (root primary key, stringified)
pub score: f32,
pub source: T, // the fully-typed document
}
}
get returns Option<T>; search returns SearchResponse<T>, where T is the
struct you wrote. There is no serde_json::Value in the common path.
Binding to the schema
The macro validates against the resolved mapping — flusso’s
IndexMapping: every field with a
concrete type, whether it is nullable, its nested children, and the schema
hash.
flusso’s schemas are self-describing: every leaf declares its type and
whether it’s required, and joins/objects/aggregates have structural types — so the
mapping resolves with no database, exactly as flusso build does when it writes
flusso.lock. The client reuses that resolution. (See the
schema guide for the
schema format and the configuration guide
for how the index is written.)
The one input: the index name
You never point the macro at a file. You name the index, and the macro finds the schema:
#![allow(unused)]
fn main() {
#[derive(serde::Deserialize, FlussoDocument)]
#[flusso(index = "users")]
pub struct User { /* … */ }
}
At compile time the macro:
- Locates flusso.toml by walking up from the consuming crate’s CARGO_MANIFEST_DIR. (Override with #[flusso(index = "users", config = "…")] or the FLUSSO_CONFIG env var; see the configuration guide.)
- Selects the [[index]] whose name matches "users". This is why an index name is required: one flusso.toml defines several.
- Loads that index’s schema: file (resolved relative to flusso.toml) and resolves the IndexMapping in-process, the same resolution flusso build performs. Hermetic: no database, no network.
- Tracks flusso.toml and every schema file it read as build inputs (via include_bytes!), so editing the config or a schema retriggers compilation and a drifted struct fails the next build.
The resolved mapping’s content hash is the binding’s SCHEMA_HASH — the same
hash flusso build writes into flusso.lock and the engine folds into the
physical index name, so binding and index are provably the same schema version.
There is no build.rs, no generated .rs file to include!, and no committed
mapping artifact to keep in sync — the struct is the only file you maintain.
A nested or object struct names its path
A same-row object, a to-one join, or a nested join is its own struct, validated
against a dotted path into the same index — account, orders,
orders.items. It declares the same index so the macro resolves the same config,
then walks to that path’s children:
#![allow(unused)]
fn main() {
#[flusso(index = "users", path = "orders.items")]
pub struct Item { /* … */ }
}
These contribute field validation and their own field handles —
Order::status(), Order::total(), … — producing Query<Order> values you can
compose, store, and lift into a parent query (see Building a child
filter). Only the
root struct gets entry points (get/query) and SCHEMA_HASH.
What the derive expands to
#[derive(FlussoDocument)] on the root User emits (roughly):
#![allow(unused)]
fn main() {
impl User {
// Entry points.
pub fn get(client: &Client, id: i32) -> impl Future<Output = Result<Option<User>>>;
pub fn query() -> Search<User>; // client-free: a plain, reusable value
// Field handles — one per *schema* field, carrying its type. These are what
// the query builder consumes. They exist for every field in the mapping,
// whether or not `User` projects it.
pub fn id() -> Number<kind::Integer> { /* … */ }
pub fn email() -> Keyword { /* … */ }
pub fn full_name() -> Text { /* … */ }
pub fn account() -> Object { /* … */ } // object/to-one join → `Object<Root>` (scope-only; `.exists()`)
pub fn addresses() -> Nested<Root, AddressFields> { /* … */ } // not projected — generated namespace
pub fn orders() -> Nested<Root, Order> { /* … */ } // projected — `Nested<enclosing scope, your struct>`
pub fn order_count() -> Number<kind::Long> { /* … */ }
pub fn lifetime_value() -> Number<kind::Double> { /* … */ }
pub fn avg_order_value() -> Number<kind::Double> { /* … */ } // not projected by `User`
pub fn last_order_at() -> Date { /* … */ } // not projected by `User`
// …one per schema field.
/// The physical index this binds to — `get`/`query` use it.
pub const INDEX: &str = "users_3f2a1b9c…";
/// The schema hash this binding was generated from (the `INDEX` suffix).
pub const SCHEMA_HASH: &str = "3f2a1b9c…";
}
// Each nested path has ONE handle namespace whose functions build a `Query` in
// that path's scope. When you wrote a struct for the path, that struct IS the
// namespace — its derive adds the handles, covering the full sub-schema (not just
// the fields it deserializes), producing `Query<Order>`. A `nested` array
// introduces its own scope: `Order`'s handles are tagged `<Order>` (the root and
// flattened objects stay `<Root>`); they must be lifted before joining a root query.
impl Order {
pub fn status() -> Keyword<Order> { /* … */ }
pub fn total() -> Number<kind::Decimal, Order> { /* … */ }
pub fn placed_at() -> Date<Order> { /* … */ }
pub fn items() -> Nested<Order, Item> { /* … */ } // deeper nested: enclosing scope `Order`, child `Item`
// …one per field at the `orders` path.
}
// For a nested path you DIDN'T give a struct, the root derive generates a
// handles-only namespace named `<Path>Fields`, so it's still queryable:
pub struct AddressFields;
impl AddressFields {
pub fn city() -> Keyword<AddressFields> { /* … */ } // nested scope, like `Order`
pub fn postal_code() -> Keyword<AddressFields> { /* … */ }
// …one per field at the `addresses` path.
}
}
What the derive checks
For each field the struct declares, the macro resolves the matching schema field by
its document key — honoring #[serde(rename = "…")] and a container
#[serde(rename_all = "…")], so the struct’s serde config and flusso’s validation
agree — then checks three things:
| Check | Pass | Compile error |
|---|---|---|
| field exists | the doc key is in the schema | no field `totl` in index `users` (span on the field) |
| type matches | leaf Rust type matches the field’s type | email is `keyword` → expected `String`, found `i32` |
| nullability matches | Option<_> iff the field is nullable | email is required → expected `String`, found `Option<String>` |
The rules that make this full control rather than a straitjacket:
- Partial projections are allowed. Leaving schema fields off your struct is fine: you only deserialize the subset you declare. Only a failure of one of the three checks above is an error.
- Type matching is by leaf identifier + Option shape. The macro can’t resolve arbitrary type aliases, so it compares the final path segment (String, i32, f64, OffsetDateTime, …) and the Option<_> wrapper against the type table. For an object field it expects a struct, for a nested field a Vec<_>, and defers the inner field checks to that struct’s own FlussoDocument derive.
- Escape hatches. A field typed serde_json::Value opts out of type checking. #[flusso(skip)] drops a field from validation entirely, for a computed or app-only field not backed by the index (pair it with #[serde(skip)] or #[serde(default)]).
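The three checks can be sketched as an ordinary function over a resolved field table. This is a runtime model with hypothetical types (SchemaField, StructField) and approximate messages; the real derive performs the equivalent comparison at compile time and reports it on the field's span.

```rust
use std::collections::HashMap;

// Hypothetical resolved-schema entry: the expected leaf Rust type and nullability.
pub struct SchemaField {
    pub rust_type: &'static str,
    pub nullable: bool,
}

// Hypothetical view of one struct field: its document key (after serde
// renames), the final path segment of its type, and whether it is Option<_>.
pub struct StructField {
    pub doc_key: &'static str,
    pub leaf_type: &'static str,
    pub optional: bool,
}

pub fn check(
    index: &str,
    schema: &HashMap<&str, SchemaField>,
    field: &StructField,
) -> Result<(), String> {
    // 1. The field exists.
    let expected = match schema.get(field.doc_key) {
        Some(found) => found,
        None => return Err(format!("no field `{}` in index `{}`", field.doc_key, index)),
    };
    // 2. The leaf type matches.
    if field.leaf_type != expected.rust_type {
        return Err(format!(
            "{}: expected `{}`, found `{}`",
            field.doc_key, expected.rust_type, field.leaf_type
        ));
    }
    // 3. Option<_> if and only if the field is nullable.
    if field.optional != expected.nullable {
        return Err(format!("{}: nullability mismatch", field.doc_key));
    }
    Ok(())
}
```

Fields that the struct omits never reach this function, which is why partial projections pass.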
flusso types → Rust types
The type the derive expects for each schema type (the same bridge the
schema guide defines, with the Rust side added). Declare something
else and it won’t compile (modulo the leaf-identifier rule above).
| flusso type | OpenSearch | Rust type | Field handle |
|---|---|---|---|
| text | text | String | Text |
| identifier | text | String | Text |
| keyword | keyword | String (or a FlussoValue newtype) | Keyword |
| enum | keyword | String or a #[derive(FlussoValue)] enum | Keyword |
| uuid | keyword | String, or uuid::Uuid (uuid feature) | Keyword |
| boolean | boolean | bool | Bool |
| short | short | i16 | Number<kind::Short> |
| integer | integer | i32 | Number<kind::Integer> |
| long | long | i64 | Number<kind::Long> |
| float | float | f32 | Number<kind::Float> |
| double | double | f64 | Number<kind::Double> |
| decimal | double | Decimal (decimal feature) or f64 | Number<kind::Decimal> |
| date | date | time::Date (feature) | Date |
| timestamp | date | time::OffsetDateTime (feature) | Date |
| binary | binary | String (base64) | Binary |
| json | object | serde_json::Value | Json |
| geo_point | geo_point | GeoPoint ({ lat, lon }) | Geo |
| custom { opensearch } | (given) | matching scalar, else serde_json::Value | by OS type |
| object | object | a struct | Object |
| join belongs_to / has_one | object | Option< a struct > | Object |
| join has_many / many_to_many | nested | Vec< a struct > | Nested<S, T> |
Decimals query without a cast. type: decimal maps to OpenSearch double
(lossy in storage) but gets a Number<kind::Decimal> handle, distinct from a
true double’s Number<kind::Double> — so it accepts a Decimal or a
losslessly-widening integer (eq(5)), but not an f64 (that’s lossy, a
compile error). To pass a rust_decimal::Decimal, enable the decimal
feature on flusso-query (it re-exports Decimal and implements
FlussoValue<kind::Decimal> for it); the document field can be Decimal or
f64 (both deserialize from the stored double). Query precision is f64-bound
(serde_json has no arbitrary precision) — fine for a double-stored column; if
you need exact querying, declare a custom scaled_float (also kind::Decimal)
and note this limit.
Dates are behind a feature so a caller picks time or chrono (or String
for raw ISO-8601); the derive accepts whichever leaf type the chosen feature settles
on.
Custom value types — #[derive(FlussoValue)]. A newtype inherits its inner
type’s kinds automatically, so struct Money(Decimal) is a decimal value and
struct Sku(String) a keyword + text value — no kind tag needed, and each is
queryable and rejected exactly where the inner type would be. FlussoValue
requires serde::Serialize (a supertrait), so derive it too. An enum has no
inner type, so it needs an explicit string kind — #[flusso(keyword)] (default)
or #[flusso(text)]; numeric/date tags don’t exist (use a newtype, which
inherits). Enum keyword fields stay typed — never #[flusso(skip)] them.
#![allow(unused)]
fn main() {
// Newtype: inherits Decimal's kinds — a `decimal`/`scaled_float` value.
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, FlussoValue)]
struct Money(Decimal);
// Enum: string-valued, so a kind tag is required (keyword default).
#[derive(serde::Serialize, serde::Deserialize, FlussoValue)]
#[serde(rename_all = "camelCase")]
#[flusso(keyword)]
enum Tier { Pro, Enterprise, Free }
#[derive(serde::Deserialize, FlussoDocument)]
#[flusso(index = "customers")]
struct Customer { tier: Tier, balance: Money /* … */ }
Customer::tier().eq(Tier::Pro); // term against "pro"
Customer::balance().gte(Money(Decimal::ONE)); // a decimal value, no cast
}
uuid::Uuid is a keyword value behind the uuid feature — id / foreign-key
fields stay in the struct as Uuid (no #[flusso(skip)], no
Keyword::at("…")), and Customer::owner_id().eq(some_uuid) works without
.to_string().
Nullability is declared, not guessed
A field is T or Option<T>, and the derive checks it against the resolved
mapping. Nullability comes straight from the schema with no database round-trip: a
leaf states it with required, and joins, objects, and aggregates carry it
structurally. ResolvedField records the resulting nullable: bool; the derive
requires nullable: false → T, nullable: true → Option<T>.
| Field source | nullable | Why |
|---|---|---|
| root primary_key column | false | forced non-null: it backs the document id |
| join primary_key field | false | forced non-null, just like the root key |
| leaf column, required: true | false | declared non-null |
| leaf column, required: false | true | nullable by default |
| object | false | always assembled from the same row |
| join belongs_to / has_one (object) | true | there may be no related row |
| join has_many / many_to_many | false | a Vec, empty when there are none, never null |
| aggregate count | false | a non-null long: zero rows is 0, not null |
| aggregate avg | true | a nullable double: null over zero rows |
| aggregate sum / min / max | true | null over zero rows; the result mirrors the column |
required is rejected by the schema on joins and aggregates precisely because
their nullability is structural — so there’s nothing for the author to declare.
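The table can be read as a single function from field source to `nullable`. The sketch below is a std-only illustration, not flusso's implementation: the `FieldSource` enum, `nullable`, and `rust_type` are hypothetical names that mirror the table's rows and the T / Option<T> rule.

```rust
/// Hypothetical mirror of the table's "Field source" column.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FieldSource {
    RootPrimaryKey,
    JoinPrimaryKey,
    Leaf { required: bool },
    Object,
    JoinToOne,  // belongs_to / has_one
    JoinToMany, // has_many / many_to_many
    AggregateCount,
    AggregateAvg,
    AggregateSumMinMax,
}

/// Nullability per the table: declared on leaves, structural everywhere else.
fn nullable(source: FieldSource) -> bool {
    use FieldSource::*;
    match source {
        RootPrimaryKey | JoinPrimaryKey => false,
        Leaf { required } => !required,
        Object => false,
        JoinToOne => true,
        JoinToMany => false,
        AggregateCount => false,
        AggregateAvg | AggregateSumMinMax => true,
    }
}

/// The struct field type the rule implies for a given inner type.
fn rust_type(inner: &str, source: FieldSource) -> String {
    if nullable(source) {
        format!("Option<{inner}>")
    } else {
        inner.to_string()
    }
}

fn main() {
    println!("{}", rust_type("String", FieldSource::Leaf { required: false }));
    println!("{}", rust_type("Vec<Order>", FieldSource::JoinToMany));
    println!("{}", rust_type("f64", FieldSource::AggregateAvg));
}
```

Only the `Leaf` arm takes input from the author; every other arm is fixed by the shape of the field, which is why `required` has nowhere to go on joins and aggregates.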
The escape hatch
Anything the typed builder can’t express stays reachable, and still deserializes into the typed struct:
// Runs inside an async function that returns a Result.
let page: SearchResponse<User> = User::query()
    .raw(serde_json::json!({
        "knn": { "embedding": { "vector": [/* … */], "k": 10 } }
    }))
    .send(&client)
    .await?;
raw takes the OpenSearch query DSL verbatim — the pressure-release valve for the
few types with no flusso field (knn/vector, geo_shape, span and parent/child
queries) and for percolators — without dropping to an untyped client or losing
typed results. Most of what used to need it (function_score, script,
constant_score, query_string, search_after, …) is now in the typed surface.
Resolving the index name
The physical index carries the hash suffix (users_3f2a1b9c) — exactly what
the OpenSearch sink writes — and rotates on a structural schema change.
Because the binding is generated from the schema at compile time, the derive
knows that hash and emits it as a const: User::INDEX is the physical name, and
get/query use it. So User::query() addresses the right index directly, with
the hash hidden from the caller — no convenience alias required.
This is self-correcting: a structural schema change rotates the hash and changes
the resolved mapping, so the next cargo build regenerates the binding against the
new physical index. (User::INDEX and User::SCHEMA_HASH are exposed for logging,
admin, or a hand-built Search.)
The convenience alias (users → current generation) is still worthwhile for clients that don’t recompile against the schema, such as dynamic/scripting use and dashboards. For a derived binding it’s unnecessary: the compile-time hash is the stable name.
Reading a prefixed deployment
If flusso runs with an index prefix (FLUSSO_INDEX_PREFIX,
e.g. dev_ so it writes dev_users_<hash>), tell the client the same prefix:
// Inside a function that returns a Result:
let client = Client::connect("https://localhost:9200")?
    .index_prefix(std::env::var("FLUSSO_INDEX_PREFIX").unwrap_or_default());
The prefix is applied at runtime, on the transport — the derive still bakes the
unprefixed User::INDEX/SCHEMA_HASH, and the client prepends the prefix to every
request path. So one compiled consumer binary serves every environment: point it
at dev or staging by setting FLUSSO_INDEX_PREFIX, no rebuild. It must match the
writer’s prefix exactly, or queries hit an empty (or wrong) index.
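A small std-only sketch of the naming rule described here, assuming the prefix is simply prepended to the unprefixed physical name. `USER_INDEX` stands in for the derive's `User::INDEX`, and `request_index` is a hypothetical helper, not part of flusso-query.

```rust
/// Stand-in for the derive's `User::INDEX`: the unprefixed physical name.
const USER_INDEX: &str = "users_3f2a1b9c";

/// Hypothetical helper: the index name that ends up in the request path.
fn request_index(prefix: &str, index: &str) -> String {
    format!("{prefix}{index}")
}

fn main() {
    // Same variable the writer reads; empty when no prefix is configured.
    let prefix = std::env::var("FLUSSO_INDEX_PREFIX").unwrap_or_default();
    println!("querying {}", request_index(&prefix, USER_INDEX));

    // A reader without the prefix addresses a different index than a `dev_` writer.
    assert_ne!(request_index("", USER_INDEX), request_index("dev_", USER_INDEX));
}
```

Because the prefix never enters the compiled constant, the mismatch case fails at query time rather than at build time, so it is worth checking the variable in each environment.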
Out of scope for the first cut
- Search aggregations (facets, histograms, cardinality). The typed surface is filter/query/sort + typed hits first; aggregations need their own typed result tree, and the raw hatch covers them in the meantime.
- Writes. flusso owns the index; the client never upserts or deletes.
- Correlating hits across indexes. Both multi-index shapes ship (one blended result list, and independent searches via _msearch), but correlating hits across indexes remains the caller’s job.
- Scroll pagination. from/size and search_after (deep pagination) ship; a scroll cursor is a follow-on.
- Generating the document struct. By design: the developer owns the struct.
Where this lands in the workspace
| Crate | Role |
|---|---|
| flusso-query | Runtime: the Client transport, the field-handle/Query/Search builder, SearchResponse. Generic over the developer’s document types. Targets OpenSearch / Elasticsearch (shared DSL). Re-exports the derive behind a derive feature, so callers use flusso_query::FlussoDocument. |
| flusso-query-derive | The #[derive(FlussoDocument)] proc-macro crate. At compile time it discovers flusso.toml, resolves the named index’s IndexMapping from the self-describing schema (no database), validates the annotated struct, and emits the field handles, entry points, and schema hash. Reuses schema-config-toml, schema-index-yaml, and schema-core. |
Both crates sit above schema-core and depend only downward — no dependency on the
engine, the sources, or the sinks. They share only the domain model, the one thing
the read and write sides must agree on.
Deploying flusso with Docker
Ship flusso as the smallest possible image — without compiling the binary yourself, and without dragging your whole repo into the build context. For the image’s internals (targets, base, non-root user) see the Dockerfile; for Kubernetes see the Helm chart.
Pick a recipe
The key idea: you never compile the binary, only a flusso.lock — see The one idea. Then pick by where the lock gets built:
| Situation | Recipe |
|---|---|
| Already have a flusso.lock, or a flat config | A: bake your own lock (smallest, simplest) |
| Schemas scattered across a monorepo; want a hermetic in-Docker build | B: build the lock inside Docker |
| Want CI to build the lock and ship one file | C: build the lock in CI |
| Keep a flusso-only ignore file off everyone else’s builds | Scoping the .dockerignore |
| Wondering why COPY *.schema.yml won’t do | Why COPY alone can’t do it |
The one idea
Two different “compilations” — conflating them is what makes Docker feel heavy:
- The flusso binary: a full Rust build. That is our job, published once per release as a registry image. You never compile it. Pull alias2k/flusso:VERSION and build from it.
- A flusso.lock: flusso build inlines your flusso.toml + every referenced *.schema.yml into one portable, self-contained file. No DB, no toolchain, no secrets baked in.
ℹ️ Info: The image is published to two registries with identical tags. Use Docker Hub as the primary: alias2k/flusso (docker.io/alias2k/flusso). The ghcr.io/alias2k/flusso mirror is an equivalent drop-in if you prefer GitHub Container Registry.
Replace VERSION with whichever tag suits you:
- X.Y.Z (e.g. 0.10.0): an exact, immutable release. Most reproducible; recommended for production.
- X.Y (e.g. 0.10): a rolling tag that follows the latest patch on that minor (0.10.1, 0.10.2, …) but never a breaking minor bump.
- latest: newest stable release. Convenient, not pinned.
- sha-<short>: the immutable per-commit tag, for tracing or rollback.
So however your schemas are laid out — even scattered across a monorepo next to
the services they describe — that layout only has to exist where you run
flusso build, never inside the image. Get a lock, ship the lock, run the lock.
ℹ️ Info: Schema paths in flusso.toml resolve relative to the config file’s directory, with no globbing; each [[index]] names its schema = "…" explicitly. That’s the only rule the recipes below have to respect: when you compile the lock, the referenced files must exist at those paths.
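The resolution rule can be sketched in a few lines of std-only Rust. `resolve_schema` is a hypothetical helper written for illustration, and the paths are made-up examples; it shows only the "relative to the config file's directory" behavior, not flusso's actual loader.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper: join a `schema = "…"` value onto the directory
/// that contains the config file. No globbing, no search path.
fn resolve_schema(config_path: &Path, schema: &str) -> PathBuf {
    let base = config_path.parent().unwrap_or_else(|| Path::new(""));
    base.join(schema)
}

fn main() {
    // A config in deploy/ that points at a schema next to its service.
    let resolved = resolve_schema(
        Path::new("deploy/flusso.toml"),
        "../services/users/users.schema.yml",
    );
    println!("{}", resolved.display());
}
```

This is why the directory layout has to exist wherever the lock is compiled: the same relative string points somewhere else as soon as the config file moves.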
Recipe A: bake your own lock (smallest, simplest)
If you already have a flusso.lock (see Recipe C) — or you only have a flat
config — this is the whole thing. Build from the published image and copy one
file in:
# syntax=docker/dockerfile:1
FROM alias2k/flusso:VERSION
COPY flusso.lock /app/flusso.lock
# ENTRYPOINT/CMD are inherited: `flusso run --public-address 0.0.0.0:9464`
# loads /app/flusso.lock by default.
docker build -t myorg/search:1.0 .
The image is the published base + one file. No Rust, no schema layout, a build
context of a few KB. Secrets (DATABASE_URL, <SINK>_OPENSEARCH_URL) come from
the environment at run time, so the lock is safe to commit and to bake in.
Don’t want to rebuild an image at all? Mount the lock instead:
docker run --rm -e DATABASE_URL=… -e PRIMARY_OPENSEARCH_URL=… \
-v "$PWD/flusso.lock:/app/flusso.lock" -p 9464:9464 \
alias2k/flusso:VERSION
Recipe B: build the lock inside Docker
Want the compile to happen inside Docker (hermetic, reproducible), and your
schemas are scattered across the repo? Compile the lock in a builder stage, then
copy only the lock into the final image. The trick that keeps the build
context tiny without enumerating folders is an allowlist ignore file (see
Scoping the .dockerignore):
flusso.Dockerfile:
# syntax=docker/dockerfile:1
FROM alias2k/flusso:VERSION AS lock
WORKDIR /src
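# The context is already pruned to flusso.toml + *.schema.yml, with their real
# paths preserved, so the schema paths in flusso.toml resolve.
# (Dockerfile comments must sit on their own line, not after an instruction.)
COPY . .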
RUN flusso build --config flusso.toml --out /app/flusso.lock
FROM alias2k/flusso:VERSION
COPY --from=lock /app/flusso.lock /app/flusso.lock
flusso.Dockerfile.dockerignore:
*
!flusso.toml
!**/*.schema.yml
docker build -f flusso.Dockerfile -t myorg/search:1.0 .
COPY . . is what preserves the scattered directory structure (so the relative
schema = "…" paths resolve); the ignore file is what keeps the multi-gigabyte
monorepo out of the build context. They’re complementary — neither does the job
alone. Add a new schema anywhere in the tree and it just works, no Dockerfile
edit.
Recipe C: build the lock in CI, ship one file
The lowest-friction option for a monorepo: compile the lock on the host or in CI, where the repo is checked out and every relative path already resolves, then feed the single artifact to Recipe A.
flusso build --config flusso.toml --out flusso.lock # inlines all scattered schemas
Commit flusso.lock, or publish it as a CI artifact, and the image build never
sees a schema file — there’s no pattern to match, no context to prune, no tree to
preserve. Just one file. (flusso build needs the flusso binary; in CI, run it
from the published image: docker run --rm -v "$PWD:/src" -w /src alias2k/flusso:VERSION build --config flusso.toml --out flusso.lock.)
Scoping the .dockerignore
A root .dockerignore applies to every build in the repo, which you usually
don’t want when flusso is one service among many. BuildKit (you’re on it — every
recipe here starts with # syntax=docker/dockerfile:1) lets you scope an ignore
file to one Dockerfile: place <dockerfile-name>.dockerignore next to it, and
it takes precedence over the root .dockerignore for that build only.
flusso.Dockerfile
flusso.Dockerfile.dockerignore # used only when building flusso.Dockerfile
.dockerignore # everyone else's default, untouched
So the allowlist in Recipe B lives in
flusso.Dockerfile.dockerignore and affects nothing else in the repo.
Note
Per-Dockerfile ignore files are a BuildKit feature, honored by docker build, docker buildx, and docker compose. A legacy (non-BuildKit) builder silently falls back to the root .dockerignore.
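The lookup order can be written down as a short function. This is a sketch of the rule as described in this section, under the assumption of a BuildKit builder; `ignore_file_for` is a hypothetical name, and the `exists` check is injected so the rule can be exercised without touching the disk.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical sketch of the lookup order: scoped file first, root file second.
fn ignore_file_for(
    dockerfile: &Path,
    context_root: &Path,
    exists: impl Fn(&Path) -> bool,
) -> Option<PathBuf> {
    // "<dockerfile-name>.dockerignore", next to the Dockerfile.
    let mut name = dockerfile.as_os_str().to_owned();
    name.push(".dockerignore");
    let scoped = PathBuf::from(name);
    if exists(&scoped) {
        return Some(scoped);
    }
    // Otherwise the shared default at the root of the build context, if any.
    let root = context_root.join(".dockerignore");
    exists(&root).then_some(root)
}

fn main() {
    let on_disk = |p: &Path| p.exists();
    let chosen = ignore_file_for(Path::new("flusso.Dockerfile"), Path::new("."), on_disk);
    println!("{chosen:?}");
}
```

A legacy builder behaves as if the first branch did not exist, which is the silent fallback the note warns about.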
Why COPY alone can’t do it
It’s tempting to skip the ignore file and just COPY the schemas by pattern.
That doesn’t work, for two reasons:
- No recursive glob. Docker’s COPY uses filepath.Match, where * does not cross /. There’s no **, so you can’t express “every *.schema.yml at any depth.”
- It flattens. When a wildcard matches several files they all land directly in the destination; the source directory structure is not preserved. That breaks the relative schema = "…" paths immediately.
The filtering has to happen at the context layer (.dockerignore), not the
COPY layer. COPY . . then preserves the tree; the scoped ignore file keeps the
context small. Hence Recipe B.
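Both failure modes can be reproduced outside Docker. The sketch below is std-only Rust with a deliberately tiny matcher (literals plus `*`, where `*` stops at `/`); it illustrates the single-segment rule and the flattening described above and is not Docker's or Go's implementation. `star_match` and `flattened` are hypothetical names, and the paths are made-up examples.

```rust
use std::path::{Path, PathBuf};

/// Minimal matcher: literal characters plus `*`, where `*` matches any run
/// of characters except `/` (the single-segment rule).
fn star_match(pattern: &str, name: &str) -> bool {
    fn go(p: &[char], n: &[char]) -> bool {
        match p.split_first() {
            None => n.is_empty(),
            Some((&'*', rest)) => {
                // Let `*` consume 0..=i characters, never stepping over a '/'.
                for i in 0..=n.len() {
                    if go(rest, &n[i..]) {
                        return true;
                    }
                    if i < n.len() && n[i] == '/' {
                        break;
                    }
                }
                false
            }
            Some((c, rest)) => n.first() == Some(c) && go(rest, &n[1..]),
        }
    }
    let p: Vec<char> = pattern.chars().collect();
    let n: Vec<char> = name.chars().collect();
    go(&p, &n)
}

/// Where a wildcard-matched file lands: directly under `dest`, parents dropped.
fn flattened(dest: &str, source: &str) -> PathBuf {
    let file = Path::new(source).file_name().expect("source has a file name");
    Path::new(dest).join(file)
}

fn main() {
    let nested = "services/users/users.schema.yml";
    println!("top-level match: {}", star_match("*.schema.yml", "users.schema.yml"));
    println!("nested match:    {}", star_match("*.schema.yml", nested));
    println!("lands at:        {}", flattened("/src", nested).display());
}
```

The nested file is missed by the pattern, and a pattern deep enough to reach it would still drop `services/users/` on the way in, so the path written in flusso.toml no longer exists inside the image.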