Skip to content

Commit 2bc4949

Browse files
select-insert-logic (#620)
* select-insert-logic Summary: - Support for `INSERT.. SELECT.. [LEFT OUTER JOIN]`. - Added robot test `Insert Select Negative Returning Star`. - Added robot test `Insert Select Positive Returning Star`. - Added robot test `Insert Left Outer Join Positive Returning Star`. - Added robot test `Insert Left Outer Join Negative Returning Star`. * - Fixed robot.
1 parent 28745fa commit 2bc4949

23 files changed

Lines changed: 485 additions & 35 deletions

File tree

.vscode/launch.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@
193193
"select split_part(rings.name, '/', -1) from google.cloudkms.key_rings rings inner join google.cloudkms.crypto_keys keys on keys.keyRingsId = split_part(rings.name, '/', -1) and keys.projectsId = 'testing-project' where rings.projectsId = 'testing-project' and rings.locationsId = 'global' and keys.locationsId = 'global';",
194194
"select * from local_openssl.keys.x509 where cert_file = '${workspaceFolder}/test/assets/input/manual_cert.pem';",
195195
"select t1.id as t1_id, t2.id as t2_id from (select 'my-id' as id) t1 left outer join (select a.vpcId id from aws.ec2_native.vpcs a where region = 'ap-southeast-2') t2 on t1.id = t2.id;",
196+
"insert into aws.ec2.vpcs(CidrBlock, region) select '10.0.0.0/16', 'ap-southeast-2' where 0 = 1 returning *;",
197+
"insert into aws.ec2.vpcs(CidrBlock, region) select '10.0.0.0/16', 'ap-southeast-2' where 1 = 1 returning *;",
198+
"select lhs.proj, lhs.bucket from (select 'testing-project' as proj, 'silly-bucket' as bucket) lhs LEFT OUTER join (select name from google.storage.buckets where project = 'testing-project') rhs on lhs.bucket = rhs.name where rhs.name;",
199+
"insert into google.storage.buckets( project, data__name) select lhs.proj, lhs.bucket from (select 'testing-project' as proj, 'silly-bucket' as bucket) lhs LEFT OUTER join (select name from google.storage.buckets where project = 'testing-project') rhs on lhs.bucket = rhs.name where rhs.name is null returning *;",
196200
],
197201
"default": "show providers;"
198202
},

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@ require (
1818
github.com/spf13/cobra v1.4.0
1919
github.com/spf13/pflag v1.0.5
2020
github.com/spf13/viper v1.10.1
21-
github.com/stackql/any-sdk v0.5.1-alpha01
21+
github.com/stackql/any-sdk v0.5.1-alpha03
2222
github.com/stackql/go-suffix-map v0.0.1-alpha01
2323
github.com/stackql/psql-wire v0.1.2-alpha01
2424
github.com/stackql/stackql-parser v0.0.16-alpha01
2525
github.com/stretchr/testify v1.10.0
2626
golang.org/x/mod v0.25.0
2727
golang.org/x/sync v0.15.0
28-
golang.org/x/tools v0.34.0
2928
gonum.org/v1/gonum v0.15.1
3029
gopkg.in/yaml.v2 v2.4.0
3130
gotest.tools v2.2.0+incompatible
@@ -131,6 +130,7 @@ require (
131130
golang.org/x/sys v0.33.0 // indirect
132131
golang.org/x/term v0.32.0 // indirect
133132
golang.org/x/text v0.26.0 // indirect
133+
golang.org/x/tools v0.34.0 // indirect
134134
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
135135
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
136136
google.golang.org/grpc v1.67.1 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,8 +467,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
467467
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
468468
github.com/spf13/viper v1.10.1 h1:nuJZuYpG7gTj/XqiUwg8bA0cp1+M2mC3J4g5luUYBKk=
469469
github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8qy1rU=
470-
github.com/stackql/any-sdk v0.5.1-alpha01 h1:HHzdjSsR981GaqoZ1CAmRuVAfZZeMEunsLhcrVtCw/M=
471-
github.com/stackql/any-sdk v0.5.1-alpha01/go.mod h1:ZRTGrcDKFLaC+5yWo4OqXVs1HTNxgYM9nQsQDlq0Fe0=
470+
github.com/stackql/any-sdk v0.5.1-alpha03 h1:zYVr/ysJ26Tb1JCl/49MymMsHH+5dC2iGircAJhGbIg=
471+
github.com/stackql/any-sdk v0.5.1-alpha03/go.mod h1:ZRTGrcDKFLaC+5yWo4OqXVs1HTNxgYM9nQsQDlq0Fe0=
472472
github.com/stackql/go-suffix-map v0.0.1-alpha01 h1:TDUDS8bySu41Oo9p0eniUeCm43mnRM6zFEd6j6VUaz8=
473473
github.com/stackql/go-suffix-map v0.0.1-alpha01/go.mod h1:QAi+SKukOyf4dBtWy8UMy+hsXXV+yyEE4vmBkji2V7g=
474474
github.com/stackql/psql-wire v0.1.2-alpha01 h1:RMBRURGspmSNqm2/sgoEc+D6Sri2y/3drjl4nKlOOi4=

internal/stackql/astanalysis/earlyanalysis/ast_expand.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,10 @@ func (v *indirectExpandAstVisitor) Visit(node sqlparser.SQLNode) error {
334334
if err != nil {
335335
return err
336336
}
337+
err = node.Rows.Accept(v)
338+
if err != nil {
339+
return err
340+
}
337341

338342
case *sqlparser.Update:
339343
v.mutateCount++

internal/stackql/astvisit/placeholder_param_extract.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ func (v *standardParserPlaceholderParamAstVisitor) Visit(node sqlparser.SQLNode)
141141
return node.Table.Accept(v)
142142

143143
case *sqlparser.Insert:
144+
err := v.Visit(node.Rows)
145+
if err != nil {
146+
return err
147+
}
144148
if len(node.Columns) > 0 {
145149
err := node.Columns.Accept(v)
146150
if err != nil {

internal/stackql/astvisit/table_alias_pairing.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ func (v *standardParserTableAliasPairingAstVisitor) Visit(node sqlparser.SQLNode
175175
return node.Table.Accept(v)
176176

177177
case *sqlparser.Insert:
178+
return v.Visit(node.Rows)
178179

179180
case *sqlparser.Update:
180181

internal/stackql/astvisit/table_extract.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ func (v *standardParserTableExtractAstVisitor) Visit(node sqlparser.SQLNode) err
138138
return node.Table.Accept(v)
139139

140140
case *sqlparser.Insert:
141+
return v.Visit(node.Rows)
141142

142143
case *sqlparser.Update:
143144

internal/stackql/dependencyplanner/dependencyplanner.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/stackql/stackql/internal/stackql/docparser"
1818
"github.com/stackql/stackql/internal/stackql/drm"
1919
"github.com/stackql/stackql/internal/stackql/handler"
20+
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/builder_input"
2021
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
2122
"github.com/stackql/stackql/internal/stackql/parserutil"
2223
"github.com/stackql/stackql/internal/stackql/primitivebuilder"
@@ -514,17 +515,25 @@ func (dp *standardDependencyPlanner) orchestrate(
514515
nil,
515516
insPsc,
516517
nil,
518+
// this is the connector
517519
outStream,
518520
)
519521
} else {
522+
bldrInput := builder_input.NewBuilderInput(
523+
dp.primitiveComposer.GetGraphHolder(),
524+
dp.handlerCtx,
525+
annotationCtx.GetTableMeta(),
526+
)
527+
bldrInput.SetIsAwait(false) // returning hardcoded to false for now
520528
builder = primitivebuilder.NewSingleSelectAcquire(
521529
dp.primitiveComposer.GetGraphHolder(),
522530
dp.handlerCtx,
523531
rc,
524532
insPsc,
525533
nil,
534+
// this is the connector
526535
outStream,
527-
false, // returning hardcoded to false for now
536+
bldrInput,
528537
)
529538
}
530539
dp.execSlice = append(dp.execSlice, builder)
@@ -557,6 +566,7 @@ func (dp *standardDependencyPlanner) processAcquire(
557566
nil, annotationCtx.GetHIDs(),
558567
inputTableName,
559568
annotationCtx.GetTableMeta().GetAlias())
569+
// TODO: repeat this for mutations
560570
anTab.SetSQLDataSource(sqlDataSource)
561571
return anTab, dp.tcc, nil
562572
}

internal/stackql/execution/mono_valent_execution.go

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/stackql/stackql/internal/stackql/acid/binlog"
2424
"github.com/stackql/stackql/internal/stackql/drm"
2525
"github.com/stackql/stackql/internal/stackql/handler"
26+
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/builder_input"
2627
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
2728
"github.com/stackql/stackql/internal/stackql/primitive"
2829
"github.com/stackql/stackql/internal/stackql/primitivegraph"
@@ -84,6 +85,7 @@ type monoValentExecution struct {
8485
isMutation bool
8586
isAwait bool
8687
defaultHTTPClient *http.Client // for testing purposes only
88+
bldrInput builder_input.BuilderInput
8789
}
8890

8991
func NewMonoValentExecutorFactory(
@@ -97,6 +99,7 @@ func NewMonoValentExecutorFactory(
9799
isSkipResponse bool,
98100
isMutation bool,
99101
isAwait bool,
102+
bldrInput builder_input.BuilderInput,
100103
) MonoValentExecutorFactory {
101104
var tcc internaldto.TxnControlCounters
102105
if insertCtx != nil {
@@ -127,6 +130,7 @@ func NewMonoValentExecutorFactory(
127130
isAwait: isAwait,
128131
defaultHTTPClient: defaultHTTPClient,
129132
invoker: anysdkhttp.New(),
133+
bldrInput: bldrInput,
130134
}
131135
}
132136

@@ -1257,6 +1261,59 @@ func (sp *standardProcessor) Process() processorResponse {
12571261
return newHTTPProcessorResponse(nil, reversalStream, false, nil)
12581262
}
12591263

1264+
var (
1265+
_ formulation.BaseArmouryGenerator = (*reconsititutedarmouryGenerator)(nil)
1266+
)
1267+
1268+
type rawTransformer struct {
1269+
node sqlparser.SQLNode
1270+
insertValOnlyRows map[int]map[int]interface{}
1271+
}
1272+
1273+
func (rt *rawTransformer) Transform() (map[int]map[string]interface{}, error) {
1274+
return util.ExtractSQLNodeParams(rt.node, rt.insertValOnlyRows)
1275+
}
1276+
1277+
func newRawTransformer(
1278+
node sqlparser.SQLNode,
1279+
insertValOnlyRows map[int]map[int]interface{},
1280+
) *rawTransformer {
1281+
return &rawTransformer{
1282+
node: node,
1283+
insertValOnlyRows: insertValOnlyRows,
1284+
}
1285+
}
1286+
1287+
type reconsititutedarmouryGenerator struct {
1288+
prior formulation.BaseArmouryGenerator
1289+
updatedParams map[int]map[string]any
1290+
}
1291+
1292+
func newUpdatedArmouryGenerator(
1293+
prior formulation.BaseArmouryGenerator,
1294+
updatedParams map[int]map[string]any,
1295+
) formulation.BaseArmouryGenerator {
1296+
return &reconsititutedarmouryGenerator{
1297+
prior: prior,
1298+
updatedParams: updatedParams,
1299+
}
1300+
}
1301+
1302+
func (r *reconsititutedarmouryGenerator) GetHTTPArmoury() (formulation.HTTPArmoury, error) {
1303+
if len(r.updatedParams) == 0 {
1304+
return r.prior.GetHTTPArmoury()
1305+
}
1306+
armoury, err := r.prior.GetHTTPArmoury()
1307+
if err != nil {
1308+
return nil, err
1309+
}
1310+
updatedArmoury, err := armoury.MergeLateBindingMaps(r.updatedParams)
1311+
if err != nil {
1312+
return nil, err
1313+
}
1314+
return updatedArmoury, nil
1315+
}
1316+
12601317
//nolint:revive,nestif,funlen,gocognit // TODO: investigate
12611318
func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) internaldto.ExecutorOutput, error) {
12621319
prov, err := mv.tableMeta.GetProvider()
@@ -1280,6 +1337,62 @@ func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) i
12801337
return nil, authCtxErr
12811338
}
12821339
ex := func(pc primitive.IPrimitiveCtx) internaldto.ExecutorOutput {
1340+
requiredDepedencyKey, requiredKeyExists := mv.bldrInput.GetRequiredDataRequestKey()
1341+
// lateBindingData := map[int]map[string]any{}
1342+
1343+
rawData := map[int]map[int]any{}
1344+
if requiredKeyExists {
1345+
dataRequest, isRequestExist := mv.graphHolder.GetExecutorOutput(requiredDepedencyKey)
1346+
if !isRequestExist {
1347+
return internaldto.NewErroneousExecutorOutput(fmt.Errorf("required data request not found"))
1348+
}
1349+
requiredData := dataRequest.GetRawResult()
1350+
if requiredData == nil {
1351+
return internaldto.NewErroneousExecutorOutput(fmt.Errorf("nil required data"))
1352+
}
1353+
rowCount := 0
1354+
for res, err := requiredData.Read(); res != nil; {
1355+
rows, rowErr := res.GetMap()
1356+
if rowErr != nil {
1357+
return internaldto.NewErroneousExecutorOutput(fmt.Errorf("error reading required data: %w", rowErr))
1358+
}
1359+
for k, r := range rows {
1360+
rawData[k] = r
1361+
rowCount++
1362+
}
1363+
logging.GetLogger().Infoln(fmt.Sprintf("read %d rows for required data with key '%s'", len(rows), requiredDepedencyKey))
1364+
if err != nil {
1365+
if err == io.EOF {
1366+
break
1367+
}
1368+
return internaldto.NewErroneousExecutorOutput(fmt.Errorf("error reading required data: %w", err))
1369+
}
1370+
break // yes weird, the api should change
1371+
}
1372+
if rowCount == 0 {
1373+
return internaldto.NewNopEmptyExecutorOutput([]string{"operation skipped due to nil data"})
1374+
}
1375+
logging.GetLogger().Infoln(fmt.Sprintf("required data for key '%s' = %v", requiredDepedencyKey, rawData))
1376+
}
1377+
1378+
lateBindingData := map[int]map[string]any{}
1379+
var lateBindingErr error
1380+
if len(rawData) > 0 {
1381+
parserNode, _ := mv.bldrInput.GetParserNode()
1382+
transformer := newRawTransformer(
1383+
parserNode,
1384+
rawData,
1385+
)
1386+
lateBindingData, lateBindingErr = transformer.Transform()
1387+
if lateBindingErr != nil {
1388+
return internaldto.NewErroneousExecutorOutput(fmt.Errorf("error transforming required data: %w", lateBindingErr))
1389+
}
1390+
logging.GetLogger().Infoln(fmt.Sprintf("late binding data = %v", lateBindingData))
1391+
}
1392+
armouryGenerator := newUpdatedArmouryGenerator(
1393+
mv.tableMeta,
1394+
lateBindingData,
1395+
)
12831396
currentTcc := mv.insertPreparedStatementCtx.GetGCCtrlCtrs().Clone()
12841397
mv.graphHolder.AddTxnControlCounters(currentTcc)
12851398
mr := prov.InferMaxResultsElement(m)
@@ -1302,7 +1415,7 @@ func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) i
13021415
inlines[1:],
13031416
nil,
13041417
)
1305-
armoury, armouryErr := mv.tableMeta.GetHTTPArmoury()
1418+
armoury, armouryErr := armouryGenerator.GetHTTPArmoury()
13061419
if armouryErr != nil {
13071420
return internaldto.NewErroneousExecutorOutput(armouryErr)
13081421
}
@@ -1402,7 +1515,7 @@ func (mv *monoValentExecution) GetExecutor() (func(pc primitive.IPrimitiveCtx) i
14021515
case client.HTTP:
14031516
invRes, invErr := mv.invoker.Invoke(context.Background(), providerinvoker.Request{
14041517
Payload: formulation.NewPayload(
1405-
mv.tableMeta,
1518+
armouryGenerator,
14061519
provider,
14071520
m,
14081521
tableName,

internal/stackql/internal_data_transfer/builder_input/builder_input.go

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ type BuilderInput interface {
4141
SetIsAwait(isAwait bool)
4242
SetCommentDirectives(commentDirectives sqlparser.CommentDirectives)
4343
SetIsUndo(isUndo bool)
44+
SetRequiredDataRequestKey(key string)
45+
GetRequiredDataRequestKey() (string, bool)
4446
SetDependencyNode(dependencyNode primitivegraph.PrimitiveNode)
4547
SetParserNode(node sqlparser.SQLNode)
4648
SetParamMap(paramMap map[int]map[string]interface{})
@@ -83,6 +85,7 @@ type builderInput struct {
8385
txnCtrlCtrs internaldto.TxnControlCounters
8486
tableInsertionContainer tableinsertioncontainer.TableInsertionContainer
8587
insertCtx drm.PreparedStatementCtx
88+
requiredDataRequestKey string
8689
}
8790

8891
func NewBuilderInput(
@@ -99,6 +102,14 @@ func NewBuilderInput(
99102
}
100103
}
101104

105+
func (bi *builderInput) GetRequiredDataRequestKey() (string, bool) {
106+
return bi.requiredDataRequestKey, bi.requiredDataRequestKey != ""
107+
}
108+
109+
func (bi *builderInput) SetRequiredDataRequestKey(key string) {
110+
bi.requiredDataRequestKey = key
111+
}
112+
102113
func (bi *builderInput) SetInsertCtx(insertCtx drm.PreparedStatementCtx) {
103114
bi.insertCtx = insertCtx
104115
}
@@ -272,21 +283,22 @@ func (bi *builderInput) SetIsUndo(isUndo bool) {
272283

273284
func (bi *builderInput) Clone() BuilderInput {
274285
return &builderInput{
275-
graphHolder: bi.graphHolder,
276-
handlerCtx: bi.handlerCtx,
277-
paramMap: bi.paramMap,
278-
tbl: bi.tbl,
279-
node: bi.node,
280-
dependencyNode: bi.dependencyNode,
281-
commentDirectives: bi.commentDirectives,
282-
isAwait: bi.isAwait,
283-
verb: bi.verb,
284-
inputAlias: bi.inputAlias,
285-
isUndo: bi.isUndo,
286-
isTargetPhysical: bi.isTargetPhysical,
287-
annotatedAst: bi.annotatedAst,
288-
txnCtrlCtrs: bi.txnCtrlCtrs,
289-
isReturning: bi.isReturning,
290-
insertCtx: bi.insertCtx,
286+
graphHolder: bi.graphHolder,
287+
handlerCtx: bi.handlerCtx,
288+
paramMap: bi.paramMap,
289+
tbl: bi.tbl,
290+
node: bi.node,
291+
dependencyNode: bi.dependencyNode,
292+
commentDirectives: bi.commentDirectives,
293+
isAwait: bi.isAwait,
294+
verb: bi.verb,
295+
inputAlias: bi.inputAlias,
296+
isUndo: bi.isUndo,
297+
isTargetPhysical: bi.isTargetPhysical,
298+
annotatedAst: bi.annotatedAst,
299+
txnCtrlCtrs: bi.txnCtrlCtrs,
300+
isReturning: bi.isReturning,
301+
insertCtx: bi.insertCtx,
302+
requiredDataRequestKey: bi.requiredDataRequestKey,
291303
}
292304
}

0 commit comments

Comments
 (0)