Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/gatewayserver/io/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ var (
)

func (s *srv) handleDown(ctx context.Context, st *state) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
st.lastDownlinkPath.Store(nil)
st.startHandleDownMu.Lock()
Expand Down
168 changes: 164 additions & 4 deletions pkg/gatewayserver/io/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,164 @@ func TestScheduleLateCancel(t *testing.T) {
// A deadline-exceeded error means nothing was written — the expected outcome.
}

// TestScheduleLateDownlinkPathExpired verifies that a pending late-scheduled
// write goroutine is cancelled when handleDown exits due to the downlink path
// expiring (errDownlinkPathExpired), rather than firing after the path has gone.
// This is a regression test for the goroutine leak where func4 goroutines
// accumulated across handleDown reconnect cycles.
func TestScheduleLateDownlinkPathExpired(t *testing.T) {
t.Parallel()

var (
registeredGatewayID = ttnpb.GatewayIdentifiers{GatewayId: "test-gateway"}
timeout = (1 << 4) * test.Delay
testConfig = Config{
PacketHandlers: 2,
PacketBuffer: 10,
DownlinkPathExpires: 8 * timeout,
ConnectionExpires: 20 * timeout,
ScheduleLateTime: 0,
}
)

a, ctx := test.New(t)
ctx, cancelCtx := context.WithCancel(ctx)
defer cancelCtx()

is, _, closeIS := mockis.New(ctx)
defer closeIS()

c := componenttest.NewComponent(t, &component.Config{
ServiceBase: config.ServiceBase{
FrequencyPlans: config.FrequencyPlansConfig{
ConfigSource: "static",
Static: test.StaticFrequencyPlans,
},
},
})
componenttest.StartComponent(t, c)
defer c.Close()

gs := mock.NewServer(c, is)
addr, _ := net.ResolveUDPAddr("udp", ":0")
lis, err := net.ListenUDP("udp", addr)
if !a.So(err, should.BeNil) {
t.FailNow()
}

go Serve(ctx, gs, lis, testConfig) // nolint:errcheck

connections := &sync.Map{}
eui := types.EUI64{0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06}

udpConn, err := net.Dial("udp", lis.LocalAddr().String())
if !a.So(err, should.BeNil) {
t.FailNow()
}
defer udpConn.Close()

// Establish a downlink path by sending PULL_DATA.
pullPacket := generatePullData(eui)
pullPacket.Token = [2]byte{0x00, 0x01}
pullBuf, err := pullPacket.MarshalBinary()
if !a.So(err, should.BeNil) {
t.FailNow()
}
_, err = udpConn.Write(pullBuf)
if !a.So(err, should.BeNil) {
t.FailNow()
}
expectAck(t, udpConn, true, encoding.PullAck, pullPacket.Token)

conn := expectConnection(t, gs, connections, eui, true)

// Sync the gateway clock by sending PUSH_DATA with a known concentrator timestamp.
syncConcentratorTime := 300 * test.Delay
pushPacket := generatePushData(eui, false, syncConcentratorTime)
pushPacket.Token = [2]byte{0x00, 0x02}
pushBuf, err := pushPacket.MarshalBinary()
if !a.So(err, should.BeNil) {
t.FailNow()
}
_, err = udpConn.Write(pushBuf)
if !a.So(err, should.BeNil) {
t.FailNow()
}
clockSynced := time.Now()
expectAck(t, udpConn, true, encoding.PushAck, pushPacket.Token)
time.Sleep(timeout) // ensure the clock sync is processed before scheduling

// Schedule a Class A downlink whose concentrator time is set well beyond the
// downlink-path expiry window. This makes the late-schedule timer (d) larger
// than DownlinkPathExpires, so the timer will not fire on its own before the
// path expires and handleDown exits.
lateConcentratorTime := syncConcentratorTime + 2*testConfig.DownlinkPathExpires
path := &ttnpb.DownlinkPath{
Path: &ttnpb.DownlinkPath_UplinkToken{
UplinkToken: io.MustUplinkToken(
&ttnpb.GatewayAntennaIdentifiers{GatewayIds: &registeredGatewayID},
uint32(lateConcentratorTime/time.Microsecond), // nolint:gosec
scheduling.ConcentratorTime(lateConcentratorTime),
time.Unix(0, int64(lateConcentratorTime)),
nil,
),
},
}
msg := &ttnpb.DownlinkMessage{
RawPayload: []byte{0x01},
Settings: &ttnpb.DownlinkMessage_Request{
Request: &ttnpb.TxRequest{
Class: ttnpb.Class_CLASS_A,
Priority: ttnpb.TxSchedulePriority_NORMAL,
Rx1Delay: ttnpb.RxDelay_RX_DELAY_1,
Rx1DataRate: &ttnpb.DataRate{
Modulation: &ttnpb.DataRate_Lora{
Lora: &ttnpb.LoRaDataRate{
SpreadingFactor: 7,
Bandwidth: 125000,
CodingRate: band.Cr4_5,
},
},
},
Rx1Frequency: 868100000,
FrequencyPlanId: test.EUFrequencyPlanID,
},
},
}
_, _, _, err = conn.ScheduleDown(path, msg)
if !a.So(err, should.BeNil) {
t.FailNow()
}

// Compute when the timer goroutine would fire if not cancelled.
// serverTime(T) = clockSynced + (T - syncConcentratorTime); with ScheduleLateTime=0,
// d = time.Until(serverTime(scheduledTimestamp)).
scheduledTimestamp := time.Duration(msg.GetScheduled().Timestamp) * time.Microsecond
expectedFireTime := clockSynced.Add(-syncConcentratorTime).Add(scheduledTimestamp)

// Give handleDown time to dequeue the message and start the timer goroutine.
// Then stop sending PULL_DATA so the downlink path expires. handleDown detects
// the expired path via its health-check ticker and returns errDownlinkPathExpired,
// which triggers defer cancel() and stops the timer goroutine before it fires.
time.Sleep(timeout)
time.Sleep(2 * testConfig.DownlinkPathExpires)

// Read from the UDP connection until expectedFireTime + margin. A broken
// implementation (missing defer cancel in handleDown) would deliver a PULL_RESP
// near expectedFireTime. With the fix the goroutine exits on path expiry and
// nothing is written.
var buf [65507]byte
udpConn.SetReadDeadline(expectedFireTime.Add(2 * timeout)) // nolint:errcheck,gosec
n, readErr := udpConn.Read(buf[:])
if readErr == nil {
var pkt encoding.Packet
if unmarshalErr := pkt.UnmarshalBinary(buf[:n]); unmarshalErr == nil {
a.So(pkt.PacketType, should.NotEqual, encoding.PullResp)
}
}
// A deadline-exceeded error means nothing was written — the expected outcome.
}

func TestFrontend(t *testing.T) {
t.Parallel()
iotest.Frontend(t, iotest.FrontendConfig{
Expand Down Expand Up @@ -533,10 +691,12 @@ func TestRawData(t *testing.T) {
registeredGatewayID = ttnpb.GatewayIdentifiers{GatewayId: "test-gateway"}
timeout = (1 << 4) * test.Delay
testConfig = Config{
PacketHandlers: 2,
PacketBuffer: 10,
DownlinkPathExpires: 8 * timeout,
ConnectionExpires: 20 * timeout,
PacketHandlers: 2,
PacketBuffer: 10,
// DownlinkPathExpires must exceed the ~300*test.Delay late-schedule timer
// used by the TxScheduledLate test cases. 32*timeout gives enough margin.
DownlinkPathExpires: 32 * timeout,
ConnectionExpires: 64 * timeout,
ScheduleLateTime: 0,
}
)
Expand Down
Loading