xref: /DragonOS/kernel/src/ipc/pipe.rs (revision 701589559f912deb03eb5176d049d9d07fb29447)
1 use crate::{
2     arch::{sched::sched, CurrentIrqArch},
3     exception::InterruptArch,
4     filesystem::vfs::{
5         core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem,
6         FileType, IndexNode, Metadata,
7     },
8     libs::{spinlock::SpinLock, wait_queue::WaitQueue},
9     net::event_poll::{EPollEventType, EPollItem, EventPoll},
10     process::ProcessState,
11     time::TimeSpec,
12 };
13 
14 use alloc::{
15     collections::LinkedList,
16     sync::{Arc, Weak},
17 };
18 use system_error::SystemError;
19 
20 /// 我们设定pipe_buff的总大小为1024字节
21 const PIPE_BUFF_SIZE: usize = 1024;
22 
23 #[derive(Debug, Clone)]
24 pub struct PipeFsPrivateData {
25     mode: FileMode,
26 }
27 
28 impl PipeFsPrivateData {
29     pub fn new(mode: FileMode) -> Self {
30         return PipeFsPrivateData { mode };
31     }
32 
33     pub fn set_mode(&mut self, mode: FileMode) {
34         self.mode = mode;
35     }
36 }
37 
38 /// @brief 管道文件i节点(锁)
39 #[derive(Debug)]
40 pub struct LockedPipeInode(SpinLock<InnerPipeInode>);
41 
42 /// @brief 管道文件i节点(无锁)
43 #[derive(Debug)]
44 pub struct InnerPipeInode {
45     self_ref: Weak<LockedPipeInode>,
46     /// 管道内可读的数据数
47     valid_cnt: i32,
48     read_pos: i32,
49     write_pos: i32,
50     read_wait_queue: WaitQueue,
51     write_wait_queue: WaitQueue,
52     data: [u8; PIPE_BUFF_SIZE],
53     /// INode 元数据
54     metadata: Metadata,
55     reader: u32,
56     writer: u32,
57     epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
58 }
59 
60 impl InnerPipeInode {
61     pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
62         let mut events = EPollEventType::empty();
63 
64         let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data {
65             mode
66         } else {
67             return Err(SystemError::EBADFD);
68         };
69 
70         if mode.contains(FileMode::O_RDONLY) {
71             if self.valid_cnt != 0 {
72                 // 有数据可读
73                 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM);
74             }
75 
76             // 没有写者
77             if self.writer == 0 {
78                 events.insert(EPollEventType::EPOLLHUP)
79             }
80         }
81 
82         if mode.contains(FileMode::O_WRONLY) {
83             // 管道内数据未满
84             if self.valid_cnt as usize != PIPE_BUFF_SIZE {
85                 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM);
86             }
87 
88             // 没有读者
89             if self.reader == 0 {
90                 events.insert(EPollEventType::EPOLLERR);
91             }
92         }
93 
94         Ok(events.bits() as usize)
95     }
96 
97     pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
98         self.epitems.lock().push_back(epitem);
99         Ok(())
100     }
101 }
102 
103 impl LockedPipeInode {
104     pub fn new() -> Arc<Self> {
105         let inner = InnerPipeInode {
106             self_ref: Weak::default(),
107             valid_cnt: 0,
108             read_pos: 0,
109             write_pos: 0,
110             read_wait_queue: WaitQueue::INIT,
111             write_wait_queue: WaitQueue::INIT,
112             data: [0; PIPE_BUFF_SIZE],
113 
114             metadata: Metadata {
115                 dev_id: 0,
116                 inode_id: generate_inode_id(),
117                 size: PIPE_BUFF_SIZE as i64,
118                 blk_size: 0,
119                 blocks: 0,
120                 atime: TimeSpec::default(),
121                 mtime: TimeSpec::default(),
122                 ctime: TimeSpec::default(),
123                 file_type: FileType::Pipe,
124                 mode: ModeType::from_bits_truncate(0o666),
125                 nlinks: 1,
126                 uid: 0,
127                 gid: 0,
128                 raw_dev: Default::default(),
129             },
130             reader: 0,
131             writer: 0,
132             epitems: SpinLock::new(LinkedList::new()),
133         };
134         let result = Arc::new(Self(SpinLock::new(inner)));
135         let mut guard = result.0.lock();
136         guard.self_ref = Arc::downgrade(&result);
137         // 释放锁
138         drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁
139         return result;
140     }
141 
142     pub fn inner(&self) -> &SpinLock<InnerPipeInode> {
143         &self.0
144     }
145 }
146 
147 impl IndexNode for LockedPipeInode {
148     fn read_at(
149         &self,
150         _offset: usize,
151         len: usize,
152         buf: &mut [u8],
153         data: &mut FilePrivateData,
154     ) -> Result<usize, SystemError> {
155         // 获取mode
156         let mode: FileMode;
157         if let FilePrivateData::Pipefs(pdata) = data {
158             mode = pdata.mode;
159         } else {
160             return Err(SystemError::EBADF);
161         }
162 
163         if buf.len() < len {
164             return Err(SystemError::EINVAL);
165         }
166         // 加锁
167         let mut inode = self.0.lock();
168 
169         // 如果管道里面没有数据,则唤醒写端,
170         while inode.valid_cnt == 0 {
171             // 如果当前管道写者数为0,则返回EOF
172             if inode.writer == 0 {
173                 return Ok(0);
174             }
175 
176             inode
177                 .write_wait_queue
178                 .wakeup(Some(ProcessState::Blocked(true)));
179 
180             // 如果为非阻塞管道,直接返回错误
181             if mode.contains(FileMode::O_NONBLOCK) {
182                 drop(inode);
183                 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
184             }
185 
186             // 否则在读等待队列中睡眠,并释放锁
187             unsafe {
188                 let irq_guard = CurrentIrqArch::save_and_disable_irq();
189 
190                 inode.read_wait_queue.sleep_without_schedule();
191                 drop(inode);
192 
193                 drop(irq_guard);
194             }
195             sched();
196             inode = self.0.lock();
197         }
198 
199         let mut num = inode.valid_cnt as usize;
200         //决定要输出的字节
201         let start = inode.read_pos as usize;
202         //如果读端希望读取的字节数大于有效字节数,则输出有效字节
203         let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE;
204         //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节
205         if len < inode.valid_cnt as usize {
206             end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE;
207             num = len;
208         }
209 
210         // 从管道拷贝数据到用户的缓冲区
211 
212         if end < start {
213             buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]);
214             buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]);
215         } else {
216             buf[0..num].copy_from_slice(&inode.data[start..end]);
217         }
218 
219         //更新读位置以及valid_cnt
220         inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
221         inode.valid_cnt -= num as i32;
222 
223         // 读完以后如果未读完,则唤醒下一个读者
224         if inode.valid_cnt > 0 {
225             inode
226                 .read_wait_queue
227                 .wakeup(Some(ProcessState::Blocked(true)));
228         }
229 
230         //读完后解锁并唤醒等待在写等待队列中的进程
231         inode
232             .write_wait_queue
233             .wakeup(Some(ProcessState::Blocked(true)));
234 
235         let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
236         // 唤醒epoll中等待的进程
237         EventPoll::wakeup_epoll(&mut inode.epitems, pollflag)?;
238 
239         //返回读取的字节数
240         return Ok(num);
241     }
242 
243     fn open(
244         &self,
245         data: &mut FilePrivateData,
246         mode: &crate::filesystem::vfs::file::FileMode,
247     ) -> Result<(), SystemError> {
248         let mut guard = self.0.lock();
249         // 不能以读写方式打开管道
250         if mode.contains(FileMode::O_RDWR) {
251             return Err(SystemError::EACCES);
252         }
253         if mode.contains(FileMode::O_RDONLY) {
254             guard.reader += 1;
255         }
256         if mode.contains(FileMode::O_WRONLY) {
257             guard.writer += 1;
258         }
259 
260         // 设置mode
261         *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode });
262 
263         return Ok(());
264     }
265 
266     fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> {
267         let inode = self.0.lock();
268         let mut metadata = inode.metadata.clone();
269         metadata.size = inode.data.len() as i64;
270 
271         return Ok(metadata);
272     }
273 
274     fn close(&self, data: &mut FilePrivateData) -> Result<(), SystemError> {
275         let mode: FileMode;
276         if let FilePrivateData::Pipefs(pipe_data) = data {
277             mode = pipe_data.mode;
278         } else {
279             return Err(SystemError::EBADF);
280         }
281         let mut guard = self.0.lock();
282 
283         // 写端关闭
284         if mode.contains(FileMode::O_WRONLY) {
285             assert!(guard.writer > 0);
286             guard.writer -= 1;
287             // 如果已经没有写端了,则唤醒读端
288             if guard.writer == 0 {
289                 guard
290                     .read_wait_queue
291                     .wakeup_all(Some(ProcessState::Blocked(true)));
292             }
293         }
294 
295         // 读端关闭
296         if mode.contains(FileMode::O_RDONLY) {
297             assert!(guard.reader > 0);
298             guard.reader -= 1;
299             // 如果已经没有写端了,则唤醒读端
300             if guard.reader == 0 {
301                 guard
302                     .write_wait_queue
303                     .wakeup_all(Some(ProcessState::Blocked(true)));
304             }
305         }
306 
307         return Ok(());
308     }
309 
310     fn write_at(
311         &self,
312         _offset: usize,
313         len: usize,
314         buf: &[u8],
315         data: &mut FilePrivateData,
316     ) -> Result<usize, SystemError> {
317         // 获取mode
318         let mode: FileMode;
319         if let FilePrivateData::Pipefs(pdata) = data {
320             mode = pdata.mode;
321         } else {
322             return Err(SystemError::EBADF);
323         }
324 
325         if buf.len() < len || len > PIPE_BUFF_SIZE {
326             return Err(SystemError::EINVAL);
327         }
328         // 加锁
329 
330         let mut inode = self.0.lock();
331 
332         // TODO: 如果已经没有读端存在了,则向写端进程发送SIGPIPE信号
333         if inode.reader == 0 {}
334 
335         // 如果管道空间不够
336 
337         while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE {
338             // 唤醒读端
339             inode
340                 .read_wait_queue
341                 .wakeup(Some(ProcessState::Blocked(true)));
342 
343             // 如果为非阻塞管道,直接返回错误
344             if mode.contains(FileMode::O_NONBLOCK) {
345                 drop(inode);
346                 return Err(SystemError::ENOMEM);
347             }
348 
349             // 解锁并睡眠
350             unsafe {
351                 let irq_guard = CurrentIrqArch::save_and_disable_irq();
352                 inode.write_wait_queue.sleep_without_schedule();
353                 drop(inode);
354                 drop(irq_guard);
355             }
356             sched();
357             inode = self.0.lock();
358         }
359 
360         // 决定要输入的字节
361         let start = inode.write_pos as usize;
362         let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE;
363         // 从用户的缓冲区拷贝数据到管道
364 
365         if end < start {
366             inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]);
367             inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]);
368         } else {
369             inode.data[start..end].copy_from_slice(&buf[0..len]);
370         }
371         // 更新写位置以及valid_cnt
372         inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
373         inode.valid_cnt += len as i32;
374 
375         // 写完后还有位置,则唤醒下一个写者
376         if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE {
377             inode
378                 .write_wait_queue
379                 .wakeup(Some(ProcessState::Blocked(true)));
380         }
381 
382         // 读完后解锁并唤醒等待在读等待队列中的进程
383         inode
384             .read_wait_queue
385             .wakeup(Some(ProcessState::Blocked(true)));
386 
387         let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
388         // 唤醒epoll中等待的进程
389         EventPoll::wakeup_epoll(&mut inode.epitems, pollflag)?;
390 
391         // 返回写入的字节数
392         return Ok(len);
393     }
394 
395     fn as_any_ref(&self) -> &dyn core::any::Any {
396         self
397     }
398 
399     fn get_entry_name_and_metadata(
400         &self,
401         ino: crate::filesystem::vfs::InodeId,
402     ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> {
403         // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。
404         let name = self.get_entry_name(ino)?;
405         let entry = self.find(&name)?;
406         return Ok((name, entry.metadata()?));
407     }
408 
409     fn fs(&self) -> Arc<(dyn FileSystem)> {
410         todo!()
411     }
412 
413     fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> {
414         return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP);
415     }
416 
417     fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
418         return self.0.lock().poll(private_data);
419     }
420 }
421