From 72d2db0ea9d113e8f7efa954eca4e3950395ac2b Mon Sep 17 00:00:00 2001 From: "Christian G. Warden" Date: Fri, 22 May 2026 07:38:26 -0500 Subject: [PATCH 1/2] salesforce: switch to OAuth flows and warehouse-style form The Salesforce connector now uses Salesforce's OAuth 2.0 endpoints (SOAP login is being decommissioned) and fits the warehouse-style connector + model flow used by other warehouses. - Authentication: drop SOAP login; pick between OAuth password flow, client credentials flow, and JWT bearer based on which credentials are populated. Adds a required `client_secret` for username/password authentication. - Schema: mark Salesforce as a warehouse so credentials are written to `connectors/<name>.yaml` and the SOQL query lives in `models/<name>.yaml`. The `key` field is uploaded as a file and base64-encoded before being stored in `.env` so embedded newlines do not break dotenv parsing; the driver accepts raw PEM as well. - Driver: accepts `sql:` as an alias for `soql:` to match the model shape produced by the explorer. 
--- .../connectors/data-source/salesforce.md | 118 +++++++++++---- go.mod | 14 +- go.sum | 28 +++- runtime/drivers/salesforce/authentication.go | 98 ++++++++++-- .../drivers/salesforce/authentication_test.go | 143 ++++++++++++++++++ runtime/drivers/salesforce/salesforce.go | 46 ++++-- runtime/drivers/salesforce/warehouse.go | 60 +++++--- .../src/features/sources/modal/constants.ts | 1 + .../features/templates/schemas/salesforce.ts | 117 ++++++++++---- 9 files changed, 508 insertions(+), 117 deletions(-) diff --git a/docs/docs/developers/build/connectors/data-source/salesforce.md b/docs/docs/developers/build/connectors/data-source/salesforce.md index b96148d51e2b..b8f5c2c1b309 100644 --- a/docs/docs/developers/build/connectors/data-source/salesforce.md +++ b/docs/docs/developers/build/connectors/data-source/salesforce.md @@ -9,55 +9,117 @@ sidebar_position: 65 ## Overview -[Salesforce](https://www.salesforce.com/) is a leading cloud-based Customer Relationship Management (CRM) platform designed to help businesses connect with and understand their customers better. It offers a comprehensive suite of applications focused on sales, customer service, marketing automation, analytics, and application development. Salesforce enables organizations of all sizes to build stronger relationships with their customers through personalized experiences, streamlined communication, and predictive insights. Rill can ingest data from Salesforce as a source by utilizing the Bulk API, which requires a Salesforce username and password (and, in some cases, a token, depending on the org configuration) to authenticate against a Salesforce org. +[Salesforce](https://www.salesforce.com/) is a leading cloud-based Customer Relationship Management (CRM) platform. Rill ingests data from Salesforce by issuing SOQL queries against the Bulk API. Authentication uses the Salesforce OAuth 2.0 endpoints exposed by a Connected App (or, for the client credentials and JWT flows, an External Client App). 
+The Salesforce connector follows the same shape as other warehouse connectors: credentials live in a connector file under `connectors/`, and each query is its own model file under `models/`. -## Local credentials +## Authentication -When using Rill Developer on your local machine, you will need to provide your credentials via a connector file. We would recommend not using plain text to create your file and instead use the `.env` file. For more details on your connector, see [connector YAML](/reference/project-files/connectors) for more details. +The Salesforce connector supports three OAuth flows. All three require a [Connected App](https://help.salesforce.com/s/articleView?id=sf.connected_app_overview.htm) in your Salesforce org; the client credentials and JWT flows also accept an [External Client App](https://help.salesforce.com/s/articleView?id=xcloud.ecapps_intro.htm). The flow you choose determines which other fields you need to provide. -:::tip Updating the project environmental variable +| Flow | Required fields | +| --- | --- | +| Username / Password (OAuth) | `username`, `password`, `client_id`, `client_secret` | +| Client Credentials | `client_id`, `client_secret` | +| JWT Bearer | `username`, `client_id`, `key` | + +The connector picks a flow based on which credentials are populated: JWT wins when `key` is set; otherwise a `username` plus `password` selects the OAuth password flow; otherwise a `client_secret` selects the client credentials flow. + +:::note SOAP login is deprecated + +Earlier versions of this connector used the SOAP login endpoint when a username and password were supplied. Salesforce is decommissioning the SOAP login endpoint, so the connector now uses the OAuth password flow instead. This requires the Connected App's Client Secret in addition to the existing Client ID. + +Note that the OAuth password flow only works with a Connected App; External Client Apps do not support it. The client credentials and JWT flows work with either. 
-If you've already deployed to Rill Cloud, you can either [push/pull the credential]( /guide/administration/project-settings/variables-and-credentials#pushing-and-pulling-credentials-to--from-rill-cloud-via-the-cli) from the CLI with: -``` -rill env push -rill env pull -``` ::: -Alternatively, you can include the credentials directly in the underlying source YAML by adding the `username` and `password` parameters. For example, your source YAML may contain the following properties (these can also be configured through the UI during source creation): +### Connector file + +Place your credentials in a connector file at `connectors/<name>.yaml`. Reference secret values from `.env`. + +#### Username / Password (OAuth) + ```yaml -type: "model" -connector: "salesforce" -endpoint: "login.salesforce.com" -username: "user@example.com" -password: "MyPasswordMyToken" -soql: "SELECT Id, Name, CreatedDate FROM Opportunity" -sobject: "Opportunity" +type: connector +driver: salesforce + +endpoint: login.salesforce.com +username: user@example.com +password: "{{ .env.connector.salesforce.password }}" +client_id: "" +client_secret: "{{ .env.connector.salesforce.client_secret }}" ``` -:::tip Did you know? +#### Client Credentials -If this project has already been deployed to Rill Cloud and credentials have been set for this source, you can use `rill env pull` to [pull these cloud credentials](/developers/build/connectors/credentials/#rill-env-pull) locally (into your local `.env` file). Please note that this may override any credentials you have set locally for this source. +```yaml +type: connector +driver: salesforce -::: +endpoint: login.salesforce.com +client_id: "" +client_secret: "{{ .env.connector.salesforce.client_secret }}" +``` -## Deploy to Rill Cloud +#### JWT Bearer -When deploying a project to Rill Cloud, Rill requires you to explicitly provide Salesforce credentials used in your project. 
Please refer to our [connector YAML reference docs](/reference/project-files/connectors) for more information. +```yaml +type: connector +driver: salesforce -If you subsequently add sources that require new credentials (or if you simply entered the wrong credentials during the initial deploy), you can update the credentials by pushing the `Deploy` button to update your project or by running the following command in the CLI: +endpoint: login.salesforce.com +username: user@example.com +client_id: "" +key: "{{ .env.connector.salesforce.key }}" ``` -rill env push + +PEM keys contain newlines, which break `.env` parsing if stored raw. The UI's file picker base64-encodes the uploaded key automatically; when hand-editing `.env`, base64-encode the PEM file yourself: + +```sh +base64 < key.pem | tr -d '\n' >> .env ``` +Raw PEM written inline in the connector YAML (without an `.env` reference) is also accepted. -:::note +## Models -Leave the `key` and `client_id` fields blank unless you are using JWT (described in the next section [below](#jwt)). +A Salesforce model file references the connector by name and supplies the SOQL query plus the SObject the query reads from. The Bulk API needs the SObject to create the job, so it is set explicitly — Rill does not parse it from the SOQL. +```yaml +type: model +materialize: true + +connector: salesforce + +soql: | + SELECT Id, Name, CreatedDate + FROM Opportunity +sobject: Opportunity + +output: + connector: duckdb +``` + +Use `soql:` for the query. `sql:` is also accepted as an alias for parity with other warehouse drivers (the connector explorer in the UI writes the query into `sql:`). Add `queryAll: true` to include soft-deleted records. + +## Local credentials + +When using Rill Developer on your local machine, provide credentials via a connector file as shown above. Keep secrets in `.env` rather than the connector YAML. See [connector YAML](/reference/project-files/connectors) for more details. 
+ +:::tip Updating the project environment variables + +If you've already deployed to Rill Cloud, you can [push or pull the credentials](/guide/administration/project-settings/variables-and-credentials#pushing-and-pulling-credentials-to--from-rill-cloud-via-the-cli) from the CLI with: +``` +rill env push +rill env pull +``` ::: -### JWT +## Deploy to Rill Cloud + +When deploying a project to Rill Cloud, Rill requires you to explicitly provide Salesforce credentials used in your project. See the [connector YAML reference docs](/reference/project-files/connectors) for more information. -Authentication using JWT instead of a password is also supported. Set `client_id` to the **Client Id** (also known as the _Consumer Key_) of the Connected App to use, and set `key` to contain the PEM-formatted private key to use for signing. +If you subsequently add sources that require new credentials (or if you simply entered the wrong credentials during the initial deploy), update them by pushing the `Deploy` button or by running: +``` +rill env push +``` diff --git a/go.mod b/go.mod index 5935dd20c232..7fda18d6c5ed 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4 github.com/ClickHouse/clickhouse-go/v2 v2.41.0 - github.com/ForceCLI/force v1.1.0 + github.com/ForceCLI/force v1.10.1 github.com/Masterminds/sprig/v3 v3.3.0 github.com/MicahParks/keyfunc v1.9.0 github.com/NYTimes/gziphandler v1.1.1 @@ -142,6 +142,7 @@ require ( ) require ( + git.sr.ht/~jackmordaunt/go-toast v1.1.2 // indirect github.com/apache/arrow/go/v12 v12.0.1 // indirect github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect @@ -152,6 +153,8 @@ require ( github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10502.0 // indirect github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10502.0 // indirect 
github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10502.0 // indirect + github.com/esiqveland/notify v0.13.3 // indirect + github.com/gen2brain/beeep v0.11.2 // indirect github.com/go-openapi/swag/cmdutils v0.25.4 // indirect github.com/go-openapi/swag/conv v0.25.5 // indirect github.com/go-openapi/swag/fileutils v0.25.5 // indirect @@ -163,15 +166,21 @@ require ( github.com/go-openapi/swag/stringutils v0.25.5 // indirect github.com/go-openapi/swag/typeutils v0.25.5 // indirect github.com/go-openapi/swag/yamlutils v0.25.5 // indirect + github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/in-toto/attestation v1.1.2 // indirect + github.com/jackmordaunt/icns/v3 v3.0.1 // indirect github.com/moby/moby/api v1.54.1 // indirect github.com/moby/moby/client v0.4.0 // indirect + github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/prometheus/otlptranslator v1.0.0 // indirect github.com/rs/zerolog v1.28.0 // indirect github.com/segmentio/encoding v0.5.4 // indirect + github.com/sergeymakinen/go-bmp v1.0.0 // indirect + github.com/sergeymakinen/go-ico v1.0.0-beta.0 // indirect github.com/sigstore/sigstore v1.10.4 // indirect github.com/sigstore/sigstore-go v1.1.4 // indirect + github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af // indirect go.yaml.in/yaml/v2 v2.4.4 // indirect go.yaml.in/yaml/v4 v4.0.0-rc.4 // indirect gotest.tools/gotestsum v1.8.2 // indirect @@ -200,7 +209,6 @@ require ( github.com/BurntSushi/toml v1.4.0 // indirect github.com/ClickHouse/ch-go v0.69.0 // indirect github.com/DefangLabs/secret-detector v0.0.0-20250403165618-22662109213e // indirect - github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99 // indirect github.com/ForceCLI/inflect v0.0.0-20130829110746-cc00b5ad7a6a // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.31.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric 
v0.51.0 // indirect @@ -209,7 +217,6 @@ require ( github.com/Masterminds/semver/v3 v3.4.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/ProtonMail/go-crypto v1.3.0 // indirect - github.com/ViViDboarder/gotifier v0.0.0-20140619195515-0f19f3d7c54c // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/andybalholm/brotli v1.2.0 // indirect @@ -347,7 +354,6 @@ require ( github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect - github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/buildkit v0.29.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect diff --git a/go.sum b/go.sum index 5f291c54c20a..fab6174ce471 100644 --- a/go.sum +++ b/go.sum @@ -619,6 +619,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 filippo.io/edwards25519 v1.1.1 h1:YpjwWWlNmGIDyXOn8zLzqiD+9TyIlPhGFG96P39uBpw= filippo.io/edwards25519 v1.1.1/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= +git.sr.ht/~jackmordaunt/go-toast v1.1.2 h1:/yrfI55LRt1M7H1vkaw+NaH1+L1CDxrqDltwm5euVuE= +git.sr.ht/~jackmordaunt/go-toast v1.1.2/go.mod h1:jA4OqHKTQ4AFBdwrSnwnskUIIS3HYzlJSgdzCKqfavo= git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= @@ -682,10 +684,8 @@ github.com/ClickHouse/clickhouse-go/v2 v2.41.0 h1:JbLKMXLEkW0NMalMgI+GYb6FVZtpaM 
github.com/ClickHouse/clickhouse-go/v2 v2.41.0/go.mod h1:/RoTHh4aDA4FOCIQggwsiOwO7Zq1+HxQ0inef0Au/7k= github.com/DefangLabs/secret-detector v0.0.0-20250403165618-22662109213e h1:rd4bOvKmDIx0WeTv9Qz+hghsgyjikFiPrseXHlKepO0= github.com/DefangLabs/secret-detector v0.0.0-20250403165618-22662109213e/go.mod h1:blbwPQh4DTlCZEfk1BLU4oMIhLda2U+A840Uag9DsZw= -github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99 h1:H2axnitaP3Dw+tocMHPQHjM2wJ/+grF8sOIQGaJeEsg= -github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99/go.mod h1:WHFXv3VIHldTnYGmWAXAxsu4O754A9Zakq4DedI8PSA= -github.com/ForceCLI/force v1.1.0 h1:e+KhyNeZF3r98YrvE32Bv7bQOCeg7lj1uiihcs3axVE= -github.com/ForceCLI/force v1.1.0/go.mod h1:S+csSNhBOHrRsRV7PMKhmPP7N4SBLYJMrQzEc/OHLTs= +github.com/ForceCLI/force v1.10.1 h1:Nr/ziB40Nbemibg+XQM+7bhsvujYwbwL5AvK0MllCmw= +github.com/ForceCLI/force v1.10.1/go.mod h1:vZhTNH4A1cgnFV21igwqwMRy01TXYsquzdXF4LnrZB8= github.com/ForceCLI/inflect v0.0.0-20130829110746-cc00b5ad7a6a h1:mMd54YgLoeupNpbph3KdwvF58O0lZ72RQaJ2cFPOFDE= github.com/ForceCLI/inflect v0.0.0-20130829110746-cc00b5ad7a6a/go.mod h1:DGKmCfb9oo5BivGO+szHk2ZvlqPDTlW4AYVpRBIVbms= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.31.0 h1:DHa2U07rk8syqvCge0QIGMCE1WxGj9njT44GH7zNJLQ= @@ -750,8 +750,6 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= -github.com/ViViDboarder/gotifier v0.0.0-20140619195515-0f19f3d7c54c h1:qLWjxZGLdzxp0Gc4Sf6f4w15D+wNKZ28HhkV9y5cAhw= -github.com/ViViDboarder/gotifier v0.0.0-20140619195515-0f19f3d7c54c/go.mod h1:/nH+y85gO3ta3b6JtRWGA5hPIH35XJr/ZHXlfrBRx3A= 
github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs= github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A= github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= @@ -1253,6 +1251,8 @@ github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4= github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= +github.com/esiqveland/notify v0.13.3 h1:QCMw6o1n+6rl+oLUfg8P1IIDSFsDEb2WlXvVvIJbI/o= +github.com/esiqveland/notify v0.13.3/go.mod h1:hesw/IRYTO0x99u1JPweAl4+5mwXJibQVUcP0Iu5ORE= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= @@ -1288,6 +1288,8 @@ github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj2 github.com/gabriel-vasile/mimetype v1.4.7 h1:SKFKl7kD0RiPdbht0s7hFtjl489WcQ1VyPW8ZzUMYCA= github.com/gabriel-vasile/mimetype v1.4.7/go.mod h1:GDlAgAyIRT27BhFl53XNAFtfjzOkLaF35JdEG0P7LtU= github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= +github.com/gen2brain/beeep v0.11.2 h1:+KfiKQBbQCuhfJFPANZuJ+oxsSKAYNe88hIpJuyKWDA= +github.com/gen2brain/beeep v0.11.2/go.mod h1:jQVvuwnLuwOcdctHn/uyh8horSBNJ8uGb9Cn2W4tvoc= github.com/getkin/kin-openapi v0.132.0 h1:3ISeLMsQzcb5v26yeJrBcdTCEQTag36ZjaGk7MIRUwk= github.com/getkin/kin-openapi v0.132.0/go.mod h1:3OlG51PCYNsPByuiMB0t4fjnNlIDnaEDsjiKUV8nL58= github.com/getsentry/raven-go v0.2.0/go.mod 
h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= @@ -1448,6 +1450,8 @@ github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6 github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= +github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/flock v0.13.0 h1:95JolYOvGMqeH31+FC7D2+uULf6mG61mEZ/A8dRYMzw= github.com/gofrs/flock v0.13.0/go.mod h1:jxeyy9R1auM5S6JYDBhDt+E2TCo7DkratH4Pgi8P+Z0= github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= @@ -1786,6 +1790,8 @@ github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dv github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jackmordaunt/icns/v3 v3.0.1 h1:xxot6aNuGrU+lNgxz5I5H0qSeCjNKp8uTXB1j8D4S3o= +github.com/jackmordaunt/icns/v3 v3.0.1/go.mod h1:5sHL59nqTd2ynTnowxB/MDQFhKNqkK8X687uKNygaSQ= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= @@ -1947,7 +1953,6 @@ github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceT github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod 
h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg= @@ -2036,6 +2041,8 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/networkplumbing/go-nft v0.2.0/go.mod h1:HnnM+tYvlGAsMU7yoYwXEVLLiDW9gdMmb5HoGcwpuQs= +github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= +github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= @@ -2286,6 +2293,10 @@ github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/segmentio/encoding v0.5.4 h1:OW1VRern8Nw6ITAtwSZ7Idrl3MXCFwXHPgqESYfvNt0= github.com/segmentio/encoding v0.5.4/go.mod h1:HS1ZKa3kSN32ZHVZ7ZLPLXWvOVIiZtyJnO1gPH1sKt0= +github.com/sergeymakinen/go-bmp v1.0.0 h1:SdGTzp9WvCV0A1V0mBeaS7kQAwNLdVJbmHlqNWq0R+M= +github.com/sergeymakinen/go-bmp v1.0.0/go.mod h1:/mxlAQZRLxSvJFNIEGGLBE/m40f3ZnUifpgVDlcUIEY= +github.com/sergeymakinen/go-ico v1.0.0-beta.0 
h1:m5qKH7uPKLdrygMWxbamVn+tl2HfiA3K6MFJw4GfZvQ= +github.com/sergeymakinen/go-ico v1.0.0-beta.0/go.mod h1:wQ47mTczswBO5F0NoDt7O0IXgnV4Xy3ojrroMQzyhUk= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8= github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4= @@ -2392,6 +2403,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/stripe/stripe-go/v79 v79.6.0 h1:qSBV2f2rpLEEZTdTlVLzdmQJZNmfoo2E3hUEkFT8GBc= @@ -2400,6 +2412,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= +github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af h1:6yITBqGTE2lEeTPG04SN9W+iWHCRyHqlVYILiSXziwk= +github.com/tadvi/systray v0.0.0-20190226123456-11a2b8fa57af/go.mod h1:4F09kP5F+am0jAwlQLddpoMDM+iewkxxt6nxUQ5nq5o= github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I= 
github.com/testcontainers/testcontainers-go v0.14.0/go.mod h1:hSRGJ1G8Q5Bw2gXgPulJOLlEBaYJHeBSOkQM5JLG+JQ= github.com/testcontainers/testcontainers-go v0.41.1-0.20260401114404-cc0e33ddcfcf h1:lKH6b3SFPMaEhGtMv96Kq9xB8f4iIWat/jigTXO4Q0k= diff --git a/runtime/drivers/salesforce/authentication.go b/runtime/drivers/salesforce/authentication.go index 5c0387e4cda2..c4bdde6dbfa4 100644 --- a/runtime/drivers/salesforce/authentication.go +++ b/runtime/drivers/salesforce/authentication.go @@ -1,10 +1,12 @@ package salesforce import ( + "encoding/base64" "errors" "fmt" "net/url" "os" + "strings" force "github.com/ForceCLI/force/lib" ) @@ -17,33 +19,61 @@ type authenticationOptions struct { Password string JWT string ConnectedApp string + ClientSecret string } +// authMode describes which OAuth flow authenticate() will use for a given set +// of options. selectAuthMode computes it separately so it can be unit tested. +type authMode int + +const ( + authModeUnknown authMode = iota + authModeJWT + authModePassword + authModeClientCredentials +) + func authenticate(options authenticationOptions) (*force.Force, error) { if options.ConnectedApp == "" { return nil, fmt.Errorf("connected app client id is required") } force.ClientId = options.ConnectedApp - if options.Username == "" { - return nil, fmt.Errorf("username missing") - } - - isJWTSelected := options.JWT != "" - isSOAPSelected := options.Password != "" - endpoint, err := endpoint(options) if err != nil { return nil, err } - switch { - case isJWTSelected: + switch selectAuthMode(options) { + case authModeJWT: + if options.Username == "" { + return nil, fmt.Errorf("username is required for JWT authentication") + } return jwtLogin(endpoint, options) - case isSOAPSelected: - return soapLoginAtEndpoint(endpoint, options.Username, options.Password) + case authModePassword: + if options.ClientSecret == "" { + return nil, fmt.Errorf("client_secret is required for username/password authentication") + } return passwordFlowLogin(endpoint, 
options) + case authModeClientCredentials: + return clientCredentialsLogin(endpoint, options) + } + return nil, fmt.Errorf("unable to authenticate: provide a JWT key, a username and password (with client_secret), or a client_secret for the client credentials flow") +} + +// selectAuthMode picks an OAuth flow based on which credentials are populated. +// JWT wins when a key is present; otherwise username+password selects the +// password flow; otherwise a client_secret selects client credentials. +func selectAuthMode(options authenticationOptions) authMode { + switch { + case options.JWT != "": + return authModeJWT + case options.Username != "" && options.Password != "": + return authModePassword + case options.ClientSecret != "": + return authModeClientCredentials } - return nil, fmt.Errorf("unable to authenticate") + return authModeUnknown } func endpoint(options authenticationOptions) (endpoint string, err error) { @@ -67,13 +97,18 @@ func endpoint(options authenticationOptions) (endpoint string, err error) { } func jwtLogin(endpoint string, options authenticationOptions) (*force.Force, error) { + key, err := decodeJWTKey(options.JWT) + if err != nil { + return nil, err + } + tempfile, err := os.CreateTemp("", "") if err != nil { return nil, fmt.Errorf("creating tempfile to write rsa key failed: %w", err) } defer os.Remove(tempfile.Name()) - if _, err = tempfile.WriteString(options.JWT); err != nil { + if _, err = tempfile.Write(key); err != nil { return nil, fmt.Errorf("writing rsa key to tempfile failed: %w", err) } @@ -89,11 +124,42 @@ func jwtLogin(endpoint string, options authenticationOptions) (*force.Force, err return force.NewForce(&session), nil } -func soapLoginAtEndpoint(endpoint, username, password string) (*force.Force, error) { - session, err := force.ForceSoapLoginAtEndpoint(endpoint, username, password) +func passwordFlowLogin(endpoint string, options authenticationOptions) (*force.Force, error) { + session, err := 
force.PasswordFlowLoginAtEndpoint(endpoint, options.ConnectedApp, options.ClientSecret, options.Username, options.Password) if err != nil { - return nil, fmt.Errorf("SOAP authentication failed: %w", err) + return nil, fmt.Errorf("OAuth password authentication failed: %w", err) } + return force.NewForce(&session), nil +} +func clientCredentialsLogin(endpoint string, options authenticationOptions) (*force.Force, error) { + session, err := force.ClientCredentialsLoginAtEndpoint(endpoint, options.ConnectedApp, options.ClientSecret) + if err != nil { + return nil, fmt.Errorf("client credentials authentication failed: %w", err) + } return force.NewForce(&session), nil } + +// decodeJWTKey returns the raw PEM bytes for the JWT private key. The UI +// base64-encodes uploaded keys before writing them to .env so embedded +// newlines don't break the dotenv parser; raw PEM is accepted for +// backwards compatibility with hand-written configs. +func decodeJWTKey(key string) ([]byte, error) { + trimmed := strings.TrimSpace(key) + if strings.HasPrefix(trimmed, "-----BEGIN") { + return []byte(key), nil + } + // Tolerate whitespace introduced by line wrapping in .env or YAML. 
+ compact := strings.Map(func(r rune) rune { + switch r { + case ' ', '\t', '\r', '\n': + return -1 + } + return r + }, trimmed) + decoded, err := base64.StdEncoding.DecodeString(compact) + if err != nil { + return nil, fmt.Errorf("JWT private key is neither PEM nor valid base64: %w", err) + } + return decoded, nil +} diff --git a/runtime/drivers/salesforce/authentication_test.go b/runtime/drivers/salesforce/authentication_test.go index 21ee5864589c..9167eadbea31 100644 --- a/runtime/drivers/salesforce/authentication_test.go +++ b/runtime/drivers/salesforce/authentication_test.go @@ -1,6 +1,7 @@ package salesforce import ( + "encoding/base64" "testing" "github.com/stretchr/testify/require" @@ -23,3 +24,145 @@ func TestEndpoint(t *testing.T) { require.NoError(t, err) require.Equal(t, "https://example.my.salesforce.com", e) } + +func TestSelectAuthMode(t *testing.T) { + cases := []struct { + name string + opts authenticationOptions + want authMode + }{ + { + name: "jwt wins when key is present", + opts: authenticationOptions{ + Username: "user@example.com", + Password: "pw", + JWT: "key", + ConnectedApp: "cid", + ClientSecret: "secret", + }, + want: authModeJWT, + }, + { + name: "password flow when username and password are set", + opts: authenticationOptions{ + Username: "user@example.com", + Password: "pw", + ConnectedApp: "cid", + ClientSecret: "secret", + }, + want: authModePassword, + }, + { + name: "client credentials when only client_id and secret are set", + opts: authenticationOptions{ + ConnectedApp: "cid", + ClientSecret: "secret", + }, + want: authModeClientCredentials, + }, + { + name: "client credentials when username is set without password", + opts: authenticationOptions{ + Username: "user@example.com", + ConnectedApp: "cid", + ClientSecret: "secret", + }, + want: authModeClientCredentials, + }, + { + name: "unknown when nothing useful is provided", + opts: authenticationOptions{ + Username: "user@example.com", + ConnectedApp: "cid", + }, + want: 
authModeUnknown, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.want, selectAuthMode(c.opts)) + }) + } +} + +func TestParseSourceProperties(t *testing.T) { + // Explicit soql + sobject is the historical shape. + conf, err := parseSourceProperties(map[string]any{ + "soql": "SELECT Id FROM Opportunity", + "sobject": "Opportunity", + }) + require.NoError(t, err) + require.Equal(t, "SELECT Id FROM Opportunity", conf.SOQL) + require.Equal(t, "Opportunity", conf.SObject) + + // `sql:` is accepted as a fallback for `soql:` so Salesforce fits the + // warehouse model shape produced by the connector explorer. + conf, err = parseSourceProperties(map[string]any{ + "sql": "SELECT Id FROM Account", + "sobject": "Account", + }) + require.NoError(t, err) + require.Equal(t, "SELECT Id FROM Account", conf.SOQL) + require.Equal(t, "Account", conf.SObject) + + // An explicit soql wins over sql when both are supplied. + conf, err = parseSourceProperties(map[string]any{ + "soql": "SELECT Id FROM Opportunity", + "sql": "SELECT Name FROM Lead", + "sobject": "Opportunity", + }) + require.NoError(t, err) + require.Equal(t, "SELECT Id FROM Opportunity", conf.SOQL) + + // Missing query is an error. + _, err = parseSourceProperties(map[string]any{"sobject": "Opportunity"}) + require.Error(t, err) + require.Contains(t, err.Error(), "soql") + + // Missing sobject is an error. + _, err = parseSourceProperties(map[string]any{"soql": "SELECT Id FROM Opportunity"}) + require.Error(t, err) + require.Contains(t, err.Error(), "sobject") +} + +func TestDecodeJWTKey(t *testing.T) { + pem := "-----BEGIN PRIVATE KEY-----\nMIIE...\n-----END PRIVATE KEY-----\n" + + // Raw PEM is passed through unchanged (backwards compat with hand-written configs). + out, err := decodeJWTKey(pem) + require.NoError(t, err) + require.Equal(t, pem, string(out)) + + // Base64-encoded PEM (the shape produced by the UI's file upload) decodes to PEM. 
+ encoded := base64.StdEncoding.EncodeToString([]byte(pem)) + out, err = decodeJWTKey(encoded) + require.NoError(t, err) + require.Equal(t, pem, string(out)) + + // Whitespace inside the base64 value is tolerated (e.g. wrapped lines in .env). + wrapped := encoded[:20] + "\n" + encoded[20:40] + " " + encoded[40:] + out, err = decodeJWTKey(wrapped) + require.NoError(t, err) + require.Equal(t, pem, string(out)) + + // Garbage that is neither PEM nor base64 is rejected. + _, err = decodeJWTKey("not a key !@#$") + require.Error(t, err) +} + +func TestAuthenticateValidation(t *testing.T) { + // Missing connected app client id is always an error. + _, err := authenticate(authenticationOptions{Username: "u", Password: "p", ClientSecret: "s"}) + require.Error(t, err) + require.Contains(t, err.Error(), "connected app client id") + + // Username/password without client_secret should be reported. + _, err = authenticate(authenticationOptions{ConnectedApp: "cid", Username: "u", Password: "p"}) + require.Error(t, err) + require.Contains(t, err.Error(), "client_secret") + + // No credentials at all returns the catch-all error. + _, err = authenticate(authenticationOptions{ConnectedApp: "cid"}) + require.Error(t, err) + require.Contains(t, err.Error(), "unable to authenticate") +} diff --git a/runtime/drivers/salesforce/salesforce.go b/runtime/drivers/salesforce/salesforce.go index 7d1518807b9e..c70c74ca4e97 100644 --- a/runtime/drivers/salesforce/salesforce.go +++ b/runtime/drivers/salesforce/salesforce.go @@ -54,6 +54,11 @@ var spec = drivers.Spec{ Type: drivers.StringPropertyType, Secret: false, }, + { + Key: "client_secret", + Type: drivers.StringPropertyType, + Secret: true, + }, }, // Important: Any edits to the below properties must be accompanied by changes to the client-side form validation schemas. 
SourceProperties: []*drivers.PropertySpec{ @@ -105,7 +110,7 @@ var spec = drivers.Spec{ Type: drivers.StringPropertyType, DisplayName: "JWT Key for Authentication", Required: false, - Hint: "Paste your JWT private key for token-based authentication. Used with Connected App and Client ID.", + Hint: "Paste your JWT private key for token-based authentication. Used with a Connected App or External Client App's Client ID.", Placeholder: "your_jwt_key", Secret: true, }, @@ -123,9 +128,18 @@ var spec = drivers.Spec{ DisplayName: "Connected App Client Id", Required: false, Default: defaultClientID, - Hint: "The client ID (consumer key) from your Salesforce Connected App. Required for JWT authentication.", + Hint: "The Client ID from your Salesforce Connected App. The client credentials and JWT flows also accept an External Client App's Client ID.", NoPrompt: true, }, + { + Key: "client_secret", + Type: drivers.StringPropertyType, + DisplayName: "Connected App Client Secret", + Required: false, + Hint: "The Client Secret from your Salesforce Connected App. Required for the OAuth password flow (Connected App only) and for the client credentials flow (which also accepts an External Client App).", + Placeholder: "your_client_secret", + Secret: true, + }, { Key: "name", Type: drivers.StringPropertyType, @@ -171,14 +185,7 @@ type connection struct { // Ping implements drivers.Handle. 
func (c *connection) Ping(ctx context.Context) error { - var username, password, endpoint, key, clientID string - - if u, ok := c.config["username"].(string); ok && u != "" { - username = u - } else { - // backwards compatibility: return early because this can be defined in sourceProp - return nil - } + var username, password, endpoint, key, clientID, clientSecret string if e, ok := c.config["endpoint"].(string); ok && e != "" { endpoint = e @@ -193,17 +200,20 @@ func (c *connection) Ping(ctx context.Context) error { clientID = defaultClientID } - if p, ok := c.config["password"].(string); ok && p != "" { + if u, ok := c.config["username"].(string); ok { + username = u + } + + if p, ok := c.config["password"].(string); ok { password = p } - if k, ok := c.config["key"].(string); ok && k != "" { + if k, ok := c.config["key"].(string); ok { key = k } - if password == "" && key == "" { - // backwards compatibility: return early because this can be defined in sourceProp - return nil + if s, ok := c.config["client_secret"].(string); ok { + clientSecret = s } authOptions := authenticationOptions{ @@ -212,6 +222,12 @@ func (c *connection) Ping(ctx context.Context) error { JWT: key, Endpoint: endpoint, ConnectedApp: clientID, + ClientSecret: clientSecret, + } + + if selectAuthMode(authOptions) == authModeUnknown { + // backwards compatibility: credentials may be defined in the source properties instead + return nil } _, err := authenticate(authOptions) diff --git a/runtime/drivers/salesforce/warehouse.go b/runtime/drivers/salesforce/warehouse.go index 7ed604467816..7f6fb773abb1 100644 --- a/runtime/drivers/salesforce/warehouse.go +++ b/runtime/drivers/salesforce/warehouse.go @@ -33,14 +33,7 @@ func (c *connection) QueryAsFiles(ctx context.Context, props map[string]any) (ou return nil, err } - var username, password, endpoint, key, clientID string - if srcProps.Username != "" { // get from src properties - username = srcProps.Username - } else if u, ok := 
c.config["username"].(string); ok && u != "" { // get from driver configs - username = u - } else { - return nil, fmt.Errorf("the property 'username' is required for Salesforce. Provide 'username' in the YAML properties or pass '--env connector.salesforce.username=...' to 'rill start'") - } + var username, password, endpoint, key, clientID, clientSecret string if srcProps.Endpoint != "" { // get from src properties endpoint = srcProps.Endpoint @@ -58,6 +51,12 @@ func (c *connection) QueryAsFiles(ctx context.Context, props map[string]any) (ou clientID = defaultClientID } + if srcProps.Username != "" { // get from src properties + username = srcProps.Username + } else if u, ok := c.config["username"].(string); ok && u != "" { // get from driver configs + username = u + } + if srcProps.Password != "" { // get from src properties password = srcProps.Password } else if p, ok := c.config["password"].(string); ok && p != "" { // get from driver configs @@ -70,8 +69,10 @@ func (c *connection) QueryAsFiles(ctx context.Context, props map[string]any) (ou key = k } - if password == "" && key == "" { - return nil, fmt.Errorf("the property 'password' or property 'key' is required for Salesforce. Provide 'password' or 'key' in the YAML properties or pass '--env connector.salesforce.password=...' or '--env connector.salesforce.key=...' 
to 'rill start'") + if srcProps.ClientSecret != "" { // get from src properties + clientSecret = srcProps.ClientSecret + } else if s, ok := c.config["client_secret"].(string); ok && s != "" { // get from driver configs + clientSecret = s } authOptions := authenticationOptions{ @@ -80,6 +81,20 @@ func (c *connection) QueryAsFiles(ctx context.Context, props map[string]any) (ou JWT: key, Endpoint: endpoint, ConnectedApp: clientID, + ClientSecret: clientSecret, + } + + switch selectAuthMode(authOptions) { + case authModeUnknown: + return nil, fmt.Errorf("Salesforce credentials are required: provide a JWT 'key', a 'username' and 'password' (with 'client_secret'), or a 'client_secret' for the client credentials flow") + case authModePassword: + if clientSecret == "" { + return nil, fmt.Errorf("the property 'client_secret' is required for username/password authentication. Provide 'client_secret' in the YAML properties or pass '--env connector.salesforce.client_secret=...' to 'rill start'") + } + case authModeJWT: + if username == "" { + return nil, fmt.Errorf("the property 'username' is required for JWT authentication. Provide 'username' in the YAML properties or pass '--env connector.salesforce.username=...' 
to 'rill start'") + } } session, err := authenticate(authOptions) @@ -156,14 +171,16 @@ func (j *bulkJob) Next(ctx context.Context) ([]string, error) { } type sourceProperties struct { - SOQL string `mapstructure:"soql"` - SObject string `mapstructure:"sobject"` - QueryAll bool `mapstructure:"queryAll"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - Key string `mapstructure:"key"` - Endpoint string `mapstructure:"endpoint"` - ClientID string `mapstructure:"client_id"` + SOQL string `mapstructure:"soql"` + SQL string `mapstructure:"sql"` + SObject string `mapstructure:"sobject"` + QueryAll bool `mapstructure:"queryAll"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + Key string `mapstructure:"key"` + Endpoint string `mapstructure:"endpoint"` + ClientID string `mapstructure:"client_id"` + ClientSecret string `mapstructure:"client_secret"` } func parseSourceProperties(props map[string]any) (*sourceProperties, error) { @@ -172,8 +189,13 @@ func parseSourceProperties(props map[string]any) (*sourceProperties, error) { if err != nil { return nil, err } + // Accept `sql:` as an alias for `soql:` so Salesforce fits the same model + // shape as other warehouse drivers (which read `sql:` from the source). 
+ if conf.SOQL == "" { + conf.SOQL = conf.SQL + } if conf.SOQL == "" { - return nil, fmt.Errorf("property 'soql' is mandatory for connector \"salesforce\"") + return nil, fmt.Errorf("property 'soql' (or 'sql') is mandatory for connector \"salesforce\"") } if conf.SObject == "" { return nil, fmt.Errorf("property 'sobject' is mandatory for connector \"salesforce\"") diff --git a/web-common/src/features/sources/modal/constants.ts b/web-common/src/features/sources/modal/constants.ts index b208be0a3b1b..97887618d7f2 100644 --- a/web-common/src/features/sources/modal/constants.ts +++ b/web-common/src/features/sources/modal/constants.ts @@ -92,6 +92,7 @@ export const SOURCES = [ "postgres", "redshift", "s3", + "salesforce", "snowflake", "sqlite", "supabase", diff --git a/web-common/src/features/templates/schemas/salesforce.ts b/web-common/src/features/templates/schemas/salesforce.ts index 794b6199f642..7a90b8ee4859 100644 --- a/web-common/src/features/templates/schemas/salesforce.ts +++ b/web-common/src/features/templates/schemas/salesforce.ts @@ -4,14 +4,52 @@ export const salesforceSchema: MultiStepFormSchema = { $schema: "http://json-schema.org/draft-07/schema#", type: "object", title: "Salesforce", - "x-category": "fileStore", + "x-category": "warehouse", "x-form-height": "tall", properties: { + auth_method: { + type: "string", + title: "Authentication method", + enum: ["username_password", "client_credentials", "jwt"], + default: "username_password", + description: "Choose how to authenticate to Salesforce", + "x-display": "radio", + "x-enum-labels": [ + "Username / Password (OAuth)", + "Client Credentials", + "JWT Bearer", + ], + "x-enum-descriptions": [ + "Sign in with a Salesforce username and password using the OAuth password flow. Requires a Connected App's Client ID and Client Secret (External Client Apps do not support this flow).", + "Sign in as the run-as user using the OAuth client credentials flow. 
Requires a Connected App or External Client App's Client ID and Client Secret.", + "Sign in with a JWT signed by a private key. Requires a Connected App or External Client App's Client ID and a PEM-formatted private key.", + ], + "x-ui-only": true, + "x-grouped-fields": { + username_password: [ + "username", + "password", + "client_id", + "client_secret", + ], + client_credentials: ["client_id", "client_secret"], + jwt: ["username", "client_id", "key"], + }, + }, + endpoint: { + type: "string", + title: "Login endpoint", + description: + "Salesforce login URL (e.g., login.salesforce.com or test.salesforce.com)", + "x-placeholder": "login.salesforce.com", + default: "login.salesforce.com", + }, username: { type: "string", title: "Username", description: "Salesforce username (usually an email)", "x-placeholder": "user@example.com", + "x-visible-if": { auth_method: ["username_password", "jwt"] }, }, password: { type: "string", @@ -21,56 +59,79 @@ export const salesforceSchema: MultiStepFormSchema = { "x-placeholder": "your_password", "x-secret": true, "x-env-var-name": "SALESFORCE_PASSWORD", + "x-visible-if": { auth_method: "username_password" }, }, - endpoint: { + client_id: { type: "string", - title: "Login endpoint", + title: "Connected App Client ID", description: - "Salesforce login URL (e.g., login.salesforce.com or test.salesforce.com)", - "x-placeholder": "login.salesforce.com", + "Client ID for the Salesforce Connected App. The client credentials and JWT flows also accept an External Client App's Client ID.", + "x-placeholder": "Connected App client ID", + }, + client_secret: { + type: "string", + title: "Connected App Client Secret", + description: + "Client Secret for the Salesforce Connected App. 
The client credentials flow also accepts an External Client App's Client Secret.", + "x-placeholder": "Connected App client secret", + "x-secret": true, + "x-env-var-name": "SALESFORCE_CLIENT_SECRET", + "x-visible-if": { + auth_method: ["username_password", "client_credentials"], + }, }, key: { type: "string", title: "JWT private key", - description: "PEM-formatted private key for JWT auth", - "x-display": "textarea", - "x-placeholder": "your_private_key", + description: + "PEM-formatted private key for JWT auth. The file is base64-encoded before being stored in .env so its newlines do not break parsing.", + format: "file", + "x-display": "file", + "x-file-accept": ".pem,.key", + "x-file-encoding": "base64", "x-secret": true, "x-env-var-name": "SALESFORCE_KEY", - "x-advanced": true, - }, - client_id: { - type: "string", - title: "Connected App Client ID", - description: "Client ID (consumer key) for JWT auth", - "x-placeholder": "Connected App client ID", - "x-advanced": true, + "x-visible-if": { auth_method: "jwt" }, }, soql: { type: "string", title: "SOQL", - description: "SOQL query to extract data", + description: "SOQL query to extract data from Salesforce", "x-placeholder": "SELECT Id, Name FROM Opportunity", + "x-step": "explorer", }, sobject: { type: "string", title: "SObject", - description: "Salesforce object to query", + description: + "Salesforce object the SOQL query reads from (e.g. 
Opportunity, Account, MyObject__c)", "x-placeholder": "Opportunity", - }, - queryAll: { - type: "boolean", - title: "Query all", - description: "Include deleted and archived records", - default: false, + "x-step": "explorer", }, name: { type: "string", - title: "Source name", - description: "Name for the source", + title: "Model name", + description: "Name for the model", pattern: "^[a-zA-Z0-9_]+$", - "x-placeholder": "my_new_source", + "x-placeholder": "my_model", + "x-step": "explorer", }, }, - required: ["soql", "sobject", "name", "username", "password"], + required: ["soql", "sobject", "name"], + allOf: [ + { + if: { properties: { auth_method: { const: "username_password" } } }, + then: { + required: ["username", "password", "client_id", "client_secret"], + }, + }, + { + if: { properties: { auth_method: { const: "client_credentials" } } }, + then: { required: ["client_id", "client_secret"] }, + }, + { + if: { properties: { auth_method: { const: "jwt" } } }, + then: { required: ["username", "client_id", "key"] }, + }, + ], }; From 151e35ad751f1edfdcebee1c6541424acb83340e Mon Sep 17 00:00:00 2001 From: "Christian G. Warden" Date: Fri, 22 May 2026 09:15:24 -0500 Subject: [PATCH 2/2] salesforce: switch to Bulk API 2.0 and add information schema MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Bulk API 2.0: replace the v1 query path with `Bulk2QueryJob`. v2 manages chunking server-side and derives the SObject from the query itself, so `sobject:` is no longer a required model property. - SELECT * rewrite: SOQL does not accept `SELECT *`, but the connector explorer's "Table" mode produces it. The driver detects `SELECT * FROM <SObject>` (with optional WHERE / ORDER BY / LIMIT), calls DescribeSObject, and rewrites the query into an explicit field list. Compound `address` / `location` types are skipped — their atomic components remain because Salesforce exposes them as their own fields.
- Information schema: implement `ListDatabaseSchemas` / `ListTables` / `GetTable` via `force.ListSobjects` and `force.DescribeSObject`, so the connector explorer can browse SObjects as tables and clear the "does not implement information schema" error. - Shared auth helpers: extract `connection.authOptions()` and `sourceProperties.applyOverrides()` so InformationSchema, Ping, and QueryAsFiles all build their auth options the same way. --- .../connectors/data-source/salesforce.md | 5 +- .../drivers/salesforce/authentication_test.go | 60 +++- runtime/drivers/salesforce/bulk.go | 284 ------------------ runtime/drivers/salesforce/bulk2.go | 159 ++++++++++ .../drivers/salesforce/information_schema.go | 168 +++++++++++ .../salesforce/information_schema_test.go | 35 +++ runtime/drivers/salesforce/pk_chunking.go | 59 ---- runtime/drivers/salesforce/salesforce.go | 64 +--- runtime/drivers/salesforce/select_star.go | 73 +++++ .../drivers/salesforce/select_star_test.go | 60 ++++ runtime/drivers/salesforce/warehouse.go | 159 +++------- .../features/templates/schemas/salesforce.ts | 10 +- 12 files changed, 587 insertions(+), 549 deletions(-) delete mode 100644 runtime/drivers/salesforce/bulk.go create mode 100644 runtime/drivers/salesforce/bulk2.go create mode 100644 runtime/drivers/salesforce/information_schema.go create mode 100644 runtime/drivers/salesforce/information_schema_test.go delete mode 100644 runtime/drivers/salesforce/pk_chunking.go create mode 100644 runtime/drivers/salesforce/select_star.go create mode 100644 runtime/drivers/salesforce/select_star_test.go diff --git a/docs/docs/developers/build/connectors/data-source/salesforce.md b/docs/docs/developers/build/connectors/data-source/salesforce.md index b8f5c2c1b309..77a417e634d3 100644 --- a/docs/docs/developers/build/connectors/data-source/salesforce.md +++ b/docs/docs/developers/build/connectors/data-source/salesforce.md @@ -83,7 +83,7 @@ Raw PEM written inline in the connector YAML (without an `.env` 
reference) is al ## Models -A Salesforce model file references the connector by name and supplies the SOQL query plus the SObject the query reads from. The Bulk API needs the SObject to create the job, so it is set explicitly — Rill does not parse it from the SOQL. +A Salesforce model file references the connector by name and supplies the SOQL query. Rill issues the query through the Salesforce [Bulk API 2.0](https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/bulk_api_2_0.htm). ```yaml type: model @@ -94,7 +94,6 @@ connector: salesforce soql: | SELECT Id, Name, CreatedDate FROM Opportunity -sobject: Opportunity output: connector: duckdb @@ -102,6 +101,8 @@ output: Use `soql:` for the query. `sql:` is also accepted as an alias for parity with other warehouse drivers (the connector explorer in the UI writes the query into `sql:`). Add `queryAll: true` to include soft-deleted records. +SOQL itself does not accept `SELECT *`, but the connector rewrites the `SELECT * FROM <SObject>` shape (which the explorer's "Table" mode emits) into an explicit field list discovered from the SObject's describe response. Compound types like `Address` and `Location` are skipped — their atomic sub-components (e.g. `BillingStreet`, `BillingCity`) are queried instead. + ## Local credentials When using Rill Developer on your local machine, provide credentials via a connector file as shown above. Keep secrets in `.env` rather than the connector YAML. See [connector YAML](/reference/project-files/connectors) for more details. diff --git a/runtime/drivers/salesforce/authentication_test.go b/runtime/drivers/salesforce/authentication_test.go index 9167eadbea31..905e2a0fb4ab 100644 --- a/runtime/drivers/salesforce/authentication_test.go +++ b/runtime/drivers/salesforce/authentication_test.go @@ -86,43 +86,43 @@ func TestSelectAuthMode(t *testing.T) { } func TestParseSourceProperties(t *testing.T) { - // Explicit soql + sobject is the historical shape.
+ // Explicit soql is the historical shape; Bulk API 2.0 derives the + // SObject from the query itself so `sobject` is no longer required. conf, err := parseSourceProperties(map[string]any{ - "soql": "SELECT Id FROM Opportunity", - "sobject": "Opportunity", + "soql": "SELECT Id FROM Opportunity", }) require.NoError(t, err) require.Equal(t, "SELECT Id FROM Opportunity", conf.SOQL) - require.Equal(t, "Opportunity", conf.SObject) // `sql:` is accepted as a fallback for `soql:` so Salesforce fits the // warehouse model shape produced by the connector explorer. conf, err = parseSourceProperties(map[string]any{ - "sql": "SELECT Id FROM Account", - "sobject": "Account", + "sql": "SELECT Id FROM Account", }) require.NoError(t, err) require.Equal(t, "SELECT Id FROM Account", conf.SOQL) - require.Equal(t, "Account", conf.SObject) // An explicit soql wins over sql when both are supplied. conf, err = parseSourceProperties(map[string]any{ - "soql": "SELECT Id FROM Opportunity", - "sql": "SELECT Name FROM Lead", - "sobject": "Opportunity", + "soql": "SELECT Id FROM Opportunity", + "sql": "SELECT Name FROM Lead", }) require.NoError(t, err) require.Equal(t, "SELECT Id FROM Opportunity", conf.SOQL) - // Missing query is an error. - _, err = parseSourceProperties(map[string]any{"sobject": "Opportunity"}) + // Missing query is still an error. + _, err = parseSourceProperties(map[string]any{}) require.Error(t, err) require.Contains(t, err.Error(), "soql") - // Missing sobject is an error. - _, err = parseSourceProperties(map[string]any{"soql": "SELECT Id FROM Opportunity"}) - require.Error(t, err) - require.Contains(t, err.Error(), "sobject") + // A legacy `sobject:` value is silently ignored (no schema field) so + // existing project YAMLs don't error out after the Bulk 2.0 switch. 
+ conf, err = parseSourceProperties(map[string]any{ + "soql": "SELECT Id FROM Opportunity", + "sobject": "Opportunity", + }) + require.NoError(t, err) + require.Equal(t, "SELECT Id FROM Opportunity", conf.SOQL) } func TestDecodeJWTKey(t *testing.T) { @@ -150,6 +150,34 @@ func TestDecodeJWTKey(t *testing.T) { require.Error(t, err) } +func TestApplyOverrides(t *testing.T) { + base := authenticationOptions{ + Endpoint: "login.salesforce.com", + ConnectedApp: defaultClientID, + Username: "config@example.com", + Password: "config-pw", + } + + // Per-source values supplied on the model win over connector config. + src := &sourceProperties{ + Endpoint: "test.salesforce.com", + Username: "src@example.com", + Password: "src-pw", + } + got := base + src.applyOverrides(&got) + require.Equal(t, "test.salesforce.com", got.Endpoint) + require.Equal(t, "src@example.com", got.Username) + require.Equal(t, "src-pw", got.Password) + // ConnectedApp was not set on the source, so the connector default stays. + require.Equal(t, defaultClientID, got.ConnectedApp) + + // An empty source struct leaves the connector-level options untouched. + got = base + (&sourceProperties{}).applyOverrides(&got) + require.Equal(t, base, got) +} + func TestAuthenticateValidation(t *testing.T) { // Missing connected app client id is always an error. 
_, err := authenticate(authenticationOptions{Username: "u", Password: "p", ClientSecret: "s"}) diff --git a/runtime/drivers/salesforce/bulk.go b/runtime/drivers/salesforce/bulk.go deleted file mode 100644 index d17d66e71aec..000000000000 --- a/runtime/drivers/salesforce/bulk.go +++ /dev/null @@ -1,284 +0,0 @@ -package salesforce - -import ( - "bytes" - "context" - "encoding/json" - "encoding/xml" - "errors" - "fmt" - "io" - "net/http" - "os" - "strconv" - "time" - - force "github.com/ForceCLI/force/lib" - "github.com/ForceCLI/force/lib/record_reader" - "github.com/rilldata/rill/runtime/pkg/observability" - "go.uber.org/zap" -) - -type batchResult struct { - batch *force.BatchInfo - resultID string -} - -type bulkJob struct { - session *force.Force - objectName string - query string - job force.JobInfo - jobID string - batchID string - logger *zap.Logger - // pkChunking automatically splits large data sets into smaller batches of pkChunkSize, which we can query concurrently later on - pkChunkSize int - results []batchResult - nextResult int - tempFilePaths []string - keepFilesUntilClose bool -} - -func (j *bulkJob) RecordReader(in io.Reader) record_reader.RecordReader { - return record_reader.NewCsv(in, &record_reader.Options{GroupSize: 100}) -} - -func makeBulkJob(session *force.Force, objectName, query string, queryAll bool, logger *zap.Logger) *bulkJob { - pkChunkSize := 100000 - contentType := force.JobContentTypeCsv - operation := "query" - - if queryAll { - operation = "queryAll" - } - - return &bulkJob{ - session: session, - objectName: objectName, - query: query, - pkChunkSize: pkChunkSize, - logger: logger, - job: force.JobInfo{ - Operation: operation, - Object: objectName, - ContentType: string(contentType), - }, - } -} - -func (c *connection) startJob(ctx context.Context, j *bulkJob) error { - session := j.session - - jobInfo, err := session.CreateBulkJobWithContext(ctx, j.job, func(request *http.Request) { - if isPKChunkingEnabled(j) { - pkChunkHeader := 
"chunkSize=" + strconv.Itoa(j.pkChunkSize) - parent := parentObject(j.objectName) - - if parent != "" { - pkChunkHeader += "; parent=" + parent - } - - request.Header.Add("Sforce-Enable-PKChunking", pkChunkHeader) - } - }) - if err != nil { - if errors.Is(err, force.InvalidBulkObject) { - return errors.New("object is not supported by Bulk API") - } - return err - } - result, err := session.BulkQueryWithContext(ctx, j.query, jobInfo.Id, j.job.ContentType) - if err != nil { - return errors.New("bulk query failed with " + err.Error()) - } - batchID := result.Id - // wait for chunking to complete - if isPKChunkingEnabled(j) { - for { - batchInfo, err := session.GetBatchInfoWithContext(ctx, jobInfo.Id, batchID) - if err != nil { - return errors.New("bulk job status failed with " + err.Error()) - } - - if batchInfo.State == "NotProcessed" { - // batches have been created - break - } - if batchInfo.State == "Failed" { - return errors.New("bulk query failed: " + batchInfo.StateMessage) - } - c.logger.Info("Waiting for pk chunking to complete", observability.ZapCtx(ctx)) - select { - case <-time.After(2 * time.Second): - case <-ctx.Done(): - return fmt.Errorf("startJob cancelled: %w", ctx.Err()) - } - } - } - - jobInfo, err = session.CloseBulkJobWithContext(ctx, jobInfo.Id) - if err != nil { - return err - } - var status force.JobInfo - - for { - status, err = session.GetJobInfoWithContext(ctx, jobInfo.Id) - if err != nil { - return errors.New("bulk job status failed with " + err.Error()) - } - if status.NumberBatchesCompleted+status.NumberBatchesFailed == status.NumberBatchesTotal { - break - } - c.logger.Info("Waiting for bulk export to complete", observability.ZapCtx(ctx)) - select { - case <-time.After(2 * time.Second): - case <-ctx.Done(): - return fmt.Errorf("startJob cancelled: %w", ctx.Err()) - } - } - - j.job = status - j.jobID = jobInfo.Id - j.batchID = batchID - - return nil -} - -func (j *bulkJob) getBatches(ctx context.Context) error { - if j.jobID == "" { - 
return fmt.Errorf("Invalid job: no job id") - } - - var batches []force.BatchInfo - var err error - errorMessage := "Could not retrieve job result. Reason: " - - if isPKChunkingEnabled(j) { - var allBatches []force.BatchInfo - allBatches, err = j.session.GetBatchesWithContext(ctx, j.job.Id) - // for pk chunking enabled jobs the first batch has no results - if allBatches != nil { - if allBatches[0].State == "Failed" { - return fmt.Errorf("Batch failed with: %s", allBatches[0].StateMessage) - } - - for _, b := range allBatches { - if b.State != "NotProcessed" && b.NumberRecordsProcessed > 0 { - batches = append(batches, b) - } - } - } - } else { - batch, berr := j.session.GetBatchInfoWithContext(ctx, j.jobID, j.batchID) - err = berr - batches = []force.BatchInfo{batch} - } - if err != nil { - return fmt.Errorf("%s %w", errorMessage+"batch status failed with ", err) - } - for _, b := range batches { - results, err := getBatchResults(ctx, j.session, &j.job, b) - if err != nil { - return fmt.Errorf("%s %w", errorMessage+"batch results failed with ", err) - } - j.results = append(j.results, results...) 
- } - return nil -} - -func (j *bulkJob) retrieveJobResult(ctx context.Context, result int) (string, error) { - batchResult := j.results[result] - writer, err := os.CreateTemp("", "batchResult-"+batchResult.resultID+"-*.csv") - if err != nil { - return "", err - } - defer func() { - writer.Close() - }() - - httpBody := fetchBatchResult(ctx, j, batchResult, j.logger) - err = readAndWriteBody(ctx, j, httpBody, writer) - if closer, ok := httpBody.(io.ReadCloser); ok { - closer.Close() - } - if err != nil { - return "", err - } - return writer.Name(), nil -} - -func fetchBatchResult(ctx context.Context, j *bulkJob, resultInfo batchResult, logger *zap.Logger) io.Reader { - if resultInfo.batch.State == "Failed" { - logger.Error("Could not fetch job result", zap.String("reason", "batch failed with "+resultInfo.batch.StateMessage), observability.ZapCtx(ctx)) - return bytes.NewReader(nil) - } - if resultInfo.batch.NumberRecordsProcessed == 0 { - logger.Debug("No records found for query", observability.ZapCtx(ctx)) - return bytes.NewReader(nil) - } - var result io.Reader - err := j.session.RetrieveBulkJobQueryResultsWithCallbackWithContext(ctx, j.job, resultInfo.batch.Id, resultInfo.resultID, func(r *http.Response) error { - result = r.Body - return nil - }) - if err != nil { - logger.Error("Could not fetch job result", zap.String("reason", "batch failed with "+resultInfo.batch.StateMessage), observability.ZapCtx(ctx)) - return bytes.NewReader(nil) - } - return result -} - -func readAndWriteBody(ctx context.Context, j *bulkJob, httpBody io.Reader, w io.Writer) error { - recReader := j.RecordReader(httpBody) - for { - records, err := recReader.Next() - if errors.Is(err, io.EOF) { - return nil - } else if err != nil { - return err - } - if _, err := io.Copy(w, bytes.NewReader(records.Bytes)); err != nil { - return fmt.Errorf("write failed: %w", err) - } - select { - case <-ctx.Done(): - return fmt.Errorf("readAndWriteBody cancelled: %w", ctx.Err()) - default: - } - } -} - -// 
Get all of the results for a batch. Most batches have one results, but -// large batches can be split into multiple result files. -func getBatchResults(ctx context.Context, session *force.Force, job *force.JobInfo, batch force.BatchInfo) ([]batchResult, error) { - var resultIDs []string - var results []batchResult - jobInfo, err := session.RetrieveBulkQueryWithContext(ctx, job.Id, batch.Id) - if err != nil { - return nil, err - } - - jct, err := job.JobContentType() - if err != nil { - return nil, err - } - if jct == force.JobContentTypeJson { - err = json.Unmarshal(jobInfo, &resultIDs) - } else { - var resultList struct { - Results []string `xml:"result"` - } - err = xml.Unmarshal(jobInfo, &resultList) - resultIDs = resultList.Results - } - if err != nil { - return nil, err - } - for _, r := range resultIDs { - results = append(results, batchResult{batch: &batch, resultID: r}) - } - - return results, err -} diff --git a/runtime/drivers/salesforce/bulk2.go b/runtime/drivers/salesforce/bulk2.go new file mode 100644 index 000000000000..b219d7d37653 --- /dev/null +++ b/runtime/drivers/salesforce/bulk2.go @@ -0,0 +1,159 @@ +package salesforce + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "time" + + force "github.com/ForceCLI/force/lib" + "github.com/rilldata/rill/runtime/drivers" + "github.com/rilldata/rill/runtime/pkg/observability" + "go.uber.org/zap" +) + +// bulk2QueryJob drives a Salesforce Bulk API 2.0 query job and exposes the +// paginated CSV results through drivers.FileIterator. Each Next() call fetches +// the next page (one CSV file per page) using the Sforce-Locator cursor. 
+type bulk2QueryJob struct { + session *force.Force + logger *zap.Logger + + jobID string + locator string + done bool + tempFilePaths []string + keepFilesUntilClose bool +} + +const bulk2PollInterval = 2 * time.Second + +func makeBulk2QueryJob(session *force.Force, logger *zap.Logger) *bulk2QueryJob { + return &bulk2QueryJob{session: session, logger: logger} +} + +// startJob submits the query to Bulk API 2.0 and waits for the job to reach a +// terminal state before returning. The caller can then iterate results via Next(). +func (j *bulk2QueryJob) startJob(ctx context.Context, query string, queryAll bool) error { + op := force.Bulk2OperationQuery + if queryAll { + op = force.Bulk2OperationQueryAll + } + + // The connector explorer's "Table" mode (and users typing the same + // shape) produces `SELECT * FROM `, which SOQL doesn't accept. + // Expand the star into the SObject's queryable field list before sending. + expanded, err := expandSelectStar(j.session, query) + if err != nil { + return err + } + query = expanded + + info, err := j.session.CreateBulk2QueryJobWithContext(ctx, force.Bulk2QueryJobRequest{ + Operation: op, + Query: query, + }) + if err != nil { + return fmt.Errorf("creating Bulk API 2.0 query job: %w", err) + } + j.jobID = info.Id + + final, err := j.session.WaitForBulk2QueryJobWithContext(ctx, j.jobID, bulk2PollInterval, func(state any) { + info, _ := state.(force.Bulk2QueryJobInfo) + j.logger.Info("Waiting for Bulk API 2.0 query job", zap.String("state", string(info.State)), observability.ZapCtx(ctx)) + }) + if err != nil { + return fmt.Errorf("waiting for Bulk API 2.0 query job: %w", err) + } + if final.State == force.Bulk2JobStateFailed { + msg := final.ErrorMessage + if msg == "" { + msg = "job failed without an error message" + } + return fmt.Errorf("Bulk API 2.0 query job failed: %s", msg) + } + if final.State == force.Bulk2JobStateAborted { + return errors.New("Bulk API 2.0 query job was aborted") + } + if final.NumberRecordsProcessed 
== 0 { + // Nothing to download; mark done so Next() returns io.EOF immediately. + j.done = true + } + return nil +} + +var _ drivers.FileIterator = &bulk2QueryJob{} + +// Close implements drivers.FileIterator. +func (j *bulk2QueryJob) Close() error { + return j.cleanupTempFiles() +} + +// Format implements drivers.FileIterator. +func (j *bulk2QueryJob) Format() string { return "csv" } + +// SetKeepFilesUntilClose implements drivers.FileIterator. +func (j *bulk2QueryJob) SetKeepFilesUntilClose() { j.keepFilesUntilClose = true } + +// Next implements drivers.FileIterator. Each call fetches one page of results +// using the locator cursor returned by the previous call. +func (j *bulk2QueryJob) Next(ctx context.Context) ([]string, error) { + if j.jobID == "" { + return nil, errors.New("invalid bulk2 job: no job id") + } + if j.done { + return nil, io.EOF + } + + if !j.keepFilesUntilClose { + if err := j.cleanupTempFiles(); err != nil { + return nil, err + } + } + + page, err := j.session.GetBulk2QueryResultsWithContext(ctx, j.jobID, j.locator, 0) + if err != nil { + return nil, fmt.Errorf("fetching Bulk API 2.0 results: %w", err) + } + j.locator = page.Locator + if j.locator == "" { + // Salesforce signals the final page by returning an empty Sforce-Locator + // header; mark done so the next iteration returns io.EOF. 
+ j.done = true + } + + path, err := writeBytesToTempFile(page.Data, j.jobID) + if err != nil { + return nil, err + } + j.tempFilePaths = append(j.tempFilePaths, path) + return []string{path}, nil +} + +func (j *bulk2QueryJob) cleanupTempFiles() error { + if len(j.tempFilePaths) == 0 { + return nil + } + for _, p := range j.tempFilePaths { + if err := os.Remove(p); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to delete temp file %s: %w", p, err) + } + } + j.tempFilePaths = nil + return nil +} + +func writeBytesToTempFile(data []byte, jobID string) (string, error) { + f, err := os.CreateTemp("", "salesforce-bulk2-"+jobID+"-*.csv") + if err != nil { + return "", err + } + defer f.Close() + if _, err := f.Write(data); err != nil { + _ = os.Remove(f.Name()) + return "", fmt.Errorf("writing Bulk API 2.0 results to %s: %w", f.Name(), err) + } + return f.Name(), nil +} diff --git a/runtime/drivers/salesforce/information_schema.go b/runtime/drivers/salesforce/information_schema.go new file mode 100644 index 000000000000..7a2b3db03378 --- /dev/null +++ b/runtime/drivers/salesforce/information_schema.go @@ -0,0 +1,168 @@ +package salesforce + +import ( + "context" + "encoding/json" + "fmt" + "sort" + + force "github.com/ForceCLI/force/lib" + "github.com/rilldata/rill/runtime/drivers" + "github.com/rilldata/rill/runtime/pkg/pagination" +) + +// AsInformationSchema implements drivers.Handle. +func (c *connection) AsInformationSchema() (drivers.InformationSchema, bool) { + return c, true +} + +// ListDatabaseSchemas returns a single empty database/schema pair. Salesforce +// orgs do not have nested catalogs the way relational warehouses do, so the +// SObjects in the org are exposed as the tables under this one entry. 
+func (c *connection) ListDatabaseSchemas(ctx context.Context, pageSize uint32, pageToken string) ([]*drivers.DatabaseSchemaInfo, string, error) { + return []*drivers.DatabaseSchemaInfo{{Database: "", DatabaseSchema: ""}}, "", nil +} + +// ListTables returns the queryable SObjects in the connected org. +func (c *connection) ListTables(ctx context.Context, database, databaseSchema string, pageSize uint32, pageToken string) ([]*drivers.TableInfo, string, error) { + session, err := c.authenticateFromConfig() + if err != nil { + return nil, "", err + } + + sobjects, err := session.ListSobjects() + if err != nil { + return nil, "", fmt.Errorf("failed to list Salesforce SObjects: %w", err) + } + + // Filter to queryable SObjects and pull their API names, then sort so + // pagination is stable across calls. + names := make([]string, 0, len(sobjects)) + for _, so := range sobjects { + if !sobjectBool(so, "queryable") { + continue + } + name, _ := so["name"].(string) + if name == "" { + continue + } + names = append(names, name) + } + sort.Strings(names) + + limit := pagination.ValidPageSize(pageSize, drivers.DefaultPageSize) + start := 0 + if pageToken != "" { + var startAfter string + if err := pagination.UnmarshalPageToken(pageToken, &startAfter); err != nil { + return nil, "", fmt.Errorf("invalid page token: %w", err) + } + // Find the first name strictly greater than the previous page's tail. + start = sort.SearchStrings(names, startAfter) + if start < len(names) && names[start] == startAfter { + start++ + } + } + + end := min(start+limit, len(names)) + + res := make([]*drivers.TableInfo, 0, end-start) + for _, n := range names[start:end] { + res = append(res, &drivers.TableInfo{Name: n}) + } + + next := "" + if end < len(names) { + next = pagination.MarshalPageToken(names[end-1]) + } + return res, next, nil +} + +// GetTable returns the column names and SOQL types for the given SObject. 
+func (c *connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { + session, err := c.authenticateFromConfig() + if err != nil { + return nil, err + } + + body, err := session.DescribeSObject(table) + if err != nil { + return nil, fmt.Errorf("failed to describe SObject %q: %w", table, err) + } + + schema, err := parseDescribeSObject(body) + if err != nil { + return nil, err + } + return &drivers.TableMetadata{Schema: schema}, nil +} + +// parseDescribeSObject extracts a name→SOQL-type map from the JSON body of a +// Salesforce describe call. +func parseDescribeSObject(body string) (map[string]string, error) { + var desc struct { + Fields []struct { + Name string `json:"name"` + Type string `json:"type"` + } `json:"fields"` + } + if err := json.Unmarshal([]byte(body), &desc); err != nil { + return nil, fmt.Errorf("failed to parse SObject describe response: %w", err) + } + + schema := make(map[string]string, len(desc.Fields)) + for _, f := range desc.Fields { + if f.Name == "" { + continue + } + schema[f.Name] = f.Type + } + return schema, nil +} + +// authenticateFromConfig builds a force session using only the driver's +// connection config (no per-source overrides). Used by InformationSchema where +// there is no model/source context. +func (c *connection) authenticateFromConfig() (*force.Force, error) { + opts := c.authOptions() + if opts.Endpoint == "" { + return nil, fmt.Errorf("the property 'endpoint' is required for Salesforce") + } + session, err := authenticate(opts) + if err != nil { + return nil, fmt.Errorf("authentication failed: %w", err) + } + return session, nil +} + +// authOptions builds an authenticationOptions from the connection config. +// It does not validate required fields; callers should check the result +// (e.g. via selectAuthMode) before using it. 
+func (c *connection) authOptions() authenticationOptions { + clientID, _ := c.config["client_id"].(string) + if clientID == "" { + clientID = defaultClientID + } + + endpoint, _ := c.config["endpoint"].(string) + username, _ := c.config["username"].(string) + password, _ := c.config["password"].(string) + key, _ := c.config["key"].(string) + clientSecret, _ := c.config["client_secret"].(string) + + return authenticationOptions{ + Username: username, + Password: password, + JWT: key, + Endpoint: endpoint, + ConnectedApp: clientID, + ClientSecret: clientSecret, + } +} + +// sobjectBool reads a boolean attribute from a ForceSobject map, defaulting +// to false when the key is missing or not a bool. +func sobjectBool(so force.ForceSobject, key string) bool { + v, ok := so[key].(bool) + return ok && v +} diff --git a/runtime/drivers/salesforce/information_schema_test.go b/runtime/drivers/salesforce/information_schema_test.go new file mode 100644 index 000000000000..448debd2a6ed --- /dev/null +++ b/runtime/drivers/salesforce/information_schema_test.go @@ -0,0 +1,35 @@ +package salesforce + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseDescribeSObject(t *testing.T) { + // Trimmed shape of a real describe response: fields[].name and + // fields[].type are the bits we use; extra keys must be ignored. 
+ body := `{ + "name": "Opportunity", + "queryable": true, + "fields": [ + {"name": "Id", "type": "id", "label": "Opportunity ID"}, + {"name": "Name", "type": "string", "length": 120}, + {"name": "Amount", "type": "currency"}, + {"name": "", "type": "string"} + ] + }` + + schema, err := parseDescribeSObject(body) + require.NoError(t, err) + require.Equal(t, map[string]string{ + "Id": "id", + "Name": "string", + "Amount": "currency", + }, schema) +} + +func TestParseDescribeSObject_invalidJSON(t *testing.T) { + _, err := parseDescribeSObject("{not json") + require.Error(t, err) +} diff --git a/runtime/drivers/salesforce/pk_chunking.go b/runtime/drivers/salesforce/pk_chunking.go deleted file mode 100644 index b99c3f4dd9a6..000000000000 --- a/runtime/drivers/salesforce/pk_chunking.go +++ /dev/null @@ -1,59 +0,0 @@ -package salesforce - -import ( - "regexp" - "sort" - "strings" -) - -func isPKChunkingEnabled(bulkJob *bulkJob) bool { - return bulkJob.pkChunkSize > 0 && isPKChunkingEnabledObject(bulkJob.objectName) -} - -// pk chunking only works for certain standard objects, custom objects and share/history of those -func isPKChunkingEnabledObject(objectName string) bool { - standardObjectPKChunkingEnabled := []string{"account", "accounthistory", "accountshare", "campaign", "campaignhistory", "campaignmember", "campaignmemberhistory", "campaignmembershare", "campaignshare", "case", "casehistory", "caseshare", "contact", "contacthistory", "contactshare", "event", "eventhistory", "eventrelation", "eventrelationhistory", "eventrelationshare", "eventshare", "lead", "leadhistory", "leadshare", "opportunity", "opportunityhistory", "opportunityshare", "task", "taskhistory", "taskshare", "user", "userhistory", "usershare"} - - isCustomObject, err := regexp.MatchString("__c$", objectName) - if err != nil { - panic("Regex errored out with " + err.Error()) - } - isShareHistoryCustomObject, err := regexp.MatchString("(__Share|__History)$", objectName) - if err != nil { - 
panic("Regex errored out with " + err.Error()) - } - isHistoricalTrendingObject, err := regexp.MatchString("_hd$", objectName) - if err != nil { - panic("Regex errored out with " + err.Error()) - } - - return contains(standardObjectPKChunkingEnabled, objectName) || isCustomObject || isShareHistoryCustomObject || isHistoricalTrendingObject -} - -// performs a binary search for a given string -func contains(values []string, val string) bool { - val = strings.ToLower(val) - index := sort.SearchStrings(values, val) - - return index < len(values) && values[index] == val -} - -// if a object has Share or History (__Share, __History for custom) suffix, it likely has a parent object, which should be queried when using pk chunking -func parentObject(objectName string) string { - var parent string - regex := regexp.MustCompile("(__Share|Share|__History|History)$") - indexes := regex.FindStringIndex(objectName) - - if indexes != nil { - start, end := indexes[0], indexes[1] - suffix := objectName[start:end] - isCustomObject := suffix[0:2] == "__" - - parent = objectName[:start] - if isCustomObject { - parent += "__c" - } - } - - return parent -} diff --git a/runtime/drivers/salesforce/salesforce.go b/runtime/drivers/salesforce/salesforce.go index c70c74ca4e97..1f9413a9f18d 100644 --- a/runtime/drivers/salesforce/salesforce.go +++ b/runtime/drivers/salesforce/salesforce.go @@ -71,15 +71,6 @@ var spec = drivers.Spec{ Placeholder: "SELECT Id, CreatedDate, Name FROM Opportunity", Hint: "Write a SOQL query to retrieve data from your Salesforce object. 
For example: SELECT Id, Name FROM Opportunity.", }, - { - Key: "sobject", - Type: drivers.StringPropertyType, - Required: true, - DisplayName: "SObject", - Description: "SObject to query in Salesforce.", - Placeholder: "Opportunity", - Hint: "Enter the name of the Salesforce object you want to query (e.g., Opportunity, Lead, Account).", - }, { Key: "queryAll", Type: drivers.BooleanPropertyType, @@ -185,56 +176,18 @@ type connection struct { // Ping implements drivers.Handle. func (c *connection) Ping(ctx context.Context) error { - var username, password, endpoint, key, clientID, clientSecret string + authOptions := c.authOptions() - if e, ok := c.config["endpoint"].(string); ok && e != "" { - endpoint = e - } else { - // backwards compatibility: return early because this can be defined in sourceProp + // Backwards compatibility: endpoint and credentials may be defined in + // per-model source properties rather than the connector config, so a + // connector with no creds at this layer is not necessarily broken. 
+ if authOptions.Endpoint == "" || selectAuthMode(authOptions) == authModeUnknown { return nil } - if c, ok := c.config["client_id"].(string); ok && c != "" { - clientID = c - } else { - clientID = defaultClientID - } - - if u, ok := c.config["username"].(string); ok { - username = u - } - - if p, ok := c.config["password"].(string); ok { - password = p - } - - if k, ok := c.config["key"].(string); ok { - key = k - } - - if s, ok := c.config["client_secret"].(string); ok { - clientSecret = s - } - - authOptions := authenticationOptions{ - Username: username, - Password: password, - JWT: key, - Endpoint: endpoint, - ConnectedApp: clientID, - ClientSecret: clientSecret, - } - - if selectAuthMode(authOptions) == authModeUnknown { - // backwards compatibility: credentials may be defined in the source properties instead - return nil - } - - _, err := authenticate(authOptions) - if err != nil { + if _, err := authenticate(authOptions); err != nil { return fmt.Errorf("authentication failed: %w", err) } - return nil } @@ -293,11 +246,6 @@ func (c *connection) AsOLAP(instanceID string) (drivers.OLAPStore, bool) { return nil, false } -// AsInformationSchema implements drivers.Connection. -func (c *connection) AsInformationSchema() (drivers.InformationSchema, bool) { - return nil, false -} - // AsObjectStore implements drivers.Connection. func (c *connection) AsObjectStore() (drivers.ObjectStore, bool) { return nil, false diff --git a/runtime/drivers/salesforce/select_star.go b/runtime/drivers/salesforce/select_star.go new file mode 100644 index 000000000000..df6fee00f018 --- /dev/null +++ b/runtime/drivers/salesforce/select_star.go @@ -0,0 +1,73 @@ +package salesforce + +import ( + "encoding/json" + "fmt" + "regexp" + "strings" + + force "github.com/ForceCLI/force/lib" +) + +// SOQL does not support `SELECT *`. 
The connector explorer's "Table" mode +// generates exactly that shape, so the driver expands it server-side: when +// the query is `SELECT * FROM <SObject>` (optionally followed by a WHERE / +// ORDER BY / LIMIT etc. clause), the `*` is replaced with the queryable +// field list from DescribeSObject. +var selectStarRegex = regexp.MustCompile(`(?is)^\s*SELECT\s+\*\s+FROM\s+([A-Za-z_][A-Za-z0-9_]*)(\b.*)?$`) + +// expandSelectStar returns the original query unchanged unless it matches the +// `SELECT * FROM <SObject>` shape, in which case it rewrites the `*` into an +// explicit field list discovered via Salesforce's describe endpoint. +func expandSelectStar(session *force.Force, query string) (string, error) { + matches := selectStarRegex.FindStringSubmatch(query) + if len(matches) == 0 { + return query, nil + } + sobject := matches[1] + rest := matches[2] + + body, err := session.DescribeSObject(sobject) + if err != nil { + return "", fmt.Errorf("describing SObject %q to expand SELECT *: %w", sobject, err) + } + fields, err := queryableFieldNames(body) + if err != nil { + return "", err + } + if len(fields) == 0 { + return "", fmt.Errorf("SObject %q has no queryable fields", sobject) + } + return fmt.Sprintf("SELECT %s FROM %s%s", strings.Join(fields, ", "), sobject, rest), nil +} + +// queryableFieldNames extracts field names from a Salesforce describe response, +// skipping field types that are not allowed in a SOQL SELECT list (the +// compound `address` and `location` types — their sub-components like +// BillingStreet / BillingCity are returned as separate fields and are queried +// individually).
+func queryableFieldNames(describeBody string) ([]string, error) { + var desc struct { + Fields []struct { + Name string `json:"name"` + Type string `json:"type"` + } `json:"fields"` + } + if err := json.Unmarshal([]byte(describeBody), &desc); err != nil { + return nil, fmt.Errorf("parsing describe response: %w", err) + } + out := make([]string, 0, len(desc.Fields)) + for _, f := range desc.Fields { + if f.Name == "" { + continue + } + switch f.Type { + case "address", "location": + // Compound types are not selectable; their components appear + // as their own field entries. + continue + } + out = append(out, f.Name) + } + return out, nil +} diff --git a/runtime/drivers/salesforce/select_star_test.go b/runtime/drivers/salesforce/select_star_test.go new file mode 100644 index 000000000000..90d92a62baf8 --- /dev/null +++ b/runtime/drivers/salesforce/select_star_test.go @@ -0,0 +1,60 @@ +package salesforce + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestQueryableFieldNames(t *testing.T) { + body := `{ + "name": "Account", + "fields": [ + {"name": "Id", "type": "id"}, + {"name": "Name", "type": "string"}, + {"name": "BillingAddress", "type": "address"}, + {"name": "BillingStreet", "type": "string"}, + {"name": "BillingCity", "type": "string"}, + {"name": "Location", "type": "location"}, + {"name": "", "type": "string"} + ] + }` + got, err := queryableFieldNames(body) + require.NoError(t, err) + // Compound types (address, location) and empty names are filtered; + // their atomic sub-components remain because Salesforce describes them + // as separate fields. 
+ require.Equal(t, []string{"Id", "Name", "BillingStreet", "BillingCity"}, got) +} + +func TestSelectStarRegex(t *testing.T) { + cases := []struct { + query string + wantObj string + match bool + }{ + {"SELECT * FROM Account", "Account", true}, + {"select * from account", "account", true}, + {" SELECT * FROM Opportunity ", "Opportunity", true}, + {"SELECT * FROM My_Custom__c WHERE Id != null", "My_Custom__c", true}, + {"SELECT * FROM Account\nORDER BY Name\nLIMIT 10", "Account", true}, + + // Real explicit field lists must not be rewritten. + {"SELECT Id, Name FROM Account", "", false}, + {"SELECT *, Id FROM Account", "", false}, + // FROM must be a bare identifier; subqueries / multiple objects don't + // fit the simple shape we expand. + {"SELECT * FROM (SELECT Id FROM Account)", "", false}, + } + for _, c := range cases { + t.Run(c.query, func(t *testing.T) { + m := selectStarRegex.FindStringSubmatch(c.query) + if !c.match { + require.Len(t, m, 0, "expected no match") + return + } + require.Greater(t, len(m), 1, "expected a match with capture group") + require.Equal(t, c.wantObj, m[1]) + }) + } +} diff --git a/runtime/drivers/salesforce/warehouse.go b/runtime/drivers/salesforce/warehouse.go index 7f6fb773abb1..be02cb0cb0ad 100644 --- a/runtime/drivers/salesforce/warehouse.go +++ b/runtime/drivers/salesforce/warehouse.go @@ -3,8 +3,6 @@ package salesforce import ( "context" "fmt" - "io" - "os" "github.com/mitchellh/mapstructure" "github.com/rilldata/rill/runtime/drivers" @@ -33,66 +31,25 @@ func (c *connection) QueryAsFiles(ctx context.Context, props map[string]any) (ou return nil, err } - var username, password, endpoint, key, clientID, clientSecret string + authOptions := c.authOptions() + // Per-model source properties override the connector-level config so a + // single connector definition can be reused across models with different + // credentials when needed. 
+ srcProps.applyOverrides(&authOptions) - if srcProps.Endpoint != "" { // get from src properties - endpoint = srcProps.Endpoint - } else if e, ok := c.config["endpoint"].(string); ok && e != "" { // get from driver configs - endpoint = e - } else { + if authOptions.Endpoint == "" { return nil, fmt.Errorf("the property 'endpoint' is required for Salesforce. Provide 'endpoint' in the YAML properties or pass '--env connector.salesforce.endpoint=...' to 'rill start'") } - if srcProps.ClientID != "" { // get from src properties - clientID = srcProps.ClientID - } else if c, ok := c.config["client_id"].(string); ok && c != "" { // get from driver configs - clientID = c - } else { - clientID = defaultClientID - } - - if srcProps.Username != "" { // get from src properties - username = srcProps.Username - } else if u, ok := c.config["username"].(string); ok && u != "" { // get from driver configs - username = u - } - - if srcProps.Password != "" { // get from src properties - password = srcProps.Password - } else if p, ok := c.config["password"].(string); ok && p != "" { // get from driver configs - password = p - } - - if srcProps.Key != "" { // get from src properties - key = srcProps.Key - } else if k, ok := c.config["key"].(string); ok && k != "" { // get from driver configs - key = k - } - - if srcProps.ClientSecret != "" { // get from src properties - clientSecret = srcProps.ClientSecret - } else if s, ok := c.config["client_secret"].(string); ok && s != "" { // get from driver configs - clientSecret = s - } - - authOptions := authenticationOptions{ - Username: username, - Password: password, - JWT: key, - Endpoint: endpoint, - ConnectedApp: clientID, - ClientSecret: clientSecret, - } - switch selectAuthMode(authOptions) { case authModeUnknown: return nil, fmt.Errorf("Salesforce credentials are required: provide a JWT 'key', a 'username' and 'password' (with 'client_secret'), or a 'client_secret' for the client credentials flow") case authModePassword: - if 
clientSecret == "" { + if authOptions.ClientSecret == "" { return nil, fmt.Errorf("the property 'client_secret' is required for username/password authentication. Provide 'client_secret' in the YAML properties or pass '--env connector.salesforce.client_secret=...' to 'rill start'") } case authModeJWT: - if username == "" { + if authOptions.Username == "" { return nil, fmt.Errorf("the property 'username' is required for JWT authentication. Provide 'username' in the YAML properties or pass '--env connector.salesforce.username=...' to 'rill start'") } } @@ -102,78 +59,16 @@ func (c *connection) QueryAsFiles(ctx context.Context, props map[string]any) (ou return nil, fmt.Errorf("authentication failed: %w", err) } - job := makeBulkJob(session, srcProps.SObject, srcProps.SOQL, srcProps.QueryAll, c.logger) - - err = c.startJob(ctx, job) - if err != nil { - return nil, err - } - - err = job.getBatches(ctx) - if err != nil { + job := makeBulk2QueryJob(session, c.logger) + if err := job.startJob(ctx, srcProps.SOQL, srcProps.QueryAll); err != nil { return nil, err } - return job, nil } -var _ drivers.FileIterator = &bulkJob{} - -// Close implements drivers.RowIterator. -func (j *bulkJob) Close() error { - for _, p := range j.tempFilePaths { - err := os.Remove(p) - if err != nil { - return fmt.Errorf("failed to delete temp file: %w", err) - } - } - j.tempFilePaths = nil - return nil -} - -// Format implements drivers.RowIterator. -func (j *bulkJob) Format() string { - return "csv" -} - -// SetKeepFilesUntilClose implements drivers.RowIterator. -func (j *bulkJob) SetKeepFilesUntilClose() { - j.keepFilesUntilClose = true -} - -// Next implements drivers.RowIterator. 
-func (j *bulkJob) Next(ctx context.Context) ([]string, error) { - if j.jobID == "" { - return nil, fmt.Errorf("invalid job: no job id") - } - if j.job.NumberRecordsProcessed == 0 { - return nil, io.EOF - } - if len(j.tempFilePaths) != 0 && !j.keepFilesUntilClose { - for _, p := range j.tempFilePaths { - err := os.Remove(p) - if err != nil { - return nil, fmt.Errorf("failed to delete temp file: %w", err) - } - } - j.tempFilePaths = nil - } - if j.nextResult == len(j.results) { - return nil, io.EOF - } - tempFile, err := j.retrieveJobResult(ctx, j.nextResult) - if err != nil { - return nil, fmt.Errorf("failed to retrieve batch: %w", err) - } - j.tempFilePaths = append(j.tempFilePaths, tempFile) - j.nextResult++ - return []string{tempFile}, nil -} - type sourceProperties struct { SOQL string `mapstructure:"soql"` SQL string `mapstructure:"sql"` - SObject string `mapstructure:"sobject"` QueryAll bool `mapstructure:"queryAll"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` @@ -183,22 +78,44 @@ type sourceProperties struct { ClientSecret string `mapstructure:"client_secret"` } +// applyOverrides copies any non-empty source-level credential fields onto the +// supplied connector-level options. Used so a model can override the connector +// for one-off credentials without changing the connector definition. 
+func (s *sourceProperties) applyOverrides(opts *authenticationOptions) { + if s.Endpoint != "" { + opts.Endpoint = s.Endpoint + } + if s.ClientID != "" { + opts.ConnectedApp = s.ClientID + } + if s.Username != "" { + opts.Username = s.Username + } + if s.Password != "" { + opts.Password = s.Password + } + if s.Key != "" { + opts.JWT = s.Key + } + if s.ClientSecret != "" { + opts.ClientSecret = s.ClientSecret + } +} + func parseSourceProperties(props map[string]any) (*sourceProperties, error) { conf := &sourceProperties{} - err := mapstructure.Decode(props, conf) - if err != nil { + if err := mapstructure.Decode(props, conf); err != nil { return nil, err } // Accept `sql:` as an alias for `soql:` so Salesforce fits the same model // shape as other warehouse drivers (which read `sql:` from the source). + // Bulk API 2.0 derives the SObject from the query itself, so a separate + // `sobject:` property is no longer required. if conf.SOQL == "" { conf.SOQL = conf.SQL } if conf.SOQL == "" { return nil, fmt.Errorf("property 'soql' (or 'sql') is mandatory for connector \"salesforce\"") } - if conf.SObject == "" { - return nil, fmt.Errorf("property 'sobject' is mandatory for connector \"salesforce\"") - } - return conf, err + return conf, nil } diff --git a/web-common/src/features/templates/schemas/salesforce.ts b/web-common/src/features/templates/schemas/salesforce.ts index 7a90b8ee4859..5eaaa6440a46 100644 --- a/web-common/src/features/templates/schemas/salesforce.ts +++ b/web-common/src/features/templates/schemas/salesforce.ts @@ -100,14 +100,6 @@ export const salesforceSchema: MultiStepFormSchema = { "x-placeholder": "SELECT Id, Name FROM Opportunity", "x-step": "explorer", }, - sobject: { - type: "string", - title: "SObject", - description: - "Salesforce object the SOQL query reads from (e.g. 
Opportunity, Account, MyObject__c)", - "x-placeholder": "Opportunity", - "x-step": "explorer", - }, name: { type: "string", title: "Model name", @@ -117,7 +109,7 @@ export const salesforceSchema: MultiStepFormSchema = { "x-step": "explorer", }, }, - required: ["soql", "sobject", "name"], + required: ["soql", "name"], allOf: [ { if: { properties: { auth_method: { const: "username_password" } } },