From 277ab3b2f4485539485fc8e08c87d36bb9c815d0 Mon Sep 17 00:00:00 2001 From: Ankur_Anand Date: Fri, 3 Nov 2023 15:47:59 +0530 Subject: [PATCH 1/4] support async for event writer Signed-off-by: Ankur_Anand --- src/stream_writer.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/stream_writer.rs b/src/stream_writer.rs index 429b5ae..4b3df8e 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -84,8 +84,11 @@ impl StreamWriter { /// #[pyo3(text_signature = "($self, event, routing_key=None)")] #[args(event, routing_key = "None", "*")] - pub fn write_event(&mut self, event: &str, routing_key: Option) -> PyResult<()> { - self.write_event_bytes(event.as_bytes(), routing_key) + pub fn write_event<'p>(&mut self, event: &str, routing_key: Option, py: Python<'p>) -> PyResult<&'p PyAny> { + pyo3_asyncio::tokio::future_into_py(py, async move { + self.write_event_bytes(event.as_bytes(), routing_key); + Python::with_gil(|py| Ok(py.None())) + }) } /// From ba1de5ea36811ce2679124ab176858d5abd25028 Mon Sep 17 00:00:00 2001 From: Ankur_Anand Date: Tue, 7 Nov 2023 18:09:05 +0530 Subject: [PATCH 2/4] created new event writer async method Signed-off-by: Ankur_Anand --- src/stream_writer.rs | 40 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/src/stream_writer.rs b/src/stream_writer.rs index 4b3df8e..da1d793 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -20,10 +20,14 @@ cfg_if! { use tracing::trace; use tracing::info; use std::time::Duration; + use std::sync::Arc; use tokio::runtime::Handle; use tokio::time::timeout; + use tokio::sync::Mutex; + use tokio::sync::oneshot; use tokio::sync::oneshot::error::RecvError; use pravega_client::util::oneshot_holder::OneShotHolder; + use pravega_client::error::Error; } } @@ -84,10 +88,40 @@ impl StreamWriter { /// #[pyo3(text_signature = "($self, event, routing_key=None)")] #[args(event, routing_key = "None", "*")] - pub fn write_event<'p>(&mut self, event: &str, routing_key: Option, py: Python<'p>) -> PyResult<&'p PyAny> { + pub fn write_event(&mut self, event: &str, routing_key: Option) -> PyResult<()> { + self.write_event_bytes(event.as_bytes(), routing_key) + } + + #[pyo3(text_signature = "($self, event, routing_key=None)")] + #[args(event, routing_key = "None", "*")] + pub fn write_event_async<'p>(&mut self, event: &str, routing_key: Option, py: Python<'p>) -> PyResult<&'p PyAny> { + let writer: EventWriter = self.writer.into(); + let arc_mutex_writer = Arc::new(Mutex::new(writer)); + let writer = arc_mutex_writer.clone(); + // to_vec creates an owned copy of the python byte array object. + let event = event.as_bytes().to_vec(); + pyo3_asyncio::tokio::future_into_py(py, async move { - self.write_event_bytes(event.as_bytes(), routing_key); - Python::with_gil(|py| Ok(py.None())) + let write_result: oneshot::Receiver>; + match routing_key { + Option::None => { + trace!("Writing a single event with no routing key"); + write_result = writer.lock().await.write_event(event.to_vec()).await; + } + Option::Some(key) => { + trace!("Writing a single event for a given routing key {:?}", key); + write_result = writer.lock().await.write_event_by_routing_key(key, event.to_vec()).await; + } + } + match write_result.await { + Ok(_) => { + Python::with_gil(|py| Ok(py.None())) + }, + Err(e) => Err(exceptions::PyOSError::new_err(format!( + "Error observed while writing an event {:?}", + e + ))), + } }) } From 7bcb59beaf30849b99c8a1e59c5ebefccd12a0a3 Mon Sep 17 00:00:00 2001 From: Ankur_Anand Date: Wed, 8 Nov 2023 18:28:56 +0530 Subject: [PATCH 3/4] using arc_mutex in self Signed-off-by: Ankur_Anand --- src/stream_manager.rs | 4 +++- src/stream_writer.rs | 38 +++++++++++++++++++++++++++++--------- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/src/stream_manager.rs b/src/stream_manager.rs index 6aa1409..5fbf538 100644 --- a/src/stream_manager.rs +++ b/src/stream_manager.rs @@ -20,6 +20,8 @@ cfg_if! { use pyo3::prelude::*; use pyo3::PyResult; use pyo3::{exceptions, PyObjectProtocol}; + use std::sync::Arc; + use tokio::sync::Mutex; use tracing::info; use pravega_client::event::reader_group::ReaderGroupConfigBuilder; use crate::stream_reader_group::StreamReaderGroupConfig; @@ -440,7 +442,7 @@ impl StreamManager { stream: Stream::from(stream_name.to_string()), }; let stream_writer = StreamWriter::new( - self.cf.create_event_writer(scoped_stream.clone()), + Arc::new(Mutex::new(self.cf.create_event_writer(scoped_stream.clone()))), self.cf.runtime_handle(), scoped_stream, max_inflight_events, diff --git a/src/stream_writer.rs b/src/stream_writer.rs index da1d793..b8bacea 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -38,7 +38,7 @@ cfg_if! { #[cfg(feature = "python_binding")] #[pyclass] pub(crate) struct StreamWriter { - writer: EventWriter, + writer: Arc>, runtime_handle: Handle, stream: ScopedStream, inflight: OneShotHolder, @@ -49,7 +49,7 @@ const TIMEOUT_IN_SECONDS: u64 = 120; impl StreamWriter { pub fn new( - writer: EventWriter, + writer: Arc>, runtime_handle: Handle, stream: ScopedStream, max_inflight_count: usize, @@ -92,12 +92,27 @@ impl StreamWriter { self.write_event_bytes(event.as_bytes(), routing_key) } + /// + /// Write an event into the Pravega Stream asynchronously. The events that are written will appear + /// in the Stream exactly once. The event of type String is converted into bytes with `UTF-8` encoding. + /// The user can optionally specify the routing key. + /// + /// ``` + /// import pravega_client; + /// manager=pravega_client.StreamManager("tcp://127.0.0.1:9090") + /// // lets assume the Pravega scope and stream are already created. + /// writer=manager.create_writer("scope", "stream") + /// + /// // write into Pravega stream without specifying the routing key. + /// await writer.write_event_async("e1") + /// // write into Pravega stream by specifying the routing key. + /// await writer.write_event_async("e2", "key1") + /// ``` + /// #[pyo3(text_signature = "($self, event, routing_key=None)")] #[args(event, routing_key = "None", "*")] pub fn write_event_async<'p>(&mut self, event: &str, routing_key: Option, py: Python<'p>) -> PyResult<&'p PyAny> { - let writer: EventWriter = self.writer.into(); - let arc_mutex_writer = Arc::new(Mutex::new(writer)); - let writer = arc_mutex_writer.clone(); + let writer = self.writer.clone(); // to_vec creates an owned copy of the python byte array object. let event = event.as_bytes().to_vec(); @@ -106,11 +121,11 @@ impl StreamWriter { match routing_key { Option::None => { trace!("Writing a single event with no routing key"); - write_result = writer.lock().await.write_event(event.to_vec()).await; + write_result = writer.lock().await.write_event(event).await; } Option::Some(key) => { trace!("Writing a single event for a given routing key {:?}", key); - write_result = writer.lock().await.write_event_by_routing_key(key, event.to_vec()).await; + write_result = writer.lock().await.write_event_by_routing_key(key, event).await; } } match write_result.await { @@ -148,18 +163,23 @@ impl StreamWriter { #[pyo3(text_signature = "($self, event, routing_key=None)")] #[args(event, routing_key = "None", "*")] pub fn write_event_bytes(&mut self, event: &[u8], routing_key: Option) -> PyResult<()> { + let writer = self.writer.clone(); // to_vec creates an owned copy of the python byte array object. let write_future: tokio::sync::oneshot::Receiver> = match routing_key { Option::None => { trace!("Writing a single event with no routing key"); self.runtime_handle - .block_on(self.writer.write_event(event.to_vec())) + .block_on(async { + writer.lock().await.write_event(event.to_vec()).await + }) } Option::Some(key) => { trace!("Writing a single event for a given routing key {:?}", key); self.runtime_handle - .block_on(self.writer.write_event_by_routing_key(key, event.to_vec())) + .block_on(async { + writer.lock().await.write_event_by_routing_key(key, event.to_vec()).await + }) } }; let _guard = self.runtime_handle.enter(); From fb7af4a57a709115298d2c524b55da8906257be9 Mon Sep 17 00:00:00 2001 From: Ankur_Anand Date: Fri, 10 Nov 2023 12:07:29 +0530 Subject: [PATCH 4/4] Adding test case Signed-off-by: Ankur_Anand --- tests/pravega_reader_test.py | 41 ++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/pravega_reader_test.py b/tests/pravega_reader_test.py index 70555e5..18a1587 100644 --- a/tests/pravega_reader_test.py +++ b/tests/pravega_reader_test.py @@ -59,6 +59,47 @@ async def test_writeEventAndRead(self): self.assertEqual(b'test event', event.data(), "Invalid event data") self.assertEqual(count, 2, "Two events are expected") + async def test_asyncEventwriteAndRead(self): + suffix = str(random.randint(0, 100)) + scope = "testAsyncEventWriteScope" + stream = "testAsyncEventWriteStream" + suffix + print("Creating a Stream Manager, ensure Pravega is running") + stream_manager = pravega_client.StreamManager("tcp://127.0.0.1:9090") + + print("Creating a scope") + scope_result = stream_manager.create_scope(scope) + print(scope_result) + print("Creating a stream ", stream) + stream_result = stream_manager.create_stream(scope, stream, 1) + print(stream_result) + + print("Creating a writer for Stream") + w1 = stream_manager.create_writer(scope, stream) + + print("Write events") + await w1.write_event_async("test async write event") + await w1.write_event_async("test async write event") + await w1.write_event_async("test async write event with routing key", "key1") + await w1.write_event_async("test async write event with routing key", "key2") + w1.flush() + # Create a reader Group Configuration to read from HEAD of stream. + rg_config = pravega_client.StreamReaderGroupConfig(False, scope, stream) + reader_group=stream_manager.create_reader_group_with_config("rg" + suffix, scope, rg_config) + r1 = reader_group.create_reader("reader-1") + segment_slice = await r1.get_segment_slice_async() + print(segment_slice) + # consume the segment slice for events. + count1=0 + count2=0 + for event in segment_slice: + print(event.data()) + if event.data() == b'test async write event': + count1+=1 + elif event.data() == b'test async write event with routing key': + count2+=1 + self.assertEqual(count1, 2, "Two events are expected") + self.assertEqual(count2, 2, "Two events written with routing key are expected") + async def test_writeEventWithInflightAndRead(self): suffix = str(random.randint(0, 100)) scope = "testRead"