Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -5687,6 +5687,15 @@
"file": "scheduler.go"
}
},
"error:pkg/gatewayserver/scheduling:schedule_too_far_ahead": {
"translations": {
"en": "schedule time is too far in the future"
},
"description": {
"package": "pkg/gatewayserver/scheduling",
"file": "sub_band.go"
}
},
"error:pkg/gatewayserver/scheduling:sub_band_not_found": {
"translations": {
"en": "sub-band not found for frequency `{frequency}` Hz"
Expand Down
4 changes: 2 additions & 2 deletions pkg/gatewayserver/io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ func (c *Connection) ScheduleDown(path *ttnpb.DownlinkPath, msg *ttnpb.DownlinkM
logger := logger.WithFields(log.Fields(
"rx_window", i+1,
"frequency", rx.frequency,
"data_rate", rx.dataRate,
"data_rate", rx.dataRate.String(),
))
logger.Debug("Attempt to schedule downlink in receive window")
// The maximum payload size is MACPayload only; for PHYPayload take MHDR (1 byte) and MIC (4 bytes) into account.
Expand All @@ -610,7 +610,7 @@ func (c *Connection) ScheduleDown(path *ttnpb.DownlinkPath, msg *ttnpb.DownlinkM
return false, false, 0, errTooLong.WithAttributes(
"payload_length", len(msg.RawPayload),
"maximum_length", maxPHYLength,
"data_rate", rx.dataRate,
"data_rate", rx.dataRate.String(),
)
}
eirp := phy.DefaultMaxEIRP
Expand Down
11 changes: 10 additions & 1 deletion pkg/gatewayserver/io/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,16 @@ func (s *srv) handleDown(ctx context.Context, st *state) error {
st.clockMu.RUnlock()
d := time.Until(serverTime.Add(-s.config.ScheduleLateTime))
logger.WithField("duration", d).Debug("Wait to schedule downlink message late")
time.AfterFunc(d, write)
go func() {
t := time.NewTimer(d)
defer t.Stop()
select {
case <-t.C:
write()
case <-ctx.Done():
case <-st.io.Context().Done():
}
}()
case <-healthCheck.C:
if st.isPullPathActive(s.config.DownlinkPathExpires) {
break
Expand Down
155 changes: 155 additions & 0 deletions pkg/gatewayserver/io/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,161 @@ func TestConnection(t *testing.T) {
cancelCtx()
}

// TestScheduleLateCancel verifies that cancelling a connection context stops
// a pending late-scheduled write goroutine before it fires, rather than
// letting it write to the gateway after the connection has been torn down.
// This is a regression test for the time.AfterFunc timer leak in handleDown.
func TestScheduleLateCancel(t *testing.T) {
t.Parallel()

var (
registeredGatewayID = ttnpb.GatewayIdentifiers{GatewayId: "test-gateway"}
timeout = (1 << 4) * test.Delay
testConfig = Config{
PacketHandlers: 2,
PacketBuffer: 10,
DownlinkPathExpires: 8 * timeout,
ConnectionExpires: 20 * timeout,
ScheduleLateTime: 0,
}
)

a, ctx := test.New(t)
ctx, cancelCtx := context.WithCancel(ctx)
defer cancelCtx()

is, _, closeIS := mockis.New(ctx)
defer closeIS()

c := componenttest.NewComponent(t, &component.Config{
ServiceBase: config.ServiceBase{
FrequencyPlans: config.FrequencyPlansConfig{
ConfigSource: "static",
Static: test.StaticFrequencyPlans,
},
},
})
componenttest.StartComponent(t, c)
defer c.Close()

gs := mock.NewServer(c, is)
addr, _ := net.ResolveUDPAddr("udp", ":0")
lis, err := net.ListenUDP("udp", addr)
if !a.So(err, should.BeNil) {
t.FailNow()
}

go Serve(ctx, gs, lis, testConfig) // nolint:errcheck

connections := &sync.Map{}
eui := types.EUI64{0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05, 0x05}

udpConn, err := net.Dial("udp", lis.LocalAddr().String())
if !a.So(err, should.BeNil) {
t.FailNow()
}
defer udpConn.Close()

// Establish a downlink path by sending PULL_DATA.
pullPacket := generatePullData(eui)
pullPacket.Token = [2]byte{0x00, 0x01}
pullBuf, err := pullPacket.MarshalBinary()
if !a.So(err, should.BeNil) {
t.FailNow()
}
_, err = udpConn.Write(pullBuf)
if !a.So(err, should.BeNil) {
t.FailNow()
}
expectAck(t, udpConn, true, encoding.PullAck, pullPacket.Token)

conn := expectConnection(t, gs, connections, eui, true)

// Sync the gateway clock by sending PUSH_DATA with a known concentrator timestamp.
syncConcentratorTime := 300 * test.Delay
pushPacket := generatePushData(eui, false, syncConcentratorTime)
pushPacket.Token = [2]byte{0x00, 0x02}
pushBuf, err := pushPacket.MarshalBinary()
if !a.So(err, should.BeNil) {
t.FailNow()
}
_, err = udpConn.Write(pushBuf)
if !a.So(err, should.BeNil) {
t.FailNow()
}
clockSynced := time.Now()
expectAck(t, udpConn, true, encoding.PushAck, pushPacket.Token)
time.Sleep(timeout) // ensure the clock sync is processed before scheduling

// Schedule a Class A downlink. No TxAck has been received yet, so
// canImmediate=false; with a synced clock, handleDown takes the late-schedule
// path and starts a goroutine with a timer for d = time.Until(serverTime).
path := &ttnpb.DownlinkPath{
Path: &ttnpb.DownlinkPath_UplinkToken{
UplinkToken: io.MustUplinkToken(
&ttnpb.GatewayAntennaIdentifiers{GatewayIds: &registeredGatewayID},
uint32(syncConcentratorTime/time.Microsecond), // nolint:gosec
scheduling.ConcentratorTime(syncConcentratorTime),
time.Unix(0, int64(syncConcentratorTime)),
nil,
),
},
}
msg := &ttnpb.DownlinkMessage{
RawPayload: []byte{0x01},
Settings: &ttnpb.DownlinkMessage_Request{
Request: &ttnpb.TxRequest{
Class: ttnpb.Class_CLASS_A,
Priority: ttnpb.TxSchedulePriority_NORMAL,
Rx1Delay: ttnpb.RxDelay_RX_DELAY_1,
Rx1DataRate: &ttnpb.DataRate{
Modulation: &ttnpb.DataRate_Lora{
Lora: &ttnpb.LoRaDataRate{
SpreadingFactor: 7,
Bandwidth: 125000,
CodingRate: band.Cr4_5,
},
},
},
Rx1Frequency: 868100000,
FrequencyPlanId: test.EUFrequencyPlanID,
},
},
}
_, _, _, err = conn.ScheduleDown(path, msg)
if !a.So(err, should.BeNil) {
t.FailNow()
}

// Compute the wall-clock time at which the timer goroutine would call write().
// serverTime(T) = clockSynced + (T - syncConcentratorTime); with ScheduleLateTime=0,
// d = time.Until(serverTime(scheduledTimestamp)).
scheduledTimestamp := time.Duration(msg.GetScheduled().Timestamp) * time.Microsecond
expectedFireTime := clockSynced.Add(-syncConcentratorTime).Add(scheduledTimestamp)

// Give handleDown time to dequeue the message and start the timer goroutine.
time.Sleep(timeout)

// Cancel the connection. The goroutine must observe ctx.Done() and exit
// without calling write(), so no PULL_RESP should be sent to the gateway.
conn.Disconnect(context.Canceled)

// Read from the UDP connection until expectedFireTime + margin. A broken
// implementation (time.AfterFunc) would deliver a PULL_RESP near
// expectedFireTime. With the fix the goroutine exits on cancel and nothing
// is written.
var buf [65507]byte
udpConn.SetReadDeadline(expectedFireTime.Add(2 * timeout)) // nolint:errcheck,gosec
n, readErr := udpConn.Read(buf[:])
if readErr == nil {
var pkt encoding.Packet
if unmarshalErr := pkt.UnmarshalBinary(buf[:n]); unmarshalErr == nil {
a.So(pkt.PacketType, should.NotEqual, encoding.PullResp)
}
}
// A deadline-exceeded error means nothing was written — the expected outcome.
}

func TestFrontend(t *testing.T) {
t.Parallel()
iotest.Frontend(t, iotest.FrontendConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/gatewayserver/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (s *Scheduler) ScheduleAnytime(ctx context.Context, opts Options) (res Emis
}
return em.t
}
em, err = sb.ScheduleAnytime(em.d, next, opts.Priority)
em, err = sb.ScheduleAnytime(em.d, next, opts.Priority, now)
if err != nil {
return Emission{}, 0, err
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/gatewayserver/scheduling/scheduler_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
package scheduling

var (
ErrConflict = errConflict
ErrDwellTime = errDwellTime
ErrTooLate = errTooLate
ErrDutyCycle = errDutyCycle
ErrConflict = errConflict
ErrDwellTime = errDwellTime
ErrTooLate = errTooLate
ErrDutyCycle = errDutyCycle
ErrScheduleTooFarAhead = errScheduleTooFarAhead
)
24 changes: 22 additions & 2 deletions pkg/gatewayserver/scheduling/sub_band.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
// A lower value results in balancing capacity in time, while a higher value allows for bursts.
var DutyCycleWindow = 1 * time.Hour

// MaxScheduleAhead is the maximum duration into the future that ScheduleAnytime may place an emission.
// Emissions beyond this are rejected to avoid scheduling downlinks that would never actually be transmitted
// in practice.
var MaxScheduleAhead = 2 * DutyCycleWindow

// DutyCycleStyle represents the of duty cycle algorithm to be used by a sub band.
type DutyCycleStyle int

Expand Down Expand Up @@ -144,6 +149,10 @@ var (
"blocked",
"sub band is blocked for `{duration}`",
)
errScheduleTooFarAhead = errors.DefineResourceExhausted(
"schedule_too_far_ahead",
"schedule time is too far in the future",
)
)

// checkDutyCycle verifies if the emission complies with the duty cycle limitations, based on the style.
Expand Down Expand Up @@ -211,7 +220,12 @@ func (sb *SubBand) Schedule(em Emission, p ttnpb.TxSchedulePriority) error {
// ScheduleAnytime schedules the given duration at a time when there is availability by accounting for duty-cycle.
// The given next callback should return the next option that does not conflict with other scheduled downlinks.
// If there is no duty-cycle limitation, this method returns the first option.
func (sb *SubBand) ScheduleAnytime(d time.Duration, next func() ConcentratorTime, p ttnpb.TxSchedulePriority) (Emission, error) {
func (sb *SubBand) ScheduleAnytime(
d time.Duration,
next func() ConcentratorTime,
p ttnpb.TxSchedulePriority,
now ConcentratorTime,
) (Emission, error) {
sb.mu.Lock()
defer sb.mu.Unlock()
em := NewEmission(next(), d)
Expand Down Expand Up @@ -241,7 +255,13 @@ func (sb *SubBand) ScheduleAnytime(d time.Duration, next func() ConcentratorTime
other := sb.emissions[i]
used += float32(other.d) / float32(DutyCycleWindow)
if used > usable {
em.t = other.Ends() + ConcentratorTime(DutyCycleWindow) - ConcentratorTime(em.d)
newT := other.Ends() + ConcentratorTime(DutyCycleWindow) - ConcentratorTime(em.d)
// If the new time is too far in the future, return an error instead of scheduling an emission that would
// never be transmitted in practice.
if newT > now+ConcentratorTime(MaxScheduleAhead) {
return Emission{}, errScheduleTooFarAhead.New()
}
em.t = newT
break
}
}
Expand Down
59 changes: 56 additions & 3 deletions pkg/gatewayserver/scheduling/sub_band_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ func TestSubBandScheduleRestricted(t *testing.T) {
}

func TestScheduleAnytimeRestricted(t *testing.T) {
t.Parallel()

a := assertions.New(t)
params := scheduling.SubBandParameters{
MinFrequency: 0,
Expand Down Expand Up @@ -204,7 +206,8 @@ func TestScheduleAnytimeRestricted(t *testing.T) {
from += scheduling.ConcentratorTime(time.Second)
return res
}
em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL)
now := scheduling.ConcentratorTime(time.Now().UnixNano())
em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now)
a.So(err, should.BeNil)
a.So(em.Starts(), should.Equal, 16*time.Second)
// [ 1 2 4 3 ]
Expand All @@ -217,7 +220,8 @@ func TestScheduleAnytimeRestricted(t *testing.T) {
next := func() scheduling.ConcentratorTime {
return scheduling.ConcentratorTime(19 * time.Second)
}
em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL)
now := scheduling.ConcentratorTime(time.Now().UnixNano())
em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now)
a.So(err, should.BeNil)
a.So(em.Starts(), should.Equal, 26*time.Second)
// [ 1 2 4 3 5]
Expand All @@ -229,11 +233,60 @@ func TestScheduleAnytimeRestricted(t *testing.T) {
next := func() scheduling.ConcentratorTime {
return scheduling.ConcentratorTime(19 * time.Second)
}
_, err := sb.ScheduleAnytime(5*time.Second, next, ttnpb.TxSchedulePriority_NORMAL)
now := scheduling.ConcentratorTime(time.Now().UnixNano())
_, err := sb.ScheduleAnytime(5*time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now)
a.So(err, should.HaveSameErrorDefinitionAs, scheduling.ErrDutyCycle)
}
}

func TestScheduleAnytimeTooFarAhead(t *testing.T) {
t.Parallel()
// In the test environment DutyCycleWindow = 10s and MaxScheduleAhead = 20s.
a := assertions.New(t)
params := scheduling.SubBandParameters{
MinFrequency: 0,
MaxFrequency: math.MaxUint64,
DutyCycle: 0.5,
}
clock := &mockClock{}
ceilings := map[ttnpb.TxSchedulePriority]float32{
ttnpb.TxSchedulePriority_NORMAL: 0.5, // usable = 0.25
ttnpb.TxSchedulePriority_HIGHEST: 1.0, // usable = 0.50
}
sb := scheduling.NewSubBand(params, clock, ceilings, scheduling.DefaultDutyCycleStyle)

// Schedule a 3-second emission at t=9s using HIGHEST priority.
// It occupies 30% of the duty-cycle window (10s), within HIGHEST (50%) but above
// NORMAL (25%). Its window [9s, 12s] falls within the checkDutyCycle range
// [0, 10s] for an emission starting at t=0, causing that check to fail.
err := sb.Schedule(
scheduling.NewEmission(scheduling.ConcentratorTime(9*time.Second), 3*time.Second),
ttnpb.TxSchedulePriority_HIGHEST,
)
a.So(err, should.BeNil)

// next() always returns the same ConcentratorTime, so ScheduleAnytime falls back
// to the backwards scan. newT = 9s + 3s + 10s - 1s = 21s, which exceeds
// now (0) + MaxScheduleAhead (20s).
{
next := func() scheduling.ConcentratorTime { return 0 }
now := scheduling.ConcentratorTime(0)
_, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now)
a.So(err, should.HaveSameErrorDefinitionAs, scheduling.ErrScheduleTooFarAhead)
}

// With now shifted forward by 1 second, newT (21s) equals now + MaxScheduleAhead
// (1s + 20s = 21s) exactly, so the check (strictly greater than) does not fire
// and the emission is accepted.
{
next := func() scheduling.ConcentratorTime { return 0 }
now := scheduling.ConcentratorTime(time.Second)
em, err := sb.ScheduleAnytime(time.Second, next, ttnpb.TxSchedulePriority_NORMAL, now)
a.So(err, should.BeNil)
a.So(em.Starts(), should.Equal, 21*time.Second)
}
}

func TestBlockingScheduling(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions pkg/gatewayserver/scheduling/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,5 @@ func float32Ptr(v float32) *float32 { return &v }

func init() {
scheduling.DutyCycleWindow = 10 * time.Second
scheduling.MaxScheduleAhead = 2 * scheduling.DutyCycleWindow
}
1 change: 1 addition & 0 deletions pkg/webui/locales/ja.json
Original file line number Diff line number Diff line change
Expand Up @@ -2520,6 +2520,7 @@
"error:pkg/gatewayserver/scheduling:no_absolute_gateway_time": "アブソリュートゲートウェイタイムがありません",
"error:pkg/gatewayserver/scheduling:no_clock_sync": "クロックが未同期",
"error:pkg/gatewayserver/scheduling:no_server_time": "サーバの時間がありません",
"error:pkg/gatewayserver/scheduling:schedule_too_far_ahead": "",
"error:pkg/gatewayserver/scheduling:sub_band_not_found": "周波数 `{frequency}` Hz に対するサブ帯域が見つかりません",
"error:pkg/gatewayserver/scheduling:too_late": "予定された時間(`{delta}`)で送信するには遅すぎます",
"error:pkg/gatewayserver/upstream/ns:network_server_not_found": "ネットワークサーバが見つかりません",
Expand Down
Loading