Skip to content

Commit

Permalink
Merge pull request #18 from AnkurAnand11/codecov-improvement
Browse files Browse the repository at this point in the history
Changes to generate code coverage report for python client and add test code to improve code coverage.
  • Loading branch information
tkaitchuck authored Feb 2, 2024
2 parents 5b0176a + 25974b5 commit f0446dd
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 68 deletions.
41 changes: 31 additions & 10 deletions .github/workflows/python_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,23 @@ jobs:
timeout-minutes: 25
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
default: true
override: true
profile: minimal
components: llvm-tools-preview
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- uses: Swatinem/rust-cache@v1
with:
key: coverage-cargo-ubuntu
continue-on-error: true
- uses: actions/cache@v3
with:
path: |
Expand All @@ -39,17 +51,26 @@ jobs:
pravega-0.13.0/bin/pravega-standalone > pravega.log 2>&1 &
sleep 120 && echo "Started standalone"
tail pravega.log
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: "3.10"
- name: Install maturin and tox
run: pip install 'maturin>=0.14,<0.15' virtualenv tox==3.28.0 tox-pyo3
- name: Test with tox
run: tox -c tox.ini
- name: Setup virtual environment
run: |
python -m venv venv
source venv/bin/activate
- name: Run coverage
run: |
source venv/bin/activate
pip install 'maturin>=0.14,<0.15' virtualenv tox==3.28.0 tox-pyo3
source <(cargo llvm-cov show-env --export-prefix)
export CARGO_TARGET_DIR=$CARGO_LLVM_COV_TARGET_DIR
export CARGO_INCREMENTAL=1
cargo llvm-cov clean --workspace
cargo test
maturin develop
pip install -r requirements.txt
pytest tests --cov=pravega_client --cov-report xml --timeout=300 -vvvvv
cargo llvm-cov --no-run --lcov --output-path coverage.lcov
- uses: codecov/codecov-action@v3
with:
files: coverage.xml
files: coverage.lcov,coverage.xml
name: ${{github.job}}-reports
- name: Upload Pravega standalone logs
uses: actions/upload-artifact@v2
Expand Down
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@ python_binding = ["pyo3", "pyo3-asyncio"]
#Run tests for bindings using command cargo test --no-default-features

[dependencies]
tracing = "0.1.17"
tracing-futures = "0.2.4"
tracing-subscriber = "0.2.2"
tracing = "0.1"
tracing-futures = "0.2"
tracing-subscriber = "0.2"
pravega-client = {version = "0.3"}
pravega-client-shared = {version = "0.3"}
pravega-controller-client = {version = "0.3"}
pravega-wire-protocol = {version = "0.3"}
pravega-client-retry = {version = "0.3"}
pravega-connection-pool = {version = "0.3"}
pravega-client-config = {version = "0.3"}
tokio = "1.1"
lazy_static = "1.4.0"
tokio = { version = "1", features = ["full"] }
lazy_static = "1.4"
uuid = {version = "0.8", features = ["v4"]}
futures = "0.3.5"
futures = "0.3"
derive-new = "0.5"
#Python bindings
pyo3 = { version = "0.14.5" , features = ["extension-module", "multiple-pymethods"], optional = true }
pyo3 = { version = "0.14.5" ,optional = true }
pyo3-asyncio = { version = "0.14", features = ["tokio-runtime"], optional = true }
#WASM bindings
wasm-bindgen = { version = "0.2.63", optional = true }
cfg-if = "0.1.10"
cfg-if = "1.0.0"
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pytest==7.4.4
pytest-timeout==2.2.0
aiounittest==1.4.2
pytest-cov==4.1.0
5 changes: 1 addition & 4 deletions src/byte_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,7 @@ impl ByteStream {

impl Drop for ByteStream {
fn drop(&mut self) {
info!(
"Drop invoked on ByteStream {:?}, invoking flush",
self.stream
);
info!("Drop invoked on ByteStream {:?}, invoking flush", self.stream);
if let Err(e) = self.runtime_handle.block_on(self.writer.flush()) {
error!("Error while flushing byteStream {:?}", e);
}
Expand Down
27 changes: 6 additions & 21 deletions src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,7 @@ impl StreamManager {
tags: Option<Vec<String>>,
) -> PyResult<bool> {
let handle = self.cf.runtime_handle();
info!(
"creating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}",
stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags
);
info!("creating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}", stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags);
let stream_cfg = StreamConfiguration {
scoped_stream: ScopedStream {
scope: Scope::from(scope_name.to_string()),
Expand Down Expand Up @@ -311,10 +308,7 @@ impl StreamManager {
tags: Option<Vec<String>>,
) -> PyResult<bool> {
let handle = self.cf.runtime_handle();
info!(
"updating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}",
stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags
);
info!("updating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}", stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags);
let stream_cfg = StreamConfiguration {
scoped_stream: ScopedStream {
scope: Scope::from(scope_name.to_string()),
Expand Down Expand Up @@ -346,10 +340,7 @@ impl StreamManager {
stream_name: &str,
) -> PyResult<Option<Vec<String>>> {
let handle = self.cf.runtime_handle();
info!(
"fetch tags for stream {:?} under scope {:?}",
stream_name, scope_name,
);
info!("fetch tags for stream {:?} under scope {:?}", stream_name, scope_name);
let stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
Expand All @@ -370,10 +361,7 @@ impl StreamManager {
#[pyo3(text_signature = "($self, scope_name, stream_name)")]
pub fn seal_stream(&self, scope_name: &str, stream_name: &str) -> PyResult<bool> {
let handle = self.cf.runtime_handle();
info!(
"Sealing stream {:?} under scope {:?} ",
stream_name, scope_name
);
info!("Sealing stream {:?} under scope {:?} ", stream_name, scope_name);
let scoped_stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
Expand All @@ -395,10 +383,7 @@ impl StreamManager {
#[pyo3(text_signature = "($self, scope_name, stream_name)")]
pub fn delete_stream(&self, scope_name: &str, stream_name: &str) -> PyResult<bool> {
let handle = self.cf.runtime_handle();
info!(
"Deleting stream {:?} under scope {:?} ",
stream_name, scope_name
);
info!("Deleting stream {:?} under scope {:?} ", stream_name, scope_name);
let scoped_stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
Expand Down Expand Up @@ -575,7 +560,7 @@ impl StreamManager {
/// import pravega_client;
/// manager=pravega_client.StreamManager("tcp://127.0.0.1:9090")
/// // Delete a ReaderGroup against an already created Pravega scope..
/// manager.delete_reader_group_with_config("rg1", "scope", rg_config)
/// manager.delete_reader_group("rg1", "scope", rg_config)
///
/// ```
///
Expand Down
10 changes: 2 additions & 8 deletions src/stream_reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ impl StreamReaderGroup {
///```
///
pub fn create_reader(&self, reader_name: &str) -> PyResult<StreamReader> {
info!(
"Creating reader {:?} under reader group {:?}",
reader_name, self.reader_group.name
);
info!("Creating reader {:?} under reader group {:?}", reader_name, self.reader_group.name);
let reader = self
.runtime_handle
.block_on(self.reader_group.create_reader(reader_name.to_string()));
Expand All @@ -156,10 +153,7 @@ impl StreamReaderGroup {
///```
///
pub fn reader_offline(&self, reader_name: &str) -> PyResult<()> {
info!(
"Marking reader {:?} under reader group {:?} as offline",
reader_name, self.reader_group.name
);
info!("Marking reader {:?} under reader group {:?} as offline", reader_name, self.reader_group.name);
let res = self.runtime_handle.block_on(
self.reader_group
.reader_offline(reader_name.to_string(), None),
Expand Down
5 changes: 1 addition & 4 deletions src/stream_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,7 @@ impl StreamWriter {
let res = self.runtime_handle.block_on(x);
// fail fast on error.
if let Err(e) = res {
info!(
"RecvError observed while flushing events on stream {:?}",
self.stream
);
info!("RecvError observed while flushing events on stream {:?}", self.stream);
flush_result = Err(exceptions::PyValueError::new_err(format!(
"RecvError observed while writing an event {:?}",
e
Expand Down
5 changes: 1 addition & 4 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,7 @@ impl StreamTransaction {
#[pyo3(text_signature = "($self, event, routing_key=None)")]
#[args(event, routing_key = "None", "*")]
pub fn write_event_bytes(&mut self, event: &[u8], routing_key: Option<String>) -> PyResult<()> {
trace!(
"Writing a single event to a transaction {:?}",
self.txn.txn_id()
);
trace!("Writing a single event to a transaction {:?}", self.txn.txn_id());
// to_vec creates an owned copy of the python byte array object.
let result: Result<(), TransactionError> = self
.runtime_handle
Expand Down
98 changes: 90 additions & 8 deletions tests/pravega_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def test_tags(self):

# create a stream with stream scaling is enabled with data rate as 10kbps, scaling factor as 2 and initial segments as 1
policy = StreamScalingPolicy.auto_scaling_policy_by_data_rate(10, 2, 1)
stream_result=stream_manager.create_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy)
retention = StreamRetentionPolicy.none()
stream_result=stream_manager.create_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy, retention_policy=retention)
self.assertTrue(stream_result, "Stream creation status")
# add tags
stream_update=stream_manager.update_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy, tags=['t1', 't2'])
Expand Down Expand Up @@ -78,6 +79,58 @@ def test_writeEvent(self):
w1.write_event("test event1")
w1.write_event("test event2")

def test_deleteScope(self):
scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits)
for i in range(10))
print("Creating a Stream Manager, ensure Pravega is running")
stream_manager=pravega_client.StreamManager("tcp://127.0.0.1:9090", False, False)

print("Creating a scope")
scope_result=stream_manager.create_scope(scope)
self.assertEqual(True, scope_result, "Scope creation status")

print("Deleting the scope")
delete_scope_result=stream_manager.delete_scope(scope)
self.assertEqual(True, delete_scope_result, "Scope deletion status")

def test_seal_deleteStream(self):
scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits)
for i in range(10))
print("Creating a Stream Manager, ensure Pravega is running")
stream_manager=pravega_client.StreamManager("tcp://127.0.0.1:9090", False, False)
print(repr(stream_manager))
print("Creating a scope")
scope_result=stream_manager.create_scope(scope)
self.assertEqual(True, scope_result, "Scope creation status")

print("Creating a stream")
stream_result=stream_manager.create_stream(scope, "testSealStream", 1)
self.assertEqual(True, stream_result, "Stream creation status")

policy = StreamScalingPolicy.auto_scaling_policy_by_event_rate(10, 2, 1)
retention = StreamRetentionPolicy.by_time(24*60*60*1000)
stream_update=stream_manager.update_stream_with_policy(scope, "testSealStream", policy, retention)
self.assertTrue(stream_update, "Stream update status")

print("Creating a writer for Stream")
w1=stream_manager.create_writer(scope,"testSealStream")

print("Write events")
w1.write_event("test event1", "key")
w1.write_event("test event2", "key")

seal_stream_status = stream_manager.seal_stream(scope, "testSealStream")
self.assertTrue(seal_stream_status, "Stream seal status")

try:
w1.write_event("test event3", "key")
self.fail("Writing to an already sealed stream should throw exception")
except Exception as e:
print("Exception ", e)

delete_stream_status = stream_manager.delete_stream(scope, "testSealStream")
self.assertTrue(delete_stream_status, "Stream deletion status")

def test_byteStream(self):
scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits)
for i in range(10))
Expand All @@ -95,23 +148,48 @@ def test_byteStream(self):
# write and read data.
print("Creating a writer for Stream")
bs=stream_manager.create_byte_stream(scope,"testStream")
self.assertEqual(5, bs.write(b"bytes"))
print(repr(bs))
self.assertEqual(15, bs.write(b"bytesfortesting"))
self.assertEqual(15, bs.write(b"bytesfortesting"))
self.assertEqual(15, bs.write(b"bytesfortesting"))
bs.flush()
self.assertEqual(5, bs.current_tail_offset())
buf=bytearray(5)
self.assertEqual(5, bs.readinto(buf))
self.assertEqual(45, bs.current_tail_offset())
buf=bytearray(10)
self.assertEqual(10, bs.readinto(buf))

# fetch the current read offset.
current_offset=bs.tell()
self.assertEqual(5, current_offset)
self.assertEqual(10, current_offset)
self.assertTrue(bs.seekable())

# seek to a given offset and read
bs.seek(3, 0)
bs.seek(10, 0)
buf=bytearray(2)
self.assertEqual(2, bs.readinto(buf))
current_offset=bs.tell()
self.assertEqual(12, current_offset)

# seek from {current position (i.e. 12 here) + 5 } which is 17 and read
bs.seek(5, 1)
buf=bytearray(8)
# reading 8 bytes here
self.assertEqual(8, bs.readinto(buf))
self.assertEqual(25, bs.tell())

# seek from {size of this object (i.e. 45 here) - 10 } which is 35 and read
bs.seek(-10, 2)
buf=bytearray(8)
# reading 8 bytes here
self.assertEqual(8, bs.readinto(buf))
self.assertEqual(43, bs.tell())

bs.truncate(2)
self.assertEqual(2, bs.current_head_offset())
try:
bs.seek(3, 6)
self.fail("whence should be one of 0: seek from start, 1: seek from current, or 2: seek from end")
except Exception as e:
print("Exception ", e)

def test_writeTxn(self):
scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits)
Expand All @@ -129,14 +207,18 @@ def test_writeTxn(self):

print("Creating a txn writer for Stream")
w1=stream_manager.create_transaction_writer(scope,"testTxn", 1)
print(repr(w1))
txn1 = w1.begin_txn()
print(repr(txn1))
txn1_id = txn1.get_txn_id()
print("Write events")
txn1.write_event("test event1")
txn1.write_event("test event2")
self.assertTrue(txn1.is_open(), "Transaction is open")
print("commit transaction")
txn1.commit()
self.assertEqual(False, txn1.is_open(), "Transaction is closed")
get_txn = w1.get_txn(txn1_id)
self.assertEqual(False, get_txn.is_open(), "Transaction is closed")

txn2 = w1.begin_txn()
print("Write events")
Expand Down
Loading

0 comments on commit f0446dd

Please sign in to comment.