1 use crate::{ 2 arch::{sched::sched, CurrentIrqArch}, 3 exception::InterruptArch, 4 filesystem::vfs::{ 5 core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem, 6 FileType, IndexNode, Metadata, 7 }, 8 libs::{spinlock::SpinLock, wait_queue::WaitQueue}, 9 net::event_poll::{EPollEventType, EPollItem, EventPoll}, 10 process::ProcessState, 11 time::TimeSpec, 12 }; 13 14 use alloc::{ 15 collections::LinkedList, 16 sync::{Arc, Weak}, 17 }; 18 use system_error::SystemError; 19 20 /// 我们设定pipe_buff的总大小为1024字节 21 const PIPE_BUFF_SIZE: usize = 1024; 22 23 #[derive(Debug, Clone)] 24 pub struct PipeFsPrivateData { 25 mode: FileMode, 26 } 27 28 impl PipeFsPrivateData { 29 pub fn new(mode: FileMode) -> Self { 30 return PipeFsPrivateData { mode: mode }; 31 } 32 33 pub fn set_mode(&mut self, mode: FileMode) { 34 self.mode = mode; 35 } 36 } 37 38 /// @brief 管道文件i节点(锁) 39 #[derive(Debug)] 40 pub struct LockedPipeInode(SpinLock<InnerPipeInode>); 41 42 /// @brief 管道文件i节点(无锁) 43 #[derive(Debug)] 44 pub struct InnerPipeInode { 45 self_ref: Weak<LockedPipeInode>, 46 /// 管道内可读的数据数 47 valid_cnt: i32, 48 read_pos: i32, 49 write_pos: i32, 50 read_wait_queue: WaitQueue, 51 write_wait_queue: WaitQueue, 52 data: [u8; PIPE_BUFF_SIZE], 53 /// INode 元数据 54 metadata: Metadata, 55 reader: u32, 56 writer: u32, 57 epitems: SpinLock<LinkedList<Arc<EPollItem>>>, 58 } 59 60 impl InnerPipeInode { 61 pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> { 62 let mut events = EPollEventType::empty(); 63 64 let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data { 65 mode 66 } else { 67 return Err(SystemError::EBADFD); 68 }; 69 70 if mode.contains(FileMode::O_RDONLY) { 71 if self.valid_cnt != 0 { 72 // 有数据可读 73 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM); 74 } 75 76 // 没有写者 77 if self.writer == 0 { 78 events.insert(EPollEventType::EPOLLHUP) 79 } 80 } 81 82 if mode.contains(FileMode::O_WRONLY) { 83 // 管道内数据未满 84 if self.valid_cnt as usize != PIPE_BUFF_SIZE { 85 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM); 86 } 87 88 // 没有读者 89 if self.reader == 0 { 90 events.insert(EPollEventType::EPOLLERR); 91 } 92 } 93 94 Ok(events.bits() as usize) 95 } 96 97 pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> { 98 self.epitems.lock().push_back(epitem); 99 Ok(()) 100 } 101 } 102 103 impl LockedPipeInode { 104 pub fn new() -> Arc<Self> { 105 let inner = InnerPipeInode { 106 self_ref: Weak::default(), 107 valid_cnt: 0, 108 read_pos: 0, 109 write_pos: 0, 110 read_wait_queue: WaitQueue::INIT, 111 write_wait_queue: WaitQueue::INIT, 112 data: [0; PIPE_BUFF_SIZE], 113 114 metadata: Metadata { 115 dev_id: 0, 116 inode_id: generate_inode_id(), 117 size: PIPE_BUFF_SIZE as i64, 118 blk_size: 0, 119 blocks: 0, 120 atime: TimeSpec::default(), 121 mtime: TimeSpec::default(), 122 ctime: TimeSpec::default(), 123 file_type: FileType::Pipe, 124 mode: ModeType::from_bits_truncate(0o666), 125 nlinks: 1, 126 uid: 0, 127 gid: 0, 128 raw_dev: 0, 129 }, 130 reader: 0, 131 writer: 0, 132 epitems: SpinLock::new(LinkedList::new()), 133 }; 134 let result = Arc::new(Self(SpinLock::new(inner))); 135 let mut guard = result.0.lock(); 136 guard.self_ref = Arc::downgrade(&result); 137 // 释放锁 138 drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁 139 return result; 140 } 141 142 pub fn inner(&self) -> &SpinLock<InnerPipeInode> { 143 &self.0 144 } 145 } 146 147 impl IndexNode for LockedPipeInode { 148 fn read_at( 149 &self, 150 _offset: usize, 151 len: usize, 152 buf: &mut [u8], 153 data: &mut FilePrivateData, 154 ) -> Result<usize, SystemError> { 155 // 获取mode 156 let mode: FileMode; 157 if let FilePrivateData::Pipefs(pdata) = data { 158 mode = pdata.mode; 159 } else { 160 return Err(SystemError::EBADF); 161 } 162 163 if buf.len() < len { 164 return Err(SystemError::EINVAL); 165 } 166 // 加锁 167 let mut inode = self.0.lock(); 168 169 // 如果管道里面没有数据,则唤醒写端, 170 while inode.valid_cnt == 0 { 171 // 如果当前管道写者数为0,则返回EOF 172 if inode.writer == 0 { 173 return Ok(0); 174 } 175 176 inode 177 .write_wait_queue 178 .wakeup(Some(ProcessState::Blocked(true))); 179 180 // 如果为非阻塞管道,直接返回错误 181 if mode.contains(FileMode::O_NONBLOCK) { 182 drop(inode); 183 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 184 } 185 186 // 否则在读等待队列中睡眠,并释放锁 187 unsafe { 188 let irq_guard = CurrentIrqArch::save_and_disable_irq(); 189 190 inode.read_wait_queue.sleep_without_schedule(); 191 drop(inode); 192 193 drop(irq_guard); 194 } 195 sched(); 196 inode = self.0.lock(); 197 } 198 199 let mut num = inode.valid_cnt as usize; 200 //决定要输出的字节 201 let start = inode.read_pos as usize; 202 //如果读端希望读取的字节数大于有效字节数,则输出有效字节 203 let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE; 204 //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节 205 if len < inode.valid_cnt as usize { 206 end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE; 207 num = len; 208 } 209 210 // 从管道拷贝数据到用户的缓冲区 211 212 if end < start { 213 buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]); 214 buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]); 215 } else { 216 buf[0..num].copy_from_slice(&inode.data[start..end]); 217 } 218 219 //更新读位置以及valid_cnt 220 inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32; 221 inode.valid_cnt -= num as i32; 222 223 // 读完以后如果未读完,则唤醒下一个读者 224 if inode.valid_cnt > 0 { 225 inode 226 .read_wait_queue 227 .wakeup(Some(ProcessState::Blocked(true))); 228 } 229 230 //读完后解锁并唤醒等待在写等待队列中的进程 231 inode 232 .write_wait_queue 233 .wakeup(Some(ProcessState::Blocked(true))); 234 235 let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); 236 // 唤醒epoll中等待的进程 237 EventPoll::wakeup_epoll(&mut inode.epitems, pollflag)?; 238 239 //返回读取的字节数 240 return Ok(num); 241 } 242 243 fn open( 244 &self, 245 data: &mut FilePrivateData, 246 mode: &crate::filesystem::vfs::file::FileMode, 247 ) -> Result<(), SystemError> { 248 let mut guard = self.0.lock(); 249 // 不能以读写方式打开管道 250 if mode.contains(FileMode::O_RDWR) { 251 return Err(SystemError::EACCES); 252 } 253 if mode.contains(FileMode::O_RDONLY) { 254 guard.reader += 1; 255 } 256 if mode.contains(FileMode::O_WRONLY) { 257 guard.writer += 1; 258 } 259 260 // 设置mode 261 *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode }); 262 263 return Ok(()); 264 } 265 266 fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> { 267 let inode = self.0.lock(); 268 let mut metadata = inode.metadata.clone(); 269 metadata.size = inode.data.len() as i64; 270 271 return Ok(metadata); 272 } 273 274 fn close(&self, data: &mut FilePrivateData) -> Result<(), SystemError> { 275 let mode: FileMode; 276 if let FilePrivateData::Pipefs(pipe_data) = data { 277 mode = pipe_data.mode; 278 } else { 279 return Err(SystemError::EBADF); 280 } 281 let mut guard = self.0.lock(); 282 283 // 写端关闭 284 if mode.contains(FileMode::O_WRONLY) { 285 assert!(guard.writer > 0); 286 guard.writer -= 1; 287 // 如果已经没有写端了,则唤醒读端 288 if guard.writer == 0 { 289 guard 290 .read_wait_queue 291 .wakeup_all(Some(ProcessState::Blocked(true))); 292 } 293 } 294 295 // 读端关闭 296 if mode.contains(FileMode::O_RDONLY) { 297 assert!(guard.reader > 0); 298 guard.reader -= 1; 299 // 如果已经没有写端了,则唤醒读端 300 if guard.reader == 0 { 301 guard 302 .write_wait_queue 303 .wakeup_all(Some(ProcessState::Blocked(true))); 304 } 305 } 306 307 return Ok(()); 308 } 309 310 fn write_at( 311 &self, 312 _offset: usize, 313 len: usize, 314 buf: &[u8], 315 data: &mut FilePrivateData, 316 ) -> Result<usize, SystemError> { 317 // 获取mode 318 let mode: FileMode; 319 if let FilePrivateData::Pipefs(pdata) = data { 320 mode = pdata.mode; 321 } else { 322 return Err(SystemError::EBADF); 323 } 324 325 if buf.len() < len || len > PIPE_BUFF_SIZE { 326 return Err(SystemError::EINVAL); 327 } 328 // 加锁 329 330 let mut inode = self.0.lock(); 331 332 // TODO: 如果已经没有读端存在了,则向写端进程发送SIGPIPE信号 333 if inode.reader == 0 {} 334 335 // 如果管道空间不够 336 337 while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE { 338 // 唤醒读端 339 inode 340 .read_wait_queue 341 .wakeup(Some(ProcessState::Blocked(true))); 342 343 // 如果为非阻塞管道,直接返回错误 344 if mode.contains(FileMode::O_NONBLOCK) { 345 drop(inode); 346 return Err(SystemError::ENOMEM); 347 } 348 349 // 解锁并睡眠 350 unsafe { 351 let irq_guard = CurrentIrqArch::save_and_disable_irq(); 352 inode.write_wait_queue.sleep_without_schedule(); 353 drop(inode); 354 drop(irq_guard); 355 } 356 sched(); 357 inode = self.0.lock(); 358 } 359 360 // 决定要输入的字节 361 let start = inode.write_pos as usize; 362 let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE; 363 // 从用户的缓冲区拷贝数据到管道 364 365 if end < start { 366 inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]); 367 inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]); 368 } else { 369 inode.data[start..end].copy_from_slice(&buf[0..len]); 370 } 371 // 更新写位置以及valid_cnt 372 inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32; 373 inode.valid_cnt += len as i32; 374 375 // 写完后还有位置,则唤醒下一个写者 376 if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE { 377 inode 378 .write_wait_queue 379 .wakeup(Some(ProcessState::Blocked(true))); 380 } 381 382 // 读完后解锁并唤醒等待在读等待队列中的进程 383 inode 384 .read_wait_queue 385 .wakeup(Some(ProcessState::Blocked(true))); 386 387 let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); 388 // 唤醒epoll中等待的进程 389 EventPoll::wakeup_epoll(&mut inode.epitems, pollflag)?; 390 391 // 返回写入的字节数 392 return Ok(len); 393 } 394 395 fn as_any_ref(&self) -> &dyn core::any::Any { 396 self 397 } 398 399 fn get_entry_name_and_metadata( 400 &self, 401 ino: crate::filesystem::vfs::InodeId, 402 ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> { 403 // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。 404 let name = self.get_entry_name(ino)?; 405 let entry = self.find(&name)?; 406 return Ok((name, entry.metadata()?)); 407 } 408 409 fn fs(&self) -> Arc<(dyn FileSystem)> { 410 todo!() 411 } 412 413 fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> { 414 return Err(SystemError::EOPNOTSUPP_OR_ENOTSUP); 415 } 416 417 fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> { 418 return self.0.lock().poll(private_data); 419 } 420 } 421