Fixes
continuous-integration/drone/push Build is passing Details

*Fix get action
*Taking common out of modules
* initial glue tests
* dynamic docker sock for testing
* Stats rewroked to be more encapsulated
* Fix of stats gathering in background
This commit is contained in:
Adam Štrauch 2022-02-07 17:00:16 +01:00
parent 722d2a1c86
commit 676ddf2136
Signed by: cx
GPG Key ID: 018304FFA8988F8D
11 changed files with 323 additions and 91 deletions

View File

@ -3,7 +3,7 @@ test:
go test -v apps/*.go
go test -v apps/drivers/*.go
go test -v detector/*.go
# go test -v containers/*.go # Doesn't work in Drone right now
# env DOCKER_SOCKET="unix:///var/run/docker.sock" go test -v containers/*.go # Doesn't work in Drone right now
build:
#podman run --rm --privileged -ti -v ${shell pwd}:/srv docker.io/library/golang:1.14-stretch /bin/sh -c "cd /srv && go build"

View File

@ -5,8 +5,6 @@ import (
"strings"
"time"
// This is line from GORM documentation that imports database dialect
_ "github.com/jinzhu/gorm/dialects/sqlite"
"github.com/rosti-cz/node-api/detector"
)
@ -55,6 +53,8 @@ type App struct {
// Datetime of deletion
DeletedAt *time.Time `sql:"index" json:"deleted_at"`
// ####################################################
// This part is used in app template
// Name of the application
// Example: test_1234
Name string `json:"name" gorm:"unique,index,not_null"`
@ -74,6 +74,7 @@ type App struct {
Memory int `json:"memory"` // Limit in MB
// Custom labels
Labels []Label `json:"labels" gorm:"foreignkey:AppID"` // username:cx or user_id:1
// ####################################################
// Current status of the application (underlaying container)
State string `json:"state"`

View File

@ -1,16 +1,24 @@
package containers
import (
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func getTestDockerSock() string {
dockerSocket := os.Getenv("DOCKER_SOCKET")
if dockerSocket == "" {
return "unix:///run/user/1000/podman/podman.sock"
}
return dockerSocket
}
func TestGetProcesses(t *testing.T) {
driver := Driver{
//DockerSock: "unix:///run/user/1000/podman/podman.sock",
DockerSock: "unix:///var/run/docker.sock",
DockerSock: getTestDockerSock(),
BindIPHTTP: "127.0.0.1",
BindIPSSH: "127.0.0.1",
}

View File

@ -20,6 +20,7 @@ type Processor struct {
AppName string
DB *gorm.DB
SnapshotProcessor *apps.SnapshotProcessor
NodeProcessor *node.Processor
WaitForAppLoops uint // each loop is five seconds
DockerSock string
BindIPHTTP string
@ -31,9 +32,7 @@ type Processor struct {
func (p *Processor) getContainer() (containers.Container, error) {
container := containers.Container{}
processor := apps.AppsProcessor{
DB: p.DB,
}
processor := p.getAppProcessor()
app, err := processor.Get(p.AppName)
if err != nil {
return container, err
@ -52,9 +51,11 @@ func (p *Processor) getContainer() (containers.Container, error) {
// returns instance of getAppProcessor
func (p *Processor) getAppProcessor() apps.AppsProcessor {
return apps.AppsProcessor{
processor := apps.AppsProcessor{
DB: p.DB,
}
processor.Init()
return processor
}
// waits until app is ready
@ -65,8 +66,12 @@ func (p *Processor) waitForApp() error {
loops = int(p.WaitForAppLoops)
}
statsProcessor := StatsProcessor{
DB: p.DB,
}
for i := 0; i < loops; i++ {
err := updateState(p.AppName)
err := statsProcessor.UpdateState(p.AppName)
if err != nil {
time.Sleep(sleepFor)
continue
@ -95,14 +100,16 @@ func (p *Processor) waitForApp() error {
func (p *Processor) List() (apps.Apps, error) {
appList := apps.Apps{}
err := gatherStates()
statsProcessor := StatsProcessor{
DB: p.DB,
}
err := statsProcessor.GatherStates()
if err != nil {
return appList, fmt.Errorf("backend error: %v", err)
}
processor := apps.AppsProcessor{
DB: p.DB,
}
processor := p.getAppProcessor()
appList, err = processor.List()
if err != nil {
@ -116,14 +123,16 @@ func (p *Processor) List() (apps.Apps, error) {
func (p *Processor) Get() (apps.App, error) {
app := apps.App{}
err := updateState(p.AppName)
statsProcessor := StatsProcessor{
DB: p.DB,
}
err := statsProcessor.UpdateState(p.AppName)
if err != nil {
return app, err
}
processor := apps.AppsProcessor{
DB: p.DB,
}
processor := p.getAppProcessor()
app, err = processor.Get(p.AppName)
if err != nil {
return app, err
@ -206,9 +215,7 @@ func (p *Processor) Create(appTemplate apps.App) error {
// Register registers app without creating a container for it
// Returns app name and an error
func (p *Processor) Register(appTemplate apps.App) error {
processor := apps.AppsProcessor{
DB: p.DB,
}
processor := p.getAppProcessor()
err := processor.New(appTemplate.Name, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory)
if err != nil {
if validationError, ok := err.(apps.ValidationError); ok {
@ -222,9 +229,7 @@ func (p *Processor) Register(appTemplate apps.App) error {
// Update updates application
func (p *Processor) Update(appTemplate apps.App) error {
processor := apps.AppsProcessor{
DB: p.DB,
}
processor := p.getAppProcessor()
app, err := processor.Update(appTemplate.Name, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory)
if err != nil {
if validationError, ok := err.(apps.ValidationError); ok {
@ -265,9 +270,7 @@ func (p *Processor) Update(appTemplate apps.App) error {
// Delete removes app from the system
func (p *Processor) Delete() error {
processor := apps.AppsProcessor{
DB: p.DB,
}
processor := p.getAppProcessor()
container, err := p.getContainer()
if err != nil {
log.Println("ERROR: delete app:", err.Error())
@ -486,7 +489,7 @@ func (p *Processor) RemoveLabel(label string) error {
// GetNote returns node's info
func (p *Processor) GetNode() (*node.Node, error) {
node, err := node.GetNodeInfo()
node, err := p.NodeProcessor.GetNodeInfo()
if err != nil {
return nil, err
}

194
glue/main_test.go Normal file
View File

@ -0,0 +1,194 @@
package glue
import (
"fmt"
"log"
"os"
"path"
"testing"
"github.com/jinzhu/gorm"
"github.com/rosti-cz/node-api/apps"
"github.com/stretchr/testify/assert"
// This is line from GORM documentation that imports database dialect
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
const testDBPath = "file::memory:?cache=shared"
var testAppTemplate apps.App = apps.App{
Name: "test_1234",
SSHPort: 10000,
HTTPPort: 10001,
Image: "harbor.hq.rosti.cz/public/runtime:2022.01-1",
CPU: 50,
Memory: 256,
}
func getPWD() string {
dir, err := os.Getwd()
if err != nil {
log.Fatal(err)
}
return dir
}
func testDB() *gorm.DB {
db, err := gorm.Open("sqlite3", testDBPath)
if err != nil {
log.Fatalln(err)
}
return db
}
func getTestDockerSock() string {
dockerSocket := os.Getenv("DOCKER_SOCKET")
if dockerSocket == "" {
return "unix:///run/user/1000/podman/podman.sock"
}
return dockerSocket
}
var processor Processor
func TestMain(m *testing.M) {
// This processor is used to test all methods
processor = Processor{
AppName: testAppTemplate.Name,
DB: testDB(),
// SnapshotProcessor *apps.SnapshotProcessor
DockerSock: getTestDockerSock(),
BindIPHTTP: "127.0.0.1",
BindIPSSH: "127.0.0.1",
AppsPath: path.Join(getPWD(), "tmp/apps"),
}
exitVal := m.Run()
// We don't care about output here because we can't do anything about it
//! If testing fails and this won't be performed you have to remove the test container manually
fmt.Println("Removing test container")
processor.Delete()
os.Exit(exitVal)
}
func TestProcessorCreate(t *testing.T) {
err := processor.Create(testAppTemplate)
assert.Nil(t, err)
processor.Delete()
assert.Nil(t, err)
}
func TestProcessorList(t *testing.T) {
}
func TestProcessorGet(t *testing.T) {
err := processor.Create(testAppTemplate)
assert.Nil(t, err)
app, err := processor.Get()
assert.Nil(t, err)
fmt.Println("State", app.State)
processor.Delete()
assert.Nil(t, err)
}
func TestProcessorRegister(t *testing.T) {
}
func TestProcessorUpdate(t *testing.T) {
}
func TestProcessorDelete(t *testing.T) {
err := processor.Create(testAppTemplate)
assert.Nil(t, err)
processor.Delete()
assert.Nil(t, err)
}
func TestProcessorStop(t *testing.T) {
}
func TestProcessorStart(t *testing.T) {
}
func TestProcessorRestart(t *testing.T) {
}
func TestProcessorUpdateKeys(t *testing.T) {
}
func TestProcessorSetPassword(t *testing.T) {
}
func TestProcessorProcesses(t *testing.T) {
}
func TestProcessorEnableTech(t *testing.T) {
}
func TestProcessorRebuild(t *testing.T) {
}
func TestProcessorAddLabel(t *testing.T) {
}
func TestProcessorRemoveLabel(t *testing.T) {
}
func TestProcessorGetNode(t *testing.T) {
}
func TestProcessorCreateSnapshot(t *testing.T) {
}
func TestProcessorRestoreFromSnapshot(t *testing.T) {
}
func TestProcessorListSnapshots(t *testing.T) {
}
func TestProcessorListAppsSnapshots(t *testing.T) {
}
func TestProcessorListSnapshotsByLabel(t *testing.T) {
}
func TestProcessorGetSnapshot(t *testing.T) {
}
func TestProcessorGetSnapshotDownloadLink(t *testing.T) {
}
func TestProcessorDeleteSnapshot(t *testing.T) {
}
func TestProcessorDeleteAppSnapshots(t *testing.T) {
}

View File

@ -3,16 +3,29 @@ package glue
import (
"log"
"github.com/jinzhu/gorm"
"github.com/rosti-cz/node-api/apps"
"github.com/rosti-cz/node-api/common"
docker "github.com/rosti-cz/node-api/containers"
)
// updateUsage updates various resource usage of the container/app in the database
func updateUsage(name string) error {
// StatsProcessor covers all methods that are needed
// to gather information about application containers.
type StatsProcessor struct {
DB *gorm.DB
}
// returns instance of getAppProcessor
func (s *StatsProcessor) getAppProcessor() apps.AppsProcessor {
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
DB: s.DB,
}
processor.Init()
return processor
}
// updateUsage updates various resource usage of the container/app in the database
func (s *StatsProcessor) UpdateUsage(name string) error {
processor := s.getAppProcessor()
app, err := processor.Get(name)
if err != nil {
@ -42,10 +55,8 @@ func updateUsage(name string) error {
}
// Updates only container's state. Check current status of the container and saves it into the database.
func updateState(name string) error {
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
func (s *StatsProcessor) UpdateState(name string) error {
processor := s.getAppProcessor()
app, err := processor.Get(name)
if err != nil {
return err
@ -67,17 +78,15 @@ func updateState(name string) error {
}
// gatherStats loops over all applications and calls updateUsage to write various metric into the database.
func gatherStats() error {
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
func (s *StatsProcessor) GatherStats() error {
processor := s.getAppProcessor()
appList, err := processor.List()
if err != nil {
return err
}
for _, app := range appList {
err := updateUsage(app.Name)
err := s.UpdateUsage(app.Name)
if err != nil {
log.Println("STATS ERROR:", err.Error())
}
@ -87,17 +96,15 @@ func gatherStats() error {
}
// gatherStates loops over all apps and updates their container state
func gatherStates() error {
processor := apps.AppsProcessor{
DB: common.GetDBConnection(),
}
func (s *StatsProcessor) GatherStates() error {
processor := s.getAppProcessor()
appList, err := processor.List()
if err != nil {
return err
}
for _, app := range appList {
err := updateState(app.Name)
err := s.UpdateState(app.Name)
if err != nil {
log.Println("STATE ERROR:", err.Error())
}

View File

@ -364,11 +364,12 @@ func getOrphansHander(c echo.Context) error {
// Return info about the node including performance index
func getNodeInfoHandler(c echo.Context) error {
processor := glue.Processor{
DB: common.GetDBConnection(),
DockerSock: config.DockerSocket,
BindIPHTTP: config.AppsBindIPHTTP,
BindIPSSH: config.AppsBindIPSSH,
AppsPath: config.AppsPath,
DB: common.GetDBConnection(),
DockerSock: config.DockerSocket,
BindIPHTTP: config.AppsBindIPHTTP,
BindIPSSH: config.AppsBindIPSSH,
AppsPath: config.AppsPath,
NodeProcessor: &nodeProcessor,
}
node, err := processor.GetNode()
if err != nil {
@ -404,11 +405,12 @@ func metricsHandler(c echo.Context) error {
// Node indexes
processor := glue.Processor{
DB: common.GetDBConnection(),
DockerSock: config.DockerSocket,
BindIPHTTP: config.AppsBindIPHTTP,
BindIPSSH: config.AppsBindIPSSH,
AppsPath: config.AppsPath,
DB: common.GetDBConnection(),
DockerSock: config.DockerSocket,
BindIPHTTP: config.AppsBindIPHTTP,
BindIPSSH: config.AppsBindIPSSH,
AppsPath: config.AppsPath,
NodeProcessor: &nodeProcessor,
}
node, err := processor.GetNode()
if err != nil {

View File

@ -126,7 +126,12 @@ func listEventHandler(m *nats.Msg, message *RequestMessage) error {
// Returns one app
func getEventHandler(m *nats.Msg, message *RequestMessage) error {
processor := glue.Processor{
DB: common.GetDBConnection(),
AppName: message.AppName,
DB: common.GetDBConnection(),
DockerSock: config.DockerSocket,
BindIPHTTP: config.AppsBindIPHTTP,
BindIPSSH: config.AppsBindIPSSH,
AppsPath: config.AppsPath,
}
app, err := processor.Get()
@ -597,6 +602,7 @@ func getNodeEventHandler(m *nats.Msg, message *RequestMessage) error {
BindIPHTTP: config.AppsBindIPHTTP,
BindIPSSH: config.AppsBindIPSSH,
AppsPath: config.AppsPath,
NodeProcessor: &nodeProcessor,
}
node, err := processor.GetNode()

38
main.go
View File

@ -10,6 +10,7 @@ import (
"github.com/rosti-cz/node-api/apps"
"github.com/rosti-cz/node-api/apps/drivers"
"github.com/rosti-cz/node-api/common"
"github.com/rosti-cz/node-api/glue"
"github.com/rosti-cz/node-api/node"
)
@ -19,6 +20,7 @@ const JSONIndent = " "
var config common.Config
var nc *nats.Conn
var snapshotProcessor apps.SnapshotProcessor
var nodeProcessor node.Processor
func _init() {
var err error
@ -52,6 +54,10 @@ func _init() {
Bucket: config.SnapshotsS3Bucket,
},
}
nodeProcessor = node.Processor{
DB: common.GetDBConnection(),
}
}
func main() {
@ -66,24 +72,28 @@ func main() {
t := &Template{}
// Stats loop
// go func() {
// for {
// log.Println("Stats gathering started")
// start := time.Now()
// err := gatherStats()
// if err != nil {
// log.Println("LOOP ERROR:", err.Error())
// }
// elapsed := time.Since(start)
// log.Printf("Stats gathering elapsed time: %.2fs\n", elapsed.Seconds())
// time.Sleep(300 * time.Second)
// }
// }()
go func() {
statsProcessor := glue.StatsProcessor{
DB: common.GetDBConnection(),
}
for {
log.Println("Stats gathering started")
start := time.Now()
err := statsProcessor.GatherStats()
if err != nil {
log.Println("LOOP ERROR:", err.Error())
}
elapsed := time.Since(start)
log.Printf("Stats gathering elapsed time: %.2fs\n", elapsed.Seconds())
time.Sleep(300 * time.Second)
}
}()
// Node stats
go func() {
for {
err := node.Log()
err := nodeProcessor.Log()
if err != nil {
log.Println("NODE PERFORMANCE LOG ERROR:", err.Error())
}

View File

@ -1,23 +1,33 @@
package node
import (
"github.com/rosti-cz/node-api/common"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/load"
"github.com/shirou/gopsutil/mem"
"github.com/jinzhu/gorm"
)
const history = 72 * 3600 / 300 // 3 days, one record every five minutes
func init() {
db := common.GetDBConnection()
db.AutoMigrate(PerformanceLog{})
// Processor covers Node related methods for monitoring and calculating performance indexes.
type Processor struct {
DB *gorm.DB
}
func (p *Processor) Init() {
p.DB.AutoMigrate(PerformanceLog{})
}
// GetNodeInfo returns information about this node
func (p *Processor) GetNodeInfo() (*Node, error) {
node, err := p.Index()
return node, err
}
// Log creates a record for all important metrics used as
func Log() error {
func (p *Processor) Log() error {
performanceLog := PerformanceLog{}
// Load
@ -45,8 +55,7 @@ func Log() error {
performanceLog.DiskSpaceUsage = diskUsage.UsedPercent / 100.0
// Save
db := common.GetDBConnection()
err = db.Create(&performanceLog).Error
err = p.DB.Create(&performanceLog).Error
if err != nil {
return err
}
@ -54,13 +63,13 @@ func Log() error {
// and clean
// we have to use this stupid approach because DELETE doesn't support ORDER BY and LIMIT
toDeleteLogs := []PerformanceLog{}
err = db.Order("id DESC").Limit("99").Offset(history).Find(&toDeleteLogs).Error
err = p.DB.Order("id DESC").Limit("99").Offset(history).Find(&toDeleteLogs).Error
if err != nil {
return err
}
for _, toDeleteLog := range toDeleteLogs {
err = db.Delete(&toDeleteLog).Error
err = p.DB.Delete(&toDeleteLog).Error
if err != nil {
return err
}
@ -71,7 +80,7 @@ func Log() error {
// Index returns number from 0 to 1 where 0 means least loaded and 1 maximally loaded.
// It uses history of last 72 hours
func index() (*Node, error) {
func (p *Processor) Index() (*Node, error) {
node := Node{
Index: 1.0,
}
@ -82,11 +91,9 @@ func index() (*Node, error) {
return &node, err
}
db := common.GetDBConnection()
logs := []PerformanceLog{}
err = db.Find(&logs).Error
err = p.DB.Find(&logs).Error
if err != nil {
return &node, err
}

View File

@ -1,7 +1 @@
package node
// GetNodeInfo returns information about this node
func GetNodeInfo() (*Node, error) {
node, err := index()
return node, err
}