1 use crate::{ 2 arch::CurrentIrqArch, 3 exception::InterruptArch, 4 filesystem::vfs::{ 5 core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem, 6 FileType, IndexNode, Metadata, 7 }, 8 libs::{ 9 spinlock::{SpinLock, SpinLockGuard}, 10 wait_queue::WaitQueue, 11 }, 12 net::event_poll::{EPollEventType, EPollItem, EventPoll}, 13 process::ProcessState, 14 sched::{schedule, SchedMode}, 15 time::PosixTimeSpec, 16 }; 17 18 use alloc::{ 19 collections::LinkedList, 20 sync::{Arc, Weak}, 21 vec::Vec, 22 }; 23 use system_error::SystemError; 24 25 /// 我们设定pipe_buff的总大小为1024字节 26 const PIPE_BUFF_SIZE: usize = 1024; 27 28 #[derive(Debug, Clone)] 29 pub struct PipeFsPrivateData { 30 mode: FileMode, 31 } 32 33 impl PipeFsPrivateData { 34 pub fn new(mode: FileMode) -> Self { 35 return PipeFsPrivateData { mode }; 36 } 37 38 pub fn set_mode(&mut self, mode: FileMode) { 39 self.mode = mode; 40 } 41 } 42 43 /// @brief 管道文件i节点(锁) 44 #[derive(Debug)] 45 pub struct LockedPipeInode { 46 inner: SpinLock<InnerPipeInode>, 47 read_wait_queue: WaitQueue, 48 write_wait_queue: WaitQueue, 49 } 50 51 /// @brief 管道文件i节点(无锁) 52 #[derive(Debug)] 53 pub struct InnerPipeInode { 54 self_ref: Weak<LockedPipeInode>, 55 /// 管道内可读的数据数 56 valid_cnt: i32, 57 read_pos: i32, 58 write_pos: i32, 59 data: [u8; PIPE_BUFF_SIZE], 60 /// INode 元数据 61 metadata: Metadata, 62 reader: u32, 63 writer: u32, 64 epitems: SpinLock<LinkedList<Arc<EPollItem>>>, 65 } 66 67 impl InnerPipeInode { 68 pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> { 69 let mut events = EPollEventType::empty(); 70 71 let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data { 72 mode 73 } else { 74 return Err(SystemError::EBADFD); 75 }; 76 77 if mode.contains(FileMode::O_RDONLY) { 78 if self.valid_cnt != 0 { 79 // 有数据可读 80 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM); 81 } 82 83 // 没有写者 84 if self.writer == 0 { 85 events.insert(EPollEventType::EPOLLHUP) 86 } 87 } 88 89 if mode.contains(FileMode::O_WRONLY) { 90 // 管道内数据未满 91 if self.valid_cnt as usize != PIPE_BUFF_SIZE { 92 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM); 93 } 94 95 // 没有读者 96 if self.reader == 0 { 97 events.insert(EPollEventType::EPOLLERR); 98 } 99 } 100 101 Ok(events.bits() as usize) 102 } 103 104 pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> { 105 self.epitems.lock().push_back(epitem); 106 Ok(()) 107 } 108 109 pub fn remove_epoll(&self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> { 110 let is_remove = !self 111 .epitems 112 .lock_irqsave() 113 .extract_if(|x| x.epoll().ptr_eq(epoll)) 114 .collect::<Vec<_>>() 115 .is_empty(); 116 117 if is_remove { 118 return Ok(()); 119 } 120 121 Err(SystemError::ENOENT) 122 } 123 } 124 125 impl LockedPipeInode { 126 pub fn new() -> Arc<Self> { 127 let inner = InnerPipeInode { 128 self_ref: Weak::default(), 129 valid_cnt: 0, 130 read_pos: 0, 131 write_pos: 0, 132 data: [0; PIPE_BUFF_SIZE], 133 134 metadata: Metadata { 135 dev_id: 0, 136 inode_id: generate_inode_id(), 137 size: PIPE_BUFF_SIZE as i64, 138 blk_size: 0, 139 blocks: 0, 140 atime: PosixTimeSpec::default(), 141 mtime: PosixTimeSpec::default(), 142 ctime: PosixTimeSpec::default(), 143 file_type: FileType::Pipe, 144 mode: ModeType::from_bits_truncate(0o666), 145 nlinks: 1, 146 uid: 0, 147 gid: 0, 148 raw_dev: Default::default(), 149 }, 150 reader: 0, 151 writer: 0, 152 epitems: SpinLock::new(LinkedList::new()), 153 }; 154 let result = Arc::new(Self { 155 inner: SpinLock::new(inner), 156 read_wait_queue: WaitQueue::default(), 157 write_wait_queue: WaitQueue::default(), 158 }); 159 let mut guard = result.inner.lock(); 160 guard.self_ref = Arc::downgrade(&result); 161 // 释放锁 162 drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁 163 return result; 164 } 165 166 pub fn inner(&self) -> &SpinLock<InnerPipeInode> { 167 &self.inner 168 } 169 } 170 171 impl IndexNode for LockedPipeInode { 172 fn read_at( 173 &self, 174 _offset: usize, 175 len: usize, 176 buf: &mut [u8], 177 data_guard: SpinLockGuard<FilePrivateData>, 178 ) -> Result<usize, SystemError> { 179 let data = data_guard.clone(); 180 drop(data_guard); 181 // 获取mode 182 let mode: FileMode; 183 if let FilePrivateData::Pipefs(pdata) = &data { 184 mode = pdata.mode; 185 } else { 186 return Err(SystemError::EBADF); 187 } 188 189 if buf.len() < len { 190 return Err(SystemError::EINVAL); 191 } 192 // 加锁 193 let mut inode = self.inner.lock(); 194 195 // 如果管道里面没有数据,则唤醒写端, 196 while inode.valid_cnt == 0 { 197 // 如果当前管道写者数为0,则返回EOF 198 if inode.writer == 0 { 199 return Ok(0); 200 } 201 202 self.write_wait_queue 203 .wakeup(Some(ProcessState::Blocked(true))); 204 205 // 如果为非阻塞管道,直接返回错误 206 if mode.contains(FileMode::O_NONBLOCK) { 207 drop(inode); 208 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 209 } 210 211 // 否则在读等待队列中睡眠,并释放锁 212 unsafe { 213 let irq_guard = CurrentIrqArch::save_and_disable_irq(); 214 215 drop(inode); 216 self.read_wait_queue.sleep_without_schedule(); 217 drop(irq_guard); 218 } 219 schedule(SchedMode::SM_NONE); 220 inode = self.inner.lock(); 221 } 222 223 let mut num = inode.valid_cnt as usize; 224 //决定要输出的字节 225 let start = inode.read_pos as usize; 226 //如果读端希望读取的字节数大于有效字节数,则输出有效字节 227 let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE; 228 //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节 229 if len < inode.valid_cnt as usize { 230 end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE; 231 num = len; 232 } 233 234 // 从管道拷贝数据到用户的缓冲区 235 236 if end < start { 237 buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]); 238 buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]); 239 } else { 240 buf[0..num].copy_from_slice(&inode.data[start..end]); 241 } 242 243 //更新读位置以及valid_cnt 244 inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32; 245 inode.valid_cnt -= num as i32; 246 247 // 读完以后如果未读完,则唤醒下一个读者 248 if inode.valid_cnt > 0 { 249 self.read_wait_queue 250 .wakeup(Some(ProcessState::Blocked(true))); 251 } 252 253 //读完后解锁并唤醒等待在写等待队列中的进程 254 self.write_wait_queue 255 .wakeup(Some(ProcessState::Blocked(true))); 256 257 let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); 258 // 唤醒epoll中等待的进程 259 EventPoll::wakeup_epoll(&inode.epitems, pollflag)?; 260 261 //返回读取的字节数 262 return Ok(num); 263 } 264 265 fn open( 266 &self, 267 mut data: SpinLockGuard<FilePrivateData>, 268 mode: &crate::filesystem::vfs::file::FileMode, 269 ) -> Result<(), SystemError> { 270 let mut guard = self.inner.lock(); 271 // 不能以读写方式打开管道 272 if mode.contains(FileMode::O_RDWR) { 273 return Err(SystemError::EACCES); 274 } 275 if mode.contains(FileMode::O_RDONLY) { 276 guard.reader += 1; 277 } 278 if mode.contains(FileMode::O_WRONLY) { 279 guard.writer += 1; 280 } 281 282 // 设置mode 283 *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode }); 284 285 return Ok(()); 286 } 287 288 fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> { 289 let inode = self.inner.lock(); 290 let mut metadata = inode.metadata.clone(); 291 metadata.size = inode.data.len() as i64; 292 293 return Ok(metadata); 294 } 295 296 fn close(&self, data: SpinLockGuard<FilePrivateData>) -> Result<(), SystemError> { 297 let mode: FileMode; 298 if let FilePrivateData::Pipefs(pipe_data) = &*data { 299 mode = pipe_data.mode; 300 } else { 301 return Err(SystemError::EBADF); 302 } 303 let mut guard = self.inner.lock(); 304 305 // 写端关闭 306 if mode.contains(FileMode::O_WRONLY) { 307 assert!(guard.writer > 0); 308 guard.writer -= 1; 309 // 如果已经没有写端了,则唤醒读端 310 if guard.writer == 0 { 311 self.read_wait_queue 312 .wakeup_all(Some(ProcessState::Blocked(true))); 313 } 314 } 315 316 // 读端关闭 317 if mode.contains(FileMode::O_RDONLY) { 318 assert!(guard.reader > 0); 319 guard.reader -= 1; 320 // 如果已经没有写端了,则唤醒读端 321 if guard.reader == 0 { 322 self.write_wait_queue 323 .wakeup_all(Some(ProcessState::Blocked(true))); 324 } 325 } 326 327 return Ok(()); 328 } 329 330 fn write_at( 331 &self, 332 _offset: usize, 333 len: usize, 334 buf: &[u8], 335 data: SpinLockGuard<FilePrivateData>, 336 ) -> Result<usize, SystemError> { 337 // 获取mode 338 let mode: FileMode; 339 if let FilePrivateData::Pipefs(pdata) = &*data { 340 mode = pdata.mode; 341 } else { 342 return Err(SystemError::EBADF); 343 } 344 345 if buf.len() < len || len > PIPE_BUFF_SIZE { 346 return Err(SystemError::EINVAL); 347 } 348 // 加锁 349 350 let mut inode = self.inner.lock(); 351 352 if inode.reader == 0 { 353 // TODO: 如果已经没有读端存在了,则向写端进程发送SIGPIPE信号 354 } 355 356 // 如果管道空间不够 357 358 while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE { 359 // 唤醒读端 360 self.read_wait_queue 361 .wakeup(Some(ProcessState::Blocked(true))); 362 363 // 如果为非阻塞管道,直接返回错误 364 if mode.contains(FileMode::O_NONBLOCK) { 365 drop(inode); 366 return Err(SystemError::ENOMEM); 367 } 368 369 // 解锁并睡眠 370 unsafe { 371 let irq_guard = CurrentIrqArch::save_and_disable_irq(); 372 drop(inode); 373 self.write_wait_queue.sleep_without_schedule(); 374 drop(irq_guard); 375 } 376 schedule(SchedMode::SM_NONE); 377 inode = self.inner.lock(); 378 } 379 380 // 决定要输入的字节 381 let start = inode.write_pos as usize; 382 let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE; 383 // 从用户的缓冲区拷贝数据到管道 384 385 if end < start { 386 inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]); 387 inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]); 388 } else { 389 inode.data[start..end].copy_from_slice(&buf[0..len]); 390 } 391 // 更新写位置以及valid_cnt 392 inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32; 393 inode.valid_cnt += len as i32; 394 395 // 写完后还有位置,则唤醒下一个写者 396 if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE { 397 self.write_wait_queue 398 .wakeup(Some(ProcessState::Blocked(true))); 399 } 400 401 // 读完后解锁并唤醒等待在读等待队列中的进程 402 self.read_wait_queue 403 .wakeup(Some(ProcessState::Blocked(true))); 404 405 let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); 406 // 唤醒epoll中等待的进程 407 EventPoll::wakeup_epoll(&inode.epitems, pollflag)?; 408 409 // 返回写入的字节数 410 return Ok(len); 411 } 412 413 fn as_any_ref(&self) -> &dyn core::any::Any { 414 self 415 } 416 417 fn get_entry_name_and_metadata( 418 &self, 419 ino: crate::filesystem::vfs::InodeId, 420 ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> { 421 // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。 422 let name = self.get_entry_name(ino)?; 423 let entry = self.find(&name)?; 424 return Ok((name, entry.metadata()?)); 425 } 426 427 fn fs(&self) -> Arc<(dyn FileSystem)> { 428 todo!() 429 } 430 431 fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> { 432 return Err(SystemError::ENOSYS); 433 } 434 435 fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> { 436 return self.inner.lock().poll(private_data); 437 } 438 } 439