xref: /DragonReach/src/executor/dep_graph/mod.rs (revision 909e4d102ffeb8646e7346c10b007cb0861226a4)
1 use std::sync::Arc;
2 use std::sync::Mutex;
3 use std::vec::Vec;
4 
5 use crate::manager::UnitManager;
6 use crate::{
7     error::runtime_error::{RuntimeError, RuntimeErrorType},
8     unit::Unit,
9 };
10 
/// A single node of a [`DepGraph`].
///
/// Nodes refer to each other by their index in the graph's node list, not by
/// the value they carry.
#[allow(dead_code)]
#[derive(Debug)]
pub struct DepGraphNode {
    /// Value carried by the node (`DepGraph::construct_graph` stores unit ids here).
    value: usize,
    /// Indices of the nodes this node has an outgoing edge to.
    edges: Vec<usize>,
    /// Indices of the nodes that have an edge pointing at this node.
    incoming_edges: Vec<usize>,
}

/// Directed graph over `usize` values, stored as an index-addressed node list.
#[allow(dead_code)]
#[derive(Debug)]
pub struct DepGraph {
    /// All nodes; a node's position in this vector is its index.
    nodes: Vec<DepGraphNode>,
    /// `value[i]` mirrors `nodes[i].value`, so a node index can be looked up by value.
    value: Vec<usize>,
}
23 
24 // 提供拓扑排序方法,在启动服务时确定先后顺序
25 #[allow(dead_code)]
26 impl DepGraph {
27     fn new() -> Self {
28         return DepGraph {
29             nodes: Vec::new(),
30             value: Vec::new(),
31         };
32     }
33 
34     pub fn add_node(&mut self, value: usize) -> usize {
35         let index = self.nodes.len();
36         //如果nodes中已经有了这个value则无需重复添加,直接返回nodes中的value对应的index
37         if let Some(idx) = self.value.iter().position(|x| *x == value) {
38             return idx;
39         }
40         //如果value在nodes中不存在,则添加value
41         self.nodes.push(DepGraphNode {
42             value: value,
43             edges: Vec::new(),
44             incoming_edges: Vec::new(),
45         });
46         self.value.push(value);
47         return index;
48     }
49     pub fn add_edge(&mut self, from: usize, to: usize) {
50         self.nodes[from].edges.push(to);
51         self.nodes[to].incoming_edges.push(from);
52     }
53     pub fn topological_sort(&mut self) -> Result<Vec<usize>, RuntimeError> {
54         let mut result = Vec::new();
55         let mut visited = Vec::new();
56         let mut stack = Vec::new();
57         for (i, node) in self.nodes.iter().enumerate() {
58             if node.incoming_edges.len() == 0 {
59                 stack.push(i);
60             }
61         }
62         while stack.len() > 0 {
63             let index = stack.pop().unwrap();
64             if visited.contains(&index) {
65                 continue;
66             }
67             visited.push(index);
68             result.push(self.nodes[index].value);
69             let len = self.nodes[index].edges.len();
70             for i in 0..len {
71                 let edge = self.nodes[index].edges[i];
72                 self.nodes[edge].incoming_edges.retain(|&x| x != index);
73                 if self.nodes[edge].incoming_edges.len() == 0 {
74                     stack.push(edge);
75                 }
76             }
77         }
78         if result.len() != self.nodes.len() {
79             return Err(RuntimeError::new(RuntimeErrorType::CircularDependency));
80         }
81         result.reverse();
82         return Ok(result);
83     }
84 
85     fn add_edges(&mut self, unit: usize, after: &[usize]) {
86         //因为service的依赖关系规模不会很大,故先使用递归实现
87         //TODO:改递归
88         for target in after {
89             let s = self.add_node(unit);
90             let t = self.add_node(*target);
91             self.add_edge(s, t);
92 
93             let arc_unit = UnitManager::get_unit_with_id(target).unwrap();
94             let unit = arc_unit.lock().unwrap();
95             let after = unit.unit_base().unit_part().after();
96 
97             self.add_edges(*target, after);
98         }
99     }
100 
101     pub fn construct_graph(unit: &Arc<Mutex<dyn Unit>>) -> DepGraph {
102         let mut graph: DepGraph = DepGraph::new();
103 
104         let unit = unit.lock().unwrap();
105         let uid = unit.unit_id();
106         graph.add_node(uid);
107         let after = (&unit).unit_base().unit_part().after();
108         //递归添加边来构建图
109         graph.add_edges(uid, after);
110         return graph;
111     }
112 }
113