Skip to content

Commit e140b68

Browse files
committed
Add ScanNow()
Source: radovskyb#94
1 parent f5989f8 commit e140b68

2 files changed

Lines changed: 142 additions & 3 deletions

File tree

watcher.go

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ var (
2222
// from previously calling Start and not yet calling Close.
2323
ErrWatcherRunning = errors.New("error: watcher is already running")
2424

25+
// ErrWatcherNotRunning occurs when trying to perform a ScanNow
26+
// when the watcher is not running. It will also occur if Close is called
27+
// whilst a ScanNow() is running / pending.
28+
ErrWatcherNotRunning = errors.New("error: watcher is not running")
29+
2530
// ErrWatchedFileDeleted is an error that occurs when a file or folder that was
2631
// being watched has been deleted.
2732
ErrWatchedFileDeleted = errors.New("error: watched file or folder deleted")
@@ -129,6 +134,7 @@ type Watcher struct {
129134
ops map[Op]struct{} // Op filtering.
130135
ignoreHidden bool // ignore hidden files or not.
131136
maxEvents int // max sent events per cycle
137+
scanNow chan chan struct{} // allows requests for immediate synchronous scans
132138
}
133139

134140
// New creates a new Watcher.
@@ -147,6 +153,7 @@ func New() *Watcher {
147153
files: make(map[string]os.FileInfo),
148154
ignored: make(map[string]struct{}),
149155
names: make(map[string]bool),
156+
scanNow: make(chan chan struct{}),
150157
}
151158
}
152159

@@ -550,6 +557,8 @@ func (w *Watcher) Start(d time.Duration) error {
550557
// Unblock w.Wait().
551558
w.wg.Done()
552559

560+
var scanNowRequest chan struct{}
561+
553562
for {
554563
// done lets the inner polling cycle loop know when the
555564
// current cycle's method has finished executing.
@@ -589,7 +598,7 @@ func (w *Watcher) Start(d time.Duration) error {
589598
}
590599
}
591600
numEvents++
592-
if w.maxEvents > 0 && numEvents > w.maxEvents {
601+
if scanNowRequest == nil && w.maxEvents > 0 && numEvents > w.maxEvents {
593602
close(cancel)
594603
break inner
595604
}
@@ -604,8 +613,52 @@ func (w *Watcher) Start(d time.Duration) error {
604613
w.files = fileList
605614
w.mu.Unlock()
606615

616+
if scanNowRequest != nil {
617+
close(scanNowRequest)
618+
scanNowRequest = nil
619+
}
620+
607621
// Sleep and then continue to the next loop iteration.
608-
time.Sleep(d)
622+
// If a request to do a full scan is received, handle it and then signal to the requester it is complete.
623+
select {
624+
case <-w.close: // break out of wait early if we get a Close
625+
case scanNowRequest = <-w.scanNow: // sync scan request received
626+
case <-time.After(d): // periodic re-roll time elapsed
627+
}
628+
}
629+
}
630+
631+
// ScanNow can be called on a already running watcher to perform an immediate synchronous scan of all watched files
632+
// and generate the events for any changes. When ScanNow() returns to the caller, all events for any changed files
633+
// have been published. ScanNow() can be used when you know FS changes have occurred and you want to ensure all events
634+
// for the changes have been gathered before continuing, for example, to better process batched updates to groups of
635+
// files.
636+
// You can also specify a very long poll duration and then use ScanNow() to break from the poll wait and perform a scan
637+
// before going back to sleep.
638+
func (w *Watcher) ScanNow() error {
639+
w.mu.Lock()
640+
if !w.running {
641+
w.mu.Unlock()
642+
return ErrWatcherNotRunning
643+
}
644+
w.mu.Unlock()
645+
646+
scanComplete := make(chan struct{})
647+
select {
648+
case w.scanNow <- scanComplete:
649+
case <-w.close:
650+
// if the watcher is no longer running, or is closed whilst we're waiting for our scan to be accepted, return
651+
// an error
652+
return ErrWatcherNotRunning
653+
}
654+
655+
select {
656+
case <-w.close:
657+
// if the watcher is closed whilst we're waiting for our scan to complete, return an error
658+
return ErrWatcherNotRunning
659+
case <-scanComplete:
660+
// scan completed ok
661+
return nil
609662
}
610663
}
611664

@@ -700,6 +753,7 @@ func (w *Watcher) Wait() {
700753
}
701754

702755
// Close stops a Watcher and unlocks its mutex, then sends a close signal.
756+
// Note, it is not safe to Start() a Watcher again after closing it. You must create a new Watcher.
703757
func (w *Watcher) Close() {
704758
w.mu.Lock()
705759
if !w.running {
@@ -711,5 +765,6 @@ func (w *Watcher) Close() {
711765
w.names = make(map[string]bool)
712766
w.mu.Unlock()
713767
// Send a close signal to the Start method.
714-
w.close <- struct{}{}
768+
// Use a channel close() rather than a channel write, so that ScanNow() can react to the closure also.
769+
close(w.close)
715770
}

watcher_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,90 @@ func TestTriggerEvent(t *testing.T) {
539539
wg.Wait()
540540
}
541541

542+
func TestScanNow(t *testing.T) {
543+
testDir, teardown := setup(t)
544+
defer teardown()
545+
546+
w := New()
547+
w.FilterOps(Create)
548+
549+
// Add the testDir to the watchlist.
550+
if err := w.AddRecursive(testDir); err != nil {
551+
t.Fatal(err)
552+
}
553+
554+
// should not be able to ScanNow() before the watcher is started
555+
if err := w.ScanNow(); err != ErrWatcherNotRunning {
556+
t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one")
557+
}
558+
559+
testFilePath := filepath.Join(testDir, "test_file1.txt")
560+
done := make(chan struct{})
561+
go func() {
562+
evt := <-w.Event
563+
if evt.Op == Create && evt.Path == testFilePath {
564+
close(done)
565+
} else {
566+
t.Fatal("unexpected event")
567+
}
568+
}()
569+
570+
// Start scanning with a very long poll duration
571+
go func() {
572+
if err := w.Start(time.Hour); err != nil {
573+
t.Fatal(err)
574+
}
575+
}()
576+
577+
w.Wait()
578+
defer w.Close()
579+
580+
// perform initial scan, which should yield no changes
581+
// this ensures the initial scan has happened, and means the watcher is now waiting 1hr before scanning again
582+
if err := w.ScanNow(); err != nil {
583+
t.Error(err)
584+
}
585+
586+
// wait for a short period just to ensure no unexpected events arrive
587+
select {
588+
case <-time.After(time.Millisecond * 100):
589+
case <-done:
590+
t.Fatal("should not have received an event as no changes have occurred since ScanNow() completed")
591+
}
592+
593+
// create the test file, we will not receive events due to the 1hr poll duration
594+
if err := ioutil.WriteFile(testFilePath, []byte{}, 0755); err != nil {
595+
t.Error(err)
596+
}
597+
598+
// wait for a short period just to ensure no unexpected events arrive now we've changed a file
599+
select {
600+
case <-time.After(time.Millisecond * 100):
601+
case <-done:
602+
t.Fatal("should not have received an event as a poll duration of 1 hour is used")
603+
}
604+
605+
// issue a scan now, and we will receive the events while ScanNow() is running.
606+
if err := w.ScanNow(); err != nil {
607+
t.Error(err)
608+
}
609+
610+
// all events should have been received *whilst* ScanNow() was running, so the done channel should already be
611+
// closed
612+
select {
613+
case <-done:
614+
default:
615+
t.Fatal("events from ScanNow() should have been received before ScanNow() returned")
616+
}
617+
618+
w.Close()
619+
620+
// issue a scan now after closing, should error
621+
if err := w.ScanNow(); err != ErrWatcherNotRunning {
622+
t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one")
623+
}
624+
}
625+
542626
func TestEventAddFile(t *testing.T) {
543627
testDir, teardown := setup(t)
544628
defer teardown()

0 commit comments

Comments
 (0)