rust

rust使用tokio快速统计百万级别文件个数

main.rs


use std::{env, sync::{atomic::{AtomicU32, Ordering}, Arc}, time::SystemTime};

use serde_json::json;
use tokio::runtime::Builder;

pub mod tokio_file;


///获取时间戳 毫秒
pub fn get_sys_timestamp_millis() -> u128 {
    let now = SystemTime::now();
    now.duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis()
}

fn cre_runtime() -> tokio::runtime::Runtime {
    // 创建一个 Tokio 运行时,配置线程池的大小
    Builder::new_multi_thread()
        //.worker_threads(4) // 设置工作线程的数量
        .max_blocking_threads(16) // 设置阻塞任务的最大线程数
        .enable_all()
        .build()
        .unwrap()
}

fn main() {
     // 获取命令行参数,返回一个迭代器
     let args: Vec<String> = env::args().collect();
     if args.len() !=2 {
        println!("请输入统计的文件夹路径");
        return;
     }
     // 第一个参数是程序的名称
     let dir = args.get(1).unwrap().to_string();
 
     // 其余参数是用户输入的参数
    let runtime = cre_runtime();
    let s = get_sys_timestamp_millis();
    let counter = Arc::new(AtomicU32::new(0_u32));
    runtime.block_on(async {
        tokio_file::count_sub_file(dir, counter.clone()).await.unwrap();
    });
    let e = get_sys_timestamp_millis();
    let current_value = counter.load(Ordering::SeqCst);
    let  data = json!({
        "count": current_value,
        "time": (e-s)
    });
    println!("{}", data);
}

count_sub_file函数:


/// @Author: DengLibin
/// @Date: Create in 2025-01-15 10:10:26
/// @Description: 统计文件夹下文件(夹)数量
pub async fn count_sub_file(
    dir: String,
    counter: Arc<AtomicU32>,
) -> Result<(), Box<dyn std::error::Error>> {
    let semaphore = Arc::new(Semaphore::new(400)); // 限制并发数
                                                   //无界多生产者单消费者通道
    let (tx, mut rx) = mpsc::unbounded_channel();

    // 启动初始任务
    tx.send(dir)?;

    let active_tasks = Arc::new(AtomicUsize::new(0)); // 活跃任务计数器

    // 消费队列
    while let Some(path) = rx.recv().await {
        if path.is_empty() {
            //收到空表示结束
            break;
        }
        let tx_clone = tx.clone();
        let semaphore_clone = semaphore.clone();
        let counter_clone = Arc::clone(&counter);
        let active_tasks_clone = Arc::clone(&active_tasks);
        //获取执行权限
        let permit = semaphore_clone.acquire_owned().await?;
        task::spawn(async move {
            // 遍历目录
            if let Ok(mut entries) = read_dir(&path).await {
                while let Ok(Some(entry)) = entries.next_entry().await {
                    if let Ok(file_type) = entry.file_type().await {
                        // 计数
                        counter_clone.fetch_add(1, Ordering::SeqCst);

                        // 如果是目录,加入队列
                        if file_type.is_dir() {
                            let sub_dir = entry.path().to_string_lossy().into_owned();
                            let r = tx_clone.send(sub_dir.clone());
                            if let Err(e) = r {
                                println!("发送文件夹失败:{}, {}", sub_dir, e);
                            } else {
                                //发送了一个新任务
                                active_tasks_clone.fetch_add(1, Ordering::SeqCst);
                            }
                        }
                    }
                }
            }
            //当前目录消费完成,最后一个了
            if 1 == active_tasks_clone.fetch_sub(1, Ordering::SeqCst) {
                let _ = tx_clone.send("".into()); //发送一个空
            }

            drop(permit); // 释放信号量
        });
    }
    Ok(())
}

关于作者

程序员,软件工程师,java, golang, rust, c, python,vue, Springboot, mybatis, mysql,elasticsearch, docker, maven, gcc, linux, ubuntu, centos, axum,llm, paddlepaddle, onlyoffice,minio,银河麒麟,中科方德,rpm