1 use crate::{ 2 arch::CurrentIrqArch, 3 exception::InterruptArch, 4 filesystem::vfs::{ 5 core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem, 6 FileType, IndexNode, Metadata, 7 }, 8 libs::{ 9 spinlock::{SpinLock, SpinLockGuard}, 10 wait_queue::WaitQueue, 11 }, 12 net::event_poll::{EPollEventType, EPollItem, EventPoll}, 13 process::ProcessState, 14 sched::{schedule, SchedMode}, 15 time::PosixTimeSpec, 16 }; 17 18 use alloc::{ 19 collections::LinkedList, 20 sync::{Arc, Weak}, 21 }; 22 use system_error::SystemError; 23 24 /// 我们设定pipe_buff的总大小为1024字节 25 const PIPE_BUFF_SIZE: usize = 1024; 26 27 #[derive(Debug, Clone)] 28 pub struct PipeFsPrivateData { 29 mode: FileMode, 30 } 31 32 impl PipeFsPrivateData { 33 pub fn new(mode: FileMode) -> Self { 34 return PipeFsPrivateData { mode }; 35 } 36 37 pub fn set_mode(&mut self, mode: FileMode) { 38 self.mode = mode; 39 } 40 } 41 42 /// @brief 管道文件i节点(锁) 43 #[derive(Debug)] 44 pub struct LockedPipeInode { 45 inner: SpinLock<InnerPipeInode>, 46 read_wait_queue: WaitQueue, 47 write_wait_queue: WaitQueue, 48 } 49 50 /// @brief 管道文件i节点(无锁) 51 #[derive(Debug)] 52 pub struct InnerPipeInode { 53 self_ref: Weak<LockedPipeInode>, 54 /// 管道内可读的数据数 55 valid_cnt: i32, 56 read_pos: i32, 57 write_pos: i32, 58 data: [u8; PIPE_BUFF_SIZE], 59 /// INode 元数据 60 metadata: Metadata, 61 reader: u32, 62 writer: u32, 63 epitems: SpinLock<LinkedList<Arc<EPollItem>>>, 64 } 65 66 impl InnerPipeInode { 67 pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> { 68 let mut events = EPollEventType::empty(); 69 70 let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data { 71 mode 72 } else { 73 return Err(SystemError::EBADFD); 74 }; 75 76 if mode.contains(FileMode::O_RDONLY) { 77 if self.valid_cnt != 0 { 78 // 有数据可读 79 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM); 80 } 81 82 // 没有写者 83 if self.writer == 0 { 84 events.insert(EPollEventType::EPOLLHUP) 85 } 86 } 87 88 if mode.contains(FileMode::O_WRONLY) { 89 // 管道内数据未满 90 if self.valid_cnt as usize != PIPE_BUFF_SIZE { 91 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM); 92 } 93 94 // 没有读者 95 if self.reader == 0 { 96 events.insert(EPollEventType::EPOLLERR); 97 } 98 } 99 100 Ok(events.bits() as usize) 101 } 102 103 pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> { 104 self.epitems.lock().push_back(epitem); 105 Ok(()) 106 } 107 } 108 109 impl LockedPipeInode { 110 pub fn new() -> Arc<Self> { 111 let inner = InnerPipeInode { 112 self_ref: Weak::default(), 113 valid_cnt: 0, 114 read_pos: 0, 115 write_pos: 0, 116 data: [0; PIPE_BUFF_SIZE], 117 118 metadata: Metadata { 119 dev_id: 0, 120 inode_id: generate_inode_id(), 121 size: PIPE_BUFF_SIZE as i64, 122 blk_size: 0, 123 blocks: 0, 124 atime: PosixTimeSpec::default(), 125 mtime: PosixTimeSpec::default(), 126 ctime: PosixTimeSpec::default(), 127 file_type: FileType::Pipe, 128 mode: ModeType::from_bits_truncate(0o666), 129 nlinks: 1, 130 uid: 0, 131 gid: 0, 132 raw_dev: Default::default(), 133 }, 134 reader: 0, 135 writer: 0, 136 epitems: SpinLock::new(LinkedList::new()), 137 }; 138 let result = Arc::new(Self { 139 inner: SpinLock::new(inner), 140 read_wait_queue: WaitQueue::default(), 141 write_wait_queue: WaitQueue::default(), 142 }); 143 let mut guard = result.inner.lock(); 144 guard.self_ref = Arc::downgrade(&result); 145 // 释放锁 146 drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁 147 return result; 148 } 149 150 pub fn inner(&self) -> &SpinLock<InnerPipeInode> { 151 &self.inner 152 } 153 } 154 155 impl IndexNode for LockedPipeInode { 156 fn read_at( 157 &self, 158 _offset: usize, 159 len: usize, 160 buf: &mut [u8], 161 data_guard: SpinLockGuard<FilePrivateData>, 162 ) -> Result<usize, SystemError> { 163 let data = data_guard.clone(); 164 drop(data_guard); 165 // 获取mode 166 let mode: FileMode; 167 if let FilePrivateData::Pipefs(pdata) = &data { 168 mode = pdata.mode; 169 } else { 170 return Err(SystemError::EBADF); 171 } 172 173 if buf.len() < len { 174 return Err(SystemError::EINVAL); 175 } 176 // 加锁 177 let mut inode = self.inner.lock(); 178 179 // 如果管道里面没有数据,则唤醒写端, 180 while inode.valid_cnt == 0 { 181 // 如果当前管道写者数为0,则返回EOF 182 if inode.writer == 0 { 183 return Ok(0); 184 } 185 186 self.write_wait_queue 187 .wakeup(Some(ProcessState::Blocked(true))); 188 189 // 如果为非阻塞管道,直接返回错误 190 if mode.contains(FileMode::O_NONBLOCK) { 191 drop(inode); 192 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 193 } 194 195 // 否则在读等待队列中睡眠,并释放锁 196 unsafe { 197 let irq_guard = CurrentIrqArch::save_and_disable_irq(); 198 199 drop(inode); 200 self.read_wait_queue.sleep_without_schedule(); 201 drop(irq_guard); 202 } 203 schedule(SchedMode::SM_NONE); 204 inode = self.inner.lock(); 205 } 206 207 let mut num = inode.valid_cnt as usize; 208 //决定要输出的字节 209 let start = inode.read_pos as usize; 210 //如果读端希望读取的字节数大于有效字节数,则输出有效字节 211 let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE; 212 //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节 213 if len < inode.valid_cnt as usize { 214 end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE; 215 num = len; 216 } 217 218 // 从管道拷贝数据到用户的缓冲区 219 220 if end < start { 221 buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]); 222 buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]); 223 } else { 224 buf[0..num].copy_from_slice(&inode.data[start..end]); 225 } 226 227 //更新读位置以及valid_cnt 228 inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32; 229 inode.valid_cnt -= num as i32; 230 231 // 读完以后如果未读完,则唤醒下一个读者 232 if inode.valid_cnt > 0 { 233 self.read_wait_queue 234 .wakeup(Some(ProcessState::Blocked(true))); 235 } 236 237 //读完后解锁并唤醒等待在写等待队列中的进程 238 self.write_wait_queue 239 .wakeup(Some(ProcessState::Blocked(true))); 240 241 let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); 242 // 唤醒epoll中等待的进程 243 EventPoll::wakeup_epoll(&inode.epitems, pollflag)?; 244 245 //返回读取的字节数 246 return Ok(num); 247 } 248 249 fn open( 250 &self, 251 mut data: SpinLockGuard<FilePrivateData>, 252 mode: &crate::filesystem::vfs::file::FileMode, 253 ) -> Result<(), SystemError> { 254 let mut guard = self.inner.lock(); 255 // 不能以读写方式打开管道 256 if mode.contains(FileMode::O_RDWR) { 257 return Err(SystemError::EACCES); 258 } 259 if mode.contains(FileMode::O_RDONLY) { 260 guard.reader += 1; 261 } 262 if mode.contains(FileMode::O_WRONLY) { 263 guard.writer += 1; 264 } 265 266 // 设置mode 267 *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode }); 268 269 return Ok(()); 270 } 271 272 fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> { 273 let inode = self.inner.lock(); 274 let mut metadata = inode.metadata.clone(); 275 metadata.size = inode.data.len() as i64; 276 277 return Ok(metadata); 278 } 279 280 fn close(&self, data: SpinLockGuard<FilePrivateData>) -> Result<(), SystemError> { 281 let mode: FileMode; 282 if let FilePrivateData::Pipefs(pipe_data) = &*data { 283 mode = pipe_data.mode; 284 } else { 285 return Err(SystemError::EBADF); 286 } 287 let mut guard = self.inner.lock(); 288 289 // 写端关闭 290 if mode.contains(FileMode::O_WRONLY) { 291 assert!(guard.writer > 0); 292 guard.writer -= 1; 293 // 如果已经没有写端了,则唤醒读端 294 if guard.writer == 0 { 295 self.read_wait_queue 296 .wakeup_all(Some(ProcessState::Blocked(true))); 297 } 298 } 299 300 // 读端关闭 301 if mode.contains(FileMode::O_RDONLY) { 302 assert!(guard.reader > 0); 303 guard.reader -= 1; 304 // 如果已经没有写端了,则唤醒读端 305 if guard.reader == 0 { 306 self.write_wait_queue 307 .wakeup_all(Some(ProcessState::Blocked(true))); 308 } 309 } 310 311 return Ok(()); 312 } 313 314 fn write_at( 315 &self, 316 _offset: usize, 317 len: usize, 318 buf: &[u8], 319 data: SpinLockGuard<FilePrivateData>, 320 ) -> Result<usize, SystemError> { 321 // 获取mode 322 let mode: FileMode; 323 if let FilePrivateData::Pipefs(pdata) = &*data { 324 mode = pdata.mode; 325 } else { 326 return Err(SystemError::EBADF); 327 } 328 329 if buf.len() < len || len > PIPE_BUFF_SIZE { 330 return Err(SystemError::EINVAL); 331 } 332 // 加锁 333 334 let mut inode = self.inner.lock(); 335 336 if inode.reader == 0 { 337 // TODO: 如果已经没有读端存在了,则向写端进程发送SIGPIPE信号 338 } 339 340 // 如果管道空间不够 341 342 while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE { 343 // 唤醒读端 344 self.read_wait_queue 345 .wakeup(Some(ProcessState::Blocked(true))); 346 347 // 如果为非阻塞管道,直接返回错误 348 if mode.contains(FileMode::O_NONBLOCK) { 349 drop(inode); 350 return Err(SystemError::ENOMEM); 351 } 352 353 // 解锁并睡眠 354 unsafe { 355 let irq_guard = CurrentIrqArch::save_and_disable_irq(); 356 drop(inode); 357 self.write_wait_queue.sleep_without_schedule(); 358 drop(irq_guard); 359 } 360 schedule(SchedMode::SM_NONE); 361 inode = self.inner.lock(); 362 } 363 364 // 决定要输入的字节 365 let start = inode.write_pos as usize; 366 let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE; 367 // 从用户的缓冲区拷贝数据到管道 368 369 if end < start { 370 inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]); 371 inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]); 372 } else { 373 inode.data[start..end].copy_from_slice(&buf[0..len]); 374 } 375 // 更新写位置以及valid_cnt 376 inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32; 377 inode.valid_cnt += len as i32; 378 379 // 写完后还有位置,则唤醒下一个写者 380 if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE { 381 self.write_wait_queue 382 .wakeup(Some(ProcessState::Blocked(true))); 383 } 384 385 // 读完后解锁并唤醒等待在读等待队列中的进程 386 self.read_wait_queue 387 .wakeup(Some(ProcessState::Blocked(true))); 388 389 let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); 390 // 唤醒epoll中等待的进程 391 EventPoll::wakeup_epoll(&inode.epitems, pollflag)?; 392 393 // 返回写入的字节数 394 return Ok(len); 395 } 396 397 fn as_any_ref(&self) -> &dyn core::any::Any { 398 self 399 } 400 401 fn get_entry_name_and_metadata( 402 &self, 403 ino: crate::filesystem::vfs::InodeId, 404 ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> { 405 // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。 406 let name = self.get_entry_name(ino)?; 407 let entry = self.find(&name)?; 408 return Ok((name, entry.metadata()?)); 409 } 410 411 fn fs(&self) -> Arc<(dyn FileSystem)> { 412 todo!() 413 } 414 415 fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> { 416 return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP); 417 } 418 419 fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> { 420 return self.inner.lock().poll(private_data); 421 } 422 } 423