PoC: Network simulation framework (#1555)

* simv2: wip

* simulation: exec adapter start/stop

* simulation: add node status to exec adapter

* simulation: initial simulation code

* simulation: exec adapter, configure path to executable

* simulation: initial docker adapter

* simulation: wip kubernetes adapter

* simulation: kubernetes adapter proxy

* simulation: implement GetAll/StartAll/StopAll

* simulation: kuberentes adapter - set env vars and resource limits

* simulation: discovery test

* simulation: remove port definitions within docker adapter

* simulation: simplify wait for healthy loop

* simulation: get nat ip addr from interface

* simulation: pull docker images automatically

* simulation: NodeStatus -> NodeInfo

* simulation: move discovery test to example dir

* simulation: example snapshot usage

* simulation: add goclient specific simulation

* simulation: add peer connections to snapshot

* simulation: close rpc client

* simulation: don't export kubernetes proxy server

* simulation: merge simulation code

* simulation: don't export nodemap

* simulation: rename SimulationSnapshot -> Snapshot

* simulation: linting fixes

* simulation: add k8s available helper func

* simulation: vendor

* simulation: fix 'no non-test Go files' when building

* simulation: remove errors from interface methods where non were returned

* simulation: run getHealthInfo check in parallel
This commit is contained in:
Rafael Matias
2019-07-24 17:00:13 +02:00
committed by GitHub
parent 9720da34db
commit 388d8ccd9f
1570 changed files with 460774 additions and 3203 deletions

467
simulation/kubernetes.go Normal file
View File

@@ -0,0 +1,467 @@
package simulation
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm"
"github.com/ethersphere/swarm/log"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// KubernetesAdapter can manage nodes on a kubernetes cluster
type KubernetesAdapter struct {
client *kubernetes.Clientset
config KubernetesAdapterConfig
image string
proxy string
}
// KubernetesAdapterConfig is the configuration provided to a KubernetesAdapter
type KubernetesAdapterConfig struct {
// KubeConfigPath is the path to your kubernetes configuration path
KubeConfigPath string `json:"kubeConfigPath"`
// Namespace is the kubernetes namespaces where the pods should be running
Namespace string `json:"namespace"`
// BuildContext can be used to build a docker image
// from a Dockerfile and a context directory
BuildContext *KubernetesBuildContext `json:"build,omitempty"`
// DockerImage points to an existing docker image
// e.g. ethersphere/swarm:latest
DockerImage string `json:"image,omitempty"`
}
// KubernetesBuildContext defines the build context to build
// local docker images
type KubernetesBuildContext struct {
// Dockefile is the path to the dockerfile
Dockerfile string `json:"dockerfile"`
// Directory is the directory that will be used
// in the context of a docker build
Directory string `json:"dir"`
// Tag is used to tag the image
Tag string `json:"tag"`
// Registry is the image registry where the image will be pushed to
Registry string `json:"registry"`
// Username is the user used to push the image to the registry
Username string `json:"username"`
// Password is the password of the user that is used to push the image
// to the registry
Password string `json:"-"`
}
// ImageTag is the full image tag, including the registry
func (bc *KubernetesBuildContext) ImageTag() string {
return fmt.Sprintf("%s/%s", bc.Registry, bc.Tag)
}
// DefaultKubernetesAdapterConfig uses the default ~/.kube/config
// to discover the kubernetes clusters. It also uses the "default" namespace.
func DefaultKubernetesAdapterConfig() KubernetesAdapterConfig {
kubeconfig := filepath.Join(homeDir(), ".kube", "config")
return KubernetesAdapterConfig{
KubeConfigPath: kubeconfig,
Namespace: "default",
}
}
// IsKubernetesAvailable checks if a kubernetes configuration file exists
func IsKubernetesAvailable(kubeConfigPath string) bool {
k8scfg, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
return false
}
_, err = kubernetes.NewForConfig(k8scfg)
return err == nil
}
// NewKubernetesAdapter creates a KubernetesAdpater by receiving a KubernetesAdapterConfig
func NewKubernetesAdapter(config KubernetesAdapterConfig) (*KubernetesAdapter, error) {
if config.DockerImage != "" && config.BuildContext != nil {
return nil, fmt.Errorf("only one can be defined: BuildContext (%v) or DockerImage(%s)",
config.BuildContext, config.DockerImage)
}
if config.DockerImage == "" && config.BuildContext == nil {
return nil, errors.New("required: Dockerfile or DockerImage")
}
// Define k8s client configuration
k8scfg, err := clientcmd.BuildConfigFromFlags("", config.KubeConfigPath)
if err != nil {
return nil, fmt.Errorf("could not start k8s client from config: %v", err)
}
// Create the clientset
clientset, err := kubernetes.NewForConfig(k8scfg)
if err != nil {
return nil, fmt.Errorf("could not create clientset: %v", err)
}
// Figure out which docker image should be used
image := config.DockerImage
// Build and push container image
if config.BuildContext != nil {
var err error
// Build image
image, err = buildImage(DockerBuildContext{
Dockerfile: config.BuildContext.Dockerfile,
Directory: config.BuildContext.Directory,
Tag: config.BuildContext.ImageTag(),
}, DefaultDockerAdapterConfig().DaemonAddr)
if err != nil {
return nil, fmt.Errorf("could not build the docker image: %v", err)
}
// Push image
dockerClient, err := client.NewClientWithOpts(
client.WithHost(client.DefaultDockerHost),
client.WithAPIVersionNegotiation(),
)
if err != nil {
return nil, fmt.Errorf("could not create the docker client: %v", err)
}
authConfig := types.AuthConfig{
Username: config.BuildContext.Username,
Password: config.BuildContext.Password,
}
encodedJSON, err := json.Marshal(authConfig)
if err != nil {
return nil, errors.New("failed marshaling the authentication parameters")
}
authStr := base64.URLEncoding.EncodeToString(encodedJSON)
out, err := dockerClient.ImagePush(
context.Background(),
config.BuildContext.ImageTag(),
types.ImagePushOptions{
RegistryAuth: authStr,
})
if err != nil {
return nil, fmt.Errorf("failed to push image: %v", err)
}
defer out.Close()
if _, err := io.Copy(os.Stdout, out); err != nil && err != io.EOF {
log.Error("Error pushing docker image", "err", err)
}
}
// Setup proxy to access pods
server, err := newProxyServer(k8scfg)
if err != nil {
return nil, fmt.Errorf("failed to create proxy: %v", err)
}
l, err := server.Listen("127.0.0.1", 0)
if err != nil {
return nil, fmt.Errorf("failed to start proxy: %v", err)
}
go func() {
if err := server.ServeOnListener(l); err != nil {
log.Error("Kubernetes dapater proxy failed:", "err", err.Error())
}
}()
// Return adapter
return &KubernetesAdapter{
client: clientset,
image: image,
config: config,
proxy: l.Addr().String(),
}, nil
}
// NewNode creates a new node
func (a KubernetesAdapter) NewNode(config NodeConfig) Node {
info := NodeInfo{
ID: config.ID,
}
node := &KubernetesNode{
config: config,
adapter: &a,
info: info,
}
return node
}
// Snapshot returns a snapshot of the Adapter
func (a KubernetesAdapter) Snapshot() AdapterSnapshot {
return AdapterSnapshot{
Type: "kubernetes",
Config: a.config,
}
}
// KubernetesNode is a node that was started via the KubernetesAdapter
type KubernetesNode struct {
config NodeConfig
adapter *KubernetesAdapter
info NodeInfo
}
// Info returns the node info
func (n *KubernetesNode) Info() NodeInfo {
return n.info
}
// Start starts the node
func (n *KubernetesNode) Start() error {
// Define arguments
args := []string{}
// Append user defined arguments
args = append(args, n.config.Args...)
// Append network ports arguments
args = append(args, "--pprofport", strconv.Itoa(dockerPProfPort))
args = append(args, "--bzzport", strconv.Itoa(dockerHTTPPort))
args = append(args, "--ws")
// TODO: Can we get the APIs from somewhere instead of hardcoding them here?
args = append(args, "--wsapi", "admin,net,debug,bzz,accounting,hive")
args = append(args, "--wsport", strconv.Itoa(dockerWebsocketPort))
args = append(args, "--wsaddr", "0.0.0.0")
args = append(args, "--wsorigins", "*")
args = append(args, "--port", strconv.Itoa(dockerP2PPort))
args = append(args, "--nat", "ip:$(POD_IP)")
// Build environment variables
env := []v1.EnvVar{
{
// POD_IP is useful for setting the NAT config: e.g. `--nat ip:$POD_IP`
Name: "POD_IP",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
}
for _, e := range n.config.Env {
var name, value string
s := strings.SplitN(e, "=", 1)
name = s[0]
if len(s) > 1 {
value = s[1]
}
env = append(env, v1.EnvVar{
Name: name,
Value: value,
})
}
adapter := n.adapter
// Create Kubernetes Pod
podRequest := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: n.podName(),
Labels: map[string]string{
"app": "simulation",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: n.podName(),
Image: adapter.image,
Args: args,
Env: env,
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("400Mi"),
},
},
},
},
},
}
pod, err := adapter.client.CoreV1().Pods(adapter.config.Namespace).Create(podRequest)
if err != nil {
return fmt.Errorf("failed to create pod: %v", err)
}
// Wait for pod
start := time.Now()
for {
log.Debug("Waiting for pod", "pod", n.podName())
pod, err := adapter.client.CoreV1().Pods(adapter.config.Namespace).Get(n.podName(), metav1.GetOptions{})
if err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
if pod.Status.Phase == v1.PodRunning {
break
}
if time.Since(start) > 5*time.Minute {
return errors.New("timeout waiting for pod")
}
time.Sleep(500 * time.Millisecond)
}
// Get logs
logOpts := &v1.PodLogOptions{
Container: n.podName(),
Follow: true,
Previous: false,
}
req := adapter.client.CoreV1().Pods(adapter.config.Namespace).GetLogs(n.podName(), logOpts)
readCloser, err := req.Stream()
if err != nil {
return fmt.Errorf("could not get logs: %v", err)
}
go func() {
defer readCloser.Close()
if _, err := io.Copy(n.config.Stderr, readCloser); err != nil && err != io.EOF {
log.Error("Error writing pod logs", "pod", pod.Name, "err", err)
}
}()
// Wait for the node to start
var client *rpc.Client
wsAddr := fmt.Sprintf("ws://%s/api/v1/namespaces/%s/pods/%s:%d/proxy",
adapter.proxy, adapter.config.Namespace, n.podName(), dockerWebsocketPort)
for start := time.Now(); time.Since(start) < 30*time.Second; time.Sleep(50 * time.Millisecond) {
client, err = rpc.Dial(wsAddr)
if err == nil {
break
}
}
if client == nil {
return fmt.Errorf("could not establish rpc connection. node %s: %v", n.config.ID, err)
}
defer client.Close()
var swarminfo swarm.Info
err = client.Call(&swarminfo, "bzz_info")
if err != nil {
return fmt.Errorf("could not get info via rpc call. node %s: %v", n.config.ID, err)
}
var p2pinfo p2p.NodeInfo
err = client.Call(&p2pinfo, "admin_nodeInfo")
if err != nil {
return fmt.Errorf("could not get info via rpc call. node %s: %v", n.config.ID, err)
}
n.info = NodeInfo{
ID: n.config.ID,
Enode: p2pinfo.Enode,
BzzAddr: swarminfo.BzzKey,
RPCListen: wsAddr,
HTTPListen: fmt.Sprintf("http://%s/api/v1/namespaces/%s/pods/%s:%d/proxy",
adapter.proxy, adapter.config.Namespace, n.podName(), dockerHTTPPort),
PprofListen: fmt.Sprintf("http://%s/api/v1/namespaces/%s/pods/%s:%d/proxy",
adapter.proxy, adapter.config.Namespace, n.podName(), dockerPProfPort),
}
return nil
}
// Stop stops the node
func (n *KubernetesNode) Stop() error {
adapter := n.adapter
gracePeriod := int64(30)
deleteOpts := &metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriod,
}
err := adapter.client.CoreV1().Pods(adapter.config.Namespace).Delete(n.podName(), deleteOpts)
if err != nil {
return fmt.Errorf("could not delete pod: %v", err)
}
return nil
}
// Snapshot returns a snapshot of the node
func (n *KubernetesNode) Snapshot() (NodeSnapshot, error) {
snap := NodeSnapshot{
Config: n.config,
}
adapterSnap := n.adapter.Snapshot()
snap.Adapter = &adapterSnap
return snap, nil
}
func (n *KubernetesNode) podName() string {
return fmt.Sprintf("sim-k8s-%s", n.config.ID)
}
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // windows
}
// proxyServer is a http.Handler which proxies Kubernetes APIs to remote API server.
type proxyServer struct {
handler http.Handler
}
// Listen is a simple wrapper around net.Listen.
func (s *proxyServer) Listen(address string, port int) (net.Listener, error) {
return net.Listen("tcp", fmt.Sprintf("%s:%d", address, port))
}
// ServeOnListener starts the server using given listener, loops forever.
func (s *proxyServer) ServeOnListener(l net.Listener) error {
server := http.Server{
Handler: s.handler,
}
return server.Serve(l)
}
func (s *proxyServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
s.handler.ServeHTTP(rw, req)
}
// newProxyServer creates a proxy server that can be used to proxy to the kubernetes API
func newProxyServer(cfg *rest.Config) (*proxyServer, error) {
target, err := url.Parse(cfg.Host)
if err != nil {
return nil, err
}
proxy := httputil.NewSingleHostReverseProxy(target)
transport, err := rest.TransportFor(cfg)
if err != nil {
return nil, err
}
proxy.Transport = transport
return &proxyServer{
handler: proxy,
}, nil
}