1 use std::{ 2 collections::BTreeMap, 3 env::Vars, 4 path::PathBuf, 5 process::{Command, Stdio}, 6 sync::{Arc, RwLock}, 7 }; 8 9 use log::{debug, error, info, warn}; 10 11 use crate::{ 12 context::{Action, DadkUserExecuteContext}, 13 executor::cache::CacheDir, 14 parser::{ 15 task::{CodeSource, PrebuiltSource, TaskType}, 16 task_log::{BuildStatus, InstallStatus, TaskLog}, 17 }, 18 scheduler::{SchedEntities, SchedEntity}, 19 utils::{file::FileUtils, path::abs_path}, 20 }; 21 22 use dadk_config::{common::task::TaskEnv, user::UserCleanLevel}; 23 24 use self::cache::{CacheDirType, TaskDataDir}; 25 26 pub mod cache; 27 pub mod source; 28 #[cfg(test)] 29 mod tests; 30 31 lazy_static! { 32 // 全局环境变量的列表 33 pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new()); 34 } 35 36 #[derive(Debug, Clone)] 37 pub struct Executor { 38 entity: Arc<SchedEntity>, 39 action: Action, 40 local_envs: EnvMap, 41 /// 任务构建结果输出到的目录 42 build_dir: CacheDir, 43 /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径) 44 source_dir: Option<CacheDir>, 45 /// 任务数据目录 46 task_data_dir: TaskDataDir, 47 /// DragonOS sysroot的路径 48 dragonos_sysroot: PathBuf, 49 } 50 51 impl Executor { 52 /// # 创建执行器 53 /// 54 /// 用于执行一个任务 55 /// 56 /// ## 参数 57 /// 58 /// * `entity` - 任务调度实体 59 /// 60 /// ## 返回值 61 /// 62 /// * `Ok(Executor)` - 创建成功 63 /// * `Err(ExecutorError)` - 创建失败 64 pub fn new( 65 entity: Arc<SchedEntity>, 66 action: Action, 67 dragonos_sysroot: PathBuf, 68 ) -> Result<Self, ExecutorError> { 69 let local_envs = EnvMap::new(); 70 let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?; 71 let task_data_dir = TaskDataDir::new(entity.clone())?; 72 73 let source_dir = if CacheDir::need_source_cache(&entity) { 74 Some(CacheDir::new(entity.clone(), CacheDirType::Source)?) 75 } else { 76 None 77 }; 78 79 let result: Executor = Self { 80 action, 81 entity, 82 local_envs, 83 build_dir, 84 source_dir, 85 task_data_dir, 86 dragonos_sysroot, 87 }; 88 89 return Ok(result); 90 } 91 92 /// # 执行任务 93 /// 94 /// 创建执行器后,调用此方法执行任务。 95 /// 该方法会执行以下步骤: 96 /// 97 /// 1. 创建工作线程 98 /// 2. 准备环境变量 99 /// 3. 拉取数据(可选) 100 /// 4. 执行构建 101 pub fn execute(&mut self) -> Result<(), ExecutorError> { 102 info!("Execute task: {}", self.entity.task().name_version()); 103 104 let r = self.do_execute(); 105 self.save_task_data(r.clone()); 106 info!("Task {} finished", self.entity.task().name_version()); 107 return r; 108 } 109 110 /// # 保存任务数据 111 fn save_task_data(&self, r: Result<(), ExecutorError>) { 112 let mut task_log = self.task_data_dir.task_log(); 113 match self.action { 114 Action::Build => { 115 if r.is_ok() { 116 task_log.set_build_status(BuildStatus::Success); 117 } else { 118 task_log.set_build_status(BuildStatus::Failed); 119 } 120 121 task_log.set_build_time_now(); 122 } 123 124 Action::Install => { 125 if r.is_ok() { 126 task_log.set_install_status(InstallStatus::Success); 127 } else { 128 task_log.set_install_status(InstallStatus::Failed); 129 } 130 } 131 132 Action::Clean(_) => { 133 task_log.clean_build_status(); 134 task_log.clean_install_status(); 135 } 136 } 137 138 self.task_data_dir 139 .save_task_log(&task_log) 140 .expect("Failed to save task log"); 141 } 142 143 fn do_execute(&mut self) -> Result<(), ExecutorError> { 144 // 准备本地环境变量 145 self.prepare_local_env()?; 146 147 match self.action { 148 Action::Build => { 149 // 构建任务 150 self.build()?; 151 } 152 Action::Install => { 153 // 把构建结果安装到DragonOS 154 self.install()?; 155 } 156 Action::Clean(_) => { 157 // 清理构建结果 158 let r = self.clean(); 159 if let Err(e) = r { 160 error!( 161 "Failed to clean task {}: {:?}", 162 self.entity.task().name_version(), 163 e 164 ); 165 } 166 } 167 } 168 169 return Ok(()); 170 } 171 172 /// # 执行build操作 173 fn build(&mut self) -> Result<(), ExecutorError> { 174 if let Some(status) = self.task_log().build_status() { 175 if *status == BuildStatus::Success && self.entity.task().build_once { 176 info!( 177 "Task {} has been built successfully, skip build.", 178 self.entity.task().name_version() 179 ); 180 return Ok(()); 181 } 182 } 183 184 // 确认源文件就绪 185 self.prepare_input()?; 186 187 let command: Option<Command> = self.create_command()?; 188 if let Some(cmd) = command { 189 self.run_command(cmd)?; 190 } 191 192 // 检查构建结果,如果为空,则抛出警告 193 if self.build_dir.is_empty()? { 194 warn!( 195 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?", 196 self.entity.task().name_version(), 197 ); 198 } 199 return Ok(()); 200 } 201 202 /// # 执行安装操作,把构建结果安装到DragonOS 203 fn install(&self) -> Result<(), ExecutorError> { 204 if let Some(status) = self.task_log().install_status() { 205 if *status == InstallStatus::Success && self.entity.task().install_once { 206 info!( 207 "Task {} has been installed successfully, skip install.", 208 self.entity.task().name_version() 209 ); 210 return Ok(()); 211 } 212 } 213 214 let binding = self.entity.task(); 215 let in_dragonos_path = binding.install.in_dragonos_path.as_ref(); 216 // 如果没有指定安装路径,则不执行安装 217 if in_dragonos_path.is_none() { 218 return Ok(()); 219 } 220 info!("Installing task: {}", self.entity.task().name_version()); 221 let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string(); 222 223 debug!("in_dragonos_path: {}", in_dragonos_path); 224 // 去除开头的斜杠 225 { 226 let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count(); 227 in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string(); 228 } 229 // 拼接最终的安装路径 230 let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path)); 231 debug!("install_path: {:?}", install_path); 232 // 创建安装路径 233 std::fs::create_dir_all(&install_path).map_err(|e| { 234 ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string())) 235 })?; 236 237 // 拷贝构建结果到安装路径 238 let build_dir: PathBuf = self.build_dir.path.clone(); 239 FileUtils::copy_dir_all(&build_dir, &install_path) 240 .map_err(|e| ExecutorError::InstallError(e))?; 241 info!("Task {} installed.", self.entity.task().name_version()); 242 243 return Ok(()); 244 } 245 246 fn clean(&self) -> Result<(), ExecutorError> { 247 let level = if let Action::Clean(l) = self.action { 248 l 249 } else { 250 panic!( 251 "BUG: clean() called with non-clean action. executor details: {:?}", 252 self 253 ); 254 }; 255 info!( 256 "Cleaning task: {}, level={level:?}", 257 self.entity.task().name_version() 258 ); 259 260 let r: Result<(), ExecutorError> = match level { 261 UserCleanLevel::All => self.clean_all(), 262 UserCleanLevel::InSrc => self.clean_src(), 263 UserCleanLevel::Output => { 264 self.clean_target()?; 265 self.clean_cache() 266 } 267 }; 268 269 if let Err(e) = r { 270 error!( 271 "Failed to clean task: {}, error message: {:?}", 272 self.entity.task().name_version(), 273 e 274 ); 275 return Err(e); 276 } 277 278 return Ok(()); 279 } 280 281 fn clean_all(&self) -> Result<(), ExecutorError> { 282 // 在源文件目录执行清理 283 self.clean_src()?; 284 // 清理构建结果 285 self.clean_target()?; 286 // 清理缓存 287 self.clean_cache()?; 288 return Ok(()); 289 } 290 291 /// 在源文件目录执行清理 292 fn clean_src(&self) -> Result<(), ExecutorError> { 293 let cmd: Option<Command> = self.create_command()?; 294 if cmd.is_none() { 295 // 如果这里没有命令,则认为用户不需要在源文件目录执行清理 296 return Ok(()); 297 } 298 info!( 299 "{}: Cleaning in source directory: {:?}", 300 self.entity.task().name_version(), 301 self.src_work_dir() 302 ); 303 304 let cmd = cmd.unwrap(); 305 self.run_command(cmd)?; 306 return Ok(()); 307 } 308 309 /// 清理构建输出目录 310 fn clean_target(&self) -> Result<(), ExecutorError> { 311 info!( 312 "{}: Cleaning build target directory: {:?}", 313 self.entity.task().name_version(), 314 self.build_dir.path 315 ); 316 317 return self.build_dir.remove_self_recursive(); 318 } 319 320 /// 清理下载缓存 321 fn clean_cache(&self) -> Result<(), ExecutorError> { 322 let cache_dir = self.source_dir.as_ref(); 323 if cache_dir.is_none() { 324 // 如果没有缓存目录,则认为用户不需要清理缓存 325 return Ok(()); 326 } 327 info!( 328 "{}: Cleaning cache directory: {}", 329 self.entity.task().name_version(), 330 self.src_work_dir().display() 331 ); 332 return cache_dir.unwrap().remove_self_recursive(); 333 } 334 335 /// 获取源文件的工作目录 336 fn src_work_dir(&self) -> PathBuf { 337 if let Some(local_path) = self.entity.task().source_path() { 338 return local_path; 339 } 340 return self.source_dir.as_ref().unwrap().path.clone(); 341 } 342 343 fn task_log(&self) -> TaskLog { 344 return self.task_data_dir.task_log(); 345 } 346 347 /// 为任务创建命令 348 fn create_command(&self) -> Result<Option<Command>, ExecutorError> { 349 // 获取命令 350 let raw_cmd = match self.entity.task().task_type { 351 TaskType::BuildFromSource(_) => match self.action { 352 Action::Build => self.entity.task().build.build_command.clone(), 353 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 354 _ => unimplemented!( 355 "create_command: Action {:?} not supported yet.", 356 self.action 357 ), 358 }, 359 360 TaskType::InstallFromPrebuilt(_) => match self.action { 361 Action::Build => self.entity.task().build.build_command.clone(), 362 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 363 _ => unimplemented!( 364 "create_command: Action {:?} not supported yet.", 365 self.action 366 ), 367 }, 368 }; 369 370 if raw_cmd.is_none() { 371 return Ok(None); 372 } 373 374 let raw_cmd = raw_cmd.unwrap(); 375 376 let mut command = Command::new("bash"); 377 command.current_dir(self.src_work_dir()); 378 379 // 设置参数 380 command.arg("-c"); 381 command.arg(raw_cmd); 382 383 // 设置环境变量 384 let env_list = ENV_LIST.read().unwrap(); 385 for (key, value) in env_list.envs.iter() { 386 // if key.starts_with("DADK") { 387 // debug!("DADK env found: {}={}", key, value.value); 388 // } 389 command.env(key, value.value.clone()); 390 } 391 drop(env_list); 392 for (key, value) in self.local_envs.envs.iter() { 393 debug!("Local env found: {}={}", key, value.value); 394 command.env(key, value.value.clone()); 395 } 396 397 return Ok(Some(command)); 398 } 399 400 /// # 准备工作线程本地环境变量 401 fn prepare_local_env(&mut self) -> Result<(), ExecutorError> { 402 let binding = self.entity.task(); 403 let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref(); 404 405 if let Some(task_envs) = task_envs { 406 for tv in task_envs.iter() { 407 self.local_envs 408 .add(EnvVar::new(tv.key().to_string(), tv.value().to_string())); 409 } 410 } 411 412 // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里 413 self.local_envs.add(EnvVar::new( 414 "DADK_CURRENT_BUILD_DIR".to_string(), 415 self.build_dir.path.to_str().unwrap().to_string(), 416 )); 417 418 return Ok(()); 419 } 420 421 fn prepare_input(&self) -> Result<(), ExecutorError> { 422 // 拉取源文件 423 let task = self.entity.task(); 424 match &task.task_type { 425 TaskType::BuildFromSource(cs) => { 426 if self.source_dir.is_none() { 427 return Ok(()); 428 } 429 let source_dir = self.source_dir.as_ref().unwrap(); 430 match cs { 431 CodeSource::Git(git) => { 432 git.prepare(source_dir) 433 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 434 } 435 // 本地源文件,不需要拉取 436 CodeSource::Local(_) => return Ok(()), 437 // 在线压缩包,需要下载 438 CodeSource::Archive(archive) => { 439 archive 440 .download_unzip(source_dir) 441 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 442 } 443 } 444 } 445 TaskType::InstallFromPrebuilt(pb) => { 446 match pb { 447 // 本地源文件,不需要拉取 448 PrebuiltSource::Local(local_source) => { 449 let local_path = local_source.path(); 450 let target_path = &self.build_dir.path; 451 FileUtils::copy_dir_all(&local_path, &target_path) 452 .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string(); 453 return Ok(()); 454 } 455 // 在线压缩包,需要下载 456 PrebuiltSource::Archive(archive) => { 457 archive 458 .download_unzip(&self.build_dir) 459 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 460 } 461 } 462 } 463 } 464 465 return Ok(()); 466 } 467 468 fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> { 469 let mut child = command 470 .stdin(Stdio::inherit()) 471 .spawn() 472 .map_err(|e| ExecutorError::IoError(e.to_string()))?; 473 474 // 等待子进程结束 475 let r = child 476 .wait() 477 .map_err(|e| ExecutorError::IoError(e.to_string())); 478 debug!("Command finished: {:?}", r); 479 if r.is_ok() { 480 let r = r.unwrap(); 481 if r.success() { 482 return Ok(()); 483 } else { 484 // 执行失败,获取最后100行stderr输出 485 let errmsg = format!( 486 "Task {} failed, exit code = {}", 487 self.entity.task().name_version(), 488 r.code().unwrap() 489 ); 490 error!("{errmsg}"); 491 let command_opt = command.output(); 492 if command_opt.is_err() { 493 return Err(ExecutorError::TaskFailed( 494 "Failed to get command output".to_string(), 495 )); 496 } 497 let command_opt = command_opt.unwrap(); 498 let command_output = String::from_utf8_lossy(&command_opt.stderr); 499 let mut last_100_outputs = command_output 500 .lines() 501 .rev() 502 .take(100) 503 .collect::<Vec<&str>>(); 504 last_100_outputs.reverse(); 505 error!("Last 100 lines msg of stderr:"); 506 for line in last_100_outputs { 507 error!("{}", line); 508 } 509 return Err(ExecutorError::TaskFailed(errmsg)); 510 } 511 } else { 512 let errmsg = format!( 513 "Task {} failed, msg = {:?}", 514 self.entity.task().name_version(), 515 r.err().unwrap() 516 ); 517 error!("{errmsg}"); 518 return Err(ExecutorError::TaskFailed(errmsg)); 519 } 520 } 521 } 522 523 #[derive(Debug, Clone)] 524 pub struct EnvMap { 525 pub envs: BTreeMap<String, EnvVar>, 526 } 527 528 impl EnvMap { 529 pub fn new() -> Self { 530 Self { 531 envs: BTreeMap::new(), 532 } 533 } 534 535 pub fn add(&mut self, env: EnvVar) { 536 self.envs.insert(env.key.clone(), env); 537 } 538 539 #[allow(dead_code)] 540 pub fn get(&self, key: &str) -> Option<&EnvVar> { 541 self.envs.get(key) 542 } 543 544 pub fn add_vars(&mut self, vars: Vars) { 545 for (key, value) in vars { 546 self.add(EnvVar::new(key, value)); 547 } 548 } 549 } 550 551 /// # 环境变量 552 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)] 553 pub struct EnvVar { 554 pub key: String, 555 pub value: String, 556 } 557 558 impl EnvVar { 559 pub fn new(key: String, value: String) -> Self { 560 Self { key, value } 561 } 562 } 563 564 /// # 任务执行器错误枚举 565 #[allow(dead_code)] 566 #[derive(Debug, Clone)] 567 pub enum ExecutorError { 568 /// 准备执行环境错误 569 PrepareEnvError(String), 570 IoError(String), 571 /// 构建执行错误 572 TaskFailed(String), 573 /// 安装错误 574 InstallError(String), 575 /// 清理错误 576 CleanError(String), 577 } 578 579 /// # 准备全局环境变量 580 pub fn prepare_env( 581 sched_entities: &SchedEntities, 582 execute_ctx: &Arc<DadkUserExecuteContext>, 583 ) -> Result<(), ExecutorError> { 584 info!("Preparing environment variables..."); 585 let env_list = create_global_env_list(sched_entities, execute_ctx)?; 586 // 写入全局环境变量列表 587 let mut global_env_list = ENV_LIST.write().unwrap(); 588 *global_env_list = env_list; 589 return Ok(()); 590 } 591 592 /// # 创建全局环境变量列表 593 fn create_global_env_list( 594 sched_entities: &SchedEntities, 595 execute_ctx: &Arc<DadkUserExecuteContext>, 596 ) -> Result<EnvMap, ExecutorError> { 597 let mut env_list = EnvMap::new(); 598 let envs: Vars = std::env::vars(); 599 env_list.add_vars(envs); 600 601 // 为每个任务创建特定的环境变量 602 for entity in sched_entities.entities().iter() { 603 // 导出任务的构建目录环境变量 604 let build_dir = CacheDir::build_dir(entity.clone())?; 605 606 let build_dir_key = CacheDir::build_dir_env_key(&entity)?; 607 env_list.add(EnvVar::new( 608 build_dir_key, 609 build_dir.to_str().unwrap().to_string(), 610 )); 611 612 // 如果需要源码缓存目录,则导出 613 if CacheDir::need_source_cache(entity) { 614 let source_dir = CacheDir::source_dir(entity.clone())?; 615 let source_dir_key = CacheDir::source_dir_env_key(&entity)?; 616 env_list.add(EnvVar::new( 617 source_dir_key, 618 source_dir.to_str().unwrap().to_string(), 619 )); 620 } 621 } 622 623 // 创建ARCH环境变量 624 let target_arch = execute_ctx.target_arch(); 625 env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into())); 626 627 return Ok(env_list); 628 } 629