1 use alloc::{ 2 collections::LinkedList, 3 sync::{Arc, Weak}, 4 }; 5 use core::hash::{Hash, Hasher}; 6 use core::{intrinsics::likely, sync::atomic::AtomicU64}; 7 use hashbrown::HashMap; 8 use system_error::SystemError; 9 10 use crate::{ 11 arch::{CurrentIrqArch, MMArch}, 12 exception::InterruptArch, 13 libs::spinlock::{SpinLock, SpinLockGuard}, 14 mm::{ucontext::AddressSpace, MemoryManagementArch, VirtAddr}, 15 process::{ProcessControlBlock, ProcessManager}, 16 sched::{schedule, SchedMode}, 17 syscall::user_access::UserBufferReader, 18 time::{ 19 timer::{next_n_us_timer_jiffies, Timer, WakeUpHelper}, 20 TimeSpec, 21 }, 22 }; 23 24 use super::constant::*; 25 26 static mut FUTEX_DATA: Option<FutexData> = None; 27 28 pub struct FutexData { 29 data: SpinLock<HashMap<FutexKey, FutexHashBucket>>, 30 } 31 32 impl FutexData { 33 pub fn futex_map() -> SpinLockGuard<'static, HashMap<FutexKey, FutexHashBucket>> { 34 unsafe { FUTEX_DATA.as_ref().unwrap().data.lock() } 35 } 36 37 pub fn try_remove(key: &FutexKey) -> Option<FutexHashBucket> { 38 unsafe { 39 let mut guard = FUTEX_DATA.as_ref().unwrap().data.lock(); 40 if let Some(futex) = guard.get(key) { 41 if futex.chain.is_empty() { 42 return guard.remove(key); 43 } 44 } 45 } 46 None 47 } 48 } 49 50 pub struct Futex; 51 52 // 对于同一个futex的进程或线程将会在这个bucket等待 53 pub struct FutexHashBucket { 54 // 该futex维护的等待队列 55 chain: LinkedList<Arc<FutexObj>>, 56 } 57 58 impl FutexHashBucket { 59 /// ## 判断是否在bucket里 60 pub fn contains(&self, futex_q: &FutexObj) -> bool { 61 self.chain 62 .iter() 63 .filter(|x| futex_q.pcb.ptr_eq(&x.pcb) && x.key == futex_q.key) 64 .count() 65 != 0 66 } 67 68 /// 让futex_q在该bucket上挂起 69 /// 70 /// 进入该函数前,需要关中断 71 #[inline(always)] 72 pub fn sleep_no_sched(&mut self, futex_q: Arc<FutexObj>) -> Result<(), SystemError> { 73 assert!(!CurrentIrqArch::is_irq_enabled()); 74 self.chain.push_back(futex_q); 75 76 ProcessManager::mark_sleep(true)?; 77 78 Ok(()) 79 } 80 81 /// ## 唤醒队列中的最多nr_wake个进程 82 /// 83 /// return: 唤醒的进程数 84 #[inline(always)] 85 pub fn wake_up( 86 &mut self, 87 key: FutexKey, 88 bitset: Option<u32>, 89 nr_wake: u32, 90 ) -> Result<usize, SystemError> { 91 let mut count = 0; 92 let mut pop_count = 0; 93 while let Some(futex_q) = self.chain.pop_front() { 94 if futex_q.key == key { 95 // TODO: 考虑优先级继承的机制 96 97 if let Some(bitset) = bitset { 98 if futex_q.bitset != bitset { 99 self.chain.push_back(futex_q); 100 continue; 101 } 102 } 103 104 // 唤醒 105 if futex_q.pcb.upgrade().is_some() { 106 self.remove(futex_q.clone()); 107 ProcessManager::wakeup(&futex_q.pcb.upgrade().unwrap())?; 108 } 109 110 // 判断唤醒数 111 count += 1; 112 if count >= nr_wake { 113 break; 114 } 115 } else { 116 self.chain.push_back(futex_q); 117 } 118 // 判断是否循环完队列了 119 pop_count += 1; 120 if pop_count >= self.chain.len() { 121 break; 122 } 123 } 124 Ok(count as usize) 125 } 126 127 /// 将FutexObj从bucket中删除 128 pub fn remove(&mut self, futex: Arc<FutexObj>) { 129 self.chain 130 .extract_if(|x| Arc::ptr_eq(x, &futex)) 131 .for_each(drop); 132 } 133 } 134 135 #[derive(Debug)] 136 pub struct FutexObj { 137 pcb: Weak<ProcessControlBlock>, 138 key: FutexKey, 139 bitset: u32, 140 // TODO: 优先级继承 141 } 142 143 pub enum FutexAccess { 144 FutexRead, 145 FutexWrite, 146 } 147 148 #[allow(dead_code)] 149 #[derive(Hash, PartialEq, Eq, Clone, Debug)] 150 /// ### 用于定位内核唯一的futex 151 pub enum InnerFutexKey { 152 Shared(SharedKey), 153 Private(PrivateKey), 154 } 155 156 #[derive(Hash, PartialEq, Eq, Clone, Debug)] 157 pub struct FutexKey { 158 ptr: u64, 159 word: u64, 160 offset: u32, 161 key: InnerFutexKey, 162 } 163 164 /// 不同进程间通过文件共享futex变量,表明该变量在文件中的位置 165 #[derive(Hash, PartialEq, Eq, Clone, Debug)] 166 pub struct SharedKey { 167 i_seq: u64, 168 page_offset: u64, 169 } 170 171 /// 同一进程的不同线程共享futex变量,表明该变量在进程地址空间中的位置 172 #[derive(Clone, Debug)] 173 pub struct PrivateKey { 174 // 所在的地址空间 175 address_space: Option<Weak<AddressSpace>>, 176 // 表示所在页面的初始地址 177 address: u64, 178 } 179 180 impl Hash for PrivateKey { 181 fn hash<H: Hasher>(&self, state: &mut H) { 182 self.address.hash(state); 183 } 184 } 185 186 impl Eq for PrivateKey {} 187 188 impl PartialEq for PrivateKey { 189 fn eq(&self, other: &Self) -> bool { 190 if self.address_space.is_none() && other.address_space.is_none() { 191 return self.address == other.address; 192 } else { 193 return self 194 .address_space 195 .as_ref() 196 .unwrap_or(&Weak::default()) 197 .ptr_eq(other.address_space.as_ref().unwrap_or(&Weak::default())) 198 && self.address == other.address; 199 } 200 } 201 } 202 203 impl Futex { 204 /// ### 初始化FUTEX_DATA 205 pub fn init() { 206 unsafe { 207 FUTEX_DATA = Some(FutexData { 208 data: SpinLock::new(HashMap::new()), 209 }) 210 }; 211 } 212 213 /// ### 让当前进程在指定futex上等待直到futex_wake显式唤醒 214 pub fn futex_wait( 215 uaddr: VirtAddr, 216 flags: FutexFlag, 217 val: u32, 218 abs_time: Option<TimeSpec>, 219 bitset: u32, 220 ) -> Result<usize, SystemError> { 221 if bitset == 0 { 222 return Err(SystemError::EINVAL); 223 } 224 225 // 获取全局hash表的key值 226 let key = Self::get_futex_key( 227 uaddr, 228 flags.contains(FutexFlag::FLAGS_SHARED), 229 FutexAccess::FutexRead, 230 )?; 231 232 let mut futex_map_guard = FutexData::futex_map(); 233 let bucket = futex_map_guard.get_mut(&key); 234 let bucket_mut = match bucket { 235 Some(bucket) => bucket, 236 None => { 237 let bucket = FutexHashBucket { 238 chain: LinkedList::new(), 239 }; 240 futex_map_guard.insert(key.clone(), bucket); 241 futex_map_guard.get_mut(&key).unwrap() 242 } 243 }; 244 245 // 使用UserBuffer读取futex 246 let user_reader = 247 UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?; 248 249 // 从用户空间读取到futex的val 250 let mut uval = 0; 251 252 // 读取 253 // 这里只尝试一种方式去读取用户空间,与linux不太一致 254 // 对于linux,如果bucket被锁住时读取失败,将会将bucket解锁后重新读取 255 user_reader.copy_one_from_user::<u32>(&mut uval, 0)?; 256 257 // 不满足wait条件,返回错误 258 if uval != val { 259 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 260 } 261 262 let pcb = ProcessManager::current_pcb(); 263 // 创建超时计时器任务 264 let mut timer = None; 265 if let Some(time) = abs_time { 266 let wakeup_helper = WakeUpHelper::new(pcb.clone()); 267 268 let sec = time.tv_sec; 269 let nsec = time.tv_nsec; 270 let jiffies = next_n_us_timer_jiffies((nsec / 1000 + sec * 1_000_000) as u64); 271 272 let wake_up = Timer::new(wakeup_helper, jiffies); 273 274 wake_up.activate(); 275 timer = Some(wake_up); 276 } 277 278 let futex_q = Arc::new(FutexObj { 279 pcb: Arc::downgrade(&pcb), 280 key: key.clone(), 281 bitset, 282 }); 283 let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() }; 284 // 满足条件则将当前进程在该bucket上挂起 285 bucket_mut.sleep_no_sched(futex_q.clone()).map_err(|e| { 286 kwarn!("error:{e:?}"); 287 e 288 })?; 289 drop(futex_map_guard); 290 drop(irq_guard); 291 schedule(SchedMode::SM_NONE); 292 293 // 被唤醒后的检查 294 let mut futex_map_guard = FutexData::futex_map(); 295 let bucket = futex_map_guard.get_mut(&key); 296 let bucket_mut = match bucket { 297 // 如果该pcb不在链表里面了或者该链表已经被释放,就证明是正常的Wake操作 298 Some(bucket_mut) => { 299 if !bucket_mut.contains(&futex_q) { 300 // 取消定时器任务 301 if let Some(timer) = timer { 302 timer.cancel(); 303 } 304 return Ok(0); 305 } 306 // 非正常唤醒,返回交给下层 307 bucket_mut 308 } 309 None => { 310 // 取消定时器任务 311 if let Some(timer) = timer { 312 timer.cancel(); 313 } 314 return Ok(0); 315 } 316 }; 317 318 // 如果是超时唤醒,则返回错误 319 if timer.is_some() && timer.clone().unwrap().timeout() { 320 bucket_mut.remove(futex_q); 321 322 return Err(SystemError::ETIMEDOUT); 323 } 324 325 // TODO: 如果没有挂起的信号,则重新判断是否满足wait要求,重新进入wait 326 327 // 经过前面的几个判断,到这里之后, 328 // 当前进程被唤醒大概率是其他进程更改了uval,需要重新去判断当前进程是否满足wait 329 330 // 到这里之后,前面的唤醒条件都不满足,则是被信号唤醒 331 // 需要处理信号然后重启futex系统调用 332 333 // 取消定时器任务 334 if let Some(timer) = timer { 335 if !timer.timeout() { 336 timer.cancel(); 337 } 338 } 339 340 Ok(0) 341 } 342 343 // ### 唤醒指定futex上挂起的最多nr_wake个进程 344 pub fn futex_wake( 345 uaddr: VirtAddr, 346 flags: FutexFlag, 347 nr_wake: u32, 348 bitset: u32, 349 ) -> Result<usize, SystemError> { 350 if bitset == 0 { 351 return Err(SystemError::EINVAL); 352 } 353 354 // 获取futex_key,并且判断地址空间合法性 355 let key = Self::get_futex_key( 356 uaddr, 357 flags.contains(FutexFlag::FLAGS_SHARED), 358 FutexAccess::FutexRead, 359 )?; 360 let mut binding = FutexData::futex_map(); 361 let bucket_mut = binding.get_mut(&key).ok_or(SystemError::EINVAL)?; 362 363 // 确保后面的唤醒操作是有意义的 364 if bucket_mut.chain.is_empty() { 365 return Ok(0); 366 } 367 // 从队列中唤醒 368 let count = bucket_mut.wake_up(key.clone(), Some(bitset), nr_wake)?; 369 370 drop(binding); 371 372 FutexData::try_remove(&key); 373 374 Ok(count) 375 } 376 377 /// ### 唤醒制定uaddr1上的最多nr_wake个进程,然后将uaddr1最多nr_requeue个进程移动到uaddr2绑定的futex上 378 pub fn futex_requeue( 379 uaddr1: VirtAddr, 380 flags: FutexFlag, 381 uaddr2: VirtAddr, 382 nr_wake: i32, 383 nr_requeue: i32, 384 cmpval: Option<u32>, 385 requeue_pi: bool, 386 ) -> Result<usize, SystemError> { 387 if nr_requeue < 0 || nr_wake < 0 { 388 return Err(SystemError::EINVAL); 389 } 390 391 // 暂时不支持优先级继承 392 if requeue_pi { 393 return Err(SystemError::ENOSYS); 394 } 395 396 let key1 = Self::get_futex_key( 397 uaddr1, 398 flags.contains(FutexFlag::FLAGS_SHARED), 399 FutexAccess::FutexRead, 400 )?; 401 let key2 = Self::get_futex_key(uaddr2, flags.contains(FutexFlag::FLAGS_SHARED), { 402 match requeue_pi { 403 true => FutexAccess::FutexWrite, 404 false => FutexAccess::FutexRead, 405 } 406 })?; 407 408 if requeue_pi && key1 == key2 { 409 return Err(SystemError::EINVAL); 410 } 411 412 if likely(cmpval.is_some()) { 413 let uval_reader = 414 UserBufferReader::new(uaddr1.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?; 415 let curval = uval_reader.read_one_from_user::<u32>(0)?; 416 417 // 判断是否满足条件 418 if *curval != cmpval.unwrap() { 419 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 420 } 421 } 422 423 let mut futex_data_guard = FutexData::futex_map(); 424 if !requeue_pi { 425 // 唤醒nr_wake个进程 426 let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?; 427 let ret = bucket_1_mut.wake_up(key1.clone(), None, nr_wake as u32)?; 428 // 将bucket1中最多nr_requeue个任务转移到bucket2 429 for _ in 0..nr_requeue { 430 let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?; 431 let futex_q = bucket_1_mut.chain.pop_front(); 432 match futex_q { 433 Some(futex_q) => { 434 let bucket_2_mut = 435 futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?; 436 bucket_2_mut.chain.push_back(futex_q); 437 } 438 None => { 439 break; 440 } 441 } 442 } 443 444 return Ok(ret); 445 } else { 446 // 暂时不支持优先级继承 447 todo!() 448 } 449 } 450 451 /// ### 唤醒futex上的进程的同时进行一些操作 452 pub fn futex_wake_op( 453 uaddr1: VirtAddr, 454 flags: FutexFlag, 455 uaddr2: VirtAddr, 456 nr_wake: i32, 457 nr_wake2: i32, 458 op: i32, 459 ) -> Result<usize, SystemError> { 460 let key1 = Futex::get_futex_key( 461 uaddr1, 462 flags.contains(FutexFlag::FLAGS_SHARED), 463 FutexAccess::FutexRead, 464 )?; 465 let key2 = Futex::get_futex_key( 466 uaddr2, 467 flags.contains(FutexFlag::FLAGS_SHARED), 468 FutexAccess::FutexWrite, 469 )?; 470 471 let mut futex_data_guard = FutexData::futex_map(); 472 let bucket1 = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?; 473 let mut wake_count = 0; 474 475 // 唤醒uaddr1中的进程 476 wake_count += bucket1.wake_up(key1, None, nr_wake as u32)?; 477 478 match Self::futex_atomic_op_inuser(op as u32, uaddr2) { 479 Ok(ret) => { 480 // 操作成功则唤醒uaddr2中的进程 481 if ret { 482 let bucket2 = futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?; 483 wake_count += bucket2.wake_up(key2, None, nr_wake2 as u32)?; 484 } 485 } 486 Err(e) => { 487 // TODO:retry? 488 return Err(e); 489 } 490 } 491 492 Ok(wake_count) 493 } 494 495 fn get_futex_key( 496 uaddr: VirtAddr, 497 fshared: bool, 498 _access: FutexAccess, 499 ) -> Result<FutexKey, SystemError> { 500 let mut address = uaddr.data(); 501 502 // 计算相对页的偏移量 503 let offset = address & (MMArch::PAGE_SIZE - 1); 504 // 判断内存对齐 505 if uaddr.data() & (core::mem::size_of::<u32>() - 1) != 0 { 506 return Err(SystemError::EINVAL); 507 } 508 509 // 目前address指向所在页面的起始地址 510 address -= offset; 511 512 // 若不是进程间共享的futex,则返回Private 513 if !fshared { 514 return Ok(FutexKey { 515 ptr: 0, 516 word: 0, 517 offset: offset as u32, 518 key: InnerFutexKey::Private(PrivateKey { 519 address: address as u64, 520 address_space: None, 521 }), 522 }); 523 } 524 525 // 获取到地址所在地址空间 526 let address_space = AddressSpace::current()?; 527 // TODO: 判断是否为匿名映射,是匿名映射才返回PrivateKey 528 return Ok(FutexKey { 529 ptr: 0, 530 word: 0, 531 offset: offset as u32, 532 key: InnerFutexKey::Private(PrivateKey { 533 address: address as u64, 534 address_space: Some(Arc::downgrade(&address_space)), 535 }), 536 }); 537 538 // 未实现共享内存机制,贡献内存部分应该通过inode构建SharedKey 539 // todo!("Shared memory not implemented"); 540 } 541 542 pub fn futex_atomic_op_inuser(encoded_op: u32, uaddr: VirtAddr) -> Result<bool, SystemError> { 543 let op = FutexOP::from_bits((encoded_op & 0x70000000) >> 28).ok_or(SystemError::ENOSYS)?; 544 let cmp = 545 FutexOpCMP::from_bits((encoded_op & 0x0f000000) >> 24).ok_or(SystemError::ENOSYS)?; 546 547 let sign_extend32 = |value: u32, index: i32| { 548 let shift = (31 - index) as u8; 549 return (value << shift) >> shift; 550 }; 551 552 let mut oparg = sign_extend32((encoded_op & 0x00fff000) >> 12, 11); 553 let cmparg = sign_extend32(encoded_op & 0x00000fff, 11); 554 555 if (encoded_op & (FutexOP::FUTEX_OP_OPARG_SHIFT.bits() << 28) != 0) && oparg > 31 { 556 kwarn!( 557 "futex_wake_op: pid:{} tries to shift op by {}; fix this program", 558 ProcessManager::current_pcb().pid().data(), 559 oparg 560 ); 561 562 oparg &= 31; 563 } 564 565 // TODO: 这个汇编似乎是有问题的,目前不好测试 566 let old_val = Self::arch_futex_atomic_op_inuser(op, oparg, uaddr)?; 567 568 match cmp { 569 FutexOpCMP::FUTEX_OP_CMP_EQ => { 570 return Ok(cmparg == old_val); 571 } 572 FutexOpCMP::FUTEX_OP_CMP_NE => { 573 return Ok(cmparg != old_val); 574 } 575 FutexOpCMP::FUTEX_OP_CMP_LT => { 576 return Ok(cmparg < old_val); 577 } 578 FutexOpCMP::FUTEX_OP_CMP_LE => { 579 return Ok(cmparg <= old_val); 580 } 581 FutexOpCMP::FUTEX_OP_CMP_GE => { 582 return Ok(cmparg >= old_val); 583 } 584 FutexOpCMP::FUTEX_OP_CMP_GT => { 585 return Ok(cmparg > old_val); 586 } 587 _ => { 588 return Err(SystemError::ENOSYS); 589 } 590 } 591 } 592 593 /// ### 对futex进行操作 594 /// 595 /// 进入该方法会关闭中断保证修改的原子性,所以进入该方法前应确保中断锁已释放 596 /// 597 /// ### return uaddr原来的值 598 #[allow(unused_assignments)] 599 pub fn arch_futex_atomic_op_inuser( 600 op: FutexOP, 601 oparg: u32, 602 uaddr: VirtAddr, 603 ) -> Result<u32, SystemError> { 604 let guard = unsafe { CurrentIrqArch::save_and_disable_irq() }; 605 606 let reader = 607 UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?; 608 609 let oldval = reader.read_one_from_user::<u32>(0)?; 610 611 let atomic_addr = AtomicU64::new(uaddr.data() as u64); 612 // 这个指针是指向指针的指针 613 let ptr = atomic_addr.as_ptr(); 614 match op { 615 FutexOP::FUTEX_OP_SET => unsafe { 616 *((*ptr) as *mut u32) = oparg; 617 }, 618 FutexOP::FUTEX_OP_ADD => unsafe { 619 *((*ptr) as *mut u32) += oparg; 620 }, 621 FutexOP::FUTEX_OP_OR => unsafe { 622 *((*ptr) as *mut u32) |= oparg; 623 }, 624 FutexOP::FUTEX_OP_ANDN => unsafe { 625 *((*ptr) as *mut u32) &= oparg; 626 }, 627 FutexOP::FUTEX_OP_XOR => unsafe { 628 *((*ptr) as *mut u32) ^= oparg; 629 }, 630 _ => return Err(SystemError::ENOSYS), 631 } 632 633 drop(guard); 634 635 Ok(*oldval) 636 } 637 } 638