xref: /DragonOS/kernel/src/libs/futex/futex.rs (revision da152319797436368304cbc3f85a3b9ec049134b)
1 use alloc::{
2     collections::LinkedList,
3     sync::{Arc, Weak},
4 };
5 use core::hash::{Hash, Hasher};
6 use core::{intrinsics::likely, sync::atomic::AtomicU64};
7 use hashbrown::HashMap;
8 use system_error::SystemError;
9 
10 use crate::{
11     arch::{sched::sched, CurrentIrqArch, MMArch},
12     exception::InterruptArch,
13     libs::spinlock::{SpinLock, SpinLockGuard},
14     mm::{ucontext::AddressSpace, MemoryManagementArch, VirtAddr},
15     process::{ProcessControlBlock, ProcessManager},
16     syscall::user_access::UserBufferReader,
17     time::{
18         timer::{next_n_us_timer_jiffies, Timer, WakeUpHelper},
19         TimeSpec,
20     },
21 };
22 
23 use super::constant::*;
24 
25 static mut FUTEX_DATA: Option<FutexData> = None;
26 
27 pub struct FutexData {
28     data: SpinLock<HashMap<FutexKey, FutexHashBucket>>,
29 }
30 
31 impl FutexData {
32     pub fn futex_map() -> SpinLockGuard<'static, HashMap<FutexKey, FutexHashBucket>> {
33         unsafe { FUTEX_DATA.as_ref().unwrap().data.lock() }
34     }
35 
36     pub fn try_remove(key: &FutexKey) -> Option<FutexHashBucket> {
37         unsafe {
38             let mut guard = FUTEX_DATA.as_ref().unwrap().data.lock();
39             if let Some(futex) = guard.get(key) {
40                 if futex.chain.is_empty() {
41                     return guard.remove(key);
42                 }
43             }
44         }
45         None
46     }
47 }
48 
/// Namespace type: the futex operations are associated functions of `Futex`.
pub struct Futex;
50 
51 // 对于同一个futex的进程或线程将会在这个bucket等待
52 pub struct FutexHashBucket {
53     // 该futex维护的等待队列
54     chain: LinkedList<Arc<FutexObj>>,
55 }
56 
57 impl FutexHashBucket {
58     /// ## 判断是否在bucket里
59     pub fn contains(&self, futex_q: &FutexObj) -> bool {
60         self.chain
61             .iter()
62             .filter(|x| futex_q.pcb.ptr_eq(&x.pcb) && x.key == futex_q.key)
63             .count()
64             != 0
65     }
66 
67     /// 让futex_q在该bucket上挂起
68     ///
69     /// 进入该函数前,需要关中断
70     #[inline(always)]
71     pub fn sleep_no_sched(&mut self, futex_q: Arc<FutexObj>) -> Result<(), SystemError> {
72         assert!(!CurrentIrqArch::is_irq_enabled());
73         self.chain.push_back(futex_q);
74 
75         ProcessManager::mark_sleep(true)?;
76 
77         Ok(())
78     }
79 
80     /// ## 唤醒队列中的最多nr_wake个进程
81     ///
82     /// return: 唤醒的进程数
83     #[inline(always)]
84     pub fn wake_up(
85         &mut self,
86         key: FutexKey,
87         bitset: Option<u32>,
88         nr_wake: u32,
89     ) -> Result<usize, SystemError> {
90         let mut count = 0;
91         let mut pop_count = 0;
92         while let Some(futex_q) = self.chain.pop_front() {
93             if futex_q.key == key {
94                 // TODO: 考虑优先级继承的机制
95 
96                 if let Some(bitset) = bitset {
97                     if futex_q.bitset != bitset {
98                         self.chain.push_back(futex_q);
99                         continue;
100                     }
101                 }
102 
103                 // 唤醒
104                 if futex_q.pcb.upgrade().is_some() {
105                     self.remove(futex_q.clone());
106                     ProcessManager::wakeup(&futex_q.pcb.upgrade().unwrap())?;
107                 }
108 
109                 // 判断唤醒数
110                 count += 1;
111                 if count >= nr_wake {
112                     break;
113                 }
114             } else {
115                 self.chain.push_back(futex_q);
116             }
117             // 判断是否循环完队列了
118             pop_count += 1;
119             if pop_count >= self.chain.len() {
120                 break;
121             }
122         }
123         Ok(count as usize)
124     }
125 
126     /// 将FutexObj从bucket中删除
127     pub fn remove(&mut self, futex: Arc<FutexObj>) {
128         self.chain
129             .extract_if(|x| Arc::ptr_eq(x, &futex))
130             .for_each(drop);
131     }
132 }
133 
134 #[derive(Debug)]
135 pub struct FutexObj {
136     pcb: Weak<ProcessControlBlock>,
137     key: FutexKey,
138     bitset: u32,
139     // TODO: 优先级继承
140 }
141 
/// Kind of access requested when a futex key is built.
pub enum FutexAccess {
    /// The futex word is only read.
    FutexRead,
    /// The futex word is going to be written.
    FutexWrite,
}
146 
147 #[allow(dead_code)]
148 #[derive(Hash, PartialEq, Eq, Clone, Debug)]
149 /// ### 用于定位内核唯一的futex
150 pub enum InnerFutexKey {
151     Shared(SharedKey),
152     Private(PrivateKey),
153 }
154 
155 #[derive(Hash, PartialEq, Eq, Clone, Debug)]
156 pub struct FutexKey {
157     ptr: u64,
158     word: u64,
159     offset: u32,
160     key: InnerFutexKey,
161 }
162 
/// Key for a futex variable that different processes share through a file;
/// it records where the variable lives inside that file.
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
pub struct SharedKey {
    // NOTE(review): presumably a sequence number identifying the inode - confirm.
    i_seq: u64,
    // NOTE(review): presumably the page offset inside the file - confirm.
    page_offset: u64,
}
169 
170 /// 同一进程的不同线程共享futex变量,表明该变量在进程地址空间中的位置
171 #[derive(Clone, Debug)]
172 pub struct PrivateKey {
173     // 所在的地址空间
174     address_space: Option<Weak<AddressSpace>>,
175     // 表示所在页面的初始地址
176     address: u64,
177 }
178 
179 impl Hash for PrivateKey {
180     fn hash<H: Hasher>(&self, state: &mut H) {
181         self.address.hash(state);
182     }
183 }
184 
185 impl Eq for PrivateKey {}
186 
187 impl PartialEq for PrivateKey {
188     fn eq(&self, other: &Self) -> bool {
189         if self.address_space.is_none() && other.address_space.is_none() {
190             return self.address == other.address;
191         } else {
192             return self
193                 .address_space
194                 .as_ref()
195                 .unwrap_or(&Weak::default())
196                 .ptr_eq(other.address_space.as_ref().unwrap_or(&Weak::default()))
197                 && self.address == other.address;
198         }
199     }
200 }
201 
202 impl Futex {
203     /// ### 初始化FUTEX_DATA
204     pub fn init() {
205         unsafe {
206             FUTEX_DATA = Some(FutexData {
207                 data: SpinLock::new(HashMap::new()),
208             })
209         };
210     }
211 
212     /// ### 让当前进程在指定futex上等待直到futex_wake显式唤醒
213     pub fn futex_wait(
214         uaddr: VirtAddr,
215         flags: FutexFlag,
216         val: u32,
217         abs_time: Option<TimeSpec>,
218         bitset: u32,
219     ) -> Result<usize, SystemError> {
220         if bitset == 0 {
221             return Err(SystemError::EINVAL);
222         }
223 
224         // 获取全局hash表的key值
225         let key = Self::get_futex_key(
226             uaddr,
227             flags.contains(FutexFlag::FLAGS_SHARED),
228             FutexAccess::FutexRead,
229         )?;
230 
231         let mut futex_map_guard = FutexData::futex_map();
232         let bucket = futex_map_guard.get_mut(&key);
233         let bucket_mut = match bucket {
234             Some(bucket) => bucket,
235             None => {
236                 let bucket = FutexHashBucket {
237                     chain: LinkedList::new(),
238                 };
239                 futex_map_guard.insert(key.clone(), bucket);
240                 futex_map_guard.get_mut(&key).unwrap()
241             }
242         };
243 
244         // 使用UserBuffer读取futex
245         let user_reader =
246             UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?;
247 
248         // 从用户空间读取到futex的val
249         let mut uval = 0;
250 
251         // 读取
252         // 这里只尝试一种方式去读取用户空间,与linux不太一致
253         // 对于linux,如果bucket被锁住时读取失败,将会将bucket解锁后重新读取
254         user_reader.copy_one_from_user::<u32>(&mut uval, 0)?;
255 
256         // 不满足wait条件,返回错误
257         if uval != val {
258             return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
259         }
260 
261         let pcb = ProcessManager::current_pcb();
262         // 创建超时计时器任务
263         let mut timer = None;
264         if let Some(time) = abs_time {
265             let wakeup_helper = WakeUpHelper::new(pcb.clone());
266 
267             let sec = time.tv_sec;
268             let nsec = time.tv_nsec;
269             let jiffies = next_n_us_timer_jiffies((nsec / 1000 + sec * 1_000_000) as u64);
270 
271             let wake_up = Timer::new(wakeup_helper, jiffies);
272 
273             wake_up.activate();
274             timer = Some(wake_up);
275         }
276 
277         let futex_q = Arc::new(FutexObj {
278             pcb: Arc::downgrade(&pcb),
279             key: key.clone(),
280             bitset,
281         });
282         let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
283         // 满足条件则将当前进程在该bucket上挂起
284         bucket_mut.sleep_no_sched(futex_q.clone()).map_err(|e| {
285             kwarn!("error:{e:?}");
286             e
287         })?;
288         drop(futex_map_guard);
289         drop(irq_guard);
290         sched();
291 
292         // 被唤醒后的检查
293         let mut futex_map_guard = FutexData::futex_map();
294         let bucket = futex_map_guard.get_mut(&key);
295         let bucket_mut = match bucket {
296             // 如果该pcb不在链表里面了或者该链表已经被释放,就证明是正常的Wake操作
297             Some(bucket_mut) => {
298                 if !bucket_mut.contains(&futex_q) {
299                     // 取消定时器任务
300                     if let Some(timer) = timer {
301                         timer.cancel();
302                     }
303                     return Ok(0);
304                 }
305                 // 非正常唤醒,返回交给下层
306                 bucket_mut
307             }
308             None => {
309                 // 取消定时器任务
310                 if let Some(timer) = timer {
311                     timer.cancel();
312                 }
313                 return Ok(0);
314             }
315         };
316 
317         // 如果是超时唤醒,则返回错误
318         if timer.is_some() && timer.clone().unwrap().timeout() {
319             bucket_mut.remove(futex_q);
320 
321             return Err(SystemError::ETIMEDOUT);
322         }
323 
324         // TODO: 如果没有挂起的信号,则重新判断是否满足wait要求,重新进入wait
325 
326         // 经过前面的几个判断,到这里之后,
327         // 当前进程被唤醒大概率是其他进程更改了uval,需要重新去判断当前进程是否满足wait
328 
329         // 到这里之后,前面的唤醒条件都不满足,则是被信号唤醒
330         // 需要处理信号然后重启futex系统调用
331 
332         // 取消定时器任务
333         if let Some(timer) = timer {
334             if !timer.timeout() {
335                 timer.cancel();
336             }
337         }
338 
339         Ok(0)
340     }
341 
342     // ### 唤醒指定futex上挂起的最多nr_wake个进程
343     pub fn futex_wake(
344         uaddr: VirtAddr,
345         flags: FutexFlag,
346         nr_wake: u32,
347         bitset: u32,
348     ) -> Result<usize, SystemError> {
349         if bitset == 0 {
350             return Err(SystemError::EINVAL);
351         }
352 
353         // 获取futex_key,并且判断地址空间合法性
354         let key = Self::get_futex_key(
355             uaddr,
356             flags.contains(FutexFlag::FLAGS_SHARED),
357             FutexAccess::FutexRead,
358         )?;
359         let mut binding = FutexData::futex_map();
360         let bucket_mut = binding.get_mut(&key).ok_or(SystemError::EINVAL)?;
361 
362         // 确保后面的唤醒操作是有意义的
363         if bucket_mut.chain.is_empty() {
364             return Ok(0);
365         }
366         // 从队列中唤醒
367         let count = bucket_mut.wake_up(key.clone(), Some(bitset), nr_wake)?;
368 
369         drop(binding);
370 
371         FutexData::try_remove(&key);
372 
373         Ok(count)
374     }
375 
376     /// ### 唤醒制定uaddr1上的最多nr_wake个进程,然后将uaddr1最多nr_requeue个进程移动到uaddr2绑定的futex上
377     pub fn futex_requeue(
378         uaddr1: VirtAddr,
379         flags: FutexFlag,
380         uaddr2: VirtAddr,
381         nr_wake: i32,
382         nr_requeue: i32,
383         cmpval: Option<u32>,
384         requeue_pi: bool,
385     ) -> Result<usize, SystemError> {
386         if nr_requeue < 0 || nr_wake < 0 {
387             return Err(SystemError::EINVAL);
388         }
389 
390         // 暂时不支持优先级继承
391         if requeue_pi {
392             return Err(SystemError::ENOSYS);
393         }
394 
395         let key1 = Self::get_futex_key(
396             uaddr1,
397             flags.contains(FutexFlag::FLAGS_SHARED),
398             FutexAccess::FutexRead,
399         )?;
400         let key2 = Self::get_futex_key(uaddr2, flags.contains(FutexFlag::FLAGS_SHARED), {
401             match requeue_pi {
402                 true => FutexAccess::FutexWrite,
403                 false => FutexAccess::FutexRead,
404             }
405         })?;
406 
407         if requeue_pi && key1 == key2 {
408             return Err(SystemError::EINVAL);
409         }
410 
411         if likely(cmpval.is_some()) {
412             let uval_reader =
413                 UserBufferReader::new(uaddr1.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?;
414             let curval = uval_reader.read_one_from_user::<u32>(0)?;
415 
416             // 判断是否满足条件
417             if *curval != cmpval.unwrap() {
418                 return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
419             }
420         }
421 
422         let mut futex_data_guard = FutexData::futex_map();
423         if !requeue_pi {
424             // 唤醒nr_wake个进程
425             let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?;
426             let ret = bucket_1_mut.wake_up(key1.clone(), None, nr_wake as u32)?;
427             // 将bucket1中最多nr_requeue个任务转移到bucket2
428             for _ in 0..nr_requeue {
429                 let bucket_1_mut = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?;
430                 let futex_q = bucket_1_mut.chain.pop_front();
431                 match futex_q {
432                     Some(futex_q) => {
433                         let bucket_2_mut =
434                             futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?;
435                         bucket_2_mut.chain.push_back(futex_q);
436                     }
437                     None => {
438                         break;
439                     }
440                 }
441             }
442 
443             return Ok(ret);
444         } else {
445             // 暂时不支持优先级继承
446             todo!()
447         }
448     }
449 
450     /// ### 唤醒futex上的进程的同时进行一些操作
451     pub fn futex_wake_op(
452         uaddr1: VirtAddr,
453         flags: FutexFlag,
454         uaddr2: VirtAddr,
455         nr_wake: i32,
456         nr_wake2: i32,
457         op: i32,
458     ) -> Result<usize, SystemError> {
459         let key1 = Futex::get_futex_key(
460             uaddr1,
461             flags.contains(FutexFlag::FLAGS_SHARED),
462             FutexAccess::FutexRead,
463         )?;
464         let key2 = Futex::get_futex_key(
465             uaddr2,
466             flags.contains(FutexFlag::FLAGS_SHARED),
467             FutexAccess::FutexWrite,
468         )?;
469 
470         let mut futex_data_guard = FutexData::futex_map();
471         let bucket1 = futex_data_guard.get_mut(&key1).ok_or(SystemError::EINVAL)?;
472         let mut wake_count = 0;
473 
474         // 唤醒uaddr1中的进程
475         wake_count += bucket1.wake_up(key1, None, nr_wake as u32)?;
476 
477         match Self::futex_atomic_op_inuser(op as u32, uaddr2) {
478             Ok(ret) => {
479                 // 操作成功则唤醒uaddr2中的进程
480                 if ret {
481                     let bucket2 = futex_data_guard.get_mut(&key2).ok_or(SystemError::EINVAL)?;
482                     wake_count += bucket2.wake_up(key2, None, nr_wake2 as u32)?;
483                 }
484             }
485             Err(e) => {
486                 // TODO:retry?
487                 return Err(e);
488             }
489         }
490 
491         Ok(wake_count)
492     }
493 
494     fn get_futex_key(
495         uaddr: VirtAddr,
496         fshared: bool,
497         _access: FutexAccess,
498     ) -> Result<FutexKey, SystemError> {
499         let mut address = uaddr.data();
500 
501         // 计算相对页的偏移量
502         let offset = address & (MMArch::PAGE_SIZE - 1);
503         // 判断内存对齐
504         if uaddr.data() & (core::mem::size_of::<u32>() - 1) != 0 {
505             return Err(SystemError::EINVAL);
506         }
507 
508         // 目前address指向所在页面的起始地址
509         address -= offset;
510 
511         // 若不是进程间共享的futex,则返回Private
512         if !fshared {
513             return Ok(FutexKey {
514                 ptr: 0,
515                 word: 0,
516                 offset: offset as u32,
517                 key: InnerFutexKey::Private(PrivateKey {
518                     address: address as u64,
519                     address_space: None,
520                 }),
521             });
522         }
523 
524         // 获取到地址所在地址空间
525         let address_space = AddressSpace::current()?;
526         // TODO: 判断是否为匿名映射,是匿名映射才返回PrivateKey
527         return Ok(FutexKey {
528             ptr: 0,
529             word: 0,
530             offset: offset as u32,
531             key: InnerFutexKey::Private(PrivateKey {
532                 address: address as u64,
533                 address_space: Some(Arc::downgrade(&address_space)),
534             }),
535         });
536 
537         // 未实现共享内存机制,贡献内存部分应该通过inode构建SharedKey
538         // todo!("Shared memory not implemented");
539     }
540 
541     pub fn futex_atomic_op_inuser(encoded_op: u32, uaddr: VirtAddr) -> Result<bool, SystemError> {
542         let op = FutexOP::from_bits((encoded_op & 0x70000000) >> 28).ok_or(SystemError::ENOSYS)?;
543         let cmp =
544             FutexOpCMP::from_bits((encoded_op & 0x0f000000) >> 24).ok_or(SystemError::ENOSYS)?;
545 
546         let sign_extend32 = |value: u32, index: i32| {
547             let shift = (31 - index) as u8;
548             return (value << shift) >> shift;
549         };
550 
551         let mut oparg = sign_extend32((encoded_op & 0x00fff000) >> 12, 11);
552         let cmparg = sign_extend32(encoded_op & 0x00000fff, 11);
553 
554         if (encoded_op & (FutexOP::FUTEX_OP_OPARG_SHIFT.bits() << 28) != 0) && oparg > 31 {
555             kwarn!(
556                 "futex_wake_op: pid:{} tries to shift op by {}; fix this program",
557                 ProcessManager::current_pcb().pid().data(),
558                 oparg
559             );
560 
561             oparg &= 31;
562         }
563 
564         // TODO: 这个汇编似乎是有问题的,目前不好测试
565         let old_val = Self::arch_futex_atomic_op_inuser(op, oparg, uaddr)?;
566 
567         match cmp {
568             FutexOpCMP::FUTEX_OP_CMP_EQ => {
569                 return Ok(cmparg == old_val);
570             }
571             FutexOpCMP::FUTEX_OP_CMP_NE => {
572                 return Ok(cmparg != old_val);
573             }
574             FutexOpCMP::FUTEX_OP_CMP_LT => {
575                 return Ok(cmparg < old_val);
576             }
577             FutexOpCMP::FUTEX_OP_CMP_LE => {
578                 return Ok(cmparg <= old_val);
579             }
580             FutexOpCMP::FUTEX_OP_CMP_GE => {
581                 return Ok(cmparg >= old_val);
582             }
583             FutexOpCMP::FUTEX_OP_CMP_GT => {
584                 return Ok(cmparg > old_val);
585             }
586             _ => {
587                 return Err(SystemError::ENOSYS);
588             }
589         }
590     }
591 
592     /// ### 对futex进行操作
593     ///
594     /// 进入该方法会关闭中断保证修改的原子性,所以进入该方法前应确保中断锁已释放
595     ///
596     /// ### return uaddr原来的值
597     #[allow(unused_assignments)]
598     pub fn arch_futex_atomic_op_inuser(
599         op: FutexOP,
600         oparg: u32,
601         uaddr: VirtAddr,
602     ) -> Result<u32, SystemError> {
603         let guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
604 
605         let reader =
606             UserBufferReader::new(uaddr.as_ptr::<u32>(), core::mem::size_of::<u32>(), true)?;
607 
608         let oldval = reader.read_one_from_user::<u32>(0)?;
609 
610         let atomic_addr = AtomicU64::new(uaddr.data() as u64);
611         // 这个指针是指向指针的指针
612         let ptr = atomic_addr.as_ptr();
613         match op {
614             FutexOP::FUTEX_OP_SET => unsafe {
615                 *((*ptr) as *mut u32) = oparg;
616             },
617             FutexOP::FUTEX_OP_ADD => unsafe {
618                 *((*ptr) as *mut u32) += oparg;
619             },
620             FutexOP::FUTEX_OP_OR => unsafe {
621                 *((*ptr) as *mut u32) |= oparg;
622             },
623             FutexOP::FUTEX_OP_ANDN => unsafe {
624                 *((*ptr) as *mut u32) &= oparg;
625             },
626             FutexOP::FUTEX_OP_XOR => unsafe {
627                 *((*ptr) as *mut u32) ^= oparg;
628             },
629             _ => return Err(SystemError::ENOSYS),
630         }
631 
632         drop(guard);
633 
634         Ok(*oldval)
635     }
636 }
637