347 lines
		
	
	
	
		
			9.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			347 lines
		
	
	
	
		
			9.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package scheduler
 | 
						|
 | 
						|
import (
 | 
						|
	"log"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"gitea.ceperka.net/rosti/incus-sentinel/incus"
 | 
						|
	"github.com/robfig/cron/v3"
 | 
						|
)
 | 
						|
 | 
						|
// Scheduler-wide settings and the reasons a plan can be queued for.
const (
	// queueLength is the capacity of the internal work queue.
	queueLength = 100

	planReasonBackup       = "backup"
	planReasonBackupVolume = "backup-volume"
	planReasonSync         = "sync"
	planReasonSyncVolume   = "sync-volume"
)

// defaultTags is handed to the driver with every backup request.
var defaultTags = []string{"sentinel"}
 | 
						|
 | 
						|
type schedulerPlan struct {
 | 
						|
	Instance incus.Instance
 | 
						|
	Volume   incus.Volume
 | 
						|
	Reason   string // backup or sync
 | 
						|
}
 | 
						|
 | 
						|
// Scheduler is the main struct that handles the scheduling of backups and syncs.
 | 
						|
// It keeps internal queue of instances to be processed and it runs cron-like
 | 
						|
// schedules for each instance.
 | 
						|
type Scheduler struct {
 | 
						|
	driver   Driver
 | 
						|
	notifier Notifier
 | 
						|
 | 
						|
	planner           chan schedulerPlan   // Receives instance to be backed up and/or synced
 | 
						|
	queueDumpRequest  chan bool            // Request to dump the queue
 | 
						|
	queueDumpResponse chan []schedulerPlan // Response with the queue
 | 
						|
	exit              chan bool            // Exit signal
 | 
						|
 | 
						|
	scheduledFootprint string // Footprint of the scheduler, used to identify if the scheduler needs to be changed
 | 
						|
	cron               *cron.Cron
 | 
						|
}
 | 
						|
 | 
						|
func NewScheduler(d Driver, n Notifier) *Scheduler {
 | 
						|
	cron.New(cron.WithSeconds())
 | 
						|
 | 
						|
	s := &Scheduler{
 | 
						|
		driver:            d,
 | 
						|
		notifier:          n,
 | 
						|
		planner:           make(chan schedulerPlan, 10),
 | 
						|
		queueDumpRequest:  make(chan bool, 1),
 | 
						|
		queueDumpResponse: make(chan []schedulerPlan, 1),
 | 
						|
		exit:              make(chan bool),
 | 
						|
	}
 | 
						|
 | 
						|
	go s.runMainLoop()
 | 
						|
 | 
						|
	return s
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scheduler) do(plan schedulerPlan, done chan schedulerPlan) {
 | 
						|
	defer func() {
 | 
						|
		done <- plan
 | 
						|
	}()
 | 
						|
 | 
						|
	// Do the actual work
 | 
						|
	sen := plan.Instance.Sentinel()
 | 
						|
	var err error
 | 
						|
 | 
						|
	start := time.Now()
 | 
						|
 | 
						|
	notifyURL := ""
 | 
						|
 | 
						|
	switch plan.Reason {
 | 
						|
	case planReasonBackup:
 | 
						|
		log.Printf(".. backup of %s/%s instance started", plan.Instance.Project, plan.Instance.Name)
 | 
						|
 | 
						|
		err = s.driver.Backup(plan.Instance.Project, plan.Instance.Name, defaultTags)
 | 
						|
		if err != nil {
 | 
						|
			log.Printf(".. failed to backup %s: %s", plan.Instance.Name, err.Error())
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		log.Printf(".. backup of %s took %s", plan.Instance.Name, time.Since(start).String())
 | 
						|
 | 
						|
		notifyURL = sen.BackupNotifyURL
 | 
						|
	case planReasonSync:
 | 
						|
		log.Printf(".. syncing of %s/%s instance started", plan.Instance.Project, plan.Instance.Name)
 | 
						|
 | 
						|
		err = s.driver.Sync(plan.Instance.Project, plan.Instance.Name, plan.Instance.Name+sen.SyncTargetInstanceSuffix, sen.SyncTargetRemote, sen.SyncTargetPool)
 | 
						|
		if err != nil {
 | 
						|
			log.Printf(".. failed to sync %s: %s", plan.Instance.Name, err.Error())
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		log.Printf(".. sync of %s took %s", plan.Instance.Name, time.Since(start).String())
 | 
						|
 | 
						|
		notifyURL = sen.SyncNotifyURL
 | 
						|
	case planReasonBackupVolume:
 | 
						|
		log.Printf(".. backup of %s/%s/%s volume started", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name)
 | 
						|
 | 
						|
		sen := plan.Volume.Sentinel()
 | 
						|
		if sen.BackupMode == "dir" {
 | 
						|
			err = s.driver.BackupVolumeDir(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, defaultTags)
 | 
						|
		} else if sen.BackupMode == "native" {
 | 
						|
			err = s.driver.BackupVolumeNative(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, defaultTags)
 | 
						|
		} else {
 | 
						|
			log.Printf(".. invalid backup mode for volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, sen.BackupMode)
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		if err != nil {
 | 
						|
			log.Printf(".. failed to backup volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, err.Error())
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		log.Printf(".. backup of volume %s/%s/%s took %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, time.Since(start).String())
 | 
						|
 | 
						|
		notifyURL = sen.BackupNotifyURL
 | 
						|
	case planReasonSyncVolume:
 | 
						|
		log.Printf(".. syncing of %s/%s/%s volume started", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name)
 | 
						|
 | 
						|
		sen := plan.Volume.Sentinel()
 | 
						|
		targetPool := plan.Volume.Pool
 | 
						|
		if sen.SyncTargetPool != "" {
 | 
						|
			targetPool = sen.SyncTargetPool
 | 
						|
		}
 | 
						|
 | 
						|
		err = s.driver.SyncVolume(plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, sen.SyncTargetRemote, targetPool, plan.Volume.Name+sen.SyncTargetVolumeSuffix)
 | 
						|
		if err != nil {
 | 
						|
			log.Printf(".. failed to sync volume %s/%s/%s: %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, err.Error())
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		log.Printf(".. sync of volume %s/%s/%s took %s", plan.Volume.Project, plan.Volume.Pool, plan.Volume.Name, time.Since(start).String())
 | 
						|
 | 
						|
		notifyURL = sen.SyncNotifyURL
 | 
						|
	}
 | 
						|
 | 
						|
	if notifyURL != "" && s.notifier != nil {
 | 
						|
		err = s.notifier.Notify(notifyURL)
 | 
						|
		if err != nil {
 | 
						|
			log.Printf(".. failed to notify %s: %s", notifyURL, err.Error())
 | 
						|
		}
 | 
						|
	} else if notifyURL != "" && s.notifier == nil {
 | 
						|
		log.Println("Warning: No notifier configured, skipping notification")
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Loop that does the heavy-lifting
 | 
						|
func (s *Scheduler) runMainLoop() error {
 | 
						|
	inProgress := false
 | 
						|
	done := make(chan schedulerPlan)
 | 
						|
	instancesToProcess := []schedulerPlan{}
 | 
						|
 | 
						|
	queue := make(chan schedulerPlan, queueLength)
 | 
						|
	queuedInstances := make(map[string]schedulerPlan)
 | 
						|
 | 
						|
	ticker := time.NewTicker(2 * time.Second)
 | 
						|
	defer ticker.Stop()
 | 
						|
 | 
						|
	lastQueueLog := 0
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		// New instance to plan
 | 
						|
		case newPlan := <-s.planner:
 | 
						|
			if len(queue) >= queueLength {
 | 
						|
				log.Printf("Queue full (%d)), dropping plan for %s of %s", len(queue), newPlan.Reason, newPlan.Instance.Name)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			// Check if instance is already planned
 | 
						|
			_, found := queuedInstances[newPlan.Reason+":"+newPlan.Instance.Name]
 | 
						|
 | 
						|
			if !found {
 | 
						|
				queue <- newPlan
 | 
						|
				queuedInstances[newPlan.Reason+":"+newPlan.Instance.Name] = newPlan
 | 
						|
			}
 | 
						|
		// Instance is done, remove it from the tracker
 | 
						|
		case plan := <-done:
 | 
						|
			delete(queuedInstances, plan.Reason+":"+plan.Instance.Name)
 | 
						|
			inProgress = false
 | 
						|
		// Dump the queue
 | 
						|
		case <-s.queueDumpRequest:
 | 
						|
			response := []schedulerPlan{}
 | 
						|
			copy(response, instancesToProcess)
 | 
						|
			s.queueDumpResponse <- response
 | 
						|
		case <-s.exit:
 | 
						|
			if inProgress {
 | 
						|
				log.Println("Waiting for the current instance to finish ..")
 | 
						|
				<-done
 | 
						|
			}
 | 
						|
			return nil
 | 
						|
		// Check the queue and start processing
 | 
						|
		case <-ticker.C:
 | 
						|
			if !inProgress && len(queue) > 0 {
 | 
						|
				inProgress = true
 | 
						|
				go s.do(<-queue, done)
 | 
						|
			}
 | 
						|
 | 
						|
			if len(queue) > 0 && lastQueueLog != len(queue) {
 | 
						|
				log.Printf("Queue length: %d", len(queue))
 | 
						|
			} else if len(queue) == 0 && lastQueueLog != 0 {
 | 
						|
				log.Println("Queue empty")
 | 
						|
			}
 | 
						|
 | 
						|
			lastQueueLog = len(queue)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scheduler) Close() {
 | 
						|
	s.exit <- true
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scheduler) Run() error {
 | 
						|
	lastQueueDump := ""
 | 
						|
 | 
						|
	log.Println("Starting scheduler ..")
 | 
						|
	for {
 | 
						|
		s.refresh()
 | 
						|
 | 
						|
		// Print the list of instances in the queue
 | 
						|
		s.queueDumpRequest <- true
 | 
						|
		plans := <-s.queueDumpResponse
 | 
						|
		if len(plans) > 0 {
 | 
						|
			output := []string{}
 | 
						|
			for _, plan := range plans {
 | 
						|
				output = append(output, plan.Reason+":"+plan.Instance.Name)
 | 
						|
			}
 | 
						|
 | 
						|
			outputString := strings.Join(output, ", ")
 | 
						|
			if lastQueueDump != outputString {
 | 
						|
				log.Println("Queue:", outputString)
 | 
						|
				lastQueueDump = outputString
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		time.Sleep(15 * time.Second)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s *Scheduler) footprint(is []incus.Instance, vols []incus.Volume) string {
 | 
						|
	footprint := ""
 | 
						|
	for _, inst := range is {
 | 
						|
		sen := inst.Sentinel()
 | 
						|
		footprint += inst.Project + inst.Name + sen.BackupSchedule + sen.SyncSchedule
 | 
						|
	}
 | 
						|
 | 
						|
	for _, vol := range vols {
 | 
						|
		sen := vol.Sentinel()
 | 
						|
		footprint += vol.Project + vol.Name + vol.Pool + sen.BackupSchedule + sen.SyncSchedule
 | 
						|
	}
 | 
						|
 | 
						|
	return footprint
 | 
						|
}
 | 
						|
 | 
						|
// Checks if the scheduler needs to be refreshed and does it if needed
 | 
						|
func (s *Scheduler) refresh() error {
 | 
						|
	instances, err := s.driver.GetInstances("")
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	vols, err := s.driver.GetVolumes("")
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	err = s.setupInstanceSchedules(instances, vols)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Refresh cron like schedulers for all instances
 | 
						|
func (s *Scheduler) setupInstanceSchedules(is []incus.Instance, vols []incus.Volume) error {
 | 
						|
	if s.scheduledFootprint == s.footprint(is, vols) {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	log.Println("Refreshing scheduler ..")
 | 
						|
 | 
						|
	if s.cron != nil {
 | 
						|
		s.cron.Stop()
 | 
						|
	}
 | 
						|
 | 
						|
	s.cron = cron.New()
 | 
						|
 | 
						|
	// Instances
 | 
						|
	for _, inst := range is {
 | 
						|
		sen := inst.Sentinel()
 | 
						|
		if sen.Backup {
 | 
						|
			log.Println(".. adding backup schedule for", inst.Project, inst.Name, sen.BackupSchedule)
 | 
						|
			_, err := s.cron.AddFunc(sen.BackupSchedule, func() {
 | 
						|
				log.Printf(".. placing backup of %s/%s into the queue", inst.Project, inst.Name)
 | 
						|
				s.planner <- schedulerPlan{Instance: inst, Reason: planReasonBackup}
 | 
						|
			})
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if sen.Sync {
 | 
						|
			log.Println(".. adding sync schedule for", inst.Project, inst.Name, sen.SyncSchedule)
 | 
						|
			_, err := s.cron.AddFunc(sen.SyncSchedule, func() {
 | 
						|
				log.Printf(".. placing sync of %s/%s into the queue", inst.Project, inst.Name)
 | 
						|
				s.planner <- schedulerPlan{Instance: inst, Reason: planReasonSync}
 | 
						|
			})
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Volumes
 | 
						|
	for _, vol := range vols {
 | 
						|
		sen := vol.Sentinel()
 | 
						|
		if sen.Backup {
 | 
						|
			log.Println(".. adding backup schedule for", vol.Project, vol.Pool, vol.Name, sen.BackupSchedule)
 | 
						|
			_, err := s.cron.AddFunc(sen.BackupSchedule, func() {
 | 
						|
				log.Printf(".. placing volume backup of %s%s%s into the queue", vol.Project, vol.Pool, vol.Name)
 | 
						|
				s.planner <- schedulerPlan{Volume: vol, Reason: planReasonBackupVolume}
 | 
						|
			})
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if sen.Sync {
 | 
						|
			log.Println(".. adding sync schedule for", vol.Project, vol.Pool, vol.Name, sen.SyncSchedule)
 | 
						|
			_, err := s.cron.AddFunc(sen.SyncSchedule, func() {
 | 
						|
				log.Printf(".. placing volume sync of %s%s%s into the queue", vol.Project, vol.Pool, vol.Name)
 | 
						|
				s.planner <- schedulerPlan{Volume: vol, Reason: planReasonSyncVolume}
 | 
						|
			})
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	s.scheduledFootprint = s.footprint(is, vols)
 | 
						|
	s.cron.Start()
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 |