发表于 2024/03/29 19:03:22 [rust] 浏览次数:487
使用es官方提供的rust语言sdk访问elasticsearch
1.引入依赖
[dependencies]
elasticsearch = "7.17.7-alpha.1"
serde = "1.0.190"
json ="0.12.4"
serde_json ="1.0.108"
2.api调用示例
//! @Author: DengLibin
//! @Date: Create in 2024-03-28 11:47:26
//! @Description:
//!
//!
use std::fmt::Display;
use elasticsearch::{
http::transport::{SingleNodeConnectionPool, Transport, TransportBuilder},
indices::{IndicesCreateParts, IndicesDeleteParts},
params::Conflicts,
Elasticsearch,
};
use json::{number::Number, object::Object, JsonValue};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
/// Error type returned by every helper in this module.
///
/// It carries only a human-readable message: whatever error produced it is
/// flattened to text by [`check_result`].
#[derive(Debug)]
pub struct EsError {
    /// Text of the underlying failure.
    pub msg: String,
}

impl std::fmt::Display for EsError {
    /// Writes the stored message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.msg)
    }
}

// Implementing `Error` lets callers box this type or propagate it with `?`
// into `Box<dyn std::error::Error>`.
impl std::error::Error for EsError {}

/// Result alias whose error side is always [`EsError`].
pub type EsResult<T> = std::result::Result<T, EsError>;

/// Converts any `Result` with a displayable error into an [`EsResult`].
///
/// The success value is passed through untouched; an error is replaced by an
/// [`EsError`] whose `msg` is the error's `Display` output.
fn check_result<T, E: Display>(result: Result<T, E>) -> EsResult<T> {
    result.map_err(|err| EsError {
        msg: err.to_string(),
    })
}
/// Mapping definition of a single Elasticsearch field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EsFieldType {
    /// Elasticsearch type name: `text`, `keyword`, `byte`, `boolean`, `short`,
    /// `integer`, `long`, `float`, `half_float`, `scaled_float`, `double`,
    /// `date` or `dense_vector` (vector field).
    pub type_name: String,
    /// Whether the field is stored.
    pub store: bool,
    /// Whether the field is indexed.
    pub index: bool,
    /// Vector dimension; must be set for vector fields.
    pub dims: u16,
}
/// Builds the JSON mapping object for a field type.
///
/// Every field gets `type`, `store` and `index`. A `dense_vector` field
/// additionally gets `dims`; a `text` field additionally gets the custom
/// `html_text_analyzer` as analyzer and `ik_smart` as search analyzer.
///
/// Implemented as `From` so that the matching `Into<JsonValue>` comes for free
/// from the standard blanket impl; existing `.into()` call sites keep working.
impl From<EsFieldType> for JsonValue {
    fn from(field_type: EsFieldType) -> Self {
        // Evaluate the type checks first: `type_name` is moved into the object below.
        let is_vec = field_type.type_name == "dense_vector";
        let is_text = field_type.type_name == "text";

        let mut obj = Object::new();
        obj.insert("type", JsonValue::String(field_type.type_name));
        obj.insert("store", JsonValue::Boolean(field_type.store));
        obj.insert("index", JsonValue::Boolean(field_type.index));
        if is_vec {
            // Vector fields must declare their dimension.
            obj.insert("dims", JsonValue::Number(Number::from(field_type.dims)));
        }
        if is_text {
            // Custom analyzer (declared in the index settings) that ignores HTML tags.
            obj.insert("analyzer", JsonValue::String("html_text_analyzer".into()));
            obj.insert("search_analyzer", JsonValue::String("ik_smart".into()));
        }
        JsonValue::Object(obj)
    }
}
/// A named field of an index mapping.
#[derive(Serialize, Deserialize, Debug)]
pub struct EsField {
    /// Field name.
    pub name: String,
    /// Field type definition (the spelling `filed_type` is part of the public API).
    pub filed_type: EsFieldType,
}
/// Document envelope as returned by Elasticsearch, wrapping the user document `D`.
///
/// NOTE(review): `_version`, `_seq_no` and `_primary_term` are declared as `i16`;
/// values above 32767 would fail to deserialize — confirm this width is intended.
#[derive(Serialize, Deserialize, Debug)]
pub struct EsDoc<D> {
    /// Index the document belongs to.
    pub _index: String,
    /// Mapping type of the document.
    pub _type: String,
    /// Document id.
    pub _id: String,
    /// Document version.
    pub _version: i16,
    /// Sequence number of the document.
    pub _seq_no: i16,
    /// Primary term of the document.
    pub _primary_term: i16,
    /// Whether the document was found.
    pub found: bool,
    /// The stored document itself.
    pub _source: D,
}
/// Response shape of a lookup of several documents by id.
#[derive(Serialize, Deserialize, Debug)]
pub struct Docs<D> {
    /// One envelope per returned document.
    pub docs: Vec<EsDoc<D>>,
}
/// Implemented by documents that are added to an index.
///
/// `id` returns the id to store the document under; returning `None` lets
/// Elasticsearch generate the id automatically.
pub trait EsId {
    /// Id of this document, or `None` for an auto-generated one.
    fn id(&self) -> Option<String>;
}
/// JSON structure of a search response.
#[derive(Debug, Serialize, Deserialize)]
pub struct SearchResult<D> {
    /// Time taken, in milliseconds.
    pub took: u32,
    /// Whether the search timed out.
    pub timed_out: bool,
    /// Hit summary and the matching documents.
    pub hits: HitsObj<D>,
}
/// The `hits` object of a search response.
#[derive(Debug, Serialize, Deserialize)]
pub struct HitsObj<D> {
    /// Total number of matches.
    pub total: TotalObj,
    /// Highest score among the hits, if any.
    pub max_score: Option<f32>,
    /// The matching documents.
    pub hits: Vec<HitObj<D>>,
}
/// The `total` object of a search response's hit summary.
#[derive(Debug, Serialize, Deserialize)]
pub struct TotalObj {
    /// Number of matching documents.
    pub value: u64,
    /// Relation qualifier for `value`, when the response provides one.
    pub relation: Option<String>,
}
/// A single matching document of a search response.
#[derive(Debug, Serialize, Deserialize)]
pub struct HitObj<D> {
    /// Index the hit comes from.
    pub _index: String,
    /// Mapping type of the hit.
    pub _type: String,
    /// Elasticsearch document id.
    pub _id: String,
    /// Relevance score.
    pub _score: f32,
    /// The document itself.
    pub _source: D,
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 11:49:53
/// @Description: 创建客户端 http://localhost:9200
pub async fn get_local_client() -> Elasticsearch {
Elasticsearch::default()
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 12:02:51
/// @Description: Create a client for the single node at `url`.
///
/// Returns an `EsError` when the transport cannot be built from `url`.
pub async fn get_client_by_url(url: &str) -> EsResult<Elasticsearch> {
    check_result(Transport::single_node(url)).map(Elasticsearch::new)
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 12:05:08
/// @Description: Create a client over a single-node connection pool for `url`,
/// with the proxy disabled.
///
/// Returns an `EsError` when `url` does not parse or the transport cannot be built.
pub async fn get_client_by_pool(url: &str) -> EsResult<Elasticsearch> {
    let parsed_url = check_result(Url::parse(url))?;
    let builder = TransportBuilder::new(SingleNodeConnectionPool::new(parsed_url)).disable_proxy();
    check_result(builder.build()).map(Elasticsearch::new)
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 13:44:15
/// @Description: Create the index `index_name` with a mapping built from `es_fields`.
///
/// The index is created with 3 shards, 0 replicas and a custom analyzer named
/// `html_text_analyzer` (tokenizer `ik_max_word`, char filter `html_strip`).
/// Returns an `EsError` when the mapping JSON cannot be parsed, the request
/// fails, or the server answers with an error status.
pub async fn create_index(
    client: &Elasticsearch,
    index_name: &str,
    es_fields: &Vec<EsField>,
) -> EsResult<()> {
    // Mappings: rendered as a JSON string by the helper, then re-parsed for serde_json.
    let mappings_text = create_mappings(es_fields);
    let mappings: Value = check_result(serde_json::from_str(&mappings_text))?;

    // Settings: shard layout plus the HTML-stripping analyzer.
    let settings: serde_json::Value = json!({
        "number_of_shards": 3,
        "number_of_replicas": 0,
        "analysis": {
            "analyzer": {
                "html_text_analyzer": {
                    "tokenizer": "ik_max_word",
                    "char_filter": [
                        "html_strip"
                    ]
                }
            }
        }
    });

    let response = check_result(
        client
            .indices()
            .create(IndicesCreateParts::Index(index_name))
            .body(json!({"settings": settings, "mappings": mappings}))
            .send()
            .await,
    )?;
    check_result(response.error_for_status_code())?;
    Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Delete the index `index_name`.
///
/// Returns an `EsError` when the request fails or the server answers with an
/// error status.
pub async fn delete_index(client: &Elasticsearch, index_name: &str) -> EsResult<()> {
    let response = check_result(
        client
            .indices()
            .delete(IndicesDeleteParts::Index(&[index_name]))
            .send()
            .await,
    )?;
    check_result(response.error_for_status_code())?;
    Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Empty the index `index_name` by deleting every document.
///
/// Issues a delete-by-query with a `match_all` query, `conflicts=proceed` and
/// `refresh=true`. Returns an `EsError` when the request fails or the server
/// answers with an error status.
pub async fn clear(client: &Elasticsearch, index_name: &str) -> EsResult<()> {
    let response = check_result(
        client
            .delete_by_query(elasticsearch::DeleteByQueryParts::Index(&[index_name]))
            .conflicts(Conflicts::Proceed)
            .refresh(true)
            // Match everything so the whole index is emptied.
            .body(json!({"query":{"match_all": {}}}))
            .send()
            .await,
    )?;
    check_result(response.error_for_status_code())?;
    Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Check whether the index `index_name` exists.
///
/// Returns `Ok(false)` only when the server answers 404. Any other error
/// status (for example an authentication or server failure) is returned as an
/// `EsError` instead of being reported as "does not exist", so callers do not
/// mistake an unreachable or misconfigured cluster for a missing index.
pub async fn exists(client: &Elasticsearch, index_name: &str) -> EsResult<bool> {
    let response = check_result(
        client
            .indices()
            .exists(elasticsearch::indices::IndicesExistsParts::Index(&[
                index_name,
            ]))
            .send()
            .await,
    )?;
    if response.status_code().as_u16() == 404 {
        return Ok(false);
    }
    // Every remaining error status is a real failure, not an answer.
    check_result(response.error_for_status_code())?;
    Ok(true)
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Index one document `doc` under `id` in `index_name`.
///
/// The request is sent with `refresh=true`. Returns an `EsError` when the
/// request fails or the server answers with an error status.
pub async fn insert_doc<T: Serialize>(
    client: &Elasticsearch,
    index_name: &str,
    id: &str,
    doc: T,
) -> EsResult<()> {
    let request = client
        .index(elasticsearch::IndexParts::IndexId(index_name, id))
        .refresh(elasticsearch::params::Refresh::True)
        .body(doc);
    let response = check_result(request.send().await)?;
    check_result(response.error_for_status_code())?;
    Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Update the document `id` in `index_name` with the fields of `doc`.
///
/// `doc` is sent as the `doc` member of the update body, with `refresh=true`.
/// Returns an `EsError` when the request fails or the server answers with an
/// error status.
pub async fn update_by_id<T: Serialize>(
    client: &Elasticsearch,
    index_name: &str,
    id: &str,
    doc: T,
) -> EsResult<()> {
    let update_body = json!({"doc": doc});
    let response = check_result(
        client
            .update(elasticsearch::UpdateParts::IndexId(index_name, id))
            .refresh(elasticsearch::params::Refresh::True)
            .body(update_body)
            .send()
            .await,
    )?;
    check_result(response.error_for_status_code())?;
    Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Update a single field of the document `id` in `index_name`.
///
/// `field_name` is the field to change and `value` its new (string) value.
/// The request is sent with `refresh=true`. Returns an `EsError` when the
/// request fails or the server answers with an error status.
pub async fn update_value_by_id(
    client: &Elasticsearch,
    index_name: &str,
    id: &str,
    field_name: &str,
    value: &str,
) -> EsResult<()> {
    let update_body = json!({"doc": {field_name: value}});
    let response = check_result(
        client
            .update(elasticsearch::UpdateParts::IndexId(index_name, id))
            .refresh(elasticsearch::params::Refresh::True)
            .body(update_body)
            .send()
            .await,
    )?;
    check_result(response.error_for_status_code())?;
    Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Fetch the source of the document `id` in `index_name`.
///
/// The response body is deserialized into `DOC`. Returns an `EsError` when the
/// request fails, the server answers with an error status, or the body does
/// not deserialize into `DOC`.
pub async fn get_source_by_id<DOC>(
    client: &Elasticsearch,
    index_name: &str,
    id: &str,
) -> EsResult<DOC>
where
    DOC: for<'de> serde::Deserialize<'de>,
{
    let response = check_result(
        client
            .get_source(elasticsearch::GetSourceParts::IndexId(index_name, id))
            .send()
            .await,
    )?;
    let response = check_result(response.error_for_status_code())?;
    check_result(response.json::<DOC>().await)
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: 查询一篇文档
pub async fn get_source_by_ids<DOC>(
client: &Elasticsearch,
index_name: &str,
ids: &Vec<String>,
) -> EsResult<Vec<DOC>>
where
DOC: for<'de> serde::Deserialize<'de>,
{
let r = client
.mget(elasticsearch::MgetParts::Index(index_name))
.body(json!({"ids": ids}))
.send()
.await;
let r = check_result(r)?;
// println!("{}", r.text().await.unwrap());
let r = r.error_for_status_code();
let r = check_result(r)?;
let doc = check_result(r.json::<Docs<DOC>>().await)?;
// println!("{}", r.text().await.unwrap());
let mut vec = vec![];
for doc in doc.docs {
let source = doc._source;
vec.push(source);
}
Ok(vec)
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: Delete the document `id` from `index_name`.
///
/// Returns an `EsError` when the request fails or the server answers with an
/// error status.
pub async fn delete_by_id(client: &Elasticsearch, index_name: &str, id: &str) -> EsResult<()> {
    let request = client.delete(elasticsearch::DeleteParts::IndexId(index_name, id));
    let response = check_result(request.send().await)?;
    check_result(response.error_for_status_code())?;
    Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 16:31:50
/// @Description: 删除
pub async fn delete_by_ids(
client: &Elasticsearch,
index_name: &str,
ids: &Vec<String>,
) -> EsResult<()> {
let bulk_parts = elasticsearch::BulkParts::Index(index_name);
let delete_lines = ids
.iter()
.map(|id| json!({"delete": {"_id": id}}).to_string())
.collect::<Vec<String>>();
//bulk 批量操作 : https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-bulk.html
let r = client.bulk(bulk_parts).body(delete_lines).send().await;
let r = check_result(r)?;
// println!("{}", r.text().await.unwrap());
let r = r.error_for_status_code();
let _ = check_result(r)?;
Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-29 09:05:48
/// @Description: 批量添加, es批量添加数据格式(两行为一组,第一行指定索引id(也可为空),第二行为实际的数据体。):
/// {"index":{"_id":"1"}}
///{"account_number":1,"balance":39225,"firstname":"Amber","lastname":"Duke","age":32,"gender":"M","address":"880 Holmes Lane","employer":"Pyrami","email":"amberduke@pyrami.com","city":"Brogan","state":"IL"}
///
pub async fn insert_docs<T: Serialize + EsId>(
client: &Elasticsearch,
index_name: &str,
docs: &Vec<T>,
) -> EsResult<()> {
let bulk_parts = elasticsearch::BulkParts::Index(index_name);
let index_lines = docs
.iter()
.map(|doc| {
let line1 = json!({"index": {"_id": doc.id()}}).to_string();
let line2 = serde_json::to_string(doc).unwrap();
format!("{}\n{}", line1, line2)
})
.collect::<Vec<String>>();
let r = client.bulk(bulk_parts).body(index_lines).send().await;
let r = check_result(r)?;
// println!("{}", r.text().await.unwrap());
let r = r.error_for_status_code();
let _ = check_result(r)?;
Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-29 10:33:11
/// @Description: 计数
pub async fn count(client: &Elasticsearch, index_name: &str) -> EsResult<u64> {
let r = client
.count(elasticsearch::CountParts::Index(&[index_name]))
.send()
.await;
let r = check_result(r)?;
let s = check_result(r.text().await)?;
let j_v = check_result(json::parse(s.as_str()))?;
let count = j_v["count"].as_u64();
if let Some(c) = count {
Ok(c)
} else {
Ok(0)
}
// let r = r.error_for_status_code();
// let _ = check_result(r)?;
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-29 10:43:19
/// @Description: Reindex the documents of `source_index` into `dest_index`.
///
/// Returns an `EsError` when the request fails or the server answers with an
/// error status.
///
/// https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-reindex.html
pub async fn re_index(
    client: &Elasticsearch,
    source_index: &str,
    dest_index: &str,
) -> EsResult<()> {
    let reindex_body = json!({
        "source": {
            "index": source_index
        },
        "dest": {
            "index": dest_index
        }
    });
    let response = check_result(client.reindex().body(reindex_body).send().await)?;
    check_result(response.error_for_status_code())?;
    Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-29 10:50:05
/// @Description: Delete the documents of `index_name` whose `field_name`
/// matches `field_value` in a `term` query.
///
/// Returns an `EsError` when the request fails or the server answers with an
/// error status.
pub async fn delete_by_term_query(
    client: &Elasticsearch,
    index_name: &str,
    field_name: &str,
    field_value: &str,
) -> EsResult<()> {
    let term_query = json!({
        "query": {
            "term": {
                field_name: field_value
            }
        }
    });
    let response = check_result(
        client
            .delete_by_query(elasticsearch::DeleteByQueryParts::Index(&[index_name]))
            .body(term_query)
            .send()
            .await,
    )?;
    check_result(response.error_for_status_code())?;
    Ok(())
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-29 10:50:05
/// @Description: Search `index_name` for documents whose `field_name` matches
/// `field_value` in a `term` query.
///
/// `from` and `size` select the page of hits. All source fields are requested
/// and hits are sorted by `_score` descending. Returns an `EsError` when the
/// request fails, the server answers with an error status, or the body does
/// not deserialize into `SearchResult<DOC>`.
pub async fn search_by_term_query<DOC>(
    client: &Elasticsearch,
    index_name: &str,
    field_name: &str,
    field_value: &str,
    from: i64,
    size: i64,
) -> EsResult<SearchResult<DOC>>
where
    DOC: for<'de> serde::Deserialize<'de>,
{
    let response = check_result(
        client
            .search(elasticsearch::SearchParts::Index(&[index_name]))
            .from(from)
            .size(size)
            // "true" requests every source field; a list of field names
            // (e.g. &["id", "title"]) would restrict the returned fields.
            ._source(&["true"])
            .sort(&["_score:desc"])
            .body(json!({
                "query": {
                    "term": {
                        field_name: field_value
                    }
                }
            }))
            .send()
            .await,
    )?;
    // Check the status first so a server error is reported as such rather than
    // as a "missing field" deserialization failure.
    let response = check_result(response.error_for_status_code())?;
    check_result(response.json::<SearchResult<DOC>>().await)
}
/// @Author: DengLibin
/// @Date: Create in 2024-03-28 13:50:06
/// @Description: Build the index mappings for `es_fields` and return them as a
/// JSON string holding an object of this shape:
///
/// ```text
/// {
///   "properties": {
///     "field1": { "type": "text" }
///   }
/// }
/// ```
///
/// Takes a slice so any contiguous collection of fields can be passed; callers
/// holding a `&Vec<EsField>` keep working through deref coercion.
fn create_mappings(es_fields: &[EsField]) -> String {
    let mut properties = Object::new();
    for field in es_fields {
        // The conversion consumes the field type, hence the clone.
        properties.insert(&field.name, field.filed_type.clone().into());
    }

    let mut mappings = Object::new();
    mappings.insert("properties", properties.into());
    json::stringify(mappings)
}