Skip to content

Commit 458a370

Browse files
authored
Merge pull request #1 from gigapi/feat/layers
data layers support
2 parents 7a312fb + 9c0c2da commit 458a370

21 files changed

Lines changed: 1202 additions & 429 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
/.idea/
2+
_testdata

json_db_index.go

Lines changed: 63 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,58 +9,83 @@ import (
99
)
1010

1111
type jsonDBIndex struct {
12-
root string
12+
layers []jsonLayer
1313
}
1414

15-
/*func NewJSONDBIndex(root string) DBIndex {
15+
func NewJSONDBIndex(layers []Layer) DBIndex {
16+
jLayers := make([]jsonLayer, len(layers))
17+
for i, layer := range layers {
18+
jLayers[i] = layer2JsonLayer(layer)
19+
}
1620
return &jsonDBIndex{
17-
root: root,
21+
layers: jLayers,
1822
}
19-
}*/
23+
}
2024

2125
func (j *jsonDBIndex) Databases() ([]string, error) {
22-
ents, err := os.ReadDir(j.root)
23-
if err != nil {
24-
return nil, err
25-
}
26-
var res []string
27-
for _, ent := range ents {
28-
if ent.IsDir() {
29-
res = append(res, ent.Name())
26+
res := map[string]bool{}
27+
for _, l := range j.layers {
28+
ents, err := os.ReadDir(path.Join(l.Path))
29+
if errors.Is(err, os.ErrNotExist) {
30+
continue
31+
}
32+
if err != nil {
33+
return nil, err
3034
}
35+
for _, ent := range ents {
36+
if ent.IsDir() {
37+
res[ent.Name()] = true
38+
}
39+
}
40+
}
41+
_res := make([]string, 0, len(res))
42+
for k := range res {
43+
_res = append(_res, k)
3144
}
32-
return res, nil
45+
return _res, nil
3346
}
3447

3548
func (j *jsonDBIndex) Tables(database string) ([]string, error) {
36-
ents, err := os.ReadDir(path.Join(j.root, database))
37-
if errors.Is(err, os.ErrNotExist) {
38-
return nil, nil
39-
}
40-
if err != nil {
41-
return nil, err
42-
}
43-
var res []string
44-
for _, ent := range ents {
45-
if ent.IsDir() {
46-
res = append(res, ent.Name())
49+
res := map[string]bool{}
50+
for _, l := range j.layers {
51+
ents, err := os.ReadDir(path.Join(l.Path, database))
52+
if errors.Is(err, os.ErrNotExist) {
53+
continue
54+
}
55+
if err != nil {
56+
return nil, err
57+
}
58+
for _, ent := range ents {
59+
if ent.IsDir() {
60+
res[ent.Name()] = true
61+
}
4762
}
4863
}
49-
return res, nil
64+
_res := make([]string, 0, len(res))
65+
for k := range res {
66+
_res = append(_res, k)
67+
}
68+
return _res, nil
5069
}
5170

52-
func (j *jsonDBIndex) Paths(database string, table string) []string {
53-
root := path.Join(j.root, database, table)
54-
var res []string
55-
filepath.Walk(path.Join(j.root, database, table), func(path string, info os.FileInfo, err error) error {
56-
if !info.IsDir() {
71+
func (j *jsonDBIndex) Paths(database string, table string) ([]string, error) {
72+
res := map[string]bool{}
73+
for _, l := range j.layers {
74+
root := path.Join(l.Path, database, table)
75+
filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
76+
if !info.IsDir() {
77+
return nil
78+
}
79+
if strings.HasPrefix(info.Name(), "hour=") {
80+
res[path[len(root)+1:]] = true
81+
return filepath.SkipDir
82+
}
5783
return nil
58-
}
59-
if strings.HasPrefix(info.Name(), "hour=") {
60-
res = append(res, path[len(root):])
61-
return filepath.SkipDir
62-
}
63-
return nil
64-
})
65-
return res
84+
})
85+
}
86+
_res := make([]string, 0, len(res))
87+
for k := range res {
88+
_res = append(_res, k)
89+
}
90+
return _res, nil
6691
}

json_drop_planner.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package metadata
2+
3+
import "path"
4+
5+
func (J *JSONIndex) GetDropQueue(writerId string, layer string) (DropPlan, error) {
6+
parts := J.parts[layer]
7+
if parts == nil {
8+
return DropPlan{}, nil
9+
}
10+
for _, idx := range parts {
11+
p, err := idx.GetDropPlanner().GetDropQueue(writerId, layer)
12+
if err != nil {
13+
return DropPlan{}, err
14+
}
15+
if p.Path != "" {
16+
return p, nil
17+
}
18+
}
19+
return DropPlan{}, nil
20+
}
21+
22+
func (J *JSONIndex) GetDropPlanner() TableDropPlanner {
23+
return J
24+
}
25+
26+
func (J *JSONIndex) RmFromDropQueue(plan DropPlan) Promise[int32] {
27+
l := J.parts[plan.Layer]
28+
if l == nil {
29+
return Fulfilled(nil, int32(0))
30+
}
31+
dir := path.Dir(plan.Path)
32+
part := l[dir]
33+
if part != nil {
34+
return part.RmFromDropQueue(plan)
35+
}
36+
return Fulfilled(nil, int32(0))
37+
}

0 commit comments

Comments
 (0)