1 use crate::{ 2 arch::CurrentIrqArch, 3 exception::InterruptArch, 4 filesystem::vfs::{ 5 core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem, 6 FileType, IndexNode, Metadata, 7 }, 8 libs::{ 9 spinlock::{SpinLock, SpinLockGuard}, 10 wait_queue::WaitQueue, 11 }, 12 net::event_poll::{EPollEventType, EPollItem, EventPoll}, 13 process::ProcessState, 14 sched::{schedule, SchedMode}, 15 time::PosixTimeSpec, 16 }; 17 18 use alloc::{ 19 collections::LinkedList, 20 sync::{Arc, Weak}, 21 }; 22 use system_error::SystemError; 23 24 /// 我们设定pipe_buff的总大小为1024字节 25 const PIPE_BUFF_SIZE: usize = 1024; 26 27 #[derive(Debug, Clone)] 28 pub struct PipeFsPrivateData { 29 mode: FileMode, 30 } 31 32 impl PipeFsPrivateData { 33 pub fn new(mode: FileMode) -> Self { 34 return PipeFsPrivateData { mode }; 35 } 36 37 pub fn set_mode(&mut self, mode: FileMode) { 38 self.mode = mode; 39 } 40 } 41 42 /// @brief 管道文件i节点(锁) 43 #[derive(Debug)] 44 pub struct LockedPipeInode { 45 inner: SpinLock<InnerPipeInode>, 46 read_wait_queue: WaitQueue, 47 write_wait_queue: WaitQueue, 48 } 49 50 /// @brief 管道文件i节点(无锁) 51 #[derive(Debug)] 52 pub struct InnerPipeInode { 53 self_ref: Weak<LockedPipeInode>, 54 /// 管道内可读的数据数 55 valid_cnt: i32, 56 read_pos: i32, 57 write_pos: i32, 58 data: [u8; PIPE_BUFF_SIZE], 59 /// INode 元数据 60 metadata: Metadata, 61 reader: u32, 62 writer: u32, 63 epitems: SpinLock<LinkedList<Arc<EPollItem>>>, 64 } 65 66 impl InnerPipeInode { 67 pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> { 68 let mut events = EPollEventType::empty(); 69 70 let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data { 71 mode 72 } else { 73 return Err(SystemError::EBADFD); 74 }; 75 76 if mode.contains(FileMode::O_RDONLY) { 77 if self.valid_cnt != 0 { 78 // 有数据可读 79 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM); 80 } 81 82 // 没有写者 83 if self.writer == 0 { 84 events.insert(EPollEventType::EPOLLHUP) 85 } 86 } 87 88 if mode.contains(FileMode::O_WRONLY) { 89 // 管道内数据未满 90 if self.valid_cnt as usize != PIPE_BUFF_SIZE { 91 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM); 92 } 93 94 // 没有读者 95 if self.reader == 0 { 96 events.insert(EPollEventType::EPOLLERR); 97 } 98 } 99 100 Ok(events.bits() as usize) 101 } 102 103 pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> { 104 self.epitems.lock().push_back(epitem); 105 Ok(()) 106 } 107 } 108 109 impl LockedPipeInode { 110 pub fn new() -> Arc<Self> { 111 let inner = InnerPipeInode { 112 self_ref: Weak::default(), 113 valid_cnt: 0, 114 read_pos: 0, 115 write_pos: 0, 116 data: [0; PIPE_BUFF_SIZE], 117 118 metadata: Metadata { 119 dev_id: 0, 120 inode_id: generate_inode_id(), 121 size: PIPE_BUFF_SIZE as i64, 122 blk_size: 0, 123 blocks: 0, 124 atime: PosixTimeSpec::default(), 125 mtime: PosixTimeSpec::default(), 126 ctime: PosixTimeSpec::default(), 127 file_type: FileType::Pipe, 128 mode: ModeType::from_bits_truncate(0o666), 129 nlinks: 1, 130 uid: 0, 131 gid: 0, 132 raw_dev: Default::default(), 133 }, 134 reader: 0, 135 writer: 0, 136 epitems: SpinLock::new(LinkedList::new()), 137 }; 138 let result = Arc::new(Self { 139 inner: SpinLock::new(inner), 140 read_wait_queue: WaitQueue::default(), 141 write_wait_queue: WaitQueue::default(), 142 }); 143 let mut guard = result.inner.lock(); 144 guard.self_ref = Arc::downgrade(&result); 145 // 释放锁 146 drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁 147 return result; 148 } 149 150 pub fn inner(&self) -> &SpinLock<InnerPipeInode> { 151 &self.inner 152 } 153 } 154 155 impl IndexNode for LockedPipeInode { 156 fn read_at( 157 &self, 158 _offset: usize, 159 len: usize, 160 buf: &mut [u8], 161 data: SpinLockGuard<FilePrivateData>, 162 ) -> Result<usize, SystemError> { 163 // 获取mode 164 let mode: FileMode; 165 if let FilePrivateData::Pipefs(pdata) = &*data { 166 mode = pdata.mode; 167 } else { 168 return Err(SystemError::EBADF); 169 } 170 171 if buf.len() < len { 172 return Err(SystemError::EINVAL); 173 } 174 // 加锁 175 let mut inode = self.inner.lock(); 176 177 // 如果管道里面没有数据,则唤醒写端, 178 while inode.valid_cnt == 0 { 179 // 如果当前管道写者数为0,则返回EOF 180 if inode.writer == 0 { 181 return Ok(0); 182 } 183 184 self.write_wait_queue 185 .wakeup(Some(ProcessState::Blocked(true))); 186 187 // 如果为非阻塞管道,直接返回错误 188 if mode.contains(FileMode::O_NONBLOCK) { 189 drop(inode); 190 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 191 } 192 193 // 否则在读等待队列中睡眠,并释放锁 194 unsafe { 195 let irq_guard = CurrentIrqArch::save_and_disable_irq(); 196 197 drop(inode); 198 self.read_wait_queue.sleep_without_schedule(); 199 drop(irq_guard); 200 } 201 schedule(SchedMode::SM_NONE); 202 inode = self.inner.lock(); 203 } 204 205 let mut num = inode.valid_cnt as usize; 206 //决定要输出的字节 207 let start = inode.read_pos as usize; 208 //如果读端希望读取的字节数大于有效字节数,则输出有效字节 209 let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE; 210 //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节 211 if len < inode.valid_cnt as usize { 212 end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE; 213 num = len; 214 } 215 216 // 从管道拷贝数据到用户的缓冲区 217 218 if end < start { 219 buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]); 220 buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]); 221 } else { 222 buf[0..num].copy_from_slice(&inode.data[start..end]); 223 } 224 225 //更新读位置以及valid_cnt 226 inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32; 227 inode.valid_cnt -= num as i32; 228 229 // 读完以后如果未读完,则唤醒下一个读者 230 if inode.valid_cnt > 0 { 231 self.read_wait_queue 232 .wakeup(Some(ProcessState::Blocked(true))); 233 } 234 235 //读完后解锁并唤醒等待在写等待队列中的进程 236 self.write_wait_queue 237 .wakeup(Some(ProcessState::Blocked(true))); 238 239 let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); 240 // 唤醒epoll中等待的进程 241 EventPoll::wakeup_epoll(&inode.epitems, pollflag)?; 242 243 //返回读取的字节数 244 return Ok(num); 245 } 246 247 fn open( 248 &self, 249 mut data: SpinLockGuard<FilePrivateData>, 250 mode: &crate::filesystem::vfs::file::FileMode, 251 ) -> Result<(), SystemError> { 252 let mut guard = self.inner.lock(); 253 // 不能以读写方式打开管道 254 if mode.contains(FileMode::O_RDWR) { 255 return Err(SystemError::EACCES); 256 } 257 if mode.contains(FileMode::O_RDONLY) { 258 guard.reader += 1; 259 } 260 if mode.contains(FileMode::O_WRONLY) { 261 guard.writer += 1; 262 } 263 264 // 设置mode 265 *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode }); 266 267 return Ok(()); 268 } 269 270 fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> { 271 let inode = self.inner.lock(); 272 let mut metadata = inode.metadata.clone(); 273 metadata.size = inode.data.len() as i64; 274 275 return Ok(metadata); 276 } 277 278 fn close(&self, data: SpinLockGuard<FilePrivateData>) -> Result<(), SystemError> { 279 let mode: FileMode; 280 if let FilePrivateData::Pipefs(pipe_data) = &*data { 281 mode = pipe_data.mode; 282 } else { 283 return Err(SystemError::EBADF); 284 } 285 let mut guard = self.inner.lock(); 286 287 // 写端关闭 288 if mode.contains(FileMode::O_WRONLY) { 289 assert!(guard.writer > 0); 290 guard.writer -= 1; 291 // 如果已经没有写端了,则唤醒读端 292 if guard.writer == 0 { 293 self.read_wait_queue 294 .wakeup_all(Some(ProcessState::Blocked(true))); 295 } 296 } 297 298 // 读端关闭 299 if mode.contains(FileMode::O_RDONLY) { 300 assert!(guard.reader > 0); 301 guard.reader -= 1; 302 // 如果已经没有写端了,则唤醒读端 303 if guard.reader == 0 { 304 self.write_wait_queue 305 .wakeup_all(Some(ProcessState::Blocked(true))); 306 } 307 } 308 309 return Ok(()); 310 } 311 312 fn write_at( 313 &self, 314 _offset: usize, 315 len: usize, 316 buf: &[u8], 317 data: SpinLockGuard<FilePrivateData>, 318 ) -> Result<usize, SystemError> { 319 // 获取mode 320 let mode: FileMode; 321 if let FilePrivateData::Pipefs(pdata) = &*data { 322 mode = pdata.mode; 323 } else { 324 return Err(SystemError::EBADF); 325 } 326 327 if buf.len() < len || len > PIPE_BUFF_SIZE { 328 return Err(SystemError::EINVAL); 329 } 330 // 加锁 331 332 let mut inode = self.inner.lock(); 333 334 if inode.reader == 0 { 335 // TODO: 如果已经没有读端存在了,则向写端进程发送SIGPIPE信号 336 } 337 338 // 如果管道空间不够 339 340 while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE { 341 // 唤醒读端 342 self.read_wait_queue 343 .wakeup(Some(ProcessState::Blocked(true))); 344 345 // 如果为非阻塞管道,直接返回错误 346 if mode.contains(FileMode::O_NONBLOCK) { 347 drop(inode); 348 return Err(SystemError::ENOMEM); 349 } 350 351 // 解锁并睡眠 352 unsafe { 353 let irq_guard = CurrentIrqArch::save_and_disable_irq(); 354 drop(inode); 355 self.write_wait_queue.sleep_without_schedule(); 356 drop(irq_guard); 357 } 358 schedule(SchedMode::SM_NONE); 359 inode = self.inner.lock(); 360 } 361 362 // 决定要输入的字节 363 let start = inode.write_pos as usize; 364 let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE; 365 // 从用户的缓冲区拷贝数据到管道 366 367 if end < start { 368 inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]); 369 inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]); 370 } else { 371 inode.data[start..end].copy_from_slice(&buf[0..len]); 372 } 373 // 更新写位置以及valid_cnt 374 inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32; 375 inode.valid_cnt += len as i32; 376 377 // 写完后还有位置,则唤醒下一个写者 378 if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE { 379 self.write_wait_queue 380 .wakeup(Some(ProcessState::Blocked(true))); 381 } 382 383 // 读完后解锁并唤醒等待在读等待队列中的进程 384 self.read_wait_queue 385 .wakeup(Some(ProcessState::Blocked(true))); 386 387 let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); 388 // 唤醒epoll中等待的进程 389 EventPoll::wakeup_epoll(&inode.epitems, pollflag)?; 390 391 // 返回写入的字节数 392 return Ok(len); 393 } 394 395 fn as_any_ref(&self) -> &dyn core::any::Any { 396 self 397 } 398 399 fn get_entry_name_and_metadata( 400 &self, 401 ino: crate::filesystem::vfs::InodeId, 402 ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> { 403 // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。 404 let name = self.get_entry_name(ino)?; 405 let entry = self.find(&name)?; 406 return Ok((name, entry.metadata()?)); 407 } 408 409 fn fs(&self) -> Arc<(dyn FileSystem)> { 410 todo!() 411 } 412 413 fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> { 414 return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP); 415 } 416 417 fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> { 418 return self.inner.lock().poll(private_data); 419 } 420 } 421