1 use alloc::{ 2 collections::LinkedList, 3 sync::{Arc, Weak}, 4 }; 5 use core::hash::{Hash, Hasher}; 6 use core::{intrinsics::likely, sync::atomic::AtomicU64}; 7 use hashbrown::HashMap; 8 9 use crate::{ 10 arch::{sched::sched, CurrentIrqArch, MMArch}, 11 exception::InterruptArch, 12 libs::spinlock::{SpinLock, SpinLockGuard}, 13 mm::{ucontext::AddressSpace, MemoryManagementArch, VirtAddr}, 14 process::{ProcessControlBlock, ProcessManager}, 15 syscall::{user_access::UserBufferReader, SystemError}, 16 time::{ 17 timer::{next_n_us_timer_jiffies, Timer, WakeUpHelper}, 18 TimeSpec, 19 }, 20 }; 21 22 use super::constant::*; 23 24 static mut FUTEX_DATA: Option<FutexData> = None; 25 26 pub struct FutexData { 27 data: SpinLock<HashMap<FutexKey, FutexHashBucket>>, 28 } 29 30 impl FutexData { 31 pub fn futex_map() -> SpinLockGuard<'static, HashMap<FutexKey, FutexHashBucket>> { 32 unsafe { FUTEX_DATA.as_ref().unwrap().data.lock() } 33 } 34 35 pub fn try_remove(key: &FutexKey) -> Option<FutexHashBucket> { 36 unsafe { 37 let mut guard = FUTEX_DATA.as_ref().unwrap().data.lock(); 38 if let Some(futex) = guard.get(key) { 39 if futex.chain.is_empty() { 40 return guard.remove(key); 41 } 42 } 43 } 44 None 45 } 46 } 47 48 pub struct Futex; 49 50 // 对于同一个futex的进程或线程将会在这个bucket等待 51 pub struct FutexHashBucket { 52 // 该futex维护的等待队列 53 chain: LinkedList<Arc<FutexObj>>, 54 } 55 56 impl FutexHashBucket { 57 /// ## 判断是否在bucket里 58 pub fn contains(&self, futex_q: &FutexObj) -> bool { 59 self.chain 60 .iter() 61 .filter(|x| futex_q.pcb.ptr_eq(&x.pcb) && x.key == futex_q.key) 62 .count() 63 != 0 64 } 65 66 /// 让futex_q在该bucket上挂起 67 /// 68 /// 进入该函数前,需要关中断 69 #[inline(always)] 70 pub fn sleep_no_sched(&mut self, futex_q: Arc<FutexObj>) -> Result<(), SystemError> { 71 assert!(CurrentIrqArch::is_irq_enabled() == false); 72 self.chain.push_back(futex_q); 73 74 ProcessManager::mark_sleep(true)?; 75 76 Ok(()) 77 } 78 79 /// ## 唤醒队列中的最多nr_wake个进程 80 /// 81 /// return: 唤醒的进程数 82 #[inline(always)] 83 pub fn wake_up( 84 &mut self, 85 key: FutexKey, 86 bitset: Option<u32>, 87 nr_wake: u32, 88 ) -> Result<usize, SystemError> { 89 let mut count = 0; 90 let mut pop_count = 0; 91 while let Some(futex_q) = self.chain.pop_front() { 92 if futex_q.key == key { 93 // TODO: 考虑优先级继承的机制 94 95 if let Some(bitset) = bitset { 96 if futex_q.bitset != bitset { 97 self.chain.push_back(futex_q); 98 continue; 99 } 100 } 101 102 // 唤醒 103 if futex_q.pcb.upgrade().is_some() { 104 self.remove(futex_q.clone()); 105 ProcessManager::wakeup(&futex_q.pcb.upgrade().unwrap())?; 106 } 107 108 // 判断唤醒数 109 count += 1; 110 if count >= nr_wake { 111 break; 112 } 113 } else { 114 self.chain.push_back(futex_q); 115 } 116 // 判断是否循环完队列了 117 pop_count += 1; 118 if pop_count >= self.chain.len() { 119 break; 120 } 121 } 122 Ok(count as usize) 123 } 124 125 /// 将FutexObj从bucket中删除 126 pub fn remove(&mut self, futex: Arc<FutexObj>) { 127 self.chain 128 .extract_if(|x| Arc::ptr_eq(x, &futex)) 129 .for_each(drop); 130 } 131 } 132 133 #[derive(Debug)] 134 pub struct FutexObj { 135 pcb: Weak<ProcessControlBlock>, 136 key: FutexKey, 137 bitset: u32, 138 // TODO: 优先级继承 139 } 140 141 pub enum FutexAccess { 142 FutexRead, 143 FutexWrite, 144 } 145 146 #[allow(dead_code)] 147 #[derive(Hash, PartialEq, Eq, Clone, Debug)] 148 /// ### 用于定位内核唯一的futex 149 pub enum InnerFutexKey { 150 Shared(SharedKey), 151 Private(PrivateKey), 152 } 153 154 #[derive(Hash, PartialEq, Eq, Clone, Debug)] 155 pub struct FutexKey { 156 ptr: u64, 157 word: u64, 158 offset: u32, 159 key: InnerFutexKey, 160 } 161 162 /// 不同进程间通过文件共享futex变量,表明该变量在文件中的位置 163 #[derive(Hash, PartialEq, Eq, Clone, Debug)] 164 pub struct SharedKey { 165 i_seq: u64, 166 page_offset: u64, 167 } 168 169 /// 同一进程的不同线程共享futex变量,表明该变量在进程地址空间中的位置 170 #[derive(Clone, Debug)] 171 pub struct PrivateKey { 172 // 所在的地址空间 173 address_space: Option<Weak<AddressSpace>>, 174 // 表示所在页面的初始地址 175 address: u64, 176 } 177 178 impl Hash for PrivateKey { 179 fn hash<H: Hasher>(&self, state: &mut H) { 180 self.address.hash(state); 181 } 182 } 183 184 impl Eq for PrivateKey {} 185 186 impl PartialEq for PrivateKey { 187 fn eq(&self, other: &Self) -> bool { 188 if self.address_space.is_none() && other.address_space.is_none() { 189 return self.address == other.address; 190 } else { 191 return self 192 .address_space 193 .as_ref() 194 .unwrap_or(&Weak::default()) 195 .ptr_eq(&other.address_space.as_ref().unwrap_or(&Weak::default())) 196 && self.address == other.address; 197 } 198 } 199 } 200 201 impl Futex { 202 /// ### 初始化FUTEX_DATA 203 pub fn init() { 204 unsafe { 205 FUTEX_DATA = Some(FutexData { 206 data: SpinLock::new(HashMap::new()), 207 }) 208 }; 209 } 210 211 /// ### 让当前进程在指定futex上等待直到futex_wake显式唤醒 212 pub fn futex_wait( 213 uaddr: VirtAddr, 214 flags: FutexFlag, 215 val: u32, 216 abs_time: Option<TimeSpec>, 217 bitset: u32, 218 ) -> Result<usize, SystemError> { 219 if bitset == 0 { 220 return Err(SystemError::EINVAL); 221 } 222 223 // 获取全局hash表的key值 224 let key = Self::get_futex_key( 225 uaddr, 226 flags.contains(FutexFlag::FLAGS_SHARED), 227 FutexAccess::FutexRead, 228 )?; 229 230 let mut futex_map_guard = FutexData::futex_map(); 231 let bucket = futex_map_guard.get_mut(&key); 232 let bucket_mut = match bucket { 233 Some(bucket) => bucket, 234 None => { 235 let bucket = FutexHashBucket { 236 chain: LinkedList::new(), 237 }; 238 futex_map_guard.insert(key.clone(), bucket); 239 futex_map_guard.get_mut(&key).unwrap() 240 } 241 }; 242 243 // 使用UserBuffer读取futex 244 let user_reader = 245 UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?; 246 247 // 从用户空间读取到futex的val 248 let mut uval = 0; 249 250 // 读取 251 // 这里只尝试一种方式去读取用户空间,与linux不太一致 252 // 对于linux,如果bucket被锁住时读取失败,将会将bucket解锁后重新读取 253 user_reader.copy_one_from_user::<u32>(&mut uval, 0)?; 254 255 // 不满足wait条件,返回错误 256 if uval != val { 257 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 258 } 259 260 let pcb = ProcessManager::current_pcb(); 261 // 创建超时计时器任务 262 let mut timer = None; 263 if !abs_time.is_none() { 264 let time = abs_time.unwrap(); 265 266 let wakeup_helper = WakeUpHelper::new(pcb.clone()); 267 268 let sec = time.tv_sec; 269 let nsec = time.tv_nsec; 270 let jiffies = next_n_us_timer_jiffies((nsec / 1000 + sec * 1_000_000) as u64); 271 272 let wake_up = Timer::new(wakeup_helper, jiffies); 273 274 wake_up.activate(); 275 timer = Some(wake_up); 276 } 277 278 let futex_q = Arc::new(FutexObj { 279 pcb: Arc::downgrade(&pcb), 280 key: key.clone(), 281 bitset, 282 }); 283 let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() }; 284 // 满足条件则将当前进程在该bucket上挂起 285 bucket_mut.sleep_no_sched(futex_q.clone()).map_err(|e| { 286 kwarn!("error:{e:?}"); 287 e 288 })?; 289 drop(futex_map_guard); 290 drop(irq_guard); 291 sched(); 292 293 // 被唤醒后的检查 294 let mut futex_map_guard = FutexData::futex_map(); 295 let bucket = futex_map_guard.get_mut(&key); 296 let bucket_mut = match bucket { 297 // 如果该pcb不在链表里面了或者该链表已经被释放,就证明是正常的Wake操作 298 Some(bucket_mut) => { 299 if !bucket_mut.contains(&futex_q) { 300 // 取消定时器任务 301 if timer.is_some() { 302 timer.unwrap().cancel(); 303 } 304 return Ok(0); 305 } 306 // 非正常唤醒,返回交给下层 307 bucket_mut 308 } 309 None => { 310 // 取消定时器任务 311 if timer.is_some() { 312 timer.unwrap().cancel(); 313 } 314 return Ok(0); 315 } 316 }; 317 318 // 如果是超时唤醒,则返回错误 319 if timer.is_some() { 320 if timer.clone().unwrap().timeout() { 321 bucket_mut.remove(futex_q); 322 323 return Err(SystemError::ETIMEDOUT); 324 } 325 } 326 327 // TODO: 如果没有挂起的信号,则重新判断是否满足wait要求,重新进入wait 328 329 // 经过前面的几个判断,到这里之后, 330 // 当前进程被唤醒大概率是其他进程更改了uval,需要重新去判断当前进程是否满足wait 331 332 // 到这里之后,前面的唤醒条件都不满足,则是被信号唤醒 333 // 需要处理信号然后重启futex系统调用 334 335 // 取消定时器任务 336 if timer.is_some() { 337 let timer = timer.unwrap(); 338 if !timer.timeout() { 339 timer.cancel(); 340 } 341 } 342 Ok(0) 343 } 344 345 // ### 唤醒指定futex上挂起的最多nr_wake个进程 346 pub fn futex_wake( 347 uaddr: VirtAddr, 348 flags: FutexFlag, 349 nr_wake: u32, 350 bitset: u32, 351 ) -> Result<usize, SystemError> { 352 if bitset == 0 { 353 return Err(SystemError::EINVAL); 354 } 355 356 // 获取futex_key,并且判断地址空间合法性 357 let key = Self::get_futex_key( 358 uaddr, 359 flags.contains(FutexFlag::FLAGS_SHARED), 360 FutexAccess::FutexRead, 361 )?; 362 let mut binding = FutexData::futex_map(); 363 let bucket_mut = binding.get_mut(&key).ok_or(SystemError::EINVAL)?; 364 365 // 确保后面的唤醒操作是有意义的 366 if bucket_mut.chain.is_empty() { 367 return Ok(0); 368 } 369 // 从队列中唤醒 370 let count = bucket_mut.wake_up(key.clone(), Some(bitset), nr_wake)?; 371 372 drop(binding); 373 374 FutexData::try_remove(&key); 375 376 Ok(count) 377 } 378 379 /// ### 唤醒制定uaddr1上的最多nr_wake个进程,然后将uaddr1最多nr_requeue个进程移动到uaddr2绑定的futex上 380 pub fn futex_requeue( 381 uaddr1: VirtAddr, 382 flags: FutexFlag, 383 uaddr2: VirtAddr, 384 nr_wake: i32, 385 nr_requeue: i32, 386 cmpval: Option<u32>, 387 requeue_pi: bool, 388 ) -> Result<usize, SystemError> { 389 if nr_requeue < 0 || nr_wake < 0 { 390 return Err(SystemError::EINVAL); 391 } 392 393 // 暂时不支持优先级继承 394 if requeue_pi { 395 return Err(SystemError::ENOSYS); 396 } 397 398 let key1 = Self::get_futex_key( 399 uaddr1, 400 flags.contains(FutexFlag::FLAGS_SHARED), 401 FutexAccess::FutexRead, 402 )?; 403 let key2 = Self::get_futex_key(uaddr2, flags.contains(FutexFlag::FLAGS_SHARED), { 404 match requeue_pi { 405 true => FutexAccess::FutexWrite, 406 false => FutexAccess::FutexRead, 407 } 408 })?; 409 410 if requeue_pi && key1 == key2 { 411 return Err(SystemError::EINVAL); 412 } 413 414 if likely(!cmpval.is_none()) { 415 let uval_reader = 416 UserBufferReader::new(uaddr1.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?; 417 let curval = uval_reader.read_one_from_user::<u32>(0)?; 418 419 // 判断是否满足条件 420 if *curval != cmpval.unwrap() { 421 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 422 } 423 } 424 425 let mut futex_data_guard = FutexData::futex_map(); 426 if !requeue_pi { 427 // 唤醒nr_wake个进程 428 let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?; 429 let ret = bucket_1_mut.wake_up(key1.clone(), None, nr_wake as u32)?; 430 // 将bucket1中最多nr_requeue个任务转移到bucket2 431 for _ in 0..nr_requeue { 432 let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?; 433 let futex_q = bucket_1_mut.chain.pop_front(); 434 match futex_q { 435 Some(futex_q) => { 436 let bucket_2_mut = 437 futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?; 438 bucket_2_mut.chain.push_back(futex_q); 439 } 440 None => { 441 break; 442 } 443 } 444 } 445 446 return Ok(ret); 447 } else { 448 // 暂时不支持优先级继承 449 todo!() 450 } 451 } 452 453 /// ### 唤醒futex上的进程的同时进行一些操作 454 pub fn futex_wake_op( 455 uaddr1: VirtAddr, 456 flags: FutexFlag, 457 uaddr2: VirtAddr, 458 nr_wake: i32, 459 nr_wake2: i32, 460 op: i32, 461 ) -> Result<usize, SystemError> { 462 let key1 = Futex::get_futex_key( 463 uaddr1, 464 flags.contains(FutexFlag::FLAGS_SHARED), 465 FutexAccess::FutexRead, 466 )?; 467 let key2 = Futex::get_futex_key( 468 uaddr2, 469 flags.contains(FutexFlag::FLAGS_SHARED), 470 FutexAccess::FutexWrite, 471 )?; 472 473 let mut futex_data_guard = FutexData::futex_map(); 474 let bucket1 = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?; 475 let mut wake_count = 0; 476 477 // 唤醒uaddr1中的进程 478 wake_count += bucket1.wake_up(key1, None, nr_wake as u32)?; 479 480 match Self::futex_atomic_op_inuser(op as u32, uaddr2) { 481 Ok(ret) => { 482 // 操作成功则唤醒uaddr2中的进程 483 if ret { 484 let bucket2 = futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?; 485 wake_count += bucket2.wake_up(key2, None, nr_wake2 as u32)?; 486 } 487 } 488 Err(e) => { 489 // TODO:retry? 490 return Err(e); 491 } 492 } 493 494 Ok(wake_count) 495 } 496 497 fn get_futex_key( 498 uaddr: VirtAddr, 499 fshared: bool, 500 _access: FutexAccess, 501 ) -> Result<FutexKey, SystemError> { 502 let mut address = uaddr.data(); 503 504 // 计算相对页的偏移量 505 let offset = address & (MMArch::PAGE_SIZE - 1); 506 // 判断内存对齐 507 if !(uaddr.data() & (core::mem::size_of::<u32>() - 1) == 0) { 508 return Err(SystemError::EINVAL); 509 } 510 511 // 目前address指向所在页面的起始地址 512 address -= offset; 513 514 // 若不是进程间共享的futex,则返回Private 515 if !fshared { 516 return Ok(FutexKey { 517 ptr: 0, 518 word: 0, 519 offset: offset as u32, 520 key: InnerFutexKey::Private(PrivateKey { 521 address: address as u64, 522 address_space: None, 523 }), 524 }); 525 } 526 527 // 获取到地址所在地址空间 528 let address_space = AddressSpace::current()?; 529 // TODO: 判断是否为匿名映射,是匿名映射才返回PrivateKey 530 return Ok(FutexKey { 531 ptr: 0, 532 word: 0, 533 offset: offset as u32, 534 key: InnerFutexKey::Private(PrivateKey { 535 address: address as u64, 536 address_space: Some(Arc::downgrade(&address_space)), 537 }), 538 }); 539 540 // 未实现共享内存机制,贡献内存部分应该通过inode构建SharedKey 541 // todo!("Shared memory not implemented"); 542 } 543 544 pub fn futex_atomic_op_inuser(encoded_op: u32, uaddr: VirtAddr) -> Result<bool, SystemError> { 545 let op = FutexOP::from_bits((encoded_op & 0x70000000) >> 28).ok_or(SystemError::ENOSYS)?; 546 let cmp = 547 FutexOpCMP::from_bits((encoded_op & 0x0f000000) >> 24).ok_or(SystemError::ENOSYS)?; 548 549 let sign_extend32 = |value: u32, index: i32| { 550 let shift = (31 - index) as u8; 551 return (value << shift) >> shift; 552 }; 553 554 let mut oparg = sign_extend32((encoded_op & 0x00fff000) >> 12, 11); 555 let cmparg = sign_extend32(encoded_op & 0x00000fff, 11); 556 557 if encoded_op & (FutexOP::FUTEX_OP_OPARG_SHIFT.bits() << 28) != 0 { 558 if oparg > 31 { 559 kwarn!( 560 "futex_wake_op: pid:{} tries to shift op by {}; fix this program", 561 ProcessManager::current_pcb().pid().data(), 562 oparg 563 ); 564 565 oparg &= 31; 566 } 567 } 568 569 // TODO: 这个汇编似乎是有问题的,目前不好测试 570 let old_val = Self::arch_futex_atomic_op_inuser(op, oparg, uaddr)?; 571 572 match cmp { 573 FutexOpCMP::FUTEX_OP_CMP_EQ => { 574 return Ok(cmparg == old_val); 575 } 576 FutexOpCMP::FUTEX_OP_CMP_NE => { 577 return Ok(cmparg != old_val); 578 } 579 FutexOpCMP::FUTEX_OP_CMP_LT => { 580 return Ok(cmparg < old_val); 581 } 582 FutexOpCMP::FUTEX_OP_CMP_LE => { 583 return Ok(cmparg <= old_val); 584 } 585 FutexOpCMP::FUTEX_OP_CMP_GE => { 586 return Ok(cmparg >= old_val); 587 } 588 FutexOpCMP::FUTEX_OP_CMP_GT => { 589 return Ok(cmparg > old_val); 590 } 591 _ => { 592 return Err(SystemError::ENOSYS); 593 } 594 } 595 } 596 597 /// ### 对futex进行操作 598 /// 599 /// 进入该方法会关闭中断保证修改的原子性,所以进入该方法前应确保中断锁已释放 600 /// 601 /// ### return uaddr原来的值 602 #[allow(unused_assignments)] 603 pub fn arch_futex_atomic_op_inuser( 604 op: FutexOP, 605 oparg: u32, 606 uaddr: VirtAddr, 607 ) -> Result<u32, SystemError> { 608 let guard = unsafe { CurrentIrqArch::save_and_disable_irq() }; 609 610 let reader = 611 UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?; 612 613 let oldval = reader.read_one_from_user::<u32>(0)?; 614 615 let atomic_addr = AtomicU64::new(uaddr.data() as u64); 616 // 这个指针是指向指针的指针 617 let ptr = atomic_addr.as_ptr(); 618 match op { 619 FutexOP::FUTEX_OP_SET => unsafe { 620 *((*ptr) as *mut u32) = oparg; 621 }, 622 FutexOP::FUTEX_OP_ADD => unsafe { 623 *((*ptr) as *mut u32) += oparg; 624 }, 625 FutexOP::FUTEX_OP_OR => unsafe { 626 *((*ptr) as *mut u32) |= oparg; 627 }, 628 FutexOP::FUTEX_OP_ANDN => unsafe { 629 *((*ptr) as *mut u32) &= oparg; 630 }, 631 FutexOP::FUTEX_OP_XOR => unsafe { 632 *((*ptr) as *mut u32) ^= oparg; 633 }, 634 _ => return Err(SystemError::ENOSYS), 635 } 636 637 drop(guard); 638 639 Ok(*oldval) 640 } 641 } 642 643 #[no_mangle] 644 unsafe extern "C" fn rs_futex_init() { 645 Futex::init(); 646 } 647