package containers import ( "context" "encoding/json" "errors" "fmt" "io" "io/ioutil" "log" "os" "strconv" "strings" "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" dockerClient "github.com/docker/docker/client" "github.com/docker/go-connections/nat" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/rosti-cz/node-api/detector" ) // Stats delay in seconds const statsDelay = 1 // Docker timeout const dockerTimeout = 10 // DOCKER_API_VERSION set API version of Docker, 1.41 is needed for the platform struct const dockerAPIVersion = "1.41" // Driver keeps everything for connection to Docker type Driver struct { DockerSock string // Docker socket BindIPHTTP string // IP to which containers are bound BindIPSSH string // IP to which containers are bound } func (d *Driver) getClient() (*dockerClient.Client, error) { cli, err := dockerClient.NewClient(d.DockerSock, dockerAPIVersion, nil, nil) if err != nil { return cli, fmt.Errorf("get docker client error: %v", err) } return cli, nil } // ConnectionStatus checks connection to the Docker daemon func (d *Driver) ConnectionStatus() (bool, error) { cli, err := d.getClient() if err != nil { return false, err } defer cli.Close() _, err = cli.ServerVersion(context.TODO()) if err != nil { return false, err } return true, nil } func (d *Driver) nameToID(name string) (string, error) { containerIDs, err := d.IsExist(name) if err != nil { return "", err } if len(containerIDs) == 0 { return "", errors.New("no container found") } if len(containerIDs) > 1 { return "", errors.New("multiple containers with the same name") } return containerIDs[0], nil } // Status return current status of container with given name func (d *Driver) Status(name string) (ContainerStatus, error) { status := ContainerStatus{ Status: "unknown", } cli, err := d.getClient() if err != nil { return status, err } defer cli.Close() containerID, err := d.nameToID(name) if err != nil && err.Error() == "no container found" { status.Status = "no-container" return status, err } if err != nil { return status, err } info, err := cli.ContainerInspect(context.TODO(), containerID) if err != nil { return status, err } status.Status = info.State.Status status.OOMKilled = info.State.OOMKilled return status, nil } // RawStats returns snapshot of current cpu and memory usage, CPU cannot be used // for % usage because it's number of used tics. Call this twice to get the % usage. // One second is 10 000 000 ticks. func (d *Driver) RawStats(name string) (int64, int, error) { cli, err := d.getClient() if err != nil { return 0.0, 0, err } defer cli.Close() containerID, err := d.nameToID(name) if err != nil { return 0.0, 0, err } stats, err := cli.ContainerStats(context.TODO(), containerID, false) if err != nil { return 0.0, 0, err } row := ContainerStats{} data, err := ioutil.ReadAll(stats.Body) if err != nil { return 0.0, 0, err } err = json.Unmarshal(data, &row) if err != nil { return 0.0, 0, err } return row.CPU.Usage.Total, row.Memory.Usage, nil } // Stats returns current CPU and memory usage func (d *Driver) Stats(name string) (float64, int, error) { cli, err := d.getClient() if err != nil { return 0.0, 0, err } defer cli.Close() containerID, err := d.nameToID(name) if err != nil { return 0.0, 0, err } rows := make([]ContainerStats, 2) for idx := range rows { stats, err := cli.ContainerStats(context.TODO(), containerID, false) if err != nil { return 0.0, 0, err } data, err := ioutil.ReadAll(stats.Body) if err != nil { return 0.0, 0, err } // It returns one JSON: // {"read":"2020-07-11T20:42:31.486726241Z","preread":"2020-07-11T20:42:30.484048602Z","pids_stats":{"current":7},"blkio_stats":{"io_service_bytes_recursive":[{"major":253,"minor":0,"op":"Read","value":0},{"major":253,"minor":0,"op":"Write","value":20480},{"major":253,"minor":0,"op":"Sync","value":12288},{"major":253,"minor":0,"op":"Async","value":8192},{"major":253,"minor":0,"op":"Discard","value":0},{"major":253,"minor":0,"op":"Total","value":20480}],"io_serviced_recursive":[{"major":253,"minor":0,"op":"Read","value":0},{"major":253,"minor":0,"op":"Write","value":5},{"major":253,"minor":0,"op":"Sync","value":3},{"major":253,"minor":0,"op":"Async","value":2},{"major":253,"minor":0,"op":"Discard","value":0},{"major":253,"minor":0,"op":"Total","value":5}],"io_queue_recursive":[],"io_service_time_recursive":[],"io_wait_time_recursive":[],"io_merged_recursive":[],"io_time_recursive":[],"sectors_recursive":[]},"num_procs":0,"storage_stats":{},"cpu_stats":{"cpu_usage":{"total_usage":758392753,"percpu_usage":[302688474,0,11507116,124238500,222136766,5656446,3009320,0,19406386,1397028,6201423,62151294,0,0,0,0],"usage_in_kernelmode":100000000,"usage_in_usermode":640000000},"system_cpu_usage":119385810000000,"online_cpus":12,"throttling_data":{"periods":21,"throttled_periods":1,"throttled_time":2995938}},"precpu_stats":{"cpu_usage":{"total_usage":758282347,"percpu_usage":[302688474,0,11507116,124238500,222026360,5656446,3009320,0,19406386,1397028,6201423,62151294,0,0,0,0],"usage_in_kernelmode":100000000,"usage_in_usermode":640000000},"system_cpu_usage":119373720000000,"online_cpus":12,"throttling_data":{"periods":21,"throttled_periods":1,"throttled_time":2995938}},"memory_stats":{"usage":21626880,"max_usage":22630400,"stats":{"active_anon":15949824,"active_file":0,"cache":0,"dirty":0,"hierarchical_memory_limit":144179200,"hierarchical_memsw_limit":288358400,"inactive_anon":0,"inactive_file":0,"mapped_file":0,"pgfault":13167,"pgmajfault":0,"pgpgin":7293,"pgpgout":3406,"rss":15900672,"rss_huge":0,"total_active_anon":15949824,"total_active_file":0,"total_cache":0,"total_dirty":0,"total_inactive_anon":0,"total_inactive_file":0,"total_mapped_file":0,"total_pgfault":13167,"total_pgmajfault":0,"total_pgpgin":7293,"total_pgpgout":3406,"total_rss":15900672,"total_rss_huge":0,"total_unevictable":0,"total_writeback":0,"unevictable":0,"writeback":0},"limit":144179200},"name":"/test_1234","id":"576878d645efecc8e5e2a57b88351f7b5c551e3fc72dc8473fd965d10dfddbec","networks":{"eth0":{"rx_bytes":6150,"rx_packets":37,"rx_errors":0,"rx_dropped":0,"tx_bytes":0,"tx_packets":0,"tx_errors":0,"tx_dropped":0}}} err = json.Unmarshal(data, &rows[idx]) if err != nil { return 0.0, 0, err } if idx == 0 { time.Sleep(statsDelay * time.Second) } } cpuUsage := (float64(rows[1].CPU.Usage.Total) - float64(rows[0].CPU.Usage.Total)) / statsDelay / 10000000 return cpuUsage, rows[1].Memory.Usage, nil } // Remove removes container represented by containerID func (d *Driver) Remove(name string) error { log.Println("Removing container " + name) cli, err := d.getClient() if err != nil { return err } defer cli.Close() containerID, err := d.nameToID(name) if err != nil { return err } timeout := time.Duration(dockerTimeout * time.Second) err = cli.ContainerStop(context.TODO(), containerID, &timeout) if err != nil { return err } err = cli.ContainerRemove(context.TODO(), containerID, types.ContainerRemoveOptions{}) return err } // Start starts container represented by containerID func (d *Driver) Start(name string) error { log.Println("Starting container " + name) cli, err := d.getClient() if err != nil { return err } defer cli.Close() containerID, err := d.nameToID(name) if err != nil { return err } err = cli.ContainerStart(context.TODO(), containerID, types.ContainerStartOptions{}) return err } // Stop stops container represented by containerID func (d *Driver) Stop(name string) error { log.Println("Stopping container " + name) cli, err := d.getClient() if err != nil { return err } defer cli.Close() containerID, err := d.nameToID(name) if err != nil { return err } timeout := time.Duration(dockerTimeout * time.Second) err = cli.ContainerStop(context.TODO(), containerID, &timeout) return err } // IsExist checks existence of the container based on container name // Returns container IDs in case of existence. Otherwise // empty slice. func (d *Driver) IsExist(name string) ([]string, error) { var containerIDs = make([]string, 0) cli, err := d.getClient() if err != nil { return containerIDs, err } defer cli.Close() containers, err := cli.ContainerList(context.TODO(), types.ContainerListOptions{All: true}) if err != nil { return containerIDs, err } // We go through the containers and pick the ones which match the task name for _, containerObject := range containers { for _, containerName := range containerObject.Names { containerName = strings.TrimLeft(containerName, "/") if containerName == name { containerIDs = append(containerIDs, containerObject.ID) } } } return containerIDs, nil } // pullImage pulls image into local docker instance func (d *Driver) pullImage(image string) error { log.Println("Pulling image " + image) cli, err := d.getClient() if err != nil { return err } defer cli.Close() stream, err := cli.ImagePull(context.TODO(), image, types.ImagePullOptions{}) if err != nil { return err } defer stream.Close() io.Copy(os.Stdout, stream) return nil } // Create creates the container // image - docker image // cmd - string slice of command and its arguments // volumePath - host's directory to mount into the container // returns container ID func (d *Driver) Create(name string, image string, volumePath string, HTTPPort int, SSHPort int, CPU int, memory int, cmd []string) (string, error) { log.Println("Creating container " + name) cli, err := d.getClient() if err != nil { return "", err } defer cli.Close() err = d.pullImage(image) if err != nil { return "", err } portBindings := nat.PortMap{ "8000/tcp": []nat.PortBinding{ { HostIP: d.BindIPHTTP, HostPort: strconv.Itoa(HTTPPort), }, }, } if SSHPort != 0 { portBindings["22/tcp"] = []nat.PortBinding{ { HostIP: d.BindIPSSH, HostPort: strconv.Itoa(SSHPort), }, } } createdContainer, err := cli.ContainerCreate( context.Background(), &container.Config{ Hostname: name, Env: []string{}, Image: image, Cmd: cmd, ExposedPorts: nat.PortSet{ nat.Port("22/tcp"): struct{}{}, nat.Port("8000/tcp"): struct{}{}, }, }, &container.HostConfig{ Resources: container.Resources{ CPUPeriod: 100000, CPUQuota: int64(CPU) * 1000, Memory: int64(memory*110/100) * 1024 * 1024, // Allow 10 % more memory so we have space for MemoryReservation MemoryReservation: int64(memory) * 1024 * 1024, // This should provide softer way how to limit the memory of our containers }, PortBindings: portBindings, AutoRemove: false, RestartPolicy: container.RestartPolicy{ Name: "on-failure", MaximumRetryCount: 3, }, Binds: []string{ volumePath + ":/srv", }, }, &network.NetworkingConfig{}, &specs.Platform{ Architecture: "amd64", OS: "linux", }, name, ) if err != nil { return "", err } containerID := createdContainer.ID // I dunno if we want this // err = cli.ContainerStart(context.TODO(), createdContainer.ID, types.ContainerStartOptions{}) return containerID, nil } // Exec runs command cmd with stdin if it's not empty. func (d *Driver) Exec(name string, cmd []string, stdin string, env []string, attachStdout bool) (*[]byte, error) { if len(cmd) == 0 { return &[]byte{}, errors.New("cmd needs at least one string in the slice") } ctx := context.Background() stdinEnabled := false if len(stdin) > 0 { stdinEnabled = true } log.Println("Command running in " + name) cli, err := d.getClient() if err != nil { return &[]byte{}, err } defer cli.Close() containerID, err := d.nameToID(name) if err != nil { return &[]byte{}, err } execOpts := types.ExecConfig{ AttachStdin: stdinEnabled, AttachStdout: attachStdout, AttachStderr: false, Tty: attachStdout, Cmd: cmd, } resp, err := cli.ContainerExecCreate(ctx, containerID, execOpts) if err != nil { return &[]byte{}, err } respAttach, err := cli.ContainerExecAttach(ctx, resp.ID, types.ExecStartCheck{ Detach: false, Tty: true, }) if err != nil { return &[]byte{}, err } defer respAttach.Close() err = cli.ContainerExecStart(ctx, resp.ID, types.ExecStartCheck{}) if err != nil { return &[]byte{}, err } if stdinEnabled { _, err = respAttach.Conn.Write([]byte(stdin)) if err != nil { return &[]byte{}, err } } stdouterr := []byte{} if attachStdout { stdouterr, err = ioutil.ReadAll(respAttach.Reader) } return &stdouterr, err } // GetProcesses return list of processes running under this container func (d *Driver) GetProcesses(name string) ([]string, error) { processes := []string{} ctx := context.Background() cli, err := d.getClient() if err != nil { return processes, err } defer cli.Close() processList, err := cli.ContainerTop(ctx, name, []string{"-eo", "pid,args"}) if err != nil { return processes, fmt.Errorf("docker container top call error: %v", err) } for _, process := range processList.Processes { if len(process) > 0 { // This removes PID from the list. PID has to be printed otherwise docker daemon can't handle it. processes = append(processes, strings.Join(strings.Fields(process[0])[1:], " ")) } } return processes, nil } // GetFlags returns list of flags with problems found in the container, mainly used to detect miners or viruses func (d *Driver) GetFlags(name string) (detector.Flags, error) { processes, err := d.GetProcesses(name) if err != nil { return detector.Flags{}, err } flags, err := detector.Check(processes) if err != nil { return flags, err } return flags, nil }