// incus-sentinel/scheduler/main.go
package scheduler
import (
"log"
"strings"
"time"
"gitea.ceperka.net/rosti/incus-sentinel/incus"
"github.com/robfig/cron/v3"
)
// Maximum number of plans the internal work queue can hold.
const queueLength = 100

// Reasons attached to a schedulerPlan; they select what do() executes.
const (
	planReasonBackup       = "backup"
	planReasonBackupVolume = "backup-volume"
	planReasonSync         = "sync"
	planReasonSyncVolume   = "sync-volume"
)

// Tags applied to every backup created by the scheduler.
var defaultTags = []string{"sentinel"}
// schedulerPlan is one unit of work for the scheduler. Exactly one of
// Instance or Volume is meaningful, depending on Reason.
type schedulerPlan struct {
	Instance incus.Instance // set for planReasonBackup / planReasonSync
	Volume   incus.Volume   // set for planReasonBackupVolume / planReasonSyncVolume
	Reason   string         // one of the planReason* constants
}
// Scheduler is the main struct that handles the scheduling of backups and syncs.
// It keeps internal queue of instances to be processed and it runs cron-like
// schedules for each instance.
type Scheduler struct {
	driver            Driver
	notifier          Notifier               // optional; nil means notifications are skipped
	planner           chan schedulerPlan     // Receives instance to be backed up and/or synced
	queueDumpRequest  chan bool              // Request to dump the queue
	queueDumpResponse chan []schedulerPlan   // Response with the queue
	exit              chan bool              // Exit signal
	scheduledFootprint string                // Footprint of the scheduler, used to identify if the scheduler needs to be changed
	cron              *cron.Cron             // rebuilt by setupInstanceSchedules whenever the footprint changes
}
// NewScheduler creates a scheduler backed by the given driver and optional
// notifier (nil disables notifications) and starts its processing loop in a
// background goroutine. Call Close to stop the loop.
//
// Note: the cron instance is created lazily by setupInstanceSchedules; the
// previous dead `cron.New(cron.WithSeconds())` call whose result was
// discarded has been removed.
func NewScheduler(d Driver, n Notifier) *Scheduler {
	s := &Scheduler{
		driver:            d,
		notifier:          n,
		planner:           make(chan schedulerPlan, 10),
		queueDumpRequest:  make(chan bool, 1),
		queueDumpResponse: make(chan []schedulerPlan, 1),
		exit:              make(chan bool),
	}
	go s.runMainLoop()
	return s
}
// do executes a single plan: an instance backup/sync or a volume backup/sync,
// selected by plan.Reason. The plan is always handed back on done (via defer),
// even on failure, so the main loop can clear its in-progress flag and dedup
// tracker. On success a best-effort notification ping is sent to the
// configured URL; all failures are logged, never returned.
func (s *Scheduler) do(plan schedulerPlan, done chan schedulerPlan) {
	defer func() {
		done <- plan
	}()
	// Do the actual work
	// Instance-level sentinel config; the volume cases below shadow this
	// with the volume's own config.
	sen := plan.Instance.Sentinel()
	var err error
	start := time.Now()
	notifyURL := ""
	switch plan.Reason {
	case planReasonBackup:
		log.Printf(".. backup of %s/%s instance started", plan.Instance.Project, plan.Instance.Name)
		err = s.driver.Backup(plan.Instance.Project, plan.Instance.Name, defaultTags)
		if err != nil {
			log.Printf(".. failed to backup %s: %s", plan.Instance.Name, err.Error())
			return
		}
		log.Printf(".. backup of %s took %s", plan.Instance.Name, time.Since(start).String())
		notifyURL = sen.BackupNotifyURL
	case planReasonSync:
		log.Printf(".. syncing of %s/%s instance started", plan.Instance.Project, plan.Instance.Name)
		// Copies the instance to the remote target under a suffixed name.
		err = s.driver.Sync(plan.Instance.Project, plan.Instance.Name, plan.Instance.Name+sen.SyncTargetInstanceSuffix, sen.SyncTargetRemote, sen.SyncTargetPool)
		if err != nil {
			log.Printf(".. failed to sync %s: %s", plan.Instance.Name, err.Error())
			return
		}
		log.Printf(".. sync of %s took %s", plan.Instance.Name, time.Since(start).String())
		notifyURL = sen.SyncNotifyURL
	case planReasonBackupVolume:
		log.Printf(".. backup of %s/%s/%s volume started", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name)
		// Intentional shadow: volume plans use the volume's own sentinel
		// config, not the (empty) instance's.
		sen := plan.Volume.Sentinel()
		if sen.BackupMode == "dir" {
			err = s.driver.BackupVolumeDir(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, defaultTags)
		} else if sen.BackupMode == "native" {
			err = s.driver.BackupVolumeNative(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, defaultTags)
		} else {
			log.Printf(".. invalid backup mode for volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, sen.BackupMode)
			return
		}
		if err != nil {
			log.Printf(".. failed to backup volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, err.Error())
			return
		}
		log.Printf(".. backup of volume %s/%s/%s took %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, time.Since(start).String())
		notifyURL = sen.BackupNotifyURL
	case planReasonSyncVolume:
		log.Printf(".. syncing of %s/%s/%s volume started", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name)
		// Intentional shadow: volume plans use the volume's own sentinel config.
		sen := plan.Volume.Sentinel()
		// Target pool defaults to the source pool unless overridden.
		targetPool := plan.Volume.Pool
		if sen.SyncTargetPool != "" {
			targetPool = sen.SyncTargetPool
		}
		err = s.driver.SyncVolume(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, sen.SyncTargetRemote, targetPool, plan.Volume.Name+sen.SyncTargetVolumeSuffix)
		if err != nil {
			log.Printf(".. failed to sync volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, err.Error())
			return
		}
		log.Printf(".. sync of volume %s/%s/%s took %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, time.Since(start).String())
		notifyURL = sen.SyncNotifyURL
	}
	// Notification is only attempted after success (error paths return above).
	if notifyURL != "" && s.notifier != nil {
		err = s.notifier.Notify(notifyURL)
		if err != nil {
			log.Printf(".. failed to notify %s: %s", notifyURL, err.Error())
		} else {
			log.Printf(".. notification sent to %s", notifyURL)
		}
	} else if notifyURL != "" && s.notifier == nil {
		log.Println("Warning: No notifier configured, skipping notification")
	}
}
// Loop that does the heavy-lifting
func (s *Scheduler) runMainLoop() error {
inProgress := false
done := make(chan schedulerPlan)
instancesToProcess := []schedulerPlan{}
queue := make(chan schedulerPlan, queueLength)
queuedInstances := make(map[string]schedulerPlan)
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
lastQueueLog := 0
for {
select {
// New instance to plan
case newPlan := <-s.planner:
if len(queue) >= queueLength {
log.Printf("Queue full (%d)), dropping plan for %s of %s", len(queue), newPlan.Reason, newPlan.Instance.Name)
continue
}
// Check if instance is already planned
_, found := queuedInstances[newPlan.Reason+":"+newPlan.Instance.Name]
if !found {
queue <- newPlan
queuedInstances[newPlan.Reason+":"+newPlan.Instance.Name] = newPlan
}
// Instance is done, remove it from the tracker
case plan := <-done:
delete(queuedInstances, plan.Reason+":"+plan.Instance.Name)
inProgress = false
// Dump the queue
case <-s.queueDumpRequest:
response := []schedulerPlan{}
copy(response, instancesToProcess)
s.queueDumpResponse <- response
case <-s.exit:
if inProgress {
log.Println("Waiting for the current instance to finish ..")
<-done
}
return nil
// Check the queue and start processing
case <-ticker.C:
if !inProgress && len(queue) > 0 {
inProgress = true
go s.do(<-queue, done)
}
if len(queue) > 0 && lastQueueLog != len(queue) {
log.Printf("Queue length: %d", len(queue))
} else if len(queue) == 0 && lastQueueLog != 0 {
log.Println("Queue empty")
}
lastQueueLog = len(queue)
}
}
}
// Close stops the main loop. The send blocks until the loop picks up the
// signal; if a plan is currently executing, the loop waits for it to finish
// first, so Close may take as long as the in-flight backup/sync.
func (s *Scheduler) Close() {
	s.exit <- true
}
// Run is the scheduler's foreground loop: every 15 seconds it reconciles the
// cron schedules with the driver's current instances/volumes and logs the
// work queue whenever its content changes. It never returns in normal
// operation (the error result exists for interface compatibility).
func (s *Scheduler) Run() error {
	lastQueueDump := ""
	log.Println("Starting scheduler ..")
	for {
		// Previously the refresh error was silently discarded; a failed
		// refresh keeps the old schedules, so log it and retry next round.
		if err := s.refresh(); err != nil {
			log.Printf("Failed to refresh scheduler: %s", err.Error())
		}
		// Print the list of instances in the queue
		s.queueDumpRequest <- true
		plans := <-s.queueDumpResponse
		if len(plans) > 0 {
			output := []string{}
			for _, plan := range plans {
				name := plan.Instance.Name
				if name == "" {
					// Volume plans carry no instance; label them by volume name.
					name = plan.Volume.Name
				}
				output = append(output, plan.Reason+":"+name)
			}
			// Stable order so the change-detection below doesn't re-log the
			// same queue in a different order. Tiny n, so a dependency-free
			// insertion sort is sufficient.
			for i := 1; i < len(output); i++ {
				for j := i; j > 0 && output[j] < output[j-1]; j-- {
					output[j], output[j-1] = output[j-1], output[j]
				}
			}
			outputString := strings.Join(output, ", ")
			if lastQueueDump != outputString {
				log.Println("Queue:", outputString)
				lastQueueDump = outputString
			}
		}
		time.Sleep(15 * time.Second)
	}
}
// footprint builds a cheap change-detection fingerprint of the scheduling
// config (identity + schedules of every instance and volume); a mismatch with
// the stored footprint triggers a scheduler rebuild. Uses strings.Builder
// instead of the previous quadratic `+=` loop, and inserts delimiters so
// adjacent fields cannot collide when concatenated (e.g. "a"+"bc" vs
// "ab"+"c"). The format is internal-only: it is compared solely against the
// previously stored footprint.
func (s *Scheduler) footprint(is []incus.Instance, vols []incus.Volume) string {
	var b strings.Builder
	for _, inst := range is {
		sen := inst.Sentinel()
		b.WriteString(inst.Project)
		b.WriteByte('/')
		b.WriteString(inst.Name)
		b.WriteByte('|')
		b.WriteString(sen.BackupSchedule)
		b.WriteByte('|')
		b.WriteString(sen.SyncSchedule)
		b.WriteByte(';')
	}
	for _, vol := range vols {
		sen := vol.Sentinel()
		b.WriteString(vol.Project)
		b.WriteByte('/')
		b.WriteString(vol.Pool)
		b.WriteByte('/')
		b.WriteString(vol.Name)
		b.WriteByte('|')
		b.WriteString(sen.BackupSchedule)
		b.WriteByte('|')
		b.WriteString(sen.SyncSchedule)
		b.WriteByte(';')
	}
	return b.String()
}
// refresh pulls the current instances and volumes from the driver and
// reconciles the cron schedules with them (a no-op when nothing changed).
func (s *Scheduler) refresh() error {
	instances, err := s.driver.GetInstances("")
	if err != nil {
		return err
	}
	vols, err := s.driver.GetVolumes("")
	if err != nil {
		return err
	}
	return s.setupInstanceSchedules(instances, vols)
}
// setupInstanceSchedules rebuilds the cron schedules for all instances and
// volumes when the configuration footprint changed since the last rebuild;
// otherwise it returns immediately. The scheduled jobs only enqueue plans on
// s.planner — the actual work happens in the scheduler's main loop.
//
// Note: the closures handed to cron.AddFunc must capture per-iteration
// copies of the range variables (`inst := inst`, `vol := vol`); with
// pre-Go-1.22 loop semantics every job would otherwise fire for the last
// instance/volume of the loop. The shadowing is a no-op on Go 1.22+.
func (s *Scheduler) setupInstanceSchedules(is []incus.Instance, vols []incus.Volume) error {
	if s.scheduledFootprint == s.footprint(is, vols) {
		return nil
	}
	log.Println("Refreshing scheduler ..")
	// Tear down the previous cron instance before building a new one.
	if s.cron != nil {
		for _, e := range s.cron.Entries() {
			s.cron.Remove(e.ID)
		}
		s.cron.Stop()
	}
	s.cron = cron.New()
	// Instances
	for _, inst := range is {
		inst := inst // per-iteration copy for the closures below
		sen := inst.Sentinel()
		if sen.Backup {
			log.Println(".. adding backup schedule for", inst.Project, inst.Name, sen.BackupSchedule)
			_, err := s.cron.AddFunc(sen.BackupSchedule, func() {
				log.Printf(".. placing backup of %s/%s into the queue", inst.Project, inst.Name)
				s.planner <- schedulerPlan{Instance: inst, Reason: planReasonBackup}
			})
			if err != nil {
				return err
			}
		}
		if sen.Sync {
			log.Println(".. adding sync schedule for", inst.Project, inst.Name, sen.SyncSchedule)
			_, err := s.cron.AddFunc(sen.SyncSchedule, func() {
				log.Printf(".. placing sync of %s/%s into the queue", inst.Project, inst.Name)
				s.planner <- schedulerPlan{Instance: inst, Reason: planReasonSync}
			})
			if err != nil {
				return err
			}
		}
	}
	// Volumes
	for _, vol := range vols {
		vol := vol // per-iteration copy for the closures below
		sen := vol.Sentinel()
		if sen.Backup {
			log.Println(".. adding backup schedule for", vol.Project, vol.Pool, vol.Name, sen.BackupSchedule)
			_, err := s.cron.AddFunc(sen.BackupSchedule, func() {
				log.Printf(".. placing volume backup of %s/%s/%s into the queue", vol.Project, vol.Pool, vol.Name)
				s.planner <- schedulerPlan{Volume: vol, Reason: planReasonBackupVolume}
			})
			if err != nil {
				return err
			}
		}
		if sen.Sync {
			log.Println(".. adding sync schedule for", vol.Project, vol.Pool, vol.Name, sen.SyncSchedule)
			_, err := s.cron.AddFunc(sen.SyncSchedule, func() {
				log.Printf(".. placing volume sync of %s/%s/%s into the queue", vol.Project, vol.Pool, vol.Name)
				s.planner <- schedulerPlan{Volume: vol, Reason: planReasonSyncVolume}
			})
			if err != nil {
				return err
			}
		}
	}
	s.scheduledFootprint = s.footprint(is, vols)
	s.cron.Start()
	return nil
}