Skip to content

feat(datastore): PostgreSQL compatibility for Fleet datastore#3

Closed
dnplkndll wants to merge 6 commits into
mainfrom
feat/pg-compat
Closed

feat(datastore): PostgreSQL compatibility for Fleet datastore#3
dnplkndll wants to merge 6 commits into
mainfrom
feat/pg-compat

Conversation

@dnplkndll
Copy link
Copy Markdown

Summary

Adds a dual-dialect SQL abstraction layer enabling Fleet's datastore to run against both MySQL and PostgreSQL without forking SQL statements.

  • DialectHelper interface (dialect.go) — abstracts MySQL-specific SQL constructs: INSERT IGNORE, ON DUPLICATE KEY UPDATE, GROUP_CONCAT, JSON_EXTRACT, IFNULL, etc.
  • MySQL dialect (dialect_mysql.go) — no-op passthrough, fully backwards-compatible
  • PostgreSQL dialect (dialect_postgres.go) — translates each construct to PG equivalents
  • Rebind driver (server/platform/postgres/) — transparently rewrites ? placeholders to $1, $2, ... at the driver level; new code needs no per-query porting
  • PG baseline schema (pg_baseline_schema.sql) — full Fleet schema for PostgreSQL
  • Test infrastructureCreatePostgresDS helper, TruncateTables PG support, InsertAndGetLastID helper
  • SQL compatibility fixes — all datastore files updated to use dialect helpers for upserts, JSON ops, string aggregation, and boolean casting

Approach

No existing MySQL behaviour is changed. The DialectHelper is injected at startup; MySQL gets the identity implementation. PostgreSQL gets translated SQL via the same interface. The rebind driver handles placeholder rewriting automatically.

Test plan

  • All existing MySQL integration tests pass unchanged (MYSQL_TEST=1 go test ./server/datastore/mysql/...)
  • PG smoke test (postgres_smoke_test.go, excluded from regular CI with //go:build ignore) exercises ~23 operations against a real PostgreSQL instance

@dnplkndll dnplkndll force-pushed the main branch 2 times, most recently from 769083c to 617cf71 Compare March 31, 2026 23:54
Introduces a DialectHelper interface that abstracts MySQL-specific SQL
constructs (INSERT IGNORE, ON DUPLICATE KEY UPDATE, GROUP_CONCAT,
JSON_EXTRACT, IFNULL, error classification, etc.) behind a common API.

- dialect.go: DialectHelper interface definition
- dialect_mysql.go: MySQL implementation (behaviour-preserving, existing tests pass)
- dialect_postgres.go: PostgreSQL implementation via SQL rewriting
- server/platform/postgres/: pgx driver, rebind SQL rewriter, PG error classification
- server/goose/: dual-dialect migration support (MySQL and PostgreSQL)
- server/config/: Driver field to select mysql (default) or postgres
- go.mod: add jackc/pgx/v5 PostgreSQL driver dependency

The rebind driver transparently rewrites MySQL SQL to PostgreSQL-compatible
SQL at query time: IFNULL→COALESCE, DATE_ADD/DATE_SUB→INTERVAL, IF()→CASE,
? placeholders→$N, boolean casting, JSON path translation, etc.
…ture

- Add dialect field to Datastore struct and dialectForDriver() wiring
- Add insertAndGetID / insertAndGetIDTx helpers (RETURNING id for PG)
- Add dialectStep() to migration client for dual-dialect migrations
- Add PG baseline schema (pg_baseline_schema.sql) for fresh PG databases
- Add TruncateTables PG support and InsertAndGetLastID test helper
- Wire dialect into AndroidDatastore
- Add postgres_smoke_test.go (excluded from regular CI via //go:build ignore)
Replace MySQL-specific SQL constructs with dialect-abstracted equivalents
throughout server/datastore/mysql/ and server/platform/mysql/:

- INSERT IGNORE → ds.dialect.InsertIgnoreInto() + OnConflictDoNothing()
- ON DUPLICATE KEY UPDATE → ds.dialect.OnDuplicateKey()
- MySQL error codes (1062, 1292, etc.) → ds.dialect.IsDuplicate/IsDeadlock/etc.
- Last-insert-id → insertAndGetID / insertAndGetIDTx (uses RETURNING on PG)
- GROUP_CONCAT → ds.dialect.GroupConcat()
- JSON_EXTRACT/JSON_ARRAYAGG → ds.dialect.JSONExtract/JSONArrayAgg()
- TIMESTAMPDIFF / DATE_FORMAT → dialect helpers
- Pagination SQL → ds.dialect.LimitOffset()
- Bitwise coercions and BINARY casts → dialect helpers

Also adds missing upstream methods and restores schema.sql to upstream state.
Update test files to pass ds.dialect to helper functions that now require
it for dialect-aware SQL generation:

- mdm_test.go: batchSetProfileLabelAssociationsDB calls
- microsoft_mdm_test.go: batchSetProfileVariableAssociationsDB calls
- operating_systems_test.go: upsertHostOperatingSystemDB calls
- packs_test.go: saveHostPackStatsDB calls
- policies_test.go: deleteAllPolicyMemberships, cleanupPolicyMembership* calls
- scripts_test.go: insertScriptContents return type int64 (was sql.Result)
- software_test.go: setOrUpdateSoftwareInstallerLabelsDB calls
- gofmt: fix formatting in mysql.go, policies.go, software.go, rebind_driver.go
- unused: remove jsonObjectFunc, resolveJSONFunc (mysql.go), dialectStep (migration.go)
- gosec: nolint md5 in policies.go (non-cryptographic checksum)
- staticcheck S1008: simplify errors.As in postgres/errors.go
- unconvert: remove unnecessary string() cast in postgres/errors.go
- gocritic: use errors.New instead of fmt.Errorf in rebind_driver.go
- staticcheck SA4004: remove non-looping for{} wrapper in castJsonbBuildObjectParams
- testifylint: use assert.Empty in dialect_mysql_test.go, nolint goroutine requires in packs_test.go
- modernize: interface{} → any in aggregated_stats.go, hosts.go, testing_utils.go
- modernize: strings.SplitSeq in testing_utils.go, new(bool) in postgres_smoke_test.go
- test: add mysqlDialect{} to mockDatastore to prevent nil dereference in TestGetContextTryStmt
dnplkndll pushed a commit that referenced this pull request Apr 14, 2026
<!-- Add the related story/sub-task/bug number, like Resolves fleetdm#123, or
remove if NA -->
**Related issue:** Resolves fleetdm#42836 

This is another hot path optimization.

## Before

When a host submits policy results via `SubmitDistributedQueryResults`,
the system needed to determine which policies "flipped" (changed from
passing to failing or vice versa). Each consumer computed this
independently:

```
SubmitDistributedQueryResults(policyResults)
  |
  +-- processScriptsForNewlyFailingPolicies
  |     filter to failing policies with scripts
  |     BUILD SUBSET of results
  |     CALL FlippingPoliciesForHost(subset)          <-- DB query #1
  |     convert result to set, filter, queue scripts
  |
  +-- processSoftwareForNewlyFailingPolicies
  |     filter to failing policies with installers
  |     BUILD SUBSET of results
  |     CALL FlippingPoliciesForHost(subset)          <-- DB query #2
  |     convert result to set, filter, queue installs
  |
  +-- processVPPForNewlyFailingPolicies
  |     filter to failing policies with VPP apps
  |     BUILD SUBSET of results
  |     CALL FlippingPoliciesForHost(subset)          <-- DB query #3
  |     convert result to set, filter, queue VPP
  |
  +-- webhook filtering
  |     filter to webhook-enabled policies
  |     CALL FlippingPoliciesForHost(subset)          <-- DB query #4
  |     register flipped policies in Redis
  |
  +-- RecordPolicyQueryExecutions
        CALL FlippingPoliciesForHost(all results)     <-- DB query #5
        reset attempt counters for newly passing
        INSERT/UPDATE policy_membership
```

Each `FlippingPoliciesForHost` call runs `SELECT policy_id, passes FROM
policy_membership WHERE host_id = ? AND policy_id IN (?)`. All 5 queries
hit the same table for the same host before `policy_membership` is
updated, so they all see identical state.

Each consumer also built intermediate maps to narrow down to its subset
before calling `FlippingPoliciesForHost`, then converted the result into
yet another set for filtering. This meant 3-4 temporary maps per
consumer.

## After

```
SubmitDistributedQueryResults(policyResults)
  |
  CALL FlippingPoliciesForHost(all results)           <-- single DB query
  build newFailingSet, normalize newPassing
  |
  +-- processScriptsForNewlyFailingPolicies
  |     filter to failing policies with scripts
  |     CHECK newFailingSet (in-memory map lookup)
  |     queue scripts
  |
  +-- processSoftwareForNewlyFailingPolicies
  |     filter to failing policies with installers
  |     CHECK newFailingSet (in-memory map lookup)
  |     queue installs
  |
  +-- processVPPForNewlyFailingPolicies
  |     filter to failing policies with VPP apps
  |     CHECK newFailingSet (in-memory map lookup)
  |     queue VPP
  |
  +-- webhook filtering
  |     filter to webhook-enabled policies
  |     FILTER newFailing/newPassing by policy IDs (in-memory)
  |     register flipped policies in Redis
  |
  +-- RecordPolicyQueryExecutions
        USE pre-computed newPassing (skip DB query)
        reset attempt counters for newly passing
        INSERT/UPDATE policy_membership
```

The intermediate subset maps and per-consumer set conversions are
removed. Each process function goes directly from "policies with
associated automation" to "is this policy in newFailingSet?" in a single
map lookup.

# Checklist for submitter

If some of the following don't apply, delete the relevant line.

- [x] Changes file added for user-visible changes in `changes/`,
`orbit/changes/` or `ee/fleetd-chrome/changes`.

## Testing

- [x] Added/updated automated tests
- [x] QA'd all new/changed functionality manually


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Performance Improvements**
* Reduced redundant database queries during policy result submissions by
computing flipping policies once per host check-in instead of multiple
times.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
@dnplkndll
Copy link
Copy Markdown
Author

Superseded by #4 (feat/pg-compat-clean)

@dnplkndll dnplkndll closed this Apr 17, 2026
@dnplkndll dnplkndll deleted the feat/pg-compat branch April 17, 2026 14:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant