package main

/*
Message handling is pretty straightforward. When a request is received it's
usually asynchronous and the client is not waiting for a reply. In this case
the client notes that something is happening and when the work is done the
publish() function is used to notify everybody about it. The client usually
listens on this channel and finishes its part as needed.

In case there is a reply we use NATS's Respond() method to return the data as
soon as possible.
*/

import (
	"encoding/json"
	"fmt"
	"log"
	"strings"

	"github.com/nats-io/nats.go"
	"github.com/pkg/errors"
	"github.com/rosti-cz/node-api/apps"
	"github.com/rosti-cz/node-api/apps/drivers"
	"github.com/rosti-cz/node-api/common"
	"github.com/rosti-cz/node-api/docker"
	"github.com/rosti-cz/node-api/node"
)

// natsEventHandlers maps the type of an incoming request to the function that
// serves it. It is built once instead of on every received message.
var natsEventHandlers = map[string]func(m *nats.Msg, message *RequestMessage) error{
	"list":                    listEventHandler,
	"get":                     getEventHandler,
	"create":                  createEventHandler,
	"register":                registerEventHandler,
	"update":                  updateEventHandler,
	"delete":                  deleteEventHandler,
	"stop":                    stopEventHandler,
	"start":                   startEventHandler,
	"restart":                 restartEventHandler,
	"update_keys":             updateKeysEventHandler,
	"set_password":            setPasswordEventHandler,
	"processes":               processesEventHandler,
	"enable_tech":             enableTechEventHandler,
	"rebuild":                 rebuildEventHandler,
	"add_label":               addLabelEventHandler,
	"remove_label":            removeLabelEventHandler,
	"list_orphans":            listOrphansEventHandler,
	"node":                    getNoteEventHandler,
	"create_snapshot":         createSnapshotEventHandler,
	"restore_from_snapshot":   restoreFromSnapshotEventHandler,
	"list_snapshots":          listSnapshotsEventHandler,
	"list_apps_snapshots":     listAppsSnapshotsEventHandler,
	"list_snapshots_by_label": listSnapshotsByLabelEventHandler,
	"delete_snapshot":         deleteSnapshotEventHandler,
	"delete_app_snapshots":    deleteAppSnapshotsEventHandler,
}

// natsAppsProcessor returns an apps processor bound to the shared DB connection.
func natsAppsProcessor() *apps.AppsProcessor {
	return &apps.AppsProcessor{
		DB: common.GetDBConnection(),
	}
}

// natsSnapshotProcessor returns a snapshot processor configured from the
// global config with the S3 driver as the storage backend.
func natsSnapshotProcessor() *apps.SnapshotProcessor {
	return &apps.SnapshotProcessor{
		AppsPath:        config.AppsPath,
		TmpSnapshotPath: config.SnapshotsPath,
		Driver: drivers.S3Driver{
			S3AccessKey: config.SnapshotsS3AccessKey,
			S3SecretKey: config.SnapshotsS3SecretKey,
			S3Endpoint:  config.SnapshotsS3Endpoint,
			S3SSL:       config.SnapshotsS3SSL,
			Bucket:      config.SnapshotsS3Bucket,
		},
	}
}

// natsPublishFailure logs err prefixed with logPrefix, publishes state as an
// error notification for appName and returns err so that handlers can write
// `return natsPublishFailure(...)`.
func natsPublishFailure(appName string, state string, logPrefix string, err error) error {
	log.Println(logPrefix + err.Error())
	publish(appName, state, true)
	return err
}

// natsFailureState picks the state published to clients for an error returned
// by the apps processor: "validation problem" for apps.ValidationError and
// "backend problem" for everything else.
func natsFailureState(err error) string {
	if _, ok := err.(apps.ValidationError); ok {
		return "validation problem"
	}
	return "backend problem"
}

// natsSplitLabels splits a comma separated payload into labels and drops the
// empty items. strings.Split returns [""] for an empty payload, which would
// otherwise end up as a single empty label.
func natsSplitLabels(payload string) []string {
	labels := []string{}
	for _, label := range strings.Split(payload, ",") {
		if label != "" {
			labels = append(labels, label)
		}
	}
	return labels
}

// messageHandler only passes messages to another function for easier testing.
// Every message is processed in its own goroutine.
func messageHandler(msg *nats.Msg) {
	go _messageHandler(msg)
}

// _messageHandler decodes the incoming message and dispatches it to the
// handler registered for its type. It returns the decoding error or whatever
// the handler returns. An unknown type is only logged and nil is returned.
func _messageHandler(m *nats.Msg) error {
	message := RequestMessage{}

	err := json.Unmarshal(m.Data, &message)
	if err != nil {
		log.Println(errors.Wrap(err, "invalid JSON data in the incoming message"))
		return err
	}

	// The payload is deliberately not printed. For some message types
	// (set_password, update_keys) it carries secrets that don't belong in logs.
	fmt.Printf("Received a message: type=%v app=%v\n", message.Type, message.AppName)

	eventHandler, ok := natsEventHandlers[message.Type]
	if !ok {
		log.Println("ERROR: event handler not defined for " + message.Type)
		return nil
	}

	return eventHandler(m, &message)
}

// listEventHandler replies with the list of apps. States of the apps are
// refreshed via gatherStates before the list is loaded.
func listEventHandler(m *nats.Msg, message *RequestMessage) error {
	log.Println("> List")

	err := gatherStates()
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	processor := natsAppsProcessor()
	processor.Init()

	applications, err := processor.List()
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	reply := ReplyMessage{
		Payload: applications,
	}

	data, err := json.Marshal(reply)
	if err != nil {
		return errorReplyFormater(m, "reply formatter error", err)
	}

	err = m.Respond(data)
	if err != nil {
		log.Println("ERROR: list apps:", err.Error())
	}
	return err
}

// getEventHandler replies with a single app selected by the app name from the
// message. The state of the app is refreshed via updateState first.
func getEventHandler(m *nats.Msg, message *RequestMessage) error {
	err := updateState(message.AppName)
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	processor := natsAppsProcessor()
	app, err := processor.Get(message.AppName)
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	reply := ReplyMessage{
		AppName: app.Name,
		Payload: app,
	}

	data, err := json.Marshal(reply)
	if err != nil {
		return errorReplyFormater(m, "reply formatter error", err)
	}

	err = m.Respond(data)
	if err != nil {
		log.Println("ERROR: get app:", err.Error())
	}
	return err
}

// createEventHandler creates a new app record from the JSON payload, then
// creates and starts its container. The result is announced via publish().
func createEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR create application problem: "

	appTemplate := apps.App{}

	err := json.Unmarshal([]byte(message.Payload), &appTemplate)
	if err != nil {
		return natsPublishFailure(message.AppName, "payload parsing problem", logPrefix, err)
	}

	processor := natsAppsProcessor()
	err = processor.New(message.AppName, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory)
	if err != nil {
		return natsPublishFailure(message.AppName, natsFailureState(err), logPrefix, err)
	}

	// NOTE(review): the container is built from the decoded payload, not from
	// the stored record - presumably the payload carries the app name as well.
	container := docker.Container{
		App: &appTemplate,
	}

	err = container.Create()
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	err = container.Start()
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	publish(message.AppName, "created", false)
	return nil
}

// registerEventHandler registers a new app from the JSON payload without
// creating a container. The result is announced via publish().
func registerEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR register application problem: "

	appTemplate := apps.App{}

	err := json.Unmarshal([]byte(message.Payload), &appTemplate)
	if err != nil {
		return natsPublishFailure(message.AppName, "payload parsing problem", logPrefix, err)
	}

	processor := natsAppsProcessor()
	err = processor.New(message.AppName, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory)
	if err != nil {
		return natsPublishFailure(message.AppName, natsFailureState(err), logPrefix, err)
	}

	publish(message.AppName, "registered", false)
	return nil
}

// updateEventHandler updates an existing app from the JSON payload and
// recreates its container (destroy, create, start). The result is announced
// via publish().
func updateEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR update application problem: "

	appTemplate := apps.App{}

	err := json.Unmarshal([]byte(message.Payload), &appTemplate)
	if err != nil {
		return natsPublishFailure(message.AppName, "payload parsing problem", logPrefix, err)
	}

	processor := natsAppsProcessor()
	app, err := processor.Update(message.AppName, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory)
	if err != nil {
		// Validation errors are reported as "validation problem", the same
		// way the create and register handlers do it.
		return natsPublishFailure(message.AppName, natsFailureState(err), logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	err = container.Destroy()
	if err != nil && err.Error() == "no container found" {
		// We don't care if the container didn't exist anyway
		err = nil
	}
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	err = container.Create()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	err = container.Start()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	publish(app.Name, "updated", false)
	return nil
}

// deleteEventHandler stops and deletes the container of the app (when there
// is one) and then removes the app record. The result is announced via
// publish().
func deleteEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR delete application problem: "

	processor := natsAppsProcessor()
	app, err := processor.Get(message.AppName)
	if err != nil {
		// app can't be relied on when the lookup fails, so stop here instead
		// of carrying on with it.
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	status, err := container.Status()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	if status != "no-container" {
		// We stop the container first
		err = container.Stop()
		if err != nil {
			return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
		}

		// Then delete it
		err = container.Delete()
		if err != nil {
			return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
		}
	}

	err = processor.Delete(app.Name)
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	publish(app.Name, "deleted", false)
	return nil
}

// stopEventHandler stops the container of an existing app. When the app has
// no container there is nothing to stop and "stopped" is published anyway.
func stopEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR stop application problem: "

	processor := natsAppsProcessor()
	app, err := processor.Get(message.AppName)
	if err != nil {
		// app can't be relied on here, use the name from the request
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	status, err := container.Status()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	// Stop the container only when it exists
	if status != "no-container" {
		err = container.Stop()
		if err != nil {
			return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
		}
	}

	publish(app.Name, "stopped", false)
	return nil
}

// startEventHandler starts the container of an existing app and creates it
// first when it doesn't exist yet. The result is announced via publish().
func startEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR start application problem: "

	processor := natsAppsProcessor()
	app, err := processor.Get(message.AppName)
	if err != nil {
		// app can't be relied on here, use the name from the request
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	status, err := container.Status()
	if err != nil {
		// Clients wait for a notification, so this failure has to be
		// published like all the others.
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	if status == "no-container" {
		err = container.Create()
		if err != nil {
			return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
		}
	}

	err = container.Start()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	publish(app.Name, "started", false)
	return nil
}

// restartEventHandler restarts the container of an existing app. The result
// is announced via publish().
func restartEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR restart application problem: "

	processor := natsAppsProcessor()
	app, err := processor.Get(message.AppName)
	if err != nil {
		// app can't be relied on here, use the name from the request
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	err = container.Restart()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	publish(app.Name, "restarted", false)
	return nil
}

// updateKeysEventHandler writes the payload of the request followed by a new
// line into the file at sshPubKeysLocation inside the container. It waits for
// the app via waitForApp first. The result is announced via publish().
func updateKeysEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR keys update problem: "

	err := waitForApp(message.AppName)
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	body := message.Payload

	processor := natsAppsProcessor()
	app, err := processor.Get(message.AppName)
	if err != nil {
		// app can't be relied on here, use the name from the request
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	err = container.SetFileContent(sshPubKeysLocation, body+"\n", "0600")
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	publish(app.Name, "keys updated", false)
	return nil
}

// setPasswordEventHandler sets the password for the app user in the
// container. The password is taken from the payload. It waits for the app via
// waitForApp first. The result is announced via publish().
func setPasswordEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR password update problem: "

	err := waitForApp(message.AppName)
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	password := message.Payload

	processor := natsAppsProcessor()
	app, err := processor.Get(message.AppName)
	if err != nil {
		// app can't be relied on here, use the name from the request
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	err = container.SetPassword(password)
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	publish(app.Name, "password updated", false)
	return nil
}

// processesEventHandler replies with the list of processes of the app's
// container.
//
// NOTE(review): failures are published instead of sent as a reply, so the
// requester presumably runs into a timeout - confirm this is intended.
func processesEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR processes list problem: "

	processor := natsAppsProcessor()
	app, err := processor.Get(message.AppName)
	if err != nil {
		// app can't be relied on here, use the name from the request
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	processes, err := container.GetProcessList()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	reply := ReplyMessage{
		Payload: processes,
	}

	data, err := json.Marshal(reply)
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	err = m.Respond(data)
	if err != nil {
		log.Println(logPrefix + err.Error())
		return err
	}

	return nil
}

// enableTechEventHandler enables one of the supported technologies or
// services (python, node, redis, ...) named in the payload. It waits for the
// app via waitForApp first. The result is announced via publish().
func enableTechEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR enable tech problem: "

	service := message.Payload

	err := waitForApp(message.AppName)
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	processor := natsAppsProcessor()
	app, err := processor.Get(message.AppName)
	if err != nil {
		// app can't be relied on here, use the name from the request
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	err = container.SetTechnology(service)
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	publish(app.Name, "tech updated", false)
	return nil
}

// rebuildEventHandler rebuilds an existing app, it keeps the data but creates
// the container again (destroy, create, start). The result is announced via
// publish().
func rebuildEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR rebuild app problem: "

	processor := natsAppsProcessor()
	app, err := processor.Get(message.AppName)
	if err != nil {
		// app can't be relied on here, use the name from the request
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	err = container.Destroy()
	if err != nil && err.Error() == "no container found" {
		// We don't care if the container didn't exist anyway
		err = nil
	}
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	err = container.Create()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	err = container.Start()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	publish(app.Name, "app rebuild", false)
	return nil
}

// addLabelEventHandler adds the label from the payload to the app. The result
// is announced via publish().
func addLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
	label := message.Payload

	processor := natsAppsProcessor()
	err := processor.AddLabel(message.AppName, label)
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", "ERROR add label problem: ", err)
	}

	publish(message.AppName, "label added", false)
	return nil
}

// removeLabelEventHandler removes the label from the payload from the app.
// The result is announced via publish().
func removeLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
	label := message.Payload

	processor := natsAppsProcessor()
	err := processor.RemoveLabel(message.AppName, label)
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", "ERROR remove label problem: ", err)
	}

	publish(message.AppName, "label removed", false)
	return nil
}

// listOrphansEventHandler is supposed to return directories in /srv that
// don't match any hosted application. It's not implemented yet and always
// replies with an error message saying so.
func listOrphansEventHandler(m *nats.Msg, message *RequestMessage) error {
	reply := ReplyMessage{
		Error:   true,
		Payload: "not implemented yet",
	}

	data, err := json.Marshal(reply)
	if err != nil {
		log.Println("ERROR orphans list problem: " + err.Error())
		return err
	}

	err = m.Respond(data)
	if err != nil {
		log.Println("ERROR orphans list problem: " + err.Error())
		return err
	}

	return nil
}

// getNoteEventHandler replies with info about the node including the
// performance index.
//
// NOTE(review): failures are published instead of sent as a reply, so the
// requester presumably runs into a timeout - confirm this is intended.
func getNoteEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR performance index problem: "

	// Not called "node" so the node package isn't shadowed
	info, err := node.GetNodeInfo()
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	reply := ReplyMessage{
		Payload: info,
	}

	data, err := json.Marshal(reply)
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	err = m.Respond(data)
	if err != nil {
		log.Println(logPrefix + err.Error())
		return err
	}

	return nil
}

// createSnapshotEventHandler creates a snapshot of the given application.
//
// Uses appName from the message struct.
//
// Payload: optional list of labels separated by comma (no spaces)
// Response: notification when it's done or error
func createSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
	processor := natsSnapshotProcessor()

	// An empty payload means no labels rather than a single empty label
	_, err := processor.CreateSnapshot(message.AppName, natsSplitLabels(message.Payload))
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", "ERROR create snapshot error: ", err)
	}

	publish(message.AppName, "snapshot created", false)
	return nil
}

// restoreFromSnapshotEventHandler restores an app from a snapshot. The app
// has to exist before it can be restored. Any snapshot can be used to restore
// any application. Use labels to store information about what app should be
// created.
//
// Uses appName from the message struct as the destination app where the data
// should be restored. The container is stopped before the restore and
// started (or created and started) afterwards.
//
// Payload: string with the snapshot name
// Response: notification when it's done or error
func restoreFromSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR restore snapshot problem: "

	// Setup processors
	snapshotProcessor := natsSnapshotProcessor()
	processor := natsAppsProcessor()

	app, err := processor.Get(message.AppName)
	if err != nil {
		// app can't be relied on here, use the name from the request
		return natsPublishFailure(message.AppName, "backend problem", logPrefix, err)
	}

	container := docker.Container{
		App: app,
	}

	// Stop the container, but only when it exists
	status, err := container.Status()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	if status != "no-container" {
		err = container.Stop()
		if err != nil {
			return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
		}
	}

	// Restore the data
	err = snapshotProcessor.RestoreSnapshot(message.Payload, message.AppName)
	if err != nil {
		return natsPublishFailure(message.AppName, "backend problem", "ERROR restore snapshot error: ", err)
	}

	// Start the container and create it first if it's missing
	status, err = container.Status()
	if err != nil {
		// Clients wait for a notification, so this failure has to be
		// published like all the others.
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	if status == "no-container" {
		err = container.Create()
		if err != nil {
			return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
		}
	}

	err = container.Start()
	if err != nil {
		return natsPublishFailure(app.Name, "backend problem", logPrefix, err)
	}

	// notify clients that it's all done
	publish(message.AppName, "snapshot restored", false)
	return nil
}

// listSnapshotsEventHandler returns list of snapshots related to a single
// application.
//
// Uses appName from the message.
//
// Payload: no payload needed
// Response: replies with list of snapshots or an error message
func listSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
	processor := natsSnapshotProcessor()

	snapshots, err := processor.ListAppSnapshots(message.AppName)
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	var output SnapshotsMetadata
	for _, snapshot := range snapshots {
		output = append(output, SnapshotMetadata{
			Key:      snapshot.KeyName(),
			Metadata: snapshot,
		})
	}

	reply := ReplyMessage{
		Payload: output,
	}

	data, err := json.Marshal(reply)
	if err != nil {
		return errorReplyFormater(m, "reply formatter error", err)
	}

	err = m.Respond(data)
	if err != nil {
		log.Println("ERROR: list of snapshots:", err.Error())
	}
	return err
}

// listAppsSnapshotsEventHandler returns list of snapshots related to a list
// of application names.
//
// Payload: list of appNames separated by comma (no spaces)
// Response: replies with list of snapshots or an error message
func listAppsSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
	processor := natsSnapshotProcessor()

	snapshots, err := processor.ListAppsSnapshots(strings.Split(message.Payload, ","))
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	var output SnapshotsMetadata
	for _, snapshot := range snapshots {
		output = append(output, SnapshotMetadata{
			Key:      snapshot.KeyName(),
			Metadata: snapshot,
		})
	}

	reply := ReplyMessage{
		Payload: output,
	}

	data, err := json.Marshal(reply)
	if err != nil {
		return errorReplyFormater(m, "reply formatter error", err)
	}

	err = m.Respond(data)
	if err != nil {
		log.Println("ERROR: list of snapshots:", err.Error())
	}
	return err
}

// listSnapshotsByLabelEventHandler returns list of snapshots with the given
// label.
//
// Payload: snapshot label
// Response: replies with list of snapshots or an error message
func listSnapshotsByLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
	processor := natsSnapshotProcessor()

	snapshots, err := processor.ListAppsSnapshotsByLabel(message.Payload)
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	var output SnapshotsMetadata
	for _, snapshot := range snapshots {
		output = append(output, SnapshotMetadata{
			Key:      snapshot.KeyName(),
			Metadata: snapshot,
		})
	}

	reply := ReplyMessage{
		Payload: output,
	}

	data, err := json.Marshal(reply)
	if err != nil {
		return errorReplyFormater(m, "reply formatter error", err)
	}

	err = m.Respond(data)
	if err != nil {
		log.Println("ERROR: list of snapshots:", err.Error())
	}
	return err
}

// deleteSnapshotEventHandler deletes a single snapshot. This is not bound to
// an application name.
//
// Payload: string with a snapshot name
// Response: notification when it's done or error
//
// NOTE(review): a failure goes through errorReplyFormater instead of
// publish(), unlike the other notification-style handlers - confirm.
func deleteSnapshotEventHandler(m *nats.Msg, message *RequestMessage) error {
	processor := natsSnapshotProcessor()

	err := processor.DeleteSnapshot(message.Payload)
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	publish(message.AppName, "snapshot deleted", false)
	return nil
}

// deleteAppSnapshotsEventHandler deletes all snapshots related to a single
// application.
//
// Uses appName from the message struct.
//
// Payload: no payload needed
// Response: notification when it's done or error
//
// NOTE(review): a failure goes through errorReplyFormater instead of
// publish(), unlike the other notification-style handlers - confirm.
func deleteAppSnapshotsEventHandler(m *nats.Msg, message *RequestMessage) error {
	processor := natsSnapshotProcessor()

	err := processor.DeleteAppSnapshots(message.AppName)
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	publish(message.AppName, "snapshots deleted", false)
	return nil
}