xref: /DragonReach/src/executor/dep_graph/mod.rs (revision 2069cc0dc0984a2981454b00316ba607f88ac512)
1 use std::sync::{Arc, Mutex};
2 
3 use crate::manager::UnitManager;
4 use crate::{
5     error::runtime_error::{RuntimeError, RuntimeErrorType},
6     unit::Unit,
7 };
8 
9 #[allow(dead_code)]
10 pub struct DepGraphNode {
11     value: usize,
12     edges: Vec<usize>,
13     incoming_edges: Vec<usize>,
14 }
15 
16 #[allow(dead_code)]
17 pub struct DepGraph {
18     nodes: Vec<DepGraphNode>,
19     value: Vec<usize>,
20 }
21 
22 // 提供拓扑排序方法,在启动服务时确定先后顺序
23 #[allow(dead_code)]
24 impl DepGraph {
25     fn new() -> Self {
26         return DepGraph {
27             nodes: Vec::new(),
28             value: Vec::new(),
29         };
30     }
31 
32     pub fn add_node(&mut self, value: usize) -> usize {
33         let index = self.nodes.len();
34         //如果nodes中已经有了这个value则无需重复添加,直接返回nodes中的value对应的index
35         if let Some(idx) = self.value.iter().position(|x| *x == value) {
36             return idx;
37         }
38         //如果value在nodes中不存在,则添加value
39         self.nodes.push(DepGraphNode {
40             value: value,
41             edges: Vec::new(),
42             incoming_edges: Vec::new(),
43         });
44         self.value.push(value);
45         return index;
46     }
47     pub fn add_edge(&mut self, from: usize, to: usize) {
48         self.nodes[from].edges.push(to);
49         self.nodes[to].incoming_edges.push(from);
50     }
51     pub fn topological_sort(&mut self) -> Result<Vec<usize>, RuntimeError> {
52         let mut result = Vec::new();
53         let mut visited = Vec::new();
54         let mut stack = Vec::new();
55         for (i, node) in self.nodes.iter().enumerate() {
56             if node.incoming_edges.len() == 0 {
57                 stack.push(i);
58             }
59         }
60         while stack.len() > 0 {
61             let index = stack.pop().unwrap();
62             if visited.contains(&index) {
63                 continue;
64             }
65             visited.push(index);
66             result.push(self.nodes[index].value);
67             let len = self.nodes[index].edges.len();
68             for i in 0..len {
69                 let edge = self.nodes[index].edges[i];
70                 self.nodes[edge].incoming_edges.retain(|&x| x != index);
71                 if self.nodes[edge].incoming_edges.len() == 0 {
72                     stack.push(edge);
73                 }
74             }
75         }
76         if result.len() != self.nodes.len() {
77             return Err(RuntimeError::new(RuntimeErrorType::CircularDependency));
78         }
79         result.reverse();
80         return Ok(result);
81     }
82 
83     fn add_edges(&mut self, unit: usize, after: &[usize]) {
84         //因为service的依赖关系规模不会很大,故先使用递归实现
85         //TODO:改递归
86         for target in after {
87             let s = self.add_node(unit);
88             let t = self.add_node(*target);
89             self.add_edge(s, t);
90 
91             let arc_unit = UnitManager::get_unit_with_id(target).unwrap();
92             let unit = arc_unit.lock().unwrap();
93             let after = unit.unit_base().unit_part().after();
94 
95             self.add_edges(*target, after);
96         }
97     }
98 
99     pub fn construct_graph(unit: &Arc<Mutex<dyn Unit>>) -> DepGraph {
100         let mut graph: DepGraph = DepGraph::new();
101 
102         let unit = unit.lock().unwrap();
103         let uid = unit.unit_id();
104         graph.add_node(uid);
105         let after = (&unit).unit_base().unit_part().after();
106         //递归添加边来构建图
107         graph.add_edges(uid, after);
108         return graph;
109     }
110 }
111