Skip to content

Commit b4254fe

Browse files
committed
Improve long running streamed response handling
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
1 parent f91e762 commit b4254fe

4 files changed

Lines changed: 74 additions & 36 deletions

File tree

httputil/write_interceptor.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,17 @@ import (
77
)
88

99
func NewHttpWriteInterceptor(w http.ResponseWriter) *HttpWriteInterceptor {
10-
return &HttpWriteInterceptor{w, 0}
10+
return &HttpWriteInterceptor{
11+
ResponseWriter: w,
12+
statusCode: 0,
13+
bytesWritten: 0,
14+
}
1115
}
1216

1317
type HttpWriteInterceptor struct {
1418
http.ResponseWriter
15-
statusCode int
19+
statusCode int
20+
bytesWritten int64
1621
}
1722

1823
func (c *HttpWriteInterceptor) Status() int {
@@ -22,6 +27,10 @@ func (c *HttpWriteInterceptor) Status() int {
2227
return c.statusCode
2328
}
2429

30+
func (c *HttpWriteInterceptor) BytesWritten() int64 {
31+
return c.bytesWritten
32+
}
33+
2534
func (c *HttpWriteInterceptor) Header() http.Header {
2635
return c.ResponseWriter.Header()
2736
}
@@ -30,6 +39,9 @@ func (c *HttpWriteInterceptor) Write(data []byte) (int, error) {
3039
if c.statusCode == 0 {
3140
c.WriteHeader(http.StatusOK)
3241
}
42+
43+
c.bytesWritten += int64(len(data))
44+
3345
return c.ResponseWriter.Write(data)
3446
}
3547

httputil/write_interceptor_test.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,42 @@ import (
66
"testing"
77
)
88

9-
func Test_StatusIsRecorded(t *testing.T) {
10-
wantCode := http.StatusAccepted
11-
gotCode := 0
9+
func Test_WriteCountsBytes(t *testing.T) {
10+
w := httptest.NewRecorder()
11+
wi := NewHttpWriteInterceptor(w)
1212

13-
next := func(w http.ResponseWriter, r *http.Request) {
14-
w.WriteHeader(wantCode)
13+
writeStr := "hello world"
14+
wi.Write([]byte(writeStr))
1515

16+
want := int64(len(writeStr))
17+
got := wi.BytesWritten()
18+
if got != want {
19+
t.Errorf("want bytes: %d, got %d", want, got)
1620
}
17-
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
18-
ww := NewHttpWriteInterceptor(w)
19-
next(ww, r)
20-
gotCode = ww.Status()
21-
}))
21+
}
2222

23-
defer func() {
24-
s.Close()
25-
}()
23+
func Test_WriteGetsStatusCode(t *testing.T) {
24+
w := httptest.NewRecorder()
25+
wi := NewHttpWriteInterceptor(w)
2626

27-
req, _ := http.NewRequest("GET", s.URL, nil)
28-
_, err := http.DefaultClient.Do(req)
27+
wi.WriteHeader(http.StatusTeapot)
2928

30-
if err != nil {
31-
t.Fatalf("Error doing request: %v", err)
29+
want := http.StatusTeapot
30+
got := wi.Status()
31+
if got != want {
32+
t.Errorf("want status code: %d, got %d", want, got)
3233
}
34+
}
35+
36+
func Test_WriteGetsStatusCode_WithoutWriteHeader(t *testing.T) {
37+
w := httptest.NewRecorder()
38+
wi := NewHttpWriteInterceptor(w)
39+
40+
wi.Write([]byte("hello world"))
3341

34-
if gotCode != wantCode {
35-
t.Errorf("got code %d, want %d", gotCode, wantCode)
42+
want := http.StatusOK
43+
got := wi.Status()
44+
if got != want {
45+
t.Errorf("want default status code: %d, got %d", want, got)
3646
}
3747
}

httputil/writer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package httputil
22

33
import (
44
"fmt"
5-
"io/ioutil"
5+
"io"
66
"log"
77
"net/http"
88
"net/http/httptest"
@@ -24,7 +24,7 @@ func TestFirstWriteSetsStatusCode(t *testing.T) {
2424
t.Fatalf("incorrect status code in the original response object: %d", w.Result().StatusCode)
2525
}
2626

27-
out, _ := ioutil.ReadAll(w.Result().Body)
27+
out, _ := io.ReadAll(w.Result().Body)
2828
if string(out) != `{"value": "ok"}` {
2929
t.Fatalf("incorrect response content: %q", out)
3030
}

proxy/proxy.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ import (
2626
"log"
2727
"net"
2828
"net/http"
29+
"net/http/httputil"
2930
"net/url"
3031
"time"
3132

3233
"github.com/gorilla/mux"
33-
"github.com/openfaas/faas-provider/httputil"
34+
fhttputil "github.com/openfaas/faas-provider/httputil"
3435
"github.com/openfaas/faas-provider/types"
3536
)
3637

@@ -68,6 +69,14 @@ func NewHandlerFunc(config types.FaaSConfig, resolver BaseURLResolver, verbose b
6869

6970
proxyClient := NewProxyClientFromConfig(config)
7071

72+
reverseProxy := httputil.ReverseProxy{}
73+
reverseProxy.Director = func(req *http.Request) {
74+
// At least an empty director is required to prevent runtime errors.
75+
}
76+
77+
// Errors are common during disconnect of client, no need to log them.
78+
reverseProxy.ErrorLog = log.New(io.Discard, "", 0)
79+
7180
return func(w http.ResponseWriter, r *http.Request) {
7281
if r.Body != nil {
7382
defer r.Body.Close()
@@ -81,7 +90,7 @@ func NewHandlerFunc(config types.FaaSConfig, resolver BaseURLResolver, verbose b
8190
http.MethodGet,
8291
http.MethodOptions,
8392
http.MethodHead:
84-
proxyRequest(w, r, proxyClient, resolver, verbose)
93+
proxyRequest(w, r, proxyClient, resolver, &reverseProxy, verbose)
8594

8695
default:
8796
w.WriteHeader(http.StatusMethodNotAllowed)
@@ -134,15 +143,15 @@ func NewProxyClient(timeout time.Duration, maxIdleConns int, maxIdleConnsPerHost
134143
}
135144

136145
// proxyRequest handles the actual resolution of and then request to the function service.
137-
func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient *http.Client, resolver BaseURLResolver, verbose bool) {
146+
func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient *http.Client, resolver BaseURLResolver, reverseProxy *httputil.ReverseProxy, verbose bool) {
138147
ctx := originalReq.Context()
139148

140149
pathVars := mux.Vars(originalReq)
141150
functionName := pathVars["name"]
142151
if functionName == "" {
143152
w.Header().Add(openFaaSInternalHeader, "proxy")
144153

145-
httputil.Errorf(w, http.StatusBadRequest, "Provide function name in the request path")
154+
fhttputil.Errorf(w, http.StatusBadRequest, "Provide function name in the request path")
146155
return
147156
}
148157

@@ -152,7 +161,7 @@ func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient
152161

153162
// TODO: Should record the 404/not found error in Prometheus.
154163
log.Printf("resolver error: no endpoints for %s: %s\n", functionName, err.Error())
155-
httputil.Errorf(w, http.StatusServiceUnavailable, "No endpoints available for: %s.", functionName)
164+
fhttputil.Errorf(w, http.StatusServiceUnavailable, "No endpoints available for: %s.", functionName)
156165
return
157166
}
158167

@@ -161,35 +170,42 @@ func proxyRequest(w http.ResponseWriter, originalReq *http.Request, proxyClient
161170

162171
w.Header().Add(openFaaSInternalHeader, "proxy")
163172

164-
httputil.Errorf(w, http.StatusInternalServerError, "Failed to resolve service: %s.", functionName)
173+
fhttputil.Errorf(w, http.StatusInternalServerError, "Failed to resolve service: %s.", functionName)
165174
return
166175
}
167176

168177
if proxyReq.Body != nil {
169178
defer proxyReq.Body.Close()
170179
}
171180

172-
start := time.Now()
181+
if verbose {
182+
start := time.Now()
183+
defer func() {
184+
seconds := time.Since(start)
185+
log.Printf("%s took %f seconds\n", functionName, seconds.Seconds())
186+
}()
187+
}
188+
189+
if v := originalReq.Header.Get("Accept"); v == "text/event-stream" {
190+
reverseProxy.ServeHTTP(w, proxyReq)
191+
return
192+
}
193+
173194
response, err := proxyClient.Do(proxyReq.WithContext(ctx))
174-
seconds := time.Since(start)
175195

176196
if err != nil {
177197
log.Printf("error with proxy request to: %s, %s\n", proxyReq.URL.String(), err.Error())
178198

179199
w.Header().Add(openFaaSInternalHeader, "proxy")
180200

181-
httputil.Errorf(w, http.StatusInternalServerError, "Can't reach service for: %s.", functionName)
201+
fhttputil.Errorf(w, http.StatusInternalServerError, "Can't reach service for: %s.", functionName)
182202
return
183203
}
184204

185205
if response.Body != nil {
186206
defer response.Body.Close()
187207
}
188208

189-
if verbose {
190-
log.Printf("%s took %f seconds\n", functionName, seconds.Seconds())
191-
}
192-
193209
clientHeader := w.Header()
194210
copyHeaders(clientHeader, &response.Header)
195211
w.Header().Set("Content-Type", getContentType(originalReq.Header, response.Header))

0 commit comments

Comments
 (0)