Fileserver/internal/filesystem/watchmanager.go
2025-08-13 09:44:02 +02:00

131 lines
3.1 KiB
Go

package filesystem
import (
"bufio"
"fmt"
"os"
"runtime"
"strconv"
"strings"
"sync"
"github.com/fsnotify/fsnotify"
)
const FallbackWatcherQuota = 8192
// WatchManager manages filesystem watchers and triggers indexer reloads.
type WatchManager struct {
watcher *fsnotify.Watcher
root string
maxWatches int
usedWatches int
indexers []*FileIndexer
watched map[string]*FileNode
pending []*FileNode
mu sync.Mutex
}
// NewWatchManager creates a manager that allocates as many watches as possible,
// up to the given quota. If the given quota is 0 or less, system quota is used.
func NewWatchManager(indexers []*FileIndexer, userQuota int) (*WatchManager, error) {
w, err := fsnotify.NewWatcher()
if err != nil {
return nil, fmt.Errorf("create watcher: %v", err)
}
systemLimit := detectSystemWatchLimit()
quota := systemLimit
if userQuota > 0 && userQuota < quota {
quota = userQuota
}
wm := &WatchManager{
watcher: w,
maxWatches: quota,
indexers: indexers,
watched: make(map[string]*FileNode),
pending: make([]*FileNode, 0),
}
fmt.Printf("[watcher] system max_user_watches=%d, using quota=%d", systemLimit, quota)
return wm, nil
}
// addWatchLocked adds a watch for the given path. Note that caller manages mutex.
func (wm *WatchManager) addWatchLocked(path string, node *FileNode) error {
if wm.usedWatches >= wm.maxWatches {
wm.pending = append(wm.pending, node)
return fmt.Errorf("quota exhausted, deferring watch of %s", path)
}
if _, isWatched := wm.watched[path]; isWatched {
return nil
}
if err := wm.watcher.Add(path); err != nil {
return fmt.Errorf("add watched: %v", err)
}
wm.watched[path] = node
wm.usedWatches++
fmt.Printf("[watcher] watching %s (used %d/%d)", path, wm.usedWatches, wm.maxWatches)
return nil
}
// watchRecursive attempts to watch the node and its children, with given amount of available watches.
// Must also pass the FileIndexer which manages the node.
func (wm *WatchManager) watchRecursive(indexer FileIndexer, node *FileNode, available int) error {
wm.mu.Lock()
defer wm.mu.Unlock()
path := indexer.GetPath(node)
if !node.IsDir {
return nil
}
if _, isWatched := wm.watched[path]; !isWatched {
if err := wm.addWatchLocked(path, node); err != nil {
fmt.Printf("[watcher] watch failed: %v", err)
} else {
available--
}
}
for _, child := range node.Children {
if available <= 0 {
}
}
}
func detectSystemWatchLimit() int {
if runtime.GOOS != "linux" {
fmt.Printf("[watcher] cannot read system watcher quota, defaulting to %d.\n", FallbackWatcherQuota)
return FallbackWatcherQuota
}
f, err := os.Open("/proc/sys/fs/inotify/max_user_watches")
if err != nil {
fmt.Printf("[watcher] cannot read system watcher quota, defaulting to %d.\n", FallbackWatcherQuota)
return FallbackWatcherQuota
}
defer f.Close()
scanner := bufio.NewScanner(f)
if scanner.Scan() {
if v, err := strconv.Atoi(strings.TrimSpace(scanner.Text())); err == nil && v > 0 {
return v
}
}
fmt.Printf("[watcher] cannot read system watcher quota, defaulting to %d.\n", FallbackWatcherQuota)
return FallbackWatcherQuota
}