1 use alloc::{ 2 collections::LinkedList, 3 sync::{Arc, Weak}, 4 }; 5 use core::hash::{Hash, Hasher}; 6 use core::{intrinsics::likely, sync::atomic::AtomicU64}; 7 use hashbrown::HashMap; 8 use system_error::SystemError; 9 10 use crate::{ 11 arch::{sched::sched, CurrentIrqArch, MMArch}, 12 exception::InterruptArch, 13 libs::spinlock::{SpinLock, SpinLockGuard}, 14 mm::{ucontext::AddressSpace, MemoryManagementArch, VirtAddr}, 15 process::{ProcessControlBlock, ProcessManager}, 16 syscall::user_access::UserBufferReader, 17 time::{ 18 timer::{next_n_us_timer_jiffies, Timer, WakeUpHelper}, 19 TimeSpec, 20 }, 21 }; 22 23 use super::constant::*; 24 25 static mut FUTEX_DATA: Option<FutexData> = None; 26 27 pub struct FutexData { 28 data: SpinLock<HashMap<FutexKey, FutexHashBucket>>, 29 } 30 31 impl FutexData { 32 pub fn futex_map() -> SpinLockGuard<'static, HashMap<FutexKey, FutexHashBucket>> { 33 unsafe { FUTEX_DATA.as_ref().unwrap().data.lock() } 34 } 35 36 pub fn try_remove(key: &FutexKey) -> Option<FutexHashBucket> { 37 unsafe { 38 let mut guard = FUTEX_DATA.as_ref().unwrap().data.lock(); 39 if let Some(futex) = guard.get(key) { 40 if futex.chain.is_empty() { 41 return guard.remove(key); 42 } 43 } 44 } 45 None 46 } 47 } 48 49 pub struct Futex; 50 51 // 对于同一个futex的进程或线程将会在这个bucket等待 52 pub struct FutexHashBucket { 53 // 该futex维护的等待队列 54 chain: LinkedList<Arc<FutexObj>>, 55 } 56 57 impl FutexHashBucket { 58 /// ## 判断是否在bucket里 59 pub fn contains(&self, futex_q: &FutexObj) -> bool { 60 self.chain 61 .iter() 62 .filter(|x| futex_q.pcb.ptr_eq(&x.pcb) && x.key == futex_q.key) 63 .count() 64 != 0 65 } 66 67 /// 让futex_q在该bucket上挂起 68 /// 69 /// 进入该函数前,需要关中断 70 #[inline(always)] 71 pub fn sleep_no_sched(&mut self, futex_q: Arc<FutexObj>) -> Result<(), SystemError> { 72 assert!(!CurrentIrqArch::is_irq_enabled()); 73 self.chain.push_back(futex_q); 74 75 ProcessManager::mark_sleep(true)?; 76 77 Ok(()) 78 } 79 80 /// ## 唤醒队列中的最多nr_wake个进程 81 /// 82 /// return: 唤醒的进程数 83 #[inline(always)] 84 pub fn wake_up( 85 &mut self, 86 key: FutexKey, 87 bitset: Option<u32>, 88 nr_wake: u32, 89 ) -> Result<usize, SystemError> { 90 let mut count = 0; 91 let mut pop_count = 0; 92 while let Some(futex_q) = self.chain.pop_front() { 93 if futex_q.key == key { 94 // TODO: 考虑优先级继承的机制 95 96 if let Some(bitset) = bitset { 97 if futex_q.bitset != bitset { 98 self.chain.push_back(futex_q); 99 continue; 100 } 101 } 102 103 // 唤醒 104 if futex_q.pcb.upgrade().is_some() { 105 self.remove(futex_q.clone()); 106 ProcessManager::wakeup(&futex_q.pcb.upgrade().unwrap())?; 107 } 108 109 // 判断唤醒数 110 count += 1; 111 if count >= nr_wake { 112 break; 113 } 114 } else { 115 self.chain.push_back(futex_q); 116 } 117 // 判断是否循环完队列了 118 pop_count += 1; 119 if pop_count >= self.chain.len() { 120 break; 121 } 122 } 123 Ok(count as usize) 124 } 125 126 /// 将FutexObj从bucket中删除 127 pub fn remove(&mut self, futex: Arc<FutexObj>) { 128 self.chain 129 .extract_if(|x| Arc::ptr_eq(x, &futex)) 130 .for_each(drop); 131 } 132 } 133 134 #[derive(Debug)] 135 pub struct FutexObj { 136 pcb: Weak<ProcessControlBlock>, 137 key: FutexKey, 138 bitset: u32, 139 // TODO: 优先级继承 140 } 141 142 pub enum FutexAccess { 143 FutexRead, 144 FutexWrite, 145 } 146 147 #[allow(dead_code)] 148 #[derive(Hash, PartialEq, Eq, Clone, Debug)] 149 /// ### 用于定位内核唯一的futex 150 pub enum InnerFutexKey { 151 Shared(SharedKey), 152 Private(PrivateKey), 153 } 154 155 #[derive(Hash, PartialEq, Eq, Clone, Debug)] 156 pub struct FutexKey { 157 ptr: u64, 158 word: u64, 159 offset: u32, 160 key: InnerFutexKey, 161 } 162 163 /// 不同进程间通过文件共享futex变量,表明该变量在文件中的位置 164 #[derive(Hash, PartialEq, Eq, Clone, Debug)] 165 pub struct SharedKey { 166 i_seq: u64, 167 page_offset: u64, 168 } 169 170 /// 同一进程的不同线程共享futex变量,表明该变量在进程地址空间中的位置 171 #[derive(Clone, Debug)] 172 pub struct PrivateKey { 173 // 所在的地址空间 174 address_space: Option<Weak<AddressSpace>>, 175 // 表示所在页面的初始地址 176 address: u64, 177 } 178 179 impl Hash for PrivateKey { 180 fn hash<H: Hasher>(&self, state: &mut H) { 181 self.address.hash(state); 182 } 183 } 184 185 impl Eq for PrivateKey {} 186 187 impl PartialEq for PrivateKey { 188 fn eq(&self, other: &Self) -> bool { 189 if self.address_space.is_none() && other.address_space.is_none() { 190 return self.address == other.address; 191 } else { 192 return self 193 .address_space 194 .as_ref() 195 .unwrap_or(&Weak::default()) 196 .ptr_eq(other.address_space.as_ref().unwrap_or(&Weak::default())) 197 && self.address == other.address; 198 } 199 } 200 } 201 202 impl Futex { 203 /// ### 初始化FUTEX_DATA 204 pub fn init() { 205 unsafe { 206 FUTEX_DATA = Some(FutexData { 207 data: SpinLock::new(HashMap::new()), 208 }) 209 }; 210 } 211 212 /// ### 让当前进程在指定futex上等待直到futex_wake显式唤醒 213 pub fn futex_wait( 214 uaddr: VirtAddr, 215 flags: FutexFlag, 216 val: u32, 217 abs_time: Option<TimeSpec>, 218 bitset: u32, 219 ) -> Result<usize, SystemError> { 220 if bitset == 0 { 221 return Err(SystemError::EINVAL); 222 } 223 224 // 获取全局hash表的key值 225 let key = Self::get_futex_key( 226 uaddr, 227 flags.contains(FutexFlag::FLAGS_SHARED), 228 FutexAccess::FutexRead, 229 )?; 230 231 let mut futex_map_guard = FutexData::futex_map(); 232 let bucket = futex_map_guard.get_mut(&key); 233 let bucket_mut = match bucket { 234 Some(bucket) => bucket, 235 None => { 236 let bucket = FutexHashBucket { 237 chain: LinkedList::new(), 238 }; 239 futex_map_guard.insert(key.clone(), bucket); 240 futex_map_guard.get_mut(&key).unwrap() 241 } 242 }; 243 244 // 使用UserBuffer读取futex 245 let user_reader = 246 UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?; 247 248 // 从用户空间读取到futex的val 249 let mut uval = 0; 250 251 // 读取 252 // 这里只尝试一种方式去读取用户空间,与linux不太一致 253 // 对于linux,如果bucket被锁住时读取失败,将会将bucket解锁后重新读取 254 user_reader.copy_one_from_user::<u32>(&mut uval, 0)?; 255 256 // 不满足wait条件,返回错误 257 if uval != val { 258 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 259 } 260 261 let pcb = ProcessManager::current_pcb(); 262 // 创建超时计时器任务 263 let mut timer = None; 264 if let Some(time) = abs_time { 265 let wakeup_helper = WakeUpHelper::new(pcb.clone()); 266 267 let sec = time.tv_sec; 268 let nsec = time.tv_nsec; 269 let jiffies = next_n_us_timer_jiffies((nsec / 1000 + sec * 1_000_000) as u64); 270 271 let wake_up = Timer::new(wakeup_helper, jiffies); 272 273 wake_up.activate(); 274 timer = Some(wake_up); 275 } 276 277 let futex_q = Arc::new(FutexObj { 278 pcb: Arc::downgrade(&pcb), 279 key: key.clone(), 280 bitset, 281 }); 282 let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() }; 283 // 满足条件则将当前进程在该bucket上挂起 284 bucket_mut.sleep_no_sched(futex_q.clone()).map_err(|e| { 285 kwarn!("error:{e:?}"); 286 e 287 })?; 288 drop(futex_map_guard); 289 drop(irq_guard); 290 sched(); 291 292 // 被唤醒后的检查 293 let mut futex_map_guard = FutexData::futex_map(); 294 let bucket = futex_map_guard.get_mut(&key); 295 let bucket_mut = match bucket { 296 // 如果该pcb不在链表里面了或者该链表已经被释放,就证明是正常的Wake操作 297 Some(bucket_mut) => { 298 if !bucket_mut.contains(&futex_q) { 299 // 取消定时器任务 300 if let Some(timer) = timer { 301 timer.cancel(); 302 } 303 return Ok(0); 304 } 305 // 非正常唤醒,返回交给下层 306 bucket_mut 307 } 308 None => { 309 // 取消定时器任务 310 if let Some(timer) = timer { 311 timer.cancel(); 312 } 313 return Ok(0); 314 } 315 }; 316 317 // 如果是超时唤醒,则返回错误 318 if timer.is_some() && timer.clone().unwrap().timeout() { 319 bucket_mut.remove(futex_q); 320 321 return Err(SystemError::ETIMEDOUT); 322 } 323 324 // TODO: 如果没有挂起的信号,则重新判断是否满足wait要求,重新进入wait 325 326 // 经过前面的几个判断,到这里之后, 327 // 当前进程被唤醒大概率是其他进程更改了uval,需要重新去判断当前进程是否满足wait 328 329 // 到这里之后,前面的唤醒条件都不满足,则是被信号唤醒 330 // 需要处理信号然后重启futex系统调用 331 332 // 取消定时器任务 333 if let Some(timer) = timer { 334 if !timer.timeout() { 335 timer.cancel(); 336 } 337 } 338 339 Ok(0) 340 } 341 342 // ### 唤醒指定futex上挂起的最多nr_wake个进程 343 pub fn futex_wake( 344 uaddr: VirtAddr, 345 flags: FutexFlag, 346 nr_wake: u32, 347 bitset: u32, 348 ) -> Result<usize, SystemError> { 349 if bitset == 0 { 350 return Err(SystemError::EINVAL); 351 } 352 353 // 获取futex_key,并且判断地址空间合法性 354 let key = Self::get_futex_key( 355 uaddr, 356 flags.contains(FutexFlag::FLAGS_SHARED), 357 FutexAccess::FutexRead, 358 )?; 359 let mut binding = FutexData::futex_map(); 360 let bucket_mut = binding.get_mut(&key).ok_or(SystemError::EINVAL)?; 361 362 // 确保后面的唤醒操作是有意义的 363 if bucket_mut.chain.is_empty() { 364 return Ok(0); 365 } 366 // 从队列中唤醒 367 let count = bucket_mut.wake_up(key.clone(), Some(bitset), nr_wake)?; 368 369 drop(binding); 370 371 FutexData::try_remove(&key); 372 373 Ok(count) 374 } 375 376 /// ### 唤醒制定uaddr1上的最多nr_wake个进程,然后将uaddr1最多nr_requeue个进程移动到uaddr2绑定的futex上 377 pub fn futex_requeue( 378 uaddr1: VirtAddr, 379 flags: FutexFlag, 380 uaddr2: VirtAddr, 381 nr_wake: i32, 382 nr_requeue: i32, 383 cmpval: Option<u32>, 384 requeue_pi: bool, 385 ) -> Result<usize, SystemError> { 386 if nr_requeue < 0 || nr_wake < 0 { 387 return Err(SystemError::EINVAL); 388 } 389 390 // 暂时不支持优先级继承 391 if requeue_pi { 392 return Err(SystemError::ENOSYS); 393 } 394 395 let key1 = Self::get_futex_key( 396 uaddr1, 397 flags.contains(FutexFlag::FLAGS_SHARED), 398 FutexAccess::FutexRead, 399 )?; 400 let key2 = Self::get_futex_key(uaddr2, flags.contains(FutexFlag::FLAGS_SHARED), { 401 match requeue_pi { 402 true => FutexAccess::FutexWrite, 403 false => FutexAccess::FutexRead, 404 } 405 })?; 406 407 if requeue_pi && key1 == key2 { 408 return Err(SystemError::EINVAL); 409 } 410 411 if likely(cmpval.is_some()) { 412 let uval_reader = 413 UserBufferReader::new(uaddr1.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?; 414 let curval = uval_reader.read_one_from_user::<u32>(0)?; 415 416 // 判断是否满足条件 417 if *curval != cmpval.unwrap() { 418 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); 419 } 420 } 421 422 let mut futex_data_guard = FutexData::futex_map(); 423 if !requeue_pi { 424 // 唤醒nr_wake个进程 425 let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?; 426 let ret = bucket_1_mut.wake_up(key1.clone(), None, nr_wake as u32)?; 427 // 将bucket1中最多nr_requeue个任务转移到bucket2 428 for _ in 0..nr_requeue { 429 let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?; 430 let futex_q = bucket_1_mut.chain.pop_front(); 431 match futex_q { 432 Some(futex_q) => { 433 let bucket_2_mut = 434 futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?; 435 bucket_2_mut.chain.push_back(futex_q); 436 } 437 None => { 438 break; 439 } 440 } 441 } 442 443 return Ok(ret); 444 } else { 445 // 暂时不支持优先级继承 446 todo!() 447 } 448 } 449 450 /// ### 唤醒futex上的进程的同时进行一些操作 451 pub fn futex_wake_op( 452 uaddr1: VirtAddr, 453 flags: FutexFlag, 454 uaddr2: VirtAddr, 455 nr_wake: i32, 456 nr_wake2: i32, 457 op: i32, 458 ) -> Result<usize, SystemError> { 459 let key1 = Futex::get_futex_key( 460 uaddr1, 461 flags.contains(FutexFlag::FLAGS_SHARED), 462 FutexAccess::FutexRead, 463 )?; 464 let key2 = Futex::get_futex_key( 465 uaddr2, 466 flags.contains(FutexFlag::FLAGS_SHARED), 467 FutexAccess::FutexWrite, 468 )?; 469 470 let mut futex_data_guard = FutexData::futex_map(); 471 let bucket1 = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?; 472 let mut wake_count = 0; 473 474 // 唤醒uaddr1中的进程 475 wake_count += bucket1.wake_up(key1, None, nr_wake as u32)?; 476 477 match Self::futex_atomic_op_inuser(op as u32, uaddr2) { 478 Ok(ret) => { 479 // 操作成功则唤醒uaddr2中的进程 480 if ret { 481 let bucket2 = futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?; 482 wake_count += bucket2.wake_up(key2, None, nr_wake2 as u32)?; 483 } 484 } 485 Err(e) => { 486 // TODO:retry? 487 return Err(e); 488 } 489 } 490 491 Ok(wake_count) 492 } 493 494 fn get_futex_key( 495 uaddr: VirtAddr, 496 fshared: bool, 497 _access: FutexAccess, 498 ) -> Result<FutexKey, SystemError> { 499 let mut address = uaddr.data(); 500 501 // 计算相对页的偏移量 502 let offset = address & (MMArch::PAGE_SIZE - 1); 503 // 判断内存对齐 504 if uaddr.data() & (core::mem::size_of::<u32>() - 1) != 0 { 505 return Err(SystemError::EINVAL); 506 } 507 508 // 目前address指向所在页面的起始地址 509 address -= offset; 510 511 // 若不是进程间共享的futex,则返回Private 512 if !fshared { 513 return Ok(FutexKey { 514 ptr: 0, 515 word: 0, 516 offset: offset as u32, 517 key: InnerFutexKey::Private(PrivateKey { 518 address: address as u64, 519 address_space: None, 520 }), 521 }); 522 } 523 524 // 获取到地址所在地址空间 525 let address_space = AddressSpace::current()?; 526 // TODO: 判断是否为匿名映射,是匿名映射才返回PrivateKey 527 return Ok(FutexKey { 528 ptr: 0, 529 word: 0, 530 offset: offset as u32, 531 key: InnerFutexKey::Private(PrivateKey { 532 address: address as u64, 533 address_space: Some(Arc::downgrade(&address_space)), 534 }), 535 }); 536 537 // 未实现共享内存机制,贡献内存部分应该通过inode构建SharedKey 538 // todo!("Shared memory not implemented"); 539 } 540 541 pub fn futex_atomic_op_inuser(encoded_op: u32, uaddr: VirtAddr) -> Result<bool, SystemError> { 542 let op = FutexOP::from_bits((encoded_op & 0x70000000) >> 28).ok_or(SystemError::ENOSYS)?; 543 let cmp = 544 FutexOpCMP::from_bits((encoded_op & 0x0f000000) >> 24).ok_or(SystemError::ENOSYS)?; 545 546 let sign_extend32 = |value: u32, index: i32| { 547 let shift = (31 - index) as u8; 548 return (value << shift) >> shift; 549 }; 550 551 let mut oparg = sign_extend32((encoded_op & 0x00fff000) >> 12, 11); 552 let cmparg = sign_extend32(encoded_op & 0x00000fff, 11); 553 554 if (encoded_op & (FutexOP::FUTEX_OP_OPARG_SHIFT.bits() << 28) != 0) && oparg > 31 { 555 kwarn!( 556 "futex_wake_op: pid:{} tries to shift op by {}; fix this program", 557 ProcessManager::current_pcb().pid().data(), 558 oparg 559 ); 560 561 oparg &= 31; 562 } 563 564 // TODO: 这个汇编似乎是有问题的,目前不好测试 565 let old_val = Self::arch_futex_atomic_op_inuser(op, oparg, uaddr)?; 566 567 match cmp { 568 FutexOpCMP::FUTEX_OP_CMP_EQ => { 569 return Ok(cmparg == old_val); 570 } 571 FutexOpCMP::FUTEX_OP_CMP_NE => { 572 return Ok(cmparg != old_val); 573 } 574 FutexOpCMP::FUTEX_OP_CMP_LT => { 575 return Ok(cmparg < old_val); 576 } 577 FutexOpCMP::FUTEX_OP_CMP_LE => { 578 return Ok(cmparg <= old_val); 579 } 580 FutexOpCMP::FUTEX_OP_CMP_GE => { 581 return Ok(cmparg >= old_val); 582 } 583 FutexOpCMP::FUTEX_OP_CMP_GT => { 584 return Ok(cmparg > old_val); 585 } 586 _ => { 587 return Err(SystemError::ENOSYS); 588 } 589 } 590 } 591 592 /// ### 对futex进行操作 593 /// 594 /// 进入该方法会关闭中断保证修改的原子性,所以进入该方法前应确保中断锁已释放 595 /// 596 /// ### return uaddr原来的值 597 #[allow(unused_assignments)] 598 pub fn arch_futex_atomic_op_inuser( 599 op: FutexOP, 600 oparg: u32, 601 uaddr: VirtAddr, 602 ) -> Result<u32, SystemError> { 603 let guard = unsafe { CurrentIrqArch::save_and_disable_irq() }; 604 605 let reader = 606 UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?; 607 608 let oldval = reader.read_one_from_user::<u32>(0)?; 609 610 let atomic_addr = AtomicU64::new(uaddr.data() as u64); 611 // 这个指针是指向指针的指针 612 let ptr = atomic_addr.as_ptr(); 613 match op { 614 FutexOP::FUTEX_OP_SET => unsafe { 615 *((*ptr) as *mut u32) = oparg; 616 }, 617 FutexOP::FUTEX_OP_ADD => unsafe { 618 *((*ptr) as *mut u32) += oparg; 619 }, 620 FutexOP::FUTEX_OP_OR => unsafe { 621 *((*ptr) as *mut u32) |= oparg; 622 }, 623 FutexOP::FUTEX_OP_ANDN => unsafe { 624 *((*ptr) as *mut u32) &= oparg; 625 }, 626 FutexOP::FUTEX_OP_XOR => unsafe { 627 *((*ptr) as *mut u32) ^= oparg; 628 }, 629 _ => return Err(SystemError::ENOSYS), 630 } 631 632 drop(guard); 633 634 Ok(*oldval) 635 } 636 } 637