Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 58 additions & 4 deletions aerospike-core/src/query/recordset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ pub struct Recordset {
active: AtomicBool,
task_id: AtomicUsize,
pub(crate) tracker: Arc<Mutex<PartitionTracker>>,
stream_count: AtomicUsize,
}

/// A stream over incoming records for a [`Recordset`] that can be iterated over either synchronously or asynchronously.
pub struct RecordStream(Arc<Recordset>);

impl Drop for Recordset {
fn drop(&mut self) {
// close the recordset to finish all the commands sending data
Expand All @@ -67,6 +71,7 @@ impl Recordset {
active: AtomicBool::new(true),
task_id: AtomicUsize::new(task_id),
tracker: tracker,
stream_count: AtomicUsize::new(0),
}
}

Expand Down Expand Up @@ -133,17 +138,34 @@ impl Recordset {
}
None
}

/// Converts a reference to a [`Recordset`] into a [`RecordStream`] that can be used
/// to iterate over records.
/// Only one stream can exist at a time. If one already exists,
/// this method will return `None`.
pub fn into_stream(self: Arc<Self>) -> Option<RecordStream> {
if self.stream_count.load(Ordering::Relaxed) > 0 {
return None;
}
self.stream_count.fetch_add(1, Ordering::Relaxed);
Some(RecordStream(self))
}

/// Notify the recordset that a stream is closing.
fn close_stream(&self) {
self.stream_count.fetch_sub(1, Ordering::Relaxed);
}
}

impl<'a> Iterator for &'a Recordset {
impl<'a> Iterator for RecordStream {
type Item = Result<Record>;

fn next(&mut self) -> Option<Result<Record>> {
loop {
if self.is_active() || !self.record_queue.is_empty() {
let result = self.record_queue.pop();
if self.0.is_active() || !self.0.record_queue.is_empty() {
let result = self.0.record_queue.pop();
if result.is_some() {
self.record_queue_count.fetch_sub(1, Ordering::Relaxed);
self.0.record_queue_count.fetch_sub(1, Ordering::Relaxed);
return result;
}
// aerospike_rt::task::yield_now().await;
Expand All @@ -154,3 +176,35 @@ impl<'a> Iterator for &'a Recordset {
}
}
}

impl futures::Stream for RecordStream {
type Item = Result<Record>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.0.is_active() || !self.0.record_queue.is_empty() {
if let Some(result) = self.0.record_queue.pop() {
self.0.record_queue_count.fetch_sub(1, Ordering::Relaxed);
return std::task::Poll::Ready(Some(result));
}
cx.waker().wake_by_ref();
std::task::Poll::Pending
} else {
std::task::Poll::Ready(None)
}
}
}

impl AsRef<Recordset> for RecordStream {
fn as_ref(&self) -> &Recordset {
&self.0
}
}

impl Drop for RecordStream {
fn drop(&mut self) {
self.0.close_stream();
}
}