Skip to content

Commit f727283

Browse files
committed
feat: Support Managed Instances entrypoint
1 parent 7ae5682 commit f727283

File tree

5 files changed

+673
-140
lines changed

5 files changed

+673
-140
lines changed

cmd/localstack/main.go

Lines changed: 68 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
1-
// main entrypoint of init
2-
// initial structure based upon /cmd/aws-lambda-rie/main.go
31
package main
42

53
import (
6-
"context"
4+
"io"
5+
"log/slog"
76
"os"
87
"runtime/debug"
9-
"strconv"
108
"strings"
11-
"time"
129

13-
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
14-
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore"
10+
mlogging "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/logging"
11+
12+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/rapidcore/env"
1513
log "github.com/sirupsen/logrus"
1614
)
1715

@@ -105,160 +103,90 @@ func main() {
105103
UnsetLsEnvs()
106104

107105
// set up logging following the Logrus logging levels: https://github.com/sirupsen/logrus#level-logging
106+
configureLogging(lsOpts.InitLogLevel)
107+
108+
// Download code archives
109+
if err := DownloadCodeArchives(lsOpts.CodeArchives); err != nil {
110+
log.Fatal("Failed to download code archives: " + err.Error())
111+
}
112+
113+
if err := AdaptFilesystemPermissions(lsOpts.ChmodPaths); err != nil {
114+
log.Warnln("Could not change file mode of code directories:", err)
115+
}
116+
117+
// Check if running in managed mode
118+
if _, ok := os.LookupEnv(env.AWS_LAMBDA_MAX_CONCURRENCY); ok {
119+
runManaged(lsOpts)
120+
return
121+
}
122+
123+
runStandard(lsOpts)
124+
}
125+
126+
func doInitDaemon(addr, port string, enable bool, lvl string) *Daemon {
127+
endpoint := "http://" + addr + ":" + port
128+
xrayConfig := initConfig(endpoint, getXRayLogLevel(lvl))
129+
d := initDaemon(xrayConfig, enable)
130+
runDaemon(d)
131+
return d
132+
}
133+
134+
func configureManagedLogger(logLevel string) {
135+
level := slogLevelFromString(logLevel)
136+
slog.SetDefault(mlogging.CreateNewLogger(level, io.Writer(os.Stderr)))
137+
}
138+
139+
func configureStandardLogger(logLevel string) {
140+
log.SetOutput(os.Stderr)
141+
}
142+
143+
func configureLogging(logLevel string) {
108144
log.SetReportCaller(true)
109-
// https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-configuration.html
110-
xRayLogLevel := "info"
111-
switch lsOpts.InitLogLevel {
145+
switch logLevel {
112146
case "trace":
113147
log.SetFormatter(&log.JSONFormatter{})
114148
log.SetLevel(log.TraceLevel)
115-
xRayLogLevel = "debug"
116149
case "debug":
117150
log.SetLevel(log.DebugLevel)
118-
xRayLogLevel = "debug"
119151
case "info":
120152
log.SetLevel(log.InfoLevel)
121153
case "warn":
122154
log.SetLevel(log.WarnLevel)
123-
xRayLogLevel = "warn"
124155
case "error":
125156
log.SetLevel(log.ErrorLevel)
126-
xRayLogLevel = "error"
127157
case "fatal":
128158
log.SetLevel(log.FatalLevel)
129-
xRayLogLevel = "error"
130159
case "panic":
131160
log.SetLevel(log.PanicLevel)
132-
xRayLogLevel = "error"
133161
default:
134162
log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL")
135163
}
164+
}
136165

137-
// patch MaxPayloadSize
138-
payloadSize, err := strconv.Atoi(lsOpts.MaxPayloadSize)
139-
if err != nil {
140-
log.Panicln("Please specify a number for LOCALSTACK_MAX_PAYLOAD_SIZE")
141-
}
142-
interop.MaxPayloadSize = payloadSize
143-
144-
// download code archive if env variable is set
145-
if err := DownloadCodeArchives(lsOpts.CodeArchives); err != nil {
146-
log.Fatal("Failed to download code archives: " + err.Error())
147-
}
148-
149-
if err := AdaptFilesystemPermissions(lsOpts.ChmodPaths); err != nil {
150-
log.Warnln("Could not change file mode of code directories:", err)
151-
}
152-
153-
// parse CLI args
154-
bootstrap, handler := getBootstrap(os.Args)
155-
156-
// Switch to non-root user and drop root privileges
157-
if IsRootUser() && lsOpts.User != "" && lsOpts.User != "root" {
158-
uid := 993
159-
gid := 990
160-
AddUser(lsOpts.User, uid, gid)
161-
if err := os.Chown("/tmp", uid, gid); err != nil {
162-
log.Warnln("Could not change owner of directory /tmp:", err)
163-
}
164-
UserLogger().Debugln("Process running as root user.")
165-
err := DropPrivileges(lsOpts.User)
166-
if err != nil {
167-
log.Warnln("Could not drop root privileges.", err)
168-
} else {
169-
UserLogger().Debugln("Process running as non-root user.")
170-
}
171-
}
172-
173-
// file watcher for hot-reloading
174-
fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background())
175-
176-
logCollector := NewLogCollector()
177-
localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector)
178-
tracer := NewLocalStackTracer()
179-
180-
// build sandbox
181-
sandbox := rapidcore.
182-
NewSandboxBuilder().
183-
//SetTracer(tracer).
184-
AddShutdownFunc(func() {
185-
log.Debugln("Stopping file watcher")
186-
cancelFileWatcher()
187-
}).
188-
SetExtensionsFlag(true).
189-
SetInitCachingFlag(true).
190-
SetLogsEgressAPI(localStackLogsEgressApi).
191-
SetTracer(tracer)
192-
193-
// Corresponds to the 'AWS_LAMBDA_RUNTIME_API' environment variable.
194-
// We need to ensure the runtime server is up before the INIT phase,
195-
// but this envar is only set after the InitHandler is called.
196-
runtimeAPIAddress := "127.0.0.1:9001"
197-
sandbox.SetRuntimeAPIAddress(runtimeAPIAddress)
198-
199-
// xray daemon
200-
endpoint := "http://" + lsOpts.LocalstackIP + ":" + lsOpts.EdgePort
201-
xrayConfig := initConfig(endpoint, xRayLogLevel)
202-
d := initDaemon(xrayConfig, lsOpts.EnableXRayTelemetry == "1")
203-
sandbox.AddShutdownFunc(func() {
204-
log.Debugln("Shutting down xray daemon")
205-
d.stop()
206-
log.Debugln("Flushing segments in xray daemon")
207-
d.close()
208-
})
209-
runDaemon(d) // async
210-
211-
defaultInterop := sandbox.DefaultInteropServer()
212-
interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector)
213-
sandbox.SetInteropServer(interopServer)
214-
if len(handler) > 0 {
215-
sandbox.SetHandler(handler)
216-
}
217-
exitChan := make(chan struct{})
218-
sandbox.AddShutdownFunc(func() {
219-
exitChan <- struct{}{}
220-
})
221-
222-
// initialize all flows and start runtime API
223-
sandboxContext, internalStateFn := sandbox.Create()
224-
// Populate our custom interop server
225-
interopServer.SetSandboxContext(sandboxContext)
226-
interopServer.SetInternalStateGetter(internalStateFn)
227-
228-
// get timeout
229-
invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") // TODO: collect all AWS_* env parsing
230-
invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv)
231-
if err != nil {
232-
log.Fatalln(err)
233-
}
234-
go RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)
235-
236-
log.Debugf("Awaiting initialization of runtime api at %s.", runtimeAPIAddress)
237-
// Fixes https://github.com/localstack/localstack/issues/12680
238-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
239-
if err := waitForRuntimeAPI(ctx, runtimeAPIAddress); err != nil {
240-
log.Fatalf("Lambda Runtime API server at %s did not come up in 30s, with error %s", runtimeAPIAddress, err.Error())
241-
}
242-
cancel()
243-
244-
// start runtime init. It is important to start `InitHandler` synchronously because we need to ensure the
245-
// notification channels and status fields are properly initialized before `AwaitInitialized`
246-
log.Debugln("Starting runtime init.")
247-
InitHandler(sandbox.LambdaInvokeAPI(), GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds), bootstrap, lsOpts.AccountId) // TODO: replace this with a custom init
248-
249-
log.Debugln("Awaiting initialization of runtime init.")
250-
if err := interopServer.delegate.AwaitInitialized(); err != nil {
251-
// Error cases: ErrInitDoneFailed or ErrInitResetReceived
252-
log.Errorln("Runtime init failed to initialize: " + err.Error() + ". Exiting.")
253-
// NOTE: Sending the error status to LocalStack is handled beforehand in the custom_interop.go through the
254-
// callback SendInitErrorResponse because it contains the correct error response payload.
255-
return
166+
func slogLevelFromString(logLevel string) slog.Level {
167+
switch logLevel {
168+
case "trace", "debug":
169+
return slog.LevelDebug
170+
case "info":
171+
return slog.LevelInfo
172+
case "warn":
173+
return slog.LevelWarn
174+
case "error", "fatal", "panic":
175+
return slog.LevelError
176+
default:
177+
return slog.LevelInfo
256178
}
179+
}
257180

258-
log.Debugln("Completed initialization of runtime init. Sending status ready to LocalStack.")
259-
if err := interopServer.localStackAdapter.SendStatus(Ready, []byte{}); err != nil {
260-
log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". Exiting.")
181+
func getXRayLogLevel(initLogLevel string) string {
182+
switch initLogLevel {
183+
case "trace", "debug":
184+
return "debug"
185+
case "warn":
186+
return "warn"
187+
case "error", "fatal", "panic":
188+
return "error"
189+
default:
190+
return "info"
261191
}
262-
263-
<-exitChan
264192
}

cmd/localstack/run_managed.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"strconv"
9+
"syscall"
10+
"time"
11+
12+
rie "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie"
13+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/model"
14+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/supervisor/local"
15+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/utils"
16+
log "github.com/sirupsen/logrus"
17+
)
18+
19+
func runManaged(lsOpts *LsOpts) {
20+
configureManagedLogger(lsOpts.InitLogLevel)
21+
22+
// Initialize X-Ray daemon
23+
d := doInitDaemon(
24+
lsOpts.LocalstackIP,
25+
lsOpts.EdgePort,
26+
lsOpts.EnableXRayTelemetry == "1",
27+
lsOpts.InitLogLevel,
28+
)
29+
30+
defer func() {
31+
log.Debugln("Shutting down xray daemon")
32+
d.stop()
33+
log.Debugln("Flushing segments in xray daemon")
34+
d.close()
35+
}()
36+
37+
var credential *syscall.Credential
38+
if IsRootUser() && lsOpts.User != "" && lsOpts.User != "root" {
39+
uid := 993
40+
gid := 990
41+
AddUser(lsOpts.User, uid, gid)
42+
if err := os.Chown("/tmp", uid, gid); err != nil {
43+
log.Warnln("Could not change owner of directory /tmp:", err)
44+
}
45+
46+
credential = &syscall.Credential{
47+
Uid: uint32(uid),
48+
Gid: uint32(gid),
49+
}
50+
51+
UserLogger().Debugln("Configured runtime to run as non-root user:", lsOpts.User)
52+
}
53+
54+
adapter := LocalStackAdapter{
55+
UpstreamEndpoint: lsOpts.RuntimeEndpoint,
56+
RuntimeId: lsOpts.RuntimeId,
57+
}
58+
59+
rieAddr := fmt.Sprintf("0.0.0.0:%s", lsOpts.InteropPort)
60+
rapiAddr := "127.0.0.1:9001"
61+
62+
sigChan := make(chan os.Signal, 1)
63+
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
64+
65+
logCollector := NewLogCollector()
66+
67+
var supervOpts []local.ProcessSupervisorOption
68+
supervOpts = append(supervOpts, local.WithLowerPriorities(false))
69+
if credential != nil {
70+
supervOpts = append(supervOpts, local.WithProcessCredential(credential))
71+
}
72+
supv := local.NewProcessSupervisor(supervOpts...)
73+
74+
fileUtil := utils.NewFileUtil()
75+
76+
invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT")
77+
invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv)
78+
if err != nil {
79+
log.Fatalln(err)
80+
}
81+
82+
raptorApp, err := rie.Run(
83+
rapiAddr, supv, fileUtil, logCollector,
84+
)
85+
if err != nil {
86+
log.Errorf("failed with error: %s", err.Error())
87+
return
88+
}
89+
90+
initReq, err := rie.GetInitRequestMessage(fileUtil, os.Args)
91+
if err != nil {
92+
log.Errorf("could not build initialization parameters: %s", err.Error())
93+
return
94+
}
95+
96+
// HACK(gregfurman): expects the account to be set via the AWS_ACCOUNT_ID env var which is undocumented
97+
initReq.AccountID = lsOpts.AccountId
98+
initReq.FunctionARN = fmt.Sprintf("arn:aws:lambda:%s:%s:function:%s:%s", initReq.AwsRegion, initReq.AccountID, initReq.TaskName, initReq.FunctionVersion)
99+
// Convert seconds to time.Duration (invokeTimeoutSeconds is in seconds, need to convert to Duration)
100+
initReq.InvokeTimeout = model.DurationMS(time.Duration(invokeTimeoutSeconds) * time.Second)
101+
102+
runtimeAPIAddr := raptorApp.RuntimeAPIAddrPort()
103+
104+
rieHandler, err := NewInvokeHandler(
105+
*lsOpts, initReq, raptorApp, logCollector,
106+
)
107+
if err != nil {
108+
log.Fatal("creating RIE handler error:", err)
109+
}
110+
111+
if err := rieHandler.Init(); err != nil {
112+
log.Warn("INIT failed", "err", err)
113+
}
114+
115+
server, err := rie.StartServer(raptorApp, rieHandler, rieAddr, sigChan)
116+
if err != nil {
117+
log.Fatal("Proxy ListenAndServe error:", err)
118+
}
119+
120+
// go RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)
121+
122+
log.Debugf("Awaiting initialization of runtime api at %s.", runtimeAPIAddr.String())
123+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
124+
if err := waitForRuntimeAPI(ctx, runtimeAPIAddr.String()); err != nil {
125+
log.Fatalf("Lambda Runtime API server at %s did not come up in 30s, with error %s", runtimeAPIAddr.String(), err.Error())
126+
}
127+
cancel()
128+
129+
log.Debugln("Completed initialization of runtime. Sending status ready to LocalStack.")
130+
if err := adapter.SendStatus(Ready, []byte{}); err != nil {
131+
log.Fatalln("Failed to send status ready to LocalStack", err, ". Exiting.")
132+
}
133+
134+
select {
135+
case <-server.Done():
136+
if err := server.Err(); err != nil {
137+
log.Warn("rie server stopped", "err", err)
138+
os.Exit(1)
139+
}
140+
case <-raptorApp.Done():
141+
if err := raptorApp.Err(); err != nil {
142+
log.Errorln("Runtime error:", err)
143+
}
144+
}
145+
146+
}

0 commit comments

Comments
 (0)