This commit is contained in:
2025-08-27 19:57:04 +08:00
parent 590f753102
commit 7edcaef27b

View File

@@ -0,0 +1,223 @@
package com.agentsflex.engine.es;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.agentsflex.core.document.Document;
import com.agentsflex.search.engine.service.DocumentSearcher;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.*;
public class ElasticSearcher implements DocumentSearcher {
private static final Logger LOG = LoggerFactory.getLogger(ElasticSearcher.class);
private final ESConfig esConfig;
public ElasticSearcher(ESConfig esConfig) {
this.esConfig = esConfig;
}
// 忽略 SSL 的 client 构建逻辑
private RestClient buildRestClient() throws NoSuchAlgorithmException, KeyManagementException {
TrustManager[] trustAllCerts = new TrustManager[]{
new X509TrustManager() {
public X509Certificate[] getAcceptedIssuers() {
return null;
}
public void checkClientTrusted(X509Certificate[] certs, String authType) {
}
public void checkServerTrusted(X509Certificate[] certs, String authType) {
}
}
};
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, trustAllCerts, new java.security.SecureRandom());
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(esConfig.getUserName(), esConfig.getPassword()));
return RestClient.builder(HttpHost.create(esConfig.getHost()))
.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setSSLContext(sslContext);
httpClientBuilder.setSSLHostnameVerifier((hostname, session) -> true);
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder;
})
.build();
}
/**
* 添加文档到Elasticsearch
*/
@Override
public boolean addDocument(Document document) {
if (document == null || document.getContent() == null) {
return false;
}
RestClient restClient = null;
ElasticsearchTransport transport = null;
try {
restClient = buildRestClient();
transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
Map<String, Object> source = new HashMap<>();
source.put("id", document.getId());
source.put("content", document.getContent());
if (document.getTitle() != null) {
source.put("title", document.getTitle());
}
String documentId = document.getId().toString();
IndexOperation<?> indexOp = IndexOperation.of(i -> i
.index(esConfig.getIndexName())
.id(documentId)
.document(JsonData.of(source))
);
BulkOperation bulkOp = BulkOperation.of(b -> b.index(indexOp));
BulkRequest request = BulkRequest.of(b -> b.operations(Collections.singletonList(bulkOp)));
BulkResponse response = client.bulk(request);
return !response.errors();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
return false;
} finally {
closeResources(transport, restClient);
}
}
@Override
public List<Document> searchDocuments(String keyword, int count) {
RestClient restClient = null;
ElasticsearchTransport transport = null;
try {
restClient = buildRestClient();
transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
SearchRequest request = SearchRequest.of(s -> s
.index(esConfig.getIndexName())
.size(count)
.query(q -> q
.match(m -> m
.field("title")
.field("content")
.query(keyword)
)
)
);
SearchResponse<Document> response = client.search(request, Document.class);
List<Document> results = new ArrayList<>();
response.hits().hits().forEach(hit -> results.add(hit.source()));
return results;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
return Collections.emptyList();
} finally {
closeResources(transport, restClient);
}
}
@Override
public boolean deleteDocument(Object id) {
if (id == null) {
return false;
}
RestClient restClient = null;
ElasticsearchTransport transport = null;
try {
restClient = buildRestClient();
transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
DeleteRequest request = DeleteRequest.of(d -> d
.index(esConfig.getIndexName())
.id(id.toString())
);
DeleteResponse response = client.delete(request);
return response.result() == co.elastic.clients.elasticsearch._types.Result.Deleted;
} catch (Exception e) {
LOG.error("Error deleting document with id: " + id, e);
return false;
} finally {
closeResources(transport, restClient);
}
}
@Override
public boolean updateDocument(Document document) {
if (document == null || document.getId() == null) {
return false;
}
RestClient restClient = null;
ElasticsearchTransport transport = null;
try {
restClient = buildRestClient();
transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
UpdateRequest<Document, Object> request = UpdateRequest.of(u -> u
.index(esConfig.getIndexName())
.id(document.getId().toString())
.doc(document)
);
UpdateResponse<Document> response = client.update(request, Object.class);
return response.result() == co.elastic.clients.elasticsearch._types.Result.Updated;
} catch (Exception e) {
LOG.error("Error updating document with id: " + document.getId(), e);
return false;
} finally {
closeResources(transport, restClient);
}
}
private void closeResources(AutoCloseable... closeables) {
for (AutoCloseable closeable : closeables) {
try {
if (closeable != null)
closeable.close();
} catch (Exception e) {
LOG.error("Error closing resource", e);
}
}
}
}