Commit 1cf3c76 (1 parent: bf66b3e)

Add BenchmarkParquetBucketStore_SeriesBatch

Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>

1 file changed: 269 additions & 0 deletions
@@ -0,0 +1,269 @@
package storegateway

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"net"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/gogo/protobuf/types"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/store/hintspb"
	"github.com/thanos-io/thanos/pkg/store/storepb"
	"github.com/weaveworks/common/middleware"
	"github.com/weaveworks/common/user"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	grpcMetadata "google.golang.org/grpc/metadata"

	"github.com/cortexproject/cortex/pkg/parquetconverter"
	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/ring/kv/consul"
	"github.com/cortexproject/cortex/pkg/storage/bucket"
	"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
	"github.com/cortexproject/cortex/pkg/util/flagext"
	"github.com/cortexproject/cortex/pkg/util/services"
	"github.com/cortexproject/cortex/pkg/util/users"
	"github.com/cortexproject/cortex/pkg/util/validation"
)
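
// BenchmarkParquetBucketStore_SeriesBatch measures the Series() gRPC call
// against a parquet-backed bucket store, sweeping both the number of series
// in the block and the response batch size.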
func BenchmarkParquetBucketStore_SeriesBatch(b *testing.B) {
	seriesNum := []int{100, 1000, 10000, 100000}
	samplePerSeries := 100
	batchSizes := []int{1, 10, 100, 1000, 10000}

	for _, series := range seriesNum {
		b.Run(fmt.Sprintf("series_%d", series), func(b *testing.B) {
			ctx := context.Background()
			tmpDir := b.TempDir()
			storageDir := filepath.Join(tmpDir, "storage")
			dataDir := filepath.Join(tmpDir, "data")
			userID := "user-1"

			// Initialize the bucket store, backed by a filesystem bucket.
			storageCfg := cortex_tsdb.BlocksStorageConfig{
				UsersScanner: users.UsersScannerConfig{
					Strategy:       users.UserScanStrategyList,
					UpdateInterval: time.Second,
				},
				Bucket: bucket.Config{
					Backend: "filesystem",
					Filesystem: filesystem.Config{
						Directory: storageDir,
					},
				},
				BucketStore: cortex_tsdb.BucketStoreConfig{
					SyncDir:                filepath.Join(tmpDir, "sync"),
					BucketStoreType:        "parquet",
					BlockDiscoveryStrategy: string(cortex_tsdb.RecursiveDiscovery),
				},
			}
			bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "test", log.NewNopLogger(), prometheus.NewRegistry())
			require.NoError(b, err)

			blockID := prepareParquetBlock(b, ctx, storageCfg, bucketClient, dataDir, userID, series, samplePerSeries)

			reg := prometheus.NewPedanticRegistry()
			stores, err := NewBucketStores(storageCfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), reg)
			require.NoError(b, err)

			// Start the gRPC server.
			listener, err := net.Listen("tcp", "localhost:0")
			require.NoError(b, err)

			gRPCServer := grpc.NewServer(
				grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor),
			)
			storepb.RegisterStoreServer(gRPCServer, stores)

			go func() {
				if err := gRPCServer.Serve(listener); err != nil && err != grpc.ErrServerStopped {
					b.Error(err)
				}
			}()
			defer gRPCServer.Stop()

			// Initialize the gRPC client.
			conn, err := grpc.NewClient(listener.Addr().String(),
				grpc.WithTransportCredentials(insecure.NewCredentials()),
				grpc.WithDefaultCallOptions(
					grpc.MaxCallRecvMsgSize(math.MaxInt32),
				),
			)
			require.NoError(b, err)
			defer conn.Close()

			gRPCClient := storepb.NewStoreClient(conn)
			for _, batchSize := range batchSizes {
				b.Run(fmt.Sprintf("batchSize=%d", batchSize), func(b *testing.B) {
					b.ReportAllocs()
					for b.Loop() {
						benchmarkBatchingForParquetBucketStore(b, gRPCClient, userID, batchSize, series, blockID)
					}
				})
			}
		})
	}
}
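
// prepareParquetBlock generates a TSDB block for the given user, uploads it
// to the bucket, runs the parquet converter over it, and returns the block ID
// once the parquet files are in place.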
func prepareParquetBlock(b *testing.B, ctx context.Context, storageCfg cortex_tsdb.BlocksStorageConfig, bkt objstore.InstrumentedBucket, dataDir, userID string, numSeries, numSamples int) string {
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()

	// Generate a TSDB block.
	generateBenchmarkBlock(b, dataDir, userID, numSeries, numSamples)

	userBucket := bucket.NewUserBucketClient("user-1", bkt, nil)
	srcBlockDir := filepath.Join(dataDir, userID)

	// Find the ID of the generated block.
	dirs, err := os.ReadDir(srcBlockDir)
	require.NoError(b, err)
	var blockID string
	for _, d := range dirs {
		if d.IsDir() {
			blockID = d.Name()
			break
		}
	}
	require.NotEmpty(b, blockID)

	blockPath := filepath.Join(srcBlockDir, blockID)
	metaFilePath := filepath.Join(blockPath, "meta.json")
	metaBytes, err := os.ReadFile(metaFilePath)
	require.NoError(b, err)

	var meta metadata.Meta
	err = json.Unmarshal(metaBytes, &meta)
	require.NoError(b, err)

	if meta.Thanos.Labels == nil {
		meta.Thanos.Labels = make(map[string]string)
	}
	meta.Thanos.Labels["replica"] = "0" // Append a dummy label so block.Upload succeeds.

	// Write back the meta.json with the Thanos label appended.
	newMetaBytes, err := json.Marshal(meta)
	require.NoError(b, err)
	err = os.WriteFile(metaFilePath, newMetaBytes, 0666)
	require.NoError(b, err)

	// Upload the generated block to storage.
	err = block.Upload(ctx, logger, userBucket, blockPath, metadata.NoneFunc)
	require.NoError(b, err)

	convCfg := parquetconverter.Config{}
	flagext.DefaultValues(&convCfg)
	convCfg.ConversionInterval = time.Second // Convert quickly.
	convCfg.DataDir = filepath.Join(dataDir, "converter-data")
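
	// Back the converter's ring with an in-memory Consul KV client so the
	// benchmark needs no external dependencies.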
	ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
	b.Cleanup(func() { assert.NoError(b, closer.Close()) })

	convCfg.Ring.InstanceID = "parquet-converter-1"
	convCfg.Ring.InstanceAddr = "1.2.3.4"
	convCfg.Ring.KVStore.Mock = ringStore

	limits := &validation.Limits{}
	flagext.DefaultValues(limits)
	limits.ParquetConverterEnabled = true
	overrides := validation.NewOverrides(*limits, nil)

	// Create the parquet converter.
	converter, err := parquetconverter.NewConverter(convCfg, storageCfg, []int64{1, 2 * 3600 * 1000}, logger, reg, overrides)
	require.NoError(b, err)

	err = services.StartAndAwaitRunning(context.Background(), converter)
	require.NoError(b, err)
	defer services.StopAndAwaitTerminated(ctx, converter) // nolint:errcheck

	// Wait for the parquet converter marker file to exist.
	markerFile := filepath.Join(blockID, "parquet-converter-mark.json")
	require.Eventually(b, func() bool {
		exists, err := userBucket.Exists(ctx, markerFile)
		return err == nil && exists
	}, 10*time.Second, 100*time.Millisecond, "failed to wait for parquet conversion (marker file not found)")

	// Check that the parquet chunks and labels files exist.
	existsChunks, err := userBucket.Exists(ctx, filepath.Join(blockID, "0.chunks.parquet"))
	require.NoError(b, err)
	require.True(b, existsChunks, "chunks.parquet file should exist")

	existsLabels, err := userBucket.Exists(ctx, filepath.Join(blockID, "0.labels.parquet"))
	require.NoError(b, err)
	require.True(b, existsLabels, "labels.parquet file should exist")

	return blockID
}
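
// benchmarkBatchingForParquetBucketStore issues a single Series request with
// the given response batch size and verifies that the expected number of
// series is received, counting both single-series and batched responses.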
func benchmarkBatchingForParquetBucketStore(b *testing.B, client storepb.StoreClient, userID string, batchSize int, expectedSeries int, blockID string) {
	ctx := grpcMetadata.NewOutgoingContext(context.Background(), grpcMetadata.Pairs(cortex_tsdb.TenantIDExternalLabel, userID))
	ctx, err := user.InjectIntoGRPCRequest(user.InjectOrgID(ctx, userID))
	require.NoError(b, err)
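
	// Restrict the query to the single block uploaded by prepareParquetBlock.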
	hintMatchers := []storepb.LabelMatcher{
		{
			Type:  storepb.LabelMatcher_RE,
			Name:  block.BlockIDLabel,
			Value: blockID,
		},
	}

	dataMatchers := []storepb.LabelMatcher{
		{
			Type:  storepb.LabelMatcher_RE,
			Name:  "__name__",
			Value: ".+",
		},
	}

	hints := &hintspb.SeriesRequestHints{
		BlockMatchers: hintMatchers,
	}
	hintsAny, err := types.MarshalAny(hints)
	require.NoError(b, err)

	req := &storepb.SeriesRequest{
		MinTime:           0,
		MaxTime:           math.MaxInt64,
		Matchers:          dataMatchers,
		ResponseBatchSize: int64(batchSize),
		Hints:             hintsAny,
	}

	stream, err := client.Series(ctx, req)
	require.NoError(b, err)

	got := 0
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		require.NoError(b, err)

		if series := resp.GetSeries(); series != nil {
			got++
		} else if batch := resp.GetBatch(); batch != nil {
			got += len(batch.Series)
		}
	}

	if got != expectedSeries {
		b.Fatalf("expected %d series, got %d", expectedSeries, got)
	}
}
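
The benchmark can be run with standard Go tooling; the package path below is
assumed from the storegateway package name:

    go test -bench=BenchmarkParquetBucketStore_SeriesBatch ./pkg/storegateway/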
