node-api/glue/main.go

950 lines
20 KiB
Go

package glue
import (
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path"
"strings"
"time"
"github.com/jinzhu/gorm"
"github.com/rosti-cz/node-api/apps"
"github.com/rosti-cz/node-api/containers"
docker "github.com/rosti-cz/node-api/containers"
"github.com/rosti-cz/node-api/detector"
"github.com/rosti-cz/node-api/node"
)
// ENABLE_TECH_WAIT is the number of seconds EnableTech sleeps after the app
// has been reported as running and before the technology is set up in the
// container, i.e. it waits for the container a little bit longer.
const ENABLE_TECH_WAIT = 10
// Processor separates logic of apps, containers, detector and node from handlers.
// It defines common interface for both types of handlers, HTTP and the events.
//
// Most methods operate on the single app identified by AppName.
type Processor struct {
// AppName is the name of the app the methods look up and operate on.
AppName string
// DB is the database handle handed over to the apps and stats processors.
DB *gorm.DB
// SnapshotProcessor backs all snapshot related methods.
SnapshotProcessor *apps.SnapshotProcessor
// NodeProcessor backs GetNode.
NodeProcessor *node.Processor
WaitForAppLoops uint // number of polling loops in waitForApp, each loop is five seconds; 0 means the default of 6 loops
// The fields below are passed unchanged into every containers.Container
// (and StatsProcessor) this processor builds.
// NOTE(review): presumably DockerSock is the Docker socket address, BindIPHTTP/BindIPSSH
// the host IPs the ports are bound to and AppsPath the root of app volumes - confirm in the containers package.
DockerSock string
BindIPHTTP string
BindIPSSH string
AppsPath string
}
// getContainer loads the app named p.AppName from the database and returns
// a Container instance configured for it. When the app cannot be loaded an
// empty Container is returned along with the error.
func (p *Processor) getContainer() (containers.Container, error) {
	appProcessor := p.getAppProcessor()

	found, err := appProcessor.Get(p.AppName)
	if err != nil {
		return containers.Container{}, err
	}

	return docker.Container{
		App:        &found,
		DockerSock: p.DockerSock,
		BindIPHTTP: p.BindIPHTTP,
		BindIPSSH:  p.BindIPSSH,
		AppsPath:   p.AppsPath,
	}, nil
}
// getAppProcessor returns an initialized apps.AppsProcessor bound to the
// processor's database handle.
func (p *Processor) getAppProcessor() apps.AppsProcessor {
	appProcessor := apps.AppsProcessor{DB: p.DB}
	appProcessor.Init()

	return appProcessor
}
// waitForApp polls the app until its container reports the "running" status.
//
// It tries p.WaitForAppLoops times (6 when the field is zero) and sleeps five
// seconds after every unsuccessful attempt. A failed state update only costs
// one attempt, while a failure to load the container or to read its status
// aborts the wait immediately. When all attempts are used up the
// "timeout reached" error is returned.
func (p *Processor) waitForApp() error {
	const pollInterval = 5 * time.Second

	attempts := 6
	if p.WaitForAppLoops != 0 {
		attempts = int(p.WaitForAppLoops)
	}

	stats := StatsProcessor{
		DB:         p.DB,
		DockerSock: p.DockerSock,
		BindIPHTTP: p.BindIPHTTP,
		BindIPSSH:  p.BindIPSSH,
		AppsPath:   p.AppsPath,
	}

	for attempt := 0; attempt < attempts; attempt++ {
		// The container is inspected only when the state update went through
		if updateErr := stats.UpdateState(p.AppName); updateErr == nil {
			container, err := p.getContainer()
			if err != nil {
				return err
			}

			status, err := container.Status()
			if err != nil {
				return err
			}

			if status.Status == "running" {
				return nil
			}
		}

		time.Sleep(pollInterval)
	}

	return errors.New("timeout reached")
}
// List returns the list of all apps.
// When noUpdate is true the gathering of container states is skipped to
// speed things up. Errors are reported with the "backend error" prefix.
func (p *Processor) List(noUpdate bool) (apps.Apps, error) {
	if !noUpdate {
		stats := StatsProcessor{
			DB:         p.DB,
			DockerSock: p.DockerSock,
			BindIPHTTP: p.BindIPHTTP,
			BindIPSSH:  p.BindIPSSH,
			AppsPath:   p.AppsPath,
		}

		if err := stats.GatherStates(); err != nil {
			return apps.Apps{}, fmt.Errorf("backend error: %v", err)
		}
	}

	appProcessor := p.getAppProcessor()

	list, err := appProcessor.List()
	if err != nil {
		return list, fmt.Errorf("backend error: %v", err)
	}

	return list, nil
}
// Get returns the app named p.AppName.
//
// When noUpdate is true only the stored record is returned. Otherwise the
// app's state is refreshed first and, if the container is running, the
// record is enriched with its technologies, primary technology and the
// flags the detector derives from the container's process list.
func (p *Processor) Get(noUpdate bool) (apps.App, error) {
	if !noUpdate {
		stats := StatsProcessor{
			DB:         p.DB,
			DockerSock: p.DockerSock,
			BindIPHTTP: p.BindIPHTTP,
			BindIPSSH:  p.BindIPSSH,
			AppsPath:   p.AppsPath,
		}

		if err := stats.UpdateState(p.AppName); err != nil {
			return apps.App{}, err
		}
	}

	appProcessor := p.getAppProcessor()

	app, err := appProcessor.Get(p.AppName)
	if err != nil {
		return app, err
	}

	if noUpdate {
		return app, nil
	}

	// Gather runtime info about the container
	container := docker.Container{
		App:        &app,
		DockerSock: p.DockerSock,
		BindIPHTTP: p.BindIPHTTP,
		BindIPSSH:  p.BindIPSSH,
		AppsPath:   p.AppsPath,
	}

	status, err := container.Status()
	if err != nil {
		return app, err
	}

	// Runtime details can be read only from a running container
	if status.Status != "running" {
		return app, nil
	}

	var infoErr error

	app.Techs, infoErr = container.GetTechs()
	if infoErr != nil {
		return app, infoErr
	}

	app.PrimaryTech, infoErr = container.GetPrimaryTech()
	if infoErr != nil {
		return app, infoErr
	}

	processList, infoErr := container.GetSystemProcesses()
	if infoErr != nil {
		return app, infoErr
	}

	flags, infoErr := detector.Check(processList)
	if infoErr != nil {
		return app, infoErr
	}
	app.Flags = flags.String()

	return app, nil
}
// volumeFromURL takes URL of a tar archive and prepares container's volume
// from it.
//
// The URL has to end with ".tar.zst". The archive is downloaded into the
// volume directory (which is created if needed), extracted there with the
// system tar command and removed afterwards.
//
// An error is returned when the URL has a wrong suffix, the server responds
// with a non-2xx status, or any of the download, extract and cleanup steps
// fails. A partially downloaded archive is removed on a failed download.
func (p *Processor) volumeFromURL(url string, container *docker.Container) error {
	// Validation, check if url ends with tar.zst
	if !strings.HasSuffix(url, ".tar.zst") {
		return fmt.Errorf("archive has to end with .tar.zst")
	}

	volumePath := container.VolumeHostPath()

	// Prepare volume path
	err := os.MkdirAll(volumePath, 0755)
	if err != nil {
		return fmt.Errorf("failed to create volume path: %v", err)
	}

	// Download the archive
	archivePath := path.Join(volumePath, "archive.tar.zst")

	log.Printf("%s: downloading archive from %s\n", container.App.Name, url)

	resp, err := http.Get(url)
	if err != nil {
		return fmt.Errorf("failed to download archive: %v", err)
	}
	defer resp.Body.Close()

	// http.Get doesn't report non-2xx responses as errors. Without this check
	// an error page would be stored and handed over to tar.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("failed to download archive: unexpected HTTP status %s", resp.Status)
	}

	f, err := os.Create(archivePath)
	if err != nil {
		return fmt.Errorf("failed to create archive file: %v", err)
	}

	n, copyErr := io.Copy(f, resp.Body)
	// The file is closed before tar touches it so a failed flush is noticed
	closeErr := f.Close()
	if copyErr != nil || closeErr != nil {
		// Best effort cleanup, the download error is the one worth reporting
		_ = os.Remove(archivePath)
		if copyErr != nil {
			return fmt.Errorf("failed to download archive: %v", copyErr)
		}
		return fmt.Errorf("failed to download archive: %v", closeErr)
	}
	log.Printf("downloaded %d bytes\n", n)

	// Extract the archive
	log.Printf("%s: extracting archive\n", container.App.Name)

	// Call tar xf archive.tar.zst -C /volume
	cmd := exec.Command("tar", "-xf", archivePath, "-C", volumePath)
	err = cmd.Run()
	if err != nil {
		return fmt.Errorf("failed to extract archive: %v", err)
	}

	// Remove archive
	log.Printf("%s: removing archive\n", container.App.Name)
	err = os.Remove(archivePath)
	if err != nil {
		return fmt.Errorf("failed to remove archive: %v", err)
	}

	log.Printf("%s: volume preparing done\n", container.App.Name)

	return nil
}
// Create creates a single app in the system.
//
// The app is registered, its container is created (optionally with a volume
// prepared from Setup.ArchiveURL or data restored from Snapshot) and started.
// Once the container is running the optional setup steps are applied: SSH
// keys, technology, password and app port. These steps are skipped when the
// app is created from a snapshot.
//
// Snapshot and Setup.ArchiveURL are mutually exclusive; such a request is
// rejected before anything is registered so no orphan app record is left
// behind.
//
// NOTE(review): the wait and setup helpers operate on p.AppName while the
// container is built from appTemplate - callers are presumably expected to
// set p.AppName to appTemplate.Name; confirm.
func (p *Processor) Create(appTemplate apps.App) error {
	if appTemplate.EnvRaw == nil {
		appTemplate.EnvRaw = make(map[string]interface{})
	}

	// Validate the request before any side effect takes place
	if len(appTemplate.Snapshot) > 0 && len(appTemplate.Setup.ArchiveURL) > 0 {
		return fmt.Errorf("snapshot and archive_url cannot be used together")
	}

	err := p.Register(appTemplate)
	if err != nil {
		return err
	}

	container := docker.Container{
		App:        &appTemplate,
		DockerSock: p.DockerSock,
		BindIPHTTP: p.BindIPHTTP,
		BindIPSSH:  p.BindIPSSH,
		AppsPath:   p.AppsPath,
	}

	// Prepare the volume from an archive if it's noted in the request
	if len(appTemplate.Setup.ArchiveURL) > 0 {
		err = p.volumeFromURL(appTemplate.Setup.ArchiveURL, &container)
		if err != nil {
			return fmt.Errorf("failed to prepare volume: %v", err)
		}
	}

	err = container.Create()
	if err != nil {
		return err
	}

	// Restore from snapshot if it's noted in the request
	if len(appTemplate.Snapshot) > 0 {
		log.Printf("App %s is going to be created from %s snapshot\n", appTemplate.Name, appTemplate.Snapshot)

		// Restore the data
		err = p.SnapshotProcessor.RestoreSnapshot(appTemplate.Snapshot, appTemplate.Name)
		if err != nil {
			return fmt.Errorf("failed to restore snapshot: %v", err)
		}
	}

	err = container.Start()
	if err != nil {
		return fmt.Errorf("failed to start container: %v", err)
	}

	// Wait for the app to be created
	err = p.waitForApp()
	if err != nil {
		return fmt.Errorf("failed to wait for app: %v", err)
	}
	time.Sleep(5 * time.Second) // We wait for a little bit longer to make sure the container is fully started

	// Setup SSH keys if it's noted in the request
	log.Println("Checking if SSH key is required")
	if len(appTemplate.Setup.SSHKeys) > 0 && len(appTemplate.Snapshot) == 0 {
		log.Println("Setting up SSH keys")
		err = p.UpdateKeys(appTemplate.Setup.SSHKeys)
		if err != nil {
			return fmt.Errorf("failed to update keys: %v", err)
		}
	}

	// Setup technology if it's noted in the request
	if len(appTemplate.Setup.Tech) > 0 && len(appTemplate.Snapshot) == 0 {
		err = p.EnableTech(appTemplate.Setup.Tech, appTemplate.Setup.TechVersion)
		if err != nil {
			return fmt.Errorf("failed to enable tech: %v", err)
		}
	}

	// Set password if it's noted in the request
	if len(appTemplate.Setup.Password) > 0 && len(appTemplate.Snapshot) == 0 {
		err = p.SetPassword(appTemplate.Setup.Password)
		if err != nil {
			return fmt.Errorf("failed to set password: %v", err)
		}
	}

	// Changes port of the app hosted inside the container
	if appTemplate.AppPort != 0 && len(appTemplate.Snapshot) == 0 {
		err = container.SetAppPort(appTemplate.AppPort)
		if err != nil {
			return fmt.Errorf("failed to change app port to %d: %v", appTemplate.AppPort, err)
		}

		// nginx is restarted so it picks up the new port
		err = container.RestartProcess("nginx")
		if err != nil {
			return fmt.Errorf("failed to restart nginx: %v", err)
		}
	}

	return nil
}
// Register registers the app without creating a container for it.
// Validation failures are reported with the "validation error" prefix, any
// other error is returned as it is.
func (p *Processor) Register(appTemplate apps.App) error {
	appProcessor := p.getAppProcessor()

	err := appProcessor.New(
		appTemplate.Name,
		appTemplate.SSHPort,
		appTemplate.HTTPPort,
		appTemplate.Image,
		appTemplate.CPU,
		appTemplate.Memory,
	)
	if err == nil {
		return nil
	}

	if validationError, ok := err.(apps.ValidationError); ok {
		return fmt.Errorf("validation error: %v", validationError.Error())
	}

	return err
}
// Update updates the application record and recreates its container so the
// new settings take effect.
//
// The existing container is destroyed (a missing container is not an error),
// created again and started. When Setup.Tech is set, the technology is
// enabled once the app is running and the container is restarted afterwards.
func (p *Processor) Update(appTemplate apps.App) error {
	if appTemplate.EnvRaw == nil {
		appTemplate.EnvRaw = make(map[string]interface{})
	}

	appProcessor := p.getAppProcessor()

	updated, err := appProcessor.Update(
		appTemplate.Name,
		appTemplate.SSHPort,
		appTemplate.HTTPPort,
		appTemplate.Image,
		appTemplate.CPU,
		appTemplate.Memory,
		appTemplate.GetEnv(),
	)
	if err != nil {
		validationError, isValidation := err.(apps.ValidationError)
		if isValidation {
			return fmt.Errorf("validation error: %v", validationError.Error())
		}
		return err
	}

	container := docker.Container{
		App:        updated,
		DockerSock: p.DockerSock,
		BindIPHTTP: p.BindIPHTTP,
		BindIPSSH:  p.BindIPSSH,
		AppsPath:   p.AppsPath,
	}

	// We don't care if the container didn't exist anyway
	err = container.Destroy()
	if err != nil && err.Error() != "no container found" {
		return err
	}

	if err = container.Create(); err != nil {
		return err
	}

	if err = container.Start(); err != nil {
		return err
	}

	// The rest is needed only when a technology is noted in the request
	if len(appTemplate.Setup.Tech) == 0 {
		return nil
	}

	if waitErr := p.waitForApp(); waitErr != nil {
		return waitErr
	}

	if techErr := p.EnableTech(appTemplate.Setup.Tech, appTemplate.Setup.TechVersion); techErr != nil {
		return fmt.Errorf("failed to enable tech: %v", techErr)
	}

	// We restart the container so everything can use the new tech
	return container.Restart()
}
// Delete removes the app from the system. An existing container is stopped
// and deleted first, then the app record itself is removed.
func (p *Processor) Delete() error {
	appProcessor := p.getAppProcessor()

	container, err := p.getContainer()
	if err != nil {
		log.Println("ERROR: delete app:", err.Error())
		return err
	}

	status, err := container.Status()
	if err != nil {
		return err
	}

	containerExists := status.Status != "no-container"
	if containerExists {
		// We stop the container first
		if err = container.Stop(); err != nil {
			return err
		}

		// Then delete it
		if err = container.Delete(); err != nil {
			return err
		}
	}

	return appProcessor.Delete(p.AppName)
}
// Stop stops the app. When the container exists it is stopped and then
// destroyed; a missing container is not an error.
func (p *Processor) Stop() error {
	container, err := p.getContainer()
	if err != nil {
		return err
	}

	status, err := container.Status()
	if err != nil {
		return err
	}

	// Nothing to stop when there is no container
	if status.Status == "no-container" {
		return nil
	}

	if err = container.Stop(); err != nil {
		return err
	}

	return container.Destroy()
}
// Start starts the app. The container is created first when it doesn't
// exist yet.
func (p *Processor) Start() error {
	container, err := p.getContainer()
	if err != nil {
		return err
	}

	status, err := container.Status()
	if err != nil {
		return err
	}

	containerMissing := status.Status == "no-container"
	if containerMissing {
		if err = container.Create(); err != nil {
			return err
		}
	}

	return container.Start()
}
// Restart restarts the app's container.
func (p *Processor) Restart() error {
	container, err := p.getContainer()
	if err != nil {
		return err
	}

	return container.Restart()
}
// UpdateKeys uploads SSH keys into the container once the app is running.
// The keys parameter is a plain string with one key per line; it is appended
// (followed by a newline) to the file at sshPubKeysLocation.
func (p *Processor) UpdateKeys(keys string) error {
	if err := p.waitForApp(); err != nil {
		return err
	}

	container, err := p.getContainer()
	if err != nil {
		return err
	}

	log.Println("Storing keys into " + sshPubKeysLocation)

	return container.AppendFile(sshPubKeysLocation, keys+"\n", "0600")
}
// SetPassword sets up a new password to access the SSH user. It waits for
// the app to be running before the password is changed.
func (p *Processor) SetPassword(password string) error {
	if err := p.waitForApp(); err != nil {
		return err
	}

	container, err := p.getContainer()
	if err != nil {
		return err
	}

	return container.SetPassword(password)
}
// GenerateDeploySSHKeys generates the SSH key pair used for deployment and
// adds the public key into the file at sshPubKeysLocation.
// It returns the private key, the public key and an error. The keys are
// returned every time, even if they were generated before; the public key
// is appended only when the pair was newly created. Empty strings and no
// error are returned when the container is not running.
func (p *Processor) GenerateDeploySSHKeys() (string, string, error) {
	container, err := p.getContainer()
	if err != nil {
		return "", "", err
	}

	status, err := container.Status()
	if err != nil {
		return "", "", err
	}

	// If the container is not running there is nothing to generate
	if status.Status != "running" {
		return "", "", nil
	}

	newlyCreated, err := container.GenerateDeploySSHKeys()
	if err != nil {
		return "", "", err
	}

	privateKey, pubKey, err := container.GetDeploySSHKeys()
	if err != nil {
		return "", "", err
	}

	if !newlyCreated {
		return privateKey, pubKey, nil
	}

	if err = container.AppendFile(sshPubKeysLocation, pubKey+"\n", "0600"); err != nil {
		return "", "", err
	}

	return privateKey, pubKey, nil
}
// GetHostKey returns the SSH host key without hostname (first part of the
// line). An empty string and no error are returned when the container is not
// running.
func (p *Processor) GetHostKey() (string, error) {
	container, err := p.getContainer()
	if err != nil {
		return "", err
	}

	status, err := container.Status()
	if err != nil {
		return "", err
	}

	// The key can be read only from a running container
	if running := status.Status == "running"; !running {
		return "", nil
	}

	key, err := container.GetHostKey()
	if err != nil {
		return "", err
	}

	return key, nil
}
// SaveMetadata saves meta data about the app into the .metadata.json file
// located in the root of the app's volume.
//
// The file is created with 0600 permissions right away (so the content is
// never exposed with wider default permissions), truncated if it exists,
// and handed over to ownerUID/ownerGID. Errors of the write as well as of
// closing the file are reported.
func (p *Processor) SaveMetadata(metadata string) error {
	container, err := p.getContainer()
	if err != nil {
		return err
	}

	metadataPath := path.Join(container.VolumeHostPath(), ".metadata.json")

	f, err := os.OpenFile(metadataPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return err
	}

	_, writeErr := f.WriteString(metadata)
	// Close explicitly, a failed close can mean the data never hit the disk
	closeErr := f.Close()
	if writeErr != nil {
		return writeErr
	}
	if closeErr != nil {
		return closeErr
	}

	// Set permissions, a file that existed before keeps its old mode otherwise
	err = os.Chmod(metadataPath, 0600)
	if err != nil {
		return err
	}

	// Set owner
	return os.Chown(metadataPath, ownerUID, ownerGID)
}
// Processes returns the list of supervisord processes of the app's
// container.
func (p *Processor) Processes() ([]docker.Process, error) {
	container, err := p.getContainer()
	if err != nil {
		return nil, err
	}

	list, err := container.GetProcessList()
	if err != nil {
		return nil, err
	}

	return list, nil
}
// EnableTech sets up the runtime for a new tech or a new version of a tech.
// It waits for the app to be running and then ENABLE_TECH_WAIT more seconds
// before the technology is set in the container.
func (p *Processor) EnableTech(service, version string) error {
	if err := p.waitForApp(); err != nil {
		return err
	}

	time.Sleep(ENABLE_TECH_WAIT * time.Second)

	container, err := p.getContainer()
	if err != nil {
		return err
	}

	return container.SetTechnology(service, version)
}
// Rebuild recreates the container for the app: the current one is destroyed
// (a missing container is tolerated), a new one is created and started.
func (p *Processor) Rebuild() error {
	container, err := p.getContainer()
	if err != nil {
		return err
	}

	err = container.Destroy()
	if err != nil {
		// A missing container is fine, it's going to be created anyway
		if missing := strings.Contains(err.Error(), "no container found"); !missing {
			return err
		}
	}

	if err = container.Create(); err != nil {
		return err
	}

	return container.Start()
}
// AddLabel adds the label to the app named p.AppName.
func (p *Processor) AddLabel(label string) error {
	labeler := p.getAppProcessor()

	if err := labeler.AddLabel(p.AppName, label); err != nil {
		return err
	}

	return nil
}
// RemoveLabel removes the label from the app named p.AppName.
func (p *Processor) RemoveLabel(label string) error {
	labeler := p.getAppProcessor()

	if err := labeler.RemoveLabel(p.AppName, label); err != nil {
		return err
	}

	return nil
}
// GetNode returns node's info as reported by the node processor.
func (p *Processor) GetNode() (*node.Node, error) {
	info, err := p.NodeProcessor.GetNodeInfo()
	if err != nil {
		return nil, err
	}

	return info, nil
}
// CreateSnapshot creates a snapshot of the app named p.AppName and attaches
// the given labels to it. The first value returned by the snapshot processor
// is not used here.
func (p *Processor) CreateSnapshot(labels []string) error {
	_, snapshotErr := p.SnapshotProcessor.CreateSnapshot(
		p.AppName,
		labels,
	)

	return snapshotErr
}
// RestoreFromSnapshot restores the app from the given snapshot.
//
// The container is stopped (when it exists), the data are restored from the
// snapshot and the container is started again; it is created first when it
// doesn't exist. An error from any step, including the lookup of the app's
// container, aborts the restore.
func (p *Processor) RestoreFromSnapshot(snapshotName string) error {
	container, err := p.getContainer()
	if err != nil {
		// Without this check the restore would go on with an empty container
		return err
	}

	// Stop the container
	status, err := container.Status()
	if err != nil {
		return err
	}

	// Stop the container only when it exists
	if status.Status != "no-container" {
		err = container.Stop()
		if err != nil {
			return err
		}
	}

	// Restore the data
	err = p.SnapshotProcessor.RestoreSnapshot(snapshotName, p.AppName)
	if err != nil {
		return err
	}

	// Start the container
	status, err = container.Status()
	if err != nil {
		return err
	}

	if status.Status == "no-container" {
		err = container.Create()
		if err != nil {
			return err
		}
	}

	err = container.Start()
	if err != nil {
		return err
	}

	return nil
}
// ListSnapshots returns the snapshots of the app named p.AppName, each one
// paired with its key name.
func (p *Processor) ListSnapshots() (SnapshotsMetadata, error) {
	found, err := p.SnapshotProcessor.ListAppSnapshots(p.AppName)
	if err != nil {
		return SnapshotsMetadata{}, err
	}

	var result SnapshotsMetadata
	for _, item := range found {
		entry := SnapshotMetadata{
			Key:      item.KeyName(p.SnapshotProcessor.IndexLabel),
			Metadata: item,
		}
		result = append(result, entry)
	}

	return result, nil
}
// ListAppsSnapshots returns the snapshots of all apps in the given list,
// each one paired with its key name.
func (p *Processor) ListAppsSnapshots(appNames []string) (SnapshotsMetadata, error) {
	found, err := p.SnapshotProcessor.ListAppsSnapshots(appNames)
	if err != nil {
		return SnapshotsMetadata{}, err
	}

	var result SnapshotsMetadata
	for _, item := range found {
		entry := SnapshotMetadata{
			Key:      item.KeyName(p.SnapshotProcessor.IndexLabel),
			Metadata: item,
		}
		result = append(result, entry)
	}

	return result, nil
}
// ListSnapshotsByLabel returns the snapshots found for the given label, each
// one paired with its key name.
func (p *Processor) ListSnapshotsByLabel(label string) (SnapshotsMetadata, error) {
	found, err := p.SnapshotProcessor.ListAppsSnapshotsByLabel(label)
	if err != nil {
		return SnapshotsMetadata{}, err
	}

	var result SnapshotsMetadata
	for _, item := range found {
		entry := SnapshotMetadata{
			Key:      item.KeyName(p.SnapshotProcessor.IndexLabel),
			Metadata: item,
		}
		result = append(result, entry)
	}

	return result, nil
}
// GetSnapshot returns info about a single snapshot together with its key
// name.
func (p *Processor) GetSnapshot(snapshotName string) (SnapshotMetadata, error) {
	found, err := p.SnapshotProcessor.GetSnapshot(snapshotName)
	if err != nil {
		return SnapshotMetadata{}, err
	}

	return SnapshotMetadata{
		Key:      found.KeyName(p.SnapshotProcessor.IndexLabel),
		Metadata: found,
	}, nil
}
// GetSnapshotDownloadLink returns the download link for the snapshot, or an
// empty string along with the error when the link cannot be obtained.
func (p *Processor) GetSnapshotDownloadLink(snapshotName string) (string, error) {
	downloadLink, err := p.SnapshotProcessor.GetDownloadLink(snapshotName)
	if err == nil {
		return downloadLink, nil
	}

	return "", err
}
// DeleteSnapshot deletes the snapshot of the given name.
func (p *Processor) DeleteSnapshot(snapshotName string) error {
	if err := p.SnapshotProcessor.DeleteSnapshot(snapshotName); err != nil {
		return err
	}

	return nil
}
// DeleteAppSnapshots deletes all snapshots of the app named p.AppName.
func (p *Processor) DeleteAppSnapshots() error {
	if err := p.SnapshotProcessor.DeleteAppSnapshots(p.AppName); err != nil {
		return err
	}

	return nil
}
// GetActiveTech returns the active technology of the app. When the lookup
// in the container fails, whatever value the container returned is passed
// on together with the error.
func (p *Processor) GetActiveTech() (*containers.TechInfo, error) {
	container, err := p.getContainer()
	if err != nil {
		return nil, err
	}

	activeTech, err := container.GetActiveTech()
	if err == nil {
		return activeTech, nil
	}

	return activeTech, err
}