-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathrun.go
More file actions
167 lines (143 loc) · 5.07 KB
/
run.go
File metadata and controls
167 lines (143 loc) · 5.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package cmd
import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/initializ/forge/forge-cli/channels"
"github.com/initializ/forge/forge-cli/runtime"
"github.com/initializ/forge/forge-core/a2a"
corechannels "github.com/initializ/forge/forge-core/channels"
"github.com/spf13/cobra"
)
// Values bound to the command-line flags of the run command. The flag names,
// defaults and help text are registered in init.
var (
	// A2A dev server listener: bind address, port and graceful shutdown timeout.
	runHost            string
	runPort            int
	runShutdownTimeout time.Duration

	// Runtime switches: mock runtime instead of subprocess, and guardrail
	// enforcement on/off.
	runMockTools         bool
	runEnforceGuardrails bool
	runNoGuardrails      bool

	// Model name override and LLM provider selection.
	runModel    string
	runProvider string

	// Path to the .env file and the comma-separated list of channel adapters.
	runEnvFile      string
	runWithChannels string

	// Bearer token authentication: disable it, or supply an explicit token.
	runNoAuth    bool
	runAuthToken string
)
// runCmd is the cobra definition of the `run` subcommand. Its flags are
// registered in init and its work is carried out by runRun.
var runCmd = &cobra.Command{
	Use:   "run",
	Short: "Run the agent locally with an A2A-compliant dev server",
	RunE:  runRun,
}
// init registers every flag of the run command and binds each one to its
// package-level variable.
func init() {
	flags := runCmd.Flags()

	// Server binding and lifecycle.
	flags.IntVar(&runPort, "port", 8080, "port for the A2A dev server")
	flags.StringVar(&runHost, "host", "", "bind address (e.g. 0.0.0.0 for containers)")
	flags.DurationVar(&runShutdownTimeout, "shutdown-timeout", 0, "graceful shutdown timeout (e.g. 30s)")

	// Runtime and guardrails.
	flags.BoolVar(&runMockTools, "mock-tools", false, "use mock runtime instead of subprocess")
	flags.BoolVar(&runEnforceGuardrails, "enforce-guardrails", true, "enforce guardrail violations as errors")
	flags.BoolVar(&runNoGuardrails, "no-guardrails", false, "disable all guardrail enforcement")

	// Model selection.
	flags.StringVar(&runModel, "model", "", "override model name (sets MODEL_NAME env var)")
	flags.StringVar(&runProvider, "provider", "", "LLM provider (openai, anthropic, ollama)")

	// Environment file and channel adapters.
	flags.StringVar(&runEnvFile, "env", ".env", "path to .env file")
	flags.StringVar(&runWithChannels, "with", "", "comma-separated channel adapters to start (e.g. slack,telegram)")

	// Authentication.
	flags.BoolVar(&runNoAuth, "no-auth", false, "disable bearer token authentication (localhost only)")
	flags.StringVar(&runAuthToken, "auth-token", "", "explicit bearer token (default: auto-generated)")
}
// runRun is the RunE handler of the `run` command.
//
// It loads the configuration, builds a runtime.Runner from the command's
// flags, resolves the auth token, optionally starts the channel adapters named
// by --with, and finally hands control to runner.Run. A SIGINT or SIGTERM
// cancels the context shared by the runner and the adapters.
//
// cmd and args are not used. The returned error is the first failure
// encountered during setup (wrapped with the step that failed, except for
// configuration loading, which is returned as-is) or whatever runner.Run
// returns.
func runRun(cmd *cobra.Command, args []string) error {
	cfg, workDir, err := loadAndPrepareConfig(runEnvFile)
	if err != nil {
		return err
	}

	activeChannels := parseChannels(runWithChannels)

	// --no-guardrails wins over --enforce-guardrails.
	enforceGuardrails := runEnforceGuardrails && !runNoGuardrails

	runner, err := runtime.NewRunner(runtime.RunnerConfig{
		Config:            cfg,
		WorkDir:           workDir,
		Port:              runPort,
		Host:              runHost,
		ShutdownTimeout:   runShutdownTimeout,
		MockTools:         runMockTools,
		EnforceGuardrails: enforceGuardrails,
		ModelOverride:     runModel,
		ProviderOverride:  runProvider,
		EnvFilePath:       resolveEnvPath(workDir, runEnvFile),
		Verbose:           verbose,
		Channels:          activeChannels,
		NoAuth:            runNoAuth,
		AuthToken:         runAuthToken,
	})
	if err != nil {
		return fmt.Errorf("creating runner: %w", err)
	}

	// Resolve auth token early so channel adapters can use it.
	if err := runner.ResolveAuth(); err != nil {
		return fmt.Errorf("resolving auth: %w", err)
	}

	// Set up signal handling: the first SIGINT/SIGTERM cancels ctx.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	// Unregister on return so the default signal behaviour is restored.
	defer signal.Stop(sigCh)

	go func() {
		// Also wake up on ctx.Done so this goroutine ends when runRun returns
		// without having received a signal.
		select {
		case <-sigCh:
			fmt.Fprintln(os.Stderr, "\nShutting down...")
			cancel()
		case <-ctx.Done():
		}
	}()

	// Start channel adapters if --with flag is set.
	if runWithChannels != "" {
		registry := defaultRegistry()
		// NOTE(review): the router always targets localhost, even when --host
		// binds a specific address — confirm the server is reachable there.
		agentURL := fmt.Sprintf("http://localhost:%d", runPort)
		router := channels.NewRouter(agentURL, runner.AuthToken())

		// Collect initialized plugins so the scheduler can deliver results.
		activePlugins := make(map[string]corechannels.ChannelPlugin)

		for _, name := range strings.Split(runWithChannels, ",") {
			name = strings.TrimSpace(name)
			if name == "" {
				continue
			}
			// A name listed twice must not initialise and start the same
			// adapter a second time.
			if _, dup := activePlugins[name]; dup {
				continue
			}

			plugin := registry.Get(name)
			if plugin == nil {
				return fmt.Errorf("unknown channel adapter: %s", name)
			}

			// Each adapter reads "<name>-config.yaml" from the working directory.
			chCfgPath := filepath.Join(workDir, name+"-config.yaml")
			chCfg, err := channels.LoadChannelConfig(chCfgPath)
			if err != nil {
				return fmt.Errorf("loading %s config: %w", name, err)
			}

			if err := plugin.Init(*chCfg); err != nil {
				return fmt.Errorf("initialising %s: %w", name, err)
			}

			// Deliberately deferred inside the loop: every adapter is stopped
			// when runRun returns, not at the end of the iteration.
			defer plugin.Stop() //nolint:errcheck

			activePlugins[name] = plugin

			// plugin is declared per iteration, so the closure captures this
			// iteration's adapter.
			go func() {
				if err := plugin.Start(ctx, router.Handler()); err != nil {
					fmt.Fprintf(os.Stderr, "channel %s error: %v\n", plugin.Name(), err)
				}
			}()

			fmt.Fprintf(os.Stderr, "  Channel: %s adapter started\n", name)
		}

		// Wire up schedule notifier so cron results are delivered to channels.
		if len(activePlugins) > 0 {
			runner.SetScheduleNotifier(func(_ context.Context, channel, target string, response *a2a.Message) error {
				plugin, ok := activePlugins[channel]
				if !ok {
					return fmt.Errorf("channel adapter %q not active", channel)
				}
				event := &corechannels.ChannelEvent{
					Channel:     channel,
					WorkspaceID: target,
				}
				return plugin.SendResponse(event, response)
			})
		}
	}

	return runner.Run(ctx)
}