Skip to content

Commit 5909784

Browse files
Add PostgreSQL plugin (#30)
* Add PostgreSQL plugin with query execution capabilities and documentation * simplify Async queries execution * Refactor PostgreSQL handler for readability * readme updates to make it consistent with other plugin docs * upgrade service --------- Co-authored-by: ivan.hladush <ivan.hladush@pattern.com>
1 parent 80ad6bf commit 5909784

3 files changed

Lines changed: 289 additions & 0 deletions

File tree

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package postgres
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"sync"
8+
9+
"github.com/patterninc/heimdall/internal/pkg/database"
10+
pkgcontext "github.com/patterninc/heimdall/pkg/context"
11+
"github.com/patterninc/heimdall/pkg/object/cluster"
12+
"github.com/patterninc/heimdall/pkg/object/job"
13+
"github.com/patterninc/heimdall/pkg/plugin"
14+
"github.com/patterninc/heimdall/pkg/result"
15+
"github.com/patterninc/heimdall/pkg/result/column"
16+
)
17+
18+
// postgresJobContext represents the context for a PostgreSQL job
19+
type postgresJobContext struct {
20+
Query string `yaml:"query,omitempty" json:"query,omitempty"`
21+
ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"`
22+
}
23+
24+
type postgresClusterContext struct {
25+
ConnectionString string `yaml:"connection_string,omitempty" json:"connection_string,omitempty"`
26+
}
27+
28+
type postgresCommandContext struct {
29+
mu sync.Mutex
30+
}
31+
32+
// New creates a new PostgreSQL plugin handler.
33+
func New(_ *pkgcontext.Context) (plugin.Handler, error) {
34+
p := &postgresCommandContext{}
35+
return p, nil
36+
}
37+
38+
// Handler for the PostgreSQL query execution.
39+
func (p *postgresCommandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error {
40+
jobContext, err := validateJobContext(j)
41+
if err != nil {
42+
return err
43+
}
44+
45+
clusterContext, err := validateClusterContext(c)
46+
if err != nil {
47+
return err
48+
}
49+
50+
db := &database.Database{ConnectionString: clusterContext.ConnectionString}
51+
52+
if jobContext.ReturnResult {
53+
return executeSyncQuery(db, jobContext.Query, j)
54+
}
55+
return p.executeAsyncQueries(db, jobContext.Query, j)
56+
}
57+
58+
func validateJobContext(j *job.Job) (*postgresJobContext, error) {
59+
jobContext := &postgresJobContext{}
60+
if j.Context != nil {
61+
if err := j.Context.Unmarshal(jobContext); err != nil {
62+
return nil, fmt.Errorf("failed to unmarshal job context: %w", err)
63+
}
64+
}
65+
if jobContext.Query == "" {
66+
return nil, fmt.Errorf("query is required in job context")
67+
}
68+
return jobContext, nil
69+
}
70+
71+
func validateClusterContext(c *cluster.Cluster) (*postgresClusterContext, error) {
72+
clusterContext := &postgresClusterContext{}
73+
if c.Context != nil {
74+
if err := c.Context.Unmarshal(clusterContext); err != nil {
75+
return nil, fmt.Errorf("failed to unmarshal cluster context: %w", err)
76+
}
77+
}
78+
if clusterContext.ConnectionString == "" {
79+
return nil, fmt.Errorf("connection_string is required in cluster context")
80+
}
81+
return clusterContext, nil
82+
}
83+
84+
func executeSyncQuery(db *database.Database, query string, j *job.Job) error {
85+
// Allow a single query, even if it ends with a semicolon
86+
queries := splitAndTrimQueries(query)
87+
if len(queries) != 1 {
88+
return fmt.Errorf("multiple queries are not allowed when return_result is true")
89+
}
90+
91+
sess, err := db.NewSession(false)
92+
if err != nil {
93+
return fmt.Errorf("failed to open PostgreSQL connection: %w", err)
94+
}
95+
defer sess.Close()
96+
97+
rows, err := sess.Query(queries[0])
98+
if err != nil {
99+
return fmt.Errorf("PostgreSQL query execution failed: %w", err)
100+
}
101+
defer rows.Close()
102+
103+
rowsResult, err := result.FromRows(rows)
104+
if err != nil {
105+
return fmt.Errorf("failed to process PostgreSQL query results: %w", err)
106+
}
107+
108+
j.Result = rowsResult
109+
return nil
110+
}
111+
112+
func (p *postgresCommandContext) executeAsyncQueries(db *database.Database, query string, j *job.Job) error {
113+
sess, err := db.NewSession(false)
114+
if err != nil {
115+
j.Result = &result.Result{
116+
Columns: []*column.Column{{
117+
Name: "error",
118+
Type: column.Type("string"),
119+
}},
120+
Data: [][]any{{fmt.Sprintf("Async PostgreSQL connection error: %v", err)}},
121+
}
122+
return fmt.Errorf("Async PostgreSQL connection error: %v", err)
123+
}
124+
defer sess.Close()
125+
126+
_, err = sess.Exec(query)
127+
if err != nil {
128+
j.Result = &result.Result{
129+
Columns: []*column.Column{{
130+
Name: "error",
131+
Type: column.Type("string"),
132+
}},
133+
Data: [][]any{{fmt.Sprintf("Async PostgreSQL query execution error: %v", err)}},
134+
}
135+
return fmt.Errorf("Async PostgreSQL query execution error: %v", err)
136+
}
137+
138+
j.Result = &result.Result{
139+
Columns: []*column.Column{{
140+
Name: "message",
141+
Type: column.Type("string"),
142+
}},
143+
Data: [][]any{{"All queries executed successfully"}},
144+
}
145+
return nil
146+
}
147+
148+
func splitAndTrimQueries(query string) []string {
149+
queries := []string{}
150+
for _, q := range strings.Split(query, ";") {
151+
q = strings.TrimSpace(q)
152+
if q != "" {
153+
queries = append(queries, q)
154+
}
155+
}
156+
return queries
157+
}
158+
159+
func (p *postgresCommandContext)Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error{
160+
// implement me
161+
return nil
162+
}

plugins/postgres/README.md

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# ⚡ PostgreSQL Plugin
2+
3+
The **PostgreSQL Plugin** enables Heimdall to run SQL queries on configured PostgreSQL databases. It supports direct SQL, SQL files, batch execution, and both synchronous and asynchronous modes.
4+
5+
---
6+
7+
## 🧩 Plugin Overview
8+
9+
* **Plugin Name:** `postgres`
10+
* **Execution Modes:** Synchronous (return_result: true) and Asynchronous (return_result: false)
11+
* **Use Case:** Running SQL queries (single or batch) against PostgreSQL databases
12+
13+
---
14+
15+
## ⚙️ Defining a Postgres Command
16+
17+
A Postgres command can specify execution mode and other preferences. Example:
18+
19+
```yaml
20+
- name: postgres-0.0.1
21+
status: active
22+
plugin: postgres
23+
version: 0.0.1
24+
description: Execute queries against PostgreSQL databases
25+
tags:
26+
- type:postgres
27+
cluster_tags:
28+
- type:postgres
29+
- data:local
30+
```
31+
32+
---
33+
34+
## 🖥️ Cluster Configuration
35+
36+
Each Postgres cluster must define a `connection_string`:
37+
38+
```yaml
39+
- name: postgres
40+
status: active
41+
version: 0.0.1
42+
description: PostgreSQL Production Database
43+
context:
44+
connection_string: "postgresql://user:password@host:port/database"
45+
tags:
46+
- type:postgres
47+
- data:local
48+
```
49+
50+
---
51+
52+
## 🚀 Submitting a Postgres Job
53+
54+
A Postgres job provides the SQL query to be executed, and can specify execution mode:
55+
56+
```json
57+
{
58+
"name": "run-pg-query",
59+
"version": "0.0.1",
60+
"command_criteria": [
61+
"type:postgres"
62+
],
63+
"cluster_criteria": [
64+
"data:local"
65+
],
66+
"context": {
67+
"query": "select * from employees limit 10;",
68+
"return_result": true
69+
}
70+
}
71+
```
72+
73+
---
74+
75+
## 📦 Job Context & Runtime
76+
77+
The Postgres plugin handles:
78+
79+
* Executing single or multiple SQL statements (batch)
80+
* Supporting both direct query strings and SQL file execution
81+
* Synchronous mode (`return_result: true`): returns query results, only one query allowed
82+
* Asynchronous mode (`return_result: false`): executes all queries, returns success or error
83+
84+
### Job Context Example
85+
86+
```yaml
87+
query: SELECT * FROM my_table # Required - SQL query to execute or path to .sql file
88+
return_result: true # Optional - Whether to return query results (default: false)
89+
```
90+
91+
### Cluster Context Example
92+
93+
```yaml
94+
connection_string: postgresql://user:password@host:port/database # Required
95+
```
96+
97+
---
98+
99+
## 📊 Returning Job Results
100+
101+
If enabled in the environment, Heimdall exposes query results via:
102+
103+
```
104+
GET /api/v1/job/<job_id>/result
105+
```
106+
107+
---
108+
109+
## 🧠 Best Practices
110+
111+
* Use synchronous mode for SELECT queries where results are needed
112+
* Use asynchronous mode for DDL/DML or batch operations
113+
* Always secure your connection strings and database credentials
114+
* Use `type:postgres` tags to isolate command and cluster matching
115+
* Use SQL files for large or complex batch operations

plugins/postgres/postgres.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package main
2+
3+
import (
4+
"github.com/patterninc/heimdall/internal/pkg/object/command/postgres"
5+
"github.com/patterninc/heimdall/pkg/context"
6+
"github.com/patterninc/heimdall/pkg/plugin"
7+
)
8+
9+
// New creates a new instance of the postgres plugin.
10+
func New(c *context.Context) (plugin.Handler, error) {
11+
return postgres.New(c)
12+
}

0 commit comments

Comments
 (0)