Skip to content

Commit 8324f4f

Browse files
authored
xds: fix VHDS empty subscription behavior to prevent wildcarding subscription (#34)
**Description** This PR fixes an issue where the control plane incorrectly treated an empty resource_names_subscribe list in a VHDS DeltaDiscoveryRequest as a wildcard subscription. According to the [Virtual Host Discovery Service (VHDS)](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/vhds) documentation, VHDS is designed for on-demand loading. Interpreting an empty subscription list as a wildcard is counter-productive to the "lazy-loading" goal of VHDS, as it could cause the control plane to push all virtual hosts to the proxy unexpectedly. **Changes** 1. Added logic to the xDS server to detect if the resource type is VHDS. 2. If the resource type is VHDS and the resource_names_subscribe list is empty, the server will now treat the subscription as empty rather than fetching all resources. 3. Preserved existing wildcard behavior for Listener, Cluster, Route, and Endpoint types. **Risk Level** Low. This specifically targets VHDS resource types and aligns with the expected xDS protocol behavior for on-demand resources. **Testing Done** I performed manual validation via the Observer to ensure the subscription is ignored when the resource is VHDS with either a wildcard or an empty subscription.
1 parent 0f097ed commit 8324f4f

2 files changed

Lines changed: 346 additions & 1 deletion

File tree

internal/server/subscription_manager.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"slices"
66
"sync"
77

8+
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
89
"github.com/linkedin/diderot/ads"
910
"github.com/linkedin/diderot/internal/utils"
1011
"google.golang.org/protobuf/proto"
@@ -131,7 +132,8 @@ func (m *deltaSubscriptionManager) ProcessSubscriptions(req *ads.DeltaDiscoveryR
131132

132133
if !m.firstCallReceived {
133134
m.firstCallReceived = true
134-
if len(subscribe) == 0 {
135+
// Skip wildcard subscription for VHDS resource type
136+
if len(subscribe) == 0 && !isVHDSType(m.typeURL) {
135137
subscribe = []string{ads.WildcardSubscription}
136138
}
137139
}
@@ -217,6 +219,11 @@ func (c *subscriptionManagerCore) unsubscribe(name string) {
217219
}
218220
}
219221

222+
// isVHDSType checks if the given typeURL is for VHDS (Virtual Host Discovery Service).
223+
func isVHDSType(typeURL string) bool {
224+
return typeURL == resource.VirtualHostType
225+
}
226+
220227
// cleanSubscriptionsAndEstimateSize clones the given slice and removes duplicate elements by sorting
221228
// it. This ensures that the server does not process the same subscription twice for the same
222229
// request. It then estimates the size of send buffer to pass to
Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
8+
"github.com/linkedin/diderot/ads"
9+
"github.com/linkedin/diderot/testutils"
10+
"github.com/stretchr/testify/require"
11+
"google.golang.org/protobuf/proto"
12+
)
13+
14+
// mockResourceLocator simulates the ResourceLocator interface for testing.
15+
type mockResourceLocator func(typeURL, resourceName string, handler ads.RawSubscriptionHandler) func()
16+
17+
func (m mockResourceLocator) Subscribe(
18+
_ context.Context,
19+
typeURL, resourceName string,
20+
handler ads.RawSubscriptionHandler,
21+
) (unsubscribe func()) {
22+
return m(typeURL, resourceName, handler)
23+
}
24+
25+
// noopBatchSubscriptionHandler is a simple implementation of BatchSubscriptionHandler for testing.
26+
type noopBatchSubscriptionHandler struct{}
27+
28+
func (n *noopBatchSubscriptionHandler) StartNotificationBatch(_ map[string]string, _ int) {}
29+
func (n *noopBatchSubscriptionHandler) EndNotificationBatch() {}
30+
func (n *noopBatchSubscriptionHandler) Notify(_ string, _ *ads.RawResource, _ ads.SubscriptionMetadata) {
31+
}
32+
func (n *noopBatchSubscriptionHandler) ResourceMarshalError(_ string, _ proto.Message, _ error) {}
33+
34+
// TestDeltaSubscriptionManager_VHDSSkipsWildcard tests that VHDS type URLs skip the implicit
35+
// wildcard subscription on empty first request, while other types do not.
36+
func TestDeltaSubscriptionManager_VHDSSkipsWildcard(t *testing.T) {
37+
const foo = "foo"
38+
ctx := testutils.Context(t)
39+
handler := &noopBatchSubscriptionHandler{}
40+
41+
testCases := []struct {
42+
name string
43+
typeURL string
44+
shouldCreateWildcardOnEmpty bool
45+
shouldCreateWildcardWithNames bool
46+
}{
47+
{
48+
name: "VHDS type skips wildcard on empty first request",
49+
typeURL: resource.VirtualHostType,
50+
shouldCreateWildcardOnEmpty: false,
51+
shouldCreateWildcardWithNames: false,
52+
},
53+
{
54+
name: "Cluster type creates wildcard on empty first request",
55+
typeURL: resource.ClusterType,
56+
shouldCreateWildcardOnEmpty: true,
57+
shouldCreateWildcardWithNames: false,
58+
},
59+
{
60+
name: "Endpoint type creates wildcard on empty first request",
61+
typeURL: resource.EndpointType,
62+
shouldCreateWildcardOnEmpty: true,
63+
shouldCreateWildcardWithNames: false,
64+
},
65+
{
66+
name: "Listener type creates wildcard on empty first request",
67+
typeURL: resource.ListenerType,
68+
shouldCreateWildcardOnEmpty: true,
69+
shouldCreateWildcardWithNames: false,
70+
},
71+
{
72+
name: "Route type creates wildcard on empty first request",
73+
typeURL: resource.RouteType,
74+
shouldCreateWildcardOnEmpty: true,
75+
shouldCreateWildcardWithNames: false,
76+
},
77+
}
78+
79+
for _, tc := range testCases {
80+
t.Run(tc.name, func(t *testing.T) {
81+
t.Run("empty first request", func(t *testing.T) {
82+
subscribedResources := make(map[string]bool)
83+
locator := mockResourceLocator(func(actualTypeURL, resourceName string, _ ads.RawSubscriptionHandler) func() {
84+
require.Equal(t, tc.typeURL, actualTypeURL, "typeURL mismatch")
85+
subscribedResources[resourceName] = true
86+
return func() {
87+
delete(subscribedResources, resourceName)
88+
}
89+
})
90+
91+
manager := NewDeltaSubscriptionManager(ctx, locator, tc.typeURL, handler, nil)
92+
93+
// First request with no resource names
94+
req := &ads.DeltaDiscoveryRequest{
95+
ResourceNamesSubscribe: nil,
96+
ResourceNamesUnsubscribe: nil,
97+
InitialResourceVersions: nil,
98+
}
99+
manager.ProcessSubscriptions(req)
100+
101+
if tc.shouldCreateWildcardOnEmpty {
102+
require.True(t, subscribedResources[ads.WildcardSubscription],
103+
"Expected wildcard subscription for %s", tc.typeURL)
104+
} else {
105+
require.False(t, subscribedResources[ads.WildcardSubscription],
106+
"Did not expect wildcard subscription for %s", tc.typeURL)
107+
}
108+
})
109+
110+
t.Run("non-empty first request", func(t *testing.T) {
111+
subscribedResources := make(map[string]bool)
112+
locator := mockResourceLocator(func(actualTypeURL, resourceName string, _ ads.RawSubscriptionHandler) func() {
113+
require.Equal(t, tc.typeURL, actualTypeURL, "typeURL mismatch")
114+
subscribedResources[resourceName] = true
115+
return func() {
116+
delete(subscribedResources, resourceName)
117+
}
118+
})
119+
120+
manager := NewDeltaSubscriptionManager(ctx, locator, tc.typeURL, handler, nil)
121+
122+
// First request with explicit resource names
123+
req := &ads.DeltaDiscoveryRequest{
124+
ResourceNamesSubscribe: []string{foo},
125+
ResourceNamesUnsubscribe: nil,
126+
InitialResourceVersions: nil,
127+
}
128+
manager.ProcessSubscriptions(req)
129+
130+
if tc.shouldCreateWildcardWithNames {
131+
require.True(t, subscribedResources[ads.WildcardSubscription],
132+
"Expected wildcard subscription for %s with explicit names", tc.typeURL)
133+
} else {
134+
require.False(t, subscribedResources[ads.WildcardSubscription],
135+
"Did not expect wildcard subscription for %s with explicit names", tc.typeURL)
136+
}
137+
require.True(t, subscribedResources[foo],
138+
"Expected subscription to %s for %s", foo, tc.typeURL)
139+
})
140+
141+
t.Run("subsequent empty requests", func(t *testing.T) {
142+
subscribedResources := make(map[string]bool)
143+
subscriptionCount := make(map[string]int)
144+
locator := mockResourceLocator(func(actualTypeURL, resourceName string, _ ads.RawSubscriptionHandler) func() {
145+
require.Equal(t, tc.typeURL, actualTypeURL, "typeURL mismatch")
146+
subscribedResources[resourceName] = true
147+
subscriptionCount[resourceName]++
148+
return func() {
149+
delete(subscribedResources, resourceName)
150+
}
151+
})
152+
153+
manager := NewDeltaSubscriptionManager(ctx, locator, tc.typeURL, handler, nil)
154+
155+
// First empty request
156+
req := &ads.DeltaDiscoveryRequest{
157+
ResourceNamesSubscribe: nil,
158+
ResourceNamesUnsubscribe: nil,
159+
InitialResourceVersions: nil,
160+
}
161+
manager.ProcessSubscriptions(req)
162+
163+
initialWildcardCount := subscriptionCount[ads.WildcardSubscription]
164+
165+
// Second empty request - should not change subscription state
166+
manager.ProcessSubscriptions(req)
167+
168+
require.Equal(t, initialWildcardCount, subscriptionCount[ads.WildcardSubscription],
169+
"Wildcard subscription count should not change on subsequent empty requests")
170+
})
171+
})
172+
}
173+
}
174+
175+
// TestDeltaSubscriptionManager_VHDSExplicitWildcard tests that VHDS can still explicitly
176+
// subscribe to wildcard if the client requests it.
177+
func TestDeltaSubscriptionManager_VHDSExplicitWildcard(t *testing.T) {
178+
ctx := testutils.Context(t)
179+
handler := &noopBatchSubscriptionHandler{}
180+
181+
subscribedResources := make(map[string]bool)
182+
locator := mockResourceLocator(func(actualTypeURL, resourceName string, _ ads.RawSubscriptionHandler) func() {
183+
require.Equal(t, resource.VirtualHostType, actualTypeURL, "typeURL mismatch")
184+
subscribedResources[resourceName] = true
185+
return func() {
186+
delete(subscribedResources, resourceName)
187+
}
188+
})
189+
190+
manager := NewDeltaSubscriptionManager(ctx, locator, resource.VirtualHostType, handler, nil)
191+
192+
// Explicitly subscribe to wildcard
193+
req := &ads.DeltaDiscoveryRequest{
194+
ResourceNamesSubscribe: []string{ads.WildcardSubscription},
195+
ResourceNamesUnsubscribe: nil,
196+
InitialResourceVersions: nil,
197+
}
198+
manager.ProcessSubscriptions(req)
199+
200+
require.True(t, subscribedResources[ads.WildcardSubscription],
201+
"Expected explicit wildcard subscription for VHDS")
202+
}
203+
204+
// TestIsVHDSType tests the helper function that identifies VHDS type URLs.
205+
func TestIsVHDSType(t *testing.T) {
206+
testCases := []struct {
207+
name string
208+
typeURL string
209+
expected bool
210+
}{
211+
{
212+
name: "VirtualHost type is VHDS",
213+
typeURL: resource.VirtualHostType,
214+
expected: true,
215+
},
216+
{
217+
name: "Cluster type is not VHDS",
218+
typeURL: resource.ClusterType,
219+
expected: false,
220+
},
221+
{
222+
name: "Endpoint type is not VHDS",
223+
typeURL: resource.EndpointType,
224+
expected: false,
225+
},
226+
{
227+
name: "Listener type is not VHDS",
228+
typeURL: resource.ListenerType,
229+
expected: false,
230+
},
231+
{
232+
name: "Route type is not VHDS",
233+
typeURL: resource.RouteType,
234+
expected: false,
235+
},
236+
{
237+
name: "Secret type is not VHDS",
238+
typeURL: resource.SecretType,
239+
expected: false,
240+
},
241+
{
242+
name: "Runtime type is not VHDS",
243+
typeURL: resource.RuntimeType,
244+
expected: false,
245+
},
246+
{
247+
name: "Empty string is not VHDS",
248+
typeURL: "",
249+
expected: false,
250+
},
251+
{
252+
name: "Random string is not VHDS",
253+
typeURL: "type.googleapis.com/random.Type",
254+
expected: false,
255+
},
256+
}
257+
258+
for _, tc := range testCases {
259+
t.Run(tc.name, func(t *testing.T) {
260+
result := isVHDSType(tc.typeURL)
261+
require.Equal(t, tc.expected, result,
262+
"isVHDSType(%q) = %v, expected %v", tc.typeURL, result, tc.expected)
263+
})
264+
}
265+
}
266+
267+
// TestDeltaSubscriptionManager_FirstCallReceivedFlag tests that the firstCallReceived flag
268+
// is properly set and only the first call triggers the implicit wildcard logic.
269+
func TestDeltaSubscriptionManager_FirstCallReceivedFlag(t *testing.T) {
270+
ctx := testutils.Context(t)
271+
handler := &noopBatchSubscriptionHandler{}
272+
const foo = "foo"
273+
const bar = "bar"
274+
275+
subscribedResources := make(map[string]bool)
276+
subscriptionCount := make(map[string]int)
277+
locator := mockResourceLocator(func(typeURL, resourceName string, _ ads.RawSubscriptionHandler) func() {
278+
subscribedResources[resourceName] = true
279+
subscriptionCount[resourceName]++
280+
return func() {
281+
delete(subscribedResources, resourceName)
282+
}
283+
})
284+
285+
manager := NewDeltaSubscriptionManager(ctx, locator, resource.ClusterType, handler, nil)
286+
287+
// First call: empty, should create implicit wildcard
288+
req1 := &ads.DeltaDiscoveryRequest{
289+
ResourceNamesSubscribe: nil,
290+
ResourceNamesUnsubscribe: nil,
291+
InitialResourceVersions: nil,
292+
}
293+
manager.ProcessSubscriptions(req1)
294+
require.True(t, subscribedResources[ads.WildcardSubscription],
295+
"First empty call should create wildcard")
296+
require.Equal(t, 1, subscriptionCount[ads.WildcardSubscription],
297+
"Wildcard should be subscribed once")
298+
299+
// Second call: empty, should NOT recreate wildcard
300+
req2 := &ads.DeltaDiscoveryRequest{
301+
ResourceNamesSubscribe: nil,
302+
ResourceNamesUnsubscribe: nil,
303+
InitialResourceVersions: nil,
304+
}
305+
manager.ProcessSubscriptions(req2)
306+
require.True(t, subscribedResources[ads.WildcardSubscription],
307+
"Wildcard should still be subscribed")
308+
require.Equal(t, 1, subscriptionCount[ads.WildcardSubscription],
309+
"Wildcard subscription count should not increase")
310+
311+
// Third call: subscribe to foo, wildcard should remain (not implicit anymore)
312+
req3 := &ads.DeltaDiscoveryRequest{
313+
ResourceNamesSubscribe: []string{foo},
314+
ResourceNamesUnsubscribe: nil,
315+
InitialResourceVersions: nil,
316+
}
317+
manager.ProcessSubscriptions(req3)
318+
require.True(t, subscribedResources[ads.WildcardSubscription],
319+
"Wildcard should remain after subscribing to foo")
320+
require.True(t, subscribedResources[foo],
321+
"foo should be subscribed")
322+
require.Equal(t, 1, subscriptionCount[ads.WildcardSubscription],
323+
"Wildcard subscription count should remain 1")
324+
325+
// Fourth call: explicit unsubscribe from wildcard
326+
req4 := &ads.DeltaDiscoveryRequest{
327+
ResourceNamesSubscribe: []string{bar},
328+
ResourceNamesUnsubscribe: []string{ads.WildcardSubscription},
329+
InitialResourceVersions: nil,
330+
}
331+
manager.ProcessSubscriptions(req4)
332+
require.False(t, subscribedResources[ads.WildcardSubscription],
333+
"Wildcard should be unsubscribed")
334+
require.True(t, subscribedResources[foo],
335+
"foo should still be subscribed")
336+
require.True(t, subscribedResources[bar],
337+
"bar should be subscribed")
338+
}

0 commit comments

Comments
 (0)