176 lines
3.7 KiB
Go
176 lines
3.7 KiB
Go
package nodes
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/puzpuzpuz/xsync/v3"
|
|
)
|
|
|
|
// dumpIntervalSeconds is the pause, in seconds, that DumpLoop sleeps before
// its first dump and again after every dump.
const dumpIntervalSeconds = 120
|
|
|
|
// Base nodes structure that covers all operations above nodes.
|
|
type NodesProcessor struct {
|
|
nodes *xsync.MapOf[string, Node]
|
|
dumpPath string
|
|
garbageCollectAfterSeconds int64
|
|
}
|
|
|
|
func NewNodesProcessor(dumpPath string, garbageCollectAfterSeconds int64) *NodesProcessor {
|
|
p := &NodesProcessor{
|
|
dumpPath: dumpPath,
|
|
garbageCollectAfterSeconds: garbageCollectAfterSeconds,
|
|
nodes: xsync.NewMapOf[string, Node](),
|
|
}
|
|
|
|
err := p.Load()
|
|
if err != nil {
|
|
log.Printf("failed to load nodes: %v\n", err)
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (np *NodesProcessor) DumpLoop() {
|
|
log.Println(".. starting dump loop")
|
|
time.Sleep(time.Second * dumpIntervalSeconds)
|
|
for {
|
|
log.Println(".. dumping nodes")
|
|
err := np.Dump()
|
|
if err != nil {
|
|
log.Printf("failed to dump nodes: %v\n", err)
|
|
}
|
|
|
|
time.Sleep(time.Second * dumpIntervalSeconds)
|
|
}
|
|
}
|
|
|
|
// GarbageCollection removes nodes that haven't been updated for more than garbageCollectAfterSeconds.
|
|
func (np *NodesProcessor) GarbageCollection() {
|
|
for {
|
|
np.garbageCollect()
|
|
time.Sleep(time.Second * 10)
|
|
}
|
|
}
|
|
|
|
func (np *NodesProcessor) garbageCollect() {
|
|
np.nodes.Range(func(key string, value Node) bool {
|
|
if time.Now().Unix()-value.LastUpdate > np.garbageCollectAfterSeconds {
|
|
np.nodes.Delete(value.HostName)
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
// Dumps content of np.nodes to np.dumpPath.
|
|
func (np *NodesProcessor) Dump() error {
|
|
ns := []Node{}
|
|
np.nodes.Range(func(key string, value Node) bool {
|
|
ns = append(ns, value)
|
|
return true
|
|
})
|
|
|
|
body, err := json.Marshal(ns)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal nodes: %w", err)
|
|
}
|
|
|
|
// Create directory if it doesn't exist.
|
|
if strings.Contains(np.dumpPath, "/") {
|
|
path := np.dumpPath[:strings.LastIndex(np.dumpPath, "/")]
|
|
|
|
if len(path) != 0 {
|
|
err = os.MkdirAll(path, 0755)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create directory for nodes.json: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
err = os.WriteFile(np.dumpPath, body, 0644)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to write nodes.json: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Loads content of np.dumpPath to np.nodes.
|
|
func (np *NodesProcessor) Load() error {
|
|
filePath := path.Join(np.dumpPath)
|
|
_, err := os.Stat(filePath)
|
|
if os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
|
|
body, err := os.ReadFile(np.dumpPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read nodes.json: %w", err)
|
|
}
|
|
|
|
np.Reset()
|
|
|
|
ns := []Node{}
|
|
|
|
err = json.Unmarshal(body, &ns)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to unmarshal nodes: %w", err)
|
|
}
|
|
|
|
for _, n := range ns {
|
|
np.nodes.Store(n.HostName, n)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (np *NodesProcessor) Reset() {
|
|
np.nodes.Clear()
|
|
}
|
|
|
|
// Returns string with Prometheus metrics
|
|
// TODO: finish this
|
|
func (np *NodesProcessor) GetMetrics() string {
|
|
return ""
|
|
}
|
|
|
|
// List returns all nodes.
|
|
func (np *NodesProcessor) List() Nodes {
|
|
nodes := Nodes{}
|
|
|
|
np.nodes.Range(func(key string, value Node) bool {
|
|
nodes = append(nodes, value)
|
|
return true
|
|
})
|
|
|
|
return nodes
|
|
}
|
|
|
|
// Get returns a node by hostname.
|
|
func (np *NodesProcessor) Get(hostname string) (Node, bool) {
|
|
node, ok := np.nodes.Load(hostname)
|
|
return node, ok
|
|
}
|
|
|
|
// Refresh updates node with hostname and labels. If it doesn't exists it's created.
|
|
func (np *NodesProcessor) Refresh(hostname string, labels Labels, kv KV) {
|
|
node, ok := np.nodes.Load(hostname)
|
|
if !ok {
|
|
node = Node{}
|
|
}
|
|
node.LastUpdate = time.Now().Unix()
|
|
node.HostName = hostname
|
|
node.Labels = labels
|
|
node.KV = kv
|
|
np.nodes.Store(hostname, node)
|
|
}
|
|
|
|
// Drop removes node with given hostname.
|
|
func (np *NodesProcessor) Drop(hostname string) {
|
|
np.nodes.Delete(hostname)
|
|
}
|