rust

rust语言访问es接口

使用es官方提供的rust语言sdk访问elasticsearch

1.引入依赖

[dependencies]
elasticsearch = "7.17.7-alpha.1"
serde = { version = "1.0.190", features = ["derive"] }
json ="0.12.4"
serde_json ="1.0.108"

2.api调用示例

//! @Author: DengLibin
//! @Date: Create in 2024-03-28 11:47:26
//! @Description:
//!
//!
use std::fmt::Display;

use elasticsearch::{
    http::transport::{SingleNodeConnectionPool, Transport, TransportBuilder},
    indices::{IndicesCreateParts, IndicesDeleteParts},
    params::Conflicts,
    Elasticsearch,
};

use json::{number::Number, object::Object, JsonValue};

use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

/// Error type shared by every helper in this module.
///
/// Only the display text of the underlying failure is kept, in `msg`.
#[derive(Debug)]
pub struct EsError {
    /// Human-readable description of the failure.
    pub msg: String,
}

impl std::fmt::Display for EsError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.msg)
    }
}

// Implementing `Error` lets callers propagate `EsError` with `?` into
// `Box<dyn std::error::Error>` and similar error wrappers.
impl std::error::Error for EsError {}

/// Result alias used by the helpers in this module.
pub type EsResult<T> = std::result::Result<T, EsError>;

/// Converts any `Result` whose error implements `Display` into an [`EsResult`].
///
/// The `Ok` value is passed through untouched; an `Err` is replaced by an
/// [`EsError`] carrying the original error's display text.
fn check_result<T, E: Display>(result: Result<T, E>) -> EsResult<T> {
    result.map_err(|err| EsError {
        msg: err.to_string(),
    })
}

/// Mapping definition of a single Elasticsearch field.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EsFieldType {
    pub type_name: String, // Type name: text, keyword, byte, boolean, short, integer, long, float, half_float, scaled_float, double, date, dense_vector (vector)
    pub store: bool,       // Whether the field is stored
    pub index: bool,       // Whether the field is indexed
    pub dims: u16,         // Vector dimension; must be specified for the vector type
}

/// Converts a field definition into the JSON object used under `properties`
/// in an index mapping.
///
/// `type`, `store` and `index` are always emitted. `dims` is added only for
/// `dense_vector`; `analyzer` / `search_analyzer` are added only for `text`.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl still
/// provides `EsFieldType: Into<JsonValue>`, so existing `.into()` calls keep working.
impl From<EsFieldType> for JsonValue {
    fn from(field_type: EsFieldType) -> Self {
        // Decide on the type-specific extras before `type_name` is moved into the object.
        let is_vector = field_type.type_name == "dense_vector";
        let is_text = field_type.type_name == "text";

        let mut obj = Object::new();
        obj.insert("type", JsonValue::String(field_type.type_name));
        obj.insert("store", JsonValue::Boolean(field_type.store));
        obj.insert("index", JsonValue::Boolean(field_type.index));
        if is_vector {
            // Vector fields carry their dimension.
            obj.insert("dims", JsonValue::Number(Number::from(field_type.dims)));
        }
        if is_text {
            // Index with the custom analyzer (named `html_text_analyzer`), search with `ik_smart`.
            obj.insert("analyzer", JsonValue::String("html_text_analyzer".into()));
            obj.insert("search_analyzer", JsonValue::String("ik_smart".into()));
        }

        JsonValue::Object(obj)
    }
}

/// A named Elasticsearch field: the field name plus its mapping definition.
// NOTE(review): `filed_type` looks like a misspelling of `field_type`; it is a public
// field and a serde key, so renaming it would be a breaking change.
#[derive(Debug, Serialize, Deserialize)]
pub struct EsField {
    pub name: String,            // Field name
    pub filed_type: EsFieldType, // Field type
}

/// Document envelope: metadata fields plus the document body in `_source`.
///
/// Every field is required when deserializing, so an entry that lacks any of
/// them fails to parse.
// NOTE(review): `_version`, `_seq_no` and `_primary_term` are `i16` and cannot hold
// values above 32767 — confirm against the value range Elasticsearch documents for
// these fields and consider widening them.
#[derive(Debug, Serialize, Deserialize)]
pub struct EsDoc<D> {
    pub _index: String,
    pub _type: String,
    pub _id: String,
    pub _version: i16,
    pub _seq_no: i16,
    pub _primary_term: i16,
    pub found: bool,
    pub _source: D,
}

/// Response shape of a multi-document lookup by id: a `docs` array of document envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct Docs<D> {
    pub docs: Vec<EsDoc<D>>,
}

/// Implemented by documents that are added in bulk.
///
/// `id` returns the id to store the document under; returning `None` lets
/// Elasticsearch generate the id automatically.
pub trait EsId {
    fn id(&self) -> Option<String>;
}


/// JSON structure of a search response.
#[derive(Deserialize,Serialize, Debug)]
pub struct SearchResult<D> {
    pub took: u32, // Elapsed time in ms
    pub timed_out: bool, // Whether the search timed out
    pub hits: HitsObj<D>,

}
/// The `hits` object of a search response.
#[derive(Deserialize, Serialize,Debug)]
pub struct HitsObj<D> {
    pub total: TotalObj, // Total number of matches
    pub max_score: Option<f32>, // Highest score, if any
    pub hits: Vec<HitObj<D>> // Matched documents
}

/// The `total` object inside `hits`: a count plus an optional `relation` string.
// NOTE(review): presumably `relation` tells whether `value` is exact or a lower
// bound — confirm against the Elasticsearch search response documentation.
#[derive(Deserialize, Serialize, Debug)]
pub struct TotalObj{
    pub value: u64,
    pub relation: Option<String>,
}

/// A single matched document in a search response.
// NOTE(review): `_score` and `_source` are non-optional, so a hit without a numeric
// score or without `_source` fails to deserialize — confirm the queries used always
// return both.
#[derive(Deserialize, Serialize, Debug)]
pub struct HitObj<D>{
    pub _index: String,
    pub _type: String,
    pub _id: String, // Elasticsearch document id
    pub _score: f32, // Score
    pub _source: D, // The document itself
}






/// @Author: DengLibin
/// @Date: Create in 2024-03-28 11:49:53
/// @Description: 创建客户端 http://localhost:9200
pub async fn get_local_client() -> Elasticsearch {
    Elasticsearch::default()
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 12:02:51
/// @Description: Creates a client that talks to the single node at `url`.
///
/// Returns an `EsError` when the transport cannot be built from `url`.
pub async fn get_client_by_url(url: &str) -> EsResult<Elasticsearch> {
    check_result(Transport::single_node(url)).map(Elasticsearch::new)
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 12:05:08
/// @Description: Creates a client on top of a single-node connection pool, with the proxy disabled.
///
/// Returns an `EsError` when `url` cannot be parsed or the transport cannot be built.
pub async fn get_client_by_pool(url: &str) -> EsResult<Elasticsearch> {
    let node_url = check_result(Url::parse(url))?;
    let builder = TransportBuilder::new(SingleNodeConnectionPool::new(node_url)).disable_proxy();
    check_result(builder.build()).map(Elasticsearch::new)
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 13:44:15
/// @Description: Creates the index `index_name` with fixed settings and mappings built from `es_fields`.
///
/// The settings use 3 shards, 0 replicas and define the `html_text_analyzer`
/// analyzer (`ik_max_word` tokenizer plus the `html_strip` char filter).
/// Returns an `EsError` when the request fails or the response has an error status.
pub async fn create_index(
    client: &Elasticsearch,
    index_name: &str,
    es_fields: &Vec<EsField>,
) -> EsResult<()> {
    // The mappings come back as a JSON string; re-parse them so they can be
    // embedded in the request body.
    let mappings_json = create_mappings(es_fields);
    let mappings: Value = check_result(serde_json::from_str(&mappings_json))?;

    let body = json!({
        "settings": {
            "number_of_shards": 3,
            "number_of_replicas": 0,
            "analysis": {
                "analyzer": {
                    "html_text_analyzer": {
                        "tokenizer": "ik_max_word",
                        "char_filter": [
                            "html_strip"
                        ]
                    }
                }
            }
        },
        "mappings": mappings
    });

    let response = check_result(
        client
            .indices()
            .create(IndicesCreateParts::Index(index_name))
            .body(body)
            .send()
            .await,
    )?;
    check_result(response.error_for_status_code()).map(|_| ())
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Deletes the index `index_name`.
///
/// Returns an `EsError` when the request fails or the response has an error status.
pub async fn delete_index(client: &Elasticsearch, index_name: &str) -> EsResult<()> {
    let targets = [index_name];
    let response = check_result(
        client
            .indices()
            .delete(IndicesDeleteParts::Index(&targets))
            .send()
            .await,
    )?;
    check_result(response.error_for_status_code()).map(|_| ())
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Empties the index `index_name` by deleting every document with a `match_all` query.
///
/// The request proceeds on conflicts and refreshes afterwards.
/// Returns an `EsError` when the request fails or the response has an error status.
pub async fn clear(client: &Elasticsearch, index_name: &str) -> EsResult<()> {
    // Delete-by-query that matches all documents.
    let match_everything = json!({"query":{"match_all": {}}});

    let response = check_result(
        client
            .delete_by_query(elasticsearch::DeleteByQueryParts::Index(&[index_name]))
            .conflicts(Conflicts::Proceed)
            .refresh(true)
            .body(match_everything)
            .send()
            .await,
    )?;
    check_result(response.error_for_status_code()).map(|_| ())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Checks whether the index `index_name` exists.
///
/// Returns `Ok(false)` only when the server answers 404. Any other client or
/// server error status (for example an authentication failure) is returned as
/// an `EsError` instead of being reported as "does not exist".
pub async fn exists(client: &Elasticsearch, index_name: &str) -> EsResult<bool> {
    let index_names = [index_name];
    let exists_parts = elasticsearch::indices::IndicesExistsParts::Index(&index_names);

    let response = check_result(client.indices().exists(exists_parts).send().await)?;

    // 404 is the only status that means "no such index".
    if response.status_code().as_u16() == 404 {
        return Ok(false);
    }
    // Every other 4xx/5xx is a real failure and must not be mistaken for absence.
    check_result(response.error_for_status_code())?;
    Ok(true)
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Indexes one document under `id` in `index_name` and refreshes.
///
/// Returns an `EsError` when the request fails or the response has an error status.
pub async fn insert_doc<T: Serialize>(
    client: &Elasticsearch,
    index_name: &str,
    id: &str,
    doc: T,
) -> EsResult<()> {
    let request = client
        .index(elasticsearch::IndexParts::IndexId(index_name, id))
        .refresh(elasticsearch::params::Refresh::True)
        .body(doc);
    let response = check_result(request.send().await)?;
    check_result(response.error_for_status_code()).map(|_| ())
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Updates the document `id` in `index_name` and refreshes.
///
/// `doc` is sent wrapped as `{"doc": ...}`.
/// Returns an `EsError` when the request fails or the response has an error status.
pub async fn update_by_id<T: Serialize>(
    client: &Elasticsearch,
    index_name: &str,
    id: &str,
    doc: T,
) -> EsResult<()> {
    let request = client
        .update(elasticsearch::UpdateParts::IndexId(index_name, id))
        .refresh(elasticsearch::params::Refresh::True)
        .body(json!({"doc": doc}));
    let response = check_result(request.send().await)?;
    check_result(response.error_for_status_code()).map(|_| ())
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Updates a single string field of the document `id` and refreshes.
///
/// `field_name` is the field to set and `value` its new value; they are sent
/// as `{"doc": {field_name: value}}`.
/// Returns an `EsError` when the request fails or the response has an error status.
pub async fn update_value_by_id(
    client: &Elasticsearch,
    index_name: &str,
    id: &str,
    field_name: &str,
    value: &str,
) -> EsResult<()> {
    let partial_doc = json!({"doc": {field_name: value}});
    let request = client
        .update(elasticsearch::UpdateParts::IndexId(index_name, id))
        .refresh(elasticsearch::params::Refresh::True)
        .body(partial_doc);
    let response = check_result(request.send().await)?;
    check_result(response.error_for_status_code()).map(|_| ())
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Fetches the source of the document `id` and deserializes it into `DOC`.
///
/// Returns an `EsError` when the request fails, the response has an error
/// status, or the body cannot be deserialized into `DOC`.
pub async fn get_source_by_id<DOC>(
    client: &Elasticsearch,
    index_name: &str,
    id: &str,
) -> EsResult<DOC>
where
    DOC: for<'de> serde::Deserialize<'de>,
{
    let request = client.get_source(elasticsearch::GetSourceParts::IndexId(index_name, id));
    let response = check_result(request.send().await)?;
    let response = check_result(response.error_for_status_code())?;
    check_result(response.json::<DOC>().await)
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: 查询一篇文档
pub async fn get_source_by_ids<DOC>(
    client: &Elasticsearch,
    index_name: &str,
    ids: &Vec<String>,
) -> EsResult<Vec<DOC>>
where
    DOC: for<'de> serde::Deserialize<'de>,
{
    let r = client
        .mget(elasticsearch::MgetParts::Index(index_name))
        .body(json!({"ids": ids}))
        .send()
        .await;
    let r = check_result(r)?;

    // println!("{}", r.text().await.unwrap());
    let r = r.error_for_status_code();
    let r = check_result(r)?;

    let doc = check_result(r.json::<Docs<DOC>>().await)?;
    // println!("{}", r.text().await.unwrap());
    let mut vec = vec![];
    for doc in doc.docs {
        let source = doc._source;
        vec.push(source);
    }
    Ok(vec)
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Deletes the document `id` from `index_name`.
///
/// Returns an `EsError` when the request fails or the response has an error status.
pub async fn delete_by_id(client: &Elasticsearch, index_name: &str, id: &str) -> EsResult<()> {
    let request = client.delete(elasticsearch::DeleteParts::IndexId(index_name, id));
    let response = check_result(request.send().await)?;
    check_result(response.error_for_status_code()).map(|_| ())
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Deletes several documents from `index_name` with one bulk request.
///
/// Bulk API: https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html
/// Only the HTTP status of the bulk response is checked.
/// Returns an `EsError` when the request fails or the response has an error status.
pub async fn delete_by_ids(
    client: &Elasticsearch,
    index_name: &str,
    ids: &Vec<String>,
) -> EsResult<()> {
    // One `delete` action line per id.
    let mut actions: Vec<String> = Vec::with_capacity(ids.len());
    for id in ids {
        actions.push(json!({"delete": {"_id": id}}).to_string());
    }

    let request = client
        .bulk(elasticsearch::BulkParts::Index(index_name))
        .body(actions);
    let response = check_result(request.send().await)?;
    check_result(response.error_for_status_code()).map(|_| ())
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-29 09:05:48
/// @Description: Indexes several documents into `index_name` with one bulk request.
///
/// The bulk body is made of line pairs: the first line is the action carrying
/// the document id (which may be null), the second line is the document itself:
/// {"index":{"_id":"1"}}
/// {"account_number":1,"balance":39225,"firstname":"Amber","lastname":"Duke"}
///
/// Only the HTTP status of the bulk response is checked.
/// Returns an `EsError` when a document cannot be serialized, the request
/// fails, or the response has an error status.
pub async fn insert_docs<T: Serialize + EsId>(
    client: &Elasticsearch,
    index_name: &str,
    docs: &Vec<T>,
) -> EsResult<()> {
    let bulk_parts = elasticsearch::BulkParts::Index(index_name);

    // Build one "action\nsource" pair per document. A serialization failure is
    // returned to the caller instead of panicking.
    let index_lines = docs
        .iter()
        .map(|doc| {
            let action = json!({"index": {"_id": doc.id()}}).to_string();
            let source = check_result(serde_json::to_string(doc))?;
            Ok(format!("{}\n{}", action, source))
        })
        .collect::<EsResult<Vec<String>>>()?;

    let response = check_result(client.bulk(bulk_parts).body(index_lines).send().await)?;
    check_result(response.error_for_status_code())?;

    Ok(())
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-29 10:33:11
/// @Description: Counts the documents in `index_name`.
///
/// Returns an `EsError` when the request fails, the response has an error
/// status, or the body is not valid JSON. An error status is no longer
/// reported as a count of 0. If a successful response carries no numeric
/// `count` field, 0 is returned.
pub async fn count(client: &Elasticsearch, index_name: &str) -> EsResult<u64> {
    let response = check_result(
        client
            .count(elasticsearch::CountParts::Index(&[index_name]))
            .send()
            .await,
    )?;
    // Check the status before reading the body: an error body has no `count`
    // and would otherwise be mistaken for an empty index.
    let response = check_result(response.error_for_status_code())?;

    let text = check_result(response.text().await)?;
    let parsed = check_result(json::parse(text.as_str()))?;
    Ok(parsed["count"].as_u64().unwrap_or(0))
}

/// @Author: DengLibin
/// @Date: Create in 2024-03-29 10:43:19
/// @Description: Reindexes the documents of `source_index` into `dest_index`.
/// https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-reindex.html
///
/// Returns an `EsError` when the request fails or the response has an error status.
pub async fn re_index(
    client: &Elasticsearch,
    source_index: &str,
    dest_index: &str,
) -> EsResult<()> {
    let reindex_body = json!({
        "source": {
            "index": source_index
        },
        "dest": {
            "index": dest_index
        }
    });

    let response = check_result(client.reindex().body(reindex_body).send().await)?;
    check_result(response.error_for_status_code()).map(|_| ())
}


 /// @Author: DengLibin
 /// @Date: Create in 2024-03-29 10:50:05
 /// @Description: Deletes every document whose `field_name` matches `field_value` in a `term` query.
 ///
 /// Returns an `EsError` when the request fails or the response has an error status.
pub async fn delete_by_term_query(
    client: &Elasticsearch,
    index_name: &str,
    field_name: &str,
    field_value: &str,
) -> EsResult<()> {
    let term_query = json!({
        "query": {
            "term": {
                field_name: field_value
            }
        }
    });

    let targets = [index_name];
    let request = client
        .delete_by_query(elasticsearch::DeleteByQueryParts::Index(&targets))
        .body(term_query);
    let response = check_result(request.send().await)?;
    check_result(response.error_for_status_code()).map(|_| ())
}


 /// @Author: DengLibin
 /// @Date: Create in 2024-03-29 10:50:05
 /// @Description: Searches `index_name` with a `term` query on `field_name` = `field_value`.
 ///
 /// `from` and `size` select the page of hits. All source fields are requested
 /// and hits are sorted by `_score` descending.
 /// Returns an `EsError` when the request fails, the response has an error
 /// status, or the body cannot be deserialized into `SearchResult<DOC>`.
pub async fn search_by_term_query<DOC>(
    client: &Elasticsearch,
    index_name: &str,
    field_name: &str,
    field_value: &str,
    from: i64,
    size: i64,
) -> EsResult<SearchResult<DOC>>
where
    DOC: for<'de> serde::Deserialize<'de>,
{
    let response = check_result(
        client
            .search(elasticsearch::SearchParts::Index(&[index_name]))
            .from(from)
            .size(size)
            // `_source(&["id", "title"])` would restrict the returned fields;
            // "true" requests all of them, "false" none.
            ._source(&["true"])
            .sort(&["_score:desc"])
            .body(json!({
                "query": {
                    "term": {
                      field_name: field_value
                    }
                  }
            }))
            .send()
            .await,
    )?;
    // Surface HTTP errors as such. Without this check an error body only shows
    // up as a misleading "missing field" deserialization failure.
    let response = check_result(response.error_for_status_code())?;

    check_result(response.json::<SearchResult<DOC>>().await)
}



/// @Author: DengLibin
/// @Date: Create in 2024-03-28 13:50:06
/// @Description: 根据字段创建mappings 返回json字符串(对象类型)
///   {
///     "properties": {
///       "field1": { "type": "text" }
///      }
///    }
///
///
///
fn create_mappings(es_fields: &Vec<EsField>) -> String {
    let mut mappings_obj = Object::new();
    let mut properties_obj = Object::new();
    for field in es_fields {
        properties_obj.insert(&field.name, field.filed_type.clone().into());
    }

    mappings_obj.insert("properties", properties_obj.into());
    json::stringify(mappings_obj)
}


关于作者

程序员,软件工程师,java, golang, rust, c, python,vue, Springboot, mybatis, mysql,elasticsearch, docker, maven, gcc, linux, ubuntu, centos, axum,llm, paddlepaddle, onlyoffice,minio,银河麒麟,中科方德,rpm