Compare commits

...

32 Commits
v1.0 ... main

Author SHA1 Message Date
Adam Štrauch c2ed658aaa
Multiple prometheus services support
continuous-integration/drone/push Build is passing Details
* Prometheus export test
* Version bump to 1.5
2022-02-21 10:20:06 +01:00
Adam Štrauch 038741b87e
Version bump to 1.4
continuous-integration/drone/push Build is passing Details
2021-12-31 09:08:22 +01:00
Adam Štrauch 157e795792
Invalid NATS connection fix when subscribing to a subject
continuous-integration/drone/push Build is passing Details
2021-12-31 08:09:44 +01:00
Adam Štrauch 55b3376d4c
Reconnect instead of exit when NATS is not available
continuous-integration/drone/push Build is passing Details
2021-12-25 10:38:13 +01:00
Adam Štrauch 0e836b68a8
NATS driver: try to reconnect if the connection is down
continuous-integration/drone/push Build is passing Details
continuous-integration/drone Build is passing Details
2021-09-27 00:10:16 +02:00
Adam Štrauch 322ef2a258
Version bump to 1.3 2021-09-20 16:41:14 +02:00
Adam Štrauch 6a1ecb80a1
Resolver
continuous-integration/drone/push Build is passing Details
Resolve API endpoint and cli command return list of hostnames
base on given label.
2021-09-20 16:10:52 +02:00
Adam Štrauch 3a7662ef0d
README update
continuous-integration/drone/push Build is passing Details
2021-09-18 19:43:13 +02:00
Adam Štrauch 4c0ca7ed6a
Callback script docs
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/tag Build is passing Details
2021-09-18 19:41:26 +02:00
Adam Štrauch a4a97ec49d
Version bump to 1.2
continuous-integration/drone/push Build is passing Details
2021-09-18 19:34:18 +02:00
Adam Štrauch 66683c9496
Update drone.yml file format
continuous-integration/drone/push Build is passing Details
2021-09-17 19:06:37 +02:00
Adam Štrauch 24923a305b
Renaming drone/woodpecker file for compatibility reasons
continuous-integration/drone/push Build is passing Details
2021-09-17 08:19:41 +02:00
Adam Štrauch ab8068e26d
Testing pipeline
continuous-integration/drone Build is passing Details
2021-09-17 01:39:48 +02:00
Adam Štrauch 202a60f091
Initial woodpeck pipeline 2021-09-17 01:36:43 +02:00
Adam Štrauch a9d7abbdd5
Don't run callback stuff when callback is not configured 2021-09-17 01:36:20 +02:00
Adam Štrauch 709c47af3e
Callback script
Callback is run when there some discovery packet changes.
It can be an update, adding a new one or deleting the old one.
This can be used to perform dynamic configuration of services that
don't support lobby's API.
2021-09-15 16:58:09 +02:00
Adam Štrauch e06a5cc94b
Implementation of change detection
If there is an update in discovery an function is triggered
that can pick it up.
2021-09-11 11:58:27 +02:00
Adam Štrauch 6769b903bc
Mini refactoring 2021-09-09 18:35:32 +02:00
Adam Štrauch 8493c7cb40
Send discovery packet every time a new label is added 2021-09-09 18:25:22 +02:00
Adam Štrauch a0abda5196
README update 2021-09-07 14:12:29 +02:00
Adam Štrauch 58495d59c1
Labels sorting by default and enhancements
Faster building,
disable API fix.
2021-09-07 01:18:17 +02:00
Adam Štrauch 2f4203e7c3
README update 2021-09-07 00:50:13 +02:00
Adam Štrauch 7010316f8f
Redis driver, changing name of the package 2021-09-07 00:46:48 +02:00
Adam Štrauch f7340728be
Dropping 5s waiting to finish everything to 1s 2021-09-06 18:34:38 +02:00
Adam Štrauch f2c3ef49e9
README update 2021-09-06 12:33:08 +02:00
Adam Štrauch 128ed765e7
Daemon: don't return not matched labels when searching 2021-09-05 22:00:08 +02:00
Adam Štrauch f60f92d667
Makefile update 2021-09-05 21:44:11 +02:00
Adam Štrauch 65c6aef5dc
CTL: color output 2021-09-05 21:43:19 +02:00
Adam Štrauch 753aaf8377
CTL: JSON output 2021-09-05 21:35:10 +02:00
Adam Štrauch c6cb674053
Search by labels and prefixes
Implemented in API and ctl.
2021-09-05 17:30:21 +02:00
Adam Štrauch b120970054
Makefile and README updates
Update description text and fix installation instructions.

Makefile can build binaries for multiple archs now.
2021-09-05 17:10:47 +02:00
Adam Štrauch b19002ec84
README update 2021-09-05 14:38:32 +02:00
28 changed files with 1083 additions and 139 deletions

10
.drone.yml Normal file
View File

@ -0,0 +1,10 @@
kind: pipeline
type: docker
name: testing
steps:
- name: test
image: golang
commands:
- go mod tidy
- make test

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
.history/
# Files with secrets
*secret*
tmp/

View File

@ -1,3 +1,6 @@
VERSION=1.5
.PHONY: all
all: build
@ -9,9 +12,39 @@ clean:
test:
go test -v server/*.go
.PHONY: build
build: test
init:
mkdir -p ./bin
export CGO_ENABLED=0 && go build -o ./bin/lobbyd daemon/*.go
export CGO_ENABLED=0 && go build -o ./bin/lobbyctl ctl/*.go
.PHONY: build
build: test clean init linux-amd64 linux-arm linux-arm64 darwin-amd64 darwin-arm64
.PHONY: linux-amd64
linux-amd64: clean init
# linux amd64
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/lobbyd-${VERSION}-linux-amd64 daemon/*.go
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/lobbyctl-${VERSION}-linux-amd64 ctl/*.go
.PHONY: linux-arm
linux-arm: clean init
# linux arm
env CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -o ./bin/lobbyd-${VERSION}-linux-arm daemon/*.go
env CGO_ENABLED=0 GOOS=linux GOARCH=arm go build -o ./bin/lobbyctl-${VERSION}-linux-arm ctl/*.go
.PHONY: linux-arm64
linux-arm64: clean init
# linux arm64
env CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./bin/lobbyd-${VERSION}-linux-arm64 daemon/*.go
env CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o ./bin/lobbyctl-${VERSION}-linux-arm64 ctl/*.go
.PHONY: darwin-amd64
darwin-amd64: clean init
# darwin amd64
env CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./bin/lobbyd-${VERSION}-darwin-amd64 daemon/*.go
env CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o ./bin/lobbyctl-${VERSION}-darwin-amd64 ctl/*.go
.PHONY: darwin-arm64
darwin-arm64: clean init
# darwin arm64
env CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -o ./bin/lobbyd-${VERSION}-darwin-arm64 daemon/*.go
env CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -o ./bin/lobbyctl-${VERSION}-darwin-arm64 ctl/*.go

129
README.md
View File

@ -1,11 +1,15 @@
# Lobby - simple server/service discovery service
TLDR: This a labeling tool for your servers. Like AWS resource tags but available everywhere.
In one of ours projects we needed service discovery that doesn't need complicated setup just to share
a simple information about running services and checking if they are still alive. So we came up with
this small service we call Lobby. It's like a lobby in games but in this case there are servers. Each
server runs one or more instances of lobby daemon and it regularly sends how it's configured.
this small service we called Lobby. It's like a lobby for users in games but in this case there are
servers instead. Each server runs one or more instances of lobby daemon and it regularly sends info
about its hostname and configured labels.
We call the information about the server and services running on it *labels*. Every server shares
Labels are similar what you could know from AWS. It's basically alternative to resources' tags feature
with the only different that you can use this anywhere including AWS. Every server sends something called
"discovery packet" which is basically a json that looks like this:
```json
@ -15,39 +19,42 @@ We call the information about the server and services running on it *labels*. Ev
"service:smtp",
"public_ip4:1.2.3.4",
"public_ip6:2a03::1"
],
"last_check": 1630612478
]
}
```
The packet contains information what's the server hostname and then list of labels describing
what's running on it and what are the IP addresses. What's in the labels is completely up to you
but in some use-cases (Node Exporter API endpoint) it expects "NAME:VALUE" format.
The packet contains information what's the server's hostname and then list of labels describing
what's running on it, what are the IP addresses, services, directories to backup or server's location for example.
What's in the labels is completely up to you but in some use-cases (Node Exporter API endpoint) it
expects "NAME:VALUE" format.
The labels can be configured via environment variables but also as files located in
*/etc/lobby/labels* (configurable path) so it can dynamically change.
*/etc/lobby/labels* (configurable path) so it can dynamically change. Another way is to use
*lobbyctl* which can add new labels at runtime.
When everything is running just call your favorite http client against "http://localhost:1313/"
on any of the running instances and lobby returns you list of all available servers and
on any of the running instances and lobby returns a list of all available servers and
their labels. You can hook it to Prometheus, deployment scripts, CI/CD automations or
your internal system that sends emails and it needs to know where is the SMTP server for
example.
Lobby doesn't care if you have a one or thousand instances of it running. Each instance
is connected to a common point which is a [NATS server](https://nats.io/) in this case. NATS is super fast and reliable
is connected to a common point which is a [NATS server](https://nats.io/) or Redis. NATS is super fast and reliable
messaging system which handles the communication part but also the high availability part.
NATS is easy to run and it offloads a huge part of the problem from lobby itself.
NATS is easy to run and it offloads a huge part of the problem from lobby itself. But Redis
is not a bad choice either in some cases.
The code is open to support multiple backends and it's not that hard to add a new one.
Support for NATS is only less than 150 lines.
## Quickstart guide
The quickest way how to run lobbyd on your server is this:
The quickest way how to run lobby on your server is this:
```shell
wget -O /usr/local/bin/lobbyd https://....
wget -O /usr/local/bin/lobbyd https://github.com/by-cx/lobby/releases/download/v1.2/lobbyd-1.2-linux-amd64
chmod +x /usr/local/bin/lobbyd
wget -O /usr/local/bin/lobbyctl https://....
wget -O /usr/local/bin/lobbyctl https://github.com/by-cx/lobby/releases/download/v1.2/lobbyctl-1.2-linux-amd64
chmod +x /usr/local/bin/lobbyctl
# Update NATS_URL and LABELS here
@ -72,7 +79,7 @@ systemctl enable lobbyd
```
If you run lobbyd in production, consider to create its own system user and group and add both into this
service file. It doesn't need to access almost anything in your system.
service file. It doesn't need to access everything in your system.
To test if local instance is running call this:
@ -82,24 +89,42 @@ To test if local instance is running call this:
There are other config directives you can use to fine-tune lobbyd to exactly what you need.
| Environment variable | Type | Default | Required | Note |
| ----------------------- | ------ | ----------------- | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TOKEN | string | | no | Authentication token for API, if empty auth is disabled |
| HOST | string | 127.0.0.1 | no | IP address used for the REST server to listen |
| PORT | int | 1313 | no | Port related to the address above |
| NATS_URL | string | | yes | NATS URL used to connect to the NATS server |
| NATS_DISCOVERY_CHANNEL | string | lobby.discovery | no | Channel where the keep-alive packets are sent |
| LABELS | string | | no | List of labels, labels should be separated by comma |
| LABELS_PATH | string | /etc/lobby/labels | no | Path where filesystem based labels are located, one label per line, filename is not important for lobby |
| RUNTIME_LABELS_FILENAME | string | _runtime | no | Filename for file created in LabelsPath where runtime labels will be added |
| HOSTNAME | string | | no | Override local machine's hostname |
| CLEAN_EVERY | int | 15 | no | How often to clean the list of discovered servers to get rid of the not alive ones [secs] |
| KEEP_ALIVE | int | 5 | no | how often to send the keep-alive discovery message with all available information [secs] |
| TTL | int | 30 | no | After how many secs is discovery record considered as invalid |
| NODE_EXPORTER_PORT | int | 9100 | no | Default port where node_exporter listens on all registered servers, this is used when the special prometheus labels doesn't contain port |
| REGISTER | bool | true | no | If true (default) then local instance is registered with other instance (discovery packet is sent regularly), if false the daemon runs only as a client |
| Environment variable | Type | Default | Required | Note |
| ------------------------ | ------ | ----------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| TOKEN | string | | no | Authentication token for API, if empty auth is disabled |
| HOST | string | 127.0.0.1 | no | IP address used for the REST server to listen |
| PORT | int | 1313 | no | Port related to the address above |
| DISABLE_API | bool | false | no | If true API interface won't start |
| DRIVER | string | NATS | yes | Selects which driver is used to exchange the discovery packets. |
| NATS_URL | string | | yes (NATS driver) | NATS URL used to connect to the NATS server |
| NATS_DISCOVERY_CHANNEL | string | lobby.discovery | no | Channel where the keep-alive packets are sent |
| REDIS_HOST | string | 127.0.0.1" | no | Redis host |
| REDIS_PORT | uint16 | 6379 | no | Redis port |
| REDIS_DB | string | 0 | no | Redis DB |
| REDIS_CHANNEL | string | lobby:discovery | no | Redis channel |
| REDIS_PASSWORD | string | | no | Redis password |
| LABELS | string | | no | List of labels, labels should be separated by comma |
| LABELS_PATH | string | /etc/lobby/labels | no | Path where filesystem based labels are located, one label per line, filename is not important for lobby |
| RUNTIME_LABELS_FILENAME | string | _runtime | no | Filename for file created in LabelsPath where runtime labels will be added |
| HOSTNAME | string | | no | Override local machine's hostname |
| CLEAN_EVERY | int | 15 | no | How often to clean the list of discovered servers to get rid of the not alive ones [secs] |
| KEEP_ALIVE | int | 5 | no | how often to send the keep-alive discovery message with all available information [secs] |
| TTL | int | 30 | no | After how many secs is discovery record considered as invalid |
| NODE_EXPORTER_PORT | int | 9100 | no | Default port where node_exporter listens on all registered servers, this is used when the special prometheus labels doesn't contain port |
| REGISTER | bool | true | no | If true (default) then local instance is registered with other instance (discovery packet is sent regularly), if false the daemon runs only as a client |
| CALLBACK | string | | no | Path to a script that runs when the the discovery packet records are changed. Not running for first |
| CALLBACK_COOLDOWN | int | 15 | no | Cooldown prevents the call back script to run sooner than configured amount of seconds after last run is finished. |
| CALLBACK_FIRST_RUN_DELAY | int | 30 | no | Wait for this amount of seconds before callback is run for first time after fresh start of the daemon |
### Callback script
When your application cannot support Lobbyd's API it can be configured via callback script that runs everytime something has changed in the network. Callback script is run every 15 seconds (configured by CALLBACK_COOLDOWN) but only when something has changed.
The script runs under the same user as lobbyd. When lobbyd starts first thirty seconds (CALLBACK_FIRST_RUN_DELAY) is ignored and then the script is run for first time. After these thirty seconds everything runs in loop based on the changes in the network.
All current discovery packets are passed to the callback script via standard input. It's basically the same input you get if you run `lobbyctl discoveries`.
### Service discovery for Prometheus
Lobbyd has an API endpoint that returns list of targets for [Prometheus's HTTP SD config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config). That
@ -112,6 +137,8 @@ Let's check this:
If you set port to *-* lobby daemon omits port entirely from the output.
There can be multiple `host` labels but only one `port` label and all prometheus labels (last line) will be common for all hosts labels. If port is omitted then default 9100 is used or port can also be part of the host label.
When you open URL http://localhost:1313/v1/prometheus/nodeexporter it returns this:
```json
@ -175,24 +202,42 @@ It uses Go client library also located in this repository.
So far the REST API is super simple and it has only two endpoints:
```
GET / # Same as /v1/discoveries
GET /v1/discovery # Returns current local discovery packet
GET /v1/discoveries # Returns list of all discovered servers and their labels.
GET /v1/discoveries?labels=LABELS # output will be filtered based on one or multiple labels separated by comma
GET /v1/prometheus/:name # Generates output for Prometheus's SD config, name is group of the monitoring services described above.
POST /v1/labels # Add runtime labels that will persist over daemon restarts. Labels should be in the body of the request, one line per one label.
DELETE /v1/labels # Delete runtime labels. One label per line. Can't affect the labels from environment variables or labels added from the LabelPath.
GET / # Same as /v1/discoveries
GET /v1/discovery # Returns current local discovery packet
GET /v1/discoveries # Returns list of all discovered servers and their labels.
GET /v1/discoveries?labels=LABELS&prefixes=PREFIXES # output will be filtered based on one or multiple labels separated by comma or it can search for given prefixes, only one of those will be used
GET /v1/prometheus/:name # Generates output for Prometheus's SD config, name is group of the monitoring services described above.
POST /v1/labels # Add runtime labels that will persist over daemon restarts. Labels should be in the body of the request, one line per one label.
DELETE /v1/labels # Delete runtime labels. One label per line. Can't affect the labels from environment variables or labels added from the LabelPath.
```
If there is an error the error message is returned as plain text.
## API clients
* Golang client is part of this repository.
* There is also [Python client](https://github.com/by-cx/lobby-python) available.
## Notes
I wanted to use SQS or SNS as backend but when I checked the services I found out
it wouldn't work. SNS would require open HTTP server whicn is hard to do in our
infrastructure and I couldn't find a way how SQS could deliver every message
to all instances of lobbyd.
Instead I decided to implement Redis because it's much easier to use for
development and testing. But there is no reason why it couldn't work in production
too.
## TODO
* [X] Tests
* [ ] Command hooks - script or list of scripts that are triggered when discovery status has changed
* [X] Command hooks - script or list of scripts that are triggered when discovery status has changed
* [ ] Support for multiple active backend drivers
* [ ] SNS driver
* [X] Redis driver
* [X] Remove the 5 secs waiting when daemon is stopped
* [X] API to allow add labels at runtime
* [ ] Check what happens when driver is disconnected

View File

@ -6,8 +6,8 @@ import (
"fmt"
"strings"
"github.com/by-cx/lobby/server"
"github.com/go-resty/resty/v2"
"github.com/rosti-cz/server_lobby/server"
)
// Encapsulation of Lobby's client code
@ -114,14 +114,64 @@ func (l *LobbyClient) GetDiscoveries() ([]server.Discovery, error) {
return discoveries, nil
}
// Resolve returns list of hostnames that have given label
func (l *LobbyClient) Resolve(label server.Label) ([]string, error) {
l.init()
path := fmt.Sprintf("/v1/resolve?label=%s", label.String())
method := "GET"
var hostnames []string
status, body, err := l.call(method, path, "")
if err != nil {
return hostnames, err
}
if status != 200 {
return hostnames, fmt.Errorf("non-200 response: %s", body)
}
err = json.Unmarshal([]byte(body), &hostnames)
if err != nil {
return hostnames, fmt.Errorf("response parsing error: %v", err)
}
return hostnames, nil
}
// Find discoveries by their labels
func (l *LobbyClient) FindByLabels(labels server.Labels) (server.Discoveries, error) {
func (l *LobbyClient) FindByLabels(labels server.Labels) ([]server.Discovery, error) {
l.init()
path := fmt.Sprintf("/v1/discoveries?labels=%s", strings.Join(labels.StringSlice(), ","))
method := "GET"
var discoveries server.Discoveries
var discoveries []server.Discovery
status, body, err := l.call(method, path, "")
if err != nil {
return discoveries, err
}
if status != 200 {
return discoveries, fmt.Errorf("non-200 response: %s", body)
}
err = json.Unmarshal([]byte(body), &discoveries)
if err != nil {
return discoveries, fmt.Errorf("response parsing error: %v", err)
}
return discoveries, nil
}
// Find discoveries by label prefixes
func (l *LobbyClient) FindByPrefixes(prefixes []string) ([]server.Discovery, error) {
l.init()
path := fmt.Sprintf("/v1/discoveries?prefixes=%s", strings.Join(prefixes, ","))
method := "GET"
var discoveries []server.Discovery
status, body, err := l.call(method, path, "")
if err != nil {

View File

@ -1,6 +1,6 @@
package common
import "github.com/rosti-cz/server_lobby/server"
import "github.com/by-cx/lobby/server"
// Listener is a function that returns received discovery
type Listener func(server.Discovery)

View File

@ -1,14 +1,18 @@
package main
import (
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"github.com/rosti-cz/server_lobby/server"
"github.com/by-cx/lobby/server"
"github.com/fatih/color"
)
func printDiscovery(discovery server.Discovery) {
fmt.Printf("Hostname:\n %s\n", discovery.Hostname)
color.Yellow("Hostname:\n %s\n", discovery.Hostname)
if len(discovery.Labels) > 0 {
fmt.Printf("Labels:\n")
@ -18,6 +22,15 @@ func printDiscovery(discovery server.Discovery) {
}
}
func colorLabel(label server.Label) string {
parts := strings.Split(label.String(), ":")
if len(parts) == 1 {
return color.GreenString(parts[0])
}
return color.GreenString(parts[0]) + ":" + color.MagentaString((strings.Join(parts[1:], ":")))
}
func printDiscoveries(discoveries []server.Discovery) {
maxHostnameWidth := 0
for _, discovery := range discoveries {
@ -28,16 +41,29 @@ func printDiscoveries(discoveries []server.Discovery) {
for _, discovery := range discoveries {
if len(discovery.Labels) == 0 {
fmt.Println(discovery.Hostname)
// fmt.Println(discovery.Hostname)
color.Yellow(discovery.Hostname)
} else {
hostname := fmt.Sprintf("%"+strconv.Itoa(maxHostnameWidth)+"s", discovery.Hostname)
fmt.Printf("%s %s\n", hostname, discovery.Labels[0].String())
fmt.Printf("%s %s\n", color.YellowString(hostname), colorLabel(discovery.Labels[0]))
if len(discovery.Labels) > 1 {
for _, label := range discovery.Labels[1:] {
fmt.Printf("%"+strconv.Itoa(maxHostnameWidth+4)+"s%s\n", " ", label)
fmt.Printf("%"+strconv.Itoa(maxHostnameWidth+4)+"s%s\n", " ", colorLabel(label))
}
}
}
fmt.Println()
}
}
func printJSON(data interface{}) {
body, err := json.Marshal(data)
if err != nil {
fmt.Println("error occurred while formating the output into JSON:", err.Error())
os.Exit(3)
}
fmt.Println(string(body))
}

View File

@ -6,18 +6,21 @@ import (
"os"
"strings"
"github.com/rosti-cz/server_lobby/client"
"github.com/rosti-cz/server_lobby/server"
"github.com/by-cx/lobby/client"
"github.com/by-cx/lobby/server"
)
func Usage() {
flag.Usage()
fmt.Println("")
fmt.Println("Commands:")
fmt.Println(" discovery returns discovery packet of the server where the client is connected to")
fmt.Println(" discoveries returns list of all registered discovery packets")
fmt.Println(" labels add LABEL [LABEL] ... adds new runtime labels")
fmt.Println(" labels del LABEL [LABEL] ... deletes runtime labels")
fmt.Println(" resolve label returns list of hostnames with given label")
fmt.Println(" discovery returns discovery packet of the server where the client is connected to")
fmt.Println(" discoveries returns list of all registered discovery packets")
fmt.Println(" discoveries labels [LABEL] ... returns list of all registered discovery packets with given labels (OR)")
fmt.Println(" discoveries search [LABEL] ... returns list of all registered discovery packets with given label prefixes (OR)")
fmt.Println(" labels add LABEL [LABEL] ... adds new runtime labels")
fmt.Println(" labels del LABEL [LABEL] ... deletes runtime labels")
}
func main() {
@ -28,6 +31,7 @@ func main() {
host := flag.String("host", "", "Hostname or IP address of lobby daemon")
port := flag.Uint("port", 0, "Port of lobby daemon")
token := flag.String("token", "", "Token needed to communicate lobby daemon, if empty auth is disabled")
jsonOutput := flag.Bool("json", false, "set output to JSON, error will be still in plain text")
flag.Parse()
@ -65,18 +69,77 @@ func main() {
}
switch flag.Args()[0] {
case "discoveries":
discoveries, err := client.GetDiscoveries()
if err != nil {
fmt.Println(err)
case "resolve":
if len(flag.Args()) == 2 {
hostnames, err := client.Resolve(server.Label(flag.Arg(1)))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if *jsonOutput {
printJSON(hostnames)
} else {
for _, hostname := range hostnames {
fmt.Println(hostname)
}
}
} else {
Usage()
os.Exit(0)
}
case "discoveries":
var discoveries []server.Discovery
var err error
if len(flag.Args()) > 2 {
if flag.Arg(1) == "labels" {
labels := []server.Label{}
for _, label := range flag.Args()[2:] {
labels = append(labels, server.Label(label))
}
discoveries, err = client.FindByLabels(labels)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
} else if flag.Arg(1) == "search" {
discoveries, err = client.FindByPrefixes(flag.Args()[2:])
if err != nil {
fmt.Println(err)
os.Exit(1)
}
} else {
fmt.Println("ERROR: unknown usage of discoveries arguments")
fmt.Println("")
Usage()
os.Exit(0)
}
} else {
discoveries, err = client.GetDiscoveries()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
if *jsonOutput {
printJSON(discoveries)
} else {
printDiscoveries(discoveries)
}
printDiscoveries(discoveries)
case "discovery":
discovery, err := client.GetDiscovery()
if err != nil {
fmt.Println(err)
}
printDiscovery(discovery)
if *jsonOutput {
printJSON(discovery)
} else {
printDiscovery(discovery)
}
case "labels":
if len(flag.Args()) < 3 {
fmt.Println("ERROR: not enough arguments for labels command")

View File

@ -3,8 +3,8 @@ package main
import (
"log"
"github.com/by-cx/lobby/server"
"github.com/kelseyhightower/envconfig"
"github.com/rosti-cz/server_lobby/server"
)
// Config keeps info about configuration of this daemon
@ -12,8 +12,15 @@ type Config struct {
Token string `envconfig:"TOKEN" required:"false"` // Authentication token, if empty auth is disabled
Host string `envconfig:"HOST" required:"false" default:"127.0.0.1"` // IP address used for the REST server to listen
Port uint16 `envconfig:"PORT" required:"false" default:"1313"` // Port related to the address above
NATSURL string `envconfig:"NATS_URL" required:"true"` // NATS URL used to connect to the NATS server
DisableAPI bool `envconfig:"DISABLE_API" required:"false" default:"false"` // If true API interface won't start
Driver string `envconfig:"DRIVER" required:"false" default:"NATS"` // Select driver to use to communicate with the group of nodes. The possible values are NATS and Redis
NATSURL string `envconfig:"NATS_URL" required:"false"` // NATS URL used to connect to the NATS server
NATSDiscoveryChannel string `envconfig:"NATS_DISCOVERY_CHANNEL" required:"false" default:"lobby.discovery"` // Channel where the kepp alive packets are sent
RedisHost string `envconfig:"REDIS_HOST" required:"false" default:"127.0.0.1"` // Redis host
RedisPort uint16 `envconfig:"REDIS_PORT" required:"false" default:"6379"` // Redis port
RedisDB uint `envconfig:"REDIS_DB" required:"false" default:"0"` // Redis DB
RedisChannel string `envconfig:"REDIS_CHANNEL" required:"false" default:"lobby:discovery"` // Redis channel
RedisPassword string `envconfig:"REDIS_PASSWORD" required:"false" default:""` // Redis password
Labels server.Labels `envconfig:"LABELS" required:"false" default:""` // List of labels
LabelsPath string `envconfig:"LABELS_PATH" required:"false" default:"/etc/lobby/labels"` // Path where filesystem based labels are located
RuntimeLabelsFilename string `envconfig:"RUNTIME_LABELS_FILENAME" required:"false" default:"_runtime"` // Filename for file created in LabelsPath where runtime labels will be added
@ -23,6 +30,9 @@ type Config struct {
TTL uint `envconfig:"TTL" required:"false" default:"30"` // After how many secs is discovery record considered as invalid
NodeExporterPort uint `envconfig:"NODE_EXPORTER_PORT" required:"false" default:"9100"` // Default port where node_exporter listens on all registered servers
Register bool `envconfig:"REGISTER" required:"false" default:"true"` // If true (default) then local instance is registered with other instance (discovery packet is sent regularly)
Callback string `envconfig:"CALLBACK" required:"false" default:""` // path to a script that runs when the is a change in the labels database
CallbackCooldown uint `envconfig:"CALLBACK_COOLDOWN" required:"false" default:"15"` // cooldown that prevents to run the config change script too many times in row
CallbackFirstRunDelay uint `envconfig:"CALLBACK_FIRST_RUN_DELAY" required:"false" default:"30"` // Wait for this amount of seconds before callback is run for first time after fresh start of the daemon
}
// GetConfig return configuration created based on environment variables
@ -34,5 +44,13 @@ func GetConfig() *Config {
log.Fatal(err.Error())
}
if config.Driver != "Redis" && config.Driver != "NATS" {
log.Fatal("ERROR: the only supported drivers are Redis and NATS (default)")
}
if config.Driver == "NATS" && len(config.NATSURL) == 0 {
log.Fatal("ERROR: NATS_URL cannot be empty when driver is set to NATS")
}
return &config
}

View File

@ -6,18 +6,21 @@ import (
"net/http"
"strings"
"github.com/by-cx/lobby/server"
"github.com/labstack/echo"
"github.com/rosti-cz/server_lobby/server"
)
func listHandler(c echo.Context) error {
labels := c.QueryParam("labels")
prefixes := c.QueryParam("prefixes")
var discoveries []server.Discovery
if len(labels) > 0 {
labelsFilterSlice := strings.Split(labels, ",")
discoveries = discoveryStorage.Filter(labelsFilterSlice)
} else if len(prefixes) > 0 {
discoveries = discoveryStorage.FilterPrefix(strings.Split(prefixes, ","))
} else {
discoveries = discoveryStorage.GetAll()
}
@ -25,6 +28,20 @@ func listHandler(c echo.Context) error {
return c.JSONPretty(200, discoveries, " ")
}
// resolveHandler returns hostname(s) based on another label
func resolveHandler(c echo.Context) error {
label := c.QueryParam("label") // This is label we will use to filter discovery packets
output := []string{}
discoveries := discoveryStorage.Filter([]string{label})
for _, discovery := range discoveries {
output = append(output, discovery.Hostname)
}
return c.JSONPretty(http.StatusOK, output, " ")
}
func prometheusHandler(c echo.Context) error {
name := c.Param("name")
@ -60,6 +77,9 @@ func addLabelsHandler(c echo.Context) error {
return c.String(http.StatusBadRequest, err.Error())
}
// Update the other nodes with this new change
sendDiscoveryPacket()
return c.String(http.StatusOK, "OK")
}
@ -81,5 +101,8 @@ func deleteLabelsHandler(c echo.Context) error {
return c.String(http.StatusBadRequest, err.Error())
}
// Update the other nodes with this new change
sendDiscoveryPacket()
return c.String(http.StatusOK, "OK")
}

View File

@ -9,20 +9,23 @@ import (
"syscall"
"time"
"github.com/by-cx/lobby/common"
"github.com/by-cx/lobby/nats_driver"
"github.com/by-cx/lobby/redis_driver"
"github.com/by-cx/lobby/server"
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"
"github.com/rosti-cz/server_lobby/common"
"github.com/rosti-cz/server_lobby/nats_driver"
"github.com/rosti-cz/server_lobby/server"
)
var discoveryStorage server.Discoveries = server.Discoveries{}
var driver common.Driver
var localHost server.LocalHost
var lastLocalHostname string
var config Config
var shuttingDown bool
var sendDiscoveryPacketTrigger chan bool = make(chan bool)
func init() {
// Load config from environment variables
@ -32,7 +35,7 @@ func init() {
discoveryStorage.LogChannel = make(chan string)
discoveryStorage.TTL = config.TTL
// localhost initization
// localhost initiation
localHost = server.LocalHost{
LabelsPath: config.LabelsPath,
HostnameOverride: config.HostName,
@ -41,12 +44,27 @@ func init() {
}
// Setup driver
driver = &nats_driver.Driver{
NATSUrl: config.NATSURL,
NATSDiscoveryChannel: config.NATSDiscoveryChannel,
if config.Driver == "NATS" {
driver = &nats_driver.Driver{
NATSUrl: config.NATSURL,
NATSDiscoveryChannel: config.NATSDiscoveryChannel,
LogChannel: discoveryStorage.LogChannel,
LogChannel: discoveryStorage.LogChannel,
}
} else if config.Driver == "Redis" {
driver = &redis_driver.Driver{
Host: config.RedisHost,
Port: uint(config.RedisPort),
Password: config.RedisPassword,
Channel: config.RedisChannel,
DB: uint(config.RedisDB),
LogChannel: discoveryStorage.LogChannel,
}
} else {
log.Fatalf("unsupported driver %s", config.Driver)
}
}
// cleanDiscoveryPool clears the local server map and keeps only the alive servers
@ -72,21 +90,44 @@ func sendGoodbyePacket() {
}
}
// sendDisoveryPacket sends discovery packet regularly so the network know we exist
// sendDiscoveryPacket sends a single discovery packet out
func sendDiscoveryPacket() {
sendDiscoveryPacketTrigger <- true
}
// sendDisoveryPacket sends discovery packet to the driver which passes it to the
// other nodes. By this it propagates any change that happens in the local discovery struct.
// Every tune trigger is triggered it sends one message.
func sendDiscoveryPacketTask(trigger chan bool) {
for {
discovery, err := localHost.GetIdentification()
if err != nil {
log.Printf("sending discovery identification error: %v\n", err)
}
// We are waiting for the trigger
<-trigger
err = driver.SendDiscoveryPacket(discovery)
if err != nil {
log.Println(err)
}
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
if !shuttingDown {
// Get info about local machine and send to the exchange point
discovery, err := localHost.GetIdentification()
if err != nil {
log.Printf("sending discovery identification error: %v\n", err)
}
if shuttingDown {
err = driver.SendDiscoveryPacket(discovery)
if err != nil {
log.Println(err.Error())
}
// If local hostname changes, we will deregister it
if discovery.Hostname != lastLocalHostname && lastLocalHostname != "" {
log.Println("Hostname change detected, deregistering the old one")
err = driver.SendGoodbyePacket(server.Discovery{
Hostname: lastLocalHostname,
})
if err != nil {
log.Println(err)
}
}
lastLocalHostname = discovery.Hostname
} else {
break
}
}
@ -111,12 +152,38 @@ func main() {
// Server discovering stuff
// ------------------------
// Connect to the NATS service
// Setup callback function to register and unregister discovery packets from other servers
driver.RegisterSubscribeFunction(func(d server.Discovery) {
// Check if the local version and the new version are somehowe changed
localVersion := discoveryStorage.Get(d.Hostname)
exists := discoveryStorage.Exist(d.Hostname)
changed := server.Compare(localVersion, d)
discoveryStorage.Add(d)
if changed {
// Print this only if the server is already registered
if exists {
log.Printf("%s has been updated", d.Hostname)
}
go func() {
err = discoveryChange(d)
if err != nil {
log.Printf("discovery changed error: %v", err)
}
}()
}
})
driver.RegisterUnsubscribeFunction(func(d server.Discovery) {
discoveryStorage.Delete(d.Hostname)
go func() {
err = discoveryChange(d)
if err != nil {
log.Printf("discovery changed error: %v", err)
}
}()
})
err = driver.Init()
@ -127,13 +194,36 @@ func main() {
go printDiscoveryLogs()
go cleanDiscoveryPool()
if len(config.Callback) > 0 {
go discoveryChangeLoop()
go changeCatcherLoop()
}
// When the daemon boots up we trigger discovery change event so the config can be setup via callback script if there is any
err = discoveryChange(server.Discovery{})
if err != nil {
log.Printf("discovery changed error: %v", err)
}
// If config.Register is false this instance won't be registered with other nodes
if config.Register {
go sendDiscoveryPacket()
// This is background process that sends the message
go sendDiscoveryPacketTask(sendDiscoveryPacketTrigger)
// This triggers the process
go func() {
for {
sendDiscoveryPacket()
time.Sleep(time.Duration(config.KeepAlive) * time.Second)
if shuttingDown {
break
}
}
}()
} else {
log.Println("standalone mode, I won't register myself")
}
// --------
// REST API
// --------
@ -145,19 +235,20 @@ func main() {
}
e.Use(middleware.Logger())
e.Use(middleware.Recover())
// Routes
e.GET("/", listHandler)
e.GET("/v1/discovery", getIdentificationHandler)
e.GET("/v1/discoveries", listHandler)
e.POST("/v1/labels", addLabelsHandler)
e.DELETE("/v1/labels", deleteLabelsHandler)
e.GET("/v1/prometheus/:name", prometheusHandler)
if !config.DisableAPI {
e.GET("/", listHandler)
e.GET("/v1/resolve", resolveHandler)
e.GET("/v1/discovery", getIdentificationHandler)
e.GET("/v1/discoveries", listHandler)
e.POST("/v1/labels", addLabelsHandler)
e.DELETE("/v1/labels", deleteLabelsHandler)
e.GET("/v1/prometheus/:name", prometheusHandler)
}
// ------------------------------
// Termination signals processing
// ------------------------------
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
@ -167,13 +258,13 @@ func main() {
if config.Register {
log.Printf("%s signal received, sending goodbye packet\n", sig.String())
sendGoodbyePacket()
time.Sleep(5 * time.Second) // we wait for a few seconds to let background jobs to finish their job
time.Sleep(1 * time.Second) // we wait for a few seconds to let background jobs to finish their job
} else {
log.Printf("%s signal received", sig.String())
}
e.Shutdown(context.TODO())
}(e, config)
// Start server
e.Logger.Error(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
// In most cases this will end expectedly so it doesn't make sense to use the echo's approach to treat this message as an error.
e.Logger.Info(e.Start(config.Host + ":" + strconv.Itoa(int(config.Port))))
}

View File

@ -4,7 +4,7 @@ import (
"strconv"
"strings"
"github.com/rosti-cz/server_lobby/server"
"github.com/by-cx/lobby/server"
)
// [
@ -34,19 +34,19 @@ func preparePrometheusOutput(name string, discoveries []server.Discovery) Promet
for _, discovery := range discoveries {
port := strconv.Itoa(int(config.NodeExporterPort))
host := discovery.Hostname
hosts := []string{}
var add bool // add to the prometheus output when there is at least one prometheus related label
labels := map[string]string{}
labels := map[string]string{} // These are prometheus labels, not Lobby's labels
for _, label := range discovery.FindLabels("prometheus:" + name + ":") {
for _, label := range discovery.FindLabelsByPrefix("prometheus:" + name + ":") {
trimmed := strings.TrimPrefix(label.String(), "prometheus:"+name+":")
parts := strings.SplitN(trimmed, ":", 2)
if len(parts) == 2 {
if parts[0] == "port" {
port = parts[1]
} else if parts[0] == "host" {
host = parts[1]
hosts = append(hosts, parts[1])
} else {
labels[parts[0]] = parts[1]
}
@ -65,20 +65,23 @@ func preparePrometheusOutput(name string, discoveries []server.Discovery) Promet
}
if add {
// Omit port part if "-" is set
target := host + ":" + port
if port == "-" {
target = host
}
targets := []string{}
for _, host := range hosts {
// Omit port part if "-" is set or port is part of the host
target := host + ":" + port
if strings.Contains(host, ":") || port == "-" {
target = host
}
targets = append(targets, target)
}
service := PrometheusService{
Targets: []string{target},
Targets: targets,
Labels: labels,
}
services = append(services, service)
}
}
return services

47
daemon/prometheus_test.go Normal file
View File

@ -0,0 +1,47 @@
package main
import (
"testing"
"github.com/by-cx/lobby/server"
"github.com/stretchr/testify/assert"
)
func TestPreparePrometheusOutput(t *testing.T) {
discoveries := []server.Discovery{
{
Hostname: "test.server",
Labels: server.Labels{
// test1
"prometheus:test1:label1:l1",
"prometheus:test1:host:srv1:1234",
"prometheus:test1:host:srv1:1235",
"prometheus:test1:host:srv1",
"prometheus:test1:label2:l2",
// test2
"prometheus:test2:host:srv2",
"prometheus:test2:host:srv2:1235",
"prometheus:test2:host:srv2:1236",
"prometheus:test2:host:srv2:1237",
"prometheus:test2:label1:l3",
},
},
}
services := preparePrometheusOutput("test1", discoveries)
assert.Equal(t, 1, len(services))
assert.Equal(t, 3, len(services[0].Targets))
assert.Contains(t, services[0].Targets, "srv1:9100")
assert.Contains(t, services[0].Targets, "srv1:1234")
assert.Contains(t, services[0].Targets, "srv1:1235")
assert.Contains(t, services[0].Labels["label1"], "l1")
assert.Contains(t, services[0].Labels["label2"], "l2")
services = preparePrometheusOutput("test2", discoveries)
assert.Equal(t, 1, len(services))
assert.Equal(t, 4, len(services[0].Targets))
assert.Contains(t, services[0].Targets, "srv2:9100")
assert.Contains(t, services[0].Targets, "srv2:1235")
assert.Contains(t, services[0].Targets, "srv2:1236")
assert.Contains(t, services[0].Targets, "srv2:1237")
assert.Contains(t, services[0].Labels["label1"], "l3")
}

102
daemon/update.go Normal file
View File

@ -0,0 +1,102 @@
package main
import (
"encoding/json"
"log"
"os/exec"
"strings"
"time"
"github.com/by-cx/lobby/server"
)
// These functions are called when something has changed in the storage
var changeDetectedChannel chan bool = make(chan bool)
var changeDetected bool
// changeCatcherLoop waits for a change signal and switches variable that says if the callback should run or not
func changeCatcherLoop() {
for {
<-changeDetectedChannel
changeDetected = true
}
}
// discoveryChangeLoop is used to process dynamic configuration changes.
// When there is a change detected a shell script or given process is triggered
// which does some operations with the new data. Usually it generates the
// configuration.
// This function has internal loop and won't allow to run the command more
// often than it's the configured amount of time. That prevents
func discoveryChangeLoop() {
// This other loop tics in strict intervals and prevents the callback script to run more often than it's configured
var cmd *exec.Cmd
// Delay first run of the callback script a little so everything can set up
log.Printf("Delaying start of discovery change loop (%d seconds)\n", config.CallbackFirstRunDelay)
time.Sleep(time.Duration(config.CallbackFirstRunDelay) * time.Second)
log.Println("Starting discovery change loop")
for {
if changeDetected {
// We switch this at the beginning so we can detect new changes while the callback script is running
changeDetected = false
log.Println("Running callback function")
// TODO: this is not the best way
callbackCommandSlice := strings.Split(config.Callback, " ")
if len(callbackCommandSlice) == 1 {
cmd = exec.Command(callbackCommandSlice[0])
} else if len(callbackCommandSlice) > 1 {
cmd = exec.Command(callbackCommandSlice[0], callbackCommandSlice[1:]...)
} else {
log.Println("wrong number of parts of the callback command")
time.Sleep(time.Duration(config.CallbackCooldown) * time.Second)
continue
}
stdin, err := cmd.StdinPipe()
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
discoveriesJSON, err := json.Marshal(discoveryStorage.GetAll())
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
_, err = stdin.Write([]byte(discoveriesJSON))
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
err = stdin.Close()
if err != nil {
log.Println("stdin writing error: ", err.Error())
continue
}
stdout, err := cmd.CombinedOutput()
if err != nil {
log.Println("Callback error: ", err.Error())
}
log.Println("Callback output: ", string(stdout))
}
time.Sleep(time.Duration(config.CallbackCooldown) * time.Second)
}
}
// discoveryChange is called when daemon detects that a newly arrived discovery
// packet is somehow different than the localone. This can be used to trigger
// some action in the local machine.
func discoveryChange(discovery server.Discovery) error {
if len(config.Callback) > 0 {
changeDetectedChannel <- true
}
return nil
}

8
go.mod
View File

@ -1,18 +1,22 @@
module github.com/rosti-cz/server_lobby
module github.com/by-cx/lobby
go 1.16
require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/fatih/color v1.12.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-resty/resty/v2 v2.6.0
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.5
github.com/kelseyhightower/envconfig v1.4.0
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0 // indirect
github.com/nats-io/nats-server/v2 v2.4.0 // indirect
github.com/nats-io/nats.go v1.12.0
github.com/onsi/gomega v1.16.0 // indirect
github.com/shirou/gopsutil/v3 v3.21.7
github.com/stretchr/testify v1.7.0
github.com/valyala/fasttemplate v1.2.1 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
google.golang.org/protobuf v1.27.1 // indirect
)

73
go.sum
View File

@ -1,13 +1,23 @@
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-resty/resty/v2 v2.6.0 h1:joIR5PNLM2EFqqESUjCMGXrWmXNHEU9CEiK813oKYS4=
github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@ -21,7 +31,9 @@ github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
@ -30,11 +42,13 @@ github.com/labstack/echo v3.3.10+incompatible h1:pGRcYk231ExFAyoAjAfD85kQzRJCRI8
github.com/labstack/echo v3.3.10+incompatible/go.mod h1:0INS7j/VjnFxD4E2wkz67b8cVwCLbBmJyDaka6Cmk1s=
github.com/labstack/gommon v0.3.0 h1:JEeO0bvc78PKdyHxloTKiF8BD5iGrH8T6MSeGvSgob0=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
@ -50,12 +64,24 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shirou/gopsutil/v3 v3.21.7 h1:PnTqQamUjwEDSgn+nBGu0qSDV/CfvyiR/gwTH3i7HTU=
github.com/shirou/gopsutil/v3 v3.21.7/go.mod h1:RGl11Y7XMTQPmHh8F0ayC6haKNBgH4PXMJuTAcMOlz4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tklauser/go-sysconf v0.3.7 h1:HT7h4+536gjqeq1ZIJPgOl1rg1XFatQGVZWp7Py53eg=
@ -67,34 +93,64 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@ -107,6 +163,13 @@ google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+Rur
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -3,12 +3,16 @@ package nats_driver
import (
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/by-cx/lobby/common"
"github.com/by-cx/lobby/server"
"github.com/nats-io/nats.go"
"github.com/rosti-cz/server_lobby/common"
"github.com/rosti-cz/server_lobby/server"
)
// NATS drivers is used to send discovery packet to other nodes into the group via NATS messenging protocol.
type Driver struct {
NATSUrl string
NATSDiscoveryChannel string
@ -51,32 +55,41 @@ func (d *Driver) Init() error {
return fmt.Errorf("please initiate LogChannel variable")
}
nc, err := nats.Connect(d.NATSUrl)
if err != nil {
return err
for {
nc, err := nats.Connect(d.NATSUrl)
if err != nil {
log.Printf("Can't connect to the NATS server, waiting for 5 seconds before I try it again. (%v)\n", err)
time.Sleep(time.Second * 5)
continue
}
d.nc = nc
break
}
d.nc = nc
_, err = nc.Subscribe(d.NATSDiscoveryChannel, d.handler)
_, err := d.nc.Subscribe(d.NATSDiscoveryChannel, d.handler)
if err != nil {
return err
return fmt.Errorf("subscribe error: %v", err)
}
return nil
}
// Close is called when all is done.
func (d *Driver) Close() error {
return d.nc.Drain()
}
// RegisterSubscribeFunction sets the function that will process the incoming messages
func (d *Driver) RegisterSubscribeFunction(listener common.Listener) {
d.subscribeListener = listener
}
// RegisterUnsubscribeFunction sets the function that will process the goodbye incoming messages
func (d *Driver) RegisterUnsubscribeFunction(listener common.Listener) {
d.unsubscribeListener = listener
}
// SendDiscoveryPacket send discovery packet to the group.
func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
envelope := discoveryEnvelope{
Discovery: discovery,
@ -88,12 +101,20 @@ func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
return fmt.Errorf("sending discovery formating message error: %v", err)
}
err = d.nc.Publish(d.NATSDiscoveryChannel, data)
if err != nil {
// In case the connection is down we will try to reconnect
if err != nil && strings.Contains(err.Error(), "connection closed") {
d.nc.Close()
err = d.Init()
if err != nil {
return fmt.Errorf("sending discovery reconnect error: %v", err)
}
} else if err != nil {
return fmt.Errorf("sending discovery error: %v", err)
}
return nil
}
// SendGoodbyePacket deregister node from the group. It tells everybody that it's going to die.
func (d *Driver) SendGoodbyePacket(discovery server.Discovery) error {
envelope := discoveryEnvelope{
Discovery: discovery,

View File

@ -3,7 +3,7 @@ package nats_driver
import (
"encoding/json"
"github.com/rosti-cz/server_lobby/server"
"github.com/by-cx/lobby/server"
)
// discoveryEnvelope adds a message to the standard discovery format. The message

142
redis_driver/main.go Normal file
View File

@ -0,0 +1,142 @@
package redis_driver
import (
"encoding/json"
"github.com/by-cx/lobby/common"
"github.com/by-cx/lobby/server"
"github.com/go-redis/redis"
"fmt"
)
// Redis drivers is used to send discovery packet to other nodes into the group via Redis's pubsub protocol.
type Driver struct {
Host string
Port uint
Password string
Channel string
DB uint
LogChannel chan string
subscribeListener common.Listener
unsubscribeListener common.Listener
redis *redis.Client
}
// handler is called asynchronously so and because it cannot log directly to stderr there
// is channel called LogChannel that can be used to log error messages from this
func (d *Driver) handler(payload string) {
message := discoveryEnvelope{}
err := json.Unmarshal([]byte(payload), &message)
if err != nil && d.LogChannel != nil {
d.LogChannel <- fmt.Errorf("decoding message error: %v", err).Error()
}
err = message.Discovery.Validate()
if err != nil && d.LogChannel != nil {
d.LogChannel <- fmt.Errorf("validation error: %v", err).Error()
}
if message.Message == "hi" {
d.subscribeListener(message.Discovery)
} else if message.Message == "goodbye" {
d.unsubscribeListener(message.Discovery)
} else {
if d.LogChannel != nil {
d.LogChannel <- "incompatible message"
}
}
}
// Init connect to Redis, subscribes to the channel and waits for the messages.
// It runs goroutine in background that listens for new messages.
func (d *Driver) Init() error {
if d.LogChannel == nil {
return fmt.Errorf("please initiate LogChannel variable")
}
if len(d.Host) == 0 {
return fmt.Errorf("parameter Host cannot be empty")
}
if len(d.Channel) == 0 {
return fmt.Errorf("pattern cannot be empty")
}
if d.Port <= 0 || d.Port > 65536 {
return fmt.Errorf("port has to be in range of 0-65536")
}
d.redis = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", d.Host, d.Port),
Password: d.Password,
DB: int(d.DB),
})
pubsub := d.redis.Subscribe(d.Channel)
go func(pubsub *redis.PubSub, d *Driver) {
channel := pubsub.Channel()
for message := range channel {
d.handler(message.Payload)
}
}(pubsub, d)
return nil
}
// Close is called when all is done.
func (d *Driver) Close() error {
return d.redis.Close()
}
// RegisterSubscribeFunction sets the function that will process the incoming messages
func (d *Driver) RegisterSubscribeFunction(listener common.Listener) {
d.subscribeListener = listener
}
// RegisterUnsubscribeFunction sets the function that will process the goodbye incoming messages
func (d *Driver) RegisterUnsubscribeFunction(listener common.Listener) {
d.unsubscribeListener = listener
}
// SendDiscoveryPacket send discovery packet to the group.
func (d *Driver) SendDiscoveryPacket(discovery server.Discovery) error {
envelope := discoveryEnvelope{
Discovery: discovery,
Message: "hi",
}
data, err := envelope.Bytes()
if err != nil {
return fmt.Errorf("sending discovery formating message error: %v", err)
}
err = d.redis.Publish(d.Channel, data).Err()
if err != nil {
return fmt.Errorf("sending discovery error: %v", err)
}
return nil
}
// SendGoodbyePacket deregister node from the group. It tells everybody that it's going to die.
func (d *Driver) SendGoodbyePacket(discovery server.Discovery) error {
envelope := discoveryEnvelope{
Discovery: discovery,
Message: "goodbye",
}
data, err := envelope.Bytes()
if err != nil {
return fmt.Errorf("sending discovery formating message error: %v", err)
}
err = d.redis.Publish(d.Channel, data).Err()
if err != nil {
return fmt.Errorf("sending discovery error: %v", err)
}
return nil
}

20
redis_driver/types.go Normal file
View File

@ -0,0 +1,20 @@
package redis_driver
import (
"encoding/json"
"github.com/by-cx/lobby/server"
)
// discoveryEnvelope adds a message to the standard discovery format. The message
// can be "hi" or "goodbye" where "hi" is used when the node is sending keep alive
// packets and "goodbye" means the node is leaving.
type discoveryEnvelope struct {
Discovery server.Discovery `json:"discovery"`
Message string `json:"message"` // can be hi or goodbye
}
func (e *discoveryEnvelope) Bytes() ([]byte, error) {
body, err := json.Marshal(e)
return body, err
}

View File

@ -3,6 +3,7 @@ package server
import (
"encoding/json"
"fmt"
"sort"
"strings"
"time"
)
@ -44,9 +45,9 @@ func (d *Discovery) Bytes() ([]byte, error) {
return data, err
}
// FindLabels returns list of labels with given prefix. For example "service:ns" has prefix "service" or "service:".
// FindLabelsByPrefix returns list of labels with given prefix. For example "service:ns" has prefix "service" or "service:".
// It doesn't have to be prefix, but for example "service:test" will match "service:test" and also "service:test2".
func (d *Discovery) FindLabels(prefix string) Labels {
func (d *Discovery) FindLabelsByPrefix(prefix string) Labels {
labels := Labels{}
for _, label := range d.Labels {
if strings.HasPrefix(label.String(), prefix) {
@ -56,6 +57,18 @@ func (d *Discovery) FindLabels(prefix string) Labels {
return labels
}
func (d *Discovery) SortLabels() {
labelStrings := d.Labels.StringSlice()
sort.Strings(labelStrings)
labels := Labels{}
for _, label := range labelStrings {
labels = append(labels, Label(label))
}
d.Labels = labels
}
// -----------------
// Discovery storage
// -----------------
@ -148,25 +161,67 @@ func (d *Discoveries) GetAll() []Discovery {
return d.activeServers
}
// Filter returns list of discoveries based on given labels
func (d *Discoveries) Filter(labelsFilter []string) []Discovery {
newSet := []Discovery{}
var found bool
if len(labelsFilter) > 0 {
for _, discovery := range d.activeServers {
newDiscovery := discovery
newDiscovery.Labels = Labels{}
found = false
for _, label := range discovery.Labels {
for _, labelFilter := range labelsFilter {
if label.String() == labelFilter {
newSet = append(newSet, discovery)
found = true
newDiscovery.Labels = append(newDiscovery.Labels, label)
break
}
}
if found {
break
}
if found {
newSet = append(newSet, newDiscovery)
}
}
}
return newSet
}
// Filter returns list of discoveries based on given label prefixes.
func (d *Discoveries) FilterPrefix(prefixes []string) []Discovery {
newSet := []Discovery{}
var found bool
if len(prefixes) > 0 {
for _, discovery := range d.activeServers {
newDiscovery := discovery
newDiscovery.Labels = Labels{}
found = false
if found {
newSet = append(newSet, newDiscovery)
}
for _, label := range discovery.Labels {
for _, prefix := range prefixes {
if strings.HasPrefix(label.String(), prefix) {
found = true
newDiscovery.Labels = append(newDiscovery.Labels, label)
break
}
}
}
if found {
newSet = append(newSet, newDiscovery)
}
}
}

View File

@ -27,7 +27,7 @@ func TestDiscovery(t *testing.T) {
assert.False(t, discovery.IsAlive(), "discovery not suppose to be alive")
discovery.LastCheck = now
assert.Equal(t, Labels{Label("service:test")}, discovery.FindLabels("service"))
assert.Equal(t, Labels{Label("service:test")}, discovery.FindLabelsByPrefix("service"))
assert.Equal(t, nil, discovery.Validate()) // TODO: This needs more love
content, err := json.Marshal(&discovery)

View File

@ -138,6 +138,7 @@ func (l *LocalHost) GetIdentification() (Discovery, error) {
}
discovery.Labels = append(l.InitialLabels, localLabels...)
discovery.SortLabels()
return discovery, nil
}

View File

@ -32,7 +32,7 @@ func TestGetIdentification(t *testing.T) {
discovery, err = localHost.GetIdentification()
assert.Nil(t, err)
assert.Equal(t, Label("public_ip:1.2.3.4"), discovery.Labels[3])
assert.Equal(t, Label("test:1"), discovery.Labels[3])
os.RemoveAll(tmpPath)
}

12
server/tools.go Normal file
View File

@ -0,0 +1,12 @@
package server
import "github.com/google/go-cmp/cmp"
// Compare compares discovery A and B and returns true if those two are different.
func Compare(discoveryA, discoveryB Discovery) bool {
discoveryA.LastCheck = 0
discoveryB.LastCheck = 0
discoveryA.TTL = 0
discoveryB.TTL = 0
return !cmp.Equal(discoveryA, discoveryB)
}

45
server/tools_test.go Normal file
View File

@ -0,0 +1,45 @@
package server
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCompare(t *testing.T) {
discoveryA := Discovery{
Hostname: "abcd.com",
Labels: Labels{
Label("label1"),
},
LastCheck: 52,
}
discoveryAB := Discovery{
Hostname: "abcd.com",
Labels: Labels{
Label("label1"),
},
LastCheck: 56,
}
discoveryB := Discovery{
Hostname: "efgh.com",
Labels: Labels{
Label("label2"),
},
LastCheck: 56,
}
discoveryC := Discovery{
Hostname: "abcd.com",
Labels: Labels{
Label("label2"),
},
LastCheck: 60,
}
assert.True(t, Compare(discoveryA, discoveryB))
assert.True(t, Compare(discoveryB, discoveryC))
assert.True(t, Compare(discoveryA, discoveryC)) // Test different labels and same hostname
assert.False(t, Compare(discoveryA, discoveryA))
assert.False(t, Compare(discoveryB, discoveryB))
assert.False(t, Compare(discoveryA, discoveryAB)) // Test that last check is zeroed
}

View File

@ -1,5 +1,7 @@
package server
import "strings"
// Label keeps one piece of information about a single server
type Label string
@ -7,6 +9,34 @@ func (l Label) String() string {
return string(l)
}
// GetPart returns specific part of the label, if part index is higher than last available index it returns empty string.
func (l Label) GetPart(idx int) string {
parts := strings.Split(l.String(), ":")
if idx < 0 {
return ""
}
if len(parts) >= idx {
return ""
}
return parts[idx]
}
// GetPart1 is exactly same as GetPart but it splits the label only once, this is good for IPv6 addresses
func (l Label) GetPart1(idx int) string {
parts := strings.SplitN(l.String(), ":", 2)
if idx < 0 {
return ""
}
if len(parts) >= idx {
return ""
}
return parts[idx]
}
// Labels stores multiple Label records
type Labels []Label

38
templater/main.go Normal file
View File

@ -0,0 +1,38 @@
package main
import (
"flag"
"fmt"
"os"
)
// Templater is used to generate config files from predefined
// templates based on content of gathered discovery packets.
// It can for example configure Nginx's backend or database
// replication.
//
// It reads templates from /var/lib/lobby/templates (default)
// which are YAML files cotaining the template itself and command(s)
// that needs to be run when the template changes.
const defaultTemplatesPath = "/var/lib/lobby/templates"
var templatesPath *string
func init() {
templatesPath = flag.String("templates-path", defaultTemplatesPath, "path of where templates are stored")
flag.Parse()
err := os.MkdirAll(*templatesPath, 0750)
if err != nil {
fmt.Println(err)
flag.Usage()
os.Exit(1)
}
}
func main() {
fmt.Println(*templatesPath)
}