diff --git a/src/stream_manager.rs b/src/stream_manager.rs index 6aa1409..5fbf538 100644 --- a/src/stream_manager.rs +++ b/src/stream_manager.rs @@ -20,6 +20,8 @@ cfg_if! { use pyo3::prelude::*; use pyo3::PyResult; use pyo3::{exceptions, PyObjectProtocol}; + use std::sync::Arc; + use tokio::sync::Mutex; use tracing::info; use pravega_client::event::reader_group::ReaderGroupConfigBuilder; use crate::stream_reader_group::StreamReaderGroupConfig; @@ -440,7 +442,7 @@ impl StreamManager { stream: Stream::from(stream_name.to_string()), }; let stream_writer = StreamWriter::new( - self.cf.create_event_writer(scoped_stream.clone()), + Arc::new(Mutex::new(self.cf.create_event_writer(scoped_stream.clone()))), self.cf.runtime_handle(), scoped_stream, max_inflight_events, diff --git a/src/stream_writer.rs b/src/stream_writer.rs index 429b5ae..b8bacea 100644 --- a/src/stream_writer.rs +++ b/src/stream_writer.rs @@ -20,10 +20,14 @@ cfg_if! { use tracing::trace; use tracing::info; use std::time::Duration; + use std::sync::Arc; use tokio::runtime::Handle; use tokio::time::timeout; + use tokio::sync::Mutex; + use tokio::sync::oneshot; use tokio::sync::oneshot::error::RecvError; use pravega_client::util::oneshot_holder::OneShotHolder; + use pravega_client::error::Error; } } @@ -34,7 +38,7 @@ cfg_if! { #[cfg(feature = "python_binding")] #[pyclass] pub(crate) struct StreamWriter { - writer: EventWriter, + writer: Arc>, runtime_handle: Handle, stream: ScopedStream, inflight: OneShotHolder, @@ -45,7 +49,7 @@ const TIMEOUT_IN_SECONDS: u64 = 120; impl StreamWriter { pub fn new( - writer: EventWriter, + writer: Arc>, runtime_handle: Handle, stream: ScopedStream, max_inflight_count: usize, @@ -88,6 +92,54 @@ impl StreamWriter { self.write_event_bytes(event.as_bytes(), routing_key) } + /// + /// Write an event into the Pravega Stream asynchronously. The events that are written will appear + /// in the Stream exactly once. The event of type String is converted into bytes with `UTF-8` encoding. + /// The user can optionally specify the routing key. + /// + /// ``` + /// import pravega_client; + /// manager=pravega_client.StreamManager("tcp://127.0.0.1:9090") + /// // lets assume the Pravega scope and stream are already created. + /// writer=manager.create_writer("scope", "stream") + /// + /// // write into Pravega stream without specifying the routing key. + /// await writer.write_event_async("e1") + /// // write into Pravega stream by specifying the routing key. + /// await writer.write_event_async("e2", "key1") + /// ``` + /// + #[pyo3(text_signature = "($self, event, routing_key=None)")] + #[args(event, routing_key = "None", "*")] + pub fn write_event_async<'p>(&mut self, event: &str, routing_key: Option, py: Python<'p>) -> PyResult<&'p PyAny> { + let writer = self.writer.clone(); + // to_vec creates an owned copy of the python byte array object. + let event = event.as_bytes().to_vec(); + + pyo3_asyncio::tokio::future_into_py(py, async move { + let write_result: oneshot::Receiver>; + match routing_key { + Option::None => { + trace!("Writing a single event with no routing key"); + write_result = writer.lock().await.write_event(event).await; + } + Option::Some(key) => { + trace!("Writing a single event for a given routing key {:?}", key); + write_result = writer.lock().await.write_event_by_routing_key(key, event).await; + } + } + match write_result.await { + Ok(_) => { + Python::with_gil(|py| Ok(py.None())) + }, + Err(e) => Err(exceptions::PyOSError::new_err(format!( + "Error observed while writing an event {:?}", + e + ))), + } + }) + } + /// /// Write a byte array into the Pravega Stream. This is similar to `write_event(...)` api except /// that the the event to be written is a byte array. The user can optionally specify the @@ -111,18 +163,23 @@ impl StreamWriter { #[pyo3(text_signature = "($self, event, routing_key=None)")] #[args(event, routing_key = "None", "*")] pub fn write_event_bytes(&mut self, event: &[u8], routing_key: Option) -> PyResult<()> { + let writer = self.writer.clone(); // to_vec creates an owned copy of the python byte array object. let write_future: tokio::sync::oneshot::Receiver> = match routing_key { Option::None => { trace!("Writing a single event with no routing key"); self.runtime_handle - .block_on(self.writer.write_event(event.to_vec())) + .block_on(async { + writer.lock().await.write_event(event.to_vec()).await + }) } Option::Some(key) => { trace!("Writing a single event for a given routing key {:?}", key); self.runtime_handle - .block_on(self.writer.write_event_by_routing_key(key, event.to_vec())) + .block_on(async { + writer.lock().await.write_event_by_routing_key(key, event.to_vec()).await + }) } }; let _guard = self.runtime_handle.enter(); diff --git a/tests/pravega_reader_test.py b/tests/pravega_reader_test.py index 70555e5..18a1587 100644 --- a/tests/pravega_reader_test.py +++ b/tests/pravega_reader_test.py @@ -59,6 +59,47 @@ async def test_writeEventAndRead(self): self.assertEqual(b'test event', event.data(), "Invalid event data") self.assertEqual(count, 2, "Two events are expected") + async def test_asyncEventwriteAndRead(self): + suffix = str(random.randint(0, 100)) + scope = "testAsyncEventWriteScope" + stream = "testAsyncEventWriteStream" + suffix + print("Creating a Stream Manager, ensure Pravega is running") + stream_manager = pravega_client.StreamManager("tcp://127.0.0.1:9090") + + print("Creating a scope") + scope_result = stream_manager.create_scope(scope) + print(scope_result) + print("Creating a stream ", stream) + stream_result = stream_manager.create_stream(scope, stream, 1) + print(stream_result) + + print("Creating a writer for Stream") + w1 = stream_manager.create_writer(scope, stream) + + print("Write events") + await w1.write_event_async("test async write event") + await w1.write_event_async("test async write event") + await w1.write_event_async("test async write event with routing key", "key1") + await w1.write_event_async("test async write event with routing key", "key2") + w1.flush() + # Create a reader Group Configuration to read from HEAD of stream. + rg_config = pravega_client.StreamReaderGroupConfig(False, scope, stream) + reader_group=stream_manager.create_reader_group_with_config("rg" + suffix, scope, rg_config) + r1 = reader_group.create_reader("reader-1") + segment_slice = await r1.get_segment_slice_async() + print(segment_slice) + # consume the segment slice for events. + count1=0 + count2=0 + for event in segment_slice: + print(event.data()) + if event.data() == b'test async write event': + count1+=1 + elif event.data() == b'test async write event with routing key': + count2+=1 + self.assertEqual(count1, 2, "Two events are expected") + self.assertEqual(count2, 2, "Two events written with routing key are expected") + async def test_writeEventWithInflightAndRead(self): suffix = str(random.randint(0, 100)) scope = "testRead"