468 lines
12 KiB
Go
468 lines
12 KiB
Go
![]() |
package simulation
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/base64"
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
"net/http/httputil"
|
||
|
"net/url"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
"github.com/docker/docker/api/types"
|
||
|
"github.com/docker/docker/client"
|
||
|
"github.com/ethereum/go-ethereum/p2p"
|
||
|
"github.com/ethereum/go-ethereum/rpc"
|
||
|
"github.com/ethersphere/swarm"
|
||
|
"github.com/ethersphere/swarm/log"
|
||
|
v1 "k8s.io/api/core/v1"
|
||
|
"k8s.io/client-go/kubernetes"
|
||
|
"k8s.io/client-go/rest"
|
||
|
"k8s.io/client-go/tools/clientcmd"
|
||
|
|
||
|
"k8s.io/apimachinery/pkg/api/resource"
|
||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||
|
)
|
||
|
|
||
|
// KubernetesAdapter can manage nodes on a kubernetes cluster
|
||
|
type KubernetesAdapter struct {
|
||
|
client *kubernetes.Clientset
|
||
|
config KubernetesAdapterConfig
|
||
|
image string
|
||
|
proxy string
|
||
|
}
|
||
|
|
||
|
// KubernetesAdapterConfig is the configuration provided to a KubernetesAdapter
|
||
|
type KubernetesAdapterConfig struct {
|
||
|
// KubeConfigPath is the path to your kubernetes configuration path
|
||
|
KubeConfigPath string `json:"kubeConfigPath"`
|
||
|
// Namespace is the kubernetes namespaces where the pods should be running
|
||
|
Namespace string `json:"namespace"`
|
||
|
// BuildContext can be used to build a docker image
|
||
|
// from a Dockerfile and a context directory
|
||
|
BuildContext *KubernetesBuildContext `json:"build,omitempty"`
|
||
|
// DockerImage points to an existing docker image
|
||
|
// e.g. ethersphere/swarm:latest
|
||
|
DockerImage string `json:"image,omitempty"`
|
||
|
}
|
||
|
|
||
|
// KubernetesBuildContext defines the build context to build
|
||
|
// local docker images
|
||
|
type KubernetesBuildContext struct {
|
||
|
// Dockefile is the path to the dockerfile
|
||
|
Dockerfile string `json:"dockerfile"`
|
||
|
// Directory is the directory that will be used
|
||
|
// in the context of a docker build
|
||
|
Directory string `json:"dir"`
|
||
|
// Tag is used to tag the image
|
||
|
Tag string `json:"tag"`
|
||
|
// Registry is the image registry where the image will be pushed to
|
||
|
Registry string `json:"registry"`
|
||
|
// Username is the user used to push the image to the registry
|
||
|
Username string `json:"username"`
|
||
|
// Password is the password of the user that is used to push the image
|
||
|
// to the registry
|
||
|
Password string `json:"-"`
|
||
|
}
|
||
|
|
||
|
// ImageTag is the full image tag, including the registry
|
||
|
func (bc *KubernetesBuildContext) ImageTag() string {
|
||
|
return fmt.Sprintf("%s/%s", bc.Registry, bc.Tag)
|
||
|
}
|
||
|
|
||
|
// DefaultKubernetesAdapterConfig uses the default ~/.kube/config
|
||
|
// to discover the kubernetes clusters. It also uses the "default" namespace.
|
||
|
func DefaultKubernetesAdapterConfig() KubernetesAdapterConfig {
|
||
|
kubeconfig := filepath.Join(homeDir(), ".kube", "config")
|
||
|
return KubernetesAdapterConfig{
|
||
|
KubeConfigPath: kubeconfig,
|
||
|
Namespace: "default",
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// IsKubernetesAvailable checks if a kubernetes configuration file exists
|
||
|
func IsKubernetesAvailable(kubeConfigPath string) bool {
|
||
|
k8scfg, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
|
||
|
if err != nil {
|
||
|
return false
|
||
|
}
|
||
|
_, err = kubernetes.NewForConfig(k8scfg)
|
||
|
return err == nil
|
||
|
}
|
||
|
|
||
|
// NewKubernetesAdapter creates a KubernetesAdpater by receiving a KubernetesAdapterConfig
|
||
|
func NewKubernetesAdapter(config KubernetesAdapterConfig) (*KubernetesAdapter, error) {
|
||
|
if config.DockerImage != "" && config.BuildContext != nil {
|
||
|
return nil, fmt.Errorf("only one can be defined: BuildContext (%v) or DockerImage(%s)",
|
||
|
config.BuildContext, config.DockerImage)
|
||
|
}
|
||
|
|
||
|
if config.DockerImage == "" && config.BuildContext == nil {
|
||
|
return nil, errors.New("required: Dockerfile or DockerImage")
|
||
|
}
|
||
|
|
||
|
// Define k8s client configuration
|
||
|
k8scfg, err := clientcmd.BuildConfigFromFlags("", config.KubeConfigPath)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("could not start k8s client from config: %v", err)
|
||
|
|
||
|
}
|
||
|
|
||
|
// Create the clientset
|
||
|
clientset, err := kubernetes.NewForConfig(k8scfg)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("could not create clientset: %v", err)
|
||
|
}
|
||
|
|
||
|
// Figure out which docker image should be used
|
||
|
image := config.DockerImage
|
||
|
|
||
|
// Build and push container image
|
||
|
if config.BuildContext != nil {
|
||
|
var err error
|
||
|
// Build image
|
||
|
image, err = buildImage(DockerBuildContext{
|
||
|
Dockerfile: config.BuildContext.Dockerfile,
|
||
|
Directory: config.BuildContext.Directory,
|
||
|
Tag: config.BuildContext.ImageTag(),
|
||
|
}, DefaultDockerAdapterConfig().DaemonAddr)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("could not build the docker image: %v", err)
|
||
|
}
|
||
|
|
||
|
// Push image
|
||
|
dockerClient, err := client.NewClientWithOpts(
|
||
|
client.WithHost(client.DefaultDockerHost),
|
||
|
client.WithAPIVersionNegotiation(),
|
||
|
)
|
||
|
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("could not create the docker client: %v", err)
|
||
|
}
|
||
|
|
||
|
authConfig := types.AuthConfig{
|
||
|
Username: config.BuildContext.Username,
|
||
|
Password: config.BuildContext.Password,
|
||
|
}
|
||
|
encodedJSON, err := json.Marshal(authConfig)
|
||
|
if err != nil {
|
||
|
return nil, errors.New("failed marshaling the authentication parameters")
|
||
|
}
|
||
|
authStr := base64.URLEncoding.EncodeToString(encodedJSON)
|
||
|
|
||
|
out, err := dockerClient.ImagePush(
|
||
|
context.Background(),
|
||
|
config.BuildContext.ImageTag(),
|
||
|
types.ImagePushOptions{
|
||
|
RegistryAuth: authStr,
|
||
|
})
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("failed to push image: %v", err)
|
||
|
}
|
||
|
defer out.Close()
|
||
|
if _, err := io.Copy(os.Stdout, out); err != nil && err != io.EOF {
|
||
|
log.Error("Error pushing docker image", "err", err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Setup proxy to access pods
|
||
|
server, err := newProxyServer(k8scfg)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("failed to create proxy: %v", err)
|
||
|
}
|
||
|
|
||
|
l, err := server.Listen("127.0.0.1", 0)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("failed to start proxy: %v", err)
|
||
|
}
|
||
|
go func() {
|
||
|
if err := server.ServeOnListener(l); err != nil {
|
||
|
log.Error("Kubernetes dapater proxy failed:", "err", err.Error())
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// Return adapter
|
||
|
return &KubernetesAdapter{
|
||
|
client: clientset,
|
||
|
image: image,
|
||
|
config: config,
|
||
|
proxy: l.Addr().String(),
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// NewNode creates a new node
|
||
|
func (a KubernetesAdapter) NewNode(config NodeConfig) Node {
|
||
|
info := NodeInfo{
|
||
|
ID: config.ID,
|
||
|
}
|
||
|
node := &KubernetesNode{
|
||
|
config: config,
|
||
|
adapter: &a,
|
||
|
info: info,
|
||
|
}
|
||
|
return node
|
||
|
}
|
||
|
|
||
|
// Snapshot returns a snapshot of the Adapter
|
||
|
func (a KubernetesAdapter) Snapshot() AdapterSnapshot {
|
||
|
return AdapterSnapshot{
|
||
|
Type: "kubernetes",
|
||
|
Config: a.config,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// KubernetesNode is a node that was started via the KubernetesAdapter
|
||
|
type KubernetesNode struct {
|
||
|
config NodeConfig
|
||
|
adapter *KubernetesAdapter
|
||
|
info NodeInfo
|
||
|
}
|
||
|
|
||
|
// Info returns the node info
|
||
|
func (n *KubernetesNode) Info() NodeInfo {
|
||
|
return n.info
|
||
|
}
|
||
|
|
||
|
// Start starts the node
|
||
|
func (n *KubernetesNode) Start() error {
|
||
|
// Define arguments
|
||
|
args := []string{}
|
||
|
|
||
|
// Append user defined arguments
|
||
|
args = append(args, n.config.Args...)
|
||
|
|
||
|
// Append network ports arguments
|
||
|
args = append(args, "--pprofport", strconv.Itoa(dockerPProfPort))
|
||
|
args = append(args, "--bzzport", strconv.Itoa(dockerHTTPPort))
|
||
|
args = append(args, "--ws")
|
||
|
// TODO: Can we get the APIs from somewhere instead of hardcoding them here?
|
||
|
args = append(args, "--wsapi", "admin,net,debug,bzz,accounting,hive")
|
||
|
args = append(args, "--wsport", strconv.Itoa(dockerWebsocketPort))
|
||
|
args = append(args, "--wsaddr", "0.0.0.0")
|
||
|
args = append(args, "--wsorigins", "*")
|
||
|
args = append(args, "--port", strconv.Itoa(dockerP2PPort))
|
||
|
args = append(args, "--nat", "ip:$(POD_IP)")
|
||
|
|
||
|
// Build environment variables
|
||
|
env := []v1.EnvVar{
|
||
|
{
|
||
|
// POD_IP is useful for setting the NAT config: e.g. `--nat ip:$POD_IP`
|
||
|
Name: "POD_IP",
|
||
|
ValueFrom: &v1.EnvVarSource{
|
||
|
FieldRef: &v1.ObjectFieldSelector{
|
||
|
FieldPath: "status.podIP",
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
for _, e := range n.config.Env {
|
||
|
var name, value string
|
||
|
s := strings.SplitN(e, "=", 1)
|
||
|
name = s[0]
|
||
|
if len(s) > 1 {
|
||
|
value = s[1]
|
||
|
}
|
||
|
env = append(env, v1.EnvVar{
|
||
|
Name: name,
|
||
|
Value: value,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
adapter := n.adapter
|
||
|
|
||
|
// Create Kubernetes Pod
|
||
|
podRequest := &v1.Pod{
|
||
|
ObjectMeta: metav1.ObjectMeta{
|
||
|
Name: n.podName(),
|
||
|
Labels: map[string]string{
|
||
|
"app": "simulation",
|
||
|
},
|
||
|
},
|
||
|
Spec: v1.PodSpec{
|
||
|
Containers: []v1.Container{
|
||
|
{
|
||
|
Name: n.podName(),
|
||
|
Image: adapter.image,
|
||
|
Args: args,
|
||
|
Env: env,
|
||
|
Resources: v1.ResourceRequirements{
|
||
|
Limits: v1.ResourceList{
|
||
|
v1.ResourceMemory: resource.MustParse("400Mi"),
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
pod, err := adapter.client.CoreV1().Pods(adapter.config.Namespace).Create(podRequest)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to create pod: %v", err)
|
||
|
}
|
||
|
|
||
|
// Wait for pod
|
||
|
start := time.Now()
|
||
|
for {
|
||
|
log.Debug("Waiting for pod", "pod", n.podName())
|
||
|
pod, err := adapter.client.CoreV1().Pods(adapter.config.Namespace).Get(n.podName(), metav1.GetOptions{})
|
||
|
if err != nil {
|
||
|
time.Sleep(100 * time.Millisecond)
|
||
|
continue
|
||
|
}
|
||
|
if pod.Status.Phase == v1.PodRunning {
|
||
|
break
|
||
|
}
|
||
|
if time.Since(start) > 5*time.Minute {
|
||
|
return errors.New("timeout waiting for pod")
|
||
|
}
|
||
|
time.Sleep(500 * time.Millisecond)
|
||
|
}
|
||
|
|
||
|
// Get logs
|
||
|
logOpts := &v1.PodLogOptions{
|
||
|
Container: n.podName(),
|
||
|
Follow: true,
|
||
|
Previous: false,
|
||
|
}
|
||
|
req := adapter.client.CoreV1().Pods(adapter.config.Namespace).GetLogs(n.podName(), logOpts)
|
||
|
readCloser, err := req.Stream()
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("could not get logs: %v", err)
|
||
|
}
|
||
|
|
||
|
go func() {
|
||
|
defer readCloser.Close()
|
||
|
if _, err := io.Copy(n.config.Stderr, readCloser); err != nil && err != io.EOF {
|
||
|
log.Error("Error writing pod logs", "pod", pod.Name, "err", err)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// Wait for the node to start
|
||
|
var client *rpc.Client
|
||
|
wsAddr := fmt.Sprintf("ws://%s/api/v1/namespaces/%s/pods/%s:%d/proxy",
|
||
|
adapter.proxy, adapter.config.Namespace, n.podName(), dockerWebsocketPort)
|
||
|
|
||
|
for start := time.Now(); time.Since(start) < 30*time.Second; time.Sleep(50 * time.Millisecond) {
|
||
|
client, err = rpc.Dial(wsAddr)
|
||
|
if err == nil {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
if client == nil {
|
||
|
return fmt.Errorf("could not establish rpc connection. node %s: %v", n.config.ID, err)
|
||
|
}
|
||
|
defer client.Close()
|
||
|
|
||
|
var swarminfo swarm.Info
|
||
|
err = client.Call(&swarminfo, "bzz_info")
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("could not get info via rpc call. node %s: %v", n.config.ID, err)
|
||
|
}
|
||
|
|
||
|
var p2pinfo p2p.NodeInfo
|
||
|
err = client.Call(&p2pinfo, "admin_nodeInfo")
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("could not get info via rpc call. node %s: %v", n.config.ID, err)
|
||
|
}
|
||
|
|
||
|
n.info = NodeInfo{
|
||
|
ID: n.config.ID,
|
||
|
Enode: p2pinfo.Enode,
|
||
|
BzzAddr: swarminfo.BzzKey,
|
||
|
RPCListen: wsAddr,
|
||
|
HTTPListen: fmt.Sprintf("http://%s/api/v1/namespaces/%s/pods/%s:%d/proxy",
|
||
|
adapter.proxy, adapter.config.Namespace, n.podName(), dockerHTTPPort),
|
||
|
PprofListen: fmt.Sprintf("http://%s/api/v1/namespaces/%s/pods/%s:%d/proxy",
|
||
|
adapter.proxy, adapter.config.Namespace, n.podName(), dockerPProfPort),
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Stop stops the node
|
||
|
func (n *KubernetesNode) Stop() error {
|
||
|
adapter := n.adapter
|
||
|
|
||
|
gracePeriod := int64(30)
|
||
|
|
||
|
deleteOpts := &metav1.DeleteOptions{
|
||
|
GracePeriodSeconds: &gracePeriod,
|
||
|
}
|
||
|
err := adapter.client.CoreV1().Pods(adapter.config.Namespace).Delete(n.podName(), deleteOpts)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("could not delete pod: %v", err)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Snapshot returns a snapshot of the node
|
||
|
func (n *KubernetesNode) Snapshot() (NodeSnapshot, error) {
|
||
|
snap := NodeSnapshot{
|
||
|
Config: n.config,
|
||
|
}
|
||
|
adapterSnap := n.adapter.Snapshot()
|
||
|
snap.Adapter = &adapterSnap
|
||
|
return snap, nil
|
||
|
}
|
||
|
|
||
|
func (n *KubernetesNode) podName() string {
|
||
|
return fmt.Sprintf("sim-k8s-%s", n.config.ID)
|
||
|
}
|
||
|
|
||
|
func homeDir() string {
|
||
|
if h := os.Getenv("HOME"); h != "" {
|
||
|
return h
|
||
|
}
|
||
|
return os.Getenv("USERPROFILE") // windows
|
||
|
}
|
||
|
|
||
|
// proxyServer is a http.Handler which proxies Kubernetes APIs to remote API server.
|
||
|
type proxyServer struct {
|
||
|
handler http.Handler
|
||
|
}
|
||
|
|
||
|
// Listen is a simple wrapper around net.Listen.
|
||
|
func (s *proxyServer) Listen(address string, port int) (net.Listener, error) {
|
||
|
return net.Listen("tcp", fmt.Sprintf("%s:%d", address, port))
|
||
|
}
|
||
|
|
||
|
// ServeOnListener starts the server using given listener, loops forever.
|
||
|
func (s *proxyServer) ServeOnListener(l net.Listener) error {
|
||
|
server := http.Server{
|
||
|
Handler: s.handler,
|
||
|
}
|
||
|
return server.Serve(l)
|
||
|
}
|
||
|
|
||
|
func (s *proxyServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||
|
s.handler.ServeHTTP(rw, req)
|
||
|
}
|
||
|
|
||
|
// newProxyServer creates a proxy server that can be used to proxy to the kubernetes API
|
||
|
func newProxyServer(cfg *rest.Config) (*proxyServer, error) {
|
||
|
target, err := url.Parse(cfg.Host)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
proxy := httputil.NewSingleHostReverseProxy(target)
|
||
|
|
||
|
transport, err := rest.TransportFor(cfg)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
proxy.Transport = transport
|
||
|
|
||
|
return &proxyServer{
|
||
|
handler: proxy,
|
||
|
}, nil
|
||
|
}
|