Skip to content

Commit a6b8081

Browse files
committed
feat: database backend add ssdb support
Change-Id: I054c5fc9b02f613601781de8613d684faa0ea7f2
1 parent e90ac67 commit a6b8081

12 files changed

Lines changed: 856 additions & 4 deletions

File tree

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,4 @@ List of contributors, in chronological order:
5252
* Steven Stone (https://github.com/smstone)
5353
* Josh Bayfield (https://github.com/jbayfield)
5454
* Boxjan (https://github.com/boxjan)
55+
* goldendeng (https://github.com/goldendeng)

context/context.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@ package context
33

44
import (
55
gocontext "context"
6+
"errors"
67
"fmt"
78
"math/rand"
9+
"net/url"
810
"os"
911
"os/signal"
1012
"path/filepath"
1113
"runtime"
1214
"runtime/pprof"
15+
"strconv"
1316
"strings"
1417
"sync"
1518
"time"
@@ -19,6 +22,7 @@ import (
1922
"github.com/aptly-dev/aptly/console"
2023
"github.com/aptly-dev/aptly/database"
2124
"github.com/aptly-dev/aptly/database/goleveldb"
25+
"github.com/aptly-dev/aptly/database/ssdb"
2226
"github.com/aptly-dev/aptly/deb"
2327
"github.com/aptly-dev/aptly/files"
2428
"github.com/aptly-dev/aptly/http"
@@ -27,6 +31,7 @@ import (
2731
"github.com/aptly-dev/aptly/swift"
2832
"github.com/aptly-dev/aptly/task"
2933
"github.com/aptly-dev/aptly/utils"
34+
"github.com/seefan/gossdb/v2/conf"
3035
"github.com/smira/commander"
3136
"github.com/smira/flag"
3237
)
@@ -287,7 +292,32 @@ func (context *AptlyContext) _database() (database.Storage, error) {
287292
if context.database == nil {
288293
var err error
289294

290-
context.database, err = goleveldb.NewDB(context.dbPath())
295+
if context.config().DatabaseBackend.Type == "leveldb" {
296+
if context.config().DatabaseBackend.DbPath != "" {
297+
dbPath := filepath.Join(context.config().RootDir, context.config().DatabaseBackend.DbPath)
298+
context.database, err = goleveldb.NewDB(dbPath)
299+
} else {
300+
return nil, errors.New("leveldb databaseBackend config invalid")
301+
}
302+
} else if context.config().DatabaseBackend.Type == "ssdb" {
303+
var cfg conf.Config
304+
u, e := url.Parse(context.config().DatabaseBackend.URL)
305+
306+
if e != nil {
307+
return nil, e
308+
}
309+
cfg.Port, e = strconv.Atoi(u.Port())
310+
cfg.Host = strings.Split(u.Host, ":")[0]
311+
if e != nil {
312+
return nil, e
313+
}
314+
password, _ := u.User.Password()
315+
cfg.Password = password
316+
context.database, err = ssdb.NewOpenDB(&cfg)
317+
} else {
318+
context.database, err = goleveldb.NewDB(context.dbPath())
319+
}
320+
291321
if err != nil {
292322
return nil, fmt.Errorf("can't instantiate database: %s", err)
293323
}

database/ssdb/batch.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package ssdb
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/aptly-dev/aptly/database"
7+
"github.com/seefan/gossdb/v2/conf"
8+
"github.com/seefan/gossdb/v2/pool"
9+
)
10+
11+
type bWriteData struct {
12+
key []byte
13+
value []byte
14+
opts string
15+
err error
16+
}
17+
18+
type SsdbBatch struct {
19+
cfg *conf.Config
20+
// key-value chan
21+
w chan bWriteData
22+
p map[string]interface{}
23+
d []string
24+
db *pool.Client
25+
}
26+
27+
// func internalOpenBatch...
28+
func internalOpenBatch(t database.Storage) *SsdbBatch {
29+
b := &SsdbBatch{
30+
w: make(chan bWriteData),
31+
p: make(map[string]interface{}),
32+
}
33+
b.run()
34+
35+
return b
36+
}
37+
38+
func (b *SsdbBatch) run() {
39+
go func() {
40+
for {
41+
select {
42+
case w, ok := <-b.w:
43+
{
44+
if !ok {
45+
ssdb_log("ssdb batch write chan closed")
46+
return
47+
}
48+
49+
if w.opts == "write" {
50+
ssdb_log("ssdb batch write")
51+
var err error
52+
if len(b.p) > 0 && len(b.d) == 0 {
53+
err = b.db.MultiSet(b.p)
54+
ssdb_log("ssdb batch set errinfo: ", err)
55+
} else if len(b.d) > 0 && len(b.p) == 0 {
56+
err = b.db.MultiDel(b.d...)
57+
ssdb_log("ssdb batch del errinfo: ", err)
58+
} else if len(b.p) == 0 && len(b.d) == 0 {
59+
err = nil
60+
} else {
61+
err = fmt.Errorf("ssdb batch does not support both put and delete operations")
62+
}
63+
ssdb_log("ssdb batch write errinfo: ", err)
64+
b.w <- bWriteData{
65+
err: err,
66+
}
67+
ssdb_log("ssdb batch write end")
68+
} else {
69+
ssdb_log("ssdb batch", w.opts)
70+
if w.opts == "put" {
71+
b.p[string(w.key)] = w.value
72+
} else if w.opts == "del" {
73+
b.d = append(b.d, string(w.key))
74+
}
75+
}
76+
}
77+
}
78+
}
79+
}()
80+
}
81+
82+
func (b *SsdbBatch) stop() {
83+
ssdb_log("ssdb batch stop")
84+
close(b.w)
85+
}
86+
87+
func (b *SsdbBatch) Put(key, value []byte) (err error) {
88+
// err = b.db.Set(string(key), string(value))
89+
w := bWriteData{
90+
key: key,
91+
value: value,
92+
opts: "put",
93+
}
94+
95+
b.w <- w
96+
return nil
97+
}
98+
99+
func (b *SsdbBatch) Delete(key []byte) (err error) {
100+
/* err = b.db.Del(string(key))
101+
return */
102+
w := bWriteData{
103+
key: key,
104+
opts: "del",
105+
}
106+
107+
b.w <- w
108+
return nil
109+
}
110+
111+
func (b *SsdbBatch) Write() (err error) {
112+
defer b.stop()
113+
w := bWriteData{
114+
opts: "write",
115+
}
116+
117+
b.w <- w
118+
result := <-b.w
119+
return result.err
120+
}
121+
122+
// batch should implement database.Batch
123+
var (
124+
_ database.Batch = &SsdbBatch{}
125+
)

database/ssdb/database.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package ssdb
2+
3+
import (
4+
"os"
5+
"strconv"
6+
7+
"github.com/aptly-dev/aptly/database"
8+
"github.com/seefan/gossdb/v2"
9+
"github.com/seefan/gossdb/v2/conf"
10+
"github.com/seefan/gossdb/v2/pool"
11+
)
12+
13+
var defaultBufSize = 102400
14+
var defaultPoolSize = 1
15+
16+
func internalOpen(cfg *conf.Config) (*pool.Client, error) {
17+
ssdb_log("internalOpen")
18+
19+
cfg.ReadBufferSize = defaultBufSize
20+
cfg.WriteBufferSize = defaultBufSize
21+
cfg.MaxPoolSize = defaultPoolSize
22+
cfg.PoolSize = defaultPoolSize
23+
cfg.MinPoolSize = defaultPoolSize
24+
cfg.MaxWaitSize = 100 * defaultPoolSize
25+
cfg.RetryEnabled = true
26+
27+
//override by env
28+
if os.Getenv("SSDB_READBUFFERSIZE") != "" {
29+
readBufSize, err := strconv.Atoi(os.Getenv("SSDB_READBUFFERSIZE"))
30+
if err != nil {
31+
cfg.ReadBufferSize = readBufSize
32+
}
33+
}
34+
35+
if os.Getenv("SSDB_WRITEBUFFERSIZE") != "" {
36+
writeBufSize, err := strconv.Atoi(os.Getenv("SSDB_WRITEBUFFERSIZE"))
37+
if err != nil {
38+
cfg.WriteBufferSize = writeBufSize
39+
}
40+
}
41+
42+
var cfgs = []*conf.Config{cfg}
43+
err := gossdb.Start(cfgs...)
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
return gossdb.NewClient()
49+
}
50+
51+
func NewDB(cfg *conf.Config) (database.Storage, error) {
52+
return &SsdbStorage{cfg: cfg}, nil
53+
}
54+
55+
func NewOpenDB(cfg *conf.Config) (database.Storage, error) {
56+
db, err := NewDB(cfg)
57+
if err != nil {
58+
return nil, err
59+
}
60+
61+
return db, db.Open()
62+
}

0 commit comments

Comments
 (0)