combine/stream/
buffered.rs1use alloc::collections::VecDeque;
2
3use crate::{
4    error::StreamError,
5    stream::{ParseError, Positioned, ResetStream, StreamErrorFor, StreamOnce},
6};
7
8#[derive(Debug, PartialEq)]
28pub struct Stream<Input>
29where
30    Input: StreamOnce + Positioned,
31{
32    offset: usize,
33    iter: Input,
34    buffer_offset: usize,
35    buffer: VecDeque<(Input::Token, Input::Position)>,
36}
37
38impl<Input> ResetStream for Stream<Input>
39where
40    Input: Positioned,
41{
42    type Checkpoint = usize;
43
44    fn checkpoint(&self) -> Self::Checkpoint {
45        self.offset
46    }
47
48    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
49        if checkpoint < self.buffer_offset - self.buffer.len() {
50            Err(Self::Error::from_error(
52                self.position(),
53                StreamErrorFor::<Self>::message_static_message("Backtracked to far"),
54            ))
55        } else {
56            self.offset = checkpoint;
57            Ok(())
58        }
59    }
60}
61
62impl<Input> Stream<Input>
63where
64    Input: StreamOnce + Positioned,
65    Input::Position: Clone,
66    Input::Token: Clone,
67{
68    pub fn new(iter: Input, lookahead: usize) -> Stream<Input> {
71        Stream {
72            offset: 0,
73            iter,
74            buffer_offset: 0,
75            buffer: VecDeque::with_capacity(lookahead),
76        }
77    }
78}
79
80impl<Input> Positioned for Stream<Input>
81where
82    Input: StreamOnce + Positioned,
83{
84    #[inline]
85    fn position(&self) -> Self::Position {
86        if self.offset >= self.buffer_offset {
87            self.iter.position()
88        } else if self.offset < self.buffer_offset - self.buffer.len() {
89            self.buffer
90                .front()
91                .expect("At least 1 element in the buffer")
92                .1
93                .clone()
94        } else {
95            self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
96                .1
97                .clone()
98        }
99    }
100}
101
102impl<Input> StreamOnce for Stream<Input>
103where
104    Input: StreamOnce + Positioned,
105    Input::Token: Clone,
106{
107    type Token = Input::Token;
108    type Range = Input::Range;
109    type Position = Input::Position;
110    type Error = Input::Error;
111
112    #[inline]
113    fn uncons(&mut self) -> Result<Input::Token, StreamErrorFor<Self>> {
114        if self.offset >= self.buffer_offset {
115            let position = self.iter.position();
116            let token = self.iter.uncons()?;
117            self.buffer_offset += 1;
118            if self.buffer.len() == self.buffer.capacity() {
121                self.buffer.pop_front();
122            }
123            self.buffer.push_back((token.clone(), position));
124            self.offset += 1;
125            Ok(token)
126        } else if self.offset < self.buffer_offset - self.buffer.len() {
127            Err(StreamError::message_static_message("Backtracked to far"))
129        } else {
130            let value = self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
131                .0
132                .clone();
133            self.offset += 1;
134            Ok(value)
135        }
136    }
137
138    fn is_partial(&self) -> bool {
139        self.iter.is_partial()
140    }
141}