incus-sentinel/scheduler/main.go

326 lines
8.6 KiB
Go
Raw Normal View History

2025-01-05 09:34:06 +00:00
package scheduler
import (
	"fmt"
	"log"
	"strings"
	"time"

	"gitea.ceperka.net/rosti/incus-sentinel/incus"
	"github.com/robfig/cron/v3"
)
const (
	// queueLength is the maximum number of plans that may wait in the
	// scheduler queue at once.
	queueLength = 100

	// Plan reasons select which operation a schedulerPlan asks for.
	planReasonBackup       = "backup"
	planReasonBackupVolume = "backup-volume"
	planReasonSync         = "sync"
	planReasonSyncVolume   = "sync-volume"
)

// defaultTags are passed as the tags argument of every backup driver call.
var defaultTags = []string{"sentinel"}
type schedulerPlan struct {
Instance incus.Instance
2025-01-06 12:53:16 +00:00
Volume incus.Volume
2025-01-05 09:34:06 +00:00
Reason string // backup or sync
}
// Scheduler is the main struct that handles the scheduling of backups and
// syncs. It runs cron-like schedules for every instance and volume that asks
// for them, and keeps an internal queue of resulting plans that a single
// background loop processes one at a time.
type Scheduler struct {
	driver   Driver   // performs the backups, syncs and instance/volume listings
	notifier Notifier // optional; when nil, notifications are skipped with a warning

	planner           chan schedulerPlan   // cron jobs send plans here to be queued
	queueDumpRequest  chan bool            // asks the main loop for a snapshot of the queue
	queueDumpResponse chan []schedulerPlan // the main loop replies here with the snapshot
	exit              chan bool            // tells the main loop to stop

	scheduledFootprint string     // footprint the current cron jobs were built from; a change triggers a rebuild
	cron               *cron.Cron // active cron runner; nil until the first refresh
}
// NewScheduler creates a Scheduler that executes plans through d and sends
// completion notifications through n (n may be nil, in which case
// notifications are skipped). It immediately starts the main processing loop
// in a background goroutine; call Run to start loading schedules and Close to
// stop the loop. The cron runner itself is created lazily on the first
// refresh.
func NewScheduler(d Driver, n Notifier) *Scheduler {
	s := &Scheduler{
		driver:            d,
		notifier:          n,
		planner:           make(chan schedulerPlan, 10),
		queueDumpRequest:  make(chan bool, 1),
		queueDumpResponse: make(chan []schedulerPlan, 1),
		exit:              make(chan bool),
	}
	go s.runMainLoop()
	return s
}
func (s *Scheduler) do(plan schedulerPlan, done chan schedulerPlan) {
defer func() {
done <- plan
}()
// Do the actual work
sen := plan.Instance.Sentinel()
var err error
2025-01-06 12:53:16 +00:00
start := time.Now()
notifyURL := ""
switch plan.Reason {
case planReasonBackup:
err = s.driver.Backup(plan.Instance.Project, plan.Instance.Name, defaultTags)
2025-01-05 09:34:06 +00:00
if err != nil {
log.Printf("Failed to backup %s: %s", plan.Instance.Name, err.Error())
2025-01-06 12:53:16 +00:00
return
2025-01-05 09:34:06 +00:00
}
log.Printf("Backup of %s took %s", plan.Instance.Name, time.Since(start).String())
2025-01-06 12:53:16 +00:00
notifyURL = sen.BackupNotifyURL
case planReasonSync:
err = s.driver.Sync(plan.Instance.Project, plan.Instance.Name, plan.Instance.Name+sen.SyncTargetInstanceSuffix, sen.SyncTargetRemote, sen.SyncTargetPool)
if err != nil {
log.Printf("Failed to sync %s: %s", plan.Instance.Name, err.Error())
return
2025-01-05 09:34:06 +00:00
}
2025-01-06 12:53:16 +00:00
log.Printf("Sync of %s took %s", plan.Instance.Name, time.Since(start).String())
notifyURL = sen.SyncNotifyURL
case planReasonBackupVolume:
sen := plan.Volume.Sentinel()
if sen.BackupMode == "dir" {
err = s.driver.BackupVolumeDir(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, defaultTags)
} else if sen.BackupMode == "native" {
err = s.driver.BackupVolumeNative(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, defaultTags)
} else {
log.Printf("Invalid backup mode for volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, sen.BackupMode)
return
}
2025-01-05 09:34:06 +00:00
if err != nil {
2025-01-06 12:53:16 +00:00
log.Printf("Failed to backup volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, err.Error())
return
2025-01-05 09:34:06 +00:00
}
2025-01-06 12:53:16 +00:00
log.Printf("Backup of volume %s/%s/%s took %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, time.Since(start).String())
notifyURL = sen.BackupNotifyURL
case planReasonSyncVolume:
sen := plan.Volume.Sentinel()
targetPool := plan.Volume.Pool
if sen.SyncTargetPool != "" {
targetPool = sen.SyncTargetPool
}
err = s.driver.SyncVolume(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, sen.SyncTargetRemote, targetPool, plan.Volume.Name+sen.SyncTargetVolumeSuffix)
if err != nil {
log.Printf("Failed to sync volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, err.Error())
return
}
log.Printf("Sync of volume %s/%s/%s took %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, time.Since(start).String())
2025-01-05 09:34:06 +00:00
2025-01-06 12:53:16 +00:00
notifyURL = sen.SyncNotifyURL
}
if notifyURL != "" && s.notifier != nil {
err = s.notifier.Notify(notifyURL)
if err != nil {
log.Printf("Failed to notify %s: %s", notifyURL, err.Error())
2025-01-05 09:34:06 +00:00
}
2025-01-06 12:53:16 +00:00
} else if notifyURL != "" && s.notifier == nil {
log.Println("Warning: No notifier configured, skipping notification")
2025-01-05 09:34:06 +00:00
}
}
// Loop that does the heavy-lifting
func (s *Scheduler) runMainLoop() error {
inProgress := false
done := make(chan schedulerPlan)
instancesToProcess := []schedulerPlan{}
queue := make(chan schedulerPlan, queueLength)
queuedInstances := make(map[string]schedulerPlan)
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
// New instance to plan
case newPlan := <-s.planner:
if len(queue) >= queueLength {
log.Printf("Queue full (%d)), dropping plan for %s of %s", len(queue), newPlan.Reason, newPlan.Instance.Name)
continue
}
// Check if instance is already planned
_, found := queuedInstances[newPlan.Reason+":"+newPlan.Instance.Name]
if !found {
queue <- newPlan
queuedInstances[newPlan.Reason+":"+newPlan.Instance.Name] = newPlan
}
// Instance is done, remove it from the tracker
case plan := <-done:
delete(queuedInstances, plan.Reason+":"+plan.Instance.Name)
inProgress = false
// Dump the queue
case <-s.queueDumpRequest:
response := []schedulerPlan{}
copy(response, instancesToProcess)
s.queueDumpResponse <- response
case <-s.exit:
if inProgress {
log.Println("Waiting for the current instance to finish ..")
<-done
}
return nil
// Check the queue and start processing
case <-ticker.C:
if !inProgress && len(queue) > 0 {
inProgress = true
go s.do(<-queue, done)
}
}
}
}
func (s *Scheduler) Close() {
s.exit <- true
}
func (s *Scheduler) Run() error {
lastQueueDump := ""
log.Println("Starting scheduler ..")
for {
s.refresh()
// Print the list of instances in the queue
s.queueDumpRequest <- true
plans := <-s.queueDumpResponse
if len(plans) > 0 {
output := []string{}
for _, plan := range plans {
output = append(output, plan.Reason+":"+plan.Instance.Name)
}
outputString := strings.Join(output, ", ")
if lastQueueDump != outputString {
log.Println("Queue:", outputString)
lastQueueDump = outputString
}
}
time.Sleep(15 * time.Second)
}
}
2025-01-06 13:56:01 +00:00
func (s *Scheduler) footprint(is []incus.Instance, vols []incus.Volume) string {
2025-01-05 09:34:06 +00:00
footprint := ""
for _, inst := range is {
sen := inst.Sentinel()
2025-01-06 13:56:01 +00:00
footprint += inst.Project + inst.Name + sen.BackupSchedule + sen.SyncSchedule
}
for _, vol := range vols {
sen := vol.Sentinel()
footprint += vol.Project + vol.Name + vol.Pool + sen.BackupSchedule + sen.SyncSchedule
2025-01-05 09:34:06 +00:00
}
return footprint
}
// Checks if the scheduler needs to be refreshed and does it if needed
func (s *Scheduler) refresh() error {
instances, err := s.driver.GetInstances("")
if err != nil {
return err
}
2025-01-06 13:56:01 +00:00
vols, err := s.driver.GetVolumes("")
if err != nil {
return err
}
err = s.setupInstanceSchedules(instances, vols)
2025-01-05 09:34:06 +00:00
if err != nil {
return err
}
return nil
}
// Refresh cron like schedulers for all instances
2025-01-06 13:56:01 +00:00
func (s *Scheduler) setupInstanceSchedules(is []incus.Instance, vols []incus.Volume) error {
if s.scheduledFootprint == s.footprint(is, vols) {
2025-01-05 09:34:06 +00:00
return nil
}
log.Println("Refreshing scheduler ..")
if s.cron != nil {
s.cron.Stop()
}
s.cron = cron.New()
2025-01-06 13:56:01 +00:00
// Instances
2025-01-05 09:34:06 +00:00
for _, inst := range is {
sen := inst.Sentinel()
if sen.Backup {
2025-01-06 13:56:01 +00:00
log.Println(".. adding backup schedule for", inst.Project, inst.Name, sen.BackupSchedule)
2025-01-05 09:34:06 +00:00
_, err := s.cron.AddFunc(sen.BackupSchedule, func() {
s.planner <- schedulerPlan{Instance: inst, Reason: planReasonBackup}
})
if err != nil {
return err
}
}
if sen.Sync {
2025-01-06 13:56:01 +00:00
log.Println(".. adding sync schedule for", inst.Project, inst.Name, sen.SyncSchedule)
2025-01-05 09:34:06 +00:00
_, err := s.cron.AddFunc(sen.SyncSchedule, func() {
s.planner <- schedulerPlan{Instance: inst, Reason: planReasonSync}
})
if err != nil {
return err
}
}
}
2025-01-06 13:56:01 +00:00
// Volumes
for _, vol := range vols {
sen := vol.Sentinel()
if sen.Backup {
log.Println(".. adding backup schedule for", vol.Project, vol.Pool, vol.Name, sen.BackupSchedule)
_, err := s.cron.AddFunc(sen.BackupSchedule, func() {
s.planner <- schedulerPlan{Volume: vol, Reason: planReasonBackupVolume}
})
if err != nil {
return err
}
}
if sen.Sync {
log.Println(".. adding sync schedule for", vol.Project, vol.Pool, vol.Name, sen.SyncSchedule)
_, err := s.cron.AddFunc(sen.SyncSchedule, func() {
s.planner <- schedulerPlan{Volume: vol, Reason: planReasonSyncVolume}
})
if err != nil {
return err
}
}
}
s.scheduledFootprint = s.footprint(is, vols)
2025-01-05 09:34:06 +00:00
s.cron.Start()
return nil
}