Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
pdpTerminate := pdp.NewTerminateServiceTask(db, must.One(dependencies.EthClient.Val()), senderEth)
pdpDelete := pdp.NewDeleteDataSetTask(db, must.One(dependencies.EthClient.Val()), senderEth)
payTask := pay.NewSettleTask(db, must.One(dependencies.EthClient.Val()), senderEth)
activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask, pdpTerminate, pdpDelete, payTask)
activeTasks = append(activeTasks, pdpProveTask, pdpNotifTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask, pdpTerminate, pdpDelete, payTask)
}

indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
Expand Down
113 changes: 55 additions & 58 deletions tasks/pdp/notify_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,69 @@ import (
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/passcall"
"github.com/filecoin-project/curio/lib/promise"
)

var log = logger.Logger("pdp")

type PDPNotifyTask struct {
db *harmonydb.DB
TF promise.Promise[harmonytask.AddTaskFunc]
}

func NewPDPNotifyTask(db *harmonydb.DB) *PDPNotifyTask {
return &PDPNotifyTask{db: db}
n := &PDPNotifyTask{db: db}
go n.poll(context.Background())
Comment thread
LexLuthr marked this conversation as resolved.
return n
}

func (t *PDPNotifyTask) poll(ctx context.Context) {
for {
var uploads []struct {
ID string `db:"id"`
}

err := t.db.Select(ctx, &uploads, `
SELECT pu.id
FROM pdp_piece_uploads pu
JOIN parked_piece_refs pr ON pr.ref_id = pu.piece_ref
JOIN parked_pieces pp ON pp.id = pr.piece_id
WHERE
pu.piece_ref IS NOT NULL
AND pp.complete = TRUE
AND pu.notify_task_id IS NULL LIMIT 10`)
Comment thread
rvagg marked this conversation as resolved.
if err != nil {
log.Errorf("getting uploads to notify: %s", err)
Comment thread
LexLuthr marked this conversation as resolved.
}

if len(uploads) == 0 {
// No uploads to process
continue
} else {
for _, upload := range uploads {
upload := upload
failed := false

t.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
n, err := tx.Exec(`
UPDATE pdp_piece_uploads
SET notify_task_id = $1
WHERE id = $2 AND notify_task_id IS NULL`, id, upload.ID)
if err != nil {
failed = true
return false, xerrors.Errorf("updating notify_task_id: %w", err)
}
return n > 0, nil
})
if failed {
// Let's exit the loop and try again later
break
}
}
}
Comment thread
LexLuthr marked this conversation as resolved.

time.Sleep(time.Second * 2)
}
}

func (t *PDPNotifyTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
Expand Down Expand Up @@ -110,66 +162,11 @@ func (t *PDPNotifyTask) TypeDetails() harmonytask.TaskTypeDetails {
},
MaxFailures: 14,
RetryWait: taskhelp.RetryWaitExp(5*time.Second, 2),
IAmBored: passcall.Every(time.Second, func(taskFunc harmonytask.AddTaskFunc) error {
return t.schedule(context.Background(), taskFunc)
}),
}
}

func (t *PDPNotifyTask) schedule(ctx context.Context, taskFunc harmonytask.AddTaskFunc) error {
var stop bool
for !stop {
taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
stop = true // Assume we're done unless we find more tasks to schedule

// Query for pending notifications where:
// - piece_ref is not null
// - The piece_ref points to a parked_piece_refs entry
// - The parked_piece_refs entry points to a parked_pieces entry where complete = TRUE
// - notify_task_id is NULL

var uploads []struct {
ID string `db:"id"`
}

err := tx.Select(&uploads, `
SELECT pu.id
FROM pdp_piece_uploads pu
JOIN parked_piece_refs pr ON pr.ref_id = pu.piece_ref
JOIN parked_pieces pp ON pp.id = pr.piece_id
WHERE
pu.piece_ref IS NOT NULL
AND pp.complete = TRUE
AND pu.notify_task_id IS NULL
LIMIT 1
`)
if err != nil {
return false, xerrors.Errorf("getting uploads to notify: %w", err)
}

if len(uploads) == 0 {
// No uploads to process
return false, nil
}

// Update the pdp_piece_uploads entry to set notify_task_id
_, err = tx.Exec(`
UPDATE pdp_piece_uploads
SET notify_task_id = $1
WHERE id = $2 AND notify_task_id IS NULL
`, id, uploads[0].ID)
if err != nil {
return false, xerrors.Errorf("updating notify_task_id: %w", err)
}

stop = false // Continue scheduling as there might be more tasks
return true, nil // Commit the transaction
})
}
return nil
}

func (t *PDPNotifyTask) Adder(taskFunc harmonytask.AddTaskFunc) {
t.TF.Set(taskFunc)
}

var _ = harmonytask.Reg(&PDPNotifyTask{})
Expand Down
3 changes: 3 additions & 0 deletions tasks/pdp/task_init_pp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"strings"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/chainsched"
"github.com/filecoin-project/curio/lib/promise"
"github.com/filecoin-project/curio/pdp/contract"
Expand Down Expand Up @@ -276,6 +278,7 @@ func (ipp *InitProvingPeriodTask) TypeDetails() harmonytask.TaskTypeDetails {
Ram: 1 << 20,
},
MaxFailures: 3, // Set retry limit to 3 attempts
RetryWait: taskhelp.RetryWaitExp(5*time.Second, 2),
}
}

Expand Down
3 changes: 3 additions & 0 deletions tasks/pdp/task_next_pp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"strings"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/chainsched"
"github.com/filecoin-project/curio/lib/promise"
"github.com/filecoin-project/curio/pdp/contract"
Expand Down Expand Up @@ -259,6 +261,7 @@ func (n *NextProvingPeriodTask) TypeDetails() harmonytask.TaskTypeDetails {
Ram: 1 << 20,
},
MaxFailures: 3, // Set retry limit to 3 attempts
RetryWait: taskhelp.RetryWaitExp(5*time.Second, 2),
}
}

Expand Down