From 14a02ff25cda3877a3e96709bdb15d8aa665c313 Mon Sep 17 00:00:00 2001 From: Shwetha N <97447566+ShwethaSNayak@users.noreply.github.com> Date: Sat, 24 Feb 2024 07:33:49 +0530 Subject: [PATCH] Expose Reader streamcut to read from given Stream Cut (#22) If a client application crashes and the user wants to continue reading from the stream where it left off, there is currently no method to obtain StreamCuts from the ReaderGroup. A StreamCut is a set of segment/offset pairs for a single stream that represents a consistent position in the stream. Signed-off-by: Shwetha N --- src/lib.rs | 3 ++- src/stream_manager.rs | 42 ++++++++++++++++++++++---------- src/stream_reader.rs | 3 +++ src/stream_reader_group.rs | 47 ++++++++++++++++++++++++++++++++++-- tests/pravega_reader_test.py | 1 + 5 files changed, 80 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7444dc6..6b863f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ #[macro_use] extern crate cfg_if; -use crate::stream_reader_group::StreamReaderGroupConfig; +use crate::stream_reader_group::{StreamCuts, StreamReaderGroupConfig}; mod byte_stream; mod stream_manager; @@ -58,6 +58,7 @@ fn pravega_client(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; let txn_exception = py.get_type::(); txn_exception.setattr("__doc__", TXNFAILED_EXCEPTION_DOCSTRING)?; diff --git a/src/stream_manager.rs b/src/stream_manager.rs index 2d39752..326790b 100644 --- a/src/stream_manager.rs +++ b/src/stream_manager.rs @@ -8,6 +8,9 @@ // http://www.apache.org/licenses/LICENSE-2.0 // +use std::collections::HashMap; +use pravega_client::event::reader_group::{StreamCutV1, StreamCutVersioned}; +use crate::stream_reader_group::StreamCuts; cfg_if! 
{ if #[cfg(feature = "python_binding")] { use crate::stream_writer_transactional::StreamTxnWriter; @@ -558,7 +561,7 @@ impl StreamManager { /// event.reader_group=manager.create_reader_group("rg1", "scope", "stream", true) /// ``` /// - #[pyo3(text_signature = "($self, reader_group_name, scope_name, stream_name, read_from_tail)")] + #[pyo3(text_signature = "($self, reader_group_name, scope_name, stream_name, read_from_tail, stream_cut)")] #[args(read_from_tail = "false")] pub fn create_reader_group( &self, @@ -566,24 +569,37 @@ impl StreamManager { scope_name: &str, stream_name: &str, read_from_tail: bool, + stream_cut: Option, ) -> PyResult { let scope = Scope::from(scope_name.to_string()); + let stream = Stream::from(stream_name.to_string()); let scoped_stream = ScopedStream { scope: scope.clone(), - stream: Stream::from(stream_name.to_string()), + stream: stream.clone(), }; let handle = self.cf.runtime_handle(); - let rg_config = if read_from_tail { - // Create a reader group to read from the current TAIL/end of the Stream. - ReaderGroupConfigBuilder::default() - .read_from_tail_of_stream(scoped_stream) - .build() - } else { - // Create a reader group to read from current HEAD/start of the Stream. - ReaderGroupConfigBuilder::default() - .read_from_head_of_stream(scoped_stream) - .build() - }; + let rg_config = if let Some(ref stream_cut) = stream_cut { + let mut positions = HashMap::new(); + // Iterate over the keys of the offset_map + for (segment_val, position) in stream_cut.stream_cuts.segment_offset_map.iter() { + let scoped_segment = ScopedSegment::new(scope.clone(), stream.clone(), Segment::from(*segment_val)); + positions.insert(scoped_segment, *position); + } + let stream_cut_v1 = StreamCutV1::new(scoped_stream.clone(), positions); + // Create a reader group to read from given StreamCut . 
+ ReaderGroupConfigBuilder::default().read_from_stream(scoped_stream.clone(), StreamCutVersioned::V1(stream_cut_v1)).build() + }else if read_from_tail { + // Create a reader group to read from the current TAIL/end of the Stream. + ReaderGroupConfigBuilder::default() + .read_from_tail_of_stream(scoped_stream) + .build() + } else { + // Create a reader group to read from current HEAD/start of the Stream. + ReaderGroupConfigBuilder::default() + .read_from_head_of_stream(scoped_stream) + .build() + }; + let rg = handle.block_on(self.cf.create_reader_group_with_config( reader_group_name.to_string(), rg_config, diff --git a/src/stream_reader.rs b/src/stream_reader.rs index 419c913..a817d04 100644 --- a/src/stream_reader.rs +++ b/src/stream_reader.rs @@ -148,6 +148,9 @@ impl EventData { fn data(&self) -> &[u8] { self.value.as_slice() } + + ///Return the offset + fn offset(&self) -> i64 { self.offset_in_segment } /// Returns the string representation. fn to_str(&self) -> String { format!("offset {:?} data :{:?}", self.offset_in_segment, self.value) diff --git a/src/stream_reader_group.rs b/src/stream_reader_group.rs index a17ca22..6ff6b2c 100644 --- a/src/stream_reader_group.rs +++ b/src/stream_reader_group.rs @@ -10,8 +10,8 @@ cfg_if! { if #[cfg(feature = "python_binding")] { - use pravega_client_shared::ScopedStream; use pravega_client::event::reader_group::ReaderGroup; + use std::collections::HashMap; use pyo3::prelude::*; use pyo3::PyResult; use pyo3::PyObjectProtocol; @@ -22,7 +22,8 @@ cfg_if! 
{ use crate::stream_reader::StreamReader; use pravega_client::event::reader_group::{ReaderGroupConfig, ReaderGroupConfigBuilder}; use pravega_client::event::reader_group_state::ReaderGroupStateError; - use pravega_client_shared::{Scope, Stream}; + use pravega_client_shared::{Scope, Stream, StreamCut}; + use pravega_client_shared::ScopedStream; use pyo3::types::PyTuple; use pyo3::exceptions; } @@ -99,6 +100,33 @@ impl PyObjectProtocol for StreamReaderGroupConfig { } } +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(Clone)] +pub(crate) struct StreamCuts { + pub(crate) stream_cuts: StreamCut, +} +#[cfg(feature = "python_binding")] +#[pymethods] +impl StreamCuts { + + fn get_segment_offset_map(&self) -> HashMap { + self.stream_cuts.segment_offset_map.clone() + } + + fn to_str(&self) -> String { + format!("StreamCuts: {:?}", self.stream_cuts) + } +} + +#[cfg(feature = "python_binding")] +#[pyproto] +impl PyObjectProtocol for StreamCuts { + fn __repr__(&self) -> PyResult { + Ok(format!("StreamCuts({:?})", self.to_str())) + } +} + /// /// This represents a Stream reader for a given Stream. /// Note: A python object of StreamReader cannot be created directly without using the StreamManager. @@ -179,6 +207,21 @@ impl StreamReaderGroup { } } + /// Return the latest StreamCut from ReaderGroup. + /// Use this StreamCut in the ReaderGroupConfig to initiate reading from this streamcut. + pub fn get_streamcut(&self) -> PyResult { + + let streamcut = self + .runtime_handle + .block_on(self.reader_group.get_streamcut()); + info!( + "Got streamcut {:?} ", streamcut + ); + Ok(StreamCuts { + stream_cuts: streamcut + }) + } + /// Returns the string representation. 
fn to_str(&self) -> String { format!( diff --git a/tests/pravega_reader_test.py b/tests/pravega_reader_test.py index 7b633c8..be5c509 100644 --- a/tests/pravega_reader_test.py +++ b/tests/pravega_reader_test.py @@ -7,6 +7,7 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # +import time import pravega_client import random