Compare commits
20 Commits
Author | SHA1 | Date |
---|---|---|
Adam Štrauch | c2ed658aaa | |
Adam Štrauch | 038741b87e | |
Adam Štrauch | 157e795792 | |
Adam Štrauch | 55b3376d4c | |
Adam Štrauch | 0e836b68a8 | |
Adam Štrauch | 322ef2a258 | |
Adam Štrauch | 6a1ecb80a1 | |
Adam Štrauch | 3a7662ef0d | |
Adam Štrauch | 4c0ca7ed6a | |
Adam Štrauch | a4a97ec49d | |
Adam Štrauch | 66683c9496 | |
Adam Štrauch | 24923a305b | |
Adam Štrauch | ab8068e26d | |
Adam Štrauch | 202a60f091 | |
Adam Štrauch | a9d7abbdd5 | |
Adam Štrauch | 709c47af3e | |
Adam Štrauch | e06a5cc94b | |
Adam Štrauch | 6769b903bc | |
Adam Štrauch | 8493c7cb40 | |
Adam Štrauch | a0abda5196 |
|
@ -0,0 +1,10 @@
|
|||
kind: pipeline
|
||||
type: docker
|
||||
name: testing
|
||||
|
||||
steps:
|
||||
- name: test
|
||||
image: golang
|
||||
commands:
|
||||
- go mod tidy
|
||||
- make test
|
|
@ -1,3 +1,5 @@
|
|||
.history/
|
||||
|
||||
# Files with secrets
|
||||
*secret*
|
||||
tmp/
|
||||
|
|
69
README.md
69
README.md
|
@ -52,9 +52,9 @@ Support for NATS is only less than 150 lines.
|
|||
The quickest way how to run lobby on your server is this:
|
||||
|
||||
```shell
|
||||
wget -O /usr/local/bin/lobbyd https://github.com/by-cx/lobby/releases/download/v1.0/lobbyd-1.0-linux-amd64
|
||||
wget -O /usr/local/bin/lobbyd https://github.com/by-cx/lobby/releases/download/v1.2/lobbyd-1.2-linux-amd64
|
||||
chmod +x /usr/local/bin/lobbyd
|
||||
wget -O /usr/local/bin/lobbyctl https://github.com/by-cx/lobby/releases/download/v1.0/lobbyctl-1.0-linux-amd64
|
||||
wget -O /usr/local/bin/lobbyctl https://github.com/by-cx/lobby/releases/download/v1.2/lobbyctl-1.2-linux-amd64
|
||||
chmod +x /usr/local/bin/lobbyctl
|
||||
|
||||
# Update NATS_URL and LABELS here
|
||||
|
@ -89,31 +89,42 @@ To test if local instance is running call this:
|
|||
|
||||
There are other config directives you can use to fine-tune lobbyd to exactly what you need.
|
||||
|
||||
| Environment variable | Type | Default | Required | Note |
|
||||
| ----------------------- | ------ | ----------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| TOKEN | string | | no | Authentication token for API, if empty auth is disabled |
|
||||
| HOST | string | 127.0.0.1 | no | IP address used for the REST server to listen |
|
||||
| PORT | int | 1313 | no | Port related to the address above |
|
||||
| DISABLE_API | bool | false | no | If true API interface won't start |
|
||||
| DRIVER | string | NATS | yes | Selects which driver is used to exchange the discovery packets. |
|
||||
| NATS_URL | string | | yes (NATS driver) | NATS URL used to connect to the NATS server |
|
||||
| NATS_DISCOVERY_CHANNEL | string | lobby.discovery | no | Channel where the keep-alive packets are sent |
|
||||
| REDIS_HOST | string | 127.0.0.1" | no | Redis host |
|
||||
| REDIS_PORT | uint16 | 6379 | no | Redis port |
|
||||
| REDIS_DB | string | 0 | no | Redis DB |
|
||||
| REDIS_CHANNEL | string | lobby:discovery | no | Redis channel |
|
||||
| REDIS_PASSWORD | string | | no | Redis password |
|
||||
| LABELS | string | | no | List of labels, labels should be separated by comma |
|
||||
| LABELS_PATH | string | /etc/lobby/labels | no | Path where filesystem based labels are located, one label per line, filename is not important for lobby |
|
||||
| RUNTIME_LABELS_FILENAME | string | _runtime | no | Filename for file created in LabelsPath where runtime labels will be added |
|
||||
| HOSTNAME | string | | no | Override local machine's hostname |
|
||||
| CLEAN_EVERY | int | 15 | no | How often to clean the list of discovered servers to get rid of the not alive ones [secs] |
|
||||
| KEEP_ALIVE | int | 5 | no | how often to send the keep-alive discovery message with all available information [secs] |
|
||||
| TTL | int | 30 | no | After how many secs is discovery record considered as invalid |
|
||||
| NODE_EXPORTER_PORT | int | 9100 | no | Default port where node_exporter listens on all registered servers, this is used when the special prometheus labels doesn't contain port |
|
||||
| REGISTER | bool | true | no | If true (default) then local instance is registered with other instance (discovery packet is sent regularly), if false the daemon runs only as a client |
|
||||
| Environment variable | Type | Default | Required | Note |
|
||||
| ------------------------ | ------ | ----------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| TOKEN | string | | no | Authentication token for API, if empty auth is disabled |
|
||||
| HOST | string | 127.0.0.1 | no | IP address used for the REST server to listen |
|
||||
| PORT | int | 1313 | no | Port related to the address above |
|
||||
| DISABLE_API | bool | false | no | If true API interface won't start |
|
||||
| DRIVER | string | NATS | yes | Selects which driver is used to exchange the discovery packets. |
|
||||
| NATS_URL | string | | yes (NATS driver) | NATS URL used to connect to the NATS server |
|
||||
| NATS_DISCOVERY_CHANNEL | string | lobby.discovery | no | Channel where the keep-alive packets are sent |
|
||||
| REDIS_HOST | string | 127.0.0.1" | no | Redis host |
|
||||
| REDIS_PORT | uint16 | 6379 | no | Redis port |
|
||||
| REDIS_DB | string | 0 | no | Redis DB |
|
||||
| REDIS_CHANNEL | string | lobby:discovery | no | Redis channel |
|
||||
| REDIS_PASSWORD | string | | no | Redis password |
|
||||
| LABELS | string | | no | List of labels, labels should be separated by comma |
|
||||
| LABELS_PATH | string | /etc/lobby/labels | no | Path where filesystem based labels are located, one label per line, filename is not important for lobby |
|
||||
| RUNTIME_LABELS_FILENAME | string | _runtime | no | Filename for file created in LabelsPath where runtime labels will be added |
|
||||
| HOSTNAME | string | | no | Override local machine's hostname |
|
||||
| CLEAN_EVERY | int | 15 | no | How often to clean the list of discovered servers to get rid of the not alive ones [secs] |
|
||||
| KEEP_ALIVE | int | 5 | no | how often to send the keep-alive discovery message with all available information [secs] |
|
||||
| TTL | int | 30 | no | After how many secs is discovery record considered as invalid |
|
||||
| NODE_EXPORTER_PORT | int | 9100 | no | Default port where node_exporter listens on all registered servers, this is used when the special prometheus labels doesn't contain port |
|
||||
| REGISTER | bool | true | no | If true (default) then local instance is registered with other instance (discovery packet is sent regularly), if false the daemon runs only as a client |
|
||||
| CALLBACK | string | | no | Path to a script that runs when the the discovery packet records are changed. Not running for first |
|
||||
| CALLBACK_COOLDOWN | int | 15 | no | Cooldown prevents the call back script to run sooner than configured amount of seconds after last run is finished. |
|
||||
| CALLBACK_FIRST_RUN_DELAY | int | 30 | no | Wait for this amount of seconds before callback is run for first time after fresh start of the daemon |
|
||||
|
||||
|
||||
### Callback script
|
||||
|
||||
When your application cannot support Lobbyd's API it can be configured via callback script that runs everytime something has changed in the network. Callback script is run every 15 seconds (configured by CALLBACK_COOLDOWN) but only when something has changed.
|
||||
|
||||
The script runs under the same user as lobbyd. When lobbyd starts first thirty seconds (CALLBACK_FIRST_RUN_DELAY) is ignored and then the script is run for first time. After these thirty seconds everything runs in loop based on the changes in the network.
|
||||
|
||||
All current discovery packets are passed to the callback script via standard input. It's basically the same input you get if you run `lobbyctl discoveries`.
|
||||
|
||||
### Service discovery for Prometheus
|
||||
|
||||
Lobbyd has an API endpoint that returns list of targets for [Prometheus's HTTP SD config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config). That
|
||||
|
@ -126,6 +137,8 @@ Let's check this:
|
|||
|
||||
If you set port to *-* lobby daemon omits port entirely from the output.
|
||||
|
||||
There can be multiple `host` labels but only one `port` label and all prometheus labels (last line) will be common for all hosts labels. If port is omitted then default 9100 is used or port can also be part of the host label.
|
||||
|
||||
When you open URL http://localhost:1313/v1/prometheus/nodeexporter it returns this:
|
||||
|
||||
```json
|
||||
|
@ -220,11 +233,11 @@ too.
|
|||
## TODO
|
||||
|
||||
* [X] Tests
|
||||
* [ ] Command hooks - script or list of scripts that are triggered when discovery status has changed
|
||||
* [X] Command hooks - script or list of scripts that are triggered when discovery status has changed
|
||||
* [ ] Support for multiple active backend drivers
|
||||
* [ ] Redis driver
|
||||
* [X] Redis driver
|
||||
* [X] Remove the 5 secs waiting when daemon is stopped
|
||||
* [X] API to allow add labels at runtime
|
||||
|
||||
* [ ] Check what happens when driver is disconnected
|
||||
|
||||
|
||||
|
|
|
@ -114,6 +114,31 @@ func (l *LobbyClient) GetDiscoveries() ([]server.Discovery, error) {
|
|||
return discoveries, nil
|
||||
}
|
||||
|
||||
// Resolve returns list of hostnames that have given label
|
||||
func (l *LobbyClient) Resolve(label server.Label) ([]string, error) {
|
||||
l.init()
|
||||
|
||||
path := fmt.Sprintf("/v1/resolve?label=%s", label.String())
|
||||
method := "GET"
|
||||
|
||||
var hostnames []string
|
||||
|
||||
status, body, err := l.call(method, path, "")
|
||||
if err != nil {
|
||||
return hostnames, err
|
||||
}
|
||||
if status != 200 {
|
||||
return hostnames, fmt.Errorf("non-200 response: %s", body)
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(body), &hostnames)
|
||||
if err != nil {
|
||||
return hostnames, fmt.Errorf("response parsing error: %v", err)
|
||||
}
|
||||
|
||||
return hostnames, nil
|
||||
}
|
||||
|
||||
// Find discoveries by their labels
|
||||
func (l *LobbyClient) FindByLabels(labels server.Labels) ([]server.Discovery, error) {
|
||||
l.init()
|
||||
|
|
20
ctl/main.go
20
ctl/main.go
|
@ -14,6 +14,7 @@ func Usage() {
|
|||
flag.Usage()
|
||||
fmt.Println("")
|
||||
fmt.Println("Commands:")
|
||||
fmt.Println(" resolve label returns list of hostnames with given label")
|
||||
fmt.Println(" discovery returns discovery packet of the server where the client is connected to")
|
||||
fmt.Println(" discoveries returns list of all registered discovery packets")
|
||||
fmt.Println(" discoveries labels [LABEL] ... returns list of all registered discovery packets with given labels (OR)")
|
||||
|
@ -68,6 +69,25 @@ func main() {
|
|||
}
|
||||
|
||||
switch flag.Args()[0] {
|
||||
case "resolve":
|
||||
if len(flag.Args()) == 2 {
|
||||
hostnames, err := client.Resolve(server.Label(flag.Arg(1)))
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if *jsonOutput {
|
||||
printJSON(hostnames)
|
||||
} else {
|
||||
for _, hostname := range hostnames {
|
||||
fmt.Println(hostname)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Usage()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
case "discoveries":
|
||||
var discoveries []server.Discovery
|
||||
var err error
|
||||
|
|
|
@ -30,6 +30,9 @@ type Config struct {
|
|||
TTL uint `envconfig:"TTL" required:"false" default:"30"` // After how many secs is discovery record considered as invalid
|
||||
NodeExporterPort uint `envconfig:"NODE_EXPORTER_PORT" required:"false" default:"9100"` // Default port where node_exporter listens on all registered servers
|
||||
Register bool `envconfig:"REGISTER" required:"false" default:"true"` // If true (default) then local instance is registered with other instance (discovery packet is sent regularly)
|
||||
Callback string `envconfig:"CALLBACK" required:"false" default:""` // path to a script that runs when the is a change in the labels database
|
||||
CallbackCooldown uint `envconfig:"CALLBACK_COOLDOWN" required:"false" default:"15"` // cooldown that prevents to run the config change script too many times in row
|
||||
CallbackFirstRunDelay uint `envconfig:"CALLBACK_FIRST_RUN_DELAY" required:"false" default:"30"` // Wait for this amount of seconds before callback is run for first time after fresh start of the daemon
|
||||
}
|
||||
|
||||
// GetConfig return configuration created based on environment variables
|
||||
|
|
|
@ -28,6 +28,20 @@ func listHandler(c echo.Context) error {
|
|||
return c.JSONPretty(200, discoveries, " ")
|
||||
}
|
||||
|
||||
// resolveHandler returns hostname(s) based on another label
|
||||
func resolveHandler(c echo.Context) error {
|
||||
label := c.QueryParam("label") // This is label we will use to filter discovery packets
|
||||
|
||||
output := []string{}
|
||||
|
||||
discoveries := discoveryStorage.Filter([]string{label})
|
||||
for _, discovery := range discoveries {
|
||||
output = append(output, discovery.Hostname)
|
||||
}
|
||||
|
||||
return c.JSONPretty(http.StatusOK, output, " ")
|
||||
}
|
||||
|
||||
func prometheusHandler(c echo.Context) error {
|
||||
name := c.Param("name")
|
||||
|
||||
|
@ -63,6 +77,9 @@ func addLabelsHandler(c echo.Context) error {
|
|||
return c.String(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
|
||||
// Update the other nodes with this new change
|
||||
sendDiscoveryPacket()
|
||||
|
||||
return c.String(http.StatusOK, "OK")
|
||||
}
|
||||
|
||||
|
@ -84,5 +101,8 @@ func deleteLabelsHandler(c echo.Context) error {
|
|||
return c.String(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
|
||||
// Update the other nodes with this new change
|
||||
sendDiscoveryPacket()
|
||||
|
||||
return c.String(http.StatusOK, "OK")
|
||||
}
|
||||
|
|
108
daemon/main.go
108
daemon/main.go
|
@ -20,10 +20,12 @@ import (
|
|||
var discoveryStorage server.Discoveries = server.Discoveries{}
|
||||
var driver common.Driver
|
||||
var localHost server.LocalHost
|
||||
var lastLocalHostname string
|
||||
|
||||
var config Config
|
||||
|
||||
var shuttingDown bool
|
||||
var sendDiscoveryPacketTrigger chan bool = make(chan bool)
|
||||
|
||||
func init() {
|
||||
// Load config from environment variables
|
||||
|
@ -33,7 +35,7 @@ func init() {
|
|||
discoveryStorage.LogChannel = make(chan string)
|
||||
discoveryStorage.TTL = config.TTL
|
||||
|
||||
// localhost initization
|
||||
// localhost initiation
|
||||
localHost = server.LocalHost{
|
||||
LabelsPath: config.LabelsPath,
|
||||
HostnameOverride: config.HostName,
|
||||
|
@ -88,21 +90,44 @@ func sendGoodbyePacket() {
|
|||
}
|
||||
}
|
||||
|
||||
// sendDisoveryPacket sends discovery packet regularly so the network know we exist
|
||||
// sendDiscoveryPacket sends a single discovery packet out
|
||||
func sendDiscoveryPacket() {
|
||||
sendDiscoveryPacketTrigger <- true
|
||||
}
|
||||
|
||||
// sendDisoveryPacket sends discovery packet to the driver which passes it to the
|
||||
// other nodes. By this it propagates any change that happens in the local discovery struct.
|
||||
// Every tune trigger is triggered it sends one message.
|
||||
func sendDiscoveryPacketTask(trigger chan bool) {
|
||||
for {
|
||||
discovery, err := localHost.GetIdentification()
|
||||
if err != nil {
|
||||
log.Printf("sending discovery identification error: %v\n", err)
|
||||
}
|
||||
// We are waiting for the trigger
|
||||
<-trigger
|
||||
|
||||
err = driver.SendDiscoveryPacket(discovery)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
|
||||
if !shuttingDown {
|
||||
// Get info about local machine and send to the exchange point
|
||||
discovery, err := localHost.GetIdentification()
|
||||
if err != nil {
|
||||
log.Printf("sending discovery identification error: %v\n", err)
|
||||
}
|
||||
|
||||
if shuttingDown {
|
||||
err = driver.SendDiscoveryPacket(discovery)
|
||||
if err != nil {
|
||||
log.Println(err.Error())
|
||||
}
|
||||
|
||||
// If local hostname changes, we will deregister it
|
||||
if discovery.Hostname != lastLocalHostname && lastLocalHostname != "" {
|
||||
log.Println("Hostname change detected, deregistering the old one")
|
||||
err = driver.SendGoodbyePacket(server.Discovery{
|
||||
Hostname: lastLocalHostname,
|
||||
})
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
}
|
||||
|
||||
lastLocalHostname = discovery.Hostname
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -127,12 +152,38 @@ func main() {
|
|||
// Server discovering stuff
|
||||
// ------------------------
|
||||
|
||||
// Connect to the NATS service
|
||||
// Setup callback function to register and unregister discovery packets from other servers
|
||||
driver.RegisterSubscribeFunction(func(d server.Discovery) {
|
||||
// Check if the local version and the new version are somehowe changed
|
||||
localVersion := discoveryStorage.Get(d.Hostname)
|
||||
exists := discoveryStorage.Exist(d.Hostname)
|
||||
changed := server.Compare(localVersion, d)
|
||||
|
||||
discoveryStorage.Add(d)
|
||||
|
||||
if changed {
|
||||
// Print this only if the server is already registered
|
||||
if exists {
|
||||
log.Printf("%s has been updated", d.Hostname)
|
||||
}
|
||||
|
||||
go func() {
|
||||
err = discoveryChange(d)
|
||||
if err != nil {
|
||||
log.Printf("discovery changed error: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
})
|
||||
driver.RegisterUnsubscribeFunction(func(d server.Discovery) {
|
||||
discoveryStorage.Delete(d.Hostname)
|
||||
go func() {
|
||||
err = discoveryChange(d)
|
||||
if err != nil {
|
||||
log.Printf("discovery changed error: %v", err)
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
err = driver.Init()
|
||||
|
@ -143,13 +194,36 @@ func main() {
|
|||
go printDiscoveryLogs()
|
||||
go cleanDiscoveryPool()
|
||||
|
||||
if len(config.Callback) > 0 {
|
||||
go discoveryChangeLoop()
|
||||
go changeCatcherLoop()
|
||||
}
|
||||
|
||||
// When the daemon boots up we trigger discovery change event so the config can be setup via callback script if there is any
|
||||
err = discoveryChange(server.Discovery{})
|
||||
if err != nil {
|
||||
log.Printf("discovery changed error: %v", err)
|
||||
}
|
||||
// If config.Register is false this instance won't be registered with other nodes
|
||||
if config.Register {
|
||||
go sendDiscoveryPacket()
|
||||
// This is background process that sends the message
|
||||
go sendDiscoveryPacketTask(sendDiscoveryPacketTrigger)
|
||||
|
||||
// This triggers the process
|
||||
go func() {
|
||||
for {
|
||||
sendDiscoveryPacket()
|
||||
|
||||
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
|
||||
|
||||
if shuttingDown {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
log.Println("standalone mode, I won't register myself")
|
||||
}
|
||||
|
||||
// --------
|
||||
// REST API
|
||||
// --------
|
||||
|
@ -161,10 +235,10 @@ func main() {
|
|||
}
|
||||
e.Use(middleware.Logger())
|
||||
e.Use(middleware.Recover())
|
||||
|
||||
// Routes
|
||||
if !config.DisableAPI {
|
||||
e.GET("/", listHandler)
|
||||
e.GET("/v1/resolve", resolveHandler)
|
||||
e.GET("/v1/discovery", getIdentificationHandler)
|
||||
e.GET("/v1/discoveries", listHandler)
|
||||
e.POST("/v1/labels", addLabelsHandler)
|
||||
|
@ -175,7 +249,6 @@ func main() {
|
|||
// ------------------------------
|
||||
// Termination signals processing
|
||||
// ------------------------------
|
||||
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
|
@ -191,7 +264,6 @@ func main() {
|
|||
}
|
||||
e.Shutdown(context.TODO())
|
||||
}(e, config)
|
||||
|
||||
// Start server
|
||||
// In most cases this will end expectedly so it doesn't make sense to use the echo's approach to treat this message as an error.
|
||||
e.Logger.Info(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
|
||||
|
|
|
@ -34,19 +34,19 @@ func preparePrometheusOutput(name string, discoveries []server.Discovery) Promet
|
|||
|
||||
for _, discovery := range discoveries {
|
||||
port := strconv.Itoa(int(config.NodeExporterPort))
|
||||
host := discovery.Hostname
|
||||
hosts := []string{}
|
||||
var add bool // add to the prometheus output when there is at least one prometheus related label
|
||||
|
||||
labels := map[string]string{}
|
||||
labels := map[string]string{} // These are prometheus labels, not Lobby's labels
|
||||
|
||||
for _, label := range discovery.FindLabels("prometheus:" + name + ":") {
|
||||
for _, label := range discovery.FindLabelsByPrefix("prometheus:" + name + ":") {
|
||||
trimmed := strings.TrimPrefix(label.String(), "prometheus:"+name+":")
|
||||
parts := strings.SplitN(trimmed, ":", 2)
|
||||
if len(parts) == 2 {
|
||||
if parts[0] == "port" {
|
||||
port = parts[1]
|
||||
} else if parts[0] == "host" {
|
||||
host = parts[1]
|
||||
hosts = append(hosts, parts[1])
|
||||
} else {
|
||||
labels[parts[0]] = parts[1]
|
||||
}
|
||||
|
@ -65,20 +65,23 @@ func preparePrometheusOutput(name string, discoveries []server.Discovery) Promet
|
|||
}
|
||||
|
||||
if add {
|
||||
// Omit port part if "-" is set
|
||||
target := host + ":" + port
|
||||
if port == "-" {
|
||||
target = host
|
||||
}
|
||||
targets := []string{}
|
||||
for _, host := range hosts {
|
||||
// Omit port part if "-" is set or port is part of the host
|
||||
target := host + ":" + port
|
||||
if strings.Contains(host, ":") || port == "-" {
|
||||
target = host
|
||||
}
|
||||
targets = append(targets, target)
|
||||
|
||||
}
|
||||
service := PrometheusService{
|
||||
Targets: []string{target},
|
||||
Targets: targets,
|
||||
Labels: labels,
|
||||
}
|
||||
|
||||
services = append(services, service)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return services
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/by-cx/lobby/server"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestPreparePrometheusOutput(t *testing.T) {
|
||||
discoveries := []server.Discovery{
|
||||
{
|
||||
Hostname: "test.server",
|
||||
Labels: server.Labels{
|
||||
// test1
|
||||
"prometheus:test1:label1:l1",
|
||||
"prometheus:test1:host:srv1:1234",
|
||||
"prometheus:test1:host:srv1:1235",
|
||||
"prometheus:test1:host:srv1",
|
||||
"prometheus:test1:label2:l2",
|
||||
// test2
|
||||
"prometheus:test2:host:srv2",
|
||||
"prometheus:test2:host:srv2:1235",
|
||||
"prometheus:test2:host:srv2:1236",
|
||||
"prometheus:test2:host:srv2:1237",
|
||||
"prometheus:test2:label1:l3",
|
||||
},
|
||||
},
|
||||
}
|
||||
services := preparePrometheusOutput("test1", discoveries)
|
||||
assert.Equal(t, 1, len(services))
|
||||
assert.Equal(t, 3, len(services[0].Targets))
|
||||
assert.Contains(t, services[0].Targets, "srv1:9100")
|
||||
assert.Contains(t, services[0].Targets, "srv1:1234")
|
||||
assert.Contains(t, services[0].Targets, "srv1:1235")
|
||||
assert.Contains(t, services[0].Labels["label1"], "l1")
|
||||
assert.Contains(t, services[0].Labels["label2"], "l2")
|
||||
|
||||
services = preparePrometheusOutput("test2", discoveries)
|
||||
assert.Equal(t, 1, len(services))
|
||||
assert.Equal(t, 4, len(services[0].Targets))
|
||||
assert.Contains(t, services[0].Targets, "srv2:9100")
|
||||
assert.Contains(t, services[0].Targets, "srv2:1235")
|
||||
assert.Contains(t, services[0].Targets, "srv2:1236")
|
||||
assert.Contains(t, services[0].Targets, "srv2:1237")
|
||||
assert.Contains(t, services[0].Labels["label1"], "l3")
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/by-cx/lobby/server"
|
||||
)
|
||||
|
||||
// These functions are called when something has changed in the storage
|
||||
|
||||
var changeDetectedChannel chan bool = make(chan bool)
|
||||
var changeDetected bool
|
||||
|
||||
// changeCatcherLoop waits for a change signal and switches variable that says if the callback should run or not
|
||||
func changeCatcherLoop() {
|
||||
for {
|
||||
<-changeDetectedChannel
|
||||
changeDetected = true
|
||||
}
|
||||
}
|
||||
|
||||
// discoveryChangeLoop is used to process dynamic configuration changes.
|
||||
// When there is a change detected a shell script or given process is triggered
|
||||
// which does some operations with the new data. Usually it generates the
|
||||
// configuration.
|
||||
// This function has internal loop and won't allow to run the command more
|
||||
// often than it's the configured amount of time. That prevents
|
||||
func discoveryChangeLoop() {
|
||||
// This other loop tics in strict intervals and prevents the callback script to run more often than it's configured
|
||||
var cmd *exec.Cmd
|
||||
|
||||
// Delay first run of the callback script a little so everything can set up
|
||||
log.Printf("Delaying start of discovery change loop (%d seconds)\n", config.CallbackFirstRunDelay)
|
||||
time.Sleep(time.Duration(config.CallbackFirstRunDelay) * time.Second)
|
||||
log.Println("Starting discovery change loop")
|
||||
|
||||
for {
|
||||
if changeDetected {
|
||||
// We switch this at the beginning so we can detect new changes while the callback script is running
|
||||
changeDetected = false
|
||||
|
||||
log.Println("Running callback function")
|
||||
|
||||
// TODO: this is not the best way
|
||||
callbackCommandSlice := strings.Split(config.Callback, " ")
|
||||
|
||||
if len(callbackCommandSlice) == 1 {
|
||||
cmd = exec.Command(callbackCommandSlice[0])
|
||||
} else if len(callbackCommandSlice) > 1 {
|
||||
cmd = exec.Command(callbackCommandSlice[0], callbackCommandSlice[1:]...)
|
||||
} else {
|
||||
log.Println("wrong number of parts of the callback command")
|
||||
time.Sleep(time.Duration(config.CallbackCooldown) * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
stdin, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
log.Println("stdin writing error: ", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
discoveriesJSON, err := json.Marshal(discoveryStorage.GetAll())
|
||||
if err != nil {
|
||||
log.Println("stdin writing error: ", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = stdin.Write([]byte(discoveriesJSON))
|
||||
if err != nil {
|
||||
log.Println("stdin writing error: ", err.Error())
|
||||
continue
|
||||
}
|
||||
err = stdin.Close()
|
||||
if err != nil {
|
||||
log.Println("stdin writing error: ", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
stdout, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
log.Println("Callback error: ", err.Error())
|
||||
}
|
||||
log.Println("Callback output: ", string(stdout))
|
||||
}
|
||||
time.Sleep(time.Duration(config.CallbackCooldown) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// discoveryChange is called when daemon detects that a newly arrived discovery
|
||||
// packet is somehow different than the localone. This can be used to trigger
|
||||
// some action in the local machine.
|
||||
func discoveryChange(discovery server.Discovery) error {
|
||||
if len(config.Callback) > 0 {
|
||||
changeDetectedChannel <- true
|
||||
}
|
||||
return nil
|
||||
}
|
1
go.mod
1
go.mod
|
@ -7,6 +7,7 @@ require (
|
|||
github.com/fatih/color v1.12.0
|
||||
github.com/go-redis/redis v6.15.9+incompatible
|
||||
github.com/go-resty/resty/v2 v2.6.0
|
||||
github.com/google/go-cmp v0.5.5
|
||||
github.com/kelseyhightower/envconfig v1.4.0
|
||||
github.com/labstack/echo v3.3.10+incompatible
|
||||
github.com/labstack/gommon v0.3.0 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -31,6 +31,7 @@ github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
|
|||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
|
||||
|
@ -148,6 +149,7 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f
|
|||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
|
|
|
@ -3,6 +3,9 @@ package nats_driver
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/by-cx/lobby/common"
|
||||
"github.com/by-cx/lobby/server"
|
||||
|
@ -52,15 +55,20 @@ func (d *Driver) Init() error {
|
|||
return fmt.Errorf("please initiate LogChannel variable")
|
||||
}
|
||||
|
||||
nc, err := nats.Connect(d.NATSUrl)
|
||||
if err != nil {
|
||||
return err
|
||||
for {
|
||||
nc, err := nats.Connect(d.NATSUrl)
|
||||
if err != nil {
|
||||
log.Printf("Can't connect to the NATS server, waiting for 5 seconds before I try it again. (%v)\n", err)
|
||||
time.Sleep(time.Second * 5)
|
||||
continue
|
||||
}
|
||||
d.nc = nc
|
||||
break
|
||||
}
|
||||
d.nc = nc
|
||||
|
||||
_, err = nc.Subscribe(d.NATSDiscoveryChannel, d.handler)
|
||||
_, err := d.nc.Subscribe(d.NATSDiscoveryChannel, d.handler)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("subscribe error: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -93,7 +101,14 @@ func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
|
|||
return fmt.Errorf("sending discovery formating message error: %v", err)
|
||||
}
|
||||
err = d.nc.Publish(d.NATSDiscoveryChannel, data)
|
||||
if err != nil {
|
||||
// In case the connection is down we will try to reconnect
|
||||
if err != nil && strings.Contains(err.Error(), "connection closed") {
|
||||
d.nc.Close()
|
||||
err = d.Init()
|
||||
if err != nil {
|
||||
return fmt.Errorf("sending discovery reconnect error: %v", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("sending discovery error: %v", err)
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -45,9 +45,9 @@ func (d *Discovery) Bytes() ([]byte, error) {
|
|||
return data, err
|
||||
}
|
||||
|
||||
// FindLabels returns list of labels with given prefix. For example "service:ns" has prefix "service" or "service:".
|
||||
// FindLabelsByPrefix returns list of labels with given prefix. For example "service:ns" has prefix "service" or "service:".
|
||||
// It doesn't have to be prefix, but for example "service:test" will match "service:test" and also "service:test2".
|
||||
func (d *Discovery) FindLabels(prefix string) Labels {
|
||||
func (d *Discovery) FindLabelsByPrefix(prefix string) Labels {
|
||||
labels := Labels{}
|
||||
for _, label := range d.Labels {
|
||||
if strings.HasPrefix(label.String(), prefix) {
|
||||
|
|
|
@ -27,7 +27,7 @@ func TestDiscovery(t *testing.T) {
|
|||
assert.False(t, discovery.IsAlive(), "discovery not suppose to be alive")
|
||||
discovery.LastCheck = now
|
||||
|
||||
assert.Equal(t, Labels{Label("service:test")}, discovery.FindLabels("service"))
|
||||
assert.Equal(t, Labels{Label("service:test")}, discovery.FindLabelsByPrefix("service"))
|
||||
assert.Equal(t, nil, discovery.Validate()) // TODO: This needs more love
|
||||
|
||||
content, err := json.Marshal(&discovery)
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package server
|
||||
|
||||
import "github.com/google/go-cmp/cmp"
|
||||
|
||||
// Compare compares discovery A and B and returns true if those two are different.
|
||||
func Compare(discoveryA, discoveryB Discovery) bool {
|
||||
discoveryA.LastCheck = 0
|
||||
discoveryB.LastCheck = 0
|
||||
discoveryA.TTL = 0
|
||||
discoveryB.TTL = 0
|
||||
return !cmp.Equal(discoveryA, discoveryB)
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestCompare(t *testing.T) {
|
||||
discoveryA := Discovery{
|
||||
Hostname: "abcd.com",
|
||||
Labels: Labels{
|
||||
Label("label1"),
|
||||
},
|
||||
LastCheck: 52,
|
||||
}
|
||||
discoveryAB := Discovery{
|
||||
Hostname: "abcd.com",
|
||||
Labels: Labels{
|
||||
Label("label1"),
|
||||
},
|
||||
LastCheck: 56,
|
||||
}
|
||||
discoveryB := Discovery{
|
||||
Hostname: "efgh.com",
|
||||
Labels: Labels{
|
||||
Label("label2"),
|
||||
},
|
||||
LastCheck: 56,
|
||||
}
|
||||
discoveryC := Discovery{
|
||||
Hostname: "abcd.com",
|
||||
Labels: Labels{
|
||||
Label("label2"),
|
||||
},
|
||||
LastCheck: 60,
|
||||
}
|
||||
|
||||
assert.True(t, Compare(discoveryA, discoveryB))
|
||||
assert.True(t, Compare(discoveryB, discoveryC))
|
||||
assert.True(t, Compare(discoveryA, discoveryC)) // Test different labels and same hostname
|
||||
assert.False(t, Compare(discoveryA, discoveryA))
|
||||
assert.False(t, Compare(discoveryB, discoveryB))
|
||||
assert.False(t, Compare(discoveryA, discoveryAB)) // Test that last check is zeroed
|
||||
}
|
|
@ -1,5 +1,7 @@
|
|||
package server
|
||||
|
||||
import "strings"
|
||||
|
||||
// Label keeps one piece of information about a single server
|
||||
type Label string
|
||||
|
||||
|
@ -7,6 +9,34 @@ func (l Label) String() string {
|
|||
return string(l)
|
||||
}
|
||||
|
||||
// GetPart returns specific part of the label, if part index is higher than last available index it returns empty string.
|
||||
func (l Label) GetPart(idx int) string {
|
||||
parts := strings.Split(l.String(), ":")
|
||||
|
||||
if idx < 0 {
|
||||
return ""
|
||||
}
|
||||
if len(parts) >= idx {
|
||||
return ""
|
||||
}
|
||||
|
||||
return parts[idx]
|
||||
}
|
||||
|
||||
// GetPart1 is exactly same as GetPart but it splits the label only once, this is good for IPv6 addresses
|
||||
func (l Label) GetPart1(idx int) string {
|
||||
parts := strings.SplitN(l.String(), ":", 2)
|
||||
|
||||
if idx < 0 {
|
||||
return ""
|
||||
}
|
||||
if len(parts) >= idx {
|
||||
return ""
|
||||
}
|
||||
|
||||
return parts[idx]
|
||||
}
|
||||
|
||||
// Labels stores multiple Label records
|
||||
type Labels []Label
|
||||
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// Templater is used to generate config files from predefined
|
||||
// templates based on content of gathered discovery packets.
|
||||
// It can for example configure Nginx's backend or database
|
||||
// replication.
|
||||
//
|
||||
// It reads templates from /var/lib/lobby/templates (default)
|
||||
// which are YAML files cotaining the template itself and command(s)
|
||||
// that needs to be run when the template changes.
|
||||
|
||||
const defaultTemplatesPath = "/var/lib/lobby/templates"
|
||||
|
||||
var templatesPath *string
|
||||
|
||||
func init() {
|
||||
templatesPath = flag.String("templates-path", defaultTemplatesPath, "path of where templates are stored")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
err := os.MkdirAll(*templatesPath, 0750)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
flag.Usage()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func main() {
|
||||
fmt.Println(*templatesPath)
|
||||
}
|
Loading…
Reference in New Issue