Prometheus support, label based filtering fix

This commit is contained in:
Adam Štrauch 2021-09-01 23:18:52 +02:00
parent 2f3698430b
commit 53da96ec10
Signed by: cx
GPG Key ID: 018304FFA8988F8D
6 changed files with 268 additions and 18 deletions

14
README.md Normal file
View File

@ -0,0 +1,14 @@
## TODO
* [X] filtering based on labels
* [X] Output for prometheus
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config
~~This should be implemented as a template in /etc/lobby/templates~~
* [X] labels in directory
/etc/lobby/labels
One file per one label
* [ ] Deregistration
* [ ] Deregister when the daemon exists
* [ ] Separate the NATS code so it can support multiple backend/drivers
* [ ] Documentation
* [ ] Tests

View File

@ -1,6 +1,14 @@
package main
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strings"
"github.com/rosti-cz/server_lobby/server"
"github.com/shirou/gopsutil/v3/host"
)
@ -8,12 +16,63 @@ import (
func getIdentification() (server.Discovery, error) {
discovery := server.Discovery{}
info, err := host.Info()
localLabels, err := loadLocalLabels()
if err != nil {
return discovery, err
}
discovery.Hostname = info.Hostname
discovery.Labels = config.Labels
if len(config.HostName) == 0 {
info, err := host.Info()
if err != nil {
return discovery, err
}
discovery.Hostname = info.Hostname
} else {
discovery.Hostname = config.HostName
}
discovery.Labels = append(config.Labels, localLabels...)
return discovery, nil
}
// loadLocalLabels scans local directory where labels are stored and adds them to the labels configured as environment variables.
// Filename in LabelsPath is not importent and each file can contain multiple labels, one per each line.
func loadLocalLabels() ([]string, error) {
labels := []string{}
if _, err := os.Stat(config.LabelsPath); !os.IsNotExist(err) {
files, err := ioutil.ReadDir(config.LabelsPath)
if err != nil {
return labels, err
}
for _, filename := range files {
fullPath := path.Join(config.LabelsPath, filename.Name())
fp, err := os.OpenFile(fullPath, os.O_RDONLY, os.ModePerm)
if err != nil {
return labels, fmt.Errorf("open file error: %v", err)
}
defer fp.Close()
rd := bufio.NewReader(fp)
for {
line, err := rd.ReadString('\n')
if err != nil {
if err == io.EOF {
break
}
return labels, fmt.Errorf("read file line error: %v", err)
}
line = strings.TrimSpace(line)
if len(line) > 0 {
labels = append(labels, line)
}
}
}
}
return labels, nil
}

View File

@ -2,6 +2,8 @@ package main
import (
"log"
"net/http"
"strconv"
"time"
"github.com/labstack/echo"
@ -10,10 +12,6 @@ import (
"github.com/rosti-cz/server_lobby/server"
)
const discoveryChannel = "lobby.discovery"
const cleanEvery = 15 // clean discoveredServers every X seconds
const keepAlive = 15 // sends discovery struct every
var discoveryStorage server.Discoveries = server.Discoveries{}
var config Config
@ -26,7 +24,7 @@ func init() {
func cleanDiscoveryPool() {
for {
discoveryStorage.Clean()
time.Sleep(cleanEvery * time.Second)
time.Sleep(time.Duration(config.CleanEvery) * time.Second)
}
}
@ -43,11 +41,11 @@ func sendDisoveryPacket(nc *nats.Conn) {
if err != nil {
log.Printf("sending discovery formating message error: %v\n", err)
}
err = nc.Publish(discoveryChannel, data)
err = nc.Publish(config.NATSDiscoveryChannel, data)
if err != nil {
log.Printf("sending discovery error: %v\n", err)
}
time.Sleep(keepAlive * time.Second)
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
}
}
@ -65,6 +63,8 @@ func main() {
// Closing the logging channel
defer close(discoveryStorage.LogChannel)
discoveryStorage.TTL = config.TTL
// Load config from environment variables
config = *GetConfig()
@ -83,7 +83,7 @@ func main() {
// Subscribe
log.Println("> discovery channel")
_, err = nc.Subscribe(discoveryChannel, discoveryHandler)
_, err = nc.Subscribe(config.NATSDiscoveryChannel, discoveryHandler)
if err != nil {
log.Fatalln(err)
}
@ -97,15 +97,50 @@ func main() {
e := echo.New()
// Middleware
if len(config.Token) > 0 {
e.Use(TokenMiddleware)
}
e.Use(middleware.Logger())
e.Use(middleware.Recover())
// Routes
e.GET("/", func(c echo.Context) error {
discoveries := discoveryStorage.GetAll()
label := c.QueryParam("label")
var discoveries []server.Discovery
if len(label) > 0 {
discoveries = discoveryStorage.Filter(label)
} else {
discoveries = discoveryStorage.GetAll()
}
return c.JSONPretty(200, discoveries, " ")
})
e.GET("/prometheus", func(c echo.Context) error {
services := preparePrometheusOutput(discoveryStorage.GetAll())
return c.JSONPretty(http.StatusOK, services, " ")
})
// e.GET("/template/:template", func(c echo.Context) error {
// templateName := c.Param("template")
// discoveries := discoveryStorage.GetAll()
// var body bytes.Buffer
// tmpl, err := template.New("main").ParseFiles(path.Join(config.TemplatesPath, templateName))
// if err != nil {
// return c.String(http.StatusInternalServerError, err.Error())
// }
// err = tmpl.Execute(&body, &discoveries)
// if err != nil {
// return c.String(http.StatusInternalServerError, err.Error())
// }
// return c.String(http.StatusOK, body.String())
// })
// Start server
e.Logger.Fatal(e.Start(":1313"))
e.Logger.Fatal(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
}

26
daemon/middlewares.go Normal file
View File

@ -0,0 +1,26 @@
package main
import (
"strings"
"github.com/labstack/echo"
)
func TokenMiddleware(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
// Skip selected paths
tokenHeader := c.Request().Header.Get("Authorization")
token := strings.Replace(tokenHeader, "Token ", "", -1)
if token != config.Token || config.Token == "" {
return c.JSONPretty(403, map[string]string{"message": "access denied"}, " ")
}
if err := next(c); err != nil {
c.Error(err)
}
return nil
}
}

65
daemon/prometheus.go Normal file
View File

@ -0,0 +1,65 @@
package main
import (
"strconv"
"strings"
"github.com/rosti-cz/server_lobby/server"
)
// [
// {
// "targets": [ "<host>", ... ],
// "labels": {
// "<labelname>": "<labelvalue>", ...
// }
// },
// ...
// ]
// PrometheusServices holds multiple PrometheusService structs
type PrometheusServices []PrometheusService
// PrometheusService represents a single set of targets and labels for Prometheus
type PrometheusService struct {
Targets []string
Labels map[string]string
}
// preparePrometheusOutput returns PrometheusServices which is struct compatible to what Prometheus expects
// labels starting "ne:" will be used as NodeExporter labels. Label "ne:port:9123" will be used as port
// used in the targets field. Same for "ne:host:1.2.3.4".
func preparePrometheusOutput(discoveries []server.Discovery) PrometheusServices {
services := PrometheusServices{}
for _, discovery := range discoveries {
port := strconv.Itoa(int(config.NodeExporterPort))
host := discovery.Hostname
labels := map[string]string{}
for _, label := range discovery.FindLabels("ne") {
trimmed := strings.TrimPrefix(label, "ne:")
parts := strings.SplitN(trimmed, ":", 2)
if len(parts) == 2 {
if parts[0] == "port" {
port = parts[1]
} else if parts[0] == "host" {
host = parts[1]
} else {
labels[parts[0]] = parts[1]
}
}
}
service := PrometheusService{
Targets: []string{host + ":" + port},
Labels: labels,
}
services = append(services, service)
}
return services
}

View File

@ -3,6 +3,7 @@ package server
import (
"encoding/json"
"fmt"
"strings"
"time"
)
@ -20,6 +21,8 @@ type Discovery struct {
// For internal use to check if the server is still alive.
// Contains timestamp of the last check.
LastCheck int64 `json:"last_check"`
TTL uint `json:"-"` // after how many second consider the server to be off, if 0 then 60 secs is used
}
// Validate checks all values in the struct if the content is valid
@ -30,7 +33,10 @@ func (d *Discovery) Validate() error {
// IsAlive return true if the server should be considered as alive
func (d *Discovery) IsAlive() bool {
return time.Now().Unix()-d.LastCheck < TimeToLife
if d.TTL == 0 {
d.TTL = TimeToLife
}
return time.Now().Unix()-d.LastCheck < int64(d.TTL)
}
func (d *Discovery) Bytes() ([]byte, error) {
@ -38,6 +44,17 @@ func (d *Discovery) Bytes() ([]byte, error) {
return data, err
}
// FindLabels returns list of labels with given prefix. For example "service:ns" has prefix "service"
func (d *Discovery) FindLabels(prefix string) []string {
labels := []string{}
for _, label := range d.Labels {
if strings.HasPrefix(label, prefix+":") {
labels = append(labels, label)
}
}
return labels
}
// -----------------
// Discovery storage
// -----------------
@ -46,12 +63,28 @@ func (d *Discovery) Bytes() ([]byte, error) {
type Discoveries struct {
activeServers []Discovery
LogChannel chan string
TTL uint
}
func (d *Discoveries) hostnameIndex(hostname string) int {
for idx, discovery := range d.activeServers {
if discovery.Hostname == hostname {
return idx
}
}
return -1
}
// Add appends a new discovery/server to the storage
func (d *Discoveries) Add(discovery Discovery) {
if d.Exist(discovery.Hostname) {
d.Refresh(discovery.Hostname)
idx := d.hostnameIndex(discovery.Hostname)
if idx >= 0 {
d.activeServers[idx].Labels = discovery.Labels
}
return
}
@ -64,10 +97,9 @@ func (d *Discoveries) Add(discovery Discovery) {
// Refresh updates
func (d *Discoveries) Refresh(hostname string) {
for idx, discovery := range d.activeServers {
if discovery.Hostname == hostname {
d.activeServers[idx].LastCheck = time.Now().Unix()
}
idx := d.hostnameIndex(hostname)
if idx >= 0 {
d.activeServers[idx].LastCheck = time.Now().Unix()
}
}
@ -111,10 +143,29 @@ func (d *Discoveries) GetAll() []Discovery {
return d.activeServers
}
func (d *Discoveries) Filter(labelFilter string) []Discovery {
newSet := []Discovery{}
if len(labelFilter) > 0 {
for _, discovery := range d.activeServers {
for _, label := range discovery.Labels {
if label == labelFilter {
newSet = append(newSet, discovery)
break
}
}
}
}
return newSet
}
// Clean checks loops over last check values for each discovery object and removes it if it's passed
func (d *Discoveries) Clean() {
newSet := []Discovery{}
for _, server := range d.activeServers {
server.TTL = d.TTL
if server.IsAlive() {
newSet = append(newSet, server)
} else {