diff --git a/buildspecs/test-nodeadm.yml b/buildspecs/test-nodeadm.yml index 469b8906..7a3667eb 100644 --- a/buildspecs/test-nodeadm.yml +++ b/buildspecs/test-nodeadm.yml @@ -9,10 +9,14 @@ phases: build: commands: - SANITIZED_CODEBUILD_BUILD_ID=$(echo $CODEBUILD_BUILD_ID | tr ':' '-') - - ./hack/run-e2e.sh $SANITIZED_CODEBUILD_BUILD_ID $AWS_REGION $KUBERNETES_VERSION $CNI s3://$ARTIFACTS_BUCKET/latest-pre/linux/amd64/nodeadm s3://$ARTIFACTS_BUCKET/latest-pre/linux/arm64/nodeadm $LOGS_BUCKET $ENDPOINT + - ./hack/run-e2e.sh $SANITIZED_CODEBUILD_BUILD_ID $AWS_REGION $KUBERNETES_VERSION $CNI s3://$ARTIFACTS_BUCKET/latest-pre/linux/amd64/nodeadm s3://$ARTIFACTS_BUCKET/latest-pre/linux/arm64/nodeadm $LOGS_BUCKET e2e-artifacts $ENDPOINT reports: e2e-reports: files: - e2e-reports/junit-nodeadm.xml file-format: "JUNITXML" + +artifacts: + files: + - "e2e-artifacts/*" diff --git a/go.mod b/go.mod index bb5dbe6e..c75d7f83 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.27 github.com/aws/aws-sdk-go-v2/service/cloudformation v1.56.10 github.com/aws/aws-sdk-go-v2/service/ec2 v1.202.2 + github.com/aws/aws-sdk-go-v2/service/ec2instanceconnect v1.27.12 github.com/aws/aws-sdk-go-v2/service/ecr v1.40.1 github.com/aws/aws-sdk-go-v2/service/eks v1.57.2 github.com/aws/aws-sdk-go-v2/service/iam v1.38.10 @@ -27,6 +28,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/tredoe/osutil v1.5.0 go.uber.org/zap v1.27.0 + golang.org/x/crypto v0.31.0 golang.org/x/mod v0.22.0 k8s.io/apimachinery v0.32.1 k8s.io/client-go v0.32.1 @@ -119,8 +121,8 @@ require ( golang.org/x/net v0.33.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/sync v0.10.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/term v0.27.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/term v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.7.0 // indirect golang.org/x/tools v0.28.0 // indirect diff --git a/go.sum b/go.sum index 8e9aa4a1..84d32904 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/aws/aws-sdk-go-v2/service/cloudformation v1.56.10 h1:5asGyAEAXc1A5FWC github.com/aws/aws-sdk-go-v2/service/cloudformation v1.56.10/go.mod h1:synt0SxqCGxfepb1DUAMK7TxtRvucnmk80qXbp9Cfcs= github.com/aws/aws-sdk-go-v2/service/ec2 v1.202.2 h1:qas57zkkMX8OM+MVz+4sMaOaD9HRmeFJRb8nzMdYkx0= github.com/aws/aws-sdk-go-v2/service/ec2 v1.202.2/go.mod h1:2omfxRebtpbbFqQGqeurDzlyB7Txa2e1xe9rCDFqlwA= +github.com/aws/aws-sdk-go-v2/service/ec2instanceconnect v1.27.12 h1:UK36TdbN1GB8FwZYTT0ZPlsg7ZqVpMHMcee0b6BYpSY= +github.com/aws/aws-sdk-go-v2/service/ec2instanceconnect v1.27.12/go.mod h1:qvHbeq93l0rsTolEGpOZ12yHcVQ/yfHiqVEmh8AVw1U= github.com/aws/aws-sdk-go-v2/service/ecr v1.40.1 h1:1C4JhM1VGzObZBPY0t+ymBRvF9BuCFToJ/j8RaW/9p8= github.com/aws/aws-sdk-go-v2/service/ecr v1.40.1/go.mod h1:XGy5jWOLlZK6nN3NWkoux6vN0QfaEIB91xbPCiX8gao= github.com/aws/aws-sdk-go-v2/service/eks v1.57.2 h1:Uxm6iUIEaRtyvcp8Gj45viJmM2KksMLNBRCd8DBxuJA= @@ -237,6 +239,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= @@ -258,10 +262,10 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= -golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= diff --git a/hack/run-e2e.sh b/hack/run-e2e.sh index 3542e6a6..960707b0 100755 --- a/hack/run-e2e.sh +++ b/hack/run-e2e.sh @@ -27,8 +27,9 @@ KUBERNETES_VERSION="${3?Please specify the Kubernetes version}" CNI="${4?Please specify the cni}" NODEADM_AMD_URL="${5?Please specify the nodeadm amd url}" NODEADM_ARM_URL="${6?Please specify the nodeadm arm url}" -LOGS_BUCKET="${7-}" -ENDPOINT="${8-}" +LOGS_BUCKET="${7-?Please specify the bucket for logs}" +ARTIFACTS_FOLDER="${8-?Please specify the folder for artifacts}" +ENDPOINT="${9-}" CONFIG_DIR="$REPO_ROOT/e2e-config" ARCH="$([ "x86_64" = "$(uname -m)" ] && echo amd64 || echo arm64)" @@ -65,6 +66,10 @@ trap "cleanup" EXIT $BIN_DIR/e2e-test setup -s $RESOURCES_YAML +mkdir -p e2e-reports +mkdir -p "$ARTIFACTS_FOLDER" +ARTIFACTS_FOLDER=$(realpath "$ARTIFACTS_FOLDER") + cat < $CONFIG_DIR/e2e-param.yaml clusterName: "$CLUSTER_NAME" clusterRegion: "$REGION" @@ -72,6 +77,7 @@ nodeadmUrlAMD: "$NODEADM_AMD_URL" nodeadmUrlARM: "$NODEADM_ARM_URL" logsBucket: "$LOGS_BUCKET" endpoint: "$ENDPOINT" +artifactsFolder: "$ARTIFACTS_FOLDER" EOF @@ -79,8 +85,6 @@ SKIP_FILE=$REPO_ROOT/hack/SKIPPED_TESTS.yaml # Extract skipped_tests field from SKIP_FILE file and join entries with ' || ' skip=$(yq '.skipped_tests | join("|")' ${SKIP_FILE}) -mkdir -p e2e-reports - # We explicitly specify procs instead of letting ginkgo decide (with -p) because in if not # ginkgo will use all available CPUs, which could be a small number depending # on how the CI runner has been configured. However, even if only one CPU is available, diff --git a/hybrid-nodes-cdk/lib/nodeadm-stack.ts b/hybrid-nodes-cdk/lib/nodeadm-stack.ts index a78edcc8..5d5866f6 100644 --- a/hybrid-nodes-cdk/lib/nodeadm-stack.ts +++ b/hybrid-nodes-cdk/lib/nodeadm-stack.ts @@ -344,6 +344,7 @@ export class NodeadmBuildStack extends cdk.Stack { 'ec2:RebootInstances', 'ec2:StopInstances', 'ec2:TerminateInstances', + 'ec2-instance-connect:SendSerialConsoleSSHPublicKey', ], resources: ['*'], effect: iam.Effect.ALLOW, diff --git a/test/e2e/ec2/instance.go b/test/e2e/ec2/instance.go index 7f892b5f..3b089214 100644 --- a/test/e2e/ec2/instance.go +++ b/test/e2e/ec2/instance.go @@ -158,7 +158,7 @@ func DeleteEC2Instance(ctx context.Context, client *ec2.Client, instanceID strin func WaitForEC2InstanceRunning(ctx context.Context, ec2Client *ec2.Client, instanceID string) error { waiter := ec2.NewInstanceRunningWaiter(ec2Client, func(isowo *ec2.InstanceRunningWaiterOptions) { - isowo.LogWaitAttempts = true + isowo.MinDelay = 1 * time.Second }) return waiter.Wait(ctx, &ec2.DescribeInstancesInput{InstanceIds: []string{instanceID}}, nodeRunningTimeout) } diff --git a/test/e2e/kubernetes/verify.go b/test/e2e/kubernetes/verify.go index 2673682b..c5c3f16a 100644 --- a/test/e2e/kubernetes/verify.go +++ b/test/e2e/kubernetes/verify.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" clientgo "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -24,32 +25,43 @@ type VerifyNode struct { NodeIPAddress string } -func (t VerifyNode) Run(ctx context.Context) error { +func (v VerifyNode) WaitForNodeReady(ctx context.Context) (*corev1.Node, error) { // get the hybrid node registered using nodeadm by the internal IP of an EC2 Instance - node, err := WaitForNode(ctx, t.K8s, t.NodeIPAddress, t.Logger) + node, err := WaitForNode(ctx, v.K8s, v.NodeIPAddress, v.Logger) if err != nil { - return err + return nil, err } if node == nil { - return fmt.Errorf("returned node is nil") + return nil, fmt.Errorf("returned node is nil") } nodeName := node.Name - t.Logger.Info("Waiting for hybrid node to be ready...") - if err = WaitForHybridNodeToBeReady(ctx, t.K8s, nodeName, t.Logger); err != nil { + v.Logger.Info("Waiting for hybrid node to be ready...") + if err = WaitForHybridNodeToBeReady(ctx, v.K8s, nodeName, v.Logger); err != nil { + return nil, err + } + + return node, nil +} + +func (v VerifyNode) Run(ctx context.Context) error { + node, err := v.WaitForNodeReady(ctx) + if err != nil { return err } - t.Logger.Info("Creating a test pod on the hybrid node...") + nodeName := node.Name + + v.Logger.Info("Creating a test pod on the hybrid node...") podName := GetNginxPodName(nodeName) - if err = CreateNginxPodInNode(ctx, t.K8s, nodeName, testPodNamespace, t.Region, t.Logger); err != nil { + if err = CreateNginxPodInNode(ctx, v.K8s, nodeName, testPodNamespace, v.Region, v.Logger); err != nil { return err } - t.Logger.Info(fmt.Sprintf("Pod %s created and running on node %s", podName, nodeName)) + v.Logger.Info(fmt.Sprintf("Pod %s created and running on node %s", podName, nodeName)) - t.Logger.Info("Exec-ing nginx -version", "pod", podName) - stdout, stderr, err := ExecPodWithRetries(ctx, t.ClientConfig, t.K8s, podName, testPodNamespace, "/sbin/nginx", "-version") + v.Logger.Info("Exec-ing nginx -version", "pod", podName) + stdout, stderr, err := ExecPodWithRetries(ctx, v.ClientConfig, v.K8s, podName, testPodNamespace, "/sbin/nginx", "-version") if err != nil { return err } @@ -59,23 +71,23 @@ func (t VerifyNode) Run(ctx context.Context) error { if stderr != "" { return fmt.Errorf("pod exec stderr should be empty %s", stderr) } - t.Logger.Info("Successfully exec'd nginx -version", "pod", podName) + v.Logger.Info("Successfully exec'd nginx -version", "pod", podName) - t.Logger.Info("Checking logs for nginx output", "pod", podName) - logs, err := GetPodLogsWithRetries(ctx, t.K8s, podName, testPodNamespace) + v.Logger.Info("Checking logs for nginx output", "pod", podName) + logs, err := GetPodLogsWithRetries(ctx, v.K8s, podName, testPodNamespace) if err != nil { return err } if !strings.Contains(logs, "nginx") { return fmt.Errorf("pod log does not contain expected value %s: %s", logs, "nginx") } - t.Logger.Info("Successfully validated log output", "pod", podName) + v.Logger.Info("Successfully validated log output", "pod", podName) - t.Logger.Info("Deleting test pod", "pod", podName) - if err = DeletePod(ctx, t.K8s, podName, testPodNamespace); err != nil { + v.Logger.Info("Deleting test pod", "pod", podName) + if err = DeletePod(ctx, v.K8s, podName, testPodNamespace); err != nil { return err } - t.Logger.Info("Pod deleted successfully", "pod", podName) + v.Logger.Info("Pod deleted successfully", "pod", podName) return nil } diff --git a/test/e2e/logger.go b/test/e2e/logger.go index 225839e1..2a0836ae 100644 --- a/test/e2e/logger.go +++ b/test/e2e/logger.go @@ -1,7 +1,10 @@ package e2e import ( + "bytes" + "io" "os" + "sync" "github.com/go-logr/logr" "github.com/go-logr/zapr" @@ -11,10 +14,14 @@ import ( type LoggerConfig struct { NoColor bool + syncer zapcore.WriteSyncer } func (c LoggerConfig) Apply(opts *LoggerConfig) { opts.NoColor = c.NoColor + if c.syncer != nil { + opts.syncer = c.syncer + } } type LoggerOption interface { @@ -27,13 +34,122 @@ func NewLogger(opts ...LoggerOption) logr.Logger { opt.Apply(cfg) } + if cfg.syncer == nil { + cfg.syncer = zapcore.AddSync(os.Stdout) + } + encoderCfg := zap.NewDevelopmentEncoderConfig() if !cfg.NoColor { encoderCfg.EncodeLevel = zapcore.CapitalColorLevelEncoder } consoleEncoder := zapcore.NewConsoleEncoder(encoderCfg) - core := zapcore.NewCore(consoleEncoder, zapcore.AddSync(os.Stdout), zapcore.DebugLevel) + core := zapcore.NewCore(consoleEncoder, cfg.syncer, zapcore.DebugLevel) log := zap.New(core) + return zapr.NewLogger(log) } + +// NewPausableLogger returns a logger that can be paused and resumed. +func NewPausableLogger(opts ...LoggerOption) PausableLogger { + cfg := &LoggerConfig{} + for _, opt := range opts { + opt.Apply(cfg) + } + + activeSyncer := zapcore.AddSync(os.Stdout) + syncer := newSwitchSyncer(activeSyncer) + cfg.syncer = syncer + + return PausableLogger{ + Logger: NewLogger(cfg), + syncer: syncer, + } +} + +// PausableLogger can be paused and resumed. It wraps a logr.Logger. +// When paused, writes go to a buffer; when resumed, writes go to stdout. +// After it's resumed, the buffer is flushed to stdout. +type PausableLogger struct { + logr.Logger + syncer *switchSyncer +} + +func (l PausableLogger) Pause() { + l.syncer.Pause() +} + +func (l PausableLogger) Resume() error { + return l.syncer.Resume() +} + +// switchSyncer implements zapcore.WriteSyncer. +// When paused, writes go to a buffer; when resumed, writes go to the active writer. +type switchSyncer struct { + *SwitchWriter + active zapcore.WriteSyncer // normally stdout +} + +func newSwitchSyncer(active zapcore.WriteSyncer) *switchSyncer { + return &switchSyncer{ + active: active, + SwitchWriter: NewSwitchWriter(active), + } +} + +func (s *switchSyncer) Sync() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.paused { + return nil + } + return s.active.Sync() +} + +// SwitchWriter implements io.Writer. +// When paused, writes go to a buffer; when resumed, writes go to the active writer. +type SwitchWriter struct { + mu sync.Mutex + active io.Writer // actual writer where we want to output + buffer *bytes.Buffer // holds bytes while paused + paused bool +} + +var _ io.Writer = &SwitchWriter{} + +func NewSwitchWriter(active io.Writer) *SwitchWriter { + return &SwitchWriter{ + active: active, + buffer: &bytes.Buffer{}, + } +} + +func (s *SwitchWriter) Write(p []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.paused { + return s.buffer.Write(p) + } + return s.active.Write(p) +} + +// Pause causes subsequent writes to be buffered. +func (s *SwitchWriter) Pause() { + s.mu.Lock() + s.paused = true + s.mu.Unlock() +} + +// Resume flushes the buffer to the active writer and resumes normal output. +func (s *SwitchWriter) Resume() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.buffer.Len() > 0 { + if _, err := s.active.Write(s.buffer.Bytes()); err != nil { + return err + } + s.buffer.Reset() + } + s.paused = false + return nil +} diff --git a/test/e2e/peered/node.go b/test/e2e/peered/node.go index 69f951e0..271a70f7 100644 --- a/test/e2e/peered/node.go +++ b/test/e2e/peered/node.go @@ -2,14 +2,20 @@ package peered import ( "context" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "encoding/pem" "fmt" "time" "github.com/aws/aws-sdk-go-v2/aws" ec2sdk "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/aws/aws-sdk-go-v2/service/ec2instanceconnect" s3sdk "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/ssm" "github.com/go-logr/logr" + gssh "golang.org/x/crypto/ssh" clientgo "k8s.io/client-go/kubernetes" "sigs.k8s.io/yaml" @@ -20,6 +26,7 @@ import ( "github.com/aws/eks-hybrid/test/e2e/nodeadm" "github.com/aws/eks-hybrid/test/e2e/os" "github.com/aws/eks-hybrid/test/e2e/s3" + "github.com/aws/eks-hybrid/test/e2e/ssh" ) const ( @@ -130,11 +137,70 @@ func (c Node) Create(ctx context.Context, spec *NodeSpec) (ec2.Instance, error) if err != nil { return ec2.Instance{}, fmt.Errorf("EC2 Instance should have been created successfully: %w", err) } - c.Logger.Info(fmt.Sprintf("EC2 Instance Connect: https://%s.console.aws.amazon.com/ec2-instance-connect/ssh?connType=serial&instanceId=%s®ion=%s&serialPort=0", c.Cluster.Region, instance.ID, c.Cluster.Region)) return instance, nil } +// SerialConsole returns the serial console for the given instance. +func (c *Node) SerialConsole(ctx context.Context, instanceId string) (*ssh.SerialConsole, error) { + privateKey, publicKey, err := generateKeyPair() + if err != nil { + return nil, fmt.Errorf("generating keypair: %w", err) + } + + signer, err := gssh.ParsePrivateKey(privateKey) + if err != nil { + return nil, fmt.Errorf("parsing private key: %w", err) + } + + config := &ssh.ClientConfig{ + User: instanceId + ".port0", + Auth: []gssh.AuthMethod{ + gssh.PublicKeys(signer), + }, + HostKeyCallback: gssh.InsecureIgnoreHostKey(), + } + + // node needs to be passed pending state to send the serial public key + // the sooner this completes, the more of the initial boot log we will get + err = ec2.WaitForEC2InstanceRunning(ctx, c.EC2, instanceId) + if err != nil { + return nil, fmt.Errorf("waiting on instance running: %w", err) + } + + client := ec2instanceconnect.NewFromConfig(c.AWS) + _, err = client.SendSerialConsoleSSHPublicKey(ctx, &ec2instanceconnect.SendSerialConsoleSSHPublicKeyInput{ + InstanceId: aws.String(instanceId), + SSHPublicKey: aws.String(string(publicKey)), + }) + if err != nil { + return nil, fmt.Errorf("adding ssh key via instance connect: %w", err) + } + + return ssh.NewSerialConsole("tcp", "serial-console.ec2-instance-connect."+c.AWS.Region+".aws:22", config), nil +} + +func generateKeyPair() ([]byte, []byte, error) { + var empty []byte + privateKey, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return empty, empty, fmt.Errorf("generating private key: %w", err) + } + + privateKeyPEM := &pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: x509.MarshalPKCS1PrivateKey(privateKey), + } + + // Generate the corresponding public key + publicKey, err := gssh.NewPublicKey(&privateKey.PublicKey) + if err != nil { + return empty, empty, fmt.Errorf("generating public key: %w", err) + } + + return pem.EncodeToMemory(privateKeyPEM), gssh.MarshalAuthorizedKey(publicKey), nil +} + // Cleanup collects logs and deletes the EC2 instance and Node object. func (c *Node) Cleanup(ctx context.Context, instance ec2.Instance) error { logCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) diff --git a/test/e2e/peered/serial.go b/test/e2e/peered/serial.go new file mode 100644 index 00000000..6535224e --- /dev/null +++ b/test/e2e/peered/serial.go @@ -0,0 +1,131 @@ +package peered + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/go-logr/logr" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + "github.com/aws/eks-hybrid/test/e2e" + "github.com/aws/eks-hybrid/test/e2e/ec2" + "github.com/aws/eks-hybrid/test/e2e/ssh" +) + +// SerialOutputBlock is a helper to run sections of a ginkgo test while outputting the serial console output of an instance. +// It paused the main test logs while streaming the serial console output, and resumes them once the test "body" is done. +// The serial console output is also saved to a file until Close is called, no matter if you are running a test block or not. +// This is very useful to help debugging issues with the node joining the cluster, specially if the process runs as part of the node initialization. +type SerialOutputBlock struct { + serial *ssh.SerialConsole + logsFile io.WriteCloser + serialOutout *e2e.SwitchWriter + testLogger e2e.PausableLogger +} + +type SerialOutputConfig struct { + PeeredNode *Node + Instance ec2.Instance + TestLogger e2e.PausableLogger + OutputFolder string +} + +func NewSerialOutputBlock(ctx context.Context, config *SerialOutputConfig) (*SerialOutputBlock, error) { + serial, err := config.PeeredNode.SerialConsole(ctx, config.Instance.ID) + if err != nil { + return nil, fmt.Errorf("preparing EC2 for serial connection: %w", err) + } + + pausableOutput := e2e.NewSwitchWriter(os.Stdout) + pausableOutput.Pause() // We start it paused, we will resume it once the test output is paused + outputFilePath := filepath.Join(config.OutputFolder, config.Instance.Name+"-serial-output.log") + file, err := os.Create(outputFilePath) + if err != nil { + return nil, fmt.Errorf("creating file to store serial output: %w", err) + } + + config.TestLogger.Info("Writing serial console output to disk", "file", outputFilePath) + + if err := serial.Copy(io.MultiWriter(pausableOutput, file)); err != nil { + return nil, fmt.Errorf("connecting to EC2 serial console: %w", err) + } + + return &SerialOutputBlock{ + serial: serial, + logsFile: file, + testLogger: config.TestLogger, + serialOutout: pausableOutput, + }, nil +} + +// It runs the test body while streaming the serial console output of the instance to stdout. +// It pauses the main test logs while streaming the serial console output, and resumes them once the test "body" is done. +// This actually doesn't create a ginkgo node, it just uses By to print the description and help distinguish this +// test block in the logs. +func (b *SerialOutputBlock) It(description string, body func()) { + // This ensures that test logs are restored even if this method panics + // Both Paused and Resume are idempotent + defer func() { + b.serialOutout.Pause() + gomega.Expect(b.testLogger.Resume()).To(gomega.Succeed()) + }() + + ginkgo.By(description, func() { + b.testLogger.Info( + fmt.Sprintf("Streaming Node serial output while the node %s. Test logs are paused in the meantime and will resume later.", description), + ) + b.testLogger.Pause() + // hack: this prints the resume message immediately after resuming the test logs + // and more importantly, before any logs produced by body() + b.testLogger.Info("Node serial output stopped and test logs resumed") + fmt.Println("-------------- Serial output starts here --------------") + gomega.Expect(b.serialOutout.Resume()).To(gomega.Succeed()) + body() + b.serialOutout.Pause() + fmt.Println("-------------- Serial output ends here --------------") + gomega.Expect(b.testLogger.Resume()).To(gomega.Succeed()) + }) +} + +func (b *SerialOutputBlock) Close() { + b.serial.Close() + b.logsFile.Close() +} + +// serialOutoutBlockNoop is a no-op implementation of SerialOutputBlock. +// Useful as a fallback when serial console is not available. +// It allows to return the same shape as the real SerialOutputBlock, +// so the caller doesn't need to check for nil and can freely use it as +// if it was the real thing. The only difference is that it won't actually +// stream the serial output and it will keep the test logs unpaused. +type serialOutoutBlockNoop struct { + testLogger logr.Logger +} + +func (b serialOutoutBlockNoop) It(description string, body func()) { + ginkgo.By(description, func() { + b.testLogger.Info("Serial console not available, skipping serial output stream and leaving test logs unpaused.") + body() + }) +} + +func (b serialOutoutBlockNoop) Close() {} + +type ItBlockCloser interface { + It(description string, body func()) + Close() +} + +// NewSerialOutputBlockBestEffort creates a SerialOutputBlock if the serial console is available, otherwise it returns a no-op implementation. +func NewSerialOutputBlockBestEffort(ctx context.Context, config *SerialOutputConfig) ItBlockCloser { + block, err := NewSerialOutputBlock(ctx, config) + if err != nil { + return serialOutoutBlockNoop{config.TestLogger.Logger} + } + + return block +} diff --git a/test/e2e/ssh/serial.go b/test/e2e/ssh/serial.go new file mode 100644 index 00000000..b5870d51 --- /dev/null +++ b/test/e2e/ssh/serial.go @@ -0,0 +1,134 @@ +package ssh + +import ( + "errors" + "fmt" + "io" + "math" + "syscall" + "time" + + "golang.org/x/crypto/ssh" +) + +// SerialConsole allows to get the serial console of a machine via SSH. +type SerialConsole struct { + network, addr string + config *ClientConfig + + client *ssh.Client + session *ssh.Session +} + +type ClientConfig = ssh.ClientConfig // define alias to avoid package name conflict in consumers + +func NewSerialConsole(network, addr string, config *ClientConfig) *SerialConsole { + return &SerialConsole{ + network: network, + addr: addr, + config: config, + } +} + +// Copy starts a new SSH session and copies the serial output stdout and stderr to dst. +// It starts go routines so it won't block. The caller should call Close when done. +func (o *SerialConsole) Copy(dst io.Writer) error { + var err error + o.client, err = dialSSH(o.network, o.addr, o.config) + if err != nil { + return fmt.Errorf("connecting to serial console: %w", err) + } + + o.session, err = o.client.NewSession() + if err != nil { + return fmt.Errorf("creating ssh session: %w", err) + } + + modes := ssh.TerminalModes{ + ssh.ECHO: 1, // enable echoing + ssh.TTY_OP_ISPEED: 14400, // input speed = 14.4kbaud + ssh.TTY_OP_OSPEED: 14400, // output speed = 14.4kbaud + } + + if err := o.session.RequestPty("xterm", 80, 40, modes); err != nil { + return fmt.Errorf("requesting pty: %w", err) + } + + stdout, err := o.session.StdoutPipe() + if err != nil { + return fmt.Errorf("opening stdout: %w", err) + } + + stderr, err := o.session.StderrPipe() + if err != nil { + return fmt.Errorf("opening stderr: %w", err) + } + + stdin, err := o.session.StdinPipe() + if err != nil { + return fmt.Errorf("opening stdin: %w", err) + } + + err = o.session.Shell() + if err != nil { + return fmt.Errorf("opening shell: %w", err) + } + + // sending a newline to "start" the output collection + // since we aren't running a new command just connecting + // to the serial console to capture output from the boot/init processes + _, err = stdin.Write([]byte("\n")) + if err != nil { + return fmt.Errorf("writing to stdin: %w", err) + } + + go func() { + io.Copy(dst, stdout) + }() + go func() { + io.Copy(dst, stderr) + }() + + return nil +} + +func (o *SerialConsole) Close() error { + if o.session != nil { + if err := o.session.Close(); err != nil { + return fmt.Errorf("closing ssh session: %w", err) + } + } + if o.client != nil { + if err := o.client.Close(); err != nil { + return fmt.Errorf("closing ssh client: %w", err) + } + } + + return nil +} + +const ( + maxRetries = 5 + backoffTime = 1 * time.Second +) + +func dialSSH(network, addr string, config *ClientConfig) (*ssh.Client, error) { + var err error + for retry := range maxRetries { + var client *ssh.Client + client, err = ssh.Dial(network, addr, config) + if err == nil { + return client, nil + } + + // We only retry on connection reset errors + if !errors.Is(err, syscall.ECONNRESET) { + return nil, err + } + + // Exponential backoff + time.Sleep(backoffTime * time.Duration(math.Floor(math.Pow(2, float64(retry))))) + } + + return nil, fmt.Errorf("dialing SSH to serial console reached max amount of retries: %w", err) +} diff --git a/test/e2e/suite/nodeadm_test.go b/test/e2e/suite/nodeadm_test.go index 89ceab6c..6c2c9b9b 100644 --- a/test/e2e/suite/nodeadm_test.go +++ b/test/e2e/suite/nodeadm_test.go @@ -57,6 +57,8 @@ type TestConfig struct { NodeK8sVersion string `yaml:"nodeK8SVersion"` LogsBucket string `yaml:"logsBucket"` Endpoint string `yaml:"endpoint"` + // ArtifactsFolder is the local path where the test will store the artifacts. + ArtifactsFolder string `yaml:"artifactsFolder"` } type suiteConfiguration struct { @@ -90,8 +92,10 @@ type peeredVPCTest struct { s3Client *s3v2.Client iamClient *iam.Client - logger logr.Logger - logsBucket string + logger logr.Logger + loggerControl e2e.PausableLogger + logsBucket string + artifactsPath string cluster *peered.HybridCluster stackOut *credentials.StackOutput @@ -118,7 +122,7 @@ var _ = SynchronizedBeforeSuite( config, err := readTestConfig(filePath) Expect(err).NotTo(HaveOccurred(), "should read valid test configuration") - logger := newLoggerForTests() + logger := newLoggerForTests().Logger aws, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(config.ClusterRegion)) Expect(err).NotTo(HaveOccurred()) @@ -231,19 +235,19 @@ var _ = Describe("Hybrid Nodes", func() { }) When("using ec2 instance as hybrid nodes", func() { - for _, os := range osList { + for _, nodeOS := range osList { providerLoop: for _, provider := range credentialProviders { - if notSupported.matches(os.Name(), provider.Name()) { + if notSupported.matches(nodeOS.Name(), provider.Name()) { continue providerLoop } DescribeTable("Joining a node", - func(ctx context.Context, os e2e.NodeadmOS, provider e2e.NodeadmCredentialsProvider) { - Expect(os).NotTo(BeNil()) + func(ctx context.Context, nodeOS e2e.NodeadmOS, provider e2e.NodeadmCredentialsProvider) { + Expect(nodeOS).NotTo(BeNil()) Expect(provider).NotTo(BeNil()) - instanceName := test.instanceName("init", os, provider) + instanceName := test.instanceName("init", nodeOS, provider) k8sVersion := test.cluster.KubernetesVersion if test.overrideNodeK8sVersion != "" { @@ -255,7 +259,7 @@ var _ = Describe("Hybrid Nodes", func() { InstanceName: instanceName, NodeK8sVersion: k8sVersion, NodeNamePrefix: "simpleflow", - OS: os, + OS: nodeOS, Provider: provider, }) Expect(err).NotTo(HaveOccurred(), "EC2 Instance should have been created successfully") @@ -263,14 +267,29 @@ var _ = Describe("Hybrid Nodes", func() { Expect(peeredNode.Cleanup(ctx, instance)).To(Succeed()) }, NodeTimeout(deferCleanupTimeout)) - test.logger.Info("Waiting for EC2 Instance to be Running...") - Expect(ec2.WaitForEC2InstanceRunning(ctx, test.ec2Client, instance.ID)).To(Succeed(), "EC2 Instance should have been reached Running status") - verifyNode := test.newVerifyNode(instance.IP) - Expect(verifyNode.Run(ctx)).To( - Succeed(), "node should have joined the cluster successfully"+ - ". You can access the collected node logs at: %s", peeredNode.S3LogsURL(instance.Name), - ) + + serialOutput := peered.NewSerialOutputBlockBestEffort(ctx, &peered.SerialOutputConfig{ + PeeredNode: peeredNode, + Instance: instance, + TestLogger: test.loggerControl, + OutputFolder: test.artifactsPath, + }) + Expect(err).NotTo(HaveOccurred(), "should prepare serial output") + DeferCleanup(func() { + serialOutput.Close() + }) + + serialOutput.It("joins the cluster", func() { + test.logger.Info("Waiting for EC2 Instance to be Running...") + Expect(ec2.WaitForEC2InstanceRunning(ctx, test.ec2Client, instance.ID)).To(Succeed(), "EC2 Instance should have been reached Running status") + Expect(verifyNode.WaitForNodeReady(ctx)).Error().To( + Succeed(), "node should have joined the cluster successfully"+ + ". You can access the collected node logs at: %s", peeredNode.S3LogsURL(instance.Name), + ) + }) + + Expect(verifyNode.Run(ctx)).To(Succeed(), "node should be fully functional") test.logger.Info("Testing Pod Identity add-on functionality") verifyPodIdentityAddon := test.newVerifyPodIdentityAddon() @@ -284,10 +303,14 @@ var _ = Describe("Hybrid Nodes", func() { Expect(nodeadm.RebootInstance(ctx, test.remoteCommandRunner, instance.IP)).NotTo(HaveOccurred(), "EC2 Instance should have rebooted successfully") test.logger.Info("EC2 Instance rebooted successfully.") - Expect(verifyNode.Run(ctx)).To(Succeed(), - "node should have re-joined, there must be a problem with uninstall"+ - ". You can access the collected node logs at: %s", peeredNode.S3LogsURL(instance.Name), - ) + serialOutput.It("re-joins the cluster after reboot", func() { + Expect(verifyNode.WaitForNodeReady(ctx)).Error().To(Succeed(), + "node should have re-joined, there must be a problem with uninstall"+ + ". You can access the collected node logs at: %s", peeredNode.S3LogsURL(instance.Name), + ) + }) + + Expect(verifyNode.Run(ctx)).To(Succeed(), "node should be fully functional") if test.skipCleanup { test.logger.Info("Skipping nodeadm uninstall from the hybrid node...") @@ -296,7 +319,7 @@ var _ = Describe("Hybrid Nodes", func() { Expect(cleanNode.Run(ctx)).To(Succeed(), "node should have been reset successfully") }, - Entry(fmt.Sprintf("With OS %s and with Credential Provider %s", os.Name(), string(provider.Name())), os, provider, Label(os.Name(), string(provider.Name()), "simpleflow", "init")), + Entry(fmt.Sprintf("With OS %s and with Credential Provider %s", nodeOS.Name(), string(provider.Name())), nodeOS, provider, Label(nodeOS.Name(), string(provider.Name()), "simpleflow", "init")), ) DescribeTable("Upgrade nodeadm flow", @@ -330,10 +353,28 @@ var _ = Describe("Hybrid Nodes", func() { }, NodeTimeout(deferCleanupTimeout)) verifyNode := test.newVerifyNode(instance.IP) - Expect(verifyNode.Run(ctx)).To( - Succeed(), "node should have joined the cluster successfully"+ - ". You can access the collected node logs at: %s", peeredNode.S3LogsURL(instance.Name), - ) + + serialOutput := peered.NewSerialOutputBlockBestEffort(ctx, &peered.SerialOutputConfig{ + PeeredNode: peeredNode, + Instance: instance, + TestLogger: test.loggerControl, + OutputFolder: test.artifactsPath, + }) + Expect(err).NotTo(HaveOccurred(), "should prepare serial output") + DeferCleanup(func() { + serialOutput.Close() + }) + + serialOutput.It("joins the cluster", func() { + test.logger.Info("Waiting for EC2 Instance to be Running...") + Expect(ec2.WaitForEC2InstanceRunning(ctx, test.ec2Client, instance.ID)).To(Succeed(), "EC2 Instance should have been reached Running status") + Expect(verifyNode.WaitForNodeReady(ctx)).Error().To( + Succeed(), "node should have joined the cluster successfully"+ + ". You can access the collected node logs at: %s", peeredNode.S3LogsURL(instance.Name), + ) + }) + + Expect(verifyNode.Run(ctx)).To(Succeed(), "node should be fully functional") Expect(test.newUpgradeNode(instance.IP).Run(ctx)).To( Succeed(), "node should have upgraded successfully"+ @@ -347,7 +388,7 @@ var _ = Describe("Hybrid Nodes", func() { Succeed(), "node should have been reset successfully", ) }, - Entry(fmt.Sprintf("With OS %s and with Credential Provider %s", os.Name(), string(provider.Name())), os, provider, Label(os.Name(), string(provider.Name()), "upgradeflow")), + Entry(fmt.Sprintf("With OS %s and with Credential Provider %s", nodeOS.Name(), string(provider.Name())), nodeOS, provider, Label(nodeOS.Name(), string(provider.Name()), "upgradeflow")), ) } } @@ -367,14 +408,21 @@ func readTestConfig(configPath string) (*TestConfig, error) { return nil, fmt.Errorf("unmarshaling test configuration: %w", err) } + if config.ArtifactsFolder == "" { + config.ArtifactsFolder = "/tmp" + } + return config, nil } func buildPeeredVPCTestForSuite(ctx context.Context, suite *suiteConfiguration) (*peeredVPCTest, error) { + pausableLogger := newLoggerForTests() test := &peeredVPCTest{ stackOut: suite.CredentialsStackOutput, - logger: newLoggerForTests(), + logger: pausableLogger.Logger, + loggerControl: pausableLogger, logsBucket: suite.TestConfig.LogsBucket, + artifactsPath: suite.TestConfig.ArtifactsFolder, overrideNodeK8sVersion: suite.TestConfig.NodeK8sVersion, publicKey: suite.PublicKey, setRootPassword: suite.TestConfig.SetRootPassword, @@ -388,7 +436,7 @@ func buildPeeredVPCTestForSuite(ctx context.Context, suite *suiteConfiguration) test.aws = aws test.eksClient = eks.NewFromConfig(aws) - test.ec2Client = ec2v2.NewFromConfig(aws) // TODO: move everything else to ec2 sdk v2 + test.ec2Client = ec2v2.NewFromConfig(aws) test.ssmClient = ssmv2.NewFromConfig(aws) test.s3Client = s3v2.NewFromConfig(aws) test.cfnClient = cloudformation.NewFromConfig(aws) @@ -501,11 +549,11 @@ func (t *peeredVPCTest) newVerifyPodIdentityAddon() *addon.VerifyPodIdentityAddo } } -func newLoggerForTests() logr.Logger { +func newLoggerForTests() e2e.PausableLogger { _, reporter := GinkgoConfiguration() cfg := e2e.LoggerConfig{} if reporter.NoColor { cfg.NoColor = true } - return e2e.NewLogger(cfg) + return e2e.NewPausableLogger(cfg) }