1 use std::{
2     fs::File,
3     mem::size_of,
4     os::unix::prelude::FileExt,
5     path::PathBuf,
6     sync::{atomic::AtomicBool, mpsc, Arc, Mutex, Weak},
7     thread::JoinHandle,
8 };
9 
10 use klog_types::{AllocatorLog, MMLogChannel};
11 use log::info;
12 
13 use crate::backend::{
14     loader::Symbol,
15     monitor::{logset::LogSet, ObjectWrapper},
16     BackendData,
17 };
18 
19 #[derive(Debug)]
20 pub struct MMLogMonitor {
21     channel_symbol: Option<Symbol>,
22     shared_data: Arc<Mutex<BackendData>>,
23     /// All threads spawned by the mm log monitor.
24     threads: Mutex<Vec<JoinHandle<()>>>,
25     stop_child_threads: AtomicBool,
26     self_ref: Weak<Self>,
27 
28     mm_log_receiver: Mutex<mpsc::Receiver<MMLogWorkerResult>>,
29     mm_log_sender: mpsc::Sender<MMLogWorkerResult>,
30 }
31 
32 impl MMLogMonitor {
new(shared_data: Arc<Mutex<BackendData>>) -> Arc<Self>33     pub(crate) fn new(shared_data: Arc<Mutex<BackendData>>) -> Arc<Self> {
34         let guard = shared_data.lock().unwrap();
35         let mm_log_buffer_symbol: Option<Symbol> = guard
36             .kernel_metadata
37             .as_ref()
38             .map(|km| {
39                 km.sym_collection()
40                     .find_by_name("__MM_ALLOCATOR_LOG_CHANNEL")
41                     .map(|s| s.clone())
42             })
43             .flatten();
44         drop(guard);
45 
46         info!("mm_log_buffer_symbol: {:?}", mm_log_buffer_symbol);
47 
48         let mm_log_worker_mpsc: (
49             mpsc::Sender<MMLogWorkerResult>,
50             mpsc::Receiver<MMLogWorkerResult>,
51         ) = mpsc::channel::<MMLogWorkerResult>();
52 
53         let r = Self {
54             channel_symbol: mm_log_buffer_symbol,
55             shared_data,
56             threads: Mutex::new(Vec::new()),
57             stop_child_threads: AtomicBool::new(false),
58             self_ref: Weak::new(),
59             mm_log_receiver: Mutex::new(mm_log_worker_mpsc.1),
60             mm_log_sender: mm_log_worker_mpsc.0,
61         };
62 
63         let r = Arc::new(r);
64         unsafe {
65             let self_ref = Arc::downgrade(&r);
66             let r_ptr = r.as_ref() as *const Self as *mut Self;
67             (*r_ptr).self_ref = self_ref;
68         }
69 
70         return r;
71     }
72 
run(&self)73     pub fn run(&self) {
74         info!("MMLogMonitor::run()");
75 
76         self.create_threads();
77 
78         let mut logs_set =
79             LogSet::<usize, ObjectWrapper<AllocatorLog>>::new("mm_allocator_log".to_string(), None);
80 
81         self.handle_logs(&mut logs_set);
82         // std::thread::sleep(std::time::Duration::from_micros(50));
83     }
84 
handle_logs(&self, logs_set: &mut LogSet<usize, ObjectWrapper<AllocatorLog>>)85     fn handle_logs(&self, logs_set: &mut LogSet<usize, ObjectWrapper<AllocatorLog>>) {
86         let mut last_cnt = 0;
87         let mut last_time = std::time::Instant::now();
88         let mm_log_receiver = self.mm_log_receiver.lock().unwrap();
89         loop {
90             let logs = mm_log_receiver.recv();
91             if logs.is_err() {
92                 return;
93             }
94 
95             let logs = logs.unwrap();
96 
97             for log in logs.logs {
98                 logs_set.insert(log.id as usize, log);
99             }
100 
101             let x = logs_set.len();
102             // info!("logs_set.len(): {}", x);
103             let current_time = std::time::Instant::now();
104             if current_time.duration_since(last_time).as_secs() >= 1 {
105                 info!("memory log rate: {} logs/s", x - last_cnt);
106                 last_cnt = x;
107                 last_time = current_time;
108             }
109         }
110     }
111 
112     // fn show_speed(&self, )
113 
create_threads(&self)114     fn create_threads(&self) {
115         let km = self
116             .shared_data
117             .lock()
118             .unwrap()
119             .kmem_path
120             .clone()
121             .expect("DragonOS memory map file not specified.");
122         let monitor_weak = self.self_ref.clone();
123 
124         let handle = std::thread::spawn(move || {
125             let mut monitor_thread = MMMonitorThread::new(monitor_weak, PathBuf::from(km));
126             monitor_thread.run();
127         });
128 
129         self.threads.lock().unwrap().push(handle);
130     }
131 }
132 
133 #[derive(Debug)]
134 struct MMMonitorThread {
135     mm_log_monitor: Weak<MMLogMonitor>,
136     kmem_path: PathBuf,
137 }
138 
139 impl MMMonitorThread {
140     /// Constructs a new instance of [`MMMonitorThread`].
141     ///
142     /// ## Parameters
143     ///
144     /// - `mm_log_monitor`: The [`MMLogMonitor`] instance.
145     /// - `kmem_path`: The path to the kernel memory file.
new(mm_log_monitor: Weak<MMLogMonitor>, kmem_path: PathBuf) -> Self146     pub fn new(mm_log_monitor: Weak<MMLogMonitor>, kmem_path: PathBuf) -> Self {
147         Self {
148             mm_log_monitor,
149             kmem_path,
150         }
151     }
152 
run(&mut self)153     pub fn run(&mut self) {
154         info!("MMMonitorThread::run(): kmem_path: {:?}", self.kmem_path);
155 
156         let mut kmem_file = {
157             let mut file: File;
158             loop {
159                 let f = self.open_kmem_file();
160                 if f.is_ok() {
161                     file = f.unwrap();
162                     break;
163                 } else {
164                     log::error!("Failed to open kmem file, error: {:?}", f.unwrap_err());
165                     std::thread::sleep(std::time::Duration::from_secs(1));
166                 }
167             }
168             file
169         };
170 
171         info!("Channel header loaded!");
172 
173         let channel_header: ObjectWrapper<MMLogChannel<1>> = self.load_header(&mut kmem_file);
174 
175         // 循环读取
176 
177         self.process_logs(&mut kmem_file, &channel_header);
178     }
179 
180     /// 处理内核内存分配日志
process_logs(&self, kmem_file: &mut File, channel_header: &ObjectWrapper<MMLogChannel<1>>)181     fn process_logs(&self, kmem_file: &mut File, channel_header: &ObjectWrapper<MMLogChannel<1>>) {
182         let cap = channel_header.capacity;
183         let mut buf = vec![0u8; (cap * channel_header.slot_size as u64) as usize];
184         let symbol = self
185             .mm_log_channel_symbol()
186             .expect("Failed to get memory log channel symbol.");
187 
188         let sym_offset = symbol.memory_offset();
189 
190         let slots_offset = channel_header.slots_offset + sym_offset;
191         let sender = self.mm_log_monitor.upgrade().unwrap().mm_log_sender.clone();
192         loop {
193             if self.should_stop() {
194                 break;
195             }
196 
197             let r = kmem_file
198                 .read_at(&mut buf, slots_offset)
199                 .expect("Failed to read kmem file.");
200             assert!(r == buf.len());
201 
202             let mut logs = Vec::new();
203 
204             for chunck in buf.chunks(channel_header.slot_size as usize) {
205                 let log_item = {
206                     let log: Option<ObjectWrapper<AllocatorLog>> =
207                         ObjectWrapper::new(&chunck[0..channel_header.element_size as usize]);
208                     let log: ObjectWrapper<AllocatorLog> = log.unwrap();
209 
210                     if log.is_valid() {
211                         Some(log)
212                     } else {
213                         None
214                     }
215                 };
216                 if let Some(log_item) = log_item {
217                     logs.push(log_item);
218                 }
219             }
220             // 收集所有校验和正确的日志
221             // info!("valid_cnt: {}, invalid_cnt: {}", valid_cnt, invalid_cnt);
222             // info!("to send {} logs", logs.len());
223             if !logs.is_empty() {
224                 sender.send(MMLogWorkerResult::new(logs)).unwrap();
225             }
226         }
227     }
228 
open_kmem_file(&self) -> std::io::Result<std::fs::File>229     fn open_kmem_file(&self) -> std::io::Result<std::fs::File> {
230         std::fs::OpenOptions::new().read(true).open(&self.kmem_path)
231     }
232 
load_header(&self, kmem_file: &mut File) -> ObjectWrapper<MMLogChannel<1>>233     fn load_header(&self, kmem_file: &mut File) -> ObjectWrapper<MMLogChannel<1>> {
234         let mut buf = [0u8; size_of::<MMLogChannel<1>>()];
235         let symbol = self
236             .mm_log_channel_symbol()
237             .expect("Failed to get memory log channel symbol.");
238 
239         let sym_offset = symbol.memory_offset();
240 
241         let channel_header: Option<ObjectWrapper<MMLogChannel<1>>>;
242 
243         loop {
244             let _r = kmem_file.read_at(&mut buf, sym_offset);
245 
246             let header: ObjectWrapper<MMLogChannel<1>> =
247                 ObjectWrapper::new(&buf).expect("Failed to parse MMLogChannel header.");
248             if header.magic == MMLogChannel::<1>::MM_LOG_CHANNEL_MAGIC {
249                 info!("channel_header: {:?}", header);
250                 channel_header = Some(header);
251                 break;
252             } else {
253                 info!("MM Log Channel not found... Maybe the kernel not started? Or the kernel version is not supported?");
254             }
255 
256             std::thread::sleep(std::time::Duration::from_secs(1));
257         }
258 
259         return channel_header.unwrap();
260     }
261 
262     /// Get the symbol of the memory log channel.
mm_log_channel_symbol(&self) -> Option<Symbol>263     fn mm_log_channel_symbol(&self) -> Option<Symbol> {
264         self.mm_log_monitor
265             .upgrade()
266             .unwrap()
267             .channel_symbol
268             .clone()
269     }
270 
271     /// Check if the monitor worker thread should stop.
should_stop(&self) -> bool272     fn should_stop(&self) -> bool {
273         self.mm_log_monitor
274             .upgrade()
275             .map(|mm_log_monitor| {
276                 mm_log_monitor
277                     .stop_child_threads
278                     .load(std::sync::atomic::Ordering::Relaxed)
279             })
280             .unwrap_or(true)
281     }
282 }
283 
284 /// 内存日志监视器工作线程处理的结果
285 #[derive(Debug)]
286 struct MMLogWorkerResult {
287     logs: Vec<ObjectWrapper<AllocatorLog>>,
288 }
289 
290 impl MMLogWorkerResult {
291     /// 创建一个新的内存日志监视器工作线程处理的结果
292     ///
293     /// ## 参数
294     ///
295     /// - `logs`:处理的日志
new(logs: Vec<ObjectWrapper<AllocatorLog>>) -> Self296     pub fn new(logs: Vec<ObjectWrapper<AllocatorLog>>) -> Self {
297         Self { logs }
298     }
299 }
300