diff --git a/cmd/stream.go b/cmd/dataset.go similarity index 77% rename from cmd/stream.go rename to cmd/dataset.go index 5afa8cc..5f19327 100644 --- a/cmd/stream.go +++ b/cmd/dataset.go @@ -17,45 +17,42 @@ package cmd import ( "encoding/json" - "errors" "fmt" "io" "net/http" internalHTTP "pb/pkg/http" - "strconv" - "strings" "time" "github.com/dustin/go-humanize" "github.com/spf13/cobra" ) -// StreamStatsData is the data structure for stream stats -type StreamStatsData struct { +// DatasetStatsData is the data structure for dataset stats +type DatasetStatsData struct { Ingestion struct { Count int `json:"count"` Format string `json:"format"` - Size string `json:"size"` + Size uint64 `json:"size"` } `json:"ingestion"` Storage struct { Format string `json:"format"` - Size string `json:"size"` + Size uint64 `json:"size"` } `json:"storage"` Stream string `json:"stream"` Time time.Time `json:"time"` } -type StreamListItem struct { +type DatasetListItem struct { Name string } -func (item *StreamListItem) Render() string { +func (item *DatasetListItem) Render() string { render := StandardStyle.Render(item.Name) return ItemOuter.Render(render) } -// StreamRetentionData is the data structure for stream retention -type StreamRetentionData []struct { +// DatasetRetentionData is the data structure for dataset retention +type DatasetRetentionData []struct { Description string `json:"description"` Action string `json:"action"` Duration string `json:"duration"` @@ -105,11 +102,11 @@ type RuleConfig struct { Repeats int `json:"repeats"` } -// AddStreamCmd is the parent command for stream -var AddStreamCmd = &cobra.Command{ - Use: "add stream-name", - Example: " pb stream add backend_logs", - Short: "Create a new stream", +// AddDatasetCmd is the parent command for dataset +var AddDatasetCmd = &cobra.Command{ + Use: "add dataset-name", + Example: " pb dataset add backend_logs", + Short: "Create a new dataset", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { // Capture start time @@ -139,7 +136,7 @@ var AddStreamCmd = &cobra.Command{ cmd.Annotations["executionTime"] = time.Since(startTime).String() if resp.StatusCode == 200 { - fmt.Printf("Created stream %s\n", StyleBold.Render(name)) + fmt.Printf("Created dataset %s\n", StyleBold.Render(name)) } else { bytes, err := io.ReadAll(resp.Body) if err != nil { @@ -155,11 +152,11 @@ var AddStreamCmd = &cobra.Command{ }, } -// StatStreamCmd is the stat command for stream -var StatStreamCmd = &cobra.Command{ - Use: "info stream-name", - Example: " pb stream info backend_logs", - Short: "Get statistics for a stream", +// StatDatasetCmd is the stat command for dataset +var StatDatasetCmd = &cobra.Command{ + Use: "info dataset-name", + Example: " pb dataset info backend_logs", + Short: "Get statistics for a dataset", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { // Capture start time @@ -181,9 +178,12 @@ var StatStreamCmd = &cobra.Command{ } ingestionCount := stats.Ingestion.Count - ingestionSize, _ := strconv.Atoi(strings.TrimRight(stats.Ingestion.Size, " Bytes")) - storageSize, _ := strconv.Atoi(strings.TrimRight(stats.Storage.Size, " Bytes")) - compressionRatio := 100 - (float64(storageSize) / float64(ingestionSize) * 100) + ingestionSize := stats.Ingestion.Size + storageSize := stats.Storage.Size + var compressionRatio float64 + if ingestionSize > 0 { + compressionRatio = 100 - (float64(storageSize) / float64(ingestionSize) * 100) + } // Fetch retention data retention, err := fetchRetention(&client, name) @@ -201,8 +201,8 @@ var StatStreamCmd = &cobra.Command{ return err } - // Fetch stream type - streamType, err := fetchInfo(&client, name) + // Fetch dataset type + datasetType, err := fetchInfo(&client, name) if err != nil { // Capture error cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) @@ -220,9 +220,9 @@ var StatStreamCmd = &cobra.Command{ "storage_size": humanize.Bytes(uint64(storageSize)), "compression_ratio": fmt.Sprintf("%.2f%%", compressionRatio), }, - "retention": retention, - "alerts": alertsData.Alerts, - "stream_type": streamType, + "retention": retention, + "alerts": alertsData.Alerts, + "dataset_type": datasetType, } jsonData, err := json.MarshalIndent(data, "", " ") @@ -243,7 +243,7 @@ var StatStreamCmd = &cobra.Command{ fmt.Printf(" %-18s %s\n", "Ingestion Size:", humanize.Bytes(uint64(ingestionSize))) fmt.Printf(" %-18s %s\n", "Storage Size:", humanize.Bytes(uint64(storageSize))) fmt.Printf(" %-18s %.2f%s\n", "Compression Ratio:", compressionRatio, "%") - fmt.Printf(" %-18s %s\n", "Stream Type:", streamType) + fmt.Printf(" %-18s %s\n", "Dataset Type:", datasetType) fmt.Println() if isRetentionSet { @@ -254,7 +254,7 @@ var StatStreamCmd = &cobra.Command{ fmt.Println() } } else { - fmt.Println(StyleBold.Render("No retention period set on stream\n")) + fmt.Println(StyleBold.Render("No retention period set on dataset\n")) } if isAlertsSet { @@ -276,7 +276,7 @@ var StatStreamCmd = &cobra.Command{ fmt.Print("\n\n") } } else { - fmt.Println(StyleBold.Render("No alerts set on stream\n")) + fmt.Println(StyleBold.Render("No alerts set on dataset\n")) } } @@ -285,14 +285,14 @@ var StatStreamCmd = &cobra.Command{ } func init() { - StatStreamCmd.Flags().StringVarP(&outputFormat, "output", "o", "", "Output format (text|json)") + StatDatasetCmd.Flags().StringVarP(&outputFormat, "output", "o", "", "Output format (text|json)") } -var RemoveStreamCmd = &cobra.Command{ - Use: "remove stream-name", +var RemoveDatasetCmd = &cobra.Command{ + Use: "remove dataset-name", Aliases: []string{"rm"}, - Example: " pb stream remove backend_logs", - Short: "Delete a stream", + Example: " pb dataset remove backend_logs", + Short: "Delete a dataset", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { // Capture start time @@ -322,7 +322,7 @@ var RemoveStreamCmd = &cobra.Command{ cmd.Annotations["executionTime"] = time.Since(startTime).String() if resp.StatusCode == 200 { - fmt.Printf("Successfully deleted stream %s\n", StyleBold.Render(name)) + fmt.Printf("Successfully deleted dataset %s\n", StyleBold.Render(name)) } else { bytes, err := io.ReadAll(resp.Body) if err != nil { @@ -338,11 +338,11 @@ var RemoveStreamCmd = &cobra.Command{ }, } -// ListStreamCmd is the list command for streams -var ListStreamCmd = &cobra.Command{ +// ListDatasetCmd is the list command for datasets +var ListDatasetCmd = &cobra.Command{ Use: "list", - Example: " pb stream list", - Short: "List all streams", + Example: " pb dataset list", + Short: "List all datasets", RunE: func(cmd *cobra.Command, _ []string) error { // Capture start time startTime := time.Now() @@ -366,23 +366,23 @@ var ListStreamCmd = &cobra.Command{ return err } - var streams []StreamListItem + var datasets []DatasetListItem if resp.StatusCode == 200 { bytes, err := io.ReadAll(resp.Body) if err != nil { cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } - if err := json.Unmarshal(bytes, &streams); err != nil { + if err := json.Unmarshal(bytes, &datasets); err != nil { cmd.Annotations["errors"] = fmt.Sprintf("Error: %s", err.Error()) return err } - for _, stream := range streams { - fmt.Println(stream.Render()) + for _, dataset := range datasets { + fmt.Println(dataset.Render()) } } else { - fmt.Printf("Failed to fetch streams. Status Code: %s\n", resp.Status) + fmt.Printf("Failed to fetch datasets. Status Code: %s\n", resp.Status) } return nil @@ -391,10 +391,10 @@ var ListStreamCmd = &cobra.Command{ func init() { // Add the --output flag with default value "text" - ListStreamCmd.Flags().StringP("output", "o", "text", "Output format: 'text' or 'json'") + ListDatasetCmd.Flags().StringP("output", "o", "text", "Output format: 'text' or 'json'") } -func fetchStats(client *internalHTTP.HTTPClient, name string) (data StreamStatsData, err error) { +func fetchStats(client *internalHTTP.HTTPClient, name string) (data DatasetStatsData, err error) { req, err := client.NewRequest("GET", fmt.Sprintf("logstream/%s/stats", name), nil) if err != nil { return @@ -411,17 +411,18 @@ func fetchStats(client *internalHTTP.HTTPClient, name string) (data StreamStatsD } defer resp.Body.Close() - if resp.StatusCode == 200 { + switch resp.StatusCode { + case http.StatusOK: err = json.Unmarshal(bytes, &data) - } else { - body := string(bytes) - body = fmt.Sprintf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, body) - err = errors.New(body) + case http.StatusNotFound: + // stream exists but has no stats yet (empty stream) + default: + err = fmt.Errorf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, string(bytes)) } return } -func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRetentionData, err error) { +func fetchRetention(client *internalHTTP.HTTPClient, name string) (data DatasetRetentionData, err error) { req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/retention", name), nil) if err != nil { return @@ -438,12 +439,13 @@ func fetchRetention(client *internalHTTP.HTTPClient, name string) (data StreamRe } defer resp.Body.Close() - if resp.StatusCode == 200 { + switch resp.StatusCode { + case http.StatusOK: err = json.Unmarshal(bytes, &data) - } else { - body := string(bytes) - body = fmt.Sprintf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, body) - err = errors.New(body) + case http.StatusNotFound: + // no retention configured + default: + err = fmt.Errorf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, string(bytes)) } return } @@ -465,17 +467,18 @@ func fetchAlerts(client *internalHTTP.HTTPClient, name string) (data AlertConfig } defer resp.Body.Close() - if resp.StatusCode == 200 { + switch resp.StatusCode { + case http.StatusOK: err = json.Unmarshal(bytes, &data) - } else { - body := string(bytes) - body = fmt.Sprintf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, body) - err = errors.New(body) + case http.StatusNotFound: + // no alerts configured + default: + err = fmt.Errorf("Request Failed\nStatus Code: %s\nResponse: %s\n", resp.Status, string(bytes)) } return } -func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string, err error) { +func fetchInfo(client *internalHTTP.HTTPClient, name string) (datasetType string, err error) { // Create a new HTTP GET request req, err := client.NewRequest(http.MethodGet, fmt.Sprintf("logstream/%s/info", name), nil) if err != nil { @@ -496,7 +499,8 @@ func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string, } // Check for successful status code - if resp.StatusCode == http.StatusOK { + switch resp.StatusCode { + case http.StatusOK: // Define a struct to parse the response var response struct { StreamType string `json:"stream_type"` @@ -509,10 +513,11 @@ func fetchInfo(client *internalHTTP.HTTPClient, name string) (streamType string, // Return the extracted stream_type return response.StreamType, nil + case http.StatusNotFound: + // endpoint not available on this server version or stream has no type info + return "unknown", nil + default: + // Handle non-200 responses + return "", fmt.Errorf("Request Failed\nStatus Code: %d\nResponse: %s\n", resp.StatusCode, string(bytes)) } - - // Handle non-200 responses - body := string(bytes) - errMsg := fmt.Sprintf("Request failed\nStatus Code: %d\nResponse: %s\n", resp.StatusCode, body) - return "", errors.New(errMsg) } diff --git a/cmd/generate.go b/cmd/generate.go index 94b8d34..b0e10ab 100644 --- a/cmd/generate.go +++ b/cmd/generate.go @@ -98,17 +98,17 @@ var GenerateSchemaCmd = &cobra.Command{ var CreateSchemaCmd = &cobra.Command{ Use: "create", - Short: "Create Schema for a Parseable stream", - Example: "pb schema create --stream=my_stream --file=schema.json", + Short: "Create Schema for a Parseable dataset", + Example: "pb schema create --dataset=my_dataset --file=schema.json", RunE: func(cmd *cobra.Command, _ []string) error { - // Get the stream name from the `--stream` flag - streamName, err := cmd.Flags().GetString("stream") + // Get the dataset name from the `--dataset` flag + streamName, err := cmd.Flags().GetString("dataset") if err != nil { - return fmt.Errorf(common.Red+"failed to read stream flag: %w"+common.Reset, err) + return fmt.Errorf(common.Red+"failed to read dataset flag: %w"+common.Reset, err) } if streamName == "" { - return fmt.Errorf(common.Red + "stream flag is required" + common.Reset) + return fmt.Errorf(common.Red + "dataset flag is required" + common.Reset) } // Get the file path from the `--file` flag @@ -171,6 +171,6 @@ var CreateSchemaCmd = &cobra.Command{ func init() { // Add the `--file` flag to the command GenerateSchemaCmd.Flags().StringP("file", "f", "", "Path to the JSON file to generate schema") - CreateSchemaCmd.Flags().StringP("stream", "s", "", "Name of the stream to associate with the schema") + CreateSchemaCmd.Flags().StringP("dataset", "s", "", "Name of the dataset to associate with the schema") CreateSchemaCmd.Flags().StringP("file", "f", "", "Path to the JSON file to create schema") } diff --git a/cmd/login.go b/cmd/login.go index 2007562..c0382cb 100644 --- a/cmd/login.go +++ b/cmd/login.go @@ -27,7 +27,7 @@ import ( "github.com/spf13/cobra" ) -const defaultCloudURL = "https://staging.parseable.com:8000" +const cloudURL = "https://app.parseable.com" var ( loginToken string @@ -115,7 +115,7 @@ func cloudLogin() error { token := loginToken if token == "" { - loginPageURL := defaultCloudURL + "/login" + loginPageURL := cloudURL + "/login" fmt.Printf("Opening login page: %s\n\n", loginPageURL) if err := openBrowser(loginPageURL); err != nil { @@ -137,7 +137,7 @@ func cloudLogin() error { } profile := config.Profile{ - URL: defaultCloudURL, + URL: cloudURL, Token: token, } if err := writeProfile(profile, loginProfileName); err != nil { @@ -145,7 +145,7 @@ func cloudLogin() error { } fmt.Printf("✓ Logged in. Profile '%s' saved.\n", loginProfileName) - fmt.Printf(" URL: %s\n", defaultCloudURL) + fmt.Printf(" URL: %s\n", cloudURL) return nil } diff --git a/cmd/query.go b/cmd/query.go index 49afd93..2e46468 100644 --- a/cmd/query.go +++ b/cmd/query.go @@ -48,8 +48,8 @@ var ( var query = &cobra.Command{ Use: "run [query] [flags]", Example: " pb query run \"select * from frontend\" --from=10m --to=now", - Short: "Run SQL query on a log stream", - Long: "\nRun SQL query on a log stream. Default output format is text. Use --output flag to set output format to json.", + Short: "Run SQL query on a dataset", + Long: "\nRun SQL query on a dataset. Default output format is text. Use --output flag to set output format to json.", Args: cobra.MaximumNArgs(1), PreRunE: PreRunDefaultProfile, RunE: func(command *cobra.Command, args []string) error { @@ -112,14 +112,16 @@ func init() { var QueryCmd = query func fetchData(client *internalHTTP.HTTPClient, query string, startTime, endTime, outputFormat string) error { - queryTemplate := `{ - "query": "%s", - "startTime": "%s", - "endTime": "%s" - }` - finalQuery := fmt.Sprintf(queryTemplate, query, startTime, endTime) - - req, err := client.NewRequest("POST", "query", bytes.NewBuffer([]byte(finalQuery))) + body, err := json.Marshal(struct { + Query string `json:"query"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + }{Query: query, StartTime: startTime, EndTime: endTime}) + if err != nil { + return fmt.Errorf("failed to build request body: %w", err) + } + + req, err := client.NewRequest("POST", "query", bytes.NewBuffer(body)) if err != nil { return fmt.Errorf("failed to create new request: %w", err) } diff --git a/cmd/role.go b/cmd/role.go index dbd1cbd..143049c 100644 --- a/cmd/role.go +++ b/cmd/role.go @@ -34,7 +34,6 @@ import ( type RoleResource struct { Stream string `json:"stream,omitempty"` - Tag string `json:"tag,omitempty"` } type RoleData struct { @@ -53,11 +52,6 @@ func (user *RoleData) Render() string { s.WriteString(StandardStyleAlt.Render(user.Resource.Stream)) s.WriteString("\n") } - if user.Resource.Tag != "" { - s.WriteString(StandardStyle.Render("Tag: ")) - s.WriteString(StandardStyleAlt.Render(user.Resource.Tag)) - s.WriteString("\n") - } } return s.String() @@ -98,7 +92,6 @@ var AddRoleCmd = &cobra.Command{ m := _m.(role.Model) privilege := m.Selection.Value() stream := m.Stream.Value() - tag := m.Tag.Value() if !m.Success { fmt.Println("aborted by user") @@ -112,7 +105,7 @@ var AddRoleCmd = &cobra.Command{ case "writer", "ingestor": roleData.Resource = &RoleResource{Stream: stream} case "reader": - roleData.Resource = &RoleResource{Stream: stream, Tag: tag} + roleData.Resource = &RoleResource{Stream: stream} } roleDataJSON, _ := json.Marshal([]RoleData{roleData}) putBody = bytes.NewBuffer(roleDataJSON) @@ -287,10 +280,13 @@ func fetchRoles(client *internalHTTP.HTTPClient, data *[]string) error { defer resp.Body.Close() if resp.StatusCode == 200 { - err = json.Unmarshal(bytes, data) - if err != nil { + var roleMap map[string]json.RawMessage + if err = json.Unmarshal(bytes, &roleMap); err != nil { return err } + for name := range roleMap { + *data = append(*data, name) + } } else { body := string(bytes) return fmt.Errorf("request failed\nstatus code: %s\nresponse: %s", resp.Status, body) @@ -317,10 +313,13 @@ func fetchSpecificRole(client *internalHTTP.HTTPClient, role string) (res []Role defer resp.Body.Close() if resp.StatusCode == 200 { - err = json.Unmarshal(bytes, &res) - if err != nil { + var wrapper struct { + Actions []RoleData `json:"actions"` + } + if err = json.Unmarshal(bytes, &wrapper); err != nil { return } + res = wrapper.Actions } else { body := string(bytes) err = fmt.Errorf("request failed\nstatus code: %s\nresponse: %s", resp.Status, body) diff --git a/cmd/tail.go b/cmd/tail.go index 6ccde73..6889786 100644 --- a/cmd/tail.go +++ b/cmd/tail.go @@ -22,22 +22,26 @@ import ( "encoding/base64" "encoding/json" "fmt" + "io" "pb/pkg/analytics" "pb/pkg/config" internalHTTP "pb/pkg/http" + "time" "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/flight" "github.com/spf13/cobra" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) var TailCmd = &cobra.Command{ - Use: "tail stream-name", + Use: "tail dataset-name", Example: " pb tail backend_logs", - Short: "Stream live events from a log stream", + Short: "Stream live events from a dataset", Args: cobra.ExactArgs(1), PreRunE: PreRunDefaultProfile, RunE: func(_ *cobra.Command, args []string) error { @@ -62,34 +66,59 @@ func tail(profile config.Profile, stream string) error { } url := profile.GrpcAddr(fmt.Sprint(about.GRPCPort)) - client, err := flight.NewClientWithMiddleware(url, nil, nil, grpc.WithTransportCredentials(insecure.NewCredentials())) + flightClient, err := flight.NewClientWithMiddleware(url, nil, nil, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return err } authHeader := basicAuth(profile.Username, profile.Password) - resp, err := client.DoGet(metadata.NewOutgoingContext(context.Background(), metadata.New(map[string]string{"Authorization": "Basic " + authHeader})), &flight.Ticket{ - Ticket: payload, - }) - if err != nil { - return err - } - - records, err := flight.NewRecordReader(resp) - if err != nil { - return err - } - defer records.Release() for { - record, err := records.Read() + resp, err := flightClient.DoGet( + metadata.NewOutgoingContext(context.Background(), metadata.New(map[string]string{ + "Authorization": "Basic " + authHeader, + })), + &flight.Ticket{Ticket: payload}, + ) + if err != nil { + return err + } + + records, err := flight.NewRecordReader(resp) if err != nil { return err } - var buf bytes.Buffer - array.RecordToJSON(record, &buf) - fmt.Println(buf.String()) + + for { + record, err := records.Read() + if err != nil { + records.Release() + if isStreamEnd(err) { + break + } + return err + } + var buf bytes.Buffer + array.RecordToJSON(record, &buf) + fmt.Println(buf.String()) + } + + time.Sleep(500 * time.Millisecond) + } +} + +// isStreamEnd returns true for normal stream termination codes that warrant a reconnect. +func isStreamEnd(err error) bool { + if err == io.EOF { + return true + } + if s, ok := status.FromError(err); ok { + switch s.Code() { + case codes.Canceled, codes.Unavailable, codes.OK: + return true + } } + return false } func basicAuth(username, password string) string { diff --git a/cmd/user.go b/cmd/user.go index fc02679..dca5ee8 100644 --- a/cmd/user.go +++ b/cmd/user.go @@ -35,7 +35,22 @@ type UserData struct { Method string `json:"method"` } -type UserRoleData map[string][]RoleData +// UserRoleAction is a single privilege entry within a named role +type UserRoleAction struct { + Privilege string `json:"privilege"` + Resource *RoleResource `json:"resource,omitempty"` +} + +// UserServerRole is a named role definition returned by the server +type UserServerRole struct { + Actions []UserRoleAction `json:"actions"` +} + +// UserRolesResponse is the response from GET /user/{name}/role +type UserRolesResponse struct { + DirectRoles map[string]UserServerRole `json:"roles"` + GroupRoles map[string]map[string]UserServerRole `json:"group_roles"` +} var ( roleFlag = "role" @@ -289,11 +304,16 @@ var ListUserCmd = &cobra.Command{ userID := user.ID client := &client go func() { - var userRolesData UserRoleData - userRolesData, out.err = fetchUserRoles(client, userID) + var rolesResp UserRolesResponse + rolesResp, out.err = fetchUserRoles(client, userID) if out.err == nil { - for role := range userRolesData { - out.data = append(out.data, role) + for roleName := range rolesResp.DirectRoles { + out.data = append(out.data, roleName) + } + for _, groupRoles := range rolesResp.GroupRoles { + for roleName := range groupRoles { + out.data = append(out.data, roleName) + } } } wsg.Done() @@ -393,7 +413,7 @@ func fetchUsers(client *internalHTTP.HTTPClient) (res []UserData, err error) { return } -func fetchUserRoles(client *internalHTTP.HTTPClient, user string) (res UserRoleData, err error) { +func fetchUserRoles(client *internalHTTP.HTTPClient, user string) (res UserRolesResponse, err error) { req, err := client.NewRequest("GET", fmt.Sprintf("user/%s/role", user), nil) if err != nil { return diff --git a/main.go b/main.go index e8875b7..5cd61a6 100644 --- a/main.go +++ b/main.go @@ -85,16 +85,16 @@ var profile = &cobra.Command{ var schema = &cobra.Command{ Use: "schema", - Short: "Generate or create schemas for JSON data or Parseable streams", + Short: "Generate or create schemas for JSON data or Parseable datasets", Long: `The "schema" command allows you to either: - Generate a schema automatically from a JSON file for analysis or integration. - - Create a custom schema for Parseable streams (PB streams) to structure and process your data. + - Create a custom schema for Parseable datasets to structure and process your data. Examples: - To generate a schema from a JSON file: pb schema generate --file=data.json - - To create a schema for a PB stream: - pb schema create --stream-name=my_stream --config=data.json + - To create a schema for a dataset: + pb schema create --dataset=my_dataset --config=data.json `, PersistentPreRunE: combinedPreRun, PersistentPostRun: func(cmd *cobra.Command, args []string) { @@ -143,10 +143,10 @@ var role = &cobra.Command{ }, } -var stream = &cobra.Command{ - Use: "stream", - Short: "Manage streams", - Long: "\nstream command is used to manage streams.", +var dataset = &cobra.Command{ + Use: "dataset", + Short: "Manage datasets", + Long: "\ndataset command is used to manage datasets.", PersistentPreRunE: combinedPreRun, PersistentPostRun: func(cmd *cobra.Command, args []string) { if os.Getenv("PB_ANALYTICS") == "disable" { @@ -155,15 +155,15 @@ var stream = &cobra.Command{ wg.Add(1) go func() { defer wg.Done() - analytics.PostRunAnalytics(cmd, "stream", args) + analytics.PostRunAnalytics(cmd, "dataset", args) }() }, } var query = &cobra.Command{ Use: "query", - Short: "Run SQL query on a log stream", - Long: "\nRun SQL query on a log stream. Default output format is json. Use -i flag to open interactive table view.", + Short: "Run SQL query on a dataset", + Long: "\nRun SQL query on a dataset. Default output format is json. Use -i flag to open interactive table view.", PersistentPreRunE: combinedPreRun, PersistentPostRun: func(cmd *cobra.Command, args []string) { if os.Getenv("PB_ANALYTICS") == "disable" { @@ -260,10 +260,10 @@ func main() { role.AddCommand(pb.RemoveRoleCmd) role.AddCommand(pb.ListRoleCmd) - stream.AddCommand(pb.AddStreamCmd) - stream.AddCommand(pb.RemoveStreamCmd) - stream.AddCommand(pb.ListStreamCmd) - stream.AddCommand(pb.StatStreamCmd) + dataset.AddCommand(pb.AddDatasetCmd) + dataset.AddCommand(pb.RemoveDatasetCmd) + dataset.AddCommand(pb.ListDatasetCmd) + dataset.AddCommand(pb.StatDatasetCmd) query.AddCommand(pb.QueryCmd) query.AddCommand(pb.SavedQueryList) @@ -284,7 +284,7 @@ func main() { cli.AddCommand(profile) cli.AddCommand(query) - cli.AddCommand(stream) + cli.AddCommand(dataset) cli.AddCommand(user) cli.AddCommand(role) cli.AddCommand(pb.TailCmd) diff --git a/pkg/installer/installer.go b/pkg/installer/installer.go index b9a98ca..0699bd5 100644 --- a/pkg/installer/installer.go +++ b/pkg/installer/installer.go @@ -26,6 +26,7 @@ import ( "net" "os" "os/exec" + "regexp" "runtime" "strings" "sync" @@ -71,6 +72,7 @@ func waterFall(verbose bool) { if plan.Name == "Playground" { chartValues = append(chartValues, "parseable.store=local-store") chartValues = append(chartValues, "parseable.localModeSecret.enabled=true") + chartValues = append(chartValues, "parseable.auditLogging.enabled=false") // Prompt for namespace and credentials pbInfo, err := promptNamespaceAndCredentials() @@ -95,7 +97,7 @@ func waterFall(verbose bool) { RepoName: "parseable", RepoURL: "https://charts.parseable.com", ChartName: "parseable", - Version: "1.6.6", + Version: "2.6.6", Values: agentValues, Verbose: verbose, } @@ -120,6 +122,7 @@ func waterFall(verbose bool) { // pb supports only distributed deployments chartValues = append(chartValues, "parseable.highAvailability.enabled=true") + chartValues = append(chartValues, "parseable.auditLogging.enabled=false") // Prompt for namespace and credentials pbInfo, err := promptNamespaceAndCredentials() @@ -156,7 +159,7 @@ func waterFall(verbose bool) { RepoName: "parseable", RepoURL: "https://charts.parseable.com", ChartName: "parseable", - Version: "1.6.6", + Version: "2.6.6", Values: storeConfigs, Verbose: verbose, } @@ -227,6 +230,12 @@ func promptStorageClass() (string, error) { } // promptNamespaceAndCredentials prompts the user for namespace and credentials +var helmReleaseNameRe = regexp.MustCompile(`^[a-z0-9]([a-z0-9\-]*[a-z0-9])?(\.[a-z0-9]([a-z0-9\-]*[a-z0-9])?)*$`) + +func isValidReleaseName(name string) bool { + return len(name) <= 53 && helmReleaseNameRe.MatchString(name) +} + func promptNamespaceAndCredentials() (*ParseableInfo, error) { // Prompt user for release name fmt.Print(common.Yellow + "Enter the Name for deployment: " + common.Reset) @@ -236,6 +245,9 @@ func promptNamespaceAndCredentials() (*ParseableInfo, error) { return nil, fmt.Errorf("failed to read namespace: %w", err) } name = strings.TrimSpace(name) + if !isValidReleaseName(name) { + return nil, fmt.Errorf("invalid deployment name %q: must be lowercase alphanumeric and hyphens only, max 53 chars (e.g. parseable-test)", name) + } // Prompt user for namespace fmt.Print(common.Yellow + "Enter the Kubernetes namespace for deployment: " + common.Reset) @@ -667,10 +679,23 @@ func applyManifest(manifest string) error { return fmt.Errorf("failed to get GVR: %w", err) } - // Apply the manifest using the dynamic client - _, err = dynamicClient.Resource(gvr).Namespace(namespace).Create(context.TODO(), &obj, metav1.CreateOptions{}) + // Apply the manifest: create if new, update if it already exists + resourceClient := dynamicClient.Resource(gvr).Namespace(namespace) + existing, err := resourceClient.Get(context.TODO(), obj.GetName(), metav1.GetOptions{}) if err != nil { - return fmt.Errorf("failed to apply manifest: %w", err) + if !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to check existing resource: %w", err) + } + _, err = resourceClient.Create(context.TODO(), &obj, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to apply manifest: %w", err) + } + } else { + obj.SetResourceVersion(existing.GetResourceVersion()) + _, err = resourceClient.Update(context.TODO(), &obj, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update manifest: %w", err) + } } return nil }