
Commit

refactor
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Feb 19, 2025
1 parent 2a2c0df commit 4172b92
Showing 6 changed files with 143 additions and 147 deletions.
238 changes: 118 additions & 120 deletions tests/integrations/realcluster/cluster.go
@@ -23,6 +23,7 @@ import (
"syscall"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"

@@ -32,9 +33,47 @@ import (
type clusterSuite struct {
suite.Suite

clusterCnt int
suiteName string
ms bool
suiteName string
mode string
cluster *cluster
}

// SetupSuite will run before the tests in the suite are run.
func (s *clusterSuite) SetupSuite() {
re := s.Require()

dataDir := s.dataDir()
matches, err := filepath.Glob(dataDir)
re.NoError(err)

for _, match := range matches {
re.NoError(runCommand("rm", "-rf", match))
}

s.cluster = newCluster(re, s.tag(), dataDir, s.mode)
s.cluster.start()
}

// TearDownSuite will run after all the tests in the suite have been run.
func (s *clusterSuite) TearDownSuite() {
// Even if the cluster deployment fails, we still need to destroy the cluster.
// If the deployment succeeds, the cluster is destroyed in the cleanup
// function and this code becomes a no-op.
s.cluster.stop()
}

func (s *clusterSuite) tag() string {
if s.mode == "ms" {
return fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, time.Now().Unix())
}
return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, time.Now().Unix())
}

func (s *clusterSuite) dataDir() string {
if s.mode == "ms" {
return filepath.Join(os.Getenv("HOME"), ".tiup", "data", fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, time.Now().Unix()))
}
return filepath.Join(os.Getenv("HOME"), ".tiup", "data", fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, time.Now().Unix()))
}

var (
@@ -52,48 +91,6 @@ const (
deployTimeout = 5 * time.Minute
)

// ProcessManager is used to manage the processes of the cluster.
type ProcessManager struct {
tag string
pids []int
}

// NewProcessManager creates a new ProcessManager.
func NewProcessManager(tag string) *ProcessManager {
return &ProcessManager{tag: tag}
}

// CollectPids will collect the pids of the processes.
func (pm *ProcessManager) CollectPids() error {
output, err := runCommandWithOutput(fmt.Sprintf("pgrep -f %s", pm.tag))
if err != nil {
return fmt.Errorf("failed to collect pids: %v", err)
}

for _, pidStr := range strings.Split(strings.TrimSpace(output), "\n") {
if pid, err := strconv.Atoi(pidStr); err == nil {
pm.pids = append(pm.pids, pid)
}
}
return nil
}

// Cleanup will send SIGTERM to all the processes and wait for a while.
func (pm *ProcessManager) Cleanup() {
for _, pid := range pm.pids {
// First try SIGTERM
syscall.Kill(pid, syscall.SIGTERM)
}

// Wait and force kill if necessary
time.Sleep(3 * time.Second)
for _, pid := range pm.pids {
if isProcessRunning(pid) {
syscall.Kill(pid, syscall.SIGKILL)
}
}
}

func isProcessRunning(pid int) bool {
process, err := os.FindProcess(pid)
if err != nil {
@@ -103,68 +100,54 @@ func isProcessRunning(pid int) bool {
return err == nil
}

// SetupSuite will run before the tests in the suite are run.
func (s *clusterSuite) SetupSuite() {
re := s.Require()

// Clean the data dir. It is the default data dir of TiUP.
dataDir := filepath.Join(os.Getenv("HOME"), ".tiup", "data", "pd_real_cluster_test_"+s.suiteName+"_*")
matches, err := filepath.Glob(dataDir)
re.NoError(err)

for _, match := range matches {
re.NoError(runCommand("rm", "-rf", match))
}
s.startCluster()
type cluster struct {
re *require.Assertions
tag string
datadir string
mode string
pids []int
}

// TearDownSuite will run after all the tests in the suite have been run.
func (s *clusterSuite) TearDownSuite() {
// Even if the cluster deployment fails, we still need to destroy the cluster.
// If the deployment succeeds, the cluster is destroyed in the cleanup
// function and this code becomes a no-op.
s.clusterCnt++
s.stopCluster()
func newCluster(re *require.Assertions, tag, datadir, mode string) *cluster {
return &cluster{re: re, datadir: datadir, tag: tag, mode: mode}
}

func (s *clusterSuite) startCluster() {
log.Info("start to deploy a cluster", zap.Bool("ms", s.ms))
s.deploy()
s.waitReady()
s.clusterCnt++
func (c *cluster) start() {
log.Info("start to deploy a cluster", zap.String("mode", c.mode))
c.deploy()
c.waitReady()
}

func (s *clusterSuite) stopCluster() {
s.clusterCnt--
tag := s.tag()
func (c *cluster) restart() {
log.Info("start to restart", zap.String("tag", c.tag))
c.stop()
c.start()
log.Info("restart success")
}

pm := NewProcessManager(tag)
if err := pm.CollectPids(); err != nil {
func (c *cluster) stop() {
if err := c.collectPids(); err != nil {
log.Warn("failed to collect pids", zap.Error(err))
return
}

pm.Cleanup()
log.Info("cluster destroyed", zap.String("tag", tag))
}

func (s *clusterSuite) tag() string {
if s.ms {
return fmt.Sprintf("pd_real_cluster_test_ms_%s_%d", s.suiteName, s.clusterCnt)
for _, pid := range c.pids {
// First try SIGTERM
_ = syscall.Kill(pid, syscall.SIGTERM)
}
return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, s.clusterCnt)
}

func (s *clusterSuite) restart() {
tag := s.tag()
log.Info("start to restart", zap.String("tag", tag))
s.stopCluster()
s.startCluster()
log.Info("TiUP restart success")
// Wait and force kill if necessary
time.Sleep(3 * time.Second)
for _, pid := range c.pids {
if isProcessRunning(pid) {
_ = syscall.Kill(pid, syscall.SIGKILL)
}
}
log.Info("cluster destroyed", zap.String("tag", c.tag))
}

func (s *clusterSuite) deploy() {
re := s.Require()
func (c *cluster) deploy() {
re := c.re
curPath, err := os.Getwd()
re.NoError(err)
re.NoError(os.Chdir("../../.."))
@@ -192,10 +175,10 @@ func (s *clusterSuite) deploy() {
fmt.Sprintf("--db %d", defaultTiDBCount),
fmt.Sprintf("--pd %d", defaultPDCount),
"--without-monitor",
fmt.Sprintf("--tag %s", s.tag()),
fmt.Sprintf("--tag %s", c.tag),
}

if s.ms {
if c.mode == "ms" {
playgroundOpts = append(playgroundOpts,
"--pd.mode ms",
"--tso 1",
Expand All @@ -206,63 +189,78 @@ func (s *clusterSuite) deploy() {
cmd := fmt.Sprintf(`%s playground nightly %s %s > %s 2>&1 & `,
tiupBin,
strings.Join(playgroundOpts, " "),
buildBinPathsOpts(s.ms),
filepath.Join(playgroundLogDir, s.tag()+".log"),
buildBinPathsOpts(c.mode == "ms"),
filepath.Join(playgroundLogDir, c.tag+".log"),
)

runCommand("sh", "-c", cmd)
re.NoError(runCommand("sh", "-c", cmd))
}()

// Avoid changing the dir before `tiup playground` executes.
time.Sleep(10 * time.Second)
re.NoError(os.Chdir(curPath))
}

func buildBinPathsOpts(ms bool) string {
opts := []string{
"--pd.binpath ./bin/pd-server",
"--kv.binpath ./third_bin/tikv-server",
"--db.binpath ./third_bin/tidb-server",
"--tiflash.binpath ./third_bin/tiflash",
// collectPids will collect the pids of the processes.
func (c *cluster) collectPids() error {
output, err := runCommandWithOutput(fmt.Sprintf("pgrep -f %s", c.tag))
if err != nil {
return fmt.Errorf("failed to collect pids: %v", err)
}

if ms {
opts = append(opts,
"--tso.binpath ./bin/pd-server",
"--scheduling.binpath ./bin/pd-server",
)
for _, pidStr := range strings.Split(strings.TrimSpace(output), "\n") {
if pid, err := strconv.Atoi(pidStr); err == nil {
c.pids = append(c.pids, pid)
}
}

return strings.Join(opts, " ")
return nil
}

func (s *clusterSuite) waitReady() {
re := s.Require()
func (c *cluster) waitReady() {
re := c.re
const (
interval = 5
maxTimes = 20
)
log.Info("start to wait TiUP ready", zap.String("tag", s.tag()))
log.Info("start to wait TiUP ready", zap.String("tag", c.tag))
timeout := time.After(time.Duration(maxTimes*interval) * time.Second)
ticker := time.NewTicker(time.Duration(interval) * time.Second)
defer ticker.Stop()

for i := 0; i < maxTimes; i++ {
for i := range maxTimes {
select {
case <-timeout:
re.FailNowf("TiUP is not ready after timeout, tag: %s", s.tag())
re.FailNowf("TiUP is not ready after timeout, tag: %s", c.tag)
case <-ticker.C:
log.Info("check TiUP ready", zap.String("tag", s.tag()))
cmd := fmt.Sprintf(`%s playground display --tag %s`, tiupBin, s.tag())
log.Info("check TiUP ready", zap.String("tag", c.tag))
cmd := fmt.Sprintf(`%s playground display --tag %s`, tiupBin, c.tag)
output, err := runCommandWithOutput(cmd)
if err == nil {
log.Info("TiUP is ready", zap.String("tag", s.tag()))
log.Info("TiUP is ready", zap.String("tag", c.tag))
return
}
log.Info(output)
log.Info("TiUP is not ready, will retry", zap.Int("retry times", i),
zap.String("tag", s.tag()), zap.Error(err))
zap.String("tag", c.tag), zap.Error(err))
}
}
re.FailNowf("TiUP is not ready after max retries, tag: %s", s.tag())
re.FailNowf("TiUP is not ready after max retries, tag: %s", c.tag)
}

func buildBinPathsOpts(ms bool) string {
opts := []string{
"--pd.binpath ./bin/pd-server",
"--kv.binpath ./third_bin/tikv-server",
"--db.binpath ./third_bin/tidb-server",
"--tiflash.binpath ./third_bin/tiflash",
}

if ms {
opts = append(opts,
"--tso.binpath ./bin/pd-server",
"--scheduling.binpath ./bin/pd-server",
)
}

return strings.Join(opts, " ")
}
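
As a reading aid, here is a minimal sketch (not part of this commit) of how a test suite might drive the refactored helpers above; the package name, suite name, and test bodies are assumed for illustration only:

package realcluster // assumed package name, matching the directory above

import (
	"testing"

	"github.com/stretchr/testify/suite"
)

// rebootSuite is a hypothetical suite embedding clusterSuite. SetupSuite and
// TearDownSuite from clusterSuite deploy and destroy the playground cluster.
type rebootSuite struct {
	clusterSuite
}

func TestReboot(t *testing.T) {
	suite.Run(t, &rebootSuite{
		clusterSuite: clusterSuite{
			suiteName: "reboot",
			mode:      "ms", // "ms" selects the PD microservice deployment, per deploy() above
		},
	})
}

func (s *rebootSuite) TestRestart() {
	// restart() stops the playground processes (SIGTERM, then SIGKILL if
	// still running) and redeploys the cluster under the same tag.
	s.cluster.restart()
}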
13 changes: 7 additions & 6 deletions tests/integrations/realcluster/cluster_id_test.go
@@ -40,13 +40,14 @@ func TestClusterID(t *testing.T) {
}

func (s *clusterIDSuite) TestClientClusterID() {
re := require.New(s.T())
re := s.Require()
ctx := context.Background()
// deploy second cluster
s.startCluster()
defer s.stopCluster()
cluster2 := newCluster(re, s.tag(), s.dataDir(), s.mode)
cluster2.start()
defer cluster2.stop()

pdEndpoints := getPDEndpoints(s.T())
pdEndpoints := getPDEndpoints(re)
// Try to create a client with the mixed endpoints.
_, err := pd.NewClientWithContext(
ctx, caller.TestComponent, pdEndpoints,
@@ -56,9 +57,9 @@ func (s *clusterIDSuite) TestClientClusterID() {
re.Contains(err.Error(), "unmatched cluster id")
}

func getPDEndpoints(t *testing.T) []string {
func getPDEndpoints(re *require.Assertions) []string {
output, err := runCommandWithOutput("ps -ef | grep tikv-server | awk -F '--pd-endpoints=' '{print $2}' | awk '{print $1}'")
require.NoError(t, err)
re.NoError(err)
var pdAddrs []string
for _, addr := range strings.Split(strings.TrimSpace(output), "\n") {
// an addr shorter than 5 characters cannot be a valid address
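
For context on the getPDEndpoints signature change above (it now takes *require.Assertions instead of *testing.T), a hypothetical caller inside the same suite could look like this; the test name and assertion are illustrative only:

func (s *clusterIDSuite) TestPDEndpointsDiscovered() {
	re := s.Require()
	// The helper now asserts through the suite's require.Assertions rather
	// than through a *testing.T.
	endpoints := getPDEndpoints(re)
	re.NotEmpty(endpoints)
}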
