From 676ddf21367cd0f82fac41b80f20dc032cb03199 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20=C5=A0trauch?= Date: Mon, 7 Feb 2022 17:00:16 +0100 Subject: [PATCH] Fixes *Fix get action *Taking common out of modules * initial glue tests * dynamic docker sock for testing * Stats rewroked to be more encapsulated * Fix of stats gathering in background --- Makefile | 2 +- apps/types.go | 5 +- containers/docker_test.go | 12 ++- glue/main.go | 49 +++++----- glue/main_test.go | 194 ++++++++++++++++++++++++++++++++++++++ glue/stats.go | 43 +++++---- handlers.go | 22 +++-- handlers_nats.go | 8 +- main.go | 38 +++++--- node/load.go | 35 ++++--- node/main.go | 6 -- 11 files changed, 323 insertions(+), 91 deletions(-) create mode 100644 glue/main_test.go diff --git a/Makefile b/Makefile index 0c145ff..f7dbe2b 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ test: go test -v apps/*.go go test -v apps/drivers/*.go go test -v detector/*.go - # go test -v containers/*.go # Doesn't work in Drone right now + # env DOCKER_SOCKET="unix:///var/run/docker.sock" go test -v containers/*.go # Doesn't work in Drone right now build: #podman run --rm --privileged -ti -v ${shell pwd}:/srv docker.io/library/golang:1.14-stretch /bin/sh -c "cd /srv && go build" diff --git a/apps/types.go b/apps/types.go index 6af81f3..f5d7191 100644 --- a/apps/types.go +++ b/apps/types.go @@ -5,8 +5,6 @@ import ( "strings" "time" - // This is line from GORM documentation that imports database dialect - _ "github.com/jinzhu/gorm/dialects/sqlite" "github.com/rosti-cz/node-api/detector" ) @@ -55,6 +53,8 @@ type App struct { // Datetime of deletion DeletedAt *time.Time `sql:"index" json:"deleted_at"` + // #################################################### + // This part is used in app template // Name of the application // Example: test_1234 Name string `json:"name" gorm:"unique,index,not_null"` @@ -74,6 +74,7 @@ type App struct { Memory int `json:"memory"` // Limit in MB // Custom labels Labels []Label `json:"labels" gorm:"foreignkey:AppID"` // username:cx or user_id:1 + // #################################################### // Current status of the application (underlaying container) State string `json:"state"` diff --git a/containers/docker_test.go b/containers/docker_test.go index 38f985c..d7bd044 100644 --- a/containers/docker_test.go +++ b/containers/docker_test.go @@ -1,16 +1,24 @@ package containers import ( + "os" "testing" "time" "github.com/stretchr/testify/assert" ) +func getTestDockerSock() string { + dockerSocket := os.Getenv("DOCKER_SOCKET") + if dockerSocket == "" { + return "unix:///run/user/1000/podman/podman.sock" + } + return dockerSocket +} + func TestGetProcesses(t *testing.T) { driver := Driver{ - //DockerSock: "unix:///run/user/1000/podman/podman.sock", - DockerSock: "unix:///var/run/docker.sock", + DockerSock: getTestDockerSock(), BindIPHTTP: "127.0.0.1", BindIPSSH: "127.0.0.1", } diff --git a/glue/main.go b/glue/main.go index 3973b35..dceed95 100644 --- a/glue/main.go +++ b/glue/main.go @@ -20,6 +20,7 @@ type Processor struct { AppName string DB *gorm.DB SnapshotProcessor *apps.SnapshotProcessor + NodeProcessor *node.Processor WaitForAppLoops uint // each loop is five seconds DockerSock string BindIPHTTP string @@ -31,9 +32,7 @@ type Processor struct { func (p *Processor) getContainer() (containers.Container, error) { container := containers.Container{} - processor := apps.AppsProcessor{ - DB: p.DB, - } + processor := p.getAppProcessor() app, err := processor.Get(p.AppName) if err != nil { return container, err @@ -52,9 +51,11 @@ func (p *Processor) getContainer() (containers.Container, error) { // returns instance of getAppProcessor func (p *Processor) getAppProcessor() apps.AppsProcessor { - return apps.AppsProcessor{ + processor := apps.AppsProcessor{ DB: p.DB, } + processor.Init() + return processor } // waits until app is ready @@ -65,8 +66,12 @@ func (p *Processor) waitForApp() error { loops = int(p.WaitForAppLoops) } + statsProcessor := StatsProcessor{ + DB: p.DB, + } + for i := 0; i < loops; i++ { - err := updateState(p.AppName) + err := statsProcessor.UpdateState(p.AppName) if err != nil { time.Sleep(sleepFor) continue @@ -95,14 +100,16 @@ func (p *Processor) waitForApp() error { func (p *Processor) List() (apps.Apps, error) { appList := apps.Apps{} - err := gatherStates() + statsProcessor := StatsProcessor{ + DB: p.DB, + } + + err := statsProcessor.GatherStates() if err != nil { return appList, fmt.Errorf("backend error: %v", err) } - processor := apps.AppsProcessor{ - DB: p.DB, - } + processor := p.getAppProcessor() appList, err = processor.List() if err != nil { @@ -116,14 +123,16 @@ func (p *Processor) List() (apps.Apps, error) { func (p *Processor) Get() (apps.App, error) { app := apps.App{} - err := updateState(p.AppName) + statsProcessor := StatsProcessor{ + DB: p.DB, + } + + err := statsProcessor.UpdateState(p.AppName) if err != nil { return app, err } - processor := apps.AppsProcessor{ - DB: p.DB, - } + processor := p.getAppProcessor() app, err = processor.Get(p.AppName) if err != nil { return app, err @@ -206,9 +215,7 @@ func (p *Processor) Create(appTemplate apps.App) error { // Register registers app without creating a container for it // Returns app name and an error func (p *Processor) Register(appTemplate apps.App) error { - processor := apps.AppsProcessor{ - DB: p.DB, - } + processor := p.getAppProcessor() err := processor.New(appTemplate.Name, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory) if err != nil { if validationError, ok := err.(apps.ValidationError); ok { @@ -222,9 +229,7 @@ func (p *Processor) Register(appTemplate apps.App) error { // Update updates application func (p *Processor) Update(appTemplate apps.App) error { - processor := apps.AppsProcessor{ - DB: p.DB, - } + processor := p.getAppProcessor() app, err := processor.Update(appTemplate.Name, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory) if err != nil { if validationError, ok := err.(apps.ValidationError); ok { @@ -265,9 +270,7 @@ func (p *Processor) Update(appTemplate apps.App) error { // Delete removes app from the system func (p *Processor) Delete() error { - processor := apps.AppsProcessor{ - DB: p.DB, - } + processor := p.getAppProcessor() container, err := p.getContainer() if err != nil { log.Println("ERROR: delete app:", err.Error()) @@ -486,7 +489,7 @@ func (p *Processor) RemoveLabel(label string) error { // GetNote returns node's info func (p *Processor) GetNode() (*node.Node, error) { - node, err := node.GetNodeInfo() + node, err := p.NodeProcessor.GetNodeInfo() if err != nil { return nil, err } diff --git a/glue/main_test.go b/glue/main_test.go new file mode 100644 index 0000000..1b62344 --- /dev/null +++ b/glue/main_test.go @@ -0,0 +1,194 @@ +package glue + +import ( + "fmt" + "log" + "os" + "path" + "testing" + + "github.com/jinzhu/gorm" + "github.com/rosti-cz/node-api/apps" + "github.com/stretchr/testify/assert" + + // This is line from GORM documentation that imports database dialect + _ "github.com/jinzhu/gorm/dialects/sqlite" +) + +const testDBPath = "file::memory:?cache=shared" + +var testAppTemplate apps.App = apps.App{ + Name: "test_1234", + SSHPort: 10000, + HTTPPort: 10001, + Image: "harbor.hq.rosti.cz/public/runtime:2022.01-1", + CPU: 50, + Memory: 256, +} + +func getPWD() string { + dir, err := os.Getwd() + if err != nil { + log.Fatal(err) + } + return dir +} + +func testDB() *gorm.DB { + db, err := gorm.Open("sqlite3", testDBPath) + if err != nil { + log.Fatalln(err) + } + + return db +} + +func getTestDockerSock() string { + dockerSocket := os.Getenv("DOCKER_SOCKET") + if dockerSocket == "" { + return "unix:///run/user/1000/podman/podman.sock" + } + return dockerSocket +} + +var processor Processor + +func TestMain(m *testing.M) { + // This processor is used to test all methods + processor = Processor{ + AppName: testAppTemplate.Name, + DB: testDB(), + // SnapshotProcessor *apps.SnapshotProcessor + DockerSock: getTestDockerSock(), + BindIPHTTP: "127.0.0.1", + BindIPSSH: "127.0.0.1", + AppsPath: path.Join(getPWD(), "tmp/apps"), + } + + exitVal := m.Run() + + // We don't care about output here because we can't do anything about it + //! If testing fails and this won't be performed you have to remove the test container manually + fmt.Println("Removing test container") + processor.Delete() + + os.Exit(exitVal) +} + +func TestProcessorCreate(t *testing.T) { + err := processor.Create(testAppTemplate) + assert.Nil(t, err) + processor.Delete() + assert.Nil(t, err) +} + +func TestProcessorList(t *testing.T) { + +} + +func TestProcessorGet(t *testing.T) { + err := processor.Create(testAppTemplate) + assert.Nil(t, err) + + app, err := processor.Get() + assert.Nil(t, err) + fmt.Println("State", app.State) + + processor.Delete() + assert.Nil(t, err) +} + +func TestProcessorRegister(t *testing.T) { + +} + +func TestProcessorUpdate(t *testing.T) { + +} + +func TestProcessorDelete(t *testing.T) { + err := processor.Create(testAppTemplate) + assert.Nil(t, err) + processor.Delete() + assert.Nil(t, err) +} + +func TestProcessorStop(t *testing.T) { + +} + +func TestProcessorStart(t *testing.T) { + +} + +func TestProcessorRestart(t *testing.T) { + +} + +func TestProcessorUpdateKeys(t *testing.T) { + +} + +func TestProcessorSetPassword(t *testing.T) { + +} + +func TestProcessorProcesses(t *testing.T) { + +} + +func TestProcessorEnableTech(t *testing.T) { + +} + +func TestProcessorRebuild(t *testing.T) { + +} + +func TestProcessorAddLabel(t *testing.T) { + +} + +func TestProcessorRemoveLabel(t *testing.T) { + +} + +func TestProcessorGetNode(t *testing.T) { + +} + +func TestProcessorCreateSnapshot(t *testing.T) { + +} + +func TestProcessorRestoreFromSnapshot(t *testing.T) { + +} + +func TestProcessorListSnapshots(t *testing.T) { + +} + +func TestProcessorListAppsSnapshots(t *testing.T) { + +} + +func TestProcessorListSnapshotsByLabel(t *testing.T) { + +} + +func TestProcessorGetSnapshot(t *testing.T) { + +} + +func TestProcessorGetSnapshotDownloadLink(t *testing.T) { + +} + +func TestProcessorDeleteSnapshot(t *testing.T) { + +} + +func TestProcessorDeleteAppSnapshots(t *testing.T) { + +} diff --git a/glue/stats.go b/glue/stats.go index 7bd1e16..026f9e1 100644 --- a/glue/stats.go +++ b/glue/stats.go @@ -3,16 +3,29 @@ package glue import ( "log" + "github.com/jinzhu/gorm" "github.com/rosti-cz/node-api/apps" - "github.com/rosti-cz/node-api/common" docker "github.com/rosti-cz/node-api/containers" ) -// updateUsage updates various resource usage of the container/app in the database -func updateUsage(name string) error { +// StatsProcessor covers all methods that are needed +// to gather information about application containers. +type StatsProcessor struct { + DB *gorm.DB +} + +// returns instance of getAppProcessor +func (s *StatsProcessor) getAppProcessor() apps.AppsProcessor { processor := apps.AppsProcessor{ - DB: common.GetDBConnection(), + DB: s.DB, } + processor.Init() + return processor +} + +// updateUsage updates various resource usage of the container/app in the database +func (s *StatsProcessor) UpdateUsage(name string) error { + processor := s.getAppProcessor() app, err := processor.Get(name) if err != nil { @@ -42,10 +55,8 @@ func updateUsage(name string) error { } // Updates only container's state. Check current status of the container and saves it into the database. -func updateState(name string) error { - processor := apps.AppsProcessor{ - DB: common.GetDBConnection(), - } +func (s *StatsProcessor) UpdateState(name string) error { + processor := s.getAppProcessor() app, err := processor.Get(name) if err != nil { return err @@ -67,17 +78,15 @@ func updateState(name string) error { } // gatherStats loops over all applications and calls updateUsage to write various metric into the database. -func gatherStats() error { - processor := apps.AppsProcessor{ - DB: common.GetDBConnection(), - } +func (s *StatsProcessor) GatherStats() error { + processor := s.getAppProcessor() appList, err := processor.List() if err != nil { return err } for _, app := range appList { - err := updateUsage(app.Name) + err := s.UpdateUsage(app.Name) if err != nil { log.Println("STATS ERROR:", err.Error()) } @@ -87,17 +96,15 @@ func gatherStats() error { } // gatherStates loops over all apps and updates their container state -func gatherStates() error { - processor := apps.AppsProcessor{ - DB: common.GetDBConnection(), - } +func (s *StatsProcessor) GatherStates() error { + processor := s.getAppProcessor() appList, err := processor.List() if err != nil { return err } for _, app := range appList { - err := updateState(app.Name) + err := s.UpdateState(app.Name) if err != nil { log.Println("STATE ERROR:", err.Error()) } diff --git a/handlers.go b/handlers.go index c7c488a..5d7abd2 100644 --- a/handlers.go +++ b/handlers.go @@ -364,11 +364,12 @@ func getOrphansHander(c echo.Context) error { // Return info about the node including performance index func getNodeInfoHandler(c echo.Context) error { processor := glue.Processor{ - DB: common.GetDBConnection(), - DockerSock: config.DockerSocket, - BindIPHTTP: config.AppsBindIPHTTP, - BindIPSSH: config.AppsBindIPSSH, - AppsPath: config.AppsPath, + DB: common.GetDBConnection(), + DockerSock: config.DockerSocket, + BindIPHTTP: config.AppsBindIPHTTP, + BindIPSSH: config.AppsBindIPSSH, + AppsPath: config.AppsPath, + NodeProcessor: &nodeProcessor, } node, err := processor.GetNode() if err != nil { @@ -404,11 +405,12 @@ func metricsHandler(c echo.Context) error { // Node indexes processor := glue.Processor{ - DB: common.GetDBConnection(), - DockerSock: config.DockerSocket, - BindIPHTTP: config.AppsBindIPHTTP, - BindIPSSH: config.AppsBindIPSSH, - AppsPath: config.AppsPath, + DB: common.GetDBConnection(), + DockerSock: config.DockerSocket, + BindIPHTTP: config.AppsBindIPHTTP, + BindIPSSH: config.AppsBindIPSSH, + AppsPath: config.AppsPath, + NodeProcessor: &nodeProcessor, } node, err := processor.GetNode() if err != nil { diff --git a/handlers_nats.go b/handlers_nats.go index 6a293b6..263661d 100644 --- a/handlers_nats.go +++ b/handlers_nats.go @@ -126,7 +126,12 @@ func listEventHandler(m *nats.Msg, message *RequestMessage) error { // Returns one app func getEventHandler(m *nats.Msg, message *RequestMessage) error { processor := glue.Processor{ - DB: common.GetDBConnection(), + AppName: message.AppName, + DB: common.GetDBConnection(), + DockerSock: config.DockerSocket, + BindIPHTTP: config.AppsBindIPHTTP, + BindIPSSH: config.AppsBindIPSSH, + AppsPath: config.AppsPath, } app, err := processor.Get() @@ -597,6 +602,7 @@ func getNodeEventHandler(m *nats.Msg, message *RequestMessage) error { BindIPHTTP: config.AppsBindIPHTTP, BindIPSSH: config.AppsBindIPSSH, AppsPath: config.AppsPath, + NodeProcessor: &nodeProcessor, } node, err := processor.GetNode() diff --git a/main.go b/main.go index 606dfc2..d617230 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "github.com/rosti-cz/node-api/apps" "github.com/rosti-cz/node-api/apps/drivers" "github.com/rosti-cz/node-api/common" + "github.com/rosti-cz/node-api/glue" "github.com/rosti-cz/node-api/node" ) @@ -19,6 +20,7 @@ const JSONIndent = " " var config common.Config var nc *nats.Conn var snapshotProcessor apps.SnapshotProcessor +var nodeProcessor node.Processor func _init() { var err error @@ -52,6 +54,10 @@ func _init() { Bucket: config.SnapshotsS3Bucket, }, } + + nodeProcessor = node.Processor{ + DB: common.GetDBConnection(), + } } func main() { @@ -66,24 +72,28 @@ func main() { t := &Template{} // Stats loop - // go func() { - // for { - // log.Println("Stats gathering started") - // start := time.Now() - // err := gatherStats() - // if err != nil { - // log.Println("LOOP ERROR:", err.Error()) - // } - // elapsed := time.Since(start) - // log.Printf("Stats gathering elapsed time: %.2fs\n", elapsed.Seconds()) - // time.Sleep(300 * time.Second) - // } - // }() + go func() { + statsProcessor := glue.StatsProcessor{ + DB: common.GetDBConnection(), + } + + for { + log.Println("Stats gathering started") + start := time.Now() + err := statsProcessor.GatherStats() + if err != nil { + log.Println("LOOP ERROR:", err.Error()) + } + elapsed := time.Since(start) + log.Printf("Stats gathering elapsed time: %.2fs\n", elapsed.Seconds()) + time.Sleep(300 * time.Second) + } + }() // Node stats go func() { for { - err := node.Log() + err := nodeProcessor.Log() if err != nil { log.Println("NODE PERFORMANCE LOG ERROR:", err.Error()) } diff --git a/node/load.go b/node/load.go index e26a512..6bc0091 100644 --- a/node/load.go +++ b/node/load.go @@ -1,23 +1,33 @@ package node import ( - "github.com/rosti-cz/node-api/common" - "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/load" "github.com/shirou/gopsutil/mem" + + "github.com/jinzhu/gorm" ) const history = 72 * 3600 / 300 // 3 days, one record every five minutes -func init() { - db := common.GetDBConnection() - db.AutoMigrate(PerformanceLog{}) +// Processor covers Node related methods for monitoring and calculating performance indexes. +type Processor struct { + DB *gorm.DB +} + +func (p *Processor) Init() { + p.DB.AutoMigrate(PerformanceLog{}) +} + +// GetNodeInfo returns information about this node +func (p *Processor) GetNodeInfo() (*Node, error) { + node, err := p.Index() + return node, err } // Log creates a record for all important metrics used as -func Log() error { +func (p *Processor) Log() error { performanceLog := PerformanceLog{} // Load @@ -45,8 +55,7 @@ func Log() error { performanceLog.DiskSpaceUsage = diskUsage.UsedPercent / 100.0 // Save - db := common.GetDBConnection() - err = db.Create(&performanceLog).Error + err = p.DB.Create(&performanceLog).Error if err != nil { return err } @@ -54,13 +63,13 @@ func Log() error { // and clean // we have to use this stupid approach because DELETE doesn't support ORDER BY and LIMIT toDeleteLogs := []PerformanceLog{} - err = db.Order("id DESC").Limit("99").Offset(history).Find(&toDeleteLogs).Error + err = p.DB.Order("id DESC").Limit("99").Offset(history).Find(&toDeleteLogs).Error if err != nil { return err } for _, toDeleteLog := range toDeleteLogs { - err = db.Delete(&toDeleteLog).Error + err = p.DB.Delete(&toDeleteLog).Error if err != nil { return err } @@ -71,7 +80,7 @@ func Log() error { // Index returns number from 0 to 1 where 0 means least loaded and 1 maximally loaded. // It uses history of last 72 hours -func index() (*Node, error) { +func (p *Processor) Index() (*Node, error) { node := Node{ Index: 1.0, } @@ -82,11 +91,9 @@ func index() (*Node, error) { return &node, err } - db := common.GetDBConnection() - logs := []PerformanceLog{} - err = db.Find(&logs).Error + err = p.DB.Find(&logs).Error if err != nil { return &node, err } diff --git a/node/main.go b/node/main.go index fbd8bb5..2b4023a 100644 --- a/node/main.go +++ b/node/main.go @@ -1,7 +1 @@ package node - -// GetNodeInfo returns information about this node -func GetNodeInfo() (*Node, error) { - node, err := index() - return node, err -}