xref: /DragonOS/kernel/src/ipc/pipe.rs (revision 4afc5b7b7bed743d18c058e4843dcbdb2f3ad751)
1 use crate::{
2     arch::CurrentIrqArch,
3     exception::InterruptArch,
4     filesystem::vfs::{
5         core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem,
6         FileType, IndexNode, Metadata,
7     },
8     libs::{
9         spinlock::{SpinLock, SpinLockGuard},
10         wait_queue::WaitQueue,
11     },
12     net::event_poll::{EPollEventType, EPollItem, EventPoll},
13     process::ProcessState,
14     sched::{schedule, SchedMode},
15     time::PosixTimeSpec,
16 };
17 
18 use alloc::{
19     collections::LinkedList,
20     sync::{Arc, Weak},
21     vec::Vec,
22 };
23 use system_error::SystemError;
24 
25 /// 我们设定pipe_buff的总大小为1024字节
26 const PIPE_BUFF_SIZE: usize = 1024;
27 
28 #[derive(Debug, Clone)]
29 pub struct PipeFsPrivateData {
30     mode: FileMode,
31 }
32 
33 impl PipeFsPrivateData {
34     pub fn new(mode: FileMode) -> Self {
35         return PipeFsPrivateData { mode };
36     }
37 
38     pub fn set_mode(&mut self, mode: FileMode) {
39         self.mode = mode;
40     }
41 }
42 
43 /// @brief 管道文件i节点(锁)
44 #[derive(Debug)]
45 pub struct LockedPipeInode {
46     inner: SpinLock<InnerPipeInode>,
47     read_wait_queue: WaitQueue,
48     write_wait_queue: WaitQueue,
49 }
50 
51 /// @brief 管道文件i节点(无锁)
52 #[derive(Debug)]
53 pub struct InnerPipeInode {
54     self_ref: Weak<LockedPipeInode>,
55     /// 管道内可读的数据数
56     valid_cnt: i32,
57     read_pos: i32,
58     write_pos: i32,
59     data: [u8; PIPE_BUFF_SIZE],
60     /// INode 元数据
61     metadata: Metadata,
62     reader: u32,
63     writer: u32,
64     epitems: SpinLock<LinkedList<Arc<EPollItem>>>,
65 }
66 
67 impl InnerPipeInode {
68     pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
69         let mut events = EPollEventType::empty();
70 
71         let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data {
72             mode
73         } else {
74             return Err(SystemError::EBADFD);
75         };
76 
77         if mode.contains(FileMode::O_RDONLY) {
78             if self.valid_cnt != 0 {
79                 // 有数据可读
80                 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM);
81             }
82 
83             // 没有写者
84             if self.writer == 0 {
85                 events.insert(EPollEventType::EPOLLHUP)
86             }
87         }
88 
89         if mode.contains(FileMode::O_WRONLY) {
90             // 管道内数据未满
91             if self.valid_cnt as usize != PIPE_BUFF_SIZE {
92                 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM);
93             }
94 
95             // 没有读者
96             if self.reader == 0 {
97                 events.insert(EPollEventType::EPOLLERR);
98             }
99         }
100 
101         Ok(events.bits() as usize)
102     }
103 
104     pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> {
105         self.epitems.lock().push_back(epitem);
106         Ok(())
107     }
108 
109     pub fn remove_epoll(&self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> {
110         let is_remove = !self
111             .epitems
112             .lock_irqsave()
113             .extract_if(|x| x.epoll().ptr_eq(epoll))
114             .collect::<Vec<_>>()
115             .is_empty();
116 
117         if is_remove {
118             return Ok(());
119         }
120 
121         Err(SystemError::ENOENT)
122     }
123 }
124 
125 impl LockedPipeInode {
126     pub fn new() -> Arc<Self> {
127         let inner = InnerPipeInode {
128             self_ref: Weak::default(),
129             valid_cnt: 0,
130             read_pos: 0,
131             write_pos: 0,
132             data: [0; PIPE_BUFF_SIZE],
133 
134             metadata: Metadata {
135                 dev_id: 0,
136                 inode_id: generate_inode_id(),
137                 size: PIPE_BUFF_SIZE as i64,
138                 blk_size: 0,
139                 blocks: 0,
140                 atime: PosixTimeSpec::default(),
141                 mtime: PosixTimeSpec::default(),
142                 ctime: PosixTimeSpec::default(),
143                 file_type: FileType::Pipe,
144                 mode: ModeType::from_bits_truncate(0o666),
145                 nlinks: 1,
146                 uid: 0,
147                 gid: 0,
148                 raw_dev: Default::default(),
149             },
150             reader: 0,
151             writer: 0,
152             epitems: SpinLock::new(LinkedList::new()),
153         };
154         let result = Arc::new(Self {
155             inner: SpinLock::new(inner),
156             read_wait_queue: WaitQueue::default(),
157             write_wait_queue: WaitQueue::default(),
158         });
159         let mut guard = result.inner.lock();
160         guard.self_ref = Arc::downgrade(&result);
161         // 释放锁
162         drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁
163         return result;
164     }
165 
166     pub fn inner(&self) -> &SpinLock<InnerPipeInode> {
167         &self.inner
168     }
169 }
170 
171 impl IndexNode for LockedPipeInode {
172     fn read_at(
173         &self,
174         _offset: usize,
175         len: usize,
176         buf: &mut [u8],
177         data_guard: SpinLockGuard<FilePrivateData>,
178     ) -> Result<usize, SystemError> {
179         let data = data_guard.clone();
180         drop(data_guard);
181         // 获取mode
182         let mode: FileMode;
183         if let FilePrivateData::Pipefs(pdata) = &data {
184             mode = pdata.mode;
185         } else {
186             return Err(SystemError::EBADF);
187         }
188 
189         if buf.len() < len {
190             return Err(SystemError::EINVAL);
191         }
192         // 加锁
193         let mut inode = self.inner.lock();
194 
195         // 如果管道里面没有数据,则唤醒写端,
196         while inode.valid_cnt == 0 {
197             // 如果当前管道写者数为0,则返回EOF
198             if inode.writer == 0 {
199                 return Ok(0);
200             }
201 
202             self.write_wait_queue
203                 .wakeup(Some(ProcessState::Blocked(true)));
204 
205             // 如果为非阻塞管道,直接返回错误
206             if mode.contains(FileMode::O_NONBLOCK) {
207                 drop(inode);
208                 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
209             }
210 
211             // 否则在读等待队列中睡眠,并释放锁
212             unsafe {
213                 let irq_guard = CurrentIrqArch::save_and_disable_irq();
214 
215                 drop(inode);
216                 self.read_wait_queue.sleep_without_schedule();
217                 drop(irq_guard);
218             }
219             schedule(SchedMode::SM_NONE);
220             inode = self.inner.lock();
221         }
222 
223         let mut num = inode.valid_cnt as usize;
224         //决定要输出的字节
225         let start = inode.read_pos as usize;
226         //如果读端希望读取的字节数大于有效字节数,则输出有效字节
227         let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE;
228         //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节
229         if len < inode.valid_cnt as usize {
230             end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE;
231             num = len;
232         }
233 
234         // 从管道拷贝数据到用户的缓冲区
235 
236         if end < start {
237             buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]);
238             buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]);
239         } else {
240             buf[0..num].copy_from_slice(&inode.data[start..end]);
241         }
242 
243         //更新读位置以及valid_cnt
244         inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
245         inode.valid_cnt -= num as i32;
246 
247         // 读完以后如果未读完,则唤醒下一个读者
248         if inode.valid_cnt > 0 {
249             self.read_wait_queue
250                 .wakeup(Some(ProcessState::Blocked(true)));
251         }
252 
253         //读完后解锁并唤醒等待在写等待队列中的进程
254         self.write_wait_queue
255             .wakeup(Some(ProcessState::Blocked(true)));
256 
257         let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
258         // 唤醒epoll中等待的进程
259         EventPoll::wakeup_epoll(&inode.epitems, pollflag)?;
260 
261         //返回读取的字节数
262         return Ok(num);
263     }
264 
265     fn open(
266         &self,
267         mut data: SpinLockGuard<FilePrivateData>,
268         mode: &crate::filesystem::vfs::file::FileMode,
269     ) -> Result<(), SystemError> {
270         let mut guard = self.inner.lock();
271         // 不能以读写方式打开管道
272         if mode.contains(FileMode::O_RDWR) {
273             return Err(SystemError::EACCES);
274         }
275         if mode.contains(FileMode::O_RDONLY) {
276             guard.reader += 1;
277         }
278         if mode.contains(FileMode::O_WRONLY) {
279             guard.writer += 1;
280         }
281 
282         // 设置mode
283         *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode });
284 
285         return Ok(());
286     }
287 
288     fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> {
289         let inode = self.inner.lock();
290         let mut metadata = inode.metadata.clone();
291         metadata.size = inode.data.len() as i64;
292 
293         return Ok(metadata);
294     }
295 
296     fn close(&self, data: SpinLockGuard<FilePrivateData>) -> Result<(), SystemError> {
297         let mode: FileMode;
298         if let FilePrivateData::Pipefs(pipe_data) = &*data {
299             mode = pipe_data.mode;
300         } else {
301             return Err(SystemError::EBADF);
302         }
303         let mut guard = self.inner.lock();
304 
305         // 写端关闭
306         if mode.contains(FileMode::O_WRONLY) {
307             assert!(guard.writer > 0);
308             guard.writer -= 1;
309             // 如果已经没有写端了,则唤醒读端
310             if guard.writer == 0 {
311                 self.read_wait_queue
312                     .wakeup_all(Some(ProcessState::Blocked(true)));
313             }
314         }
315 
316         // 读端关闭
317         if mode.contains(FileMode::O_RDONLY) {
318             assert!(guard.reader > 0);
319             guard.reader -= 1;
320             // 如果已经没有写端了,则唤醒读端
321             if guard.reader == 0 {
322                 self.write_wait_queue
323                     .wakeup_all(Some(ProcessState::Blocked(true)));
324             }
325         }
326 
327         return Ok(());
328     }
329 
330     fn write_at(
331         &self,
332         _offset: usize,
333         len: usize,
334         buf: &[u8],
335         data: SpinLockGuard<FilePrivateData>,
336     ) -> Result<usize, SystemError> {
337         // 获取mode
338         let mode: FileMode;
339         if let FilePrivateData::Pipefs(pdata) = &*data {
340             mode = pdata.mode;
341         } else {
342             return Err(SystemError::EBADF);
343         }
344 
345         if buf.len() < len || len > PIPE_BUFF_SIZE {
346             return Err(SystemError::EINVAL);
347         }
348         // 加锁
349 
350         let mut inode = self.inner.lock();
351 
352         if inode.reader == 0 {
353             // TODO: 如果已经没有读端存在了,则向写端进程发送SIGPIPE信号
354         }
355 
356         // 如果管道空间不够
357 
358         while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE {
359             // 唤醒读端
360             self.read_wait_queue
361                 .wakeup(Some(ProcessState::Blocked(true)));
362 
363             // 如果为非阻塞管道,直接返回错误
364             if mode.contains(FileMode::O_NONBLOCK) {
365                 drop(inode);
366                 return Err(SystemError::ENOMEM);
367             }
368 
369             // 解锁并睡眠
370             unsafe {
371                 let irq_guard = CurrentIrqArch::save_and_disable_irq();
372                 drop(inode);
373                 self.write_wait_queue.sleep_without_schedule();
374                 drop(irq_guard);
375             }
376             schedule(SchedMode::SM_NONE);
377             inode = self.inner.lock();
378         }
379 
380         // 决定要输入的字节
381         let start = inode.write_pos as usize;
382         let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE;
383         // 从用户的缓冲区拷贝数据到管道
384 
385         if end < start {
386             inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]);
387             inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]);
388         } else {
389             inode.data[start..end].copy_from_slice(&buf[0..len]);
390         }
391         // 更新写位置以及valid_cnt
392         inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
393         inode.valid_cnt += len as i32;
394 
395         // 写完后还有位置,则唤醒下一个写者
396         if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE {
397             self.write_wait_queue
398                 .wakeup(Some(ProcessState::Blocked(true)));
399         }
400 
401         // 读完后解锁并唤醒等待在读等待队列中的进程
402         self.read_wait_queue
403             .wakeup(Some(ProcessState::Blocked(true)));
404 
405         let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32);
406         // 唤醒epoll中等待的进程
407         EventPoll::wakeup_epoll(&inode.epitems, pollflag)?;
408 
409         // 返回写入的字节数
410         return Ok(len);
411     }
412 
413     fn as_any_ref(&self) -> &dyn core::any::Any {
414         self
415     }
416 
417     fn get_entry_name_and_metadata(
418         &self,
419         ino: crate::filesystem::vfs::InodeId,
420     ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> {
421         // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。
422         let name = self.get_entry_name(ino)?;
423         let entry = self.find(&name)?;
424         return Ok((name, entry.metadata()?));
425     }
426 
427     fn fs(&self) -> Arc<(dyn FileSystem)> {
428         todo!()
429     }
430 
431     fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> {
432         return Err(SystemError::ENOSYS);
433     }
434 
435     fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> {
436         return self.inner.lock().poll(private_data);
437     }
438 }
439