Skip to content

Commit

Permalink
add sequence support
Browse files Browse the repository at this point in the history
  • Loading branch information
Fengxq2014 committed Jan 20, 2024
1 parent 2db0e72 commit 4cd0ad0
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 60 deletions.
7 changes: 4 additions & 3 deletions build/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ RUN apk add --no-cache shadow
#RUN useradd -m app
#USER app

WORKDIR /mars
RUN mkdir mars
WORKDIR /mars
ENV CFG=""
ENV PARAMS=""
ADD mars-amd64-upx /mars/
CMD echo $CFG > .env && ./mars-amd64-upx $PARAMS
ENV SEQ=""
ADD mars-amd64-upx /mars/mars-amd64-upx
CMD echo $CFG >/mars/.env && $SEQ >/mars/seq.conf && /mars/mars-amd64-upx $PARAMS
41 changes: 41 additions & 0 deletions mars/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,44 @@ func GetIDInfo(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write(bytes)
}

func GetSeq(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
if s, ok := m.seqMap[vars["id"]]; ok {
num := r.URL.Query().Get("num")
if num == "" {
next, err := s.Next()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err.Error())
return
}
fmt.Fprint(w, next)
return
}
atoi, err := strconv.Atoi(num)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "错误的参数内容%s", num)
return
}
if atoi > maxNum {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "请求数量:%d,超出最大限制数量%d", atoi, maxNum)
return
}
for i := 0; i < atoi; i++ {
next, err := s.Next()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err.Error())
return
}
fmt.Fprintln(w, next)
}

} else {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, "sequence id "+vars["id"]+" not exist")
}
}
64 changes: 62 additions & 2 deletions mars/mars.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package mars

import (
"context"
"encoding/json"
"fmt"
"github.com/Fengxq2014/mars/etcdsrv"
"github.com/Fengxq2014/mars/generator"
"github.com/Fengxq2014/mars/sequence"
"github.com/bsm/redeo"
"github.com/bsm/redeo/resp"
"github.com/go-redis/redis"
Expand Down Expand Up @@ -44,6 +46,7 @@ type mars struct {
redisPasswd string
httpName string
httpPasswd string
seqMap map[string]*sequence.Sequence
}

type MarsConfig struct {
Expand All @@ -64,7 +67,7 @@ var (
leaderKey = "mars/node/leader"
nodeKey = "mars/node"
workerKey = "mars/worker"
version = "1.2.0"
version = "1.3.0"
TxnFailed = errors.New("etcd txn failed")
once sync.Once
m *mars
Expand All @@ -77,7 +80,7 @@ func New(cfg *MarsConfig) *mars {
HttpAddr: ":8080",
TcpAddr: ":8089",
Log: log.New(),
EtcdEndPoints: "localhost:23790",
EtcdEndPoints: "localhost:2379",
HttpTimeOut: 10 * time.Second,
}
}
Expand All @@ -95,6 +98,7 @@ func New(cfg *MarsConfig) *mars {
r.HandleFunc("/id", GetID).Methods("GET")
r.HandleFunc("/id53", GetID53).Methods("GET")
r.HandleFunc("/info/{id:[0-9]+}", GetIDInfo).Methods("GET")
r.HandleFunc("/seq/{id}", GetSeq).Methods("GET")

if cfg.Ip != "" {
cfg.HttpAddr = cfg.Ip + ":" + strings.Split(cfg.HttpAddr, ":")[1]
Expand All @@ -121,6 +125,26 @@ func New(cfg *MarsConfig) *mars {
redisPasswd: cfg.RedisPasswd,
httpName: cfg.HttpName,
httpPasswd: cfg.HttpPasswd,
seqMap: make(map[string]*sequence.Sequence),
}
var seqConf []sequence.Config
getenv := os.Getenv("SEQ.CONF")
if getenv != "" {
seqSetting([]byte(getenv), seqConf, cfg)
} else {
_, err := os.Stat("./seq.conf")
if err == nil {

file, err := os.ReadFile("./seq.conf")
if err != nil {
m.log.Fatalf("Error reading seq.conf file:", err)
}
seqSetting(file, seqConf, cfg)
} else if os.IsNotExist(err) {
m.log.Warnf("文件 %s 不存在\n", "seq.conf")
} else {
m.log.Warn("发生了其他错误", err.Error())
}
}

fmt.Printf(`
Expand All @@ -144,6 +168,15 @@ func New(cfg *MarsConfig) *mars {
return m
}

func seqSetting(file []byte, seqConf []sequence.Config, cfg *MarsConfig) {
if err := json.Unmarshal(file, &seqConf); err != nil {
m.log.Fatalf("Error parse seq.conf file:", err)
}
for _, config := range seqConf {
m.seqMap[config.Id] = sequence.New(m.etcdCli, cfg.AppKey, m.log, config.Id, config.TimeRollback, config.NumRollback)
}
}

func (m *mars) StartHttp() {
httpLog := m.log.WithField("module", "http")
go func() {
Expand Down Expand Up @@ -257,6 +290,17 @@ func (m *mars) initRedisSrv() {
if c.Arg(0).String() == "id53" {
return m.gen.GetStr53()
}
if strings.HasPrefix(c.Arg(0).String(), "seq/") {
id, _ := strings.CutPrefix(c.Arg(0).String(), "seq/")
if s, ok := m.seqMap[id]; ok {
next, err := s.Next()
if err != nil {
return redeo.ErrUnknownCommand(err.Error())
}
return next
}
return redeo.ErrUnknownCommand(c.Arg(0).String())
}
return nil
} else {
return errors.New("NOAUTH Authentication required.")
Expand Down Expand Up @@ -304,6 +348,22 @@ func (m *mars) initRedisSrv() {
}
return
}
if strings.HasPrefix(c.Arg(0).String(), "seq/") {
id, _ := strings.CutPrefix(c.Arg(0).String(), "seq/")
if s, ok := m.seqMap[id]; ok {
for i := 0; i < num; i++ {
next, err := s.Next()
if err != nil {
w.AppendError(redeo.UnknownCommand(c.Arg(0).String()))
return
}
w.AppendBulkString(next)
}
return
}
w.AppendError(redeo.UnknownCommand(c.Arg(0).String()))
return
}
} else {
w.AppendError("NOAUTH Authentication required.")
return
Expand Down
7 changes: 7 additions & 0 deletions seq.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
{
"id": "1",
"timeRollback": "m",
"numRollback": 1000
}
]
169 changes: 114 additions & 55 deletions sequence/sequence.go
Original file line number Diff line number Diff line change
@@ -1,57 +1,116 @@
package sequence

//
//import (
// "context"
// "fmt"
// clientv3 "go.etcd.io/etcd/client/v3"
// "strconv"
//)
//
//type Sequence struct {
// Id string
// Format string
// TimeRollback string
// NumRollback string
// etcdCli *clientv3.Client
//}
//
//func (s *Sequence) New(etcdCli *clientv3.Client) *Sequence {
// return &Sequence{
// Id: "",
// Format: "",
// TimeRollback: "",
// NumRollback: "",
// etcdCli: etcdCli,
// }
//}
//
//func (s *Sequence) Next() string {
// resp, err := s.etcdCli.Txn(context.TODO()).
// If(clientv3.Compare(clientv3.Value("counter"), "=", "")).
// Then(clientv3.OpPut("counter", "1")).
// Else(clientv3.OpGet("counter")).
// Commit()
//
// if err != nil {
// fmt.Println("Error:", err)
// return
// }
//
// if !resp.Succeeded {
// // Key already exists, increment the value
// currentValue := resp.Responses[0].GetResponseRange().Kvs[0].Value
// fmt.Println("Current value:", string(currentValue))
// atoi, _ := strconv.Atoi(string(currentValue))
// newValue := atoi + 1
//
// // Increment the value
// _, err := s.etcdCli.Put(context.TODO(), "counter", strconv.Itoa(newValue))
// if err != nil {
// fmt.Println("Error incrementing value:", err)
// return
// }
// fmt.Println("Incremented value:", newValue)
// }
// return ""
//}
import (
"context"
"fmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
clientv3 "go.etcd.io/etcd/client/v3"
"strconv"
"time"
)

type Sequence struct {
Id string
TimeRollback string
NumRollback int64
etcdCli *clientv3.Client
appKey string
log *log.Logger
}

type Config struct {
Id string `json:"id"`
TimeRollback string `json:"timeRollback"`
NumRollback int64 `json:"numRollback"`
}

func New(etcdCli *clientv3.Client, appKey string, log *log.Logger, id string, t string, max int64) *Sequence {
return &Sequence{
Id: id,
TimeRollback: t,
NumRollback: max,
etcdCli: etcdCli,
appKey: appKey,
log: log,
}
}

func (s *Sequence) Next() (string, error) {
var put clientv3.Op
key, sec := s.getKeyAndSec()
if sec > 0 {
var lease *clientv3.LeaseGrantResponse
var err error
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
lease, err = s.etcdCli.Grant(ctx, int64(sec))
if err != nil {
s.log.Errorf("lease err %s", err.Error())
return "", err
}
put = clientv3.OpPut(key, "1", clientv3.WithLease(lease.ID))
} else {
put = clientv3.OpPut(key, "1")
}

s.log.Infof("key %s, sec %d", key, sec)
tx, err := s.etcdCli.Txn(context.TODO()).Then(put, clientv3.OpGet(key)).Commit()
if err != nil {
s.log.Errorf("事务err,%s", err.Error())
return "", err
}
if tx.Succeeded {
if len(tx.Responses) == 2 {
s.log.Info("事务成功返回结果,", tx.Responses[1].GetResponseRange().Kvs[0].Version)
s.numRoll(tx.Responses[1].GetResponseRange().Kvs[0].Version, key)
return strconv.FormatInt(tx.Responses[1].GetResponseRange().Kvs[0].Version, 10), nil
}
return "", errors.Errorf("response count err:%d, key:%s", len(tx.Responses), key)
}
return "", errors.New("tx err")
}

func (s *Sequence) numRoll(version int64, key string) {
if s.NumRollback == 0 {
return
}
if s.NumRollback <= version {
_, err := s.etcdCli.Delete(context.TODO(), key)
if err != nil {
s.log.Errorf("numRoll del err %s", err.Error())
}
}
}

func (s *Sequence) getKeyAndSec() (string, int) {
now := time.Now()
if s.TimeRollback == "" {
return s.appKey + "/seq/", 0
}
tmp := s.appKey + "/seq/"
if s.TimeRollback == "m" {
tmp += fmt.Sprintf("%d%d%d%d%d", now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute())
return tmp, 60 - now.Second()
}
if s.TimeRollback == "h" {
tmp += fmt.Sprintf("%d%d%d%d", now.Year(), now.Month(), now.Day(), now.Hour())
return tmp, (60-now.Minute())*60 + (60 - now.Second())
}
if s.TimeRollback == "d" {
tmp += fmt.Sprintf("%d%d%d", now.Year(), now.Month(), now.Day())
return tmp, (24-now.Hour())*3600 + (60-now.Minute())*60 + (60 - now.Second())
}
if s.TimeRollback == "M" {
tmp += fmt.Sprintf("%d%d", now.Year(), now.Month())
daysInMonth := now.AddDate(0, 1, -now.Day()).Day()
return tmp, ((daysInMonth - now.Day()) * 24 * 3600) + (24-now.Hour())*3600 + (60-now.Minute())*60 + (60 - now.Second())
}
if s.TimeRollback == "y" {
tmp += fmt.Sprintf("%d", now.Year())
daysInMonth := now.AddDate(0, 1, -now.Day()).Day()
lastDayOfYear := time.Date(now.Year(), time.December, 31, 0, 0, 0, 0, time.UTC).YearDay()
return tmp, ((lastDayOfYear - now.YearDay()) * 24 * 3600) + ((daysInMonth - now.Day()) * 24 * 3600) + (24-now.Hour())*3600 + (60-now.Minute())*60 + (60 - now.Second())
}
return tmp, 0
}
17 changes: 17 additions & 0 deletions sequence/sequence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sequence

import (
"testing"
"time"
)

func TestTime(t *testing.T) {
now := time.Now()
duration, _ := time.ParseDuration("5m4s")
t.Log(duration.Minutes())

daysInMonth := now.AddDate(0, 1, -now.Day()).Day()
lastDayOfYear := time.Date(now.Year(), time.December, 31, 0, 0, 0, 0, time.UTC)
t.Log(daysInMonth)
t.Log(lastDayOfYear.YearDay())
}

0 comments on commit 4cd0ad0

Please sign in to comment.