diff --git a/compaction/merge.go b/compaction/merge.go
new file mode 100644
index 00000000..928b3044
--- /dev/null
+++ b/compaction/merge.go
@@ -0,0 +1,162 @@
+package compaction
+
+import (
+	"errors"
+	"os"
+
+	"github.com/ozontech/seq-db/consts"
+	"github.com/ozontech/seq-db/frac/common"
+	"github.com/ozontech/seq-db/frac/sealed"
+	"github.com/ozontech/seq-db/indexwriter"
+)
+
+// Merge builds the files of a single fraction named filename out of srcs.
+//
+// The offsets, ids, tokens+lids and info files are produced by an
+// [indexwriter] writer that reads from a [MergeSource] built over srcs;
+// the docs file is produced by [mergeDocs]. Every file is written under
+// its temporary suffix first and renamed into place afterwards.
+//
+// On success it returns the preloaded data of the merged fraction with
+// Info.IndexOnDisk set to the summed size of the index files on disk.
+//
+// NOTE: files that earlier steps have already renamed into place are not
+// rolled back when a later step fails.
+func Merge(filename string, params common.SealParams, srcs ...Source) (*sealed.PreloadedData, error) {
+	writer := indexwriter.New(params)
+	src := NewMergeSource(filename, srcs)
+
+	if err := createAndWrite(
+		filename+consts.OffsetsTmpFileSuffix,
+		filename+consts.OffsetsFileSuffix,
+		func(f *os.File) error { return writer.WriteOffsetsFile(f, src) },
+	); err != nil {
+		return nil, err
+	}
+
+	if err := createAndWrite(
+		filename+consts.IDTmpFileSuffix,
+		filename+consts.IDFileSuffix,
+		func(f *os.File) error { return writer.WriteIDFile(f, src) },
+	); err != nil {
+		return nil, err
+	}
+
+	if err := createAndWriteBoth(
+		filename+consts.TokenTmpFileSuffix,
+		filename+consts.TokenFileSuffix,
+		filename+consts.LIDTmpFileSuffix,
+		filename+consts.LIDFileSuffix,
+		func(tf, lf *os.File) error { return writer.WriteTokenTriplet(tf, lf, src) },
+	); err != nil {
+		return nil, err
+	}
+
+	if err := createAndWrite(
+		filename+consts.InfoTmpFileSuffix,
+		filename+consts.InfoFileSuffix,
+		func(f *os.File) error { return writer.WriteInfoFile(f, src) },
+	); err != nil {
+		return nil, err
+	}
+
+	if err := mergeDocs(filename, srcs...); err != nil {
+		return nil, err
+	}
+
+	info := src.Info()
+	info.IndexOnDisk = 0
+
+	// IndexOnDisk is the total size of all index files that were just written.
+	for _, suffix := range []string{
+		consts.InfoFileSuffix,
+		consts.TokenFileSuffix,
+		consts.OffsetsFileSuffix,
+		consts.IDFileSuffix,
+		consts.LIDFileSuffix,
+	} {
+		st, err := os.Stat(info.Path + suffix)
+		if err != nil {
+			return nil, err
+		}
+		info.IndexOnDisk += uint64(st.Size())
+	}
+
+	lidsTable := writer.LIDsTable()
+	preloaded := &sealed.PreloadedData{
+		Info:       info,
+		TokenTable: writer.TokenTable(),
+		BlocksData: sealed.BlocksData{
+			LIDsTable:     &lidsTable,
+			IDsTable:      writer.IDsTable(),
+			BlocksOffsets: src.BlockOffsets(),
+		},
+	}
+
+	return preloaded, nil
+}
+
+// mergeDocs copies the doc blocks of every source into the docs file of filename.
+//
+// A block of a source is written at its original offset shifted by the summed
+// DocsOnDisk of all preceding sources, so the sources end up laid out back to back.
+func mergeDocs(filename string, srcs ...Source) error {
+	return createAndWrite(
+		filename+consts.DocsTmpFileSuffix,
+		filename+consts.DocsFileSuffix,
+		func(f *os.File) error {
+			var docsSize uint64
+			for _, src := range srcs {
+				for loc, err := range src.DocBlock() {
+					if err != nil {
+						return err
+					}
+
+					payload, offset := loc.First, loc.Second
+					if _, err := f.WriteAt(payload, int64(offset+docsSize)); err != nil {
+						return err
+					}
+				}
+
+				docsSize += src.Info().DocsOnDisk
+			}
+
+			return nil
+		},
+	)
+}
+
+// removeTmpFiles deletes the given temporary files. Files that do not exist
+// are skipped; the errors of all other failed removals are joined and returned.
+func removeTmpFiles(paths ...string) error {
+	var errs []error
+	for _, path := range paths {
+		if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
+			errs = append(errs, err)
+		}
+	}
+	return errors.Join(errs...)
+}
+
+// syncAndClose flushes f to stable storage and closes it.
+// The file is closed even when the sync fails; in that case the sync error is returned.
+func syncAndClose(f *os.File) error {
+	if err := f.Sync(); err != nil {
+		// The sync error is the one worth reporting, the close error is dropped on purpose.
+		_ = f.Close()
+		return err
+	}
+	return f.Close()
+}
+
+// createAndWrite creates tmp, fills it with write, syncs and closes it,
+// and finally renames it to final.
+//
+// If any step fails the temporary file is removed, so a failed call
+// does not leave a half-written file behind.
+func createAndWrite(
+	tmp, final string,
+	write func(*os.File) error,
+) (err error) {
+	f, err := os.Create(tmp)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err == nil {
+			return
+		}
+		if rmErr := removeTmpFiles(tmp); rmErr != nil {
+			err = errors.Join(err, rmErr)
+		}
+	}()
+
+	if err = errors.Join(write(f), syncAndClose(f)); err != nil {
+		return err
+	}
+
+	return os.Rename(tmp, final)
+}
+
+// createAndWriteBoth is the two-file variant of [createAndWrite]: both
+// temporary files are created, filled by a single write call, synced, closed
+// and then renamed to afinal and bfinal (in that order).
+//
+// If any step fails the temporary files that still exist are removed.
+// A file that has already been renamed to its final name is left as is.
+func createAndWriteBoth(
+	atmp, afinal,
+	btmp, bfinal string,
+	write func(*os.File, *os.File) error,
+) (err error) {
+	a, err := os.Create(atmp)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err == nil {
+			return
+		}
+		if rmErr := removeTmpFiles(atmp, btmp); rmErr != nil {
+			err = errors.Join(err, rmErr)
+		}
+	}()
+
+	b, err := os.Create(btmp)
+	if err != nil {
+		_ = a.Close()
+		return err
+	}
+
+	writeErr := write(a, b)
+	if err = errors.Join(writeErr, syncAndClose(a), syncAndClose(b)); err != nil {
+		return err
+	}
+
+	if err = os.Rename(atmp, afinal); err != nil {
+		return err
+	}
+
+	return os.Rename(btmp, bfinal)
+}
diff --git a/compaction/merge_source.go b/compaction/merge_source.go
new file mode 100644
index 00000000..f2e49da7
--- /dev/null
+++ b/compaction/merge_source.go
@@ -0,0 +1,445 @@
+package compaction
+
+import (
+	"bytes"
+	"iter"
+	"slices"
+	"sync"
+
+	"github.com/ozontech/seq-db/frac/common"
+	"github.com/ozontech/seq-db/indexwriter"
+	"github.com/ozontech/seq-db/seq"
+	"github.com/ozontech/seq-db/util"
+)
+
+type (
+	Document         = util.Pair[seq.ID, []byte]
+	DocBlockLocation = util.Pair[[]byte, uint64]
+	TokenPosting     = util.Pair[[]byte, []uint32]
+	DocLocation      = util.Pair[seq.ID, seq.DocPos]
+	IndexedDocBlock  = util.Pair[[]byte, []seq.DocPos]
+)
+
+// Source is a fraction that can take part in a merge: besides everything
+// [indexwriter.Source] provides it exposes its doc blocks.
+type Source interface {
+	indexwriter.Source
+	// DocBlock yields pairs of (doc block payload, offset of the block).
+	DocBlock() iter.Seq2[DocBlockLocation, error]
+}
+
+// MergeSource presents several sources as a single merged source.
+type MergeSource struct {
+	filename string
+
+	// sources is a slice of [Source]
+	// which provide a view into the underlying fractions.
+	sources []Source
+
+	info     *common.Info
+	infoOnce sync.Once
+
+	offsets     []uint64
+	offsetsOnce sync.Once
+
+	// docBlockCount is populated during [MergeSource.BlockOffsets] call.
+	// This slice is used for changing block indexes in [seq.DocPos].
+	docBlockCount []int
+
+	// lidMapping describes the transformation of lids
+	// after k-merge of several fractions.
+	//
+	// i-th index of [lidMapping] corresponds to i-th fraction.
+	// j-th index of i-th [lidMapping] corresponds to rename of j-th lid.
+	lidMapping [][]uint32
+}
+
+// NewMergeSource creates a [MergeSource] over sources for the fraction named filename.
+// The lid mapping of every source is allocated up front.
+func NewMergeSource(filename string, sources []Source) *MergeSource {
+	lidMapping := make([][]uint32, len(sources))
+
+	for i, src := range sources {
+		lidMapping[i] = make(
+			[]uint32,
+			// Increment for [seq.SystemID].
+			src.Info().DocsTotal+1,
+		)
+	}
+
+	s := &MergeSource{
+		filename:   filename,
+		sources:    sources,
+		lidMapping: lidMapping,
+	}
+
+	s.info = s.prepareInfo()
+	return s
+}
+
+// prepareInfo creates the info of the merged fraction: its time range is
+// the union of the time ranges of all sources and its distribution is empty.
+func (s *MergeSource) prepareInfo() *common.Info {
+	info := common.NewInfo(s.filename, 0, 0)
+
+	var (
+		from seq.MID = seq.MaxID.MID
+		to   seq.MID = seq.MinID.MID
+	)
+
+	for _, src := range s.sources {
+		from = min(from, src.Info().From)
+		to = max(to, src.Info().To)
+	}
+
+	info.From, info.To = from, to
+	info.SealingTime = info.CreationTime
+
+	info.InitEmptyDistribution()
+	return info
+}
+
+// Info returns the info of the merged fraction.
+// Document counters of all sources are summed up once, on the first call.
+func (s *MergeSource) Info() *common.Info {
+	s.infoOnce.Do(func() {
+		for i := range s.sources {
+			sinfo := s.sources[i].Info()
+
+			s.info.DocsRaw += sinfo.DocsRaw
+			s.info.DocsTotal += sinfo.DocsTotal
+			s.info.DocsOnDisk += sinfo.DocsOnDisk
+
+			// NOTE(dkharms): [IndexOnDisk] is calculated later.
+		}
+	})
+
+	return s.info
+}
+
+// BlockOffsets returns the doc block offsets of the merged fraction.
+//
+// Offsets of i-th source are shifted by the summed DocsOnDisk of all preceding
+// sources. As a side effect the call fills docBlockCount: its i-th element is
+// the number of blocks that precede the blocks of i-th source.
+// The result is computed once and cached.
+func (s *MergeSource) BlockOffsets() []uint64 {
+	s.offsetsOnce.Do(func() {
+		var (
+			docsSize uint64
+			offsets  []uint64
+		)
+
+		s.docBlockCount = append(s.docBlockCount, 0)
+		for i := 0; i < len(s.sources); i++ {
+			for _, offset := range s.sources[i].BlockOffsets() {
+				offsets = append(offsets, uint64(offset)+docsSize)
+			}
+			docsSize += s.sources[i].Info().DocsOnDisk
+			s.docBlockCount = append(s.docBlockCount, len(offsets))
+		}
+
+		s.offsets = offsets
+	})
+
+	return s.offsets
+}
+
+// ID yields the ids of all sources merged into a single sequence together
+// with their doc positions rebased onto the merged block numbering.
+//
+// The first yielded element is always ([seq.SystemID], [seq.SystemDocPos]);
+// after that, on every step the cursor holding the greatest id
+// (according to [seq.Less]) is emitted and advanced.
+//
+// Side effects of iterating:
+//   - lidMapping is filled: the old lid of every emitted id is mapped to its
+//     position in the merged sequence, so the postings produced by
+//     [MergeSource.TokenTriplet] must be consumed after this iterator;
+//   - the MID of every emitted id is added to the info distribution,
+//     and it is added again on every further pass over the iterator.
+func (s *MergeSource) ID() iter.Seq2[DocLocation, error] {
+	// TODO(dkharms): For now, I will use stupid-simple linear scan for k-way merge.
+	//
+	// Its time complexity is O(k*n) so it's not efficient enough if we compare it
+	// against time complexity of min-heap (which is O(n*log(k)))
+	// or another great data structure -- tournament tree -- which is O(n*log(k)) as well.
+	//
+	// However, tournament tree performs less comparisons than min-heap
+	// and it is around log(k) vs 2*log(k).
+
+	type cursor struct {
+		next func() (DocLocation, error, bool)
+		stop func()
+
+		loc    DocLocation
+		lidOld uint32
+
+		ok bool
+	}
+
+	return func(yield func(DocLocation, error) bool) {
+		// [MergeSource.docBlockCount] is filled in by [MergeSource.BlockOffsets];
+		// make sure it has run, otherwise rebasing of block indexes below
+		// would index into an empty slice.
+		s.BlockOffsets()
+
+		cursors := make([]cursor, 0, len(s.sources))
+
+		defer func() {
+			for _, c := range cursors {
+				c.stop()
+			}
+		}()
+
+		for i := range s.sources {
+			next, stop := iter.Pull2(s.sources[i].ID())
+
+			// Register the cursor right away so that the deferred
+			// cleanup stops it on every exit path.
+			cursors = append(cursors, cursor{next: next, stop: stop, lidOld: 1})
+
+			// Skip [seq.SystemID] and [seq.SystemDocPos].
+			if _, err, _ := next(); err != nil {
+				yield(DocLocation{}, err)
+				return
+			}
+
+			loc, err, ok := next()
+			if err != nil {
+				yield(DocLocation{}, err)
+				return
+			}
+
+			cursors[i].loc, cursors[i].ok = loc, ok
+		}
+
+		lid := uint32(1)
+		// We've previously dropped [seq.SystemID] from
+		// iterators however we do have to emit one such id.
+		if !yield(DocLocation{First: seq.SystemID, Second: seq.SystemDocPos}, nil) {
+			return
+		}
+
+		for {
+			var (
+				id  seq.ID
+				idx = -1
+			)
+
+			for i, c := range cursors {
+				// We exhausted i-th cursor so there is nothing to pull.
+				if !c.ok {
+					continue
+				}
+
+				// The first live cursor is always a candidate, so an id equal to
+				// [seq.MinID] is picked as well instead of being mistaken for "no cursor".
+				if idx == -1 || seq.Less(id, c.loc.First) {
+					id = c.loc.First
+					idx = i
+				}
+			}
+
+			// All pull-iterators are exhausted.
+			// Close all iterators and return.
+			if idx == -1 {
+				break
+			}
+
+			c := cursors[idx]
+
+			minID, lidOld := c.loc.First, c.lidOld
+			s.info.AddMID(uint64(minID.MID))
+
+			// Block index is local to the source, shift it by the
+			// number of blocks that belong to preceding sources.
+			blockIdx, offset := c.loc.Second.Unpack()
+			minDocPos := seq.PackDocPos(uint32(s.docBlockCount[idx]+int(blockIdx)), offset)
+
+			if !yield(DocLocation{First: minID, Second: minDocPos}, nil) {
+				return
+			}
+
+			// Rename lid from picked cursor to the new value.
+			s.lidMapping[idx][lidOld] = lid
+
+			var err error
+			c.loc, err, c.ok = c.next()
+			c.lidOld += 1
+
+			if err != nil {
+				cursors[idx] = c
+				yield(DocLocation{}, err)
+				return
+			}
+
+			lid += 1
+			cursors[idx] = c
+		}
+	}
+}
+
+// TokenTriplet yields every field present in at least one source, in ascending
+// order, together with an iterator over the merged postings of that field.
+func (s *MergeSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] {
+	// TODO(dkharms): For now, I will use stupid-simple linear scan for k-way merge.
+	//
+	// Its time complexity O(k*n) so it's not efficient enough if we compare it
+	// against time complexity of min-heap (which is O(n*log(k)))
+	// or another great data structure -- tournament tree -- which is O(n*log(k)) as well.
+	//
+	// However, tournament tree performs less comparisons than min-heap
+	// and it is around log(k) vs 2*log(k).
+
+	type cursor struct {
+		next func() (string, iter.Seq2[TokenPosting, error], bool)
+		stop func()
+
+		field string
+		tokIt iter.Seq2[TokenPosting, error]
+
+		ok bool
+	}
+
+	// minimal returns the smallest field among live cursors
+	// and false when every cursor is exhausted.
+	minimal := func(cursors []cursor) (string, bool) {
+		var (
+			set   bool
+			field string
+		)
+
+		for _, c := range cursors {
+			if !c.ok {
+				continue
+			}
+
+			if !set {
+				field = c.field
+				set = true
+				continue
+			}
+
+			field = min(field, c.field)
+		}
+
+		return field, set
+	}
+
+	return func(yield func(string, iter.Seq2[TokenPosting, error]) bool) {
+		cursors := make([]cursor, 0, len(s.sources))
+
+		// Registered before the first pull-iterator is created so that
+		// every created iterator is stopped on every exit path.
+		defer func() {
+			for _, c := range cursors {
+				c.stop()
+			}
+		}()
+
+		for i := range s.sources {
+			next, stop := iter.Pull2(s.sources[i].TokenTriplet())
+			cursors = append(cursors, cursor{next: next, stop: stop})
+
+			c := &cursors[i]
+			c.field, c.tokIt, c.ok = next()
+		}
+
+		for {
+			field, ok := minimal(cursors)
+			if !ok {
+				break
+			}
+
+			var (
+				idxs  []int
+				iters []iter.Seq2[TokenPosting, error]
+			)
+
+			// Gather all sources positioned on the current field.
+			for i, c := range cursors {
+				if !c.ok || c.field != field {
+					continue
+				}
+
+				idxs = append(idxs, i)
+				iters = append(iters, c.tokIt)
+			}
+
+			if !yield(field, s.postingsForField(idxs, iters)) {
+				return
+			}
+
+			// Advance all cursors that were on this field.
+			for _, idx := range idxs {
+				c := cursors[idx]
+				c.field, c.tokIt, c.ok = c.next()
+				cursors[idx] = c
+			}
+		}
+	}
+}
+
+// postingsForField merges the postings of a single field coming from several sources.
+//
+// iters[i] is the posting iterator of the source with index idxs[i].
+// Tokens are yielded in ascending byte order; lids of equal tokens are renamed
+// through lidMapping, combined and sorted.
+//
+// The lids slice of a yielded posting is reused for the next posting,
+// so a consumer that wants to keep it has to copy it.
+func (s *MergeSource) postingsForField(
+	idxs []int, iters []iter.Seq2[TokenPosting, error],
+) iter.Seq2[TokenPosting, error] {
+	type cursor struct {
+		next func() (TokenPosting, error, bool)
+		stop func()
+
+		idx     int
+		posting TokenPosting
+
+		ok bool
+	}
+
+	// minimal returns the smallest token among live cursors
+	// and false when every cursor is exhausted.
+	minimal := func(cursors []cursor) ([]byte, bool) {
+		var (
+			set   bool
+			token []byte
+		)
+
+		for _, c := range cursors {
+			if !c.ok {
+				continue
+			}
+
+			if !set {
+				token = c.posting.First
+				set = true
+				continue
+			}
+
+			if bytes.Compare(c.posting.First, token) < 0 {
+				token = c.posting.First
+			}
+		}
+
+		return token, set
+	}
+
+	// NB: This buffer will be reused across
+	// all calls within current field.
+	var lidRenamed []uint32
+
+	return func(yield func(TokenPosting, error) bool) {
+		cursors := make([]cursor, 0, len(iters))
+
+		defer func() {
+			for _, c := range cursors {
+				c.stop()
+			}
+		}()
+
+		for i := range iters {
+			next, stop := iter.Pull2(iters[i])
+
+			// Register the cursor right away so that the deferred
+			// cleanup stops it on every exit path.
+			cursors = append(cursors, cursor{next: next, stop: stop, idx: idxs[i]})
+
+			posting, err, ok := next()
+			if err != nil {
+				yield(TokenPosting{}, err)
+				return
+			}
+
+			cursors[i].posting, cursors[i].ok = posting, ok
+		}
+
+		for {
+			token, ok := minimal(cursors)
+			if !ok {
+				break
+			}
+
+			// Start every token from an empty buffer. Resetting here (and not after
+			// the yield) also drops lids left over from a pass that stopped early.
+			lidRenamed = lidRenamed[:0]
+
+			// Collect and remap lids from all cursors at this token, then advance them.
+			for i, c := range cursors {
+				if !c.ok || !bytes.Equal(c.posting.First, token) {
+					continue
+				}
+
+				for _, lid := range c.posting.Second {
+					lidRenamed = append(lidRenamed, s.lidMapping[c.idx][lid])
+				}
+
+				var err error
+				c.posting, err, c.ok = c.next()
+
+				if err != nil {
+					cursors[i] = c
+					yield(TokenPosting{}, err)
+					return
+				}
+
+				cursors[i] = c
+			}
+
+			slices.Sort(lidRenamed)
+			if !yield(TokenPosting{First: token, Second: lidRenamed}, nil) {
+				return
+			}
+		}
+	}
+}
diff --git a/compaction/merge_source_test.go b/compaction/merge_source_test.go
new file mode 100644
index 00000000..bb5fb3b1
--- /dev/null
+++ b/compaction/merge_source_test.go
@@ -0,0 +1,352 @@
+package compaction
+
+import (
+	"cmp"
+	"fmt"
+	"iter"
+	"math/rand"
+	"slices"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/ozontech/seq-db/frac/common"
+	"github.com/ozontech/seq-db/seq"
+)
+
+type mockSealingSource struct {
+	ids        []seq.ID
+	pos        []seq.DocPos
+	blocks     []uint64
+	docsOnDisk uint64
+	fields     map[string]map[string][]uint32
+}
+
+func (m *mockSealingSource) Info() *common.Info {
+	return &common.Info{
+		DocsRaw:    m.docsOnDisk,
+		DocsTotal:  uint32(len(m.ids)),
+		DocsOnDisk: m.docsOnDisk,
+
+		From: slices.MinFunc(m.ids, func(x, y seq.ID) int {
+			return cmp.Compare(x.MID, y.MID)
+		}).MID,
+
+		To: slices.MaxFunc(m.ids, func(x, y seq.ID) int {
+			return cmp.Compare(x.MID, y.MID)
+		}).MID,
+	}
+}
+
+func (m *mockSealingSource) BlockOffsets() []uint64 {
+	return m.blocks
+}
+
+func (m *mockSealingSource) ID() iter.Seq2[DocLocation, error] {
+	return func(yield func(DocLocation, error) bool) {
+		docloc := DocLocation{First: seq.SystemID, Second: seq.SystemDocPos}
+		if !yield(docloc, nil) {
+			return
+		}
+
+		for i, id := range m.ids {
+			docloc = DocLocation{First: id, Second: m.pos[i]}
+			if !yield(docloc, nil) {
+				return
+			}
+		}
+	}
+}
+
+func (m *mockSealingSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] {
+	fields := 
make([]string, 0, len(m.fields))
+	for f := range m.fields {
+		fields = append(fields, f)
+	}
+
+	// Map iteration order is random, fields are yielded in sorted order.
+	slices.Sort(fields)
+	return func(yield func(string, iter.Seq2[TokenPosting, error]) bool) {
+		for _, field := range fields {
+			if !yield(field, m.postingsForField(field)) {
+				return
+			}
+		}
+	}
+}
+
+// postingsForField yields the postings of field with tokens in sorted order.
+func (m *mockSealingSource) postingsForField(field string) iter.Seq2[TokenPosting, error] {
+	return func(yield func(TokenPosting, error) bool) {
+		tokens := make([]string, 0, len(m.fields[field]))
+		for t := range m.fields[field] {
+			tokens = append(tokens, t)
+		}
+
+		slices.Sort(tokens)
+		for _, tok := range tokens {
+			posting := TokenPosting{
+				First:  []byte(tok),
+				Second: m.fields[field][tok],
+			}
+
+			if !yield(posting, nil) {
+				return
+			}
+		}
+	}
+}
+
+// DocBlock yields a single empty doc block location.
+func (m *mockSealingSource) DocBlock() iter.Seq2[DocBlockLocation, error] {
+	return func(yield func(DocBlockLocation, error) bool) {
+		if !yield(DocBlockLocation{}, nil) {
+			return
+		}
+	}
+}
+
+func (m *mockSealingSource) LastError() error {
+	return nil
+}
+
+// TestMergeSource merges two mock sources and checks offsets, ids,
+// postings and info of the result.
+//
+// NOTE: subtests share one [MergeSource] and run in declaration order:
+// "tokens-lids" relies on the lid mapping built while "ids" iterates.
+func TestMergeSource(t *testing.T) {
+	first := &mockSealingSource{
+		ids: []seq.ID{
+			{MID: 3},
+			{MID: 2},
+			{MID: 1},
+		},
+
+		pos: []seq.DocPos{
+			seq.PackDocPos(0, 0),
+			seq.PackDocPos(0, 1024),
+			seq.PackDocPos(0, 2048),
+		},
+
+		fields: map[string]map[string][]uint32{
+			"level": {
+				"error": {1, 3},
+				"info":  {2, 3},
+			},
+		},
+
+		blocks:     []uint64{0},
+		docsOnDisk: 1024,
+	}
+
+	second := &mockSealingSource{
+		ids: []seq.ID{
+			{MID: 6},
+			{MID: 5},
+		},
+
+		pos: []seq.DocPos{
+			seq.PackDocPos(0, 0),
+			seq.PackDocPos(0, 2048),
+		},
+
+		fields: map[string]map[string][]uint32{
+			"level": {
+				"debug": {1},
+				"info":  {2},
+			},
+		},
+
+		blocks:     []uint64{0},
+		docsOnDisk: 2048,
+	}
+
+	source := NewMergeSource("inmemory", []Source{first, second})
+
+	t.Run("offsets", func(t *testing.T) {
+		// Validate correctness of [storage.DocBlock] calculation.
+		offsets := source.BlockOffsets()
+		require.Equal(t, []uint64{0, 1024}, offsets)
+	})
+
+	t.Run("ids", func(t *testing.T) {
+		var (
+			ids    []seq.ID
+			docpos []seq.DocPos
+		)
+
+		for loc, err := range source.ID() {
+			require.NoError(t, err)
+			ids = append(ids, loc.First)
+			docpos = append(docpos, loc.Second)
+		}
+
+		require.Equal(t,
+			[]seq.ID{
+				seq.SystemID,
+				// [seq.ID] from the second source.
+				{MID: 6},
+				{MID: 5},
+				// [seq.ID] from the first source.
+				{MID: 3},
+				{MID: 2},
+				{MID: 1},
+			},
+			ids,
+		)
+
+		require.Equal(t,
+			[]seq.DocPos{
+				seq.SystemDocPos,
+				// [seq.DocPos] from the second source.
+				seq.PackDocPos(1, 0), seq.PackDocPos(1, 2048),
+				// [seq.DocPos] from the first source.
+				seq.PackDocPos(0, 0), seq.PackDocPos(0, 1024), seq.PackDocPos(0, 2048),
+			},
+			docpos,
+		)
+	})
+
+	t.Run("tokens-lids", func(t *testing.T) {
+		var (
+			fields []string
+			tokens [][]byte
+			lids   [][]uint32
+		)
+
+		for field, fieldIt := range source.TokenTriplet() {
+			fields = append(fields, field)
+
+			for posting, err := range fieldIt {
+				require.NoError(t, err)
+				tokens = append(tokens, posting.First)
+				// The lids slice is reused by the iterator, keep a copy.
+				lids = append(lids, slices.Clone(posting.Second))
+			}
+		}
+
+		// Both sources have the same and the only field.
+		require.Equal(t, []string{"level"}, fields)
+
+		// Ensure tokens are sorted in ascending order.
+		require.Equal(t,
+			[][]byte{[]byte("debug"), []byte("error"), []byte("info")},
+			tokens,
+		)
+
+		// Ensure correctness of lids remapping:
+		// -------------------------
+		// seq.MID         6 5 | 3 2 1
+		// seq.LID (old)   1 2 | 1 2 3
+		// seq.LID (new)   1 2 | 3 4 5
+		// -------------------------
+		require.Equal(t,
+			[][]uint32{
+				// Sequence of [seq.LID] for token `debug`.
+				{1},
+				// Sequence of [seq.LID] for token `error`.
+				{3, 5},
+				// Sequence of [seq.LID] for token `info`.
+				{2, 4, 5},
+			},
+			lids,
+		)
+	})
+
+	t.Run("info", func(t *testing.T) {
+		merged := source.Info()
+		finfo, sinfo := first.Info(), second.Info()
+
+		// Validate correctness of fraction time-range.
+		require.Equal(t, merged.From, min(finfo.From, sinfo.From))
+		require.Equal(t, merged.To, max(finfo.To, sinfo.To))
+
+		// Validate correctness of total documents of merged fractions.
+		require.Equal(t, merged.DocsTotal, finfo.DocsTotal+sinfo.DocsTotal)
+		require.Equal(t, merged.DocsOnDisk, finfo.DocsOnDisk+sinfo.DocsOnDisk)
+		require.Equal(t, merged.DocsRaw, finfo.DocsRaw+sinfo.DocsRaw)
+
+		// Validate correctness of distribution.
+		require.NotNil(t, merged.Distribution)
+		require.True(t, merged.IsIntersecting(finfo.From, finfo.To))
+		require.True(t, merged.IsIntersecting(sinfo.From, sinfo.To))
+		require.True(t, merged.IsIntersecting(min(finfo.From, sinfo.From), max(finfo.To, sinfo.To)))
+	})
+}
+
+// BenchmarkMergeSource measures a full pass over offsets, ids and postings
+// of a [MergeSource] built over generated mock sources.
+func BenchmarkMergeSource(b *testing.B) {
+	const (
+		numSources    = 4
+		docsPerSource = 512_000
+
+		// Total count of pairs of (field, token) will be
+		// [numFields] * [numTokens].
+		numFields = 512
+		numTokens = 16384
+	)
+
+	// Fixed seed keeps the generated vocabulary assignment reproducible.
+	rng := rand.New(rand.NewSource(42))
+
+	fieldNames := make([]string, numFields)
+	for i := range fieldNames {
+		fieldNames[i] = fmt.Sprintf("field-%d", i)
+	}
+
+	tokenNames := make([]string, numTokens)
+	for i := range tokenNames {
+		tokenNames[i] = fmt.Sprintf("token-%d", i)
+	}
+
+	makeSource := func(midOffset seq.MID) Source {
+		ids := make([]seq.ID, docsPerSource)
+		pos := make([]seq.DocPos, docsPerSource)
+
+		for j := range ids {
+			// IDs must be in descending MID order within each source.
+			ids[j] = seq.ID{MID: midOffset + seq.MID(docsPerSource-j)}
+			pos[j] = seq.PackDocPos(0, uint64(j*64))
+		}
+
+		// Assign each lid to a random (field, token) pair from the vocabulary
+		// so that total lids per source equals [docsPerSource].
+		fields := make(map[string]map[string][]uint32)
+		for lid := uint32(1); lid <= uint32(docsPerSource); lid++ {
+			field := fieldNames[rng.Intn(numFields)]
+			token := tokenNames[rng.Intn(numTokens)]
+
+			if fields[field] == nil {
+				fields[field] = make(map[string][]uint32)
+			}
+
+			fields[field][token] = append(fields[field][token], lid)
+		}
+
+		for _, tokens := range fields {
+			for tok, lids := range tokens {
+				slices.Sort(lids)
+				tokens[tok] = lids
+			}
+		}
+
+		return &mockSealingSource{
+			ids:        ids,
+			pos:        pos,
+			blocks:     []uint64{0},
+			docsOnDisk: docsPerSource * 64,
+			fields:     fields,
+		}
+	}
+
+	sources := make([]Source, numSources)
+	for i := range sources {
+		sources[i] = makeSource(seq.MID(i * docsPerSource))
+	}
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	for b.Loop() {
+		ms := NewMergeSource("bench", sources)
+
+		// Offsets and ids go first: they prepare the state used by postings.
+		ms.BlockOffsets()
+		for range ms.ID() {
+		}
+
+		for _, tokIt := range ms.TokenTriplet() {
+			for range tokIt {
+			}
+		}
+	}
+}
diff --git a/consts/consts.go b/consts/consts.go
index ccaba4e2..3341aecd 100644
--- a/consts/consts.go
+++ b/consts/consts.go
@@ -56,6 +56,7 @@ const (
 	WalFileSuffix = ".wal"
 
 	DocsFileSuffix    = ".docs"
+	DocsTmpFileSuffix = "._docs"
 	DocsDelFileSuffix = ".docs.del"
 
 	SdocsFileSuffix = ".sdocs"
diff --git a/frac/common/info.go b/frac/common/info.go
index b82f6b99..2a3805aa 100644
--- a/frac/common/info.go
+++ b/frac/common/info.go
@@ -82,6 +82,13 @@ func (s *Info) BuildDistribution(mids []uint64) {
 	}
 }
 
+// AddMID adds mid to the distribution of the fraction.
+// It is a no-op when the distribution is not initialized.
+func (s *Info) AddMID(mid uint64) {
+	if s.Distribution == nil {
+		return
+	}
+	s.Distribution.Add(seq.MID(mid))
+}
+
 func (s *Info) InitEmptyDistribution() bool {
 	from := s.From.Time()
 	creationTime := time.UnixMilli(int64(s.CreationTime))
diff --git a/frac/sealed_source.go b/frac/sealed_source.go
new file mode 100644
index 00000000..beb7e4bd
--- /dev/null
+++ b/frac/sealed_source.go
@@ -0,0 +1,159 @@
+package frac
+
+import (
+	"iter"
+	"slices"
+
+	"github.com/ozontech/seq-db/frac/common"
+	"github.com/ozontech/seq-db/frac/sealed/lids"
+	"github.com/ozontech/seq-db/frac/sealed/seqids"
+	"github.com/ozontech/seq-db/frac/sealed/token"
+	"github.com/ozontech/seq-db/indexwriter"
+	"github.com/ozontech/seq-db/seq"
+	"github.com/ozontech/seq-db/storage"
+	"github.com/ozontech/seq-db/util"
+)
+
+// DocBlockLocation is a pair of (doc block payload, offset of the block).
+type DocBlockLocation = util.Pair[[]byte, uint64]
+
+// SealedSource implements [indexwriter.Source] for a sealed fraction.
+// Used as input to [compaction.MergeSource] when compacting multiple fractions.
+type SealedSource struct {
+	f *Sealed
+
+	idsProvider *seqids.Provider
+	lidsLoader  *lids.Loader
+
+	tokenBlockLoader *token.BlockLoader
+	tokenTableLoader *token.TableLoader
+}
+
+// NewSealedSource initializes f and wires up the id, lid and token loaders
+// on top of its readers and index caches.
+func NewSealedSource(f *Sealed) *SealedSource {
+	f.init(true)
+	return &SealedSource{
+		f: f,
+		idsProvider: seqids.NewProvider(
+			&f.idReader,
+			f.indexCache.MIDs,
+			f.indexCache.RIDs,
+			f.indexCache.Params,
+			&f.blocksData.IDsTable,
+			f.info.BinaryDataVer,
+		),
+		lidsLoader:       lids.NewLoader(f.Info().BinaryDataVer, &f.lidReader, f.indexCache.LIDs),
+		tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.tokenReader, f.indexCache.Tokens),
+		tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, &f.tokenReader, f.indexCache.TokenTable),
+	}
+}
+
+// Info returns the info of the underlying sealed fraction.
+func (s *SealedSource) Info() *common.Info {
+	return s.f.info
+}
+
+// BlockOffsets returns the doc block offsets of the underlying sealed fraction.
+func (s *SealedSource) BlockOffsets() []uint64 {
+	return s.f.blocksData.BlocksOffsets
+}
+
+// ID yields the id and the doc position of every lid of the fraction,
+// starting from lid 0. Iteration stops after the first error.
+func (s *SealedSource) ID() iter.Seq2[indexwriter.DocLocation, error] {
+	return func(yield func(indexwriter.DocLocation, error) bool) {
+		for lid := uint32(0); lid < s.f.blocksData.IDsTable.IDsTotal; lid++ {
+			mid, err := s.idsProvider.MID(seq.LID(lid))
+			if err != nil {
+				yield(indexwriter.DocLocation{}, err)
+				return
+			}
+
+			rid, err := s.idsProvider.RID(seq.LID(lid))
+			if err != nil {
+				yield(indexwriter.DocLocation{}, err)
+				return
+			}
+
+			pos, err := s.idsProvider.DocPos(seq.LID(lid))
+			if err != nil {
+				yield(indexwriter.DocLocation{}, err)
+				return
+			}
+
+			if !yield(indexwriter.DocLocation{First: 
seq.ID{MID: mid, RID: rid}, Second: pos}, nil) {
+				return
+			}
+		}
+	}
+}
+
+// TokenTriplet yields every field of the token table in sorted order
+// together with an iterator over the postings of that field.
+// The token table is loaded when TokenTriplet is called, not when the result is iterated.
+func (s *SealedSource) TokenTriplet() iter.Seq2[string, iter.Seq2[indexwriter.TokenPosting, error]] {
+	tokenTable := s.tokenTableLoader.Load()
+
+	fields := make([]string, 0, len(tokenTable))
+	for field := range tokenTable {
+		fields = append(fields, field)
+	}
+
+	slices.Sort(fields)
+	return func(yield func(string, iter.Seq2[indexwriter.TokenPosting, error]) bool) {
+		for _, field := range fields {
+			if !yield(field, s.postingsForField(field)) {
+				return
+			}
+		}
+	}
+}
+
+// postingsForField yields (token, lids) for every token of field,
+// walking the token table entries of the field and the tids of every entry in order.
+//
+// The lids slice of a yielded posting is reused for the next posting,
+// so a consumer that wants to keep it has to copy it.
+func (s *SealedSource) postingsForField(field string) iter.Seq2[indexwriter.TokenPosting, error] {
+	lidsTable := s.f.blocksData.LIDsTable
+	tokenTable := s.tokenTableLoader.Load()
+
+	// Shared by all postings of the field, truncated before every token.
+	var lidsBuf []uint32
+	return func(yield func(indexwriter.TokenPosting, error) bool) {
+		for _, entry := range tokenTable[field].Entries {
+			block := s.tokenBlockLoader.Load(entry.BlockIndex)
+
+			for tid := entry.StartTID; tid < entry.StartTID+entry.ValCount; tid++ {
+				lidsBuf = lidsBuf[:0]
+
+				tokenVal := block.GetToken(entry.GetIndexInTokensBlock(tid))
+				firstBlock := lidsTable.GetFirstBlockIndexForTID(tid)
+				lastBlock := lidsTable.GetLastBlockIndexForTID(tid)
+
+				// Lids of a token may be spread over several consecutive
+				// lid blocks, gather the chunk of the token from each of them.
+				for bi := firstBlock; bi <= lastBlock; bi++ {
+					lidBlock, err := s.lidsLoader.GetLIDsBlock(bi)
+					if err != nil {
+						yield(indexwriter.TokenPosting{}, err)
+						return
+					}
+
+					chunkIdx := lidsTable.GetChunkIndex(bi, tid)
+					lidsBuf = append(lidsBuf, lidBlock.LIDs[lidBlock.Offsets[chunkIdx]:lidBlock.Offsets[chunkIdx+1]]...)
+				}
+
+				if !yield(indexwriter.TokenPosting{First: tokenVal, Second: lidsBuf}, nil) {
+					return
+				}
+			}
+		}
+	}
+}
+
+// DocBlock yields the payload and the offset of every doc block of the
+// fraction in the order of its block offsets. Iteration stops after the first error.
+func (s *SealedSource) DocBlock() iter.Seq2[DocBlockLocation, error] {
+	return func(yield func(DocBlockLocation, error) bool) {
+		// We do not want to cache payload of DocBlock because
+		// it will just pollute cache and cause unnecessary evictions.
+		r := storage.NewDocBlocksReader(s.f.readLimiter, s.f.docsFile)
+
+		for _, offset := range s.f.blocksData.BlocksOffsets {
+			// Read DocBlock payload (including its header) but do not decompress it.
+			// Caller of [SealedSource.DocBlock] will decide whether it requires decompressed data.
+			payload, _, err := r.ReadDocBlock(int64(offset))
+			if err != nil {
+				yield(DocBlockLocation{}, err)
+				return
+			}
+
+			loc := DocBlockLocation{First: payload, Second: offset}
+			if !yield(loc, nil) {
+				return
+			}
+		}
+	}
+}
diff --git a/seq/seq.go b/seq/seq.go
index adae4265..d3557a16 100644
--- a/seq/seq.go
+++ b/seq/seq.go
@@ -11,9 +11,13 @@ import (
 )
 
 var (
-	SystemMID MID = math.MaxUint64
-	SystemRID RID = math.MaxUint64
-	SystemID  ID  = ID{SystemMID, SystemRID}
+	SystemMID MID = math.MaxUint64
+	SystemRID RID = math.MaxUint64
+
+	SystemID ID = ID{SystemMID, SystemRID}
+	MinID    ID = ID{0, 0}
+	MaxID    ID = SystemID
+
 	SystemDocPos DocPos = DocPos(0)
 )