-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_migration.go
More file actions
180 lines (141 loc) · 5.08 KB
/
file_migration.go
File metadata and controls
180 lines (141 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// SPDX-FileCopyrightText: 2026 The DMorph contributors.
// SPDX-License-Identifier: MPL-2.0
package dmorph
import (
"bufio"
"bytes"
"context"
"database/sql"
"fmt"
"io"
"io/fs"
"log/slog"
"os"
"path/filepath"
"regexp"
"strings"
)
// FileMigration is a Migration whose steps are read from a file, either from the local filesystem or from
// an fs.FS.
type FileMigration struct {
	// Name identifies the migration. It is returned by Key and is handed to migrationFunc as the
	// name of the file to open.
	Name string

	// FS is the filesystem the migration file is read from. It is only populated for migrations
	// created from an fs.FS.
	FS fs.FS

	// migrationFunc performs the actual work: it receives the transaction and the migration name.
	migrationFunc func(ctx context.Context, tx *sql.Tx, migration string) error
}

// Key reports the identifier under which this migration is registered in the migration table.
func (fm FileMigration) Key() string {
	key := fm.Name

	return key
}

// Migrate runs the migration inside the given transaction by delegating to the configured migration function,
// passing the migration's Name along.
func (fm FileMigration) Migrate(ctx context.Context, tx *sql.Tx) error {
	run := fm.migrationFunc

	return run(ctx, tx, fm.Name)
}
// WithMigrationFromFile returns a MorphOption that registers a FileMigration for the file with the given name
// on the local filesystem. The file is not touched when the option is applied; it is opened and streamed only
// when the migration is executed.
func WithMigrationFromFile(name string) MorphOption {
	return func(morpher *Morpher) error {
		run := func(ctx context.Context, tx *sql.Tx, migration string) error {
			file, openErr := os.Open(filepath.Clean(migration))
			if openErr != nil {
				return wrapIfError("could not open file "+migration, openErr)
			}

			// The file is only read, so a failing Close cannot lose data and is ignored on purpose.
			defer func() { _ = file.Close() }()

			return applyStepsStream(ctx, tx, file, migration, morpher.Log)
		}

		morpher.Migrations = append(morpher.Migrations, FileMigration{
			Name:          name,
			migrationFunc: run,
		})

		return nil
	}
}
// WithMigrationFromFileFS returns a MorphOption that registers a FileMigration for the file called name inside
// the filesystem dir. The migration logs through the logger the Morpher carries at the time the option is applied.
func WithMigrationFromFileFS(name string, dir fs.FS) MorphOption {
	return func(morpher *Morpher) error {
		fileMigration := migrationFromFileFS(name, dir, morpher.Log)
		morpher.Migrations = append(morpher.Migrations, fileMigration)

		return nil
	}
}
// WithMigrationsFromFS returns a MorphOption that registers one FileMigration per migration script found in the
// root directory of the given filesystem. Every directory entry is logged; only regular files whose name ends in
// ".sql" are registered. Subdirectories are not descended into. If the directory cannot be read, the option
// returns the wrapped error and registers nothing.
func WithMigrationsFromFS(d fs.FS) MorphOption {
	return func(morpher *Morpher) error {
		entries, readErr := fs.ReadDir(d, ".")
		if readErr != nil {
			return wrapIfError("could not read directory", readErr)
		}

		for _, entry := range entries {
			fileName := entry.Name()
			morpher.Log.Info("entry", slog.String("name", fileName))

			if !entry.Type().IsRegular() || !strings.HasSuffix(fileName, ".sql") {
				continue
			}

			morpher.Migrations = append(morpher.Migrations, migrationFromFileFS(fileName, d, morpher.Log))
		}

		return nil
	}
}
// migrationFromFileFS builds the FileMigration for the file called name inside the filesystem dir. The file is
// opened lazily, when the migration is executed, and its steps are logged through log.
func migrationFromFileFS(name string, dir fs.FS, log *slog.Logger) FileMigration {
	run := func(ctx context.Context, tx *sql.Tx, migration string) error {
		file, openErr := dir.Open(migration)
		if openErr != nil {
			return wrapIfError("could not open file migration", openErr)
		}

		// The file is only read, so a failing Close cannot lose data and is ignored on purpose.
		defer func() { _ = file.Close() }()

		return applyStepsStream(ctx, tx, file, migration, log)
	}

	return FileMigration{Name: name, FS: dir, migrationFunc: run}
}
// applyStepsStream reads migration steps from r line by line and executes each step on the transaction tx.
//
// A step ends at a line that consists of exactly ";". Text remaining after the last such line is executed as a
// final step, so the closing ";" line of the last statement is optional. Each executed step is logged on log
// together with migrationID and its zero-based step number.
//
// As some database drivers or engines seem to not support comments, blank lines and "--" comment lines at the
// beginning of a step are dropped. No effort is made to find comments elsewhere: a comment in the middle of a
// statement is passed on unchanged. At least with SQLite this will lead to hard-to-find errors.
//
// The function returns the first error of a failing step, wrapped with the migration ID and the step number. If
// reading from r fails (including a line exceeding the 1 MiB scanner limit), the wrapped scanner error is
// returned and the incompletely read trailing statement is NOT executed.
func applyStepsStream(ctx context.Context, tx *sql.Tx, r io.Reader, migrationID string, log *slog.Logger) error {
	const (
		initialScannerBufSize = 64 * 1024
		maxScannerBufSize     = 1024 * 1024
	)

	// The regex is here, as we do not expect to have overwhelming lots of calls even during larger migrations.
	// No need to pollute the global namespace.
	initialEmptyRegex := regexp.MustCompile(`^\s*(?:--.*)?$`)

	var (
		buf  bytes.Buffer
		step int
	)

	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, 0, initialScannerBufSize), maxScannerBufSize)

	// newStep is true until the first line of a statement has been buffered.
	newStep := true

	for scanner.Scan() {
		line := scanner.Text()

		if newStep && initialEmptyRegex.MatchString(line) {
			// skip leading comments and blank lines
			continue
		}

		if line == ";" {
			log.Info("migration step",
				slog.String("migrationID", migrationID),
				slog.Int("step", step),
			)

			if _, err := tx.ExecContext(ctx, buf.String()); err != nil {
				return fmt.Errorf("apply migration %q step %d: %w", migrationID, step, err)
			}

			buf.Reset()
			newStep = true
			step++

			continue
		}

		// Append the current line (preserve formatting by adding a newline between lines)
		if buf.Len() > 0 {
			buf.WriteByte('\n')
		}

		buf.WriteString(line)
		newStep = false
	}

	// A scanner that stopped on an error still hands out the partial data it has read. Check the error before
	// touching the buffer, so that a truncated statement is never sent to the database and the real cause is
	// what gets reported.
	if err := scanner.Err(); err != nil {
		return wrapIfError("scanner error", err)
	}

	// cleanup after, for the final statement without the closing `;` on a new line
	if buf.Len() > 0 {
		log.Info("migration step",
			slog.String("migrationID", migrationID),
			slog.Int("step", step),
		)

		if _, err := tx.ExecContext(ctx, buf.String()); err != nil {
			return fmt.Errorf("apply migration %q step %d (final): %w", migrationID, step, err)
		}
	}

	return nil
}