Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 61 additions & 39 deletions tools/workload/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"workload/schema/shop"
psysbench "workload/schema/sysbench"
puuu "workload/schema/uuu"
pwidetablewithjson "workload/schema/wide_table_with_json"
)

// WorkloadExecutor executes the workload and collects statistics
Expand Down Expand Up @@ -72,16 +73,17 @@ type WorkloadApp struct {
}

// Names of the supported workload types, as accepted by the
// -workload-type command-line flag.
const (
	bank              = "bank"
	sysbench          = "sysbench"
	largeRow          = "large_row"
	shopItem          = "shop_item"
	uuu               = "uuu"
	crawler           = "crawler"
	bank2             = "bank2"
	bank3             = "bank3"
	bankUpdate        = "bank_update"
	dc                = "dc"
	wideTableWithJSON = "wide_table_with_json"
)

// stmtCacheKey is used as the key for statement cache
Expand Down Expand Up @@ -142,6 +144,8 @@ func (app *WorkloadApp) createWorkload() schema.Workload {
workload = bankupdate.NewBankUpdateWorkload(app.Config.TotalRowCount, app.Config.UpdateLargeColumnSize)
case dc:
workload = pdc.NewDCWorkload()
case wideTableWithJSON:
workload = pwidetablewithjson.NewWideTableWithJSONWorkload(app.Config.RowSize, app.Config.TableCount, app.Config.TableStartIndex, app.Config.TotalRowCount)
default:
plog.Panic("unsupported workload type", zap.String("workload", app.Config.WorkloadType))
}
Expand Down Expand Up @@ -236,6 +240,7 @@ func (app *WorkloadApp) handleWorkloadExecution(insertConcurrency, updateConcurr
zap.Int("ddlWorker", app.Config.DDLWorker),
zap.String("ddlTimeout", app.Config.DDLTimeout.String()),
zap.Int("batchSize", app.Config.BatchSize),
zap.Bool("batchInTxn", app.Config.BatchInTxn),
zap.String("action", app.Config.Action),
)

Expand Down Expand Up @@ -291,7 +296,9 @@ func (app *WorkloadApp) executeInsertWorkers(insertConcurrency int, wg *sync.Wai
plog.Info("start insert worker to write data to db", zap.Int("worker", workerID), zap.String("db", db.Name))

for {
err = app.doInsert(conn)
flushedRows, err := app.runTransaction(conn, func() (uint64, error) {
return app.doInsertOnce(conn)
})
if err != nil {
// Check if it's a connection-level error that requires reconnection
if app.isConnectionError(err) {
Expand All @@ -316,6 +323,10 @@ func (app *WorkloadApp) executeInsertWorkers(insertConcurrency int, wg *sync.Wai
time.Sleep(time.Second * 2)
continue
}

if flushedRows != 0 {
app.Stats.FlushedRowCount.Add(flushedRows)
}
}
}(i)
}
Expand All @@ -336,36 +347,47 @@ func (app *WorkloadApp) isConnectionError(err error) bool {
strings.Contains(errStr, "invalid connection")
}

// doInsert performs insert operations
func (app *WorkloadApp) doInsert(conn *sql.Conn) error {
for {
j := rand.Intn(app.Config.TableCount) + app.Config.TableStartIndex
var err error

switch app.Config.WorkloadType {
case uuu:
insertSql, values := app.Workload.(*puuu.UUUWorkload).BuildInsertSqlWithValues(j, app.Config.BatchSize)
_, err = app.executeWithValues(conn, insertSql, j, values)
case bank2:
insertSql, values := app.Workload.(*pbank2.Bank2Workload).BuildInsertSqlWithValues(j, app.Config.BatchSize)
_, err = app.executeWithValues(conn, insertSql, j, values)
default:
insertSql := app.Workload.BuildInsertSql(j, app.Config.BatchSize)
_, err = app.execute(conn, insertSql, j)
}
if err != nil {
if strings.Contains(err.Error(), "connection is already closed") {
plog.Info("connection is already closed", zap.Error(err))
app.Stats.ErrorCount.Add(1)
return err
}
func (app *WorkloadApp) doInsertOnce(conn *sql.Conn) (uint64, error) {
tableIndex := rand.Intn(app.Config.TableCount) + app.Config.TableStartIndex
var (
res sql.Result
err error
)

plog.Info("do insert error", zap.Error(err))
app.Stats.ErrorCount.Add(1)
continue
}
app.Stats.FlushedRowCount.Add(uint64(app.Config.BatchSize))
switch app.Config.WorkloadType {
case uuu:
insertSQL, values := app.Workload.(*puuu.UUUWorkload).BuildInsertSqlWithValues(tableIndex, app.Config.BatchSize)
res, err = app.executeWithValues(conn, insertSQL, tableIndex, values)
case bank2:
insertSQL, values := app.Workload.(*pbank2.Bank2Workload).BuildInsertSqlWithValues(tableIndex, app.Config.BatchSize)
res, err = app.executeWithValues(conn, insertSQL, tableIndex, values)
case wideTableWithJSON:
insertSQL, values := app.Workload.(*pwidetablewithjson.WideTableWithJSONWorkload).BuildInsertSqlWithValues(tableIndex, app.Config.BatchSize)
res, err = app.executeWithValues(conn, insertSQL, tableIndex, values)
default:
insertSQL := app.Workload.BuildInsertSql(tableIndex, app.Config.BatchSize)
res, err = app.execute(conn, insertSQL, tableIndex)
}

if err != nil {
return 0, err
}
Comment thread
lidezhu marked this conversation as resolved.

if res == nil {
return 0, nil
}

cnt, err := res.RowsAffected()
if err != nil {
plog.Info("get rows affected error",
zap.Error(err),
zap.Int("table", tableIndex),
zap.Int("batchSize", app.Config.BatchSize))
app.Stats.ErrorCount.Add(1)
return 0, nil
}

return uint64(cnt), nil
}

// execute executes a SQL statement
Expand Down
16 changes: 10 additions & 6 deletions tools/workload/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ type WorkloadConfig struct {
DBNum int

// Workload related
WorkloadType string
TableCount int
TableStartIndex int
Thread int
BatchSize int
WorkloadType string
TableCount int
TableStartIndex int
Thread int
BatchSize int
// BatchInTxn wraps each batch in one explicit transaction (BEGIN/COMMIT).
BatchInTxn bool
TotalRowCount uint64
PercentageForUpdate float64
PercentageForDelete float64
Expand Down Expand Up @@ -88,6 +90,7 @@ func NewWorkloadConfig() *WorkloadConfig {
TableStartIndex: 0,
Thread: 16,
BatchSize: 10,
BatchInTxn: false,
TotalRowCount: 1000000000,
PercentageForUpdate: 0,
PercentageForDelete: 0,
Expand Down Expand Up @@ -131,12 +134,13 @@ func (c *WorkloadConfig) ParseFlags() error {
flag.IntVar(&c.TableStartIndex, "table-start-index", c.TableStartIndex, "table start index, sbtest<index>")
flag.IntVar(&c.Thread, "thread", c.Thread, "total thread of the workload")
flag.IntVar(&c.BatchSize, "batch-size", c.BatchSize, "batch size of each insert/update/delete")
flag.BoolVar(&c.BatchInTxn, "batch-in-txn", c.BatchInTxn, "wrap each batch in one explicit transaction")
flag.Uint64Var(&c.TotalRowCount, "total-row-count", c.TotalRowCount, "the total row count of the workload, default is 1 billion")
flag.Float64Var(&c.PercentageForUpdate, "percentage-for-update", c.PercentageForUpdate, "percentage for update: [0, 1.0]")
flag.Float64Var(&c.PercentageForDelete, "percentage-for-delete", c.PercentageForDelete, "percentage for delete: [0, 1.0]")
flag.BoolVar(&c.SkipCreateTable, "skip-create-table", c.SkipCreateTable, "do not create tables")
flag.StringVar(&c.Action, "action", c.Action, "action of the workload: [prepare, insert, update, delete, write, ddl, cleanup]")
flag.StringVar(&c.WorkloadType, "workload-type", c.WorkloadType, "workload type: [bank, sysbench, large_row, shop_item, uuu, bank2, bank3, bank_update, crawler, dc]")
flag.StringVar(&c.WorkloadType, "workload-type", c.WorkloadType, "workload type: [bank, sysbench, large_row, shop_item, uuu, bank2, bank3, bank_update, crawler, dc, wide_table_with_json]")
flag.StringVar(&c.DBHost, "database-host", c.DBHost, "database host")
flag.StringVar(&c.DBUser, "database-user", c.DBUser, "database user")
flag.StringVar(&c.DBPassword, "database-password", c.DBPassword, "database password")
Expand Down
52 changes: 23 additions & 29 deletions tools/workload/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"database/sql"
"fmt"
"math/rand"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -79,7 +78,9 @@ func (app *WorkloadApp) executeDeleteWorkers(deleteConcurrency int, wg *sync.Wai
plog.Info("start delete worker", zap.Int("worker", workerID))

for {
err = app.doDelete(conn, deleteTaskCh)
flushedRows, err := app.runTransaction(conn, func() (uint64, error) {
return app.doDeleteOnce(conn, deleteTaskCh)
})
if err != nil {
// Check if it's a connection-level error that requires reconnection
if app.isConnectionError(err) {
Expand All @@ -98,8 +99,14 @@ func (app *WorkloadApp) executeDeleteWorkers(deleteConcurrency int, wg *sync.Wai
}
}

app.Stats.ErrorCount.Add(1)
plog.Info("delete worker failed, retrying", zap.Int("worker", workerID), zap.Error(err))
time.Sleep(time.Second * 2)
continue
}

if flushedRows != 0 {
app.Stats.FlushedRowCount.Add(flushedRows)
}
}
}(i)
Expand All @@ -121,42 +128,33 @@ func (app *WorkloadApp) genDeleteTask(output chan deleteTask) {
}
}

// doDelete performs delete operations on the database
func (app *WorkloadApp) doDelete(conn *sql.Conn, input chan deleteTask) error {
for task := range input {
if err := app.processDeleteTask(conn, task); err != nil {
// Return only if connection is closed, other errors are handled within processDeleteTask
if strings.Contains(err.Error(), "connection is already closed") {
return err
}
}
}
return nil
// doDeleteOnce blocks for one delete task from input, processes it, and
// returns the number of rows the delete removed.
func (app *WorkloadApp) doDeleteOnce(conn *sql.Conn, input chan deleteTask) (uint64, error) {
	t := <-input
	return app.processDeleteTask(conn, &t)
}

// processDeleteTask handles a single delete task
func (app *WorkloadApp) processDeleteTask(conn *sql.Conn, task deleteTask) error {
func (app *WorkloadApp) processDeleteTask(conn *sql.Conn, task *deleteTask) (uint64, error) {
// Execute delete and get result
res, err := app.executeDelete(conn, task)
if err != nil {
return app.handleDeleteError(err, task)
app.handleDeleteError(err, task)
return 0, err
}

// Process delete result
if err := app.processDeleteResult(res, task); err != nil {
return err
}
affectedRows := app.processDeleteResult(res, task)

// Execute callback if exists
if task.callback != nil {
task.callback()
}

return nil
return affectedRows, nil
}

// executeDelete performs the actual delete operation
func (app *WorkloadApp) executeDelete(conn *sql.Conn, task deleteTask) (sql.Result, error) {
func (app *WorkloadApp) executeDelete(conn *sql.Conn, task *deleteTask) (sql.Result, error) {
deleteSQL := app.Workload.BuildDeleteSql(task.DeleteOption)
if deleteSQL == "" {
return nil, nil
Expand All @@ -166,19 +164,17 @@ func (app *WorkloadApp) executeDelete(conn *sql.Conn, task deleteTask) (sql.Resu
}

// handleDeleteError processes delete operation errors
func (app *WorkloadApp) handleDeleteError(err error, task deleteTask) error {
app.Stats.ErrorCount.Add(1)
func (app *WorkloadApp) handleDeleteError(err error, task *deleteTask) {
// Truncate long SQL for logging
plog.Info("delete error",
zap.Error(err),
zap.String("sql", getSQLPreview(task.generatedSQL)))
return err
}

// processDeleteResult handles the result of delete operation
func (app *WorkloadApp) processDeleteResult(res sql.Result, task deleteTask) error {
func (app *WorkloadApp) processDeleteResult(res sql.Result, task *deleteTask) uint64 {
if res == nil {
return nil
return 0
}

cnt, err := res.RowsAffected()
Expand All @@ -189,9 +185,7 @@ func (app *WorkloadApp) processDeleteResult(res sql.Result, task deleteTask) err
zap.Int("rowCount", task.Batch),
zap.String("sql", getSQLPreview(task.generatedSQL)))
app.Stats.ErrorCount.Add(1)
return 0
}

app.Stats.FlushedRowCount.Add(uint64(cnt))

return nil
return uint64(cnt)
}
Loading
Loading