diff --git a/README.md b/README.md new file mode 100644 index 0000000..603307d --- /dev/null +++ b/README.md @@ -0,0 +1,14 @@ +## TODO + +* [X] filtering based on labels +* [X] Output for prometheus + https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config + ~~This should be implemented as a template in /etc/lobby/templates~~ +* [X] labels in directory + /etc/lobby/labels + One file per one label +* [ ] Deregistration +* [ ] Deregister when the daemon exists +* [ ] Separate the NATS code so it can support multiple backend/drivers +* [ ] Documentation +* [ ] Tests diff --git a/daemon/identification.go b/daemon/identification.go index a2860aa..59f990a 100644 --- a/daemon/identification.go +++ b/daemon/identification.go @@ -1,6 +1,14 @@ package main import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "os" + "path" + "strings" + "github.com/rosti-cz/server_lobby/server" "github.com/shirou/gopsutil/v3/host" ) @@ -8,12 +16,63 @@ import ( func getIdentification() (server.Discovery, error) { discovery := server.Discovery{} - info, err := host.Info() + localLabels, err := loadLocalLabels() if err != nil { return discovery, err } - discovery.Hostname = info.Hostname - discovery.Labels = config.Labels + + if len(config.HostName) == 0 { + info, err := host.Info() + if err != nil { + return discovery, err + } + discovery.Hostname = info.Hostname + } else { + discovery.Hostname = config.HostName + } + + discovery.Labels = append(config.Labels, localLabels...) return discovery, nil } + +// loadLocalLabels scans local directory where labels are stored and adds them to the labels configured as environment variables. +// Filename in LabelsPath is not importent and each file can contain multiple labels, one per each line. +func loadLocalLabels() ([]string, error) { + labels := []string{} + + if _, err := os.Stat(config.LabelsPath); !os.IsNotExist(err) { + files, err := ioutil.ReadDir(config.LabelsPath) + if err != nil { + return labels, err + } + + for _, filename := range files { + fullPath := path.Join(config.LabelsPath, filename.Name()) + fp, err := os.OpenFile(fullPath, os.O_RDONLY, os.ModePerm) + if err != nil { + return labels, fmt.Errorf("open file error: %v", err) + + } + defer fp.Close() + + rd := bufio.NewReader(fp) + for { + line, err := rd.ReadString('\n') + if err != nil { + if err == io.EOF { + break + } + + return labels, fmt.Errorf("read file line error: %v", err) + } + line = strings.TrimSpace(line) + if len(line) > 0 { + labels = append(labels, line) + } + } + } + } + + return labels, nil +} diff --git a/daemon/main.go b/daemon/main.go index 0382022..8248665 100644 --- a/daemon/main.go +++ b/daemon/main.go @@ -2,6 +2,8 @@ package main import ( "log" + "net/http" + "strconv" "time" "github.com/labstack/echo" @@ -10,10 +12,6 @@ import ( "github.com/rosti-cz/server_lobby/server" ) -const discoveryChannel = "lobby.discovery" -const cleanEvery = 15 // clean discoveredServers every X seconds -const keepAlive = 15 // sends discovery struct every - var discoveryStorage server.Discoveries = server.Discoveries{} var config Config @@ -26,7 +24,7 @@ func init() { func cleanDiscoveryPool() { for { discoveryStorage.Clean() - time.Sleep(cleanEvery * time.Second) + time.Sleep(time.Duration(config.CleanEvery) * time.Second) } } @@ -43,11 +41,11 @@ func sendDisoveryPacket(nc *nats.Conn) { if err != nil { log.Printf("sending discovery formating message error: %v\n", err) } - err = nc.Publish(discoveryChannel, data) + err = nc.Publish(config.NATSDiscoveryChannel, data) if err != nil { log.Printf("sending discovery error: %v\n", err) } - time.Sleep(keepAlive * time.Second) + time.Sleep(time.Duration(config.KeepAlive) * time.Second) } } @@ -65,6 +63,8 @@ func main() { // Closing the logging channel defer close(discoveryStorage.LogChannel) + discoveryStorage.TTL = config.TTL + // Load config from environment variables config = *GetConfig() @@ -83,7 +83,7 @@ func main() { // Subscribe log.Println("> discovery channel") - _, err = nc.Subscribe(discoveryChannel, discoveryHandler) + _, err = nc.Subscribe(config.NATSDiscoveryChannel, discoveryHandler) if err != nil { log.Fatalln(err) } @@ -97,15 +97,50 @@ func main() { e := echo.New() // Middleware + if len(config.Token) > 0 { + e.Use(TokenMiddleware) + } e.Use(middleware.Logger()) e.Use(middleware.Recover()) // Routes e.GET("/", func(c echo.Context) error { - discoveries := discoveryStorage.GetAll() + label := c.QueryParam("label") + + var discoveries []server.Discovery + + if len(label) > 0 { + discoveries = discoveryStorage.Filter(label) + } else { + discoveries = discoveryStorage.GetAll() + } + return c.JSONPretty(200, discoveries, " ") }) + e.GET("/prometheus", func(c echo.Context) error { + services := preparePrometheusOutput(discoveryStorage.GetAll()) + + return c.JSONPretty(http.StatusOK, services, " ") + }) + + // e.GET("/template/:template", func(c echo.Context) error { + // templateName := c.Param("template") + // discoveries := discoveryStorage.GetAll() + // var body bytes.Buffer + + // tmpl, err := template.New("main").ParseFiles(path.Join(config.TemplatesPath, templateName)) + // if err != nil { + // return c.String(http.StatusInternalServerError, err.Error()) + // } + // err = tmpl.Execute(&body, &discoveries) + // if err != nil { + // return c.String(http.StatusInternalServerError, err.Error()) + // } + + // return c.String(http.StatusOK, body.String()) + // }) + // Start server - e.Logger.Fatal(e.Start(":1313")) + e.Logger.Fatal(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port)))) } diff --git a/daemon/middlewares.go b/daemon/middlewares.go new file mode 100644 index 0000000..f7daa6b --- /dev/null +++ b/daemon/middlewares.go @@ -0,0 +1,26 @@ +package main + +import ( + "strings" + + "github.com/labstack/echo" +) + +func TokenMiddleware(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + // Skip selected paths + + tokenHeader := c.Request().Header.Get("Authorization") + token := strings.Replace(tokenHeader, "Token ", "", -1) + + if token != config.Token || config.Token == "" { + return c.JSONPretty(403, map[string]string{"message": "access denied"}, " ") + } + + if err := next(c); err != nil { + c.Error(err) + } + + return nil + } +} diff --git a/daemon/prometheus.go b/daemon/prometheus.go new file mode 100644 index 0000000..f9c7ff1 --- /dev/null +++ b/daemon/prometheus.go @@ -0,0 +1,65 @@ +package main + +import ( + "strconv" + "strings" + + "github.com/rosti-cz/server_lobby/server" +) + +// [ +// { +// "targets": [ "", ... ], +// "labels": { +// "": "", ... +// } +// }, +// ... +// ] + +// PrometheusServices holds multiple PrometheusService structs +type PrometheusServices []PrometheusService + +// PrometheusService represents a single set of targets and labels for Prometheus +type PrometheusService struct { + Targets []string + Labels map[string]string +} + +// preparePrometheusOutput returns PrometheusServices which is struct compatible to what Prometheus expects +// labels starting "ne:" will be used as NodeExporter labels. Label "ne:port:9123" will be used as port +// used in the targets field. Same for "ne:host:1.2.3.4". +func preparePrometheusOutput(discoveries []server.Discovery) PrometheusServices { + services := PrometheusServices{} + + for _, discovery := range discoveries { + port := strconv.Itoa(int(config.NodeExporterPort)) + host := discovery.Hostname + + labels := map[string]string{} + + for _, label := range discovery.FindLabels("ne") { + trimmed := strings.TrimPrefix(label, "ne:") + parts := strings.SplitN(trimmed, ":", 2) + if len(parts) == 2 { + if parts[0] == "port" { + port = parts[1] + } else if parts[0] == "host" { + host = parts[1] + } else { + labels[parts[0]] = parts[1] + } + } + } + + service := PrometheusService{ + Targets: []string{host + ":" + port}, + Labels: labels, + } + + services = append(services, service) + + } + + return services +} diff --git a/server/main.go b/server/main.go index 330c269..1b82ed4 100644 --- a/server/main.go +++ b/server/main.go @@ -3,6 +3,7 @@ package server import ( "encoding/json" "fmt" + "strings" "time" ) @@ -20,6 +21,8 @@ type Discovery struct { // For internal use to check if the server is still alive. // Contains timestamp of the last check. LastCheck int64 `json:"last_check"` + + TTL uint `json:"-"` // after how many second consider the server to be off, if 0 then 60 secs is used } // Validate checks all values in the struct if the content is valid @@ -30,7 +33,10 @@ func (d *Discovery) Validate() error { // IsAlive return true if the server should be considered as alive func (d *Discovery) IsAlive() bool { - return time.Now().Unix()-d.LastCheck < TimeToLife + if d.TTL == 0 { + d.TTL = TimeToLife + } + return time.Now().Unix()-d.LastCheck < int64(d.TTL) } func (d *Discovery) Bytes() ([]byte, error) { @@ -38,6 +44,17 @@ func (d *Discovery) Bytes() ([]byte, error) { return data, err } +// FindLabels returns list of labels with given prefix. For example "service:ns" has prefix "service" +func (d *Discovery) FindLabels(prefix string) []string { + labels := []string{} + for _, label := range d.Labels { + if strings.HasPrefix(label, prefix+":") { + labels = append(labels, label) + } + } + return labels +} + // ----------------- // Discovery storage // ----------------- @@ -46,12 +63,28 @@ func (d *Discovery) Bytes() ([]byte, error) { type Discoveries struct { activeServers []Discovery LogChannel chan string + TTL uint +} + +func (d *Discoveries) hostnameIndex(hostname string) int { + for idx, discovery := range d.activeServers { + if discovery.Hostname == hostname { + return idx + } + } + return -1 } // Add appends a new discovery/server to the storage func (d *Discoveries) Add(discovery Discovery) { if d.Exist(discovery.Hostname) { d.Refresh(discovery.Hostname) + + idx := d.hostnameIndex(discovery.Hostname) + if idx >= 0 { + d.activeServers[idx].Labels = discovery.Labels + } + return } @@ -64,10 +97,9 @@ func (d *Discoveries) Add(discovery Discovery) { // Refresh updates func (d *Discoveries) Refresh(hostname string) { - for idx, discovery := range d.activeServers { - if discovery.Hostname == hostname { - d.activeServers[idx].LastCheck = time.Now().Unix() - } + idx := d.hostnameIndex(hostname) + if idx >= 0 { + d.activeServers[idx].LastCheck = time.Now().Unix() } } @@ -111,10 +143,29 @@ func (d *Discoveries) GetAll() []Discovery { return d.activeServers } +func (d *Discoveries) Filter(labelFilter string) []Discovery { + newSet := []Discovery{} + + if len(labelFilter) > 0 { + for _, discovery := range d.activeServers { + for _, label := range discovery.Labels { + if label == labelFilter { + newSet = append(newSet, discovery) + break + } + } + } + + } + + return newSet +} + // Clean checks loops over last check values for each discovery object and removes it if it's passed func (d *Discoveries) Clean() { newSet := []Discovery{} for _, server := range d.activeServers { + server.TTL = d.TTL if server.IsAlive() { newSet = append(newSet, server) } else {