-
Notifications
You must be signed in to change notification settings - Fork 165
Expand file tree
/
Copy pathutils.go
More file actions
343 lines (285 loc) · 9.81 KB
/
utils.go
File metadata and controls
343 lines (285 loc) · 9.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
package pipelines
import (
"context"
"errors"
"fmt"
"slices"
"strconv"
"time"
"github.com/databricks/cli/bundle"
configresources "github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/resources"
"github.com/databricks/cli/bundle/run"
"github.com/databricks/cli/libs/cmdio"
databricks "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/service/pipelines"
)
// promptResource asks the user to pick one resource from the bundle and returns
// the key of the chosen resource.
//
// Only resources accepted by the given filters are offered. Each choice is
// labelled "<singular resource title>: <resource name>", and the prompt text is
// always "Select a pipeline".
//
// Adapted from cmd/bundle/run.go.
func promptResource(ctx context.Context, b *bundle.Bundle, filters ...resources.Filter) (string, error) {
	// cmdio.Select takes a map of display label -> value to hand back.
	choices := make(map[string]string)
	for resourceKey, reference := range resources.Completions(b, filters...) {
		label := fmt.Sprintf("%s: %s", reference.Description.SingularTitle, reference.Resource.GetName())
		choices[label] = resourceKey
	}

	selected, err := cmdio.Select(ctx, choices, "Select a pipeline")
	if err != nil {
		return "", err
	}
	return selected, nil
}
// autoSelectSinglePipeline returns the key of the bundle's only pipeline
// resource. It returns "" when the bundle defines zero pipelines or more than
// one, so the caller has to obtain the key some other way.
func autoSelectSinglePipeline(b *bundle.Bundle) string {
	pipelineRefs := resources.Completions(b, isPipeline)
	if len(pipelineRefs) != 1 {
		return ""
	}
	// Exactly one entry: take its key.
	var only string
	for k := range pipelineRefs {
		only = k
	}
	return only
}
// resolveRunArgument determines which resource to run. It returns the resource
// key together with the remaining arguments to forward to the runner.
//
// When args is non-empty, args[0] is the resource key and args[1:] are returned
// as runner arguments. When args is empty:
//   - if the bundle defines exactly one pipeline, that pipeline is selected;
//   - otherwise, if prompting is supported, the user picks a pipeline
//     interactively;
//   - otherwise an error is returned.
//
// Adapted from cmd/bundle/run.go.
func resolveRunArgument(ctx context.Context, b *bundle.Bundle, args []string) (string, []string, error) {
	if len(args) > 0 {
		return args[0], args[1:], nil
	}

	if key := autoSelectSinglePipeline(b); key != "" {
		return key, args, nil
	}

	if !cmdio.IsPromptSupported(ctx) {
		return "", nil, errors.New("expected a KEY of the resource to run")
	}

	// Restrict the prompt to runnable pipelines so it offers the same kind of
	// resource as the auto-selection above and matches the "Select a pipeline"
	// prompt label. Without isPipeline, other runnable resource types were
	// listed, but only when the bundle had zero or several pipelines.
	key, err := promptResource(ctx, b, run.IsRunnable, isPipeline)
	if err != nil {
		return "", nil, err
	}
	return key, args, nil
}
// resolvePipelineArgument returns the key of the pipeline a command should act on.
//
// When exactly one argument is given, it is returned as the key. With zero or
// more than one argument, args is ignored and the key is chosen as follows:
//   - the bundle's sole pipeline, if it defines exactly one;
//   - otherwise an interactive prompt over runnable pipelines, if prompting is
//     supported;
//   - otherwise an error is returned.
func resolvePipelineArgument(ctx context.Context, b *bundle.Bundle, args []string) (string, error) {
	if len(args) == 1 {
		return args[0], nil
	}

	key := autoSelectSinglePipeline(b)
	switch {
	case key != "":
		return key, nil
	case cmdio.IsPromptSupported(ctx):
		return promptResource(ctx, b, run.IsRunnable, isPipeline)
	default:
		return "", errors.New("expected a KEY of the pipeline")
	}
}
// resolvePipelineIdFromKey maps a bundle resource KEY to the ID recorded on the
// pipeline resource it names.
//
// It returns an error in three cases:
//   - the key cannot be looked up in the bundle;
//   - the resource is not a pipeline;
//   - the pipeline's ID field is empty (presumably because it has not been
//     deployed yet — TODO confirm).
//
// ctx is currently unused.
func resolvePipelineIdFromKey(ctx context.Context, b *bundle.Bundle, key string) (string, error) {
	ref, err := resources.Lookup(b, key)
	if err != nil {
		return "", err
	}

	p, isPipe := ref.Resource.(*configresources.Pipeline)
	switch {
	case !isPipe:
		return "", fmt.Errorf("resource %s is not a pipeline", key)
	case p.ID == "":
		return "", fmt.Errorf("pipeline ID for pipeline %s is not found", ref.Key)
	default:
		return p.ID, nil
	}
}
// keyToRunner looks up the runnable resource identified by arg in the bundle and
// wraps it in a run.Runner. Lookup and conversion errors are returned unchanged.
//
// Adapted from cmd/bundle/run.go.
func keyToRunner(b *bundle.Bundle, arg string) (run.Runner, error) {
	ref, lookupErr := resources.Lookup(b, arg, run.IsRunnable)
	if lookupErr != nil {
		return nil, lookupErr
	}

	r, convErr := run.ToRunner(b, ref)
	if convErr != nil {
		return nil, convErr
	}
	return r, nil
}
// PipelineEventsResponse is the JSON body that fetchAllPipelineEvents decodes
// from GET /api/2.0/pipelines/{id}/events.
type PipelineEventsResponse struct {
	// Events is the page of pipeline events returned by the endpoint.
	Events []pipelines.PipelineEvent `json:"events"`
	// NextPageToken is the token for the following page, if any.
	// fetchAllPipelineEvents returns only Events and does not surface this value.
	NextPageToken string `json:"next_page_token,omitempty"`
}
// PipelineEventsQueryParams holds the optional query options passed to
// fetchAllPipelineEvents. Fields left at their zero value are not sent.
type PipelineEventsQueryParams struct {
	// Filter is a filter expression sent as the "filter" query parameter.
	Filter string `json:"filter,omitempty"`
	// MaxResults caps the number of events returned. Zero (or a negative value)
	// omits the parameter. fetchAllPipelineEvents rejects values above 250.
	MaxResults int `json:"max_results,omitempty"`
	// PageToken is presumably the NextPageToken of a previous
	// PipelineEventsResponse.
	// NOTE(review): confirm that fetchAllPipelineEvents forwards it as
	// "page_token".
	PageToken string `json:"page_token,omitempty"`
	// OrderBy is the sort order sent as the "order_by" query parameter.
	OrderBy string `json:"order_by,omitempty"`
}
// fetchAllPipelineEvents retrieves events for the pipeline identified by
// pipelineID. Filtering, ordering, and paging are controlled by params.
//
// It calls the REST endpoint directly because the current Go SDK endpoints do
// not support the order_by parameter.
//
// Despite its name, it fetches a single page only, so the number of events
// returned is bounded by the API's per-page limit. params.PageToken, when set,
// is forwarded so callers can request a later page. A nil params is treated as
// an empty set of options.
//
// It returns an error if params.MaxResults exceeds 250, if the API client
// cannot be created, or if the request fails.
func fetchAllPipelineEvents(ctx context.Context, w *databricks.WorkspaceClient, pipelineID string, params *PipelineEventsQueryParams) ([]pipelines.PipelineEvent, error) {
	const maxResultsPerPage = 250

	if params == nil {
		params = &PipelineEventsQueryParams{}
	}
	if params.MaxResults > maxResultsPerPage {
		return nil, fmt.Errorf("number of results must be %d or less", maxResultsPerPage)
	}

	apiClient, err := client.New(w.Config)
	if err != nil {
		return nil, fmt.Errorf("failed to create API client: %w", err)
	}

	path := fmt.Sprintf("/api/2.0/pipelines/%s/events", pipelineID)

	// Only non-empty options are sent, so the server applies its own defaults
	// for the rest.
	queryParams := map[string]string{}
	if params.Filter != "" {
		queryParams["filter"] = params.Filter
	}
	if params.MaxResults > 0 {
		queryParams["max_results"] = strconv.Itoa(params.MaxResults)
	}
	if params.OrderBy != "" {
		queryParams["order_by"] = params.OrderBy
	}
	// PageToken is part of the params type; previously it was silently dropped.
	if params.PageToken != "" {
		queryParams["page_token"] = params.PageToken
	}

	var response PipelineEventsResponse
	err = apiClient.Do(
		ctx,
		"GET",
		path,
		nil,
		nil,
		queryParams,
		&response,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch pipeline events: %w", err)
	}

	return response.Events, nil
}
// getMostRecentUpdateId returns the ID of the newest update of the given pipeline.
//
// It requests a single page of updates and takes its first entry. This relies
// on the API listing updates in decreasing timestamp order. It returns the API
// error if the request fails, or an error if the page contains no updates.
func getMostRecentUpdateId(ctx context.Context, w *databricks.WorkspaceClient, pipelineID string) (string, error) {
	resp, err := w.Pipelines.ListUpdates(ctx, pipelines.ListUpdatesRequest{PipelineId: pipelineID})
	if err != nil {
		return "", err
	}
	if len(resp.Updates) > 0 {
		return resp.Updates[0].UpdateId, nil
	}
	return "", errors.New("no updates")
}
// parseAndFormatTimestamp converts an RFC 3339 timestamp into the pipeline
// events API format: UTC with millisecond precision and a trailing "Z", for
// example "2025-01-15T08:30:00.000Z".
//
// Behaviour by input:
//   - empty input yields "" and no error;
//   - a non-UTC offset is converted to UTC first;
//   - digits below the millisecond are truncated;
//   - malformed input yields the error from time.Parse.
func parseAndFormatTimestamp(timestamp string) (string, error) {
	if timestamp == "" {
		return "", nil
	}

	t, err := time.Parse(time.RFC3339Nano, timestamp)
	if err != nil {
		return "", err
	}

	// The layout's trailing "Z" is a literal character, not a zone directive.
	// The time must therefore be converted to UTC before formatting; otherwise
	// "10:30:00+02:00" would be rendered as "10:30:00.000Z" instead of
	// "08:30:00.000Z".
	return t.UTC().Format("2006-01-02T15:04:05.000Z"), nil
}
// parseTimeToUnixMillis converts an RFC 3339 time string into milliseconds since
// the Unix epoch. The result does not depend on the input's UTC offset.
//
// An empty string yields (nil, nil); callers such as filterUpdates treat a nil
// bound as "not applied". Malformed input yields an error quoting the expected
// format and the offending string.
func parseTimeToUnixMillis(timeStr string) (*int64, error) {
	if len(timeStr) == 0 {
		return nil, nil
	}

	parsed, parseErr := time.Parse(time.RFC3339Nano, timeStr)
	if parseErr != nil {
		return nil, fmt.Errorf("invalid time format. Expected format: 2025-01-15T10:30:00Z (YYYY-MM-DDTHH:MM:SSZ), got: %s", timeStr)
	}

	millis := new(int64)
	*millis = parsed.UnixMilli()
	return millis, nil
}
// updatesBefore returns the trailing run of updates whose CreationTime is at or
// before ts (CreationTime <= ts).
//
// updates must be sorted by CreationTime in descending order (newest first).
// The result is a subslice of updates, not a copy.
func updatesBefore(updates []pipelines.UpdateInfo, ts int64) []pipelines.UpdateInfo {
	// The comparator never reports equality: updates newer than the cutoff
	// compare as "before" the target, the rest as "after". BinarySearchFunc
	// therefore yields the index of the first update with CreationTime <= ts.
	atOrBefore := func(u pipelines.UpdateInfo, cutoff int64) int {
		if u.CreationTime > cutoff {
			return -1
		}
		return 1
	}
	start, _ := slices.BinarySearchFunc(updates, ts, atOrBefore)
	return updates[start:]
}
// updatesAfter returns the leading run of updates whose CreationTime is at or
// after ts (CreationTime >= ts).
//
// updates must be sorted by CreationTime in descending order (newest first).
// The result is a subslice of updates, not a copy.
func updatesAfter(updates []pipelines.UpdateInfo, ts int64) []pipelines.UpdateInfo {
	// The comparator never reports equality: updates at or after the cutoff
	// compare as "before" the target, older ones as "after". BinarySearchFunc
	// therefore yields the index of the first update with CreationTime < ts.
	olderThanCutoff := func(u pipelines.UpdateInfo, cutoff int64) int {
		if u.CreationTime >= cutoff {
			return -1
		}
		return 1
	}
	end, _ := slices.BinarySearchFunc(updates, ts, olderThanCutoff)
	return updates[:end]
}
// filterUpdates returns the updates whose CreationTime lies within
// [*startTime, *endTime], both bounds inclusive. Times are in milliseconds
// since the Unix epoch, and a nil bound is not applied.
//
// updates must be sorted by CreationTime in descending order (newest first).
// The result is a subslice of updates, or nil when updates is empty or nothing
// matches. The error result is currently always nil.
func filterUpdates(updates []pipelines.UpdateInfo, startTime, endTime *int64) ([]pipelines.UpdateInfo, error) {
	// Guard before indexing: an empty page used to panic on updates[0].
	if len(updates) == 0 {
		return nil, nil
	}

	newest := updates[0].CreationTime
	oldest := updates[len(updates)-1].CreationTime

	// Fast path: every update already lies inside the window.
	if (endTime == nil || newest <= *endTime) && (startTime == nil || oldest >= *startTime) {
		return updates, nil
	}

	// The whole slice lies outside the window.
	if startTime != nil && newest < *startTime {
		return nil, nil
	}
	if endTime != nil && oldest > *endTime {
		return nil, nil
	}

	// The window partially overlaps the slice: trim each end by binary search.
	if startTime != nil {
		updates = updatesAfter(updates, *startTime)
	}
	if endTime != nil {
		updates = updatesBefore(updates, *endTime)
	}
	return updates, nil
}
// fetchPipelineUpdates lists the updates of the given pipeline, following
// pagination. It keeps only those whose CreationTime falls within
// [*startTime, *endTime], inclusive. Times are in milliseconds since the Unix
// epoch, and a nil bound is not applied.
//
// It relies on the API listing updates newest-first across pages, the same
// ordering filterUpdates assumes within a page. Once a page reaches updates
// older than startTime, no later page can contain a match, so pagination stops
// early. It returns the first error from the API or from filtering.
func fetchPipelineUpdates(ctx context.Context, w *databricks.WorkspaceClient, startTime, endTime *int64, pipelineId string) ([]pipelines.UpdateInfo, error) {
	var updates []pipelines.UpdateInfo
	request := pipelines.ListUpdatesRequest{PipelineId: pipelineId}

	for {
		response, err := w.Pipelines.ListUpdates(ctx, request)
		if err != nil {
			return nil, err
		}

		page := response.Updates
		// Skip empty pages explicitly so the indexing below (and in
		// filterUpdates) never touches an empty slice.
		if len(page) > 0 {
			filtered, filterErr := filterUpdates(page, startTime, endTime)
			if filterErr != nil {
				return nil, filterErr
			}
			updates = append(updates, filtered...)

			// Later pages hold only older updates, so none of them can fall
			// inside the window once this page has passed its start.
			if startTime != nil && page[len(page)-1].CreationTime < *startTime {
				break
			}
		}

		if response.NextPageToken == "" {
			break
		}
		request.PageToken = response.NextPageToken
	}

	return updates, nil
}
// isPipeline reports whether ref refers to a pipeline resource. It is used as a
// filter for resources.Completions when selecting pipelines.
func isPipeline(ref resources.Reference) bool {
	_, ok := ref.Resource.(*configresources.Pipeline)
	return ok
}