1+ package main
2+
3+ import (
4+ "context"
5+ "fmt"
6+ "log"
7+ "time"
8+
9+ "github.com/rizome-dev/arc/pkg/messagequeue"
10+ "github.com/rizome-dev/arc/pkg/orchestrator"
11+ "github.com/rizome-dev/arc/pkg/runtime"
12+ "github.com/rizome-dev/arc/pkg/state"
13+ "github.com/rizome-dev/arc/pkg/types"
14+ )
15+
16+ func main () {
17+ ctx := context .Background ()
18+
19+ // Try to create gVisor runtime, fall back to Docker if not available
20+ var runtimeInstance runtime.Runtime
21+ var runtimeName string
22+
23+ gvisorRuntime , err := runtime .NewGVisorRuntime (runtime.Config {
24+ Type : "gvisor" ,
25+ Labels : map [string ]string {
26+ "managed-by" : "arc" ,
27+ "runtime" : "gvisor" ,
28+ },
29+ })
30+ if err != nil {
31+ fmt .Printf ("gVisor runtime not available (%v), falling back to Docker...\n " , err )
32+
33+ // Fall back to Docker runtime
34+ dockerRuntime , dockerErr := runtime .NewDockerRuntime (runtime.Config {
35+ Type : "docker" ,
36+ Labels : map [string ]string {
37+ "managed-by" : "arc" ,
38+ "runtime" : "docker" ,
39+ },
40+ })
41+ if dockerErr != nil {
42+ log .Fatalf ("Failed to create Docker runtime: %v" , dockerErr )
43+ }
44+ runtimeInstance = dockerRuntime
45+ runtimeName = "Docker"
46+ } else {
47+ runtimeInstance = gvisorRuntime
48+ runtimeName = "gVisor"
49+ }
50+
51+ // Create message queue
52+ mq , err := messagequeue .NewAMQMessageQueue (messagequeue.Config {
53+ StorePath : "./arc-amq-data" ,
54+ WorkerPoolSize : 10 ,
55+ MessageTimeout : 300 , // 5 minutes
56+ })
57+ if err != nil {
58+ log .Fatalf ("Failed to create message queue: %v" , err )
59+ }
60+ defer mq .Close ()
61+
62+ // Create state manager
63+ stateManager := state .NewMemoryStore ()
64+ if err := stateManager .Initialize (ctx ); err != nil {
65+ log .Fatalf ("Failed to initialize state manager: %v" , err )
66+ }
67+ defer stateManager .Close (ctx )
68+
69+ // Create orchestrator
70+ arc , err := orchestrator .New (orchestrator.Config {
71+ Runtime : runtimeInstance ,
72+ MessageQueue : mq ,
73+ StateManager : stateManager ,
74+ })
75+ if err != nil {
76+ log .Fatalf ("Failed to create orchestrator: %v" , err )
77+ }
78+
79+ // Start orchestrator
80+ if err := arc .Start (); err != nil {
81+ log .Fatalf ("Failed to start orchestrator: %v" , err )
82+ }
83+ defer arc .Stop ()
84+
85+ // Get runtime info if gVisor
86+ if gvisorRT , ok := runtimeInstance .(* runtime.GVisorRuntime ); ok && runtimeName == "gVisor" {
87+ info , err := gvisorRT .GetRuntimeInfo (ctx )
88+ if err == nil {
89+ fmt .Printf ("%s Runtime Info:\n " , runtimeName )
90+ for k , v := range info {
91+ fmt .Printf (" %s: %v\n " , k , v )
92+ }
93+ fmt .Println ()
94+ }
95+ }
96+
97+ // Create a sample workflow with sandboxed containers
98+ workflow := & types.Workflow {
99+ Name : "secure-data-processing" ,
100+ Description : "A secure data processing workflow using gVisor sandboxing" ,
101+ Tasks : []types.Task {
102+ {
103+ Name : "fetch-data" ,
104+ AgentConfig : types.AgentConfig {
105+ Command : []string {"alpine" },
106+ Args : []string {"sh" , "-c" , "echo 'Fetching data in gVisor sandbox...'; sleep 5; echo 'Data fetched securely!'" },
107+ Environment : map [string ]string {
108+ "TASK_TYPE" : "fetch" ,
109+ "RUNTIME" : "gvisor" ,
110+ },
111+ MessageQueue : types.MessageQueueConfig {
112+ Topics : []string {"secure-pipeline" },
113+ },
114+ Resources : types.ResourceRequirements {
115+ CPU : "500m" ,
116+ Memory : "256Mi" ,
117+ },
118+ },
119+ },
120+ {
121+ Name : "process-data" ,
122+ Dependencies : []string {}, // Will be set after task IDs are generated
123+ AgentConfig : types.AgentConfig {
124+ Command : []string {"alpine" },
125+ Args : []string {"sh" , "-c" , "echo 'Processing data in isolated environment...'; sleep 10; echo 'Data processed with isolation!'" },
126+ Environment : map [string ]string {
127+ "TASK_TYPE" : "process" ,
128+ "RUNTIME" : "gvisor" ,
129+ },
130+ MessageQueue : types.MessageQueueConfig {
131+ Topics : []string {"secure-pipeline" },
132+ },
133+ Resources : types.ResourceRequirements {
134+ CPU : "1000m" ,
135+ Memory : "512Mi" ,
136+ },
137+ },
138+ },
139+ {
140+ Name : "store-results" ,
141+ Dependencies : []string {}, // Will be set after task IDs are generated
142+ AgentConfig : types.AgentConfig {
143+ Command : []string {"alpine" },
144+ Args : []string {"sh" , "-c" , "echo 'Storing results securely...'; sleep 3; echo 'Results stored in sandbox!'" },
145+ Environment : map [string ]string {
146+ "TASK_TYPE" : "store" ,
147+ "RUNTIME" : "gvisor" ,
148+ },
149+ MessageQueue : types.MessageQueueConfig {
150+ Topics : []string {"secure-pipeline" },
151+ },
152+ Resources : types.ResourceRequirements {
153+ CPU : "250m" ,
154+ Memory : "128Mi" ,
155+ },
156+ },
157+ },
158+ },
159+ }
160+
161+ // Create workflow
162+ if err := arc .CreateWorkflow (ctx , workflow ); err != nil {
163+ log .Fatalf ("Failed to create workflow: %v" , err )
164+ }
165+
166+ // Set dependencies after workflow creation (tasks now have IDs)
167+ workflow .Tasks [1 ].Dependencies = []string {workflow .Tasks [0 ].ID }
168+ workflow .Tasks [2 ].Dependencies = []string {workflow .Tasks [1 ].ID }
169+
170+ fmt .Printf ("Created workflow: %s (ID: %s)\n " , workflow .Name , workflow .ID )
171+ fmt .Printf ("Using %s runtime for container execution\n " , runtimeName )
172+ if runtimeName == "gVisor" {
173+ fmt .Println ("Enhanced security and isolation enabled" )
174+ }
175+ fmt .Println ()
176+
177+ // Start workflow execution
178+ fmt .Printf ("Starting workflow execution with %s runtime...\n " , runtimeName )
179+ if err := arc .StartWorkflow (ctx , workflow .ID ); err != nil {
180+ log .Fatalf ("Failed to start workflow: %v" , err )
181+ }
182+
183+ // Monitor workflow progress
184+ ticker := time .NewTicker (2 * time .Second )
185+ defer ticker .Stop ()
186+
187+ for {
188+ select {
189+ case <- ticker .C :
190+ wf , err := arc .GetWorkflow (ctx , workflow .ID )
191+ if err != nil {
192+ log .Printf ("Failed to get workflow status: %v" , err )
193+ continue
194+ }
195+
196+ fmt .Printf ("\n Workflow Status: %s\n " , wf .Status )
197+ for _ , task := range wf .Tasks {
198+ fmt .Printf (" Task '%s' [%s]: %s\n " , task .Name , runtimeName , task .Status )
199+ }
200+
201+ // Check if workflow is complete
202+ if wf .Status == types .WorkflowStatusCompleted {
203+ fmt .Printf ("\n Workflow completed successfully with %s runtime!\n " , runtimeName )
204+ if runtimeName == "gVisor" {
205+ fmt .Println ("All tasks ran in secure sandboxed environments." )
206+ }
207+ return
208+ } else if wf .Status == types .WorkflowStatusFailed {
209+ fmt .Printf ("\n Workflow failed: %s\n " , wf .Error )
210+ return
211+ }
212+ case <- time .After (2 * time .Minute ):
213+ fmt .Println ("\n Timeout waiting for workflow to complete" )
214+ return
215+ }
216+ }
217+ }
0 commit comments