xref: /DragonOS/kernel/src/libs/futex/futex.rs (revision f0c87a897fe813b7f06bf5a9e93c43ad9519dafd)
1 use alloc::{
2     collections::LinkedList,
3     sync::{Arc, Weak},
4 };
5 use core::hash::{Hash, Hasher};
6 use core::{intrinsics::likely, sync::atomic::AtomicU64};
7 use hashbrown::HashMap;
8 use system_error::SystemError;
9 
10 use crate::{
11     arch::{CurrentIrqArch, MMArch},
12     exception::InterruptArch,
13     libs::spinlock::{SpinLock, SpinLockGuard},
14     mm::{ucontext::AddressSpace, MemoryManagementArch, VirtAddr},
15     process::{ProcessControlBlock, ProcessManager},
16     sched::{schedule, SchedMode},
17     syscall::user_access::UserBufferReader,
18     time::{
19         timer::{next_n_us_timer_jiffies, Timer, WakeUpHelper},
20         TimeSpec,
21     },
22 };
23 
24 use super::constant::*;
25 
/// Global futex hash table, keyed by `FutexKey`.
///
/// Set to `Some` by `Futex::init`. Every accessor in `FutexData` unwraps it,
/// so any futex operation performed before `Futex::init` panics.
static mut FUTEX_DATA: Option<FutexData> = None;
27 
/// Container for the global futex table stored in `FUTEX_DATA`.
pub struct FutexData {
    // Maps each futex key to the bucket of processes waiting on it.
    // All accesses in this file go through this spin lock.
    data: SpinLock<HashMap<FutexKey, FutexHashBucket>>,
}
31 
32 impl FutexData {
33     pub fn futex_map() -> SpinLockGuard<'static, HashMap<FutexKey, FutexHashBucket>> {
34         unsafe { FUTEX_DATA.as_ref().unwrap().data.lock() }
35     }
36 
37     pub fn try_remove(key: &FutexKey) -> Option<FutexHashBucket> {
38         unsafe {
39             let mut guard = FUTEX_DATA.as_ref().unwrap().data.lock();
40             if let Some(futex) = guard.get(key) {
41                 if futex.chain.is_empty() {
42                     return guard.remove(key);
43                 }
44             }
45         }
46         None
47     }
48 }
49 
/// Stateless namespace for the futex operations (`futex_wait`, `futex_wake`,
/// `futex_requeue`, `futex_wake_op`, ...).
pub struct Futex;
51 
/// Wait bucket of one futex key: processes or threads waiting on the same
/// futex are queued here.
pub struct FutexHashBucket {
    // Wait queue of this futex; new waiters are appended at the back.
    chain: LinkedList<Arc<FutexObj>>,
}
57 
58 impl FutexHashBucket {
59     /// ## 判断是否在bucket里
60     pub fn contains(&self, futex_q: &FutexObj) -> bool {
61         self.chain
62             .iter()
63             .filter(|x| futex_q.pcb.ptr_eq(&x.pcb) && x.key == futex_q.key)
64             .count()
65             != 0
66     }
67 
68     /// 让futex_q在该bucket上挂起
69     ///
70     /// 进入该函数前,需要关中断
71     #[inline(always)]
72     pub fn sleep_no_sched(&mut self, futex_q: Arc<FutexObj>) -> Result<(), SystemError> {
73         assert!(!CurrentIrqArch::is_irq_enabled());
74         self.chain.push_back(futex_q);
75 
76         ProcessManager::mark_sleep(true)?;
77 
78         Ok(())
79     }
80 
81     /// ## 唤醒队列中的最多nr_wake个进程
82     ///
83     /// return: 唤醒的进程数
84     #[inline(always)]
85     pub fn wake_up(
86         &mut self,
87         key: FutexKey,
88         bitset: Option<u32>,
89         nr_wake: u32,
90     ) -> Result<usize, SystemError> {
91         let mut count = 0;
92         let mut pop_count = 0;
93         while let Some(futex_q) = self.chain.pop_front() {
94             if futex_q.key == key {
95                 // TODO: 考虑优先级继承的机制
96 
97                 if let Some(bitset) = bitset {
98                     if futex_q.bitset != bitset {
99                         self.chain.push_back(futex_q);
100                         continue;
101                     }
102                 }
103 
104                 // 唤醒
105                 if futex_q.pcb.upgrade().is_some() {
106                     self.remove(futex_q.clone());
107                     ProcessManager::wakeup(&futex_q.pcb.upgrade().unwrap())?;
108                 }
109 
110                 // 判断唤醒数
111                 count += 1;
112                 if count >= nr_wake {
113                     break;
114                 }
115             } else {
116                 self.chain.push_back(futex_q);
117             }
118             // 判断是否循环完队列了
119             pop_count += 1;
120             if pop_count >= self.chain.len() {
121                 break;
122             }
123         }
124         Ok(count as usize)
125     }
126 
127     /// 将FutexObj从bucket中删除
128     pub fn remove(&mut self, futex: Arc<FutexObj>) {
129         self.chain
130             .extract_if(|x| Arc::ptr_eq(x, &futex))
131             .for_each(drop);
132     }
133 }
134 
/// One waiter queued in a `FutexHashBucket`.
#[derive(Debug)]
pub struct FutexObj {
    // The waiting process; held weakly, so a queued entry does not keep a
    // strong reference to the PCB.
    pcb: Weak<ProcessControlBlock>,
    // Key of the futex this waiter is blocked on.
    key: FutexKey,
    // Wake-up mask supplied by the waiter; `futex_wait` rejects 0.
    bitset: u32,
    // TODO: priority inheritance
}
142 
/// Kind of access the caller intends to make to the futex word.
///
/// NOTE(review): `get_futex_key` currently ignores this value.
pub enum FutexAccess {
    FutexRead,
    FutexWrite,
}
147 
#[allow(dead_code)]
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
/// Identifies a futex uniquely within the kernel.
///
/// `get_futex_key` currently only builds `Private` keys. `Shared` is never
/// constructed in this file, which is presumably why `dead_code` is allowed.
pub enum InnerFutexKey {
    Shared(SharedKey),
    Private(PrivateKey),
}
155 
/// Key of the global futex table.
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
pub struct FutexKey {
    // Always 0 as built by `get_futex_key`.
    ptr: u64,
    // Always 0 as built by `get_futex_key`.
    word: u64,
    // Byte offset of the futex word within its page.
    offset: u32,
    // Identifies the page (and, for private keys, optionally the address space)
    // holding the word.
    key: InnerFutexKey,
}
163 
/// Location of a futex variable shared between processes through a file:
/// identifies where the variable lives inside that file.
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
pub struct SharedKey {
    // NOTE(review): presumably an inode sequence number; not used in this file yet.
    i_seq: u64,
    // NOTE(review): presumably the page offset within the file; not used in this file yet.
    page_offset: u64,
}
170 
/// Location of a futex variable shared by the threads of one process:
/// identifies where the variable lives in the process address space.
///
/// Equality uses `address` plus the identity of `address_space` (see the
/// `PartialEq` impl). Hashing uses `address` only.
#[derive(Clone, Debug)]
pub struct PrivateKey {
    // Address space holding the variable. Two keys that are both `None`
    // compare equal whenever their `address` matches.
    address_space: Option<Weak<AddressSpace>>,
    // Start address of the page containing the variable.
    address: u64,
}
179 
180 impl Hash for PrivateKey {
181     fn hash<H: Hasher>(&self, state: &mut H) {
182         self.address.hash(state);
183     }
184 }
185 
// Marker impl: `PartialEq` for `PrivateKey` compares a `u64` and `Weak`
// pointer identity, both of which are reflexive, as `Eq` requires.
impl Eq for PrivateKey {}
187 
188 impl PartialEq for PrivateKey {
189     fn eq(&self, other: &Self) -> bool {
190         if self.address_space.is_none() && other.address_space.is_none() {
191             return self.address == other.address;
192         } else {
193             return self
194                 .address_space
195                 .as_ref()
196                 .unwrap_or(&Weak::default())
197                 .ptr_eq(other.address_space.as_ref().unwrap_or(&Weak::default()))
198                 && self.address == other.address;
199         }
200     }
201 }
202 
203 impl Futex {
204     /// ### 初始化FUTEX_DATA
205     pub fn init() {
206         unsafe {
207             FUTEX_DATA = Some(FutexData {
208                 data: SpinLock::new(HashMap::new()),
209             })
210         };
211     }
212 
213     /// ### 让当前进程在指定futex上等待直到futex_wake显式唤醒
214     pub fn futex_wait(
215         uaddr: VirtAddr,
216         flags: FutexFlag,
217         val: u32,
218         abs_time: Option<TimeSpec>,
219         bitset: u32,
220     ) -> Result<usize, SystemError> {
221         if bitset == 0 {
222             return Err(SystemError::EINVAL);
223         }
224 
225         // 获取全局hash表的key值
226         let key = Self::get_futex_key(
227             uaddr,
228             flags.contains(FutexFlag::FLAGS_SHARED),
229             FutexAccess::FutexRead,
230         )?;
231 
232         let mut futex_map_guard = FutexData::futex_map();
233         let bucket = futex_map_guard.get_mut(&key);
234         let bucket_mut = match bucket {
235             Some(bucket) => bucket,
236             None => {
237                 let bucket = FutexHashBucket {
238                     chain: LinkedList::new(),
239                 };
240                 futex_map_guard.insert(key.clone(), bucket);
241                 futex_map_guard.get_mut(&key).unwrap()
242             }
243         };
244 
245         // 使用UserBuffer读取futex
246         let user_reader =
247             UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?;
248 
249         // 从用户空间读取到futex的val
250         let mut uval = 0;
251 
252         // 读取
253         // 这里只尝试一种方式去读取用户空间,与linux不太一致
254         // 对于linux,如果bucket被锁住时读取失败,将会将bucket解锁后重新读取
255         user_reader.copy_one_from_user::<u32>(&mut uval, 0)?;
256 
257         // 不满足wait条件,返回错误
258         if uval != val {
259             return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
260         }
261 
262         let pcb = ProcessManager::current_pcb();
263         // 创建超时计时器任务
264         let mut timer = None;
265         if let Some(time) = abs_time {
266             let wakeup_helper = WakeUpHelper::new(pcb.clone());
267 
268             let sec = time.tv_sec;
269             let nsec = time.tv_nsec;
270             let jiffies = next_n_us_timer_jiffies((nsec / 1000 + sec * 1_000_000) as u64);
271 
272             let wake_up = Timer::new(wakeup_helper, jiffies);
273 
274             wake_up.activate();
275             timer = Some(wake_up);
276         }
277 
278         let futex_q = Arc::new(FutexObj {
279             pcb: Arc::downgrade(&pcb),
280             key: key.clone(),
281             bitset,
282         });
283         let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
284         // 满足条件则将当前进程在该bucket上挂起
285         bucket_mut.sleep_no_sched(futex_q.clone()).map_err(|e| {
286             kwarn!("error:{e:?}");
287             e
288         })?;
289         drop(futex_map_guard);
290         drop(irq_guard);
291         schedule(SchedMode::SM_NONE);
292 
293         // 被唤醒后的检查
294         let mut futex_map_guard = FutexData::futex_map();
295         let bucket = futex_map_guard.get_mut(&key);
296         let bucket_mut = match bucket {
297             // 如果该pcb不在链表里面了或者该链表已经被释放,就证明是正常的Wake操作
298             Some(bucket_mut) => {
299                 if !bucket_mut.contains(&futex_q) {
300                     // 取消定时器任务
301                     if let Some(timer) = timer {
302                         timer.cancel();
303                     }
304                     return Ok(0);
305                 }
306                 // 非正常唤醒,返回交给下层
307                 bucket_mut
308             }
309             None => {
310                 // 取消定时器任务
311                 if let Some(timer) = timer {
312                     timer.cancel();
313                 }
314                 return Ok(0);
315             }
316         };
317 
318         // 如果是超时唤醒,则返回错误
319         if timer.is_some() && timer.clone().unwrap().timeout() {
320             bucket_mut.remove(futex_q);
321 
322             return Err(SystemError::ETIMEDOUT);
323         }
324 
325         // TODO: 如果没有挂起的信号,则重新判断是否满足wait要求,重新进入wait
326 
327         // 经过前面的几个判断,到这里之后,
328         // 当前进程被唤醒大概率是其他进程更改了uval,需要重新去判断当前进程是否满足wait
329 
330         // 到这里之后,前面的唤醒条件都不满足,则是被信号唤醒
331         // 需要处理信号然后重启futex系统调用
332 
333         // 取消定时器任务
334         if let Some(timer) = timer {
335             if !timer.timeout() {
336                 timer.cancel();
337             }
338         }
339 
340         Ok(0)
341     }
342 
343     // ### 唤醒指定futex上挂起的最多nr_wake个进程
344     pub fn futex_wake(
345         uaddr: VirtAddr,
346         flags: FutexFlag,
347         nr_wake: u32,
348         bitset: u32,
349     ) -> Result<usize, SystemError> {
350         if bitset == 0 {
351             return Err(SystemError::EINVAL);
352         }
353 
354         // 获取futex_key,并且判断地址空间合法性
355         let key = Self::get_futex_key(
356             uaddr,
357             flags.contains(FutexFlag::FLAGS_SHARED),
358             FutexAccess::FutexRead,
359         )?;
360         let mut binding = FutexData::futex_map();
361         let bucket_mut = binding.get_mut(&key).ok_or(SystemError::EINVAL)?;
362 
363         // 确保后面的唤醒操作是有意义的
364         if bucket_mut.chain.is_empty() {
365             return Ok(0);
366         }
367         // 从队列中唤醒
368         let count = bucket_mut.wake_up(key.clone(), Some(bitset), nr_wake)?;
369 
370         drop(binding);
371 
372         FutexData::try_remove(&key);
373 
374         Ok(count)
375     }
376 
377     /// ### 唤醒制定uaddr1上的最多nr_wake个进程,然后将uaddr1最多nr_requeue个进程移动到uaddr2绑定的futex上
378     pub fn futex_requeue(
379         uaddr1: VirtAddr,
380         flags: FutexFlag,
381         uaddr2: VirtAddr,
382         nr_wake: i32,
383         nr_requeue: i32,
384         cmpval: Option<u32>,
385         requeue_pi: bool,
386     ) -> Result<usize, SystemError> {
387         if nr_requeue < 0 || nr_wake < 0 {
388             return Err(SystemError::EINVAL);
389         }
390 
391         // 暂时不支持优先级继承
392         if requeue_pi {
393             return Err(SystemError::ENOSYS);
394         }
395 
396         let key1 = Self::get_futex_key(
397             uaddr1,
398             flags.contains(FutexFlag::FLAGS_SHARED),
399             FutexAccess::FutexRead,
400         )?;
401         let key2 = Self::get_futex_key(uaddr2, flags.contains(FutexFlag::FLAGS_SHARED), {
402             match requeue_pi {
403                 true => FutexAccess::FutexWrite,
404                 false => FutexAccess::FutexRead,
405             }
406         })?;
407 
408         if requeue_pi && key1 == key2 {
409             return Err(SystemError::EINVAL);
410         }
411 
412         if likely(cmpval.is_some()) {
413             let uval_reader =
414                 UserBufferReader::new(uaddr1.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?;
415             let curval = uval_reader.read_one_from_user::<u32>(0)?;
416 
417             // 判断是否满足条件
418             if *curval != cmpval.unwrap() {
419                 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
420             }
421         }
422 
423         let mut futex_data_guard = FutexData::futex_map();
424         if !requeue_pi {
425             // 唤醒nr_wake个进程
426             let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?;
427             let ret = bucket_1_mut.wake_up(key1.clone(), None, nr_wake as u32)?;
428             // 将bucket1中最多nr_requeue个任务转移到bucket2
429             for _ in 0..nr_requeue {
430                 let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?;
431                 let futex_q = bucket_1_mut.chain.pop_front();
432                 match futex_q {
433                     Some(futex_q) => {
434                         let bucket_2_mut =
435                             futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?;
436                         bucket_2_mut.chain.push_back(futex_q);
437                     }
438                     None => {
439                         break;
440                     }
441                 }
442             }
443 
444             return Ok(ret);
445         } else {
446             // 暂时不支持优先级继承
447             todo!()
448         }
449     }
450 
451     /// ### 唤醒futex上的进程的同时进行一些操作
452     pub fn futex_wake_op(
453         uaddr1: VirtAddr,
454         flags: FutexFlag,
455         uaddr2: VirtAddr,
456         nr_wake: i32,
457         nr_wake2: i32,
458         op: i32,
459     ) -> Result<usize, SystemError> {
460         let key1 = Futex::get_futex_key(
461             uaddr1,
462             flags.contains(FutexFlag::FLAGS_SHARED),
463             FutexAccess::FutexRead,
464         )?;
465         let key2 = Futex::get_futex_key(
466             uaddr2,
467             flags.contains(FutexFlag::FLAGS_SHARED),
468             FutexAccess::FutexWrite,
469         )?;
470 
471         let mut futex_data_guard = FutexData::futex_map();
472         let bucket1 = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?;
473         let mut wake_count = 0;
474 
475         // 唤醒uaddr1中的进程
476         wake_count += bucket1.wake_up(key1, None, nr_wake as u32)?;
477 
478         match Self::futex_atomic_op_inuser(op as u32, uaddr2) {
479             Ok(ret) => {
480                 // 操作成功则唤醒uaddr2中的进程
481                 if ret {
482                     let bucket2 = futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?;
483                     wake_count += bucket2.wake_up(key2, None, nr_wake2 as u32)?;
484                 }
485             }
486             Err(e) => {
487                 // TODO:retry?
488                 return Err(e);
489             }
490         }
491 
492         Ok(wake_count)
493     }
494 
495     fn get_futex_key(
496         uaddr: VirtAddr,
497         fshared: bool,
498         _access: FutexAccess,
499     ) -> Result<FutexKey, SystemError> {
500         let mut address = uaddr.data();
501 
502         // 计算相对页的偏移量
503         let offset = address & (MMArch::PAGE_SIZE - 1);
504         // 判断内存对齐
505         if uaddr.data() & (core::mem::size_of::<u32>() - 1) != 0 {
506             return Err(SystemError::EINVAL);
507         }
508 
509         // 目前address指向所在页面的起始地址
510         address -= offset;
511 
512         // 若不是进程间共享的futex,则返回Private
513         if !fshared {
514             return Ok(FutexKey {
515                 ptr: 0,
516                 word: 0,
517                 offset: offset as u32,
518                 key: InnerFutexKey::Private(PrivateKey {
519                     address: address as u64,
520                     address_space: None,
521                 }),
522             });
523         }
524 
525         // 获取到地址所在地址空间
526         let address_space = AddressSpace::current()?;
527         // TODO: 判断是否为匿名映射,是匿名映射才返回PrivateKey
528         return Ok(FutexKey {
529             ptr: 0,
530             word: 0,
531             offset: offset as u32,
532             key: InnerFutexKey::Private(PrivateKey {
533                 address: address as u64,
534                 address_space: Some(Arc::downgrade(&address_space)),
535             }),
536         });
537 
538         // 未实现共享内存机制,贡献内存部分应该通过inode构建SharedKey
539         // todo!("Shared memory not implemented");
540     }
541 
542     pub fn futex_atomic_op_inuser(encoded_op: u32, uaddr: VirtAddr) -> Result<bool, SystemError> {
543         let op = FutexOP::from_bits((encoded_op & 0x70000000) >> 28).ok_or(SystemError::ENOSYS)?;
544         let cmp =
545             FutexOpCMP::from_bits((encoded_op & 0x0f000000) >> 24).ok_or(SystemError::ENOSYS)?;
546 
547         let sign_extend32 = |value: u32, index: i32| {
548             let shift = (31 - index) as u8;
549             return (value << shift) >> shift;
550         };
551 
552         let mut oparg = sign_extend32((encoded_op & 0x00fff000) >> 12, 11);
553         let cmparg = sign_extend32(encoded_op & 0x00000fff, 11);
554 
555         if (encoded_op & (FutexOP::FUTEX_OP_OPARG_SHIFT.bits() << 28) != 0) && oparg > 31 {
556             kwarn!(
557                 "futex_wake_op: pid:{} tries to shift op by {}; fix this program",
558                 ProcessManager::current_pcb().pid().data(),
559                 oparg
560             );
561 
562             oparg &= 31;
563         }
564 
565         // TODO: 这个汇编似乎是有问题的,目前不好测试
566         let old_val = Self::arch_futex_atomic_op_inuser(op, oparg, uaddr)?;
567 
568         match cmp {
569             FutexOpCMP::FUTEX_OP_CMP_EQ => {
570                 return Ok(cmparg == old_val);
571             }
572             FutexOpCMP::FUTEX_OP_CMP_NE => {
573                 return Ok(cmparg != old_val);
574             }
575             FutexOpCMP::FUTEX_OP_CMP_LT => {
576                 return Ok(cmparg < old_val);
577             }
578             FutexOpCMP::FUTEX_OP_CMP_LE => {
579                 return Ok(cmparg <= old_val);
580             }
581             FutexOpCMP::FUTEX_OP_CMP_GE => {
582                 return Ok(cmparg >= old_val);
583             }
584             FutexOpCMP::FUTEX_OP_CMP_GT => {
585                 return Ok(cmparg > old_val);
586             }
587             _ => {
588                 return Err(SystemError::ENOSYS);
589             }
590         }
591     }
592 
593     /// ### 对futex进行操作
594     ///
595     /// 进入该方法会关闭中断保证修改的原子性,所以进入该方法前应确保中断锁已释放
596     ///
597     /// ### return uaddr原来的值
598     #[allow(unused_assignments)]
599     pub fn arch_futex_atomic_op_inuser(
600         op: FutexOP,
601         oparg: u32,
602         uaddr: VirtAddr,
603     ) -> Result<u32, SystemError> {
604         let guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
605 
606         let reader =
607             UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?;
608 
609         let oldval = reader.read_one_from_user::<u32>(0)?;
610 
611         let atomic_addr = AtomicU64::new(uaddr.data() as u64);
612         // 这个指针是指向指针的指针
613         let ptr = atomic_addr.as_ptr();
614         match op {
615             FutexOP::FUTEX_OP_SET => unsafe {
616                 *((*ptr) as *mut u32) = oparg;
617             },
618             FutexOP::FUTEX_OP_ADD => unsafe {
619                 *((*ptr) as *mut u32) += oparg;
620             },
621             FutexOP::FUTEX_OP_OR => unsafe {
622                 *((*ptr) as *mut u32) |= oparg;
623             },
624             FutexOP::FUTEX_OP_ANDN => unsafe {
625                 *((*ptr) as *mut u32) &= oparg;
626             },
627             FutexOP::FUTEX_OP_XOR => unsafe {
628                 *((*ptr) as *mut u32) ^= oparg;
629             },
630             _ => return Err(SystemError::ENOSYS),
631         }
632 
633         drop(guard);
634 
635         Ok(*oldval)
636     }
637 }
638