-
Notifications
You must be signed in to change notification settings - Fork 94
feat: client reconnection management #1179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/da-sequencer-node
Are you sure you want to change the base?
feat: client reconnection management #1179
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test is a good start but:
- the server start and shutdown doesn't work
- the client reconnection logic must be implemented in the client. It's not done currently.
So you should update the test that failed, then implements the logic to make the test pass.
|
||
let service = DaSequencerNodeServiceServer::new(MockService); | ||
|
||
tokio::spawn(async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see the purpose of the oneshot channel. To shut down the server you've to abort the task or stop the server but here, the channel just waits before starting the server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code is becoming complicated and doesn’t fully address our needs. I think some parts should be managed by the caller (the fullnode), while others should be handled here. I need to think about it more.
Simple methods like batch_write can be handled easily, but if batch_write fails, it means the stream has an issue and should be restarted. Perhaps we should use a heartbeat from the client to the server in parallel with the current one to detect issues and bypass connection errors.
In parallel, we also need to manage the current server heartbeat, which could be used to detect an idle connection that arrives intermittently.
Let’s keep this PR for now. I’ll open a specific issue to describe the entire behavior. We will see what we do with the PR after.
Err(err) => { | ||
tracing::warn!( | ||
"DA sequencer Http2 connection failed: {}. Retrying in 10s...", | ||
"DA sequencer HTTP/2 connection failed: {}. Retrying in 2s...", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set to 10 sec because it takes more than 2s to restart the DA.
client: DaSequencerNodeServiceClient<tonic::transport::Channel>, | ||
client: DaSequencerNodeServiceClient<Channel>, | ||
connection_string: String, | ||
last_received_height: Arc<Mutex<Option<u64>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
last_received_height
is a u64 to you can use AtomicU64 that is more efficient than a mutex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should still use Arc though? In that case, in try_connect
, what value should be used in the returned client?
We had
last_received_height: Arc::new(Mutex::new(None)),
So I'm thinking either
last_received_height: Arc::new(AtomicU64::new(0)),
or
last_received_height: Arc::new(AtomicU64::new(u64::MAX))
would make sense?
I'm thinking MAX would be better, to represent that no value is set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes Max is better for a non Value. You still need the Arc to share the Atomic reference.
)), | ||
Err(e) => { | ||
tracing::warn!("stream_read_from_height failed, trying reconnect: {e}"); | ||
self.reconnect() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You try only one time immediately. You should have more retry and wait a little before retrying.
tracing::warn!("stream_read_from_height failed, trying reconnect: {e}"); | ||
self.reconnect() | ||
.await | ||
.map_err(|e| Status::unavailable(format!("Reconnect failed: {e}")))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No error should be return (ie: no ?) until the end of the reconnect loop.
stream: Streaming<StreamReadFromHeightResponse>, | ||
last_received_height: Arc<Mutex<Option<u64>>>, | ||
) -> Pin<Box<dyn Stream<Item = Result<StreamReadFromHeightResponse, Status>> + Send>> { | ||
let wrapped = unfold((stream, last_received_height), |(mut s, tracker)| async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should call tracker
last_received_height
Some((Ok(msg), (s, tracker))) | ||
} | ||
Ok(None) => None, | ||
Err(e) => Some((Err(e), (s, tracker))), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the stream break or return an error, we should try to reconnect. But it starts to be very complicated.
Summary
protocol-units
,da sequencer
.Changelog
Add two tests to da sequencer client crate:
test_client_reconnect_if_connection_fails
test_reopen_block_stream_at_correct_height
with mock RPC methods.
Testing
cargo test -p movement-da-sequencer-client