xref: /DragonOS/kernel/src/libs/futex/futex.rs (revision 08a2ee408498b0db4c76c57b149f1cf047758f3c)
1 use alloc::{
2     collections::LinkedList,
3     sync::{Arc, Weak},
4 };
5 use core::hash::{Hash, Hasher};
6 use core::{intrinsics::likely, sync::atomic::AtomicU64};
7 use hashbrown::HashMap;
8 
9 use crate::{
10     arch::{sched::sched, CurrentIrqArch, MMArch},
11     exception::InterruptArch,
12     libs::spinlock::{SpinLock, SpinLockGuard},
13     mm::{ucontext::AddressSpace, MemoryManagementArch, VirtAddr},
14     process::{ProcessControlBlock, ProcessManager},
15     syscall::{user_access::UserBufferReader, SystemError},
16     time::{
17         timer::{next_n_us_timer_jiffies, Timer, WakeUpHelper},
18         TimeSpec,
19     },
20 };
21 
22 use super::constant::*;
23 
24 static mut FUTEX_DATA: Option<FutexData> = None;
25 
26 pub struct FutexData {
27     data: SpinLock<HashMap<FutexKey, FutexHashBucket>>,
28 }
29 
30 impl FutexData {
31     pub fn futex_map() -> SpinLockGuard<'static, HashMap<FutexKey, FutexHashBucket>> {
32         unsafe { FUTEX_DATA.as_ref().unwrap().data.lock() }
33     }
34 
35     pub fn try_remove(key: &FutexKey) -> Option<FutexHashBucket> {
36         unsafe {
37             let mut guard = FUTEX_DATA.as_ref().unwrap().data.lock();
38             if let Some(futex) = guard.get(key) {
39                 if futex.chain.is_empty() {
40                     return guard.remove(key);
41                 }
42             }
43         }
44         None
45     }
46 }
47 
/// Namespace type for the futex operations (wait, wake, requeue, wake_op) implemented below.
pub struct Futex;
49 
50 // 对于同一个futex的进程或线程将会在这个bucket等待
51 pub struct FutexHashBucket {
52     // 该futex维护的等待队列
53     chain: LinkedList<Arc<FutexObj>>,
54 }
55 
56 impl FutexHashBucket {
57     /// ## 判断是否在bucket里
58     pub fn contains(&self, futex_q: &FutexObj) -> bool {
59         self.chain
60             .iter()
61             .filter(|x| futex_q.pcb.ptr_eq(&x.pcb) && x.key == futex_q.key)
62             .count()
63             != 0
64     }
65 
66     /// 让futex_q在该bucket上挂起
67     ///
68     /// 进入该函数前,需要关中断
69     #[inline(always)]
70     pub fn sleep_no_sched(&mut self, futex_q: Arc<FutexObj>) -> Result<(), SystemError> {
71         assert!(CurrentIrqArch::is_irq_enabled() == false);
72         self.chain.push_back(futex_q);
73 
74         ProcessManager::mark_sleep(true)?;
75 
76         Ok(())
77     }
78 
79     /// ## 唤醒队列中的最多nr_wake个进程
80     ///
81     /// return: 唤醒的进程数
82     #[inline(always)]
83     pub fn wake_up(
84         &mut self,
85         key: FutexKey,
86         bitset: Option<u32>,
87         nr_wake: u32,
88     ) -> Result<usize, SystemError> {
89         let mut count = 0;
90         let mut pop_count = 0;
91         while let Some(futex_q) = self.chain.pop_front() {
92             if futex_q.key == key {
93                 // TODO: 考虑优先级继承的机制
94 
95                 if let Some(bitset) = bitset {
96                     if futex_q.bitset != bitset {
97                         self.chain.push_back(futex_q);
98                         continue;
99                     }
100                 }
101 
102                 // 唤醒
103                 if futex_q.pcb.upgrade().is_some() {
104                     self.remove(futex_q.clone());
105                     ProcessManager::wakeup(&futex_q.pcb.upgrade().unwrap())?;
106                 }
107 
108                 // 判断唤醒数
109                 count += 1;
110                 if count >= nr_wake {
111                     break;
112                 }
113             } else {
114                 self.chain.push_back(futex_q);
115             }
116             // 判断是否循环完队列了
117             pop_count += 1;
118             if pop_count >= self.chain.len() {
119                 break;
120             }
121         }
122         Ok(count as usize)
123     }
124 
125     /// 将FutexObj从bucket中删除
126     pub fn remove(&mut self, futex: Arc<FutexObj>) {
127         self.chain
128             .extract_if(|x| Arc::ptr_eq(x, &futex))
129             .for_each(drop);
130     }
131 }
132 
/// One waiter on a futex: queued in a `FutexHashBucket`'s chain while its task sleeps.
#[derive(Debug)]
pub struct FutexObj {
    /// The waiting task. Held as a `Weak`, so a queued entry does not keep the task alive;
    /// wakers skip entries whose task can no longer be upgraded.
    pcb: Weak<ProcessControlBlock>,
    /// Key of the futex this entry waits on.
    key: FutexKey,
    /// Wait bitset; checked against the waker's bitset in `FutexHashBucket::wake_up`.
    bitset: u32,
    // TODO: priority inheritance
}
140 
/// Intended access to the futex word, passed to `Futex::get_futex_key`.
///
/// NOTE(review): `get_futex_key` currently ignores it (its parameter is `_access`).
pub enum FutexAccess {
    FutexRead,
    FutexWrite,
}
145 
// `dead_code` is allowed, presumably because `Shared` is not constructed anywhere yet.
#[allow(dead_code)]
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
/// ### Identifies a futex uniquely within the kernel
///
/// `Shared`: a futex shared between processes through a file (see [`SharedKey`]).
/// `Private`: a futex located inside one address space (see [`PrivateKey`]).
pub enum InnerFutexKey {
    Shared(SharedKey),
    Private(PrivateKey),
}
153 
/// Key of the global futex table (`FutexData`): identifies one futex word.
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
pub struct FutexKey {
    // NOTE(review): `ptr` and `word` are always set to 0 by `Futex::get_futex_key` in this file.
    ptr: u64,
    word: u64,
    /// Byte offset of the futex word within its page.
    offset: u32,
    /// Where the futex lives: an address-space page or a location in a file.
    key: InnerFutexKey,
}
161 
/// A futex shared between different processes through a file; records where the futex
/// word is located within that file.
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
pub struct SharedKey {
    // NOTE(review): presumably a sequence number identifying the backing inode — confirm.
    i_seq: u64,
    // NOTE(review): presumably the offset of the page within the file — confirm.
    page_offset: u64,
}
168 
169 /// 同一进程的不同线程共享futex变量,表明该变量在进程地址空间中的位置
170 #[derive(Clone, Debug)]
171 pub struct PrivateKey {
172     // 所在的地址空间
173     address_space: Option<Weak<AddressSpace>>,
174     // 表示所在页面的初始地址
175     address: u64,
176 }
177 
178 impl Hash for PrivateKey {
179     fn hash<H: Hasher>(&self, state: &mut H) {
180         self.address.hash(state);
181     }
182 }
183 
184 impl Eq for PrivateKey {}
185 
186 impl PartialEq for PrivateKey {
187     fn eq(&self, other: &Self) -> bool {
188         if self.address_space.is_none() && other.address_space.is_none() {
189             return self.address == other.address;
190         } else {
191             return self
192                 .address_space
193                 .as_ref()
194                 .unwrap_or(&Weak::default())
195                 .ptr_eq(&other.address_space.as_ref().unwrap_or(&Weak::default()))
196                 && self.address == other.address;
197         }
198     }
199 }
200 
201 impl Futex {
202     /// ### 初始化FUTEX_DATA
203     pub fn init() {
204         unsafe {
205             FUTEX_DATA = Some(FutexData {
206                 data: SpinLock::new(HashMap::new()),
207             })
208         };
209     }
210 
211     /// ### 让当前进程在指定futex上等待直到futex_wake显式唤醒
212     pub fn futex_wait(
213         uaddr: VirtAddr,
214         flags: FutexFlag,
215         val: u32,
216         abs_time: Option<TimeSpec>,
217         bitset: u32,
218     ) -> Result<usize, SystemError> {
219         if bitset == 0 {
220             return Err(SystemError::EINVAL);
221         }
222 
223         // 获取全局hash表的key值
224         let key = Self::get_futex_key(
225             uaddr,
226             flags.contains(FutexFlag::FLAGS_SHARED),
227             FutexAccess::FutexRead,
228         )?;
229 
230         let mut futex_map_guard = FutexData::futex_map();
231         let bucket = futex_map_guard.get_mut(&key);
232         let bucket_mut = match bucket {
233             Some(bucket) => bucket,
234             None => {
235                 let bucket = FutexHashBucket {
236                     chain: LinkedList::new(),
237                 };
238                 futex_map_guard.insert(key.clone(), bucket);
239                 futex_map_guard.get_mut(&key).unwrap()
240             }
241         };
242 
243         // 使用UserBuffer读取futex
244         let user_reader =
245             UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?;
246 
247         // 从用户空间读取到futex的val
248         let mut uval = 0;
249 
250         // 读取
251         // 这里只尝试一种方式去读取用户空间,与linux不太一致
252         // 对于linux,如果bucket被锁住时读取失败,将会将bucket解锁后重新读取
253         user_reader.copy_one_from_user::<u32>(&mut uval, 0)?;
254 
255         // 不满足wait条件,返回错误
256         if uval != val {
257             return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
258         }
259 
260         let pcb = ProcessManager::current_pcb();
261         // 创建超时计时器任务
262         let mut timer = None;
263         if !abs_time.is_none() {
264             let time = abs_time.unwrap();
265 
266             let wakeup_helper = WakeUpHelper::new(pcb.clone());
267 
268             let sec = time.tv_sec;
269             let nsec = time.tv_nsec;
270             let jiffies = next_n_us_timer_jiffies((nsec / 1000 + sec * 1_000_000) as u64);
271 
272             let wake_up = Timer::new(wakeup_helper, jiffies);
273 
274             wake_up.activate();
275             timer = Some(wake_up);
276         }
277 
278         let futex_q = Arc::new(FutexObj {
279             pcb: Arc::downgrade(&pcb),
280             key: key.clone(),
281             bitset,
282         });
283         let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
284         // 满足条件则将当前进程在该bucket上挂起
285         bucket_mut.sleep_no_sched(futex_q.clone()).map_err(|e| {
286             kwarn!("error:{e:?}");
287             e
288         })?;
289         drop(futex_map_guard);
290         drop(irq_guard);
291         sched();
292 
293         // 被唤醒后的检查
294         let mut futex_map_guard = FutexData::futex_map();
295         let bucket = futex_map_guard.get_mut(&key);
296         let bucket_mut = match bucket {
297             // 如果该pcb不在链表里面了或者该链表已经被释放,就证明是正常的Wake操作
298             Some(bucket_mut) => {
299                 if !bucket_mut.contains(&futex_q) {
300                     // 取消定时器任务
301                     if timer.is_some() {
302                         timer.unwrap().cancel();
303                     }
304                     return Ok(0);
305                 }
306                 // 非正常唤醒,返回交给下层
307                 bucket_mut
308             }
309             None => {
310                 // 取消定时器任务
311                 if timer.is_some() {
312                     timer.unwrap().cancel();
313                 }
314                 return Ok(0);
315             }
316         };
317 
318         // 如果是超时唤醒,则返回错误
319         if timer.is_some() {
320             if timer.clone().unwrap().timeout() {
321                 bucket_mut.remove(futex_q);
322 
323                 return Err(SystemError::ETIMEDOUT);
324             }
325         }
326 
327         // TODO: 如果没有挂起的信号,则重新判断是否满足wait要求,重新进入wait
328 
329         // 经过前面的几个判断,到这里之后,
330         // 当前进程被唤醒大概率是其他进程更改了uval,需要重新去判断当前进程是否满足wait
331 
332         // 到这里之后,前面的唤醒条件都不满足,则是被信号唤醒
333         // 需要处理信号然后重启futex系统调用
334 
335         // 取消定时器任务
336         if timer.is_some() {
337             let timer = timer.unwrap();
338             if !timer.timeout() {
339                 timer.cancel();
340             }
341         }
342         Ok(0)
343     }
344 
345     // ### 唤醒指定futex上挂起的最多nr_wake个进程
346     pub fn futex_wake(
347         uaddr: VirtAddr,
348         flags: FutexFlag,
349         nr_wake: u32,
350         bitset: u32,
351     ) -> Result<usize, SystemError> {
352         if bitset == 0 {
353             return Err(SystemError::EINVAL);
354         }
355 
356         // 获取futex_key,并且判断地址空间合法性
357         let key = Self::get_futex_key(
358             uaddr,
359             flags.contains(FutexFlag::FLAGS_SHARED),
360             FutexAccess::FutexRead,
361         )?;
362         let mut binding = FutexData::futex_map();
363         let bucket_mut = binding.get_mut(&key).ok_or(SystemError::EINVAL)?;
364 
365         // 确保后面的唤醒操作是有意义的
366         if bucket_mut.chain.is_empty() {
367             return Ok(0);
368         }
369         // 从队列中唤醒
370         let count = bucket_mut.wake_up(key.clone(), Some(bitset), nr_wake)?;
371 
372         drop(binding);
373 
374         FutexData::try_remove(&key);
375 
376         Ok(count)
377     }
378 
379     /// ### 唤醒制定uaddr1上的最多nr_wake个进程,然后将uaddr1最多nr_requeue个进程移动到uaddr2绑定的futex上
380     pub fn futex_requeue(
381         uaddr1: VirtAddr,
382         flags: FutexFlag,
383         uaddr2: VirtAddr,
384         nr_wake: i32,
385         nr_requeue: i32,
386         cmpval: Option<u32>,
387         requeue_pi: bool,
388     ) -> Result<usize, SystemError> {
389         if nr_requeue < 0 || nr_wake < 0 {
390             return Err(SystemError::EINVAL);
391         }
392 
393         // 暂时不支持优先级继承
394         if requeue_pi {
395             return Err(SystemError::ENOSYS);
396         }
397 
398         let key1 = Self::get_futex_key(
399             uaddr1,
400             flags.contains(FutexFlag::FLAGS_SHARED),
401             FutexAccess::FutexRead,
402         )?;
403         let key2 = Self::get_futex_key(uaddr2, flags.contains(FutexFlag::FLAGS_SHARED), {
404             match requeue_pi {
405                 true => FutexAccess::FutexWrite,
406                 false => FutexAccess::FutexRead,
407             }
408         })?;
409 
410         if requeue_pi && key1 == key2 {
411             return Err(SystemError::EINVAL);
412         }
413 
414         if likely(!cmpval.is_none()) {
415             let uval_reader =
416                 UserBufferReader::new(uaddr1.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?;
417             let curval = uval_reader.read_one_from_user::<u32>(0)?;
418 
419             // 判断是否满足条件
420             if *curval != cmpval.unwrap() {
421                 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
422             }
423         }
424 
425         let mut futex_data_guard = FutexData::futex_map();
426         if !requeue_pi {
427             // 唤醒nr_wake个进程
428             let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?;
429             let ret = bucket_1_mut.wake_up(key1.clone(), None, nr_wake as u32)?;
430             // 将bucket1中最多nr_requeue个任务转移到bucket2
431             for _ in 0..nr_requeue {
432                 let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?;
433                 let futex_q = bucket_1_mut.chain.pop_front();
434                 match futex_q {
435                     Some(futex_q) => {
436                         let bucket_2_mut =
437                             futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?;
438                         bucket_2_mut.chain.push_back(futex_q);
439                     }
440                     None => {
441                         break;
442                     }
443                 }
444             }
445 
446             return Ok(ret);
447         } else {
448             // 暂时不支持优先级继承
449             todo!()
450         }
451     }
452 
453     /// ### 唤醒futex上的进程的同时进行一些操作
454     pub fn futex_wake_op(
455         uaddr1: VirtAddr,
456         flags: FutexFlag,
457         uaddr2: VirtAddr,
458         nr_wake: i32,
459         nr_wake2: i32,
460         op: i32,
461     ) -> Result<usize, SystemError> {
462         let key1 = Futex::get_futex_key(
463             uaddr1,
464             flags.contains(FutexFlag::FLAGS_SHARED),
465             FutexAccess::FutexRead,
466         )?;
467         let key2 = Futex::get_futex_key(
468             uaddr2,
469             flags.contains(FutexFlag::FLAGS_SHARED),
470             FutexAccess::FutexWrite,
471         )?;
472 
473         let mut futex_data_guard = FutexData::futex_map();
474         let bucket1 = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?;
475         let mut wake_count = 0;
476 
477         // 唤醒uaddr1中的进程
478         wake_count += bucket1.wake_up(key1, None, nr_wake as u32)?;
479 
480         match Self::futex_atomic_op_inuser(op as u32, uaddr2) {
481             Ok(ret) => {
482                 // 操作成功则唤醒uaddr2中的进程
483                 if ret {
484                     let bucket2 = futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?;
485                     wake_count += bucket2.wake_up(key2, None, nr_wake2 as u32)?;
486                 }
487             }
488             Err(e) => {
489                 // TODO:retry?
490                 return Err(e);
491             }
492         }
493 
494         Ok(wake_count)
495     }
496 
497     fn get_futex_key(
498         uaddr: VirtAddr,
499         fshared: bool,
500         _access: FutexAccess,
501     ) -> Result<FutexKey, SystemError> {
502         let mut address = uaddr.data();
503 
504         // 计算相对页的偏移量
505         let offset = address & (MMArch::PAGE_SIZE - 1);
506         // 判断内存对齐
507         if !(uaddr.data() & (core::mem::size_of::<u32>() - 1) == 0) {
508             return Err(SystemError::EINVAL);
509         }
510 
511         // 目前address指向所在页面的起始地址
512         address -= offset;
513 
514         // 若不是进程间共享的futex,则返回Private
515         if !fshared {
516             return Ok(FutexKey {
517                 ptr: 0,
518                 word: 0,
519                 offset: offset as u32,
520                 key: InnerFutexKey::Private(PrivateKey {
521                     address: address as u64,
522                     address_space: None,
523                 }),
524             });
525         }
526 
527         // 获取到地址所在地址空间
528         let address_space = AddressSpace::current()?;
529         // TODO: 判断是否为匿名映射,是匿名映射才返回PrivateKey
530         return Ok(FutexKey {
531             ptr: 0,
532             word: 0,
533             offset: offset as u32,
534             key: InnerFutexKey::Private(PrivateKey {
535                 address: address as u64,
536                 address_space: Some(Arc::downgrade(&address_space)),
537             }),
538         });
539 
540         // 未实现共享内存机制,贡献内存部分应该通过inode构建SharedKey
541         // todo!("Shared memory not implemented");
542     }
543 
544     pub fn futex_atomic_op_inuser(encoded_op: u32, uaddr: VirtAddr) -> Result<bool, SystemError> {
545         let op = FutexOP::from_bits((encoded_op & 0x70000000) >> 28).ok_or(SystemError::ENOSYS)?;
546         let cmp =
547             FutexOpCMP::from_bits((encoded_op & 0x0f000000) >> 24).ok_or(SystemError::ENOSYS)?;
548 
549         let sign_extend32 = |value: u32, index: i32| {
550             let shift = (31 - index) as u8;
551             return (value << shift) >> shift;
552         };
553 
554         let mut oparg = sign_extend32((encoded_op & 0x00fff000) >> 12, 11);
555         let cmparg = sign_extend32(encoded_op & 0x00000fff, 11);
556 
557         if encoded_op & (FutexOP::FUTEX_OP_OPARG_SHIFT.bits() << 28) != 0 {
558             if oparg > 31 {
559                 kwarn!(
560                     "futex_wake_op: pid:{} tries to shift op by {}; fix this program",
561                     ProcessManager::current_pcb().pid().data(),
562                     oparg
563                 );
564 
565                 oparg &= 31;
566             }
567         }
568 
569         // TODO: 这个汇编似乎是有问题的,目前不好测试
570         let old_val = Self::arch_futex_atomic_op_inuser(op, oparg, uaddr)?;
571 
572         match cmp {
573             FutexOpCMP::FUTEX_OP_CMP_EQ => {
574                 return Ok(cmparg == old_val);
575             }
576             FutexOpCMP::FUTEX_OP_CMP_NE => {
577                 return Ok(cmparg != old_val);
578             }
579             FutexOpCMP::FUTEX_OP_CMP_LT => {
580                 return Ok(cmparg < old_val);
581             }
582             FutexOpCMP::FUTEX_OP_CMP_LE => {
583                 return Ok(cmparg <= old_val);
584             }
585             FutexOpCMP::FUTEX_OP_CMP_GE => {
586                 return Ok(cmparg >= old_val);
587             }
588             FutexOpCMP::FUTEX_OP_CMP_GT => {
589                 return Ok(cmparg > old_val);
590             }
591             _ => {
592                 return Err(SystemError::ENOSYS);
593             }
594         }
595     }
596 
597     /// ### 对futex进行操作
598     ///
599     /// 进入该方法会关闭中断保证修改的原子性,所以进入该方法前应确保中断锁已释放
600     ///
601     /// ### return uaddr原来的值
602     #[allow(unused_assignments)]
603     pub fn arch_futex_atomic_op_inuser(
604         op: FutexOP,
605         oparg: u32,
606         uaddr: VirtAddr,
607     ) -> Result<u32, SystemError> {
608         let guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
609 
610         let reader =
611             UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?;
612 
613         let oldval = reader.read_one_from_user::<u32>(0)?;
614 
615         let atomic_addr = AtomicU64::new(uaddr.data() as u64);
616         // 这个指针是指向指针的指针
617         let ptr = atomic_addr.as_ptr();
618         match op {
619             FutexOP::FUTEX_OP_SET => unsafe {
620                 *((*ptr) as *mut u32) = oparg;
621             },
622             FutexOP::FUTEX_OP_ADD => unsafe {
623                 *((*ptr) as *mut u32) += oparg;
624             },
625             FutexOP::FUTEX_OP_OR => unsafe {
626                 *((*ptr) as *mut u32) |= oparg;
627             },
628             FutexOP::FUTEX_OP_ANDN => unsafe {
629                 *((*ptr) as *mut u32) &= oparg;
630             },
631             FutexOP::FUTEX_OP_XOR => unsafe {
632                 *((*ptr) as *mut u32) ^= oparg;
633             },
634             _ => return Err(SystemError::ENOSYS),
635         }
636 
637         drop(guard);
638 
639         Ok(*oldval)
640     }
641 }
642 
/// C-callable entry point that sets up the global futex table by calling [`Futex::init`].
#[no_mangle]
unsafe extern "C" fn rs_futex_init() {
    Futex::init();
}
647