Skip to content

Commit

Permalink
Merge pull request #8 from AnkurAnand11/support-async-for-write-event
Browse files Browse the repository at this point in the history
Support async version for write event
  • Loading branch information
tkaitchuck authored Nov 14, 2023
2 parents 3c293dd + fb7af4a commit 8893cc3
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 5 deletions.
4 changes: 3 additions & 1 deletion src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ cfg_if! {
use pyo3::prelude::*;
use pyo3::PyResult;
use pyo3::{exceptions, PyObjectProtocol};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::info;
use pravega_client::event::reader_group::ReaderGroupConfigBuilder;
use crate::stream_reader_group::StreamReaderGroupConfig;
Expand Down Expand Up @@ -440,7 +442,7 @@ impl StreamManager {
stream: Stream::from(stream_name.to_string()),
};
let stream_writer = StreamWriter::new(
self.cf.create_event_writer(scoped_stream.clone()),
Arc::new(Mutex::new(self.cf.create_event_writer(scoped_stream.clone()))),
self.cf.runtime_handle(),
scoped_stream,
max_inflight_events,
Expand Down
65 changes: 61 additions & 4 deletions src/stream_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ cfg_if! {
use tracing::trace;
use tracing::info;
use std::time::Duration;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::time::timeout;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::RecvError;
use pravega_client::util::oneshot_holder::OneShotHolder;
use pravega_client::error::Error;
}
}

Expand All @@ -34,7 +38,7 @@ cfg_if! {
#[cfg(feature = "python_binding")]
#[pyclass]
pub(crate) struct StreamWriter {
writer: EventWriter,
writer: Arc<Mutex<EventWriter>>,
runtime_handle: Handle,
stream: ScopedStream,
inflight: OneShotHolder<WriterError>,
Expand All @@ -45,7 +49,7 @@ const TIMEOUT_IN_SECONDS: u64 = 120;

impl StreamWriter {
pub fn new(
writer: EventWriter,
writer: Arc<Mutex<EventWriter>>,
runtime_handle: Handle,
stream: ScopedStream,
max_inflight_count: usize,
Expand Down Expand Up @@ -88,6 +92,54 @@ impl StreamWriter {
self.write_event_bytes(event.as_bytes(), routing_key)
}

///
/// Write an event into the Pravega Stream asynchronously. The events that are written will appear
/// in the Stream exactly once. The event of type String is converted into bytes with `UTF-8` encoding.
/// The user can optionally specify the routing key.
///
/// ```
/// import pravega_client;
/// manager=pravega_client.StreamManager("tcp://127.0.0.1:9090")
/// // lets assume the Pravega scope and stream are already created.
/// writer=manager.create_writer("scope", "stream")
///
/// // write into Pravega stream without specifying the routing key.
/// await writer.write_event_async("e1")
/// // write into Pravega stream by specifying the routing key.
/// await writer.write_event_async("e2", "key1")
/// ```
///
#[pyo3(text_signature = "($self, event, routing_key=None)")]
#[args(event, routing_key = "None", "*")]
pub fn write_event_async<'p>(&mut self, event: &str, routing_key: Option<String>, py: Python<'p>) -> PyResult<&'p PyAny> {
let writer = self.writer.clone();
// to_vec creates an owned copy of the python byte array object.
let event = event.as_bytes().to_vec();

pyo3_asyncio::tokio::future_into_py(py, async move {
let write_result: oneshot::Receiver<Result<(), Error>>;
match routing_key {
Option::None => {
trace!("Writing a single event with no routing key");
write_result = writer.lock().await.write_event(event).await;
}
Option::Some(key) => {
trace!("Writing a single event for a given routing key {:?}", key);
write_result = writer.lock().await.write_event_by_routing_key(key, event).await;
}
}
match write_result.await {
Ok(_) => {
Python::with_gil(|py| Ok(py.None()))
},
Err(e) => Err(exceptions::PyOSError::new_err(format!(
"Error observed while writing an event {:?}",
e
))),
}
})
}

///
/// Write a byte array into the Pravega Stream. This is similar to `write_event(...)` api except
/// that the the event to be written is a byte array. The user can optionally specify the
Expand All @@ -111,18 +163,23 @@ impl StreamWriter {
#[pyo3(text_signature = "($self, event, routing_key=None)")]
#[args(event, routing_key = "None", "*")]
pub fn write_event_bytes(&mut self, event: &[u8], routing_key: Option<String>) -> PyResult<()> {
let writer = self.writer.clone();
// to_vec creates an owned copy of the python byte array object.
let write_future: tokio::sync::oneshot::Receiver<Result<(), WriterError>> =
match routing_key {
Option::None => {
trace!("Writing a single event with no routing key");
self.runtime_handle
.block_on(self.writer.write_event(event.to_vec()))
.block_on(async {
writer.lock().await.write_event(event.to_vec()).await
})
}
Option::Some(key) => {
trace!("Writing a single event for a given routing key {:?}", key);
self.runtime_handle
.block_on(self.writer.write_event_by_routing_key(key, event.to_vec()))
.block_on(async {
writer.lock().await.write_event_by_routing_key(key, event.to_vec()).await
})
}
};
let _guard = self.runtime_handle.enter();
Expand Down
41 changes: 41 additions & 0 deletions tests/pravega_reader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,47 @@ async def test_writeEventAndRead(self):
self.assertEqual(b'test event', event.data(), "Invalid event data")
self.assertEqual(count, 2, "Two events are expected")

async def test_asyncEventwriteAndRead(self):
suffix = str(random.randint(0, 100))
scope = "testAsyncEventWriteScope"
stream = "testAsyncEventWriteStream" + suffix
print("Creating a Stream Manager, ensure Pravega is running")
stream_manager = pravega_client.StreamManager("tcp://127.0.0.1:9090")

print("Creating a scope")
scope_result = stream_manager.create_scope(scope)
print(scope_result)
print("Creating a stream ", stream)
stream_result = stream_manager.create_stream(scope, stream, 1)
print(stream_result)

print("Creating a writer for Stream")
w1 = stream_manager.create_writer(scope, stream)

print("Write events")
await w1.write_event_async("test async write event")
await w1.write_event_async("test async write event")
await w1.write_event_async("test async write event with routing key", "key1")
await w1.write_event_async("test async write event with routing key", "key2")
w1.flush()
# Create a reader Group Configuration to read from HEAD of stream.
rg_config = pravega_client.StreamReaderGroupConfig(False, scope, stream)
reader_group=stream_manager.create_reader_group_with_config("rg" + suffix, scope, rg_config)
r1 = reader_group.create_reader("reader-1")
segment_slice = await r1.get_segment_slice_async()
print(segment_slice)
# consume the segment slice for events.
count1=0
count2=0
for event in segment_slice:
print(event.data())
if event.data() == b'test async write event':
count1+=1
elif event.data() == b'test async write event with routing key':
count2+=1
self.assertEqual(count1, 2, "Two events are expected")
self.assertEqual(count2, 2, "Two events written with routing key are expected")

async def test_writeEventWithInflightAndRead(self):
suffix = str(random.randint(0, 100))
scope = "testRead"
Expand Down

0 comments on commit 8893cc3

Please sign in to comment.