xref: /DragonOS/kernel/src/ipc/pipe.rs (revision 86ee1395de7c614865236ee15071c3603b794e44)
1 use crate::{
2     arch::CurrentIrqArch,
3     exception::InterruptArch,
4     filesystem::vfs::{
5         core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem,
6         FileType, IndexNode, Metadata,
7     },
8     libs::{
9         spinlock::{SpinLock, SpinLockGuard},
10         wait_queue::WaitQueue,
11     },
12     net::event_poll::{EPollEventType, EPollItem, EventPoll},
13     process::ProcessState,
14     sched::{schedule, SchedMode},
15     time::PosixTimeSpec,
16 };
17 
18 use alloc::{
19     collections::LinkedList,
20     sync::{Arc, Weak},
21 };
22 use system_error::SystemError;
23 
/// Total capacity of a pipe's buffer, fixed at 1024 bytes.
const PIPE_BUFF_SIZE: usize = 1024;
26 
27 #[derive(Debug, Clone)]
28 pub struct PipeFsPrivateData {
29     mode: FileMode,
30 }
31 
32 impl PipeFsPrivateData {
33     pub fn new(mode: FileMode) -> Self {
34         return PipeFsPrivateData { mode };
35     }
36 
37     pub fn set_mode(&mut self, mode: FileMode) {
38         self.mode = mode;
39     }
40 }
41 
/// Pipe inode (locked wrapper): the pipe state behind a spinlock, together with
/// the wait queues that blocked readers and writers sleep on.
#[derive(Debug)]
pub struct LockedPipeInode {
    /// Pipe state, guarded by a spinlock.
    inner: SpinLock<InnerPipeInode>,
    /// Readers sleep on this queue while the pipe holds no data.
    read_wait_queue: WaitQueue,
    /// Writers sleep on this queue while the pipe lacks room for their data.
    write_wait_queue: WaitQueue,
}
49 
/// Pipe inode (unlocked inner state); stored inside the spinlock of [`LockedPipeInode`].
#[derive(Debug)]
pub struct InnerPipeInode {
    /// Weak back-reference to the owning [`LockedPipeInode`].
    self_ref: Weak<LockedPipeInode>,
    /// Number of readable bytes currently stored in the pipe.
    valid_cnt: i32,
    /// Index into `data` of the next byte to read.
    read_pos: i32,
    /// Index into `data` at which the next byte is written.
    write_pos: i32,
    /// Backing storage; positions wrap around modulo `PIPE_BUFF_SIZE`.
    data: [u8; PIPE_BUFF_SIZE],
    /// Inode metadata.
    metadata: Metadata,
    /// Count of opens recorded as read ends.
    reader: u32,
    /// Count of opens recorded as write ends.
    writer: u32,
    /// Epoll items registered on this pipe.
    epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
}
65 
66 impl InnerPipeInode {
67     pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
68         let mut events = EPollEventType::empty();
69 
70         let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data {
71             mode
72         } else {
73             return Err(SystemError::EBADFD);
74         };
75 
76         if mode.contains(FileMode::O_RDONLY) {
77             if self.valid_cnt != 0 {
78                 // 有数据可读
79                 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM);
80             }
81 
82             // 没有写者
83             if self.writer == 0 {
84                 events.insert(EPollEventType::EPOLLHUP)
85             }
86         }
87 
88         if mode.contains(FileMode::O_WRONLY) {
89             // 管道内数据未满
90             if self.valid_cnt as usize != PIPE_BUFF_SIZE {
91                 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM);
92             }
93 
94             // 没有读者
95             if self.reader == 0 {
96                 events.insert(EPollEventType::EPOLLERR);
97             }
98         }
99 
100         Ok(events.bits() as usize)
101     }
102 
103     pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
104         self.epitems.lock().push_back(epitem);
105         Ok(())
106     }
107 }
108 
109 impl LockedPipeInode {
110     pub fn new() -> Arc<Self> {
111         let inner = InnerPipeInode {
112             self_ref: Weak::default(),
113             valid_cnt: 0,
114             read_pos: 0,
115             write_pos: 0,
116             data: [0; PIPE_BUFF_SIZE],
117 
118             metadata: Metadata {
119                 dev_id: 0,
120                 inode_id: generate_inode_id(),
121                 size: PIPE_BUFF_SIZE as i64,
122                 blk_size: 0,
123                 blocks: 0,
124                 atime: PosixTimeSpec::default(),
125                 mtime: PosixTimeSpec::default(),
126                 ctime: PosixTimeSpec::default(),
127                 file_type: FileType::Pipe,
128                 mode: ModeType::from_bits_truncate(0o666),
129                 nlinks: 1,
130                 uid: 0,
131                 gid: 0,
132                 raw_dev: Default::default(),
133             },
134             reader: 0,
135             writer: 0,
136             epitems: SpinLock::new(LinkedList::new()),
137         };
138         let result = Arc::new(Self {
139             inner: SpinLock::new(inner),
140             read_wait_queue: WaitQueue::default(),
141             write_wait_queue: WaitQueue::default(),
142         });
143         let mut guard = result.inner.lock();
144         guard.self_ref = Arc::downgrade(&result);
145         // 释放锁
146         drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁
147         return result;
148     }
149 
150     pub fn inner(&self) -> &SpinLock<InnerPipeInode> {
151         &self.inner
152     }
153 }
154 
155 impl IndexNode for LockedPipeInode {
156     fn read_at(
157         &self,
158         _offset: usize,
159         len: usize,
160         buf: &mut [u8],
161         data_guard: SpinLockGuard<FilePrivateData>,
162     ) -> Result<usize, SystemError> {
163         let data = data_guard.clone();
164         drop(data_guard);
165         // 获取mode
166         let mode: FileMode;
167         if let FilePrivateData::Pipefs(pdata) = &data {
168             mode = pdata.mode;
169         } else {
170             return Err(SystemError::EBADF);
171         }
172 
173         if buf.len() < len {
174             return Err(SystemError::EINVAL);
175         }
176         // 加锁
177         let mut inode = self.inner.lock();
178 
179         // 如果管道里面没有数据,则唤醒写端,
180         while inode.valid_cnt == 0 {
181             // 如果当前管道写者数为0,则返回EOF
182             if inode.writer == 0 {
183                 return Ok(0);
184             }
185 
186             self.write_wait_queue
187                 .wakeup(Some(ProcessState::Blocked(true)));
188 
189             // 如果为非阻塞管道,直接返回错误
190             if mode.contains(FileMode::O_NONBLOCK) {
191                 drop(inode);
192                 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
193             }
194 
195             // 否则在读等待队列中睡眠,并释放锁
196             unsafe {
197                 let irq_guard = CurrentIrqArch::save_and_disable_irq();
198 
199                 drop(inode);
200                 self.read_wait_queue.sleep_without_schedule();
201                 drop(irq_guard);
202             }
203             schedule(SchedMode::SM_NONE);
204             inode = self.inner.lock();
205         }
206 
207         let mut num = inode.valid_cnt as usize;
208         //决定要输出的字节
209         let start = inode.read_pos as usize;
210         //如果读端希望读取的字节数大于有效字节数,则输出有效字节
211         let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE;
212         //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节
213         if len < inode.valid_cnt as usize {
214             end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE;
215             num = len;
216         }
217 
218         // 从管道拷贝数据到用户的缓冲区
219 
220         if end < start {
221             buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]);
222             buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]);
223         } else {
224             buf[0..num].copy_from_slice(&inode.data[start..end]);
225         }
226 
227         //更新读位置以及valid_cnt
228         inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
229         inode.valid_cnt -= num as i32;
230 
231         // 读完以后如果未读完,则唤醒下一个读者
232         if inode.valid_cnt > 0 {
233             self.read_wait_queue
234                 .wakeup(Some(ProcessState::Blocked(true)));
235         }
236 
237         //读完后解锁并唤醒等待在写等待队列中的进程
238         self.write_wait_queue
239             .wakeup(Some(ProcessState::Blocked(true)));
240 
241         let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
242         // 唤醒epoll中等待的进程
243         EventPoll::wakeup_epoll(&inode.epitems, pollflag)?;
244 
245         //返回读取的字节数
246         return Ok(num);
247     }
248 
249     fn open(
250         &self,
251         mut data: SpinLockGuard<FilePrivateData>,
252         mode: &crate::filesystem::vfs::file::FileMode,
253     ) -> Result<(), SystemError> {
254         let mut guard = self.inner.lock();
255         // 不能以读写方式打开管道
256         if mode.contains(FileMode::O_RDWR) {
257             return Err(SystemError::EACCES);
258         }
259         if mode.contains(FileMode::O_RDONLY) {
260             guard.reader += 1;
261         }
262         if mode.contains(FileMode::O_WRONLY) {
263             guard.writer += 1;
264         }
265 
266         // 设置mode
267         *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode });
268 
269         return Ok(());
270     }
271 
272     fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> {
273         let inode = self.inner.lock();
274         let mut metadata = inode.metadata.clone();
275         metadata.size = inode.data.len() as i64;
276 
277         return Ok(metadata);
278     }
279 
280     fn close(&self, data: SpinLockGuard<FilePrivateData>) -> Result<(), SystemError> {
281         let mode: FileMode;
282         if let FilePrivateData::Pipefs(pipe_data) = &*data {
283             mode = pipe_data.mode;
284         } else {
285             return Err(SystemError::EBADF);
286         }
287         let mut guard = self.inner.lock();
288 
289         // 写端关闭
290         if mode.contains(FileMode::O_WRONLY) {
291             assert!(guard.writer > 0);
292             guard.writer -= 1;
293             // 如果已经没有写端了,则唤醒读端
294             if guard.writer == 0 {
295                 self.read_wait_queue
296                     .wakeup_all(Some(ProcessState::Blocked(true)));
297             }
298         }
299 
300         // 读端关闭
301         if mode.contains(FileMode::O_RDONLY) {
302             assert!(guard.reader > 0);
303             guard.reader -= 1;
304             // 如果已经没有写端了,则唤醒读端
305             if guard.reader == 0 {
306                 self.write_wait_queue
307                     .wakeup_all(Some(ProcessState::Blocked(true)));
308             }
309         }
310 
311         return Ok(());
312     }
313 
314     fn write_at(
315         &self,
316         _offset: usize,
317         len: usize,
318         buf: &[u8],
319         data: SpinLockGuard<FilePrivateData>,
320     ) -> Result<usize, SystemError> {
321         // 获取mode
322         let mode: FileMode;
323         if let FilePrivateData::Pipefs(pdata) = &*data {
324             mode = pdata.mode;
325         } else {
326             return Err(SystemError::EBADF);
327         }
328 
329         if buf.len() < len || len > PIPE_BUFF_SIZE {
330             return Err(SystemError::EINVAL);
331         }
332         // 加锁
333 
334         let mut inode = self.inner.lock();
335 
336         if inode.reader == 0 {
337             // TODO: 如果已经没有读端存在了,则向写端进程发送SIGPIPE信号
338         }
339 
340         // 如果管道空间不够
341 
342         while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE {
343             // 唤醒读端
344             self.read_wait_queue
345                 .wakeup(Some(ProcessState::Blocked(true)));
346 
347             // 如果为非阻塞管道,直接返回错误
348             if mode.contains(FileMode::O_NONBLOCK) {
349                 drop(inode);
350                 return Err(SystemError::ENOMEM);
351             }
352 
353             // 解锁并睡眠
354             unsafe {
355                 let irq_guard = CurrentIrqArch::save_and_disable_irq();
356                 drop(inode);
357                 self.write_wait_queue.sleep_without_schedule();
358                 drop(irq_guard);
359             }
360             schedule(SchedMode::SM_NONE);
361             inode = self.inner.lock();
362         }
363 
364         // 决定要输入的字节
365         let start = inode.write_pos as usize;
366         let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE;
367         // 从用户的缓冲区拷贝数据到管道
368 
369         if end < start {
370             inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]);
371             inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]);
372         } else {
373             inode.data[start..end].copy_from_slice(&buf[0..len]);
374         }
375         // 更新写位置以及valid_cnt
376         inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
377         inode.valid_cnt += len as i32;
378 
379         // 写完后还有位置,则唤醒下一个写者
380         if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE {
381             self.write_wait_queue
382                 .wakeup(Some(ProcessState::Blocked(true)));
383         }
384 
385         // 读完后解锁并唤醒等待在读等待队列中的进程
386         self.read_wait_queue
387             .wakeup(Some(ProcessState::Blocked(true)));
388 
389         let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
390         // 唤醒epoll中等待的进程
391         EventPoll::wakeup_epoll(&inode.epitems, pollflag)?;
392 
393         // 返回写入的字节数
394         return Ok(len);
395     }
396 
397     fn as_any_ref(&self) -> &dyn core::any::Any {
398         self
399     }
400 
401     fn get_entry_name_and_metadata(
402         &self,
403         ino: crate::filesystem::vfs::InodeId,
404     ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> {
405         // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。
406         let name = self.get_entry_name(ino)?;
407         let entry = self.find(&name)?;
408         return Ok((name, entry.metadata()?));
409     }
410 
411     fn fs(&self) -> Arc<(dyn FileSystem)> {
412         todo!()
413     }
414 
415     fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> {
416         return Err(SystemError::ENOSYS);
417     }
418 
419     fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
420         return self.inner.lock().poll(private_data);
421     }
422 }
423