Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ TEMPLATES_DIR=templates
PLUGIN_PACKAGES ?= $(PLUGIN_PACKAGES:)
PLUGIN_PACKAGES += mattermost-plugin-calls-v1.11.4
PLUGIN_PACKAGES += mattermost-plugin-github-v2.6.0
PLUGIN_PACKAGES += mattermost-plugin-gitlab-v1.12.0
PLUGIN_PACKAGES += mattermost-plugin-gitlab-v1.12.1
PLUGIN_PACKAGES += mattermost-plugin-jira-v4.5.1
PLUGIN_PACKAGES += mattermost-plugin-playbooks-v2.8.0
PLUGIN_PACKAGES += mattermost-plugin-servicenow-v2.4.0
Expand Down
10 changes: 9 additions & 1 deletion server/channels/jobs/import_process/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,16 @@ func MakeWorker(jobServer *jobs.JobServer, app AppIface) *jobs.SimpleWorker {
defer jsonFile.Close()

extractContent := job.Data["extract_content"] == "true"

numWorkers := runtime.NumCPU()
if workersStr, ok := job.Data["workers"]; ok {
if n, err := strconv.Atoi(workersStr); err == nil && n > 0 {
numWorkers = n
}
}

// do the actual import.
lineNumber, appErr := app.BulkImportWithPath(appContext, jsonFile, importZipReader, false, extractContent, runtime.NumCPU(), model.ExportDataDir)
lineNumber, appErr := app.BulkImportWithPath(appContext, jsonFile, importZipReader, false, extractContent, numWorkers, model.ExportDataDir)
if appErr != nil {
job.Data["line_number"] = strconv.Itoa(lineNumber)
return appErr
Expand Down
23 changes: 18 additions & 5 deletions server/cmd/mmctl/commands/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"text/template"
Expand Down Expand Up @@ -123,6 +124,7 @@ func init() {

ImportProcessCmd.Flags().Bool("bypass-upload", false, "If this is set, the file is not processed from the server, but rather directly read from the filesystem. Works only in --local mode.")
ImportProcessCmd.Flags().Bool("extract-content", true, "If this is set, document attachments will be extracted and indexed during the import process. It is advised to disable it to improve performance.")
ImportProcessCmd.Flags().Int("workers", 0, "The number of concurrent import worker goroutines. Controls database load during import. When set to 0 (default), uses the number of CPUs available. Maximum allowed is 4x the CPU count.")

ImportListCmd.AddCommand(
ImportListAvailableCmd,
Expand Down Expand Up @@ -310,14 +312,25 @@ func importProcessCmdF(c client.Client, command *cobra.Command, args []string) e
}

extractContent, _ := command.Flags().GetBool("extract-content")
workers, _ := command.Flags().GetInt("workers")

maxWorkers := runtime.NumCPU() * 4
if workers > maxWorkers {
return fmt.Errorf("workers value %d exceeds maximum allowed (%d = 4 * CPU count)", workers, maxWorkers)
}

jobData := map[string]string{
"import_file": importFile,
"local_mode": strconv.FormatBool(isLocal && bypassUpload),
"extract_content": strconv.FormatBool(extractContent),
}
if workers > 0 {
jobData["workers"] = strconv.Itoa(workers)
}

job, _, err := c.CreateJob(context.TODO(), &model.Job{
Type: model.JobTypeImportProcess,
Data: map[string]string{
"import_file": importFile,
"local_mode": strconv.FormatBool(isLocal && bypassUpload),
"extract_content": strconv.FormatBool(extractContent),
},
Data: jobData,
})
if err != nil {
return fmt.Errorf("failed to create import process job: %w", err)
Expand Down
103 changes: 81 additions & 22 deletions server/cmd/mmctl/commands/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net/http"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"

"github.com/pkg/errors"
Expand Down Expand Up @@ -211,28 +213,85 @@ func (s *MmctlUnitTestSuite) TestImportJobListCmdF() {
}

func (s *MmctlUnitTestSuite) TestImportProcessCmdF() {
printer.Clean()
importFile := "import.zip"
mockJob := &model.Job{
Type: model.JobTypeImportProcess,
Data: map[string]string{
"import_file": importFile,
"local_mode": "false",
"extract_content": "false",
},
}

s.client.
EXPECT().
CreateJob(context.TODO(), mockJob).
Return(mockJob, &model.Response{}, nil).
Times(1)

err := importProcessCmdF(s.client, &cobra.Command{}, []string{importFile})
s.Require().Nil(err)
s.Len(printer.GetLines(), 1)
s.Empty(printer.GetErrorLines())
s.Equal(mockJob, printer.GetLines()[0].(*model.Job))
s.Run("default workers", func() {
printer.Clean()
importFile := "import.zip"
mockJob := &model.Job{
Type: model.JobTypeImportProcess,
Data: map[string]string{
"import_file": importFile,
"local_mode": "false",
"extract_content": "false",
},
}

s.client.
EXPECT().
CreateJob(context.TODO(), mockJob).
Return(mockJob, &model.Response{}, nil).
Times(1)

cmd := &cobra.Command{}
cmd.Flags().Bool("bypass-upload", false, "")
cmd.Flags().Bool("extract-content", false, "")
cmd.Flags().Int("workers", 0, "")

err := importProcessCmdF(s.client, cmd, []string{importFile})
s.Require().Nil(err)
s.Len(printer.GetLines(), 1)
s.Empty(printer.GetErrorLines())
s.Equal(mockJob, printer.GetLines()[0].(*model.Job))
})

s.Run("workers exceeds max", func() {
printer.Clean()
importFile := "import.zip"
tooMany := runtime.NumCPU()*4 + 1

cmd := &cobra.Command{}
cmd.Flags().Bool("bypass-upload", false, "")
cmd.Flags().Bool("extract-content", false, "")
cmd.Flags().Int("workers", 0, "")
_ = cmd.Flags().Set("workers", strconv.Itoa(tooMany))

err := importProcessCmdF(s.client, cmd, []string{importFile})
s.Require().NotNil(err)
s.Contains(err.Error(), "exceeds maximum allowed")
s.Empty(printer.GetLines())
s.Empty(printer.GetErrorLines())
})

s.Run("custom workers", func() {
printer.Clean()
importFile := "import.zip"
mockJob := &model.Job{
Type: model.JobTypeImportProcess,
Data: map[string]string{
"import_file": importFile,
"local_mode": "false",
"extract_content": "false",
"workers": "2",
},
}

s.client.
EXPECT().
CreateJob(context.TODO(), mockJob).
Return(mockJob, &model.Response{}, nil).
Times(1)

cmd := &cobra.Command{}
cmd.Flags().Bool("bypass-upload", false, "")
cmd.Flags().Bool("extract-content", false, "")
cmd.Flags().Int("workers", 0, "")
_ = cmd.Flags().Set("workers", "2")

err := importProcessCmdF(s.client, cmd, []string{importFile})
s.Require().Nil(err)
s.Len(printer.GetLines(), 1)
s.Empty(printer.GetErrorLines())
s.Equal(mockJob, printer.GetLines()[0].(*model.Job))
})
}

func (s *MmctlUnitTestSuite) TestImportValidateCmdF() {
Expand Down
Loading
Loading