@@ -74,12 +74,11 @@ type participant struct {
7474 keyBuilder * KeyBuilder
7575 zkClient * uzk.Client
7676 // Mirrors org.apache.helix.participant.HelixStateMachineEngine
77- // stateModelName->stateModelProcessor
78- stateModelProcessors sync. Map
77+ stateModelProcessorsMu sync. Mutex
78+ stateModelProcessors map [ string ] * StateModelProcessor // state model name to processor
7979 stateModelProcessorLocks map [string ]* sync.Mutex
8080 stateModel StateModel
81- sync.Mutex
82- dataAccessor * DataAccessor
81+ dataAccessor * DataAccessor
8382
8483 // fatalErrChan would notify user when a fatal error occurs
8584 fatalErrChan chan error
@@ -122,6 +121,7 @@ func NewParticipant(
122121 port : port ,
123122 keyBuilder : keyBuilder ,
124123 zkClient : zkClient ,
124+ stateModelProcessors : make (map [string ]* StateModelProcessor ),
125125 stateModelProcessorLocks : make (map [string ]* sync.Mutex ),
126126 dataAccessor : newDataAccessor (zkClient , keyBuilder ),
127127 stateModel : NewStateModel (),
@@ -159,7 +159,9 @@ func (p *participant) IsConnected() bool {
159159
160160// RegisterStateModel associates state trasition functions with the participant
161161func (p * participant ) RegisterStateModel (stateModelName string , processor * StateModelProcessor ) {
162- p .stateModelProcessors .Store (stateModelName , processor )
162+ p .stateModelProcessorsMu .Lock ()
163+ p .stateModelProcessors [stateModelName ] = processor
164+ p .stateModelProcessorsMu .Unlock ()
163165 p .stateModelProcessorLocks [stateModelName ] = & sync.Mutex {}
164166}
165167
@@ -435,7 +437,7 @@ func (p *participant) handleMsg(msg *model.Message) error {
435437 mu , ok := p .stateModelProcessorLocks [msg .GetStateModelDef ()]
436438 if ! ok {
437439 p .logger .Error ("failed to find state model in stateModelProcessorLocks" ,
438- zap .String ("StateModelDefinition " , msg .GetStateModelDef ()),
440+ zap .String ("stateModelDefinition " , msg .GetStateModelDef ()),
439441 zap .Any ("stateModelProcessorLocks" , p .stateModelProcessorLocks ))
440442 return errMsgMissingStateModelDef
441443 }
@@ -556,8 +558,9 @@ func (p *participant) handleStateTransition(msg *model.Message) error {
556558 // set the msg execution time
557559 msg .SetExecuteStartTime (time .Now ())
558560
559- if val , ok := p .stateModelProcessors .Load (msg .GetStateModelDef ()); ok {
560- processor := val .(* StateModelProcessor )
561+ p .stateModelProcessorsMu .Lock ()
562+ defer p .stateModelProcessorsMu .Unlock ()
563+ if processor , ok := p .stateModelProcessors [msg .GetStateModelDef ()]; ok {
561564 if toStateHandler , ok := processor .Transitions [fromState ]; ok {
562565 if handler , ok := toStateHandler [toState ]; ok {
563566 // TODO: deal with handler error
0 commit comments