diff --git a/internal/client/redis.go b/internal/client/redis.go index 5672bf5c..43cd1a68 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "net" "strconv" + "strings" "time" "RedisShake/internal/client/proto" @@ -17,6 +18,8 @@ type Redis struct { writer *bufio.Writer protoReader *proto.Reader protoWriter *proto.Writer + + ReplId string } func NewRedisClient(address string, username string, password string, Tls bool) *Redis { @@ -53,6 +56,21 @@ func NewRedisClient(address string, username string, password string, Tls bool) } } + // replId + r.ReplId = func() string { + reply := r.DoWithStringReply("info", "replication") + replyIdField := "master_replid" + idx1 := strings.Index(reply, replyIdField) + if idx1 < 0 { + log.Panicf("can not found replid with reply: %s", reply) + } + idx2 := strings.IndexByte(reply[idx1:], '\r') + if idx2 < 0 { + log.Panicf("can not found replid with reply: %s", reply) + } + return reply[idx1 + len(replyIdField) + 1 : idx1 + idx2] + }() + // ping to test connection reply := r.DoWithStringReply("ping") if reply != "PONG" { diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index b78c5639..def3d5a4 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -1,10 +1,11 @@ package reader import ( - "context" "bufio" + "context" "fmt" "io" + "io/fs" "os" "path/filepath" "strconv" @@ -82,11 +83,11 @@ func NewSyncStandaloneReader(opts *SyncReaderOptions) Reader { r.opts = opts r.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) r.rd = r.client.BufioReader() - r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) + r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) + "_" + r.client.ReplId r.stat.Address = opts.Address r.stat.Status = kHandShake r.stat.Dir = utils.GetAbsPath(r.stat.Name) - utils.CreateEmptyDir(r.stat.Dir) + r.stat.AofReceivedOffset = readLastReplOffset(r.stat.Dir) return r } @@ -95,20 +96,26 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry r.ch = make(chan *entry.Entry, 1024) go func() { r.sendReplconfListenPort() - r.sendPSync() + fullReSync := r.sendPSync() go r.sendReplconfAck() // start sent replconf ack - rdbFilePath := r.receiveRDB() - startOffset := r.stat.AofReceivedOffset - go r.receiveAOF(r.rd) - if r.opts.SyncRdb { + if fullReSync { + // empty out of date file before full sync + utils.CreateEmptyDir(r.stat.Dir) + rdbFilePath := r.receiveRDB() r.sendRDB(rdbFilePath) } + + // create aof file first + aofWriter := rotate.NewAOFWriter(r.stat.Name, r.stat.Dir, r.stat.AofReceivedOffset) + go r.receiveAOF(r.rd, aofWriter) if r.opts.SyncAof { r.stat.Status = kSyncAof - r.sendAOF(startOffset) + r.sendAOF(r.stat.AofReceivedOffset) } - close(r.ch) r.client.Close() + aofWriter.Close() + // must be closed last so that other resources can be released + close(r.ch) }() return r.ch @@ -124,9 +131,15 @@ func (r *syncStandaloneReader) sendReplconfListenPort() { } } -func (r *syncStandaloneReader) sendPSync() { +// the return indicate whether full sync +func (r *syncStandaloneReader) sendPSync() bool { // send PSync - argv := []string{"PSYNC", "?", "-1"} + var argv []string + if r.opts.SyncRdb || r.stat.AofReceivedOffset <= 0 { + argv = []string{"PSYNC", "?", "-1"} + } else { + argv = []string{"PSYNC", r.client.ReplId, strconv.FormatInt(r.stat.AofReceivedOffset, 10)} + } if config.Opt.Advanced.AwsPSync != "" { argv = []string{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"} } @@ -142,12 +155,27 @@ func (r *syncStandaloneReader) sendPSync() { break } } + reply := r.client.ReceiveString() + if reply == "CONTINUE" { + log.Infof("increment sync start at last offset: %d", r.stat.AofReceivedOffset) + b, err := r.rd.ReadByte() + if err != nil { + log.Panicf(err.Error()) + } + if b != '\n' { + log.Panicf("unexpected data:%s", string(b)) + } + return false + } + + // FULLRESYNC masterOffset, err := strconv.Atoi(strings.Split(reply, " ")[2]) if err != nil { log.Panicf(err.Error()) } r.stat.AofReceivedOffset = int64(masterOffset) + return true } func (r *syncStandaloneReader) receiveRDB() string { @@ -225,10 +253,8 @@ func (r *syncStandaloneReader) receiveRDB() string { return rdbFilePath } -func (r *syncStandaloneReader) receiveAOF(rd io.Reader) { +func (r *syncStandaloneReader) receiveAOF(rd io.Reader, aofWriter *rotate.AOFWriter) { log.Debugf("[%s] start receiving aof data, and save to file", r.stat.Name) - aofWriter := rotate.NewAOFWriter(r.stat.Name, r.stat.Dir, r.stat.AofReceivedOffset) - defer aofWriter.Close() buf := make([]byte, 16*1024) // 16KB is enough for writing file for { select { @@ -344,3 +370,32 @@ func (r *syncStandaloneReader) StatusConsistent() bool { r.stat.AofReceivedOffset == r.stat.AofSentOffset && len(r.ch) == 0 } + +func readLastReplOffset(dir string) int64 { + var offset int64 = 0 + if !utils.IsExist(dir) { + return 0 + } + if err := filepath.Walk(dir, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + ext := filepath.Ext(path) + if !info.IsDir() && (ext == ".aof") { + baseOffset, err := strconv.ParseInt(strings.TrimSuffix(info.Name(), ext), 10, 64) + if err != nil { + log.Warnf("illegal file name of aof: %s", info.Name()) + return nil + } + if baseOffset + info.Size() > offset { + offset = baseOffset + info.Size() + } + } + return nil + }); err != nil { + log.Warnf("parse repl offset from aof file err: %s", err.Error()) + return 0 + } + log.Infof("read repl offset:%d for increment sync", offset) + return offset +} diff --git a/internal/utils/file_rotate/aof_reader.go b/internal/utils/file_rotate/aof_reader.go index ea48404a..d9528737 100644 --- a/internal/utils/file_rotate/aof_reader.go +++ b/internal/utils/file_rotate/aof_reader.go @@ -23,23 +23,23 @@ func NewAOFReader(name string, dir string, offset int64) *AOFReader { r := new(AOFReader) r.name = name r.dir = dir - r.openFile(offset) + r.offset = offset + r.openFile() return r } -func (r *AOFReader) openFile(offset int64) { +func (r *AOFReader) openFile() { r.filepath = fmt.Sprintf("%s/%d.aof", r.dir, r.offset) var err error r.file, err = os.OpenFile(r.filepath, os.O_RDONLY, 0644) if err != nil { log.Panicf(err.Error()) } - r.offset = offset r.pos = 0 log.Debugf("[%s] open file for read. filename=[%s]", r.name, r.filepath) } -func (r *AOFReader) readNextFile(offset int64) { +func (r *AOFReader) readNextFile() { filepath := fmt.Sprintf("%s/%d.aof", r.dir, r.offset) if utils.IsExist(filepath) { r.Close() @@ -47,7 +47,7 @@ func (r *AOFReader) readNextFile(offset int64) { if err != nil { return } - r.openFile(offset) + r.openFile() } } @@ -55,7 +55,7 @@ func (r *AOFReader) Read(buf []byte) (n int, err error) { n, err = r.file.Read(buf) for err == io.EOF { if r.filepath != fmt.Sprintf("%s/%d.aof", r.dir, r.offset) { - r.readNextFile(r.offset) + r.readNextFile() } time.Sleep(time.Millisecond * 10) _, err = r.file.Seek(0, 1) diff --git a/internal/utils/file_rotate/aof_writer.go b/internal/utils/file_rotate/aof_writer.go index 17647eff..d975c877 100644 --- a/internal/utils/file_rotate/aof_writer.go +++ b/internal/utils/file_rotate/aof_writer.go @@ -23,18 +23,18 @@ func NewAOFWriter(name string, dir string, offset int64) *AOFWriter { w := new(AOFWriter) w.name = name w.dir = dir - w.openFile(offset) + w.offset = offset + w.openFile() return w } -func (w *AOFWriter) openFile(offset int64) { +func (w *AOFWriter) openFile() { w.filepath = fmt.Sprintf("%s/%d.aof", w.dir, w.offset) var err error w.file, err = os.OpenFile(w.filepath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) if err != nil { log.Panicf(err.Error()) } - w.offset = offset w.filesize = 0 log.Debugf("[%s] open file for write. filename=[%s]", w.name, w.filepath) } @@ -48,7 +48,7 @@ func (w *AOFWriter) Write(buf []byte) { w.filesize += int64(len(buf)) if w.filesize > MaxFileSize { w.Close() - w.openFile(w.offset) + w.openFile() } err = w.file.Sync() if err != nil {