From 97c4ca5f94240fe3c688170d71d599339fb878c0 Mon Sep 17 00:00:00 2001 From: Imre Halasz Date: Fri, 27 Mar 2026 15:41:06 +0100 Subject: [PATCH 1/2] gs: Do not schedule too far ahead --- config/messages.json | 9 +++ pkg/gatewayserver/scheduling/scheduler.go | 2 +- .../scheduling/scheduler_internal_test.go | 9 +-- pkg/gatewayserver/scheduling/sub_band.go | 24 +++++++- pkg/gatewayserver/scheduling/sub_band_test.go | 59 ++++++++++++++++++- pkg/gatewayserver/scheduling/util_test.go | 1 + pkg/webui/locales/ja.json | 1 + 7 files changed, 95 insertions(+), 10 deletions(-) diff --git a/config/messages.json b/config/messages.json index 8baff606a9..891c527d7f 100644 --- a/config/messages.json +++ b/config/messages.json @@ -5687,6 +5687,15 @@ "file": "scheduler.go" } }, + "error:pkg/gatewayserver/scheduling:schedule_too_far_ahead": { + "translations": { + "en": "schedule time is too far in the future" + }, + "description": { + "package": "pkg/gatewayserver/scheduling", + "file": "sub_band.go" + } + }, "error:pkg/gatewayserver/scheduling:sub_band_not_found": { "translations": { "en": "sub-band not found for frequency `{frequency}` Hz" diff --git a/pkg/gatewayserver/scheduling/scheduler.go b/pkg/gatewayserver/scheduling/scheduler.go index 5a73a1206b..b21d37909e 100644 --- a/pkg/gatewayserver/scheduling/scheduler.go +++ b/pkg/gatewayserver/scheduling/scheduler.go @@ -459,7 +459,7 @@ func (s *Scheduler) ScheduleAnytime(ctx context.Context, opts Options) (res Emis } return em.t } - em, err = sb.ScheduleAnytime(em.d, next, opts.Priority) + em, err = sb.ScheduleAnytime(em.d, next, opts.Priority, now) if err != nil { return Emission{}, 0, err } diff --git a/pkg/gatewayserver/scheduling/scheduler_internal_test.go b/pkg/gatewayserver/scheduling/scheduler_internal_test.go index fcf67cbfa0..a24d8da668 100644 --- a/pkg/gatewayserver/scheduling/scheduler_internal_test.go +++ b/pkg/gatewayserver/scheduling/scheduler_internal_test.go @@ -15,8 +15,9 @@ package scheduling var ( - ErrConflict = 
errConflict - ErrDwellTime = errDwellTime - ErrTooLate = errTooLate - ErrDutyCycle = errDutyCycle + ErrConflict = errConflict + ErrDwellTime = errDwellTime + ErrTooLate = errTooLate + ErrDutyCycle = errDutyCycle + ErrScheduleTooFarAhead = errScheduleTooFarAhead ) diff --git a/pkg/gatewayserver/scheduling/sub_band.go b/pkg/gatewayserver/scheduling/sub_band.go index 90bba22daf..0851373814 100644 --- a/pkg/gatewayserver/scheduling/sub_band.go +++ b/pkg/gatewayserver/scheduling/sub_band.go @@ -27,6 +27,11 @@ import ( // A lower value results in balancing capacity in time, while a higher value allows for bursts. var DutyCycleWindow = 1 * time.Hour +// MaxScheduleAhead is the maximum duration into the future that ScheduleAnytime may place an emission. +// Emissions beyond this are rejected to avoid scheduling downlinks that would never actually be transmitted +// in practice. +var MaxScheduleAhead = 2 * DutyCycleWindow + // DutyCycleStyle represents the of duty cycle algorithm to be used by a sub band. type DutyCycleStyle int @@ -144,6 +149,10 @@ var ( "blocked", "sub band is blocked for `{duration}`", ) + errScheduleTooFarAhead = errors.DefineResourceExhausted( + "schedule_too_far_ahead", + "schedule time is too far in the future", + ) ) // checkDutyCycle verifies if the emission complies with the duty cycle limitations, based on the style. @@ -211,7 +220,12 @@ func (sb *SubBand) Schedule(em Emission, p ttnpb.TxSchedulePriority) error { // ScheduleAnytime schedules the given duration at a time when there is availability by accounting for duty-cycle. // The given next callback should return the next option that does not conflict with other scheduled downlinks. // If there is no duty-cycle limitation, this method returns the first option. 
-func (sb *SubBand) ScheduleAnytime(d time.Duration, next func() ConcentratorTime, p ttnpb.TxSchedulePriority) (Emission, error) { +func (sb *SubBand) ScheduleAnytime( + d time.Duration, + next func() ConcentratorTime, + p ttnpb.TxSchedulePriority, + now ConcentratorTime, +) (Emission, error) { sb.mu.Lock() defer sb.mu.Unlock() em := NewEmission(next(), d) @@ -241,7 +255,13 @@ func (sb *SubBand) ScheduleAnytime(d time.Duration, next func() ConcentratorTime other := sb.emissions[i] used += float32(other.d) / float32(DutyCycleWindow) if used > usable { - em.t = other.Ends() + ConcentratorTime(DutyCycleWindow) - ConcentratorTime(em.d) + newT := other.Ends() + ConcentratorTime(DutyCycleWindow) - ConcentratorTime(em.d) + // If the new time is too far in the future, return an error instead of scheduling an emission that would + // never be transmitted in practice. + if newT > now+ConcentratorTime(MaxScheduleAhead) { + return Emission{}, errScheduleTooFarAhead.New() + } + em.t = newT break } } diff --git a/pkg/gatewayserver/scheduling/sub_band_test.go b/pkg/gatewayserver/scheduling/sub_band_test.go index d4861b14f2..378fa54bb5 100644 --- a/pkg/gatewayserver/scheduling/sub_band_test.go +++ b/pkg/gatewayserver/scheduling/sub_band_test.go @@ -172,6 +172,8 @@ func TestSubBandScheduleRestricted(t *testing.T) { } func TestScheduleAnytimeRestricted(t *testing.T) { + t.Parallel() + a := assertions.New(t) params := scheduling.SubBandParameters{ MinFrequency: 0, @@ -204,7 +206,8 @@ func TestScheduleAnytimeRestricted(t *testing.T) { from += scheduling.ConcentratorTime(time.Second) return res } - em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL) + now := scheduling.ConcentratorTime(time.Now().UnixNano()) + em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now) a.So(err, should.BeNil) a.So(em.Starts(), should.Equal, 16*time.Second) // [ 1 2 4 3 ] @@ -217,7 +220,8 @@ func TestScheduleAnytimeRestricted(t *testing.T) { next 
:= func() scheduling.ConcentratorTime { return scheduling.ConcentratorTime(19 * time.Second) } - em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL) + now := scheduling.ConcentratorTime(time.Now().UnixNano()) + em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now) a.So(err, should.BeNil) a.So(em.Starts(), should.Equal, 26*time.Second) // [ 1 2 4 3 5] @@ -229,11 +233,60 @@ func TestScheduleAnytimeRestricted(t *testing.T) { next := func() scheduling.ConcentratorTime { return scheduling.ConcentratorTime(19 * time.Second) } - _, err := sb.ScheduleAnytime(5*time.Second, next, ttnpb.TxSchedulePriority_NORMAL) + now := scheduling.ConcentratorTime(time.Now().UnixNano()) + _, err := sb.ScheduleAnytime(5*time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now) a.So(err, should.HaveSameErrorDefinitionAs, scheduling.ErrDutyCycle) } } +func TestScheduleAnytimeTooFarAhead(t *testing.T) { + t.Parallel() + // In the test environment DutyCycleWindow = 10s and MaxScheduleAhead = 20s. + a := assertions.New(t) + params := scheduling.SubBandParameters{ + MinFrequency: 0, + MaxFrequency: math.MaxUint64, + DutyCycle: 0.5, + } + clock := &mockClock{} + ceilings := map[ttnpb.TxSchedulePriority]float32{ + ttnpb.TxSchedulePriority_NORMAL: 0.5, // usable = 0.25 + ttnpb.TxSchedulePriority_HIGHEST: 1.0, // usable = 0.50 + } + sb := scheduling.NewSubBand(params, clock, ceilings, scheduling.DefaultDutyCycleStyle) + + // Schedule a 3-second emission at t=9s using HIGHEST priority. + // It occupies 30% of the duty-cycle window (10s), within HIGHEST (50%) but above + // NORMAL (25%). Its window [9s, 12s] falls within the checkDutyCycle range + // [0, 10s] for an emission starting at t=0, causing that check to fail. 
+ err := sb.Schedule( + scheduling.NewEmission(scheduling.ConcentratorTime(9*time.Second), 3*time.Second), + ttnpb.TxSchedulePriority_HIGHEST, + ) + a.So(err, should.BeNil) + + // next() always returns the same ConcentratorTime, so ScheduleAnytime falls back + // to the backwards scan. newT = 9s + 3s + 10s - 1s = 21s, which exceeds + // now (0) + MaxScheduleAhead (20s). + { + next := func() scheduling.ConcentratorTime { return 0 } + now := scheduling.ConcentratorTime(0) + _, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now) + a.So(err, should.HaveSameErrorDefinitionAs, scheduling.ErrScheduleTooFarAhead) + } + + // With now shifted forward by 1 second, newT (21s) equals now + MaxScheduleAhead + // (1s + 20s = 21s) exactly, so the check (strictly greater than) does not fire + // and the emission is accepted. + { + next := func() scheduling.ConcentratorTime { return 0 } + now := scheduling.ConcentratorTime(time.Second) + em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now) + a.So(err, should.BeNil) + a.So(em.Starts(), should.Equal, 21*time.Second) + } +} + func TestBlockingScheduling(t *testing.T) { t.Parallel() diff --git a/pkg/gatewayserver/scheduling/util_test.go b/pkg/gatewayserver/scheduling/util_test.go index 313238e6bf..9a93e6608e 100644 --- a/pkg/gatewayserver/scheduling/util_test.go +++ b/pkg/gatewayserver/scheduling/util_test.go @@ -80,4 +80,5 @@ func float32Ptr(v float32) *float32 { return &v } func init() { scheduling.DutyCycleWindow = 10 * time.Second + scheduling.MaxScheduleAhead = 2 * scheduling.DutyCycleWindow } diff --git a/pkg/webui/locales/ja.json b/pkg/webui/locales/ja.json index 664d196ca0..c25933935d 100644 --- a/pkg/webui/locales/ja.json +++ b/pkg/webui/locales/ja.json @@ -2520,6 +2520,7 @@ "error:pkg/gatewayserver/scheduling:no_absolute_gateway_time": "アブソリュートゲートウェイタイムがありません", "error:pkg/gatewayserver/scheduling:no_clock_sync": "クロックが未同期", 
"error:pkg/gatewayserver/scheduling:no_server_time": "サーバの時間がありません", + "error:pkg/gatewayserver/scheduling:schedule_too_far_ahead": "", "error:pkg/gatewayserver/scheduling:sub_band_not_found": "周波数 `{frequency}` Hz に対するサブ帯域が見つかりません", "error:pkg/gatewayserver/scheduling:too_late": "予定された時間(`{delta}`)で送信するには遅すぎます", "error:pkg/gatewayserver/upstream/ns:network_server_not_found": "ネットワークサーバが見つかりません", From a56452c4e91ffefdb62135c8d3543939b0f7ed96 Mon Sep 17 00:00:00 2001 From: Imre Halasz Date: Fri, 27 Mar 2026 16:12:01 +0100 Subject: [PATCH 2/2] gs: Stop the timer if context is cancelled --- pkg/gatewayserver/io/io.go | 4 +- pkg/gatewayserver/io/udp/udp.go | 11 +- pkg/gatewayserver/io/udp/udp_test.go | 155 +++++++++++++++++++++++++++ 3 files changed, 167 insertions(+), 3 deletions(-) diff --git a/pkg/gatewayserver/io/io.go b/pkg/gatewayserver/io/io.go index 60666765ed..255577ba69 100644 --- a/pkg/gatewayserver/io/io.go +++ b/pkg/gatewayserver/io/io.go @@ -601,7 +601,7 @@ func (c *Connection) ScheduleDown(path *ttnpb.DownlinkPath, msg *ttnpb.DownlinkM logger := logger.WithFields(log.Fields( "rx_window", i+1, "frequency", rx.frequency, - "data_rate", rx.dataRate, + "data_rate", rx.dataRate.String(), )) logger.Debug("Attempt to schedule downlink in receive window") // The maximum payload size is MACPayload only; for PHYPayload take MHDR (1 byte) and MIC (4 bytes) into account. 
@@ -610,7 +610,7 @@ func (c *Connection) ScheduleDown(path *ttnpb.DownlinkPath, msg *ttnpb.DownlinkM return false, false, 0, errTooLong.WithAttributes( "payload_length", len(msg.RawPayload), "maximum_length", maxPHYLength, - "data_rate", rx.dataRate, + "data_rate", rx.dataRate.String(), ) } eirp := phy.DefaultMaxEIRP diff --git a/pkg/gatewayserver/io/udp/udp.go b/pkg/gatewayserver/io/udp/udp.go index 24758c9ab5..9523c108ed 100644 --- a/pkg/gatewayserver/io/udp/udp.go +++ b/pkg/gatewayserver/io/udp/udp.go @@ -530,7 +530,16 @@ func (s *srv) handleDown(ctx context.Context, st *state) error { st.clockMu.RUnlock() d := time.Until(serverTime.Add(-s.config.ScheduleLateTime)) logger.WithField("duration", d).Debug("Wait to schedule downlink message late") - time.AfterFunc(d, write) + go func() { + t := time.NewTimer(d) + defer t.Stop() + select { + case <-t.C: + write() + case <-ctx.Done(): + case <-st.io.Context().Done(): + } + }() case <-healthCheck.C: if st.isPullPathActive(s.config.DownlinkPathExpires) { break diff --git a/pkg/gatewayserver/io/udp/udp_test.go b/pkg/gatewayserver/io/udp/udp_test.go index 7636102bfe..c060f2eebb 100644 --- a/pkg/gatewayserver/io/udp/udp_test.go +++ b/pkg/gatewayserver/io/udp/udp_test.go @@ -202,6 +202,161 @@ func TestConnection(t *testing.T) { cancelCtx() } +// TestScheduleLateCancel verifies that cancelling a connection context stops +// a pending late-scheduled write goroutine before it fires, rather than +// letting it write to the gateway after the connection has been torn down. +// This is a regression test for the time.AfterFunc timer leak in handleDown. 
+func TestScheduleLateCancel(t *testing.T) { + t.Parallel() + + var ( + registeredGatewayID = ttnpb.GatewayIdentifiers{GatewayId: "test-gateway"} + timeout = (1 << 4) * test.Delay + testConfig = Config{ + PacketHandlers: 2, + PacketBuffer: 10, + DownlinkPathExpires: 8 * timeout, + ConnectionExpires: 20 * timeout, + ScheduleLateTime: 0, + } + ) + + a, ctx := test.New(t) + ctx, cancelCtx := context.WithCancel(ctx) + defer cancelCtx() + + is, _, closeIS := mockis.New(ctx) + defer closeIS() + + c := componenttest.NewComponent(t, &component.Config{ + ServiceBase: config.ServiceBase{ + FrequencyPlans: config.FrequencyPlansConfig{ + ConfigSource: "static", + Static: test.StaticFrequencyPlans, + }, + }, + }) + componenttest.StartComponent(t, c) + defer c.Close() + + gs := mock.NewServer(c, is) + addr, _ := net.ResolveUDPAddr("udp", ":0") + lis, err := net.ListenUDP("udp", addr) + if !a.So(err, should.BeNil) { + t.FailNow() + } + + go Serve(ctx, gs, lis, testConfig) // nolint:errcheck + + connections := &sync.Map{} + eui := types.EUI64{0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05} + + udpConn, err := net.Dial("udp", lis.LocalAddr().String()) + if !a.So(err, should.BeNil) { + t.FailNow() + } + defer udpConn.Close() + + // Establish a downlink path by sending PULL_DATA. + pullPacket := generatePullData(eui) + pullPacket.Token = [2]byte{0x00, 0x01} + pullBuf, err := pullPacket.MarshalBinary() + if !a.So(err, should.BeNil) { + t.FailNow() + } + _, err = udpConn.Write(pullBuf) + if !a.So(err, should.BeNil) { + t.FailNow() + } + expectAck(t, udpConn, true, encoding.PullAck, pullPacket.Token) + + conn := expectConnection(t, gs, connections, eui, true) + + // Sync the gateway clock by sending PUSH_DATA with a known concentrator timestamp. 
+ syncConcentratorTime := 300 * test.Delay + pushPacket := generatePushData(eui, false, syncConcentratorTime) + pushPacket.Token = [2]byte{0x00, 0x02} + pushBuf, err := pushPacket.MarshalBinary() + if !a.So(err, should.BeNil) { + t.FailNow() + } + _, err = udpConn.Write(pushBuf) + if !a.So(err, should.BeNil) { + t.FailNow() + } + clockSynced := time.Now() + expectAck(t, udpConn, true, encoding.PushAck, pushPacket.Token) + time.Sleep(timeout) // ensure the clock sync is processed before scheduling + + // Schedule a Class A downlink. No TxAck has been received yet, so + // canImmediate=false; with a synced clock, handleDown takes the late-schedule + // path and starts a goroutine with a timer for d = time.Until(serverTime). + path := &ttnpb.DownlinkPath{ + Path: &ttnpb.DownlinkPath_UplinkToken{ + UplinkToken: io.MustUplinkToken( + &ttnpb.GatewayAntennaIdentifiers{GatewayIds: &registeredGatewayID}, + uint32(syncConcentratorTime/time.Microsecond), // nolint:gosec + scheduling.ConcentratorTime(syncConcentratorTime), + time.Unix(0, int64(syncConcentratorTime)), + nil, + ), + }, + } + msg := &ttnpb.DownlinkMessage{ + RawPayload: []byte{0x01}, + Settings: &ttnpb.DownlinkMessage_Request{ + Request: &ttnpb.TxRequest{ + Class: ttnpb.Class_CLASS_A, + Priority: ttnpb.TxSchedulePriority_NORMAL, + Rx1Delay: ttnpb.RxDelay_RX_DELAY_1, + Rx1DataRate: &ttnpb.DataRate{ + Modulation: &ttnpb.DataRate_Lora{ + Lora: &ttnpb.LoRaDataRate{ + SpreadingFactor: 7, + Bandwidth: 125000, + CodingRate: band.Cr4_5, + }, + }, + }, + Rx1Frequency: 868100000, + FrequencyPlanId: test.EUFrequencyPlanID, + }, + }, + } + _, _, _, err = conn.ScheduleDown(path, msg) + if !a.So(err, should.BeNil) { + t.FailNow() + } + + // Compute the wall-clock time at which the timer goroutine would call write(). + // serverTime(T) = clockSynced + (T - syncConcentratorTime); with ScheduleLateTime=0, + // d = time.Until(serverTime(scheduledTimestamp)). 
+ scheduledTimestamp := time.Duration(msg.GetScheduled().Timestamp) * time.Microsecond + expectedFireTime := clockSynced.Add(-syncConcentratorTime).Add(scheduledTimestamp) + + // Give handleDown time to dequeue the message and start the timer goroutine. + time.Sleep(timeout) + + // Cancel the connection. The goroutine must observe ctx.Done() and exit + // without calling write(), so no PULL_RESP should be sent to the gateway. + conn.Disconnect(context.Canceled) + + // Read from the UDP connection until expectedFireTime + margin. A broken + // implementation (time.AfterFunc) would deliver a PULL_RESP near + // expectedFireTime. With the fix the goroutine exits on cancel and nothing + // is written. + var buf [65507]byte + udpConn.SetReadDeadline(expectedFireTime.Add(2 * timeout)) // nolint:errcheck,gosec + n, readErr := udpConn.Read(buf[:]) + if readErr == nil { + var pkt encoding.Packet + if unmarshalErr := pkt.UnmarshalBinary(buf[:n]); unmarshalErr == nil { + a.So(pkt.PacketType, should.NotEqual, encoding.PullResp) + } + } + // A deadline-exceeded error means nothing was written — the expected outcome. +} + func TestFrontend(t *testing.T) { t.Parallel() iotest.Frontend(t, iotest.FrontendConfig{