Skip to content
This repository was archived by the owner on Apr 19, 2024. It is now read-only.

Commit 7c6b730

Browse files
committed
Implement goburrow#25
1 parent 0d0a427 commit 7c6b730

File tree

1 file changed

+75
-40
lines changed

1 file changed

+75
-40
lines changed

tcpclient.go

Lines changed: 75 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -84,26 +84,7 @@ func (mb *tcpPackager) Encode(pdu *ProtocolDataUnit) (adu []byte, err error) {
8484

8585
// Verify confirms transaction, protocol and unit id.
8686
func (mb *tcpPackager) Verify(aduRequest []byte, aduResponse []byte) (err error) {
87-
// Transaction id
88-
responseVal := binary.BigEndian.Uint16(aduResponse)
89-
requestVal := binary.BigEndian.Uint16(aduRequest)
90-
if responseVal != requestVal {
91-
err = fmt.Errorf("modbus: response transaction id '%v' does not match request '%v'", responseVal, requestVal)
92-
return
93-
}
94-
// Protocol id
95-
responseVal = binary.BigEndian.Uint16(aduResponse[2:])
96-
requestVal = binary.BigEndian.Uint16(aduRequest[2:])
97-
if responseVal != requestVal {
98-
err = fmt.Errorf("modbus: response protocol id '%v' does not match request '%v'", responseVal, requestVal)
99-
return
100-
}
101-
// Unit id (1 byte)
102-
if aduResponse[6] != aduRequest[6] {
103-
err = fmt.Errorf("modbus: response unit id '%v' does not match request '%v'", aduResponse[6], aduRequest[6])
104-
return
105-
}
106-
return
87+
return verify(aduRequest, aduResponse)
10788
}
10889

10990
// Decode extracts PDU from TCP frame:
@@ -134,6 +115,10 @@ type tcpTransporter struct {
134115
Timeout time.Duration
135116
// Idle timeout to close the connection
136117
IdleTimeout time.Duration
118+
// Recovery timeout if tcp communication misbehaves
119+
LinkRecoveryTimeout time.Duration
120+
// Recovery timeout if the protocol is malformed, e.g. wrong transaction ID
121+
ProtocolRecoveryTimeout time.Duration
137122
// Transmission logger
138123
Logger *log.Logger
139124

@@ -149,31 +134,58 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error
149134
mb.mu.Lock()
150135
defer mb.mu.Unlock()
151136

137+
var data [tcpMaxLength]byte
138+
recoveryDeadline := time.Now().Add(mb.IdleTimeout)
139+
152140
// Establish a new connection if not connected
153141
if err = mb.connect(); err != nil {
154142
return
155143
}
156-
// Set timer to close when idle
157-
mb.lastActivity = time.Now()
158-
mb.startCloseTimer()
159-
// Set write and read timeout
160-
var timeout time.Time
161-
if mb.Timeout > 0 {
162-
timeout = mb.lastActivity.Add(mb.Timeout)
163-
}
164-
if err = mb.conn.SetDeadline(timeout); err != nil {
165-
return
166-
}
167-
// Send data
168-
mb.logf("modbus: sending % x", aduRequest)
169-
if _, err = mb.conn.Write(aduRequest); err != nil {
170-
return
171-
}
172-
// Read header first
173-
var data [tcpMaxLength]byte
174-
if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err != nil {
175-
return
144+
145+
for {
146+
// Set timer to close when idle
147+
mb.lastActivity = time.Now()
148+
mb.startCloseTimer()
149+
// Set write and read timeout
150+
var timeout time.Time
151+
if mb.Timeout > 0 {
152+
timeout = mb.lastActivity.Add(mb.Timeout)
153+
}
154+
if err = mb.conn.SetDeadline(timeout); err != nil {
155+
return
156+
}
157+
// Send data
158+
mb.logf("modbus: sending % x", aduRequest)
159+
if _, err = mb.conn.Write(aduRequest); err != nil {
160+
return
161+
}
162+
// Read header first
163+
if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err == nil {
164+
aduResponse, err = mb.processResponse(data[:])
165+
if err == nil && mb.ProtocolRecoveryTimeout > 0 && recoveryDeadline.Sub(time.Now()) > 0 &&
166+
verify(aduRequest, aduResponse) != nil {
167+
continue
168+
}
169+
mb.logf("modbus: received % x\n", aduResponse)
170+
return
171+
// Read attempt failed
172+
} else if (err != io.EOF && err != io.ErrUnexpectedEOF) ||
173+
mb.LinkRecoveryTimeout == 0 || recoveryDeadline.Sub(time.Now()) < 0 {
174+
return
175+
}
176+
mb.logf("modbus: close connection and retry, because of %v", err)
177+
178+
mb.close()
179+
time.Sleep(mb.LinkRecoveryTimeout)
180+
181+
// Establish a new connection if not connected
182+
if err = mb.connect(); err != nil {
183+
return
184+
}
176185
}
186+
}
187+
188+
func (mb *tcpTransporter) processResponse(data []byte) (aduResponse []byte, err error) {
177189
// Read length, ignore transaction & protocol id (4 bytes)
178190
length := int(binary.BigEndian.Uint16(data[4:]))
179191
if length <= 0 {
@@ -196,6 +208,29 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error
196208
return
197209
}
198210

211+
func verify(aduRequest []byte, aduResponse []byte) (err error) {
212+
// Transaction id
213+
responseVal := binary.BigEndian.Uint16(aduResponse)
214+
requestVal := binary.BigEndian.Uint16(aduRequest)
215+
if responseVal != requestVal {
216+
err = fmt.Errorf("modbus: response transaction id '%v' does not match request '%v'", responseVal, requestVal)
217+
return
218+
}
219+
// Protocol id
220+
responseVal = binary.BigEndian.Uint16(aduResponse[2:])
221+
requestVal = binary.BigEndian.Uint16(aduRequest[2:])
222+
if responseVal != requestVal {
223+
err = fmt.Errorf("modbus: response protocol id '%v' does not match request '%v'", responseVal, requestVal)
224+
return
225+
}
226+
// Unit id (1 byte)
227+
if aduResponse[6] != aduRequest[6] {
228+
err = fmt.Errorf("modbus: response unit id '%v' does not match request '%v'", aduResponse[6], aduRequest[6])
229+
return
230+
}
231+
return
232+
}
233+
199234
// Connect establishes a new connection to the address in Address.
200235
// Connect and Close are exported so that multiple requests can be done with one session
201236
func (mb *tcpTransporter) Connect() error {

0 commit comments

Comments
 (0)