From b2c5b1d5966cdf7af44573b240ccd1ad4d9106d7 Mon Sep 17 00:00:00 2001 From: Imre Halasz Date: Tue, 31 Mar 2026 15:28:50 +0200 Subject: [PATCH] gs: Cancel the context when downlink path expired --- pkg/gatewayserver/io/udp/udp.go | 2 + pkg/gatewayserver/io/udp/udp_test.go | 168 ++++++++++++++++++++++++++- 2 files changed, 166 insertions(+), 4 deletions(-) diff --git a/pkg/gatewayserver/io/udp/udp.go b/pkg/gatewayserver/io/udp/udp.go index 9523c108ed..9fcff7a3d7 100644 --- a/pkg/gatewayserver/io/udp/udp.go +++ b/pkg/gatewayserver/io/udp/udp.go @@ -450,6 +450,8 @@ var ( ) func (s *srv) handleDown(ctx context.Context, st *state) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() defer func() { st.lastDownlinkPath.Store(nil) st.startHandleDownMu.Lock() diff --git a/pkg/gatewayserver/io/udp/udp_test.go b/pkg/gatewayserver/io/udp/udp_test.go index c060f2eebb..47f30a2516 100644 --- a/pkg/gatewayserver/io/udp/udp_test.go +++ b/pkg/gatewayserver/io/udp/udp_test.go @@ -357,6 +357,164 @@ func TestScheduleLateCancel(t *testing.T) { // A deadline-exceeded error means nothing was written — the expected outcome. } +// TestScheduleLateDownlinkPathExpired verifies that a pending late-scheduled +// write goroutine is cancelled when handleDown exits due to the downlink path +// expiring (errDownlinkPathExpired), rather than firing after the path has gone. +// This is a regression test for the goroutine leak where func4 goroutines +// accumulated across handleDown reconnect cycles. +func TestScheduleLateDownlinkPathExpired(t *testing.T) { + t.Parallel() + + var ( + registeredGatewayID = ttnpb.GatewayIdentifiers{GatewayId: "test-gateway"} + timeout = (1 << 4) * test.Delay + testConfig = Config{ + PacketHandlers: 2, + PacketBuffer: 10, + DownlinkPathExpires: 8 * timeout, + ConnectionExpires: 20 * timeout, + ScheduleLateTime: 0, + } + ) + + a, ctx := test.New(t) + ctx, cancelCtx := context.WithCancel(ctx) + defer cancelCtx() + + is, _, closeIS := mockis.New(ctx) + defer closeIS() + + c := componenttest.NewComponent(t, &component.Config{ + ServiceBase: config.ServiceBase{ + FrequencyPlans: config.FrequencyPlansConfig{ + ConfigSource: "static", + Static: test.StaticFrequencyPlans, + }, + }, + }) + componenttest.StartComponent(t, c) + defer c.Close() + + gs := mock.NewServer(c, is) + addr, _ := net.ResolveUDPAddr("udp", ":0") + lis, err := net.ListenUDP("udp", addr) + if !a.So(err, should.BeNil) { + t.FailNow() + } + + go Serve(ctx, gs, lis, testConfig) // nolint:errcheck + + connections := &sync.Map{} + eui := types.EUI64{0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06, 0x06} + + udpConn, err := net.Dial("udp", lis.LocalAddr().String()) + if !a.So(err, should.BeNil) { + t.FailNow() + } + defer udpConn.Close() + + // Establish a downlink path by sending PULL_DATA. + pullPacket := generatePullData(eui) + pullPacket.Token = [2]byte{0x00, 0x01} + pullBuf, err := pullPacket.MarshalBinary() + if !a.So(err, should.BeNil) { + t.FailNow() + } + _, err = udpConn.Write(pullBuf) + if !a.So(err, should.BeNil) { + t.FailNow() + } + expectAck(t, udpConn, true, encoding.PullAck, pullPacket.Token) + + conn := expectConnection(t, gs, connections, eui, true) + + // Sync the gateway clock by sending PUSH_DATA with a known concentrator timestamp. + syncConcentratorTime := 300 * test.Delay + pushPacket := generatePushData(eui, false, syncConcentratorTime) + pushPacket.Token = [2]byte{0x00, 0x02} + pushBuf, err := pushPacket.MarshalBinary() + if !a.So(err, should.BeNil) { + t.FailNow() + } + _, err = udpConn.Write(pushBuf) + if !a.So(err, should.BeNil) { + t.FailNow() + } + clockSynced := time.Now() + expectAck(t, udpConn, true, encoding.PushAck, pushPacket.Token) + time.Sleep(timeout) // ensure the clock sync is processed before scheduling + + // Schedule a Class A downlink whose concentrator time is set well beyond the + // downlink-path expiry window. This makes the late-schedule timer (d) larger + // than DownlinkPathExpires, so the timer will not fire on its own before the + // path expires and handleDown exits. + lateConcentratorTime := syncConcentratorTime + 2*testConfig.DownlinkPathExpires + path := &ttnpb.DownlinkPath{ + Path: &ttnpb.DownlinkPath_UplinkToken{ + UplinkToken: io.MustUplinkToken( + &ttnpb.GatewayAntennaIdentifiers{GatewayIds: ®isteredGatewayID}, + uint32(lateConcentratorTime/time.Microsecond), // nolint:gosec + scheduling.ConcentratorTime(lateConcentratorTime), + time.Unix(0, int64(lateConcentratorTime)), + nil, + ), + }, + } + msg := &ttnpb.DownlinkMessage{ + RawPayload: []byte{0x01}, + Settings: &ttnpb.DownlinkMessage_Request{ + Request: &ttnpb.TxRequest{ + Class: ttnpb.Class_CLASS_A, + Priority: ttnpb.TxSchedulePriority_NORMAL, + Rx1Delay: ttnpb.RxDelay_RX_DELAY_1, + Rx1DataRate: &ttnpb.DataRate{ + Modulation: &ttnpb.DataRate_Lora{ + Lora: &ttnpb.LoRaDataRate{ + SpreadingFactor: 7, + Bandwidth: 125000, + CodingRate: band.Cr4_5, + }, + }, + }, + Rx1Frequency: 868100000, + FrequencyPlanId: test.EUFrequencyPlanID, + }, + }, + } + _, _, _, err = conn.ScheduleDown(path, msg) + if !a.So(err, should.BeNil) { + t.FailNow() + } + + // Compute when the timer goroutine would fire if not cancelled. + // serverTime(T) = clockSynced + (T - syncConcentratorTime); with ScheduleLateTime=0, + // d = time.Until(serverTime(scheduledTimestamp)). + scheduledTimestamp := time.Duration(msg.GetScheduled().Timestamp) * time.Microsecond + expectedFireTime := clockSynced.Add(-syncConcentratorTime).Add(scheduledTimestamp) + + // Give handleDown time to dequeue the message and start the timer goroutine. + // Then stop sending PULL_DATA so the downlink path expires. handleDown detects + // the expired path via its health-check ticker and returns errDownlinkPathExpired, + // which triggers defer cancel() and stops the timer goroutine before it fires. + time.Sleep(timeout) + time.Sleep(2 * testConfig.DownlinkPathExpires) + + // Read from the UDP connection until expectedFireTime + margin. A broken + // implementation (missing defer cancel in handleDown) would deliver a PULL_RESP + // near expectedFireTime. With the fix the goroutine exits on path expiry and + // nothing is written. + var buf [65507]byte + udpConn.SetReadDeadline(expectedFireTime.Add(2 * timeout)) // nolint:errcheck,gosec + n, readErr := udpConn.Read(buf[:]) + if readErr == nil { + var pkt encoding.Packet + if unmarshalErr := pkt.UnmarshalBinary(buf[:n]); unmarshalErr == nil { + a.So(pkt.PacketType, should.NotEqual, encoding.PullResp) + } + } + // A deadline-exceeded error means nothing was written — the expected outcome. +} + func TestFrontend(t *testing.T) { t.Parallel() iotest.Frontend(t, iotest.FrontendConfig{ @@ -533,10 +691,12 @@ func TestRawData(t *testing.T) { registeredGatewayID = ttnpb.GatewayIdentifiers{GatewayId: "test-gateway"} timeout = (1 << 4) * test.Delay testConfig = Config{ - PacketHandlers: 2, - PacketBuffer: 10, - DownlinkPathExpires: 8 * timeout, - ConnectionExpires: 20 * timeout, + PacketHandlers: 2, + PacketBuffer: 10, + // DownlinkPathExpires must exceed the ~300*test.Delay late-schedule timer + // used by the TxScheduledLate test cases. 32*timeout gives enough margin. + DownlinkPathExpires: 32 * timeout, + ConnectionExpires: 64 * timeout, ScheduleLateTime: 0, } )