xref: /DragonReach/src/executor/dep_graph/mod.rs (revision 17ae4661898f9f5bdb95f865ac453f4508b9e876)
1 #[cfg(target_os = "dragonos")]
2 use drstd as std;
3 
4 use core::slice::SlicePattern;
5 use std::sync::Arc;
6 use std::vec::Vec;
7 use std::{cmp::PartialEq, sync::Mutex};
8 
9 use crate::manager::{self, UnitManager};
10 use crate::{
11     error::runtime_error::{RuntimeError, RuntimeErrorType},
12     unit::Unit,
13 };
14 
15 pub struct DepGraphNode {
16     value: usize,
17     edges: Vec<usize>,
18     incoming_edges: Vec<usize>,
19 }
20 
21 pub struct DepGraph {
22     nodes: Vec<DepGraphNode>,
23     value: Vec<usize>,
24 }
25 
26 // 提供拓扑排序方法,在启动服务时确定先后顺序
27 impl DepGraph {
28     fn new() -> Self {
29         return DepGraph {
30             nodes: Vec::new(),
31             value: Vec::new(),
32         };
33     }
34 
35     pub fn add_node(&mut self, value: usize) -> usize {
36         let index = self.nodes.len();
37         //如果nodes中已经有了这个value则无需重复添加,直接返回nodes中的value对应的index
38         if let Some(idx) = self.value.iter().position(|x| *x == value) {
39             return idx;
40         }
41         //如果value在nodes中不存在,则添加value
42         self.nodes.push(DepGraphNode {
43             value: value,
44             edges: Vec::new(),
45             incoming_edges: Vec::new(),
46         });
47         self.value.push(value);
48         return index;
49     }
50     pub fn add_edge(&mut self, from: usize, to: usize) {
51         self.nodes[from].edges.push(to);
52         self.nodes[to].incoming_edges.push(from);
53     }
54     pub fn topological_sort(&mut self) -> Result<Vec<usize>, RuntimeError> {
55         let mut result = Vec::new();
56         let mut visited = Vec::new();
57         let mut stack = Vec::new();
58         for (i, node) in self.nodes.iter().enumerate() {
59             if node.incoming_edges.len() == 0 {
60                 stack.push(i);
61             }
62         }
63         while stack.len() > 0 {
64             let index = stack.pop().unwrap();
65             if visited.contains(&index) {
66                 continue;
67             }
68             visited.push(index);
69             result.push(self.nodes[index].value);
70             let len = self.nodes[index].edges.len();
71             for i in 0..len {
72                 let edge = self.nodes[index].edges[i];
73                 self.nodes[edge].incoming_edges.retain(|&x| x != index);
74                 if self.nodes[edge].incoming_edges.len() == 0 {
75                     stack.push(edge);
76                 }
77             }
78         }
79         if result.len() != self.nodes.len() {
80             return Err(RuntimeError::new(RuntimeErrorType::CircularDependency));
81         }
82         result.reverse();
83         return Ok(result);
84     }
85 
86     fn add_edges(&mut self, unit: usize, after: &[usize]) {
87         //因为service的依赖关系规模不会很大,故先使用递归实现
88         //TODO:改递归
89         for target in after {
90             let s = self.add_node(unit);
91             let t = self.add_node(*target);
92             self.add_edge(s, t);
93 
94             let arc_unit = UnitManager::get_unit_with_id(target).unwrap();
95             let unit = arc_unit.lock().unwrap();
96             let after = unit.unit_base().unit_part().after();
97 
98             self.add_edges(*target, after);
99         }
100     }
101 
102     pub fn construct_graph(unit: &Arc<Mutex<dyn Unit>>) -> DepGraph {
103         let mut graph: DepGraph = DepGraph::new();
104 
105         let unit = unit.lock().unwrap();
106         let uid = unit.unit_id();
107         graph.add_node(uid);
108         let after = (&unit).unit_base().unit_part().after();
109         //递归添加边来构建图
110         graph.add_edges(uid, after);
111         return graph;
112     }
113 }
114