incus-sentinel/scheduler/main.go

250 lines
6 KiB
Go
Raw Normal View History

2025-01-05 09:34:06 +00:00
package scheduler
import (
"log"
"strings"
"time"
"gitea.ceperka.net/rosti/incus-sentinel/incus"
"github.com/robfig/cron/v3"
)
const (
	// queueLength is the capacity of the internal work queue.
	queueLength = 100

	// planReasonBackup marks a plan that runs a backup of the instance.
	planReasonBackup = "backup"
	// planReasonSync marks a plan that syncs the instance to its target.
	planReasonSync = "sync"
)
// schedulerPlan is a single unit of work for the scheduler: one instance
// together with the reason it was planned.
type schedulerPlan struct {
	// Instance is the instance the work applies to.
	Instance incus.Instance
	// Reason is either planReasonBackup or planReasonSync.
	Reason string
}
// Scheduler plans and executes backups and syncs of instances.
//
// Cron-like schedules (one per enabled action of every instance) push plans
// into an internal queue; a single main loop takes plans from that queue and
// processes them one at a time.
type Scheduler struct {
	driver   Driver
	notifier Notifier

	// planner receives plans (instance + reason) to be queued.
	planner chan schedulerPlan
	// queueDumpRequest asks the main loop for a snapshot of the queue.
	queueDumpRequest chan bool
	// queueDumpResponse carries the snapshot back to the requester.
	queueDumpResponse chan []schedulerPlan
	// exit tells the main loop to stop.
	exit chan bool

	// scheduledFootprint is the footprint of the instances the current cron
	// was built from; a different footprint means the cron must be rebuilt.
	scheduledFootprint string
	cron               *cron.Cron
}
// NewScheduler creates a Scheduler that uses d to perform backups/syncs and n
// to send notifications, and starts its main loop in a background goroutine.
//
// The cron instance is not created here; setupInstanceSchedules builds it the
// first time schedules are installed.
func NewScheduler(d Driver, n Notifier) *Scheduler {
	s := &Scheduler{
		driver:   d,
		notifier: n,
		// Small buffer so cron callbacks do not block on short bursts.
		planner:           make(chan schedulerPlan, 10),
		queueDumpRequest:  make(chan bool, 1),
		queueDumpResponse: make(chan []schedulerPlan, 1),
		// Unbuffered: Close blocks until the main loop picks the signal up.
		exit: make(chan bool),
	}

	go s.runMainLoop()

	return s
}
// do executes a single plan (backup or sync) and reports the plan back on
// done when it is finished, whether it succeeded or not.
//
// After a successful run the matching notify URL from the instance's sentinel
// configuration is called, provided a URL is set and a notifier is available.
// A failed run is only logged and no notification is sent.
// NOTE(review): this assumes the notify URL signals successful completion
// (e.g. a health-check ping) — confirm against the Notifier's intended use.
func (s *Scheduler) do(plan schedulerPlan, done chan schedulerPlan) {
	// Always hand the plan back so the main loop can clear its tracker.
	defer func() {
		done <- plan
	}()

	name := plan.Instance.Name
	sen := plan.Instance.Sentinel()

	switch plan.Reason {
	case planReasonBackup:
		start := time.Now()
		err := s.driver.Backup(name, []string{"sentinel"})
		if err != nil {
			log.Printf("Failed to backup %s: %s", name, err.Error())
		}
		log.Printf("Backup of %s took %s", name, time.Since(start).String())

		if err != nil || sen.BackupNotifyURL == "" || s.notifier == nil {
			return
		}
		if err := s.notifier.Notify(sen.BackupNotifyURL); err != nil {
			log.Printf("Failed to notify %s: %s", sen.BackupNotifyURL, err.Error())
		}

	case planReasonSync:
		start := time.Now()
		err := s.driver.Sync(name, name+sen.SyncTargetInstanceSuffix, sen.SyncTargetRemote, sen.SyncTargetPool)
		if err != nil {
			log.Printf("Failed to sync %s: %s", name, err.Error())
		}
		log.Printf("Sync of %s took %s", name, time.Since(start).String())

		if err != nil || sen.SyncNotifyURL == "" || s.notifier == nil {
			return
		}
		if err := s.notifier.Notify(sen.SyncNotifyURL); err != nil {
			log.Printf("Failed to notify %s: %s", sen.SyncNotifyURL, err.Error())
		}
	}
}
// runMainLoop is the scheduler's event loop. It owns the work queue and
// reacts to the following events:
//
//   - a new plan on s.planner: queued unless the queue is full or a plan with
//     the same reason and instance name is already queued or running;
//   - a finished plan on the internal done channel: removed from the tracker;
//   - a request on s.queueDumpRequest: answered with a snapshot of all tracked
//     plans (queued and running) in the order they were accepted;
//   - a signal on s.exit: waits for the running plan, if any, and returns;
//   - a tick every two seconds: starts the next queued plan when idle.
//
// Only one plan is processed at a time. The returned error is always nil.
func (s *Scheduler) runMainLoop() error {
	inProgress := false
	done := make(chan schedulerPlan)
	// instancesToProcess mirrors queuedInstances in insertion order so the
	// queue dump is deterministic.
	instancesToProcess := []schedulerPlan{}
	queue := make(chan schedulerPlan, queueLength)
	queuedInstances := make(map[string]schedulerPlan)

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		// New instance to plan
		case newPlan := <-s.planner:
			if len(queue) >= queueLength {
				log.Printf("Queue full (%d)), dropping plan for %s of %s", len(queue), newPlan.Reason, newPlan.Instance.Name)
				continue
			}

			// Skip plans that are already queued or being processed.
			key := newPlan.Reason + ":" + newPlan.Instance.Name
			if _, found := queuedInstances[key]; found {
				continue
			}

			queue <- newPlan
			queuedInstances[key] = newPlan
			instancesToProcess = append(instancesToProcess, newPlan)

		// Instance is done, remove it from the tracker
		case plan := <-done:
			key := plan.Reason + ":" + plan.Instance.Name
			delete(queuedInstances, key)
			for i, p := range instancesToProcess {
				if p.Reason+":"+p.Instance.Name == key {
					instancesToProcess = append(instancesToProcess[:i], instancesToProcess[i+1:]...)
					break
				}
			}
			inProgress = false

		// Dump the queue
		case <-s.queueDumpRequest:
			// copy only fills up to len(dst), so the snapshot must be sized
			// before copying.
			response := make([]schedulerPlan, len(instancesToProcess))
			copy(response, instancesToProcess)
			s.queueDumpResponse <- response

		case <-s.exit:
			if inProgress {
				log.Println("Waiting for the current instance to finish ..")
				<-done
			}
			return nil

		// Check the queue and start processing
		case <-ticker.C:
			if !inProgress && len(queue) > 0 {
				inProgress = true
				go s.do(<-queue, done)
			}
		}
	}
}
// Close signals the main loop to stop. The exit channel is unbuffered, so the
// call blocks until the main loop receives the signal; the main loop then
// waits for a plan that is currently being processed before it returns.
// Close only sends the exit signal; it does not touch the cron instance.
func (s *Scheduler) Close() {
s.exit <- true
}
// Run is the scheduler's blocking entry point. Every 15 seconds it refreshes
// the instance schedules and asks the main loop for the current queue, which
// it logs whenever the (non-empty) content differs from the last one logged.
//
// A failed refresh is logged and retried on the next iteration. The loop has
// no exit condition, so Run does not return.
func (s *Scheduler) Run() error {
	lastQueueDump := ""

	log.Println("Starting scheduler ..")

	for {
		if err := s.refresh(); err != nil {
			log.Printf("Failed to refresh scheduler: %s", err.Error())
		}

		// Print the list of instances in the queue
		s.queueDumpRequest <- true
		plans := <-s.queueDumpResponse
		if len(plans) > 0 {
			output := make([]string, 0, len(plans))
			for _, plan := range plans {
				output = append(output, plan.Reason+":"+plan.Instance.Name)
			}

			outputString := strings.Join(output, ", ")
			// Only log when the queue changed, to keep the log quiet.
			if lastQueueDump != outputString {
				log.Println("Queue:", outputString)
				lastQueueDump = outputString
			}
		}

		time.Sleep(15 * time.Second)
	}
}
// footprint builds a string that identifies the scheduling-relevant state of
// the given instances: for each instance its name and, for every enabled
// action (backup, sync), the action's schedule. Two instance lists with the
// same footprint result in the same cron setup, so the footprint is used to
// decide whether the cron has to be rebuilt.
//
// Fields are separated so that adjacent values cannot run into each other,
// and the enabled flags are part of the footprint so that switching an action
// on or off is detected even when its schedule string is unchanged.
// An empty instance list yields an empty string.
func (s *Scheduler) footprint(is []incus.Instance) string {
	var b strings.Builder

	for _, inst := range is {
		sen := inst.Sentinel()

		b.WriteString(inst.Name)
		b.WriteByte('|')
		if sen.Backup {
			b.WriteString("backup=")
			b.WriteString(sen.BackupSchedule)
		}
		b.WriteByte('|')
		if sen.Sync {
			b.WriteString("sync=")
			b.WriteString(sen.SyncSchedule)
		}
		b.WriteByte('\n')
	}

	return b.String()
}
// refresh loads the current instances from the driver and hands them to
// setupInstanceSchedules, which rebuilds the cron schedules when needed.
// Any error from either step is returned unchanged.
func (s *Scheduler) refresh() error {
	instances, err := s.driver.GetInstances("")
	if err != nil {
		return err
	}

	return s.setupInstanceSchedules(instances)
}
// setupInstanceSchedules (re)builds the cron schedules for the given
// instances. It does nothing when the footprint of the instances matches the
// one the current cron was built from.
//
// The replacement cron is assembled completely before the running one is
// stopped, so an invalid schedule expression leaves the existing schedules
// active; in that case the error from cron is returned and the stored
// footprint is not updated, which makes the next call try again.
func (s *Scheduler) setupInstanceSchedules(is []incus.Instance) error {
	fp := s.footprint(is)
	if s.scheduledFootprint == fp {
		return nil
	}

	log.Println("Refreshing scheduler ..")

	next := cron.New()

	for _, inst := range is {
		// Per-iteration copy so the closures below capture this instance
		// even on toolchains where the loop variable is shared.
		inst := inst
		sen := inst.Sentinel()

		if sen.Backup {
			log.Println(".. adding backup schedule for", inst.Name, sen.BackupSchedule)
			_, err := next.AddFunc(sen.BackupSchedule, func() {
				s.planner <- schedulerPlan{Instance: inst, Reason: planReasonBackup}
			})
			if err != nil {
				return err
			}
		}

		if sen.Sync {
			log.Println(".. adding sync schedule for", inst.Name, sen.SyncSchedule)
			_, err := next.AddFunc(sen.SyncSchedule, func() {
				s.planner <- schedulerPlan{Instance: inst, Reason: planReasonSync}
			})
			if err != nil {
				return err
			}
		}
	}

	// Everything parsed; swap the old cron for the new one.
	if s.cron != nil {
		s.cron.Stop()
	}
	s.cron = next
	s.scheduledFootprint = fp
	s.cron.Start()

	return nil
}