xref: /DragonOS/kernel/src/ipc/pipe.rs (revision 4dd4856f933be0b4624c7f7ffa9e3d0c8c218873)
1 use crate::{
2     filesystem::vfs::{
3         core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem,
4         FileType, IndexNode, Metadata,
5     },
6     libs::{
7         spinlock::{SpinLock, SpinLockGuard},
8         wait_queue::WaitQueue,
9     },
10     net::event_poll::{EPollEventType, EPollItem, EventPoll},
11     process::ProcessState,
12     sched::SchedMode,
13     time::PosixTimeSpec,
14 };
15 
16 use alloc::{
17     collections::LinkedList,
18     sync::{Arc, Weak},
19     vec::Vec,
20 };
21 use system_error::SystemError;
22 
23 /// 我们设定pipe_buff的总大小为1024字节
24 const PIPE_BUFF_SIZE: usize = 1024;
25 
26 #[derive(Debug, Clone)]
27 pub struct PipeFsPrivateData {
28     mode: FileMode,
29 }
30 
31 impl PipeFsPrivateData {
32     pub fn new(mode: FileMode) -> Self {
33         return PipeFsPrivateData { mode };
34     }
35 
36     pub fn set_mode(&mut self, mode: FileMode) {
37         self.mode = mode;
38     }
39 }
40 
41 /// @brief 管道文件i节点(锁)
42 #[derive(Debug)]
43 pub struct LockedPipeInode {
44     inner: SpinLock<InnerPipeInode>,
45     read_wait_queue: WaitQueue,
46     write_wait_queue: WaitQueue,
47 }
48 
49 /// @brief 管道文件i节点(无锁)
50 #[derive(Debug)]
51 pub struct InnerPipeInode {
52     self_ref: Weak<LockedPipeInode>,
53     /// 管道内可读的数据数
54     valid_cnt: i32,
55     read_pos: i32,
56     write_pos: i32,
57     data: [u8; PIPE_BUFF_SIZE],
58     /// INode 元数据
59     metadata: Metadata,
60     reader: u32,
61     writer: u32,
62     epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
63 }
64 
65 impl InnerPipeInode {
66     pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
67         let mut events = EPollEventType::empty();
68 
69         let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data {
70             mode
71         } else {
72             return Err(SystemError::EBADFD);
73         };
74 
75         if mode.contains(FileMode::O_RDONLY) {
76             if self.valid_cnt != 0 {
77                 // 有数据可读
78                 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM);
79             }
80 
81             // 没有写者
82             if self.writer == 0 {
83                 events.insert(EPollEventType::EPOLLHUP)
84             }
85         }
86 
87         if mode.contains(FileMode::O_WRONLY) {
88             // 管道内数据未满
89             if self.valid_cnt as usize != PIPE_BUFF_SIZE {
90                 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM);
91             }
92 
93             // 没有读者
94             if self.reader == 0 {
95                 events.insert(EPollEventType::EPOLLERR);
96             }
97         }
98 
99         Ok(events.bits() as usize)
100     }
101 
102     pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
103         self.epitems.lock().push_back(epitem);
104         Ok(())
105     }
106 
107     fn buf_full(&self) -> bool {
108         return self.valid_cnt as usize == PIPE_BUFF_SIZE;
109     }
110 
111     pub fn remove_epoll(&self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> {
112         let is_remove = !self
113             .epitems
114             .lock_irqsave()
115             .extract_if(|x| x.epoll().ptr_eq(epoll))
116             .collect::<Vec<_>>()
117             .is_empty();
118 
119         if is_remove {
120             return Ok(());
121         }
122 
123         Err(SystemError::ENOENT)
124     }
125 }
126 
127 impl LockedPipeInode {
128     pub fn new() -> Arc<Self> {
129         let inner = InnerPipeInode {
130             self_ref: Weak::default(),
131             valid_cnt: 0,
132             read_pos: 0,
133             write_pos: 0,
134             data: [0; PIPE_BUFF_SIZE],
135 
136             metadata: Metadata {
137                 dev_id: 0,
138                 inode_id: generate_inode_id(),
139                 size: PIPE_BUFF_SIZE as i64,
140                 blk_size: 0,
141                 blocks: 0,
142                 atime: PosixTimeSpec::default(),
143                 mtime: PosixTimeSpec::default(),
144                 ctime: PosixTimeSpec::default(),
145                 file_type: FileType::Pipe,
146                 mode: ModeType::from_bits_truncate(0o666),
147                 nlinks: 1,
148                 uid: 0,
149                 gid: 0,
150                 raw_dev: Default::default(),
151             },
152             reader: 0,
153             writer: 0,
154             epitems: SpinLock::new(LinkedList::new()),
155         };
156         let result = Arc::new(Self {
157             inner: SpinLock::new(inner),
158             read_wait_queue: WaitQueue::default(),
159             write_wait_queue: WaitQueue::default(),
160         });
161         let mut guard = result.inner.lock();
162         guard.self_ref = Arc::downgrade(&result);
163         // 释放锁
164         drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁
165         return result;
166     }
167 
168     pub fn inner(&self) -> &SpinLock<InnerPipeInode> {
169         &self.inner
170     }
171 
172     fn readable(&self) -> bool {
173         let inode = self.inner.lock();
174         return inode.valid_cnt > 0 || inode.writer == 0;
175     }
176 
177     fn writeable(&self) -> bool {
178         let inode = self.inner.lock();
179         return !inode.buf_full() || inode.reader == 0;
180     }
181 }
182 
183 impl IndexNode for LockedPipeInode {
184     fn read_at(
185         &self,
186         _offset: usize,
187         len: usize,
188         buf: &mut [u8],
189         data_guard: SpinLockGuard<FilePrivateData>,
190     ) -> Result<usize, SystemError> {
191         let data = data_guard.clone();
192         drop(data_guard);
193         // 获取mode
194         let mode: FileMode;
195         if let FilePrivateData::Pipefs(pdata) = &data {
196             mode = pdata.mode;
197         } else {
198             return Err(SystemError::EBADF);
199         }
200 
201         if buf.len() < len {
202             return Err(SystemError::EINVAL);
203         }
204         // log::debug!("pipe mode: {:?}", mode);
205         // 加锁
206         let mut inode = self.inner.lock();
207 
208         // 如果管道里面没有数据,则唤醒写端,
209         while inode.valid_cnt == 0 {
210             // 如果当前管道写者数为0,则返回EOF
211             if inode.writer == 0 {
212                 return Ok(0);
213             }
214 
215             self.write_wait_queue
216                 .wakeup(Some(ProcessState::Blocked(true)));
217 
218             // 如果为非阻塞管道,直接返回错误
219             if mode.contains(FileMode::O_NONBLOCK) {
220                 drop(inode);
221                 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
222             }
223 
224             // 否则在读等待队列中睡眠,并释放锁
225             drop(inode);
226             let r = wq_wait_event_interruptible!(self.read_wait_queue, self.readable(), {});
227             if r.is_err() {
228                 return Err(SystemError::ERESTARTSYS);
229             }
230 
231             inode = self.inner.lock();
232         }
233 
234         let mut num = inode.valid_cnt as usize;
235         //决定要输出的字节
236         let start = inode.read_pos as usize;
237         //如果读端希望读取的字节数大于有效字节数,则输出有效字节
238         let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE;
239         //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节
240         if len < inode.valid_cnt as usize {
241             end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE;
242             num = len;
243         }
244 
245         // 从管道拷贝数据到用户的缓冲区
246 
247         if end < start {
248             buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]);
249             buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]);
250         } else {
251             buf[0..num].copy_from_slice(&inode.data[start..end]);
252         }
253 
254         //更新读位置以及valid_cnt
255         inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
256         inode.valid_cnt -= num as i32;
257 
258         // 读完以后如果未读完,则唤醒下一个读者
259         if inode.valid_cnt > 0 {
260             self.read_wait_queue
261                 .wakeup(Some(ProcessState::Blocked(true)));
262         }
263 
264         //读完后解锁并唤醒等待在写等待队列中的进程
265         self.write_wait_queue
266             .wakeup(Some(ProcessState::Blocked(true)));
267 
268         let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
269         // 唤醒epoll中等待的进程
270         EventPoll::wakeup_epoll(&inode.epitems, Some(pollflag))?;
271 
272         //返回读取的字节数
273         return Ok(num);
274     }
275 
276     fn open(
277         &self,
278         mut data: SpinLockGuard<FilePrivateData>,
279         mode: &crate::filesystem::vfs::file::FileMode,
280     ) -> Result<(), SystemError> {
281         let mut guard = self.inner.lock();
282         // 不能以读写方式打开管道
283         if mode.contains(FileMode::O_RDWR) {
284             return Err(SystemError::EACCES);
285         }
286         if mode.contains(FileMode::O_RDONLY) {
287             guard.reader += 1;
288         }
289         if mode.contains(FileMode::O_WRONLY) {
290             guard.writer += 1;
291         }
292 
293         // 设置mode
294         *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode });
295 
296         return Ok(());
297     }
298 
299     fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> {
300         let inode = self.inner.lock();
301         let mut metadata = inode.metadata.clone();
302         metadata.size = inode.data.len() as i64;
303 
304         return Ok(metadata);
305     }
306 
307     fn close(&self, data: SpinLockGuard<FilePrivateData>) -> Result<(), SystemError> {
308         let mode: FileMode;
309         if let FilePrivateData::Pipefs(pipe_data) = &*data {
310             mode = pipe_data.mode;
311         } else {
312             return Err(SystemError::EBADF);
313         }
314         let mut guard = self.inner.lock();
315 
316         // 写端关闭
317         if mode.contains(FileMode::O_WRONLY) {
318             assert!(guard.writer > 0);
319             guard.writer -= 1;
320             // 如果已经没有写端了,则唤醒读端
321             if guard.writer == 0 {
322                 self.read_wait_queue
323                     .wakeup_all(Some(ProcessState::Blocked(true)));
324             }
325         }
326 
327         // 读端关闭
328         if mode.contains(FileMode::O_RDONLY) {
329             assert!(guard.reader > 0);
330             guard.reader -= 1;
331             // 如果已经没有写端了,则唤醒读端
332             if guard.reader == 0 {
333                 self.write_wait_queue
334                     .wakeup_all(Some(ProcessState::Blocked(true)));
335             }
336         }
337 
338         return Ok(());
339     }
340 
341     fn write_at(
342         &self,
343         _offset: usize,
344         len: usize,
345         buf: &[u8],
346         data: SpinLockGuard<FilePrivateData>,
347     ) -> Result<usize, SystemError> {
348         // 获取mode
349         let mode: FileMode;
350         if let FilePrivateData::Pipefs(pdata) = &*data {
351             mode = pdata.mode;
352         } else {
353             return Err(SystemError::EBADF);
354         }
355 
356         if buf.len() < len || len > PIPE_BUFF_SIZE {
357             return Err(SystemError::EINVAL);
358         }
359         // 加锁
360 
361         let mut inode = self.inner.lock();
362 
363         if inode.reader == 0 {
364             // TODO: 如果已经没有读端存在了,则向写端进程发送SIGPIPE信号
365         }
366 
367         // 如果管道空间不够
368 
369         while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE {
370             // 唤醒读端
371             self.read_wait_queue
372                 .wakeup(Some(ProcessState::Blocked(true)));
373 
374             // 如果为非阻塞管道,直接返回错误
375             if mode.contains(FileMode::O_NONBLOCK) {
376                 drop(inode);
377                 return Err(SystemError::ENOMEM);
378             }
379 
380             // 解锁并睡眠
381             drop(inode);
382             let r = wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {});
383             if r.is_err() {
384                 return Err(SystemError::ERESTARTSYS);
385             }
386             inode = self.inner.lock();
387         }
388 
389         // 决定要输入的字节
390         let start = inode.write_pos as usize;
391         let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE;
392         // 从用户的缓冲区拷贝数据到管道
393 
394         if end < start {
395             inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]);
396             inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]);
397         } else {
398             inode.data[start..end].copy_from_slice(&buf[0..len]);
399         }
400         // 更新写位置以及valid_cnt
401         inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
402         inode.valid_cnt += len as i32;
403 
404         // 写完后还有位置,则唤醒下一个写者
405         if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE {
406             self.write_wait_queue
407                 .wakeup(Some(ProcessState::Blocked(true)));
408         }
409 
410         // 读完后解锁并唤醒等待在读等待队列中的进程
411         self.read_wait_queue
412             .wakeup(Some(ProcessState::Blocked(true)));
413 
414         let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
415         // 唤醒epoll中等待的进程
416         EventPoll::wakeup_epoll(&inode.epitems, Some(pollflag))?;
417 
418         // 返回写入的字节数
419         return Ok(len);
420     }
421 
422     fn as_any_ref(&self) -> &dyn core::any::Any {
423         self
424     }
425 
426     fn get_entry_name_and_metadata(
427         &self,
428         ino: crate::filesystem::vfs::InodeId,
429     ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> {
430         // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。
431         let name = self.get_entry_name(ino)?;
432         let entry = self.find(&name)?;
433         return Ok((name, entry.metadata()?));
434     }
435 
436     fn fs(&self) -> Arc<(dyn FileSystem)> {
437         todo!()
438     }
439 
440     fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> {
441         return Err(SystemError::ENOSYS);
442     }
443 
444     fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
445         return self.inner.lock().poll(private_data);
446     }
447 }
448