Skip to content

Commit cdff285

Browse files
committed
Implement ability to read data directly from the underlying reader
1 parent 10ddcb7 commit cdff285

File tree

5 files changed

+296
-4
lines changed

5 files changed

+296
-4
lines changed

Changelog.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515

1616
### New Features
1717

18+
- [#623]: Added `Reader::stream()` that can be used to read arbitrary data
19+
from the inner reader while track position for XML reader.
20+
1821
### Bug Fixes
1922

2023
### Misc Changes
2124

25+
[#623]: https://github.com/tafia/quick-xml/issues/623
26+
2227

2328
## 0.36.0 -- 2024-07-08
2429

src/reader/async_tokio.rs

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22
//! as underlying byte stream. This reader fully implements async/await so reading
33
//! can use non-blocking I/O.
44
5-
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt};
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
69

710
use crate::errors::{Error, Result, SyntaxError};
811
use crate::events::Event;
912
use crate::name::{QName, ResolveResult};
1013
use crate::parser::{ElementParser, Parser, PiParser};
1114
use crate::reader::buffered_reader::impl_buffered_source;
12-
use crate::reader::{BangType, NsReader, ParseState, ReadTextResult, Reader, Span};
15+
use crate::reader::{BangType, BinaryStream, NsReader, ParseState, ReadTextResult, Reader, Span};
1316
use crate::utils::is_whitespace;
1417

1518
/// A struct for read XML asynchronously from an [`AsyncBufRead`].
@@ -24,6 +27,47 @@ impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
2427

2528
////////////////////////////////////////////////////////////////////////////////////////////////////
2629

30+
impl<'r, R> AsyncRead for BinaryStream<'r, R>
31+
where
32+
R: AsyncRead + Unpin,
33+
{
34+
fn poll_read(
35+
self: Pin<&mut Self>,
36+
cx: &mut Context<'_>,
37+
buf: &mut ReadBuf<'_>,
38+
) -> Poll<io::Result<()>> {
39+
let start = buf.remaining();
40+
let this = self.get_mut();
41+
let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);
42+
43+
// If something was read, update offset
44+
if let Poll::Ready(Ok(_)) = poll {
45+
let amt = start - buf.remaining();
46+
*this.offset += amt as u64;
47+
}
48+
poll
49+
}
50+
}
51+
52+
impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
53+
where
54+
R: AsyncBufRead + Unpin,
55+
{
56+
#[inline]
57+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
58+
Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
59+
}
60+
61+
#[inline]
62+
fn consume(self: Pin<&mut Self>, amt: usize) {
63+
let this = self.get_mut();
64+
this.inner.consume(amt);
65+
*this.offset += amt as u64;
66+
}
67+
}
68+
69+
////////////////////////////////////////////////////////////////////////////////////////////////////
70+
2771
impl<R: AsyncBufRead + Unpin> Reader<R> {
2872
/// An asynchronous version of [`read_event_into()`]. Reads the next event into
2973
/// given buffer.

src/reader/mod.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,65 @@ impl EncodingRef {
567567

568568
////////////////////////////////////////////////////////////////////////////////////////////////////
569569

570+
/// A direct stream to the underlying [`Reader`]s reader which updates
571+
/// [`Reader::buffer_position()`] when read from it.
572+
#[derive(Debug)]
573+
#[must_use = "streams do nothing unless read or polled"]
574+
pub struct BinaryStream<'r, R> {
575+
inner: &'r mut R,
576+
offset: &'r mut u64,
577+
}
578+
579+
impl<'r, R> BinaryStream<'r, R> {
580+
/// Returns current position in bytes in the original source.
581+
#[inline]
582+
pub const fn offset(&self) -> u64 {
583+
*self.offset
584+
}
585+
586+
/// Gets a reference to the underlying reader.
587+
#[inline]
588+
pub const fn get_ref(&self) -> &R {
589+
self.inner
590+
}
591+
592+
/// Gets a mutable reference to the underlying reader.
593+
#[inline]
594+
pub fn get_mut(&mut self) -> &mut R {
595+
self.inner
596+
}
597+
}
598+
599+
impl<'r, R> io::Read for BinaryStream<'r, R>
600+
where
601+
R: io::Read,
602+
{
603+
#[inline]
604+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
605+
let amt = self.inner.read(buf)?;
606+
*self.offset += amt as u64;
607+
Ok(amt)
608+
}
609+
}
610+
611+
impl<'r, R> io::BufRead for BinaryStream<'r, R>
612+
where
613+
R: io::BufRead,
614+
{
615+
#[inline]
616+
fn fill_buf(&mut self) -> io::Result<&[u8]> {
617+
self.inner.fill_buf()
618+
}
619+
620+
#[inline]
621+
fn consume(&mut self, amt: usize) {
622+
self.inner.consume(amt);
623+
*self.offset += amt as u64;
624+
}
625+
}
626+
627+
////////////////////////////////////////////////////////////////////////////////////////////////////
628+
570629
/// A low level encoding-agnostic XML event reader.
571630
///
572631
/// Consumes bytes and streams XML [`Event`]s.
@@ -759,6 +818,60 @@ impl<R> Reader<R> {
759818
pub const fn decoder(&self) -> Decoder {
760819
self.state.decoder()
761820
}
821+
822+
/// Get the direct access to the underlying reader, but tracks the amount of
823+
/// read data and update [`Reader::buffer_position()`] accordingly.
824+
///
825+
/// # Example
826+
///
827+
/// This example demonstrates, how it is possible to read embedded binary data.
828+
/// Such XML documents are exist in the wild.
829+
///
830+
/// ```
831+
/// # use pretty_assertions::assert_eq;
832+
/// use std::io::{BufRead, Read};
833+
/// use quick_xml::events::{BytesEnd, BytesStart, Event};
834+
/// use quick_xml::reader::Reader;
835+
///
836+
/// let mut reader = Reader::from_str("<tag>binary << data&></tag>");
837+
/// // ^ ^ ^ ^
838+
/// // 0 5 21 27
839+
///
840+
/// assert_eq!(
841+
/// (reader.read_event().unwrap(), reader.buffer_position()),
842+
/// // 5 - end of the `<tag>`
843+
/// (Event::Start(BytesStart::new("tag")), 5)
844+
/// );
845+
///
846+
/// // Reading directly from underlying reader will not update position
847+
/// // let mut inner = reader.get_mut();
848+
///
849+
/// // Reading from the stream() advances position
850+
/// let mut inner = reader.stream();
851+
///
852+
/// // Read binary data. We somehow should known its size
853+
/// let mut binary = [0u8; 16];
854+
/// inner.read_exact(&mut binary).unwrap();
855+
/// assert_eq!(&binary, b"binary << data&>");
856+
/// // 21 - end of the `binary << data&>`
857+
/// assert_eq!(inner.offset(), 21);
858+
/// assert_eq!(reader.buffer_position(), 21);
859+
///
860+
/// assert_eq!(
861+
/// (reader.read_event().unwrap(), reader.buffer_position()),
862+
/// // 27 - end of the `</tag>`
863+
/// (Event::End(BytesEnd::new("tag")), 27)
864+
/// );
865+
///
866+
/// assert_eq!(reader.read_event().unwrap(), Event::Eof);
867+
/// ```
868+
#[inline]
869+
pub fn stream(&mut self) -> BinaryStream<R> {
870+
BinaryStream {
871+
inner: &mut self.reader,
872+
offset: &mut self.state.offset,
873+
}
874+
}
762875
}
763876

764877
/// Private sync reading methods

tests/async-tokio.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
use std::io::Cursor;
12
use std::iter;
23

34
use pretty_assertions::assert_eq;
45
use quick_xml::events::{BytesEnd, BytesStart, BytesText, Event::*};
56
use quick_xml::name::QName;
67
use quick_xml::reader::Reader;
7-
use tokio::io::BufReader;
8+
use quick_xml::utils::Bytes;
9+
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
810

911
// Import `small_buffers_tests!`
1012
#[macro_use]
@@ -88,6 +90,48 @@ mod read_to_end {
8890
}
8991
}
9092

93+
#[tokio::test]
94+
async fn issue623() {
95+
let mut buf = Vec::new();
96+
let mut reader = Reader::from_reader(Cursor::new(
97+
b"
98+
<AppendedData>
99+
_binary << data&>
100+
</AppendedData>
101+
",
102+
));
103+
reader.config_mut().trim_text(true);
104+
105+
assert_eq!(
106+
(
107+
reader.read_event_into_async(&mut buf).await.unwrap(),
108+
reader.buffer_position()
109+
),
110+
(Start(BytesStart::new("AppendedData")), 23)
111+
);
112+
113+
let mut inner = reader.stream();
114+
// Read to start of data marker
115+
inner.read_until(b'_', &mut buf).await.unwrap();
116+
117+
// Read binary data. We somehow should known its size
118+
let mut binary = [0u8; 16];
119+
inner.read_exact(&mut binary).await.unwrap();
120+
assert_eq!(Bytes(&binary), Bytes(b"binary << data&>"));
121+
assert_eq!(inner.offset(), 53);
122+
assert_eq!(reader.buffer_position(), 53);
123+
124+
assert_eq!(
125+
(
126+
reader.read_event_into_async(&mut buf).await.unwrap(),
127+
reader.buffer_position()
128+
),
129+
(End(BytesEnd::new("AppendedData")), 77)
130+
);
131+
132+
assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Eof);
133+
}
134+
91135
/// Regression test for https://github.com/tafia/quick-xml/issues/751
92136
///
93137
/// Actually, that error was not found in async reader, but we would to test it as well.

tests/issues.rs

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22
//!
33
//! Name each module / test as `issue<GH number>` and keep sorted by issue number
44
5-
use std::io::BufReader;
5+
use std::io::{BufRead, BufReader, Cursor, Read};
66
use std::iter;
77
use std::sync::mpsc;
88

99
use quick_xml::errors::{Error, IllFormedError, SyntaxError};
1010
use quick_xml::events::{BytesDecl, BytesEnd, BytesStart, BytesText, Event};
1111
use quick_xml::name::QName;
1212
use quick_xml::reader::Reader;
13+
use quick_xml::utils::Bytes;
14+
15+
use pretty_assertions::assert_eq;
1316

1417
/// Regression test for https://github.com/tafia/quick-xml/issues/94
1518
#[test]
@@ -258,6 +261,89 @@ fn issue622() {
258261
}
259262
}
260263

264+
/// Regression test for https://github.com/tafia/quick-xml/issues/623
265+
mod issue623 {
266+
use super::*;
267+
use pretty_assertions::assert_eq;
268+
269+
#[test]
270+
fn borrowed() {
271+
let mut reader = Reader::from_str(
272+
"
273+
<AppendedData>
274+
_binary << data&>
275+
</AppendedData>
276+
",
277+
);
278+
reader.config_mut().trim_text(true);
279+
280+
assert_eq!(
281+
(reader.read_event().unwrap(), reader.buffer_position()),
282+
(Event::Start(BytesStart::new("AppendedData")), 27)
283+
);
284+
285+
let mut inner = reader.stream();
286+
// Read to start of data marker
287+
inner.read_until(b'_', &mut Vec::new()).unwrap();
288+
289+
// Read binary data. We somehow should known its size
290+
let mut binary = [0u8; 16];
291+
inner.read_exact(&mut binary).unwrap();
292+
assert_eq!(Bytes(&binary), Bytes(b"binary << data&>"));
293+
assert_eq!(inner.offset(), 61);
294+
assert_eq!(reader.buffer_position(), 61);
295+
296+
assert_eq!(
297+
(reader.read_event().unwrap(), reader.buffer_position()),
298+
(Event::End(BytesEnd::new("AppendedData")), 89)
299+
);
300+
301+
assert_eq!(reader.read_event().unwrap(), Event::Eof);
302+
}
303+
304+
#[test]
305+
fn buffered() {
306+
let mut buf = Vec::new();
307+
let mut reader = Reader::from_reader(Cursor::new(
308+
b"
309+
<AppendedData>
310+
_binary << data&>
311+
</AppendedData>
312+
",
313+
));
314+
reader.config_mut().trim_text(true);
315+
316+
assert_eq!(
317+
(
318+
reader.read_event_into(&mut buf).unwrap(),
319+
reader.buffer_position()
320+
),
321+
(Event::Start(BytesStart::new("AppendedData")), 27)
322+
);
323+
324+
let mut inner = reader.stream();
325+
// Read to start of data marker
326+
inner.read_until(b'_', &mut buf).unwrap();
327+
328+
// Read binary data. We somehow should known its size
329+
let mut binary = [0u8; 16];
330+
inner.read_exact(&mut binary).unwrap();
331+
assert_eq!(Bytes(&binary), Bytes(b"binary << data&>"));
332+
assert_eq!(inner.offset(), 61);
333+
assert_eq!(reader.buffer_position(), 61);
334+
335+
assert_eq!(
336+
(
337+
reader.read_event_into(&mut buf).unwrap(),
338+
reader.buffer_position()
339+
),
340+
(Event::End(BytesEnd::new("AppendedData")), 89)
341+
);
342+
343+
assert_eq!(reader.read_event_into(&mut buf).unwrap(), Event::Eof);
344+
}
345+
}
346+
261347
/// Regression test for https://github.com/tafia/quick-xml/issues/706
262348
#[test]
263349
fn issue706() {

0 commit comments

Comments
 (0)