node-api/handlers_nats.go

649 lines
15 KiB
Go

package main
import (
"encoding/json"
"fmt"
"log"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/rosti-cz/node-api/apps"
"github.com/rosti-cz/node-api/docker"
"github.com/rosti-cz/node-api/node"
)
// messageHandler is the NATS callback. It only forwards the message to
// _messageHandler, which keeps the real logic easy to test. The call runs in
// its own goroutine and the returned error is discarded here.
func messageHandler(msg *nats.Msg) {
	go func() {
		_ = _messageHandler(msg)
	}()
}
// _messageHandler decodes the JSON body of a NATS message into a
// RequestMessage and dispatches it to the event handler registered for
// message.Type.
//
// It returns the JSON decoding error when the body is malformed, otherwise
// whatever the selected event handler returns. An unknown message type is
// only logged and nil is returned.
func _messageHandler(m *nats.Msg) error {
	message := RequestMessage{}

	err := json.Unmarshal(m.Data, &message)
	if err != nil {
		log.Println(errors.Wrap(err, "invalid JSON data in the incoming message"))
		return err
	}

	// The payload is deliberately left out of the output: for set_password it
	// is the plain-text password and for update_keys it is the SSH key list.
	fmt.Printf("Received a message: type=%q app=%q\n", message.Type, message.AppName)

	eventHandlerMap := map[string]func(m *nats.Msg, message *RequestMessage) error{
		"list":         listEventHandler,
		"get":          getEventHandler,
		"create":       createEventHandler,
		"update":       updateEventHandler,
		"delete":       deleteEventHandler,
		"stop":         stopEventHandler,
		"start":        startEventHandler,
		"restart":      restartEventHandler,
		"update_keys":  updateKeysEventHandler,
		"set_password": setPasswordEventHandler,
		"processes":    processesEventHandler,
		"enable_tech":  enableTechEventHandler,
		"rebuild":      rebuildEventHandler,
		"add_label":    addLabelEventHandler,
		"remove_label": removeLabelEventHandler,
		"list_orphans": listOrphansEventHandler,
		"node":         getNoteEventHandler,
	}

	eventHandler, ok := eventHandlerMap[message.Type]
	if !ok {
		log.Println("ERROR: event handler not defined for " + message.Type)
		return nil
	}

	return eventHandler(m, &message)
}
// listEventHandler replies to the request with the list of all apps.
//
// gatherStates is called before the apps are loaded. Failures of
// gatherStates, apps.List or the JSON encoding are handed to
// errorReplyFormater; a failure to send the reply is logged and returned.
func listEventHandler(m *nats.Msg, message *RequestMessage) error {
	log.Println("> List")

	if err := gatherStates(); err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	applications, err := apps.List()
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	data, err := json.Marshal(ReplyMessage{Payload: applications})
	if err != nil {
		return errorReplyFormater(m, "reply formatter error", err)
	}

	if err = m.Respond(data); err != nil {
		log.Println("ERROR: list apps:", err.Error())
		return err
	}

	return nil
}
// getEventHandler replies to the request with the app named by
// message.AppName.
//
// updateState is called for the app before it is loaded. Failures of
// updateState, apps.Get or the JSON encoding are handed to
// errorReplyFormater; a failure to send the reply is logged and returned.
func getEventHandler(m *nats.Msg, message *RequestMessage) error {
	if err := updateState(message.AppName); err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	app, err := apps.Get(message.AppName)
	if err != nil {
		return errorReplyFormater(m, "backend error", err)
	}

	data, err := json.Marshal(ReplyMessage{
		AppName: app.Name,
		Payload: app,
	})
	if err != nil {
		return errorReplyFormater(m, "reply formatter error", err)
	}

	if err = m.Respond(data); err != nil {
		log.Println("ERROR: get app:", err.Error())
		return err
	}

	return nil
}
// createEventHandler creates a new app from the JSON-encoded apps.App in the
// message payload: it registers the app with apps.New, then creates and
// starts its container.
//
// The outcome is reported through publish: "created" on success, otherwise
// "payload parsing problem", "validation problem" or "backend problem" with
// the last argument set to true. The underlying error is logged and returned.
func createEventHandler(m *nats.Msg, message *RequestMessage) error {
	appTemplate := apps.App{}

	// stateName is the name state messages are published under. The payload's
	// name is preferred; when it is empty (e.g. the payload failed to parse)
	// the name from the request is used so the message is not published under
	// an empty name.
	stateName := func() string {
		if appTemplate.Name != "" {
			return appTemplate.Name
		}
		return message.AppName
	}

	body := []byte(message.Payload)
	err := json.Unmarshal(body, &appTemplate)
	if err != nil {
		log.Println("ERROR create application problem: " + err.Error())
		publish(stateName(), "payload parsing problem", true)
		return err
	}

	err = apps.New(appTemplate.Name, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory)
	if err != nil {
		// Validation failures get their own state so the caller can tell them
		// apart from backend failures.
		state := "backend problem"
		if _, ok := err.(apps.ValidationError); ok {
			state = "validation problem"
		}
		log.Println("ERROR create application problem: " + err.Error())
		publish(stateName(), state, true)
		return err
	}

	container := docker.Container{
		App: &appTemplate,
	}

	err = container.Create()
	if err != nil {
		log.Println("ERROR create application problem: " + err.Error())
		publish(stateName(), "backend problem", true)
		return err
	}

	err = container.Start()
	if err != nil {
		log.Println("ERROR create application problem: " + err.Error())
		publish(stateName(), "backend problem", true)
		return err
	}

	publish(stateName(), "created", false)
	return nil
}
// updateEventHandler updates the app named by message.AppName with the
// values of the JSON-encoded apps.App in the message payload, then destroys,
// re-creates and starts its container.
//
// The outcome is reported through publish: "updated" on success, otherwise
// "payload parsing problem", "validation problem" or "backend problem" with
// the last argument set to true. The underlying error is logged and returned.
func updateEventHandler(m *nats.Msg, message *RequestMessage) error {
	appTemplate := apps.App{}

	// stateName is the name state messages are published under. The payload's
	// name is preferred; when it is empty (the payload failed to parse or did
	// not carry a name) the name from the request is used so the message is
	// not published under an empty name.
	stateName := func() string {
		if appTemplate.Name != "" {
			return appTemplate.Name
		}
		return message.AppName
	}

	body := []byte(message.Payload)
	err := json.Unmarshal(body, &appTemplate)
	if err != nil {
		log.Println("ERROR update application problem: " + err.Error())
		publish(stateName(), "payload parsing problem", true)
		return err
	}

	app, err := apps.Update(message.AppName, appTemplate.SSHPort, appTemplate.HTTPPort, appTemplate.Image, appTemplate.CPU, appTemplate.Memory)
	if err != nil {
		// Validation failures are reported the same way createEventHandler
		// reports them, so callers can tell them apart from backend failures.
		state := "backend problem"
		if _, ok := err.(apps.ValidationError); ok {
			state = "validation problem"
		}
		log.Println("ERROR update application problem: " + err.Error())
		publish(stateName(), state, true)
		return err
	}

	container := docker.Container{
		App: app,
	}

	err = container.Destroy()
	if err != nil && err.Error() == "no container found" {
		// We don't care if the container didn't exist anyway
		err = nil
	}
	if err != nil {
		log.Println("ERROR update application problem: " + err.Error())
		publish(stateName(), "backend problem", true)
		return err
	}

	err = container.Create()
	if err != nil {
		log.Println("ERROR update application problem: " + err.Error())
		publish(stateName(), "backend problem", true)
		return err
	}

	err = container.Start()
	if err != nil {
		log.Println("ERROR update application problem: " + err.Error())
		publish(stateName(), "backend problem", true)
		return err
	}

	publish(stateName(), "updated", false)
	return nil
}
// deleteEventHandler deletes the app named by message.AppName. When the
// container status is anything other than "no-container" the container is
// stopped and deleted first; then the app is removed with apps.Delete.
//
// The outcome is reported through publish: "deleted" on success, otherwise
// "backend problem" with the last argument set to true. The underlying error
// is logged and returned.
func deleteEventHandler(m *nats.Msg, message *RequestMessage) error {
	app, err := apps.Get(message.AppName)
	if err != nil {
		// app must not be used when the lookup failed, so stop here and
		// report under the name from the request.
		log.Println("ERROR delete application problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	container := docker.Container{
		App: app,
	}

	status, err := container.Status()
	if err != nil {
		log.Println("ERROR delete application problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	if status != "no-container" {
		// We stop the container first
		err = container.Stop()
		if err != nil {
			log.Println("ERROR delete application problem: " + err.Error())
			publish(app.Name, "backend problem", true)
			return err
		}

		// Then delete it
		err = container.Delete()
		if err != nil {
			log.Println("ERROR delete application problem: " + err.Error())
			publish(app.Name, "backend problem", true)
			return err
		}
	}

	err = apps.Delete(app.Name)
	if err != nil {
		log.Println("ERROR delete application problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	publish(app.Name, "deleted", false)
	return nil
}
// stopEventHandler stops the container of the app named by message.AppName.
// The container is stopped only when its status is not "no-container".
//
// The outcome is reported through publish: "stopped" on success, otherwise
// "backend problem" with the last argument set to true. The underlying error
// is logged and returned.
func stopEventHandler(m *nats.Msg, message *RequestMessage) error {
	app, err := apps.Get(message.AppName)
	if err != nil {
		// app must not be dereferenced when the lookup failed; report under
		// the name from the request instead.
		log.Println("ERROR stop application problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	container := docker.Container{
		App: app,
	}

	status, err := container.Status()
	if err != nil {
		log.Println("ERROR stop application problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	// Stop the container only when it exists
	if status != "no-container" {
		err = container.Stop()
		if err != nil {
			log.Println("ERROR stop application problem: " + err.Error())
			publish(app.Name, "backend problem", true)
			return err
		}
	}

	publish(app.Name, "stopped", false)
	return nil
}
// startEventHandler starts the container of the app named by
// message.AppName. When the container status is "no-container" the container
// is created first.
//
// The outcome is reported through publish: "started" on success, otherwise
// "backend problem" with the last argument set to true. The underlying error
// is logged and returned.
func startEventHandler(m *nats.Msg, message *RequestMessage) error {
	app, err := apps.Get(message.AppName)
	if err != nil {
		// app must not be dereferenced when the lookup failed; report under
		// the name from the request instead.
		log.Println("ERROR start application problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	container := docker.Container{
		App: app,
	}

	status, err := container.Status()
	if err != nil {
		// Reported like every other failure in this handler so the caller is
		// not left without a state message.
		log.Println("ERROR start application problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	if status == "no-container" {
		err = container.Create()
		if err != nil {
			log.Println("ERROR start application problem: " + err.Error())
			publish(app.Name, "backend problem", true)
			return err
		}
	}

	err = container.Start()
	if err != nil {
		log.Println("ERROR start application problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	publish(app.Name, "started", false)
	return nil
}
// restartEventHandler restarts the container of the app named by
// message.AppName.
//
// The outcome is reported through publish: "restarted" on success, otherwise
// "backend problem" with the last argument set to true. The underlying error
// is logged and returned.
func restartEventHandler(m *nats.Msg, message *RequestMessage) error {
	app, err := apps.Get(message.AppName)
	if err != nil {
		// app must not be dereferenced when the lookup failed; report under
		// the name from the request instead.
		log.Println("ERROR restart application problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	container := docker.Container{
		App: app,
	}

	err = container.Restart()
	if err != nil {
		log.Println("ERROR restart application problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	publish(app.Name, "restarted", false)
	return nil
}
// updateKeysEventHandler copies the message payload, followed by a newline,
// into the file at sshPubKeysLocation (/srv/.ssh/authorized_keys) inside the
// app's container, passing "0600" as the file mode. waitForApp is called for
// the app before anything else.
//
// The outcome is reported through publish: "keys updated" on success,
// otherwise "backend problem" with the last argument set to true. The
// underlying error is logged and returned.
func updateKeysEventHandler(m *nats.Msg, message *RequestMessage) error {
	err := waitForApp(message.AppName)
	if err != nil {
		log.Println("ERROR keys update problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	body := message.Payload

	app, err := apps.Get(message.AppName)
	if err != nil {
		// app must not be dereferenced when the lookup failed; report under
		// the name from the request instead.
		log.Println("ERROR keys update problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	container := docker.Container{
		App: app,
	}

	err = container.SetFileContent(sshPubKeysLocation, body+"\n", "0600")
	if err != nil {
		log.Println("ERROR keys update problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	publish(app.Name, "keys updated", false)
	return nil
}
// setPasswordEventHandler sets the password for the app user in the
// container; the new password is the message payload. waitForApp is called
// for the app before anything else.
//
// The outcome is reported through publish: "password updated" on success,
// otherwise "backend problem" with the last argument set to true. The
// underlying error is logged and returned.
func setPasswordEventHandler(m *nats.Msg, message *RequestMessage) error {
	err := waitForApp(message.AppName)
	if err != nil {
		log.Println("ERROR password update problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	password := message.Payload

	app, err := apps.Get(message.AppName)
	if err != nil {
		// app must not be dereferenced when the lookup failed; report under
		// the name from the request instead.
		log.Println("ERROR password update problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	container := docker.Container{
		App: app,
	}

	err = container.SetPassword(password)
	if err != nil {
		log.Println("ERROR password update problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	publish(app.Name, "password updated", false)
	return nil
}
// processesEventHandler replies to the request with the process list of the
// container of the app named by message.AppName.
//
// Failures to load the app, read the process list or encode the reply are
// logged, reported through publish as "backend problem" (last argument true)
// and returned. A failure to send the reply is logged and returned.
func processesEventHandler(m *nats.Msg, message *RequestMessage) error {
	app, err := apps.Get(message.AppName)
	if err != nil {
		// app must not be dereferenced when the lookup failed; report under
		// the name from the request instead.
		log.Println("ERROR processes list problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	container := docker.Container{
		App: app,
	}

	processes, err := container.GetProcessList()
	if err != nil {
		log.Println("ERROR processes list problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	reply := ReplyMessage{
		Payload: processes,
	}

	data, err := json.Marshal(reply)
	if err != nil {
		log.Println("ERROR processes list problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	err = m.Respond(data)
	if err != nil {
		log.Println("ERROR processes list problem: " + err.Error())
		return err
	}

	return nil
}
// enableTechEventHandler enables one of the supported technologies or
// services (python, node, redis, ...) in the app's container. The technology
// is taken from the message payload. waitForApp is called for the app before
// it is loaded.
//
// The outcome is reported through publish: "tech updated" on success,
// otherwise "backend problem" with the last argument set to true. The
// underlying error is logged and returned.
func enableTechEventHandler(m *nats.Msg, message *RequestMessage) error {
	service := message.Payload

	err := waitForApp(message.AppName)
	if err != nil {
		log.Println("ERROR enable tech problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	app, err := apps.Get(message.AppName)
	if err != nil {
		// app must not be dereferenced when the lookup failed; report under
		// the name from the request instead.
		log.Println("ERROR enable tech problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	container := docker.Container{
		App: app,
	}

	err = container.SetTechnology(service)
	if err != nil {
		log.Println("ERROR enable tech problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	publish(app.Name, "tech updated", false)
	return nil
}
// rebuildEventHandler rebuilds an existing app: it keeps the data but
// destroys the container, creates it again and starts it. A
// "no container found" error from Destroy is ignored.
//
// The outcome is reported through publish: "app rebuild" on success,
// otherwise "backend problem" with the last argument set to true. The
// underlying error is logged and returned.
func rebuildEventHandler(m *nats.Msg, message *RequestMessage) error {
	app, err := apps.Get(message.AppName)
	if err != nil {
		// app must not be dereferenced when the lookup failed; report under
		// the name from the request instead.
		log.Println("ERROR rebuild app problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	container := docker.Container{
		App: app,
	}

	err = container.Destroy()
	if err != nil && err.Error() == "no container found" {
		// We don't care if the container didn't exist anyway
		err = nil
	}
	if err != nil {
		log.Println("ERROR rebuild app problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	err = container.Create()
	if err != nil {
		log.Println("ERROR rebuild app problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	err = container.Start()
	if err != nil {
		log.Println("ERROR rebuild app problem: " + err.Error())
		publish(app.Name, "backend problem", true)
		return err
	}

	publish(app.Name, "app rebuild", false)
	return nil
}
// addLabelEventHandler adds the label carried in the message payload to the
// app named by message.AppName.
//
// It publishes "label added" on success. On failure the error is logged,
// "backend problem" is published with the last argument set to true, and the
// error is returned.
func addLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
	if err := apps.AddLabel(message.AppName, message.Payload); err != nil {
		log.Println("ERROR add label problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	publish(message.AppName, "label added", false)
	return nil
}
// removeLabelEventHandler removes the label carried in the message payload
// from the app named by message.AppName.
//
// It publishes "label removed" on success. On failure the error is logged,
// "backend problem" is published with the last argument set to true, and the
// error is returned.
func removeLabelEventHandler(m *nats.Msg, message *RequestMessage) error {
	if err := apps.RemoveLabel(message.AppName, message.Payload); err != nil {
		log.Println("ERROR remove label problem: " + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	publish(message.AppName, "label removed", false)
	return nil
}
// listOrphansEventHandler is meant to return directories in /srv that don't
// match any hosted application. It is not implemented yet: the reply always
// has Error set and the payload "not implemented yet".
//
// Encoding and send failures are logged and returned.
func listOrphansEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR orphans list problem: "

	data, err := json.Marshal(ReplyMessage{
		Error:   true,
		Payload: "not implemented yet",
	})
	if err != nil {
		log.Println(logPrefix + err.Error())
		return err
	}

	if err = m.Respond(data); err != nil {
		log.Println(logPrefix + err.Error())
		return err
	}

	return nil
}
// getNoteEventHandler replies to the request with the info about the node,
// including the performance index, as returned by node.GetNodeInfo.
//
// Failures to gather the info or encode the reply are logged, published as
// "backend problem" under message.AppName (last argument true) and returned.
// A failure to send the reply is logged and returned.
func getNoteEventHandler(m *nats.Msg, message *RequestMessage) error {
	const logPrefix = "ERROR performance index problem: "

	nodeInfo, err := node.GetNodeInfo()
	if err != nil {
		log.Println(logPrefix + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	data, err := json.Marshal(ReplyMessage{Payload: nodeInfo})
	if err != nil {
		log.Println(logPrefix + err.Error())
		publish(message.AppName, "backend problem", true)
		return err
	}

	if err = m.Respond(data); err != nil {
		log.Println(logPrefix + err.Error())
		return err
	}

	return nil
}