package nodes import ( "encoding/json" "fmt" "log" "os" "path" "strings" "time" "github.com/puzpuzpuz/xsync/v3" ) const dumpIntervalSeconds = 120 // Base nodes structure that covers all operations above nodes. type NodesProcessor struct { nodes *xsync.MapOf[string, Node] dumpPath string garbageCollectAfterSeconds int64 } func NewNodesProcessor(dumpPath string, garbageCollectAfterSeconds int64) *NodesProcessor { p := &NodesProcessor{ dumpPath: dumpPath, garbageCollectAfterSeconds: garbageCollectAfterSeconds, nodes: xsync.NewMapOf[string, Node](), } err := p.Load() if err != nil { log.Printf("failed to load nodes: %v\n", err) } return p } func (np *NodesProcessor) DumpLoop() { log.Println(".. starting dump loop") time.Sleep(time.Second * dumpIntervalSeconds) for { log.Println(".. dumping nodes") err := np.Dump() if err != nil { log.Printf("failed to dump nodes: %v\n", err) } time.Sleep(time.Second * dumpIntervalSeconds) } } // GarbageCollection removes nodes that haven't been updated for more than garbageCollectAfterSeconds. func (np *NodesProcessor) GarbageCollection() { for { np.garbageCollect() time.Sleep(time.Second * 10) } } func (np *NodesProcessor) garbageCollect() { np.nodes.Range(func(key string, value Node) bool { if time.Now().Unix()-value.LastUpdate > np.garbageCollectAfterSeconds { np.nodes.Delete(value.HostName) } return true }) } // Dumps content of np.nodes to np.dumpPath. func (np *NodesProcessor) Dump() error { ns := []Node{} np.nodes.Range(func(key string, value Node) bool { ns = append(ns, value) return true }) body, err := json.MarshalIndent(ns, "", " ") if err != nil { return fmt.Errorf("failed to marshal nodes: %w", err) } // Create directory if it doesn't exist. if strings.Contains(np.dumpPath, "/") { path := np.dumpPath[:strings.LastIndex(np.dumpPath, "/")] if len(path) != 0 { err = os.MkdirAll(path, 0755) if err != nil { return fmt.Errorf("failed to create directory for nodes.json: %w", err) } } } err = os.WriteFile(np.dumpPath, body, 0644) if err != nil { return fmt.Errorf("failed to write nodes.json: %w", err) } return nil } // Loads content of np.dumpPath to np.nodes. func (np *NodesProcessor) Load() error { filePath := path.Join(np.dumpPath) _, err := os.Stat(filePath) if os.IsNotExist(err) { return nil } body, err := os.ReadFile(np.dumpPath) if err != nil { return fmt.Errorf("failed to read nodes.json: %w", err) } np.Reset() ns := []Node{} err = json.Unmarshal(body, &ns) if err != nil { return fmt.Errorf("failed to unmarshal nodes: %w", err) } for _, n := range ns { np.nodes.Store(n.HostName, n) } return nil } func (np *NodesProcessor) Reset() { np.nodes.Clear() } // Returns string with Prometheus metrics // TODO: finish this func (np *NodesProcessor) GetMetrics() string { return "" } // List returns all nodes. func (np *NodesProcessor) List() Nodes { nodes := Nodes{} np.nodes.Range(func(key string, value Node) bool { nodes = append(nodes, value) return true }) return nodes } // Get returns a node by hostname. func (np *NodesProcessor) Get(hostname string) (Node, bool) { node, ok := np.nodes.Load(hostname) return node, ok } // Refresh updates node with hostname and labels. If it doesn't exists it's created. func (np *NodesProcessor) Refresh(hostname string, labels Labels, kv KV) { node, ok := np.nodes.Load(hostname) if !ok { node = Node{} } node.LastUpdate = time.Now().Unix() node.HostName = hostname node.Labels = labels node.KV = kv np.nodes.Store(hostname, node) } // Drop removes node with given hostname. func (np *NodesProcessor) Drop(hostname string) { np.nodes.Delete(hostname) }