1 use crate::{ 2 filesystem::vfs::{ 3 core::generate_inode_id, file::FileMode, syscall::ModeType, FilePrivateData, FileSystem, 4 FileType, IndexNode, Metadata, 5 }, 6 libs::{ 7 spinlock::{SpinLock, SpinLockGuard}, 8 wait_queue::WaitQueue, 9 }, 10 net::event_poll::{EPollEventType, EPollItem, EventPoll}, 11 process::ProcessState, 12 sched::SchedMode, 13 time::PosixTimeSpec, 14 }; 15 16 use alloc::{ 17 collections::LinkedList, 18 sync::{Arc, Weak}, 19 vec::Vec, 20 }; 21 use system_error::SystemError; 22 23 /// 我们设定pipe_buff的总大小为1024字节 24 const PIPE_BUFF_SIZE: usize = 1024; 25 26 #[derive(Debug, Clone)] 27 pub struct PipeFsPrivateData { 28 mode: FileMode, 29 } 30 31 impl PipeFsPrivateData { 32 pub fn new(mode: FileMode) -> Self { 33 return PipeFsPrivateData { mode }; 34 } 35 36 pub fn set_mode(&mut self, mode: FileMode) { 37 self.mode = mode; 38 } 39 } 40 41 /// @brief 管道文件i节点(锁) 42 #[derive(Debug)] 43 pub struct LockedPipeInode { 44 inner: SpinLock<InnerPipeInode>, 45 read_wait_queue: WaitQueue, 46 write_wait_queue: WaitQueue, 47 } 48 49 /// @brief 管道文件i节点(无锁) 50 #[derive(Debug)] 51 pub struct InnerPipeInode { 52 self_ref: Weak<LockedPipeInode>, 53 /// 管道内可读的数据数 54 valid_cnt: i32, 55 read_pos: i32, 56 write_pos: i32, 57 data: [u8; PIPE_BUFF_SIZE], 58 /// INode 元数据 59 metadata: Metadata, 60 reader: u32, 61 writer: u32, 62 epitems: SpinLock<LinkedList<Arc<EPollItem>>>, 63 } 64 65 impl InnerPipeInode { 66 pub fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> { 67 let mut events = EPollEventType::empty(); 68 69 let mode = if let FilePrivateData::Pipefs(PipeFsPrivateData { mode }) = private_data { 70 mode 71 } else { 72 return Err(SystemError::EBADFD); 73 }; 74 75 if mode.contains(FileMode::O_RDONLY) { 76 if self.valid_cnt != 0 { 77 // 有数据可读 78 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLRDNORM); 79 } 80 81 // 没有写者 82 if self.writer == 0 { 83 events.insert(EPollEventType::EPOLLHUP) 84 } 85 } 86 87 if mode.contains(FileMode::O_WRONLY) { 88 // 管道内数据未满 89 if self.valid_cnt as usize != PIPE_BUFF_SIZE { 90 events.insert(EPollEventType::EPOLLIN & EPollEventType::EPOLLWRNORM); 91 } 92 93 // 没有读者 94 if self.reader == 0 { 95 events.insert(EPollEventType::EPOLLERR); 96 } 97 } 98 99 Ok(events.bits() as usize) 100 } 101 102 pub fn add_epoll(&mut self, epitem: Arc<EPollItem>) -> Result<(), SystemError> { 103 self.epitems.lock().push_back(epitem); 104 Ok(()) 105 } 106 107 fn buf_full(&self) -> bool { 108 return self.valid_cnt as usize == PIPE_BUFF_SIZE; 109 } 110 111 pub fn remove_epoll(&self, epoll: &Weak<SpinLock<EventPoll>>) -> Result<(), SystemError> { 112 let is_remove = !self 113 .epitems 114 .lock_irqsave() 115 .extract_if(|x| x.epoll().ptr_eq(epoll)) 116 .collect::<Vec<_>>() 117 .is_empty(); 118 119 if is_remove { 120 return Ok(()); 121 } 122 123 Err(SystemError::ENOENT) 124 } 125 } 126 127 impl LockedPipeInode { 128 pub fn new() -> Arc<Self> { 129 let inner = InnerPipeInode { 130 self_ref: Weak::default(), 131 valid_cnt: 0, 132 read_pos: 0, 133 write_pos: 0, 134 data: [0; PIPE_BUFF_SIZE], 135 136 metadata: Metadata { 137 dev_id: 0, 138 inode_id: generate_inode_id(), 139 size: PIPE_BUFF_SIZE as i64, 140 blk_size: 0, 141 blocks: 0, 142 atime: PosixTimeSpec::default(), 143 mtime: PosixTimeSpec::default(), 144 ctime: PosixTimeSpec::default(), 145 file_type: FileType::Pipe, 146 mode: ModeType::from_bits_truncate(0o666), 147 nlinks: 1, 148 uid: 0, 149 gid: 0, 150 raw_dev: Default::default(), 151 }, 152 reader: 0, 153 writer: 0, 154 epitems: SpinLock::new(LinkedList::new()), 155 }; 156 let result = Arc::new(Self { 157 inner: SpinLock::new(inner), 158 read_wait_queue: WaitQueue::default(), 159 write_wait_queue: WaitQueue::default(), 160 }); 161 let mut guard = result.inner.lock(); 162 guard.self_ref = Arc::downgrade(&result); 163 // 释放锁 164 drop(guard); //这一步其实不需要,只要离开作用域,guard生命周期结束,自会解锁 165 return result; 166 } 167 168 pub fn inner(&self) -> &SpinLock<InnerPipeInode> { 169 &self.inner 170 } 171 172 fn readable(&self) -> bool { 173 let inode = self.inner.lock(); 174 return inode.valid_cnt > 0 || inode.writer == 0; 175 } 176 177 fn writeable(&self) -> bool { 178 let inode = self.inner.lock(); 179 return !inode.buf_full() || inode.reader == 0; 180 } 181 } 182 183 impl IndexNode for LockedPipeInode { 184 fn read_at( 185 &self, 186 _offset: usize, 187 len: usize, 188 buf: &mut [u8], 189 data_guard: SpinLockGuard<FilePrivateData>, 190 ) -> Result<usize, SystemError> { 191 let data = data_guard.clone(); 192 drop(data_guard); 193 // 获取mode 194 let mode: FileMode; 195 if let FilePrivateData::Pipefs(pdata) = &data { 196 mode = pdata.mode; 197 } else { 198 return Err(SystemError::EBADF); 199 } 200 201 if buf.len() < len { 202 return Err(SystemError::EINVAL); 203 } 204 // log::debug!("pipe mode: {:?}", mode); 205 // 加锁 206 let mut inode = self.inner.lock(); 207 208 // 如果管道里面没有数据,则唤醒写端, 209 while inode.valid_cnt == 0 { 210 // 如果当前管道写者数为0,则返回EOF 211 if inode.writer == 0 { 212 return Ok(0); 213 } 214 215 self.write_wait_queue 216 .wakeup(Some(ProcessState::Blocked(true))); 217 218 // 如果为非阻塞管道,直接返回错误 219 if mode.contains(FileMode::O_NONBLOCK) { 220 drop(inode); 221 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 222 } 223 224 // 否则在读等待队列中睡眠,并释放锁 225 drop(inode); 226 let r = wq_wait_event_interruptible!(self.read_wait_queue, self.readable(), {}); 227 if r.is_err() { 228 return Err(SystemError::ERESTARTSYS); 229 } 230 231 inode = self.inner.lock(); 232 } 233 234 let mut num = inode.valid_cnt as usize; 235 //决定要输出的字节 236 let start = inode.read_pos as usize; 237 //如果读端希望读取的字节数大于有效字节数,则输出有效字节 238 let mut end = (inode.valid_cnt as usize + inode.read_pos as usize) % PIPE_BUFF_SIZE; 239 //如果读端希望读取的字节数少于有效字节数,则输出希望读取的字节 240 if len < inode.valid_cnt as usize { 241 end = (len + inode.read_pos as usize) % PIPE_BUFF_SIZE; 242 num = len; 243 } 244 245 // 从管道拷贝数据到用户的缓冲区 246 247 if end < start { 248 buf[0..(PIPE_BUFF_SIZE - start)].copy_from_slice(&inode.data[start..PIPE_BUFF_SIZE]); 249 buf[(PIPE_BUFF_SIZE - start)..num].copy_from_slice(&inode.data[0..end]); 250 } else { 251 buf[0..num].copy_from_slice(&inode.data[start..end]); 252 } 253 254 //更新读位置以及valid_cnt 255 inode.read_pos = (inode.read_pos + num as i32) % PIPE_BUFF_SIZE as i32; 256 inode.valid_cnt -= num as i32; 257 258 // 读完以后如果未读完,则唤醒下一个读者 259 if inode.valid_cnt > 0 { 260 self.read_wait_queue 261 .wakeup(Some(ProcessState::Blocked(true))); 262 } 263 264 //读完后解锁并唤醒等待在写等待队列中的进程 265 self.write_wait_queue 266 .wakeup(Some(ProcessState::Blocked(true))); 267 268 let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); 269 // 唤醒epoll中等待的进程 270 EventPoll::wakeup_epoll(&inode.epitems, pollflag)?; 271 272 //返回读取的字节数 273 return Ok(num); 274 } 275 276 fn open( 277 &self, 278 mut data: SpinLockGuard<FilePrivateData>, 279 mode: &crate::filesystem::vfs::file::FileMode, 280 ) -> Result<(), SystemError> { 281 let mut guard = self.inner.lock(); 282 // 不能以读写方式打开管道 283 if mode.contains(FileMode::O_RDWR) { 284 return Err(SystemError::EACCES); 285 } 286 if mode.contains(FileMode::O_RDONLY) { 287 guard.reader += 1; 288 } 289 if mode.contains(FileMode::O_WRONLY) { 290 guard.writer += 1; 291 } 292 293 // 设置mode 294 *data = FilePrivateData::Pipefs(PipeFsPrivateData { mode: *mode }); 295 296 return Ok(()); 297 } 298 299 fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> { 300 let inode = self.inner.lock(); 301 let mut metadata = inode.metadata.clone(); 302 metadata.size = inode.data.len() as i64; 303 304 return Ok(metadata); 305 } 306 307 fn close(&self, data: SpinLockGuard<FilePrivateData>) -> Result<(), SystemError> { 308 let mode: FileMode; 309 if let FilePrivateData::Pipefs(pipe_data) = &*data { 310 mode = pipe_data.mode; 311 } else { 312 return Err(SystemError::EBADF); 313 } 314 let mut guard = self.inner.lock(); 315 316 // 写端关闭 317 if mode.contains(FileMode::O_WRONLY) { 318 assert!(guard.writer > 0); 319 guard.writer -= 1; 320 // 如果已经没有写端了,则唤醒读端 321 if guard.writer == 0 { 322 self.read_wait_queue 323 .wakeup_all(Some(ProcessState::Blocked(true))); 324 } 325 } 326 327 // 读端关闭 328 if mode.contains(FileMode::O_RDONLY) { 329 assert!(guard.reader > 0); 330 guard.reader -= 1; 331 // 如果已经没有写端了,则唤醒读端 332 if guard.reader == 0 { 333 self.write_wait_queue 334 .wakeup_all(Some(ProcessState::Blocked(true))); 335 } 336 } 337 338 return Ok(()); 339 } 340 341 fn write_at( 342 &self, 343 _offset: usize, 344 len: usize, 345 buf: &[u8], 346 data: SpinLockGuard<FilePrivateData>, 347 ) -> Result<usize, SystemError> { 348 // 获取mode 349 let mode: FileMode; 350 if let FilePrivateData::Pipefs(pdata) = &*data { 351 mode = pdata.mode; 352 } else { 353 return Err(SystemError::EBADF); 354 } 355 356 if buf.len() < len || len > PIPE_BUFF_SIZE { 357 return Err(SystemError::EINVAL); 358 } 359 // 加锁 360 361 let mut inode = self.inner.lock(); 362 363 if inode.reader == 0 { 364 // TODO: 如果已经没有读端存在了,则向写端进程发送SIGPIPE信号 365 } 366 367 // 如果管道空间不够 368 369 while len + inode.valid_cnt as usize > PIPE_BUFF_SIZE { 370 // 唤醒读端 371 self.read_wait_queue 372 .wakeup(Some(ProcessState::Blocked(true))); 373 374 // 如果为非阻塞管道,直接返回错误 375 if mode.contains(FileMode::O_NONBLOCK) { 376 drop(inode); 377 return Err(SystemError::ENOMEM); 378 } 379 380 // 解锁并睡眠 381 drop(inode); 382 let r = wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {}); 383 if r.is_err() { 384 return Err(SystemError::ERESTARTSYS); 385 } 386 inode = self.inner.lock(); 387 } 388 389 // 决定要输入的字节 390 let start = inode.write_pos as usize; 391 let end = (inode.write_pos as usize + len) % PIPE_BUFF_SIZE; 392 // 从用户的缓冲区拷贝数据到管道 393 394 if end < start { 395 inode.data[start..PIPE_BUFF_SIZE].copy_from_slice(&buf[0..(PIPE_BUFF_SIZE - start)]); 396 inode.data[0..end].copy_from_slice(&buf[(PIPE_BUFF_SIZE - start)..len]); 397 } else { 398 inode.data[start..end].copy_from_slice(&buf[0..len]); 399 } 400 // 更新写位置以及valid_cnt 401 inode.write_pos = (inode.write_pos + len as i32) % PIPE_BUFF_SIZE as i32; 402 inode.valid_cnt += len as i32; 403 404 // 写完后还有位置,则唤醒下一个写者 405 if (inode.valid_cnt as usize) < PIPE_BUFF_SIZE { 406 self.write_wait_queue 407 .wakeup(Some(ProcessState::Blocked(true))); 408 } 409 410 // 读完后解锁并唤醒等待在读等待队列中的进程 411 self.read_wait_queue 412 .wakeup(Some(ProcessState::Blocked(true))); 413 414 let pollflag = EPollEventType::from_bits_truncate(inode.poll(&data)? as u32); 415 // 唤醒epoll中等待的进程 416 EventPoll::wakeup_epoll(&inode.epitems, pollflag)?; 417 418 // 返回写入的字节数 419 return Ok(len); 420 } 421 422 fn as_any_ref(&self) -> &dyn core::any::Any { 423 self 424 } 425 426 fn get_entry_name_and_metadata( 427 &self, 428 ino: crate::filesystem::vfs::InodeId, 429 ) -> Result<(alloc::string::String, crate::filesystem::vfs::Metadata), SystemError> { 430 // 如果有条件,请在文件系统中使用高效的方式实现本接口,而不是依赖这个低效率的默认实现。 431 let name = self.get_entry_name(ino)?; 432 let entry = self.find(&name)?; 433 return Ok((name, entry.metadata()?)); 434 } 435 436 fn fs(&self) -> Arc<(dyn FileSystem)> { 437 todo!() 438 } 439 440 fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> { 441 return Err(SystemError::ENOSYS); 442 } 443 444 fn poll(&self, private_data: &FilePrivateData) -> Result<usize, SystemError> { 445 return self.inner.lock().poll(private_data); 446 } 447 } 448