// Package scheduler plans and runs backups and syncs of instances according
// to the cron-like schedules reported by each instance's sentinel settings.
package scheduler

import (
	"log"
	"strings"
	"time"

	"gitea.ceperka.net/rosti/incus-sentinel/incus"
	"github.com/robfig/cron/v3"
)

const (
	// queueLength is the capacity of the internal work queue; plans arriving
	// while the queue is full are dropped.
	queueLength = 100

	planReasonBackup = "backup"
	planReasonSync   = "sync"
)

// schedulerPlan is a single unit of work: one instance and what to do with it.
type schedulerPlan struct {
	Instance incus.Instance
	Reason   string // backup or sync
}

// key identifies a plan for de-duplication and for log output.
func (p schedulerPlan) key() string {
	return p.Reason + ":" + p.Instance.Name
}

// Scheduler is the main struct that handles the scheduling of backups and syncs.
// It keeps internal queue of instances to be processed and it runs cron-like
// schedules for each instance.
type Scheduler struct {
	driver   Driver
	notifier Notifier

	planner           chan schedulerPlan   // Receives instance to be backed up and/or synced
	queueDumpRequest  chan bool            // Request to dump the queue
	queueDumpResponse chan []schedulerPlan // Response with the queue
	exit              chan bool            // Exit signal

	scheduledFootprint string // Footprint of the scheduler, used to identify if the scheduler needs to be changed
	cron               *cron.Cron
}

// NewScheduler creates a Scheduler that uses d to perform the work and n to
// send notifications (n may be nil, in which case no notifications are sent).
// The main processing loop is started in a background goroutine; call Close
// to stop it.
func NewScheduler(d Driver, n Notifier) *Scheduler {
	// The cron instance is created lazily by setupInstanceSchedules, so there
	// is nothing to construct here.
	s := &Scheduler{
		driver:            d,
		notifier:          n,
		planner:           make(chan schedulerPlan, 10),
		queueDumpRequest:  make(chan bool, 1),
		queueDumpResponse: make(chan []schedulerPlan, 1),
		exit:              make(chan bool),
	}

	go s.runMainLoop()

	return s
}

// do executes a single plan and always reports it on done when finished,
// whether the work succeeded or not. The notification URL for the plan is
// called only when the work succeeded, so a failed backup or sync is not
// reported as a completed one.
func (s *Scheduler) do(plan schedulerPlan, done chan schedulerPlan) {
	// The main loop relies on this signal to clear its in-progress state.
	defer func() { done <- plan }()

	sen := plan.Instance.Sentinel()
	name := plan.Instance.Name

	switch plan.Reason {
	case planReasonBackup:
		start := time.Now()
		if err := s.driver.Backup(name, []string{"sentinel"}); err != nil {
			log.Printf("Failed to backup %s: %s", name, err.Error())
			return
		}
		log.Printf("Backup of %s took %s", name, time.Since(start).String())
		s.notify(sen.BackupNotifyURL)

	case planReasonSync:
		start := time.Now()
		target := name + sen.SyncTargetInstanceSuffix
		if err := s.driver.Sync(name, target, sen.SyncTargetRemote, sen.SyncTargetPool); err != nil {
			log.Printf("Failed to sync %s: %s", name, err.Error())
			return
		}
		log.Printf("Sync of %s took %s", name, time.Since(start).String())
		s.notify(sen.SyncNotifyURL)
	}
}

// notify calls the notifier with url. It does nothing when url is empty or no
// notifier is configured; a notification failure is logged, not returned.
func (s *Scheduler) notify(url string) {
	if url == "" || s.notifier == nil {
		return
	}
	if err := s.notifier.Notify(url); err != nil {
		log.Printf("Failed to notify %s: %s", url, err.Error())
	}
}

// removePlan returns plans without the first entry whose key equals key.
// The backing array of plans is reused.
func removePlan(plans []schedulerPlan, key string) []schedulerPlan {
	for i, p := range plans {
		if p.key() == key {
			return append(plans[:i], plans[i+1:]...)
		}
	}
	return plans
}

// runMainLoop is the loop that does the heavy-lifting. It owns the work queue,
// de-duplicates incoming plans, processes at most one plan at a time and
// answers queue dump requests. It returns nil after an exit signal, waiting
// first for a plan that is still in progress.
func (s *Scheduler) runMainLoop() error {
	inProgress := false
	done := make(chan schedulerPlan)
	queue := make(chan schedulerPlan, queueLength)

	// instancesToProcess mirrors the accepted plans in arrival order so they
	// can be dumped; it includes the plan currently being processed, which is
	// removed only once it reports on done.
	var instancesToProcess []schedulerPlan
	queuedInstances := make(map[string]struct{})

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		// New instance to plan
		case newPlan := <-s.planner:
			if len(queue) >= queueLength {
				log.Printf("Queue full (%d)), dropping plan for %s of %s", len(queue), newPlan.Reason, newPlan.Instance.Name)
				continue
			}

			// Skip plans that are already queued or being processed.
			key := newPlan.key()
			if _, found := queuedInstances[key]; found {
				continue
			}
			queue <- newPlan
			queuedInstances[key] = struct{}{}
			instancesToProcess = append(instancesToProcess, newPlan)

		// Instance is done, remove it from the tracker
		case plan := <-done:
			key := plan.key()
			delete(queuedInstances, key)
			instancesToProcess = removePlan(instancesToProcess, key)
			inProgress = false

		// Dump the queue
		case <-s.queueDumpRequest:
			// copy only fills up to len(dst), so the destination must be
			// sized up front; the copy also isolates the caller from later
			// in-place modifications of instancesToProcess.
			response := make([]schedulerPlan, len(instancesToProcess))
			copy(response, instancesToProcess)
			s.queueDumpResponse <- response

		case <-s.exit:
			if inProgress {
				log.Println("Waiting for the current instance to finish ..")
				<-done
			}
			return nil

		// Check the queue and start processing
		case <-ticker.C:
			if !inProgress && len(queue) > 0 {
				inProgress = true
				go s.do(<-queue, done)
			}
		}
	}
}

// Close sends the exit signal to the main loop. The exit channel is
// unbuffered, so Close blocks until the main loop has received the signal.
func (s *Scheduler) Close() {
	s.exit <- true
}

// Run refreshes the instance schedules every 15 seconds and logs the content
// of the work queue whenever it is non-empty and differs from the last logged
// one. A failed refresh is logged and retried on the next iteration. The loop
// has no exit path, so Run does not return.
func (s *Scheduler) Run() error {
	lastQueueDump := ""

	log.Println("Starting scheduler ..")
	for {
		if err := s.refresh(); err != nil {
			log.Printf("Failed to refresh scheduler: %s", err.Error())
		}

		// Print the list of instances in the queue
		s.queueDumpRequest <- true
		plans := <-s.queueDumpResponse
		if len(plans) > 0 {
			output := make([]string, 0, len(plans))
			for _, plan := range plans {
				output = append(output, plan.key())
			}
			outputString := strings.Join(output, ", ")
			if lastQueueDump != outputString {
				log.Println("Queue:", outputString)
				lastQueueDump = outputString
			}
		}

		time.Sleep(15 * time.Second)
	}
}

// footprint concatenates the name, backup schedule and sync schedule of every
// instance in is. Two instance lists with the same footprint are treated as
// having the same schedules.
//
// NOTE(review): the Backup/Sync enable flags are not part of the footprint, so
// toggling one without touching a schedule presumably does not trigger a
// refresh - confirm whether that is intended.
func (s *Scheduler) footprint(is []incus.Instance) string {
	var b strings.Builder
	for _, inst := range is {
		sen := inst.Sentinel()
		b.WriteString(inst.Name)
		b.WriteString(sen.BackupSchedule)
		b.WriteString(sen.SyncSchedule)
	}
	return b.String()
}

// refresh checks if the scheduler needs to be refreshed and does it if needed.
func (s *Scheduler) refresh() error {
	instances, err := s.driver.GetInstances("")
	if err != nil {
		return err
	}
	return s.setupInstanceSchedules(instances)
}

// setupInstanceSchedules refreshes cron-like schedules for all instances. It
// is a no-op when the footprint of is matches the currently scheduled one.
//
// The new cron is built completely before the running one is replaced, so an
// invalid schedule expression returns an error and leaves the previous
// schedules running instead of leaving the scheduler with nothing scheduled.
func (s *Scheduler) setupInstanceSchedules(is []incus.Instance) error {
	fp := s.footprint(is)
	if s.scheduledFootprint == fp {
		return nil
	}

	log.Println("Refreshing scheduler ..")

	next := cron.New()
	for _, inst := range is {
		sen := inst.Sentinel()

		if sen.Backup {
			log.Println(".. adding backup schedule for", inst.Name, sen.BackupSchedule)
			// Build the plan as a per-iteration value: before Go 1.22 a
			// closure over the range variable would see the last instance.
			plan := schedulerPlan{Instance: inst, Reason: planReasonBackup}
			if _, err := next.AddFunc(sen.BackupSchedule, func() { s.planner <- plan }); err != nil {
				return err
			}
		}

		if sen.Sync {
			log.Println(".. adding sync schedule for", inst.Name, sen.SyncSchedule)
			plan := schedulerPlan{Instance: inst, Reason: planReasonSync}
			if _, err := next.AddFunc(sen.SyncSchedule, func() { s.planner <- plan }); err != nil {
				return err
			}
		}
	}

	// Everything parsed; swap the schedulers.
	if s.cron != nil {
		s.cron.Stop()
	}
	s.cron = next
	s.scheduledFootprint = fp
	s.cron.Start()

	return nil
}