package scheduler

import (
	"log"
	"sort"
	"strings"
	"time"

	"gitea.ceperka.net/rosti/incus-sentinel/incus"

	"github.com/robfig/cron/v3"
)

// queueLength is the maximum number of pending plans; new plans are dropped
// when the queue is full.
const queueLength = 100

const (
	planReasonBackup       = "backup"
	planReasonBackupVolume = "backup-volume"
	planReasonSync         = "sync"
	planReasonSyncVolume   = "sync-volume"
)

var defaultTags = []string{"sentinel"}

// schedulerPlan is a single unit of work: either an instance or a volume
// (exactly one of the two is populated, depending on Reason) plus the reason
// it was scheduled.
type schedulerPlan struct {
	Instance incus.Instance
	Volume   incus.Volume
	Reason   string // one of the planReason* constants
}

// key returns a stable identifier used to deduplicate plans in the queue
// tracker. Volume plans carry a zero-value Instance, so they must be keyed by
// the volume identity — keying everything by Instance.Name would collapse all
// volume plans of one reason into a single tracker entry and silently drop
// every volume but the first.
func (p schedulerPlan) key() string {
	switch p.Reason {
	case planReasonBackupVolume, planReasonSyncVolume:
		return p.Reason + ":" + p.Volume.Project + "/" + p.Volume.Pool + "/" + p.Volume.Name
	default:
		return p.Reason + ":" + p.Instance.Project + "/" + p.Instance.Name
	}
}

// Scheduler is the main struct that handles the scheduling of backups and syncs.
// It keeps an internal queue of instances/volumes to be processed and runs
// cron-like schedules for each of them.
type Scheduler struct {
	driver   Driver
	notifier Notifier

	planner           chan schedulerPlan   // Receives instance/volume to be backed up and/or synced
	queueDumpRequest  chan bool            // Request to dump the queue
	queueDumpResponse chan []schedulerPlan // Response with the queue
	exit              chan bool            // Exit signal

	scheduledFootprint string // Footprint of the current config, used to detect when schedules need rebuilding

	cron *cron.Cron
}

// NewScheduler creates a Scheduler and starts its processing loop in a
// background goroutine.
func NewScheduler(d Driver, n Notifier) *Scheduler {
	// NOTE(review): the original code called cron.New(cron.WithSeconds()) here
	// and discarded the result; the cron actually used is created in
	// setupInstanceSchedules with cron.New(). The dead call was removed.
	s := &Scheduler{
		driver:            d,
		notifier:          n,
		planner:           make(chan schedulerPlan, 10),
		queueDumpRequest:  make(chan bool, 1),
		queueDumpResponse: make(chan []schedulerPlan, 1),
		exit:              make(chan bool),
	}

	go s.runMainLoop()

	return s
}

// do executes a single plan (backup or sync of an instance or a volume),
// logs the outcome and fires the configured notification URL, if any.
// The plan is always reported back on done, even on failure.
func (s *Scheduler) do(plan schedulerPlan, done chan schedulerPlan) {
	defer func() {
		done <- plan
	}()

	// Do the actual work
	sen := plan.Instance.Sentinel()
	var err error
	start := time.Now()
	notifyURL := ""

	switch plan.Reason {
	case planReasonBackup:
		log.Printf(".. backup of %s/%s instance started", plan.Instance.Project, plan.Instance.Name)
		err = s.driver.Backup(plan.Instance.Project, plan.Instance.Name, defaultTags)
		if err != nil {
			log.Printf(".. failed to backup %s: %s", plan.Instance.Name, err.Error())
			return
		}
		log.Printf(".. backup of %s took %s", plan.Instance.Name, time.Since(start).String())
		notifyURL = sen.BackupNotifyURL
	case planReasonSync:
		log.Printf(".. syncing of %s/%s instance started", plan.Instance.Project, plan.Instance.Name)
		err = s.driver.Sync(plan.Instance.Project, plan.Instance.Name, plan.Instance.Name+sen.SyncTargetInstanceSuffix, sen.SyncTargetRemote, sen.SyncTargetPool)
		if err != nil {
			log.Printf(".. failed to sync %s: %s", plan.Instance.Name, err.Error())
			return
		}
		log.Printf(".. sync of %s took %s", plan.Instance.Name, time.Since(start).String())
		notifyURL = sen.SyncNotifyURL
	case planReasonBackupVolume:
		log.Printf(".. backup of %s/%s/%s volume started", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name)
		// Volume plans use the volume's own sentinel config (shadows the
		// instance sentinel above).
		sen := plan.Volume.Sentinel()
		if sen.BackupMode == "dir" {
			err = s.driver.BackupVolumeDir(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, defaultTags)
		} else if sen.BackupMode == "native" {
			err = s.driver.BackupVolumeNative(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, defaultTags)
		} else {
			log.Printf(".. invalid backup mode for volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, sen.BackupMode)
			return
		}
		if err != nil {
			log.Printf(".. failed to backup volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, err.Error())
			return
		}
		log.Printf(".. backup of volume %s/%s/%s took %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, time.Since(start).String())
		notifyURL = sen.BackupNotifyURL
	case planReasonSyncVolume:
		log.Printf(".. syncing of %s/%s/%s volume started", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name)
		sen := plan.Volume.Sentinel()
		// Target pool defaults to the source pool unless overridden.
		targetPool := plan.Volume.Pool
		if sen.SyncTargetPool != "" {
			targetPool = sen.SyncTargetPool
		}
		err = s.driver.SyncVolume(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, sen.SyncTargetRemote, targetPool, plan.Volume.Name+sen.SyncTargetVolumeSuffix)
		if err != nil {
			log.Printf(".. failed to sync volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, err.Error())
			return
		}
		log.Printf(".. sync of volume %s/%s/%s took %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, time.Since(start).String())
		notifyURL = sen.SyncNotifyURL
	}

	if notifyURL != "" && s.notifier != nil {
		err = s.notifier.Notify(notifyURL)
		if err != nil {
			log.Printf(".. failed to notify %s: %s", notifyURL, err.Error())
		} else {
			log.Printf(".. notification sent to %s", notifyURL)
		}
	} else if notifyURL != "" && s.notifier == nil {
		log.Println("Warning: No notifier configured, skipping notification")
	}
}

// runMainLoop does the heavy lifting: it accepts new plans, deduplicates
// them, runs at most one plan at a time and answers queue-dump requests.
func (s *Scheduler) runMainLoop() error {
	inProgress := false
	done := make(chan schedulerPlan)

	queue := make(chan schedulerPlan, queueLength)
	// Tracks plans that are queued or currently running, keyed by plan.key().
	queuedPlans := make(map[string]schedulerPlan)

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	lastQueueLog := 0

	for {
		select {
		// New plan to schedule
		case newPlan := <-s.planner:
			if len(queue) >= queueLength {
				log.Printf("Queue full (%d), dropping plan for %s of %s", len(queue), newPlan.Reason, newPlan.Instance.Name)
				continue
			}

			// Skip if the same plan is already queued or running.
			// NOTE: the original keyed every plan by Instance.Name, which is
			// empty for volume plans — all volume plans shared one key and
			// were dropped as duplicates. plan.key() fixes that.
			if _, found := queuedPlans[newPlan.key()]; !found {
				queue <- newPlan
				queuedPlans[newPlan.key()] = newPlan
			}
		// Plan is done, remove it from the tracker
		case plan := <-done:
			delete(queuedPlans, plan.key())
			inProgress = false
		// Dump the queue (queued + in-progress plans).
		// The original copied from a never-populated slice, so the dump was
		// always empty; we now snapshot the tracker map instead.
		case <-s.queueDumpRequest:
			response := make([]schedulerPlan, 0, len(queuedPlans))
			for _, plan := range queuedPlans {
				response = append(response, plan)
			}
			s.queueDumpResponse <- response
		case <-s.exit:
			if inProgress {
				log.Println("Waiting for the current instance to finish ..")
				<-done
			}
			return nil
		// Check the queue and start processing
		case <-ticker.C:
			if !inProgress && len(queue) > 0 {
				inProgress = true
				go s.do(<-queue, done)
			}

			// Log the queue length only when it changes.
			if len(queue) > 0 && lastQueueLog != len(queue) {
				log.Printf("Queue length: %d", len(queue))
			} else if len(queue) == 0 && lastQueueLog != 0 {
				log.Println("Queue empty")
			}
			lastQueueLog = len(queue)
		}
	}
}

// Close stops the main loop, waiting for a running plan to finish first.
func (s *Scheduler) Close() {
	s.exit <- true
}

// Run is the foreground loop: it periodically refreshes the schedules from
// the driver and logs the current queue whenever it changes.
func (s *Scheduler) Run() error {
	lastQueueDump := ""

	log.Println("Starting scheduler ..")
	for {
		if err := s.refresh(); err != nil {
			// Keep running; a transient driver error should not kill the loop.
			log.Printf("Failed to refresh scheduler: %s", err.Error())
		}

		// Print the list of plans in the queue
		s.queueDumpRequest <- true
		plans := <-s.queueDumpResponse

		if len(plans) > 0 {
			output := make([]string, 0, len(plans))
			for _, plan := range plans {
				output = append(output, plan.key())
			}
			// Map iteration order is random; sort so the change detection
			// below doesn't re-log an unchanged queue.
			sort.Strings(output)
			outputString := strings.Join(output, ", ")
			if lastQueueDump != outputString {
				log.Println("Queue:", outputString)
				lastQueueDump = outputString
			}
		}

		time.Sleep(15 * time.Second)
	}
}

// footprint is a hash-like fingerprint of the current config used to decide
// whether the cron schedules need rebuilding.
func (s *Scheduler) footprint(is []incus.Instance, vols []incus.Volume) string {
	var b strings.Builder
	for _, inst := range is {
		sen := inst.Sentinel()
		b.WriteString(inst.Project + inst.Name + sen.BackupSchedule + sen.SyncSchedule)
	}
	for _, vol := range vols {
		sen := vol.Sentinel()
		b.WriteString(vol.Project + vol.Name + vol.Pool + sen.BackupSchedule + sen.SyncSchedule)
	}
	return b.String()
}

// refresh fetches instances and volumes from the driver and rebuilds the
// schedules if the configuration changed.
func (s *Scheduler) refresh() error {
	instances, err := s.driver.GetInstances("")
	if err != nil {
		return err
	}

	vols, err := s.driver.GetVolumes("")
	if err != nil {
		return err
	}

	return s.setupInstanceSchedules(instances, vols)
}

// setupInstanceSchedules rebuilds the cron entries for all instances and
// volumes. It is a no-op when the config footprint hasn't changed.
func (s *Scheduler) setupInstanceSchedules(is []incus.Instance, vols []incus.Volume) error {
	if s.scheduledFootprint == s.footprint(is, vols) {
		return nil
	}

	log.Println("Refreshing scheduler ..")

	if s.cron != nil {
		for _, e := range s.cron.Entries() {
			s.cron.Remove(e.ID)
		}
		s.cron.Stop()
	}
	s.cron = cron.New()

	// Instances
	for _, inst := range is {
		inst := inst // shadow: closures below must capture this iteration's value (pre-Go 1.22 semantics)
		sen := inst.Sentinel()
		if sen.Backup {
			log.Println(".. adding backup schedule for", inst.Project, inst.Name, sen.BackupSchedule)
			_, err := s.cron.AddFunc(sen.BackupSchedule, func() {
				log.Printf(".. placing backup of %s/%s into the queue", inst.Project, inst.Name)
				s.planner <- schedulerPlan{Instance: inst, Reason: planReasonBackup}
			})
			if err != nil {
				return err
			}
		}
		if sen.Sync {
			log.Println(".. adding sync schedule for", inst.Project, inst.Name, sen.SyncSchedule)
			_, err := s.cron.AddFunc(sen.SyncSchedule, func() {
				log.Printf(".. placing sync of %s/%s into the queue", inst.Project, inst.Name)
				s.planner <- schedulerPlan{Instance: inst, Reason: planReasonSync}
			})
			if err != nil {
				return err
			}
		}
	}

	// Volumes
	for _, vol := range vols {
		vol := vol // shadow: closures below must capture this iteration's value (pre-Go 1.22 semantics)
		sen := vol.Sentinel()
		if sen.Backup {
			log.Println(".. adding backup schedule for", vol.Project, vol.Pool, vol.Name, sen.BackupSchedule)
			_, err := s.cron.AddFunc(sen.BackupSchedule, func() {
				log.Printf(".. placing volume backup of %s/%s/%s into the queue", vol.Project, vol.Pool, vol.Name)
				s.planner <- schedulerPlan{Volume: vol, Reason: planReasonBackupVolume}
			})
			if err != nil {
				return err
			}
		}
		if sen.Sync {
			log.Println(".. adding sync schedule for", vol.Project, vol.Pool, vol.Name, sen.SyncSchedule)
			_, err := s.cron.AddFunc(sen.SyncSchedule, func() {
				log.Printf(".. placing volume sync of %s/%s/%s into the queue", vol.Project, vol.Pool, vol.Name)
				s.planner <- schedulerPlan{Volume: vol, Reason: planReasonSyncVolume}
			})
			if err != nil {
				return err
			}
		}
	}

	s.scheduledFootprint = s.footprint(is, vols)
	s.cron.Start()

	return nil
}