商城项目-es的海量查询/聚合/数据同步
创始人
2024-05-24 15:41:20
0

一.项目搭建

1.sql表
  • 用户数据库:

  • tb_user:用户表,其中包含用户的详细信息

  • tb_address:用户地址表

  • 商品数据库

  • tb_item:商品表

  • 订单数据库

  • tb_order:用户订单表

  • tb_order_detail:订单详情表,主要是订单中包含的商品信息

  • tb_order_logistics:订单物流表,订单的收货人信息

2.模块搭建
  • feign-api:是通用的api模块,一些feign的客户端、实体类、工具类都可以放这里

  • item-service:商品微服务,负责商品的CRUD

  • order-service:订单微服务,负责订单的CRUD

  • search-service:搜索服务,负责es搜索和数据同步

  • user-service:用户微服务,负责用户的CRUD、用户地址的CRUD

项目的pom略...

3.前端

页面分为后台仓库端,和商城首页,代码略,前端部署在了nginx上,nginx的部署配置截图如下

进入cmd窗口启动ngixn,启动成功后,效果如图:

4.启动注册中心

这儿示例的单机模式,没用到集群


5.配网关

链接里是配置网关的模板:http://t.csdn.cn/Un29Z

步骤简单,如果你要借鉴的话,套自己公司模板就行。

二.写项目

由于mysql和es的特性,查询方面,由mysql同步给es,至于删除修改新增都是基于mysql。

因此业务接口包括:

  • 分页查询商品(基于mysql)

  • 根据id查询商品

  • 新增商品

  • 修改商品

  • 根据id删除商品(直接删除,不做逻辑删除)

  • 上架、下架商品

商品表,实体类略,提供给其他微服务的实体类略

把其他都略了,本篇主要讲海量查询的业务实现。


搜索业务

搜索相关业务包括:

  • 设计索引库数据结构

  • 完成数据导入

  • 实现基本搜索功能

  • 实现过滤项聚合功能

  • 数据同步

1.es索引库创建

应该把mysql表中的哪些字段放到ES中?

  1. 搜索时需要的字段 ,商品的标题、分类、品牌、价格

  1. 搜索后需要展示的字段: 图片,销量数、评价数

  1. 隐藏的字段:id、商品的创建时间

基本字段包括:

  • 用于关键字全文检索的字段,比如All,里面包含name、brand、category信息

  • 分类.品牌.价格.销量.idname.评价.数量.图片

PUT /item
{"mappings": {"properties": {"id":{"type": "long"},"name":{"type": "text","analyzer": "ik_max_word","copy_to": "all"},"price":{"type": "long"},"image":{"type": "keyword"},"category":{"type": "keyword","copy_to": "all"},"brand":{"type": "keyword","copy_to": "all"},"sold":{"type": "integer"},"commentCount":{"type": "integer"},"isAD":{"type": "boolean"},"all":{"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"}}}
}

设计文档对应的Java类:ItemDoc

@Data
@NoArgsConstructor
public class ItemDoc {private Long id;private String name;private Long price;private String image;private String category;private String brand;private Integer sold;private Integer commentCount;private Boolean isAD;public ItemDoc(Item item) {// 属性拷贝BeanUtils.copyProperties(item, this);}
}
2.海量数据导入es

要把数据库数据导入到elasticsearch中,包括下面几步:

将商品微服务中的分页查询商品接口定义为一个FeignClient,放到feign-api模块中

搜索服务编写一个业务,实现下面功能:

调用item-service提供的FeignClient,分页查询商品 PageDTO

将查询到的商品封装为一个ItemDoc对象,放入ItemDoc集合

将ItemDoc集合批量导入elasticsearch中

注意:数据库数据比较多,一下导入内存会炸裂哈哈哈,所以分页导入,一下三千条自设定。

1.//fegin里的分页查询接口
@FeignClient("itemservice")    
public interface ItemClient {// 查询商品数据@GetMapping("/item/list")public PageDTO list(@RequestParam(value = "page", defaultValue = "1") Integer pageNum,@RequestParam(value = "size", defaultValue = "5") Integer pageSize);
}
2.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PageDTO {/*** 总条数*/private Long total;/*** 当前页数据*/private List list;
}
3.
@Data
public class Item {private Long id;//商品idprivate String name;//商品名称private Long price;//价格(分)private Integer stock;//库存数量private String image;//商品图片private String category;//分类名称private String brand;//品牌名称private String spec;//规格private Integer sold;//销量private Integer commentCount;//评论数private Integer status;//商品状态 1-正常,2-下架private Boolean isAD;// 是否是广告}

@SpringBootTest
public class ESTest {@Autowiredprivate ItemClient itemClient;@Autowiredprivate RestHighLevelClient client;@Testpublic void test() throws Exception {Integer pageNum = 1;// 循环while (true) {// 查询3000条PageDTO page = itemClient.list(pageNum, 3000);List list = page.getList();// 跳出循环if (list.size() < 1) {break;}// 将数据同步到索引库// 1.创建requestBulkRequest request = new BulkRequest();// 2.准备DSLfor (Item item : list) {// 2-1 转为es实体ItemDoc itemDoc = new ItemDoc(item);// 2-2 转为jsonString json = JSON.toJSONString(itemDoc);// 2-3 新增文档request.add(new IndexRequest("item").id(itemDoc.getId().toString()).source(json, XContentType.JSON));}// 3.发送请求client.bulk(request, RequestOptions.DEFAULT);// 页码++pageNum++;}}
}
3.实现基本搜索功能
@RestController
@RequestMapping("/search")
public class SearchController {@Autowiredprivate SearchService searchService;// 分页条件查询@PostMapping("/list")public PageDTO search(@RequestBody RequestParams params) throws IOException {return searchService.search(params);}
@Service
public class SearchService {@Autowiredprivate RestHighLevelClient client;// 分页条件查询public PageDTO search(RequestParams params) throws IOException {// 1.创建requestSearchRequest request = new SearchRequest("item");// 2.准备DSL// 2-1 querybuildBasicQuery(params, request);// 2-2 分页Integer page = params.getPage();Integer size = params.getSize();request.source().from((page-1)*size).size(size);// 2-3 高亮request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));// 3.发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析响应return handleResponse(response);}// 抽取查询条件代码private void buildBasicQuery(RequestParams params, SearchRequest request) {String key = params.getKey();BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();if (StrUtil.isEmpty(key)) {boolQuery.must(QueryBuilders.matchAllQuery());}else{boolQuery.must(QueryBuilders.matchQuery("all", key));}// 品牌if (StrUtil.isNotEmpty(params.getBrand())) {boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));}// 分类if (StrUtil.isNotEmpty(params.getCategory())) {boolQuery.filter(QueryBuilders.termQuery("category", params.getCategory()));}// 价格if (params.getMinPrice() != null && params.getMaxPrice() != null) {boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()*100).lte(params.getMaxPrice()*100));}request.source().query(boolQuery);}private PageDTO handleResponse(SearchResponse response) {// 4.解析响应SearchHits searchHits = response.getHits();// 4.1 获取总记录数long total = searchHits.getTotalHits().value;// 4.2 获取文档数组List items = new ArrayList<>();SearchHit[] hits = searchHits.getHits();for (SearchHit hit : hits) {// 获取文档sourceString json = hit.getSourceAsString();ItemDoc hotelDoc = JSON.parseObject(json, ItemDoc.class);// 处理高亮Map highlightFields = hit.getHighlightFields();if (CollectionUtil.isNotEmpty(highlightFields)) {HighlightField highlightField = highlightFields.get("name");if (highlightField!=null) {String name = highlightField.getFragments()[0].string();hotelDoc.setName(name);}}items.add(hotelDoc);}return new PageDTO<>(total,items);}
4.过滤聚合功能
@RestController
@RequestMapping("/search")
public class SearchController {@Autowiredprivate SearchService searchService;// 分页条件查询@PostMapping("/list")public PageDTO search(@RequestBody RequestParams params) throws IOException {return searchService.search(params);}// 聚合查询@PostMapping("/filters")public Map> filters(@RequestBody RequestParams params) throws IOException{return searchService.filters(params);}
}
// 聚合查询public Map> filters(RequestParams params) throws IOException {// 1.创建requestSearchRequest request = new SearchRequest("item");// 2.准备DSL参数// 2-1 querybuildBasicQuery(params, request);// 2-2 设置sizerequest.source().size(0);// 2-3 聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(20));request.source().aggregation(AggregationBuilders.terms("categoryAgg").field("category").size(20));// 3.发出请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析响应Map> result = new HashMap<>();Aggregations aggregations = response.getAggregations();// 4-1 根据品牌名称List brandAgg = getAggByName(aggregations, "brandAgg");result.put("brand", brandAgg);// 4-2 根据分类名称List cityAgg = getAggByName(aggregations, "categoryAgg");result.put("category", cityAgg);return result;}private List getAggByName(Aggregations aggregations, String aggName) {// 根据名称获取聚合结果Terms terms = aggregations.get(aggName);// 获取bucktesList buckets = terms.getBuckets();// 遍历List list = new ArrayList<>();for (Terms.Bucket bucket : buckets) {// 获取聚合名称(key)String key = bucket.getKeyAsString();// 添加集合list.add(key);}// 返回return list;}

数据同步

基于RabbitMQ实现数据库、elasticsearch的数据同步,要求如下:

  • 商品上架时:search-service新增商品到elasticsearch

  • 商品下架时:search-service删除elasticsearch中的商品

1)启动MQ docker 容器命令启动MQ的镜像
2)item-service和search-service设置连接MQ坐标和配置文件

坐标配置复制模板见此链接:http://t.csdn.cn/YQAwY

3)创建交换机和队列实现绑定

消费者或者生产者都可以创建交换机。记得交给spring管理就行。

@Configuration
public class MQConfig {// 创建交换机@Beanpublic DirectExchange itemDirect() {return new DirectExchange("item.direct");}// 上架队列@Beanpublic Queue upQueue() {return new Queue("up.queue");}// 下架队列@Beanpublic Queue downQueue() {return new Queue("down.queue");}// 上架队列绑定交换机@Beanpublic Binding bindingUpQueue(DirectExchange itemDirect,Queue upQueue){return BindingBuilder.bind(upQueue).to(itemDirect).with("item.up");}// 下架队列绑定交换机@Beanpublic Binding bindingDownQueue(DirectExchange itemDirect,Queue downQueue){return BindingBuilder.bind(downQueue).to(itemDirect).with("item.down");}}
4)item-service发送消息
@Autowiredprivate RabbitTemplate rabbitTemplate;// 上下架@PutMapping("/status/{id}/{status}")public void updateStataus(@PathVariable("id") Long id,@PathVariable("status") Integer status) {// update tb_item set status = ? where id = ?;// 在mp通用service中支持链式编程 (修改局部字段)itemService.update().set("status", status).eq("id", id).update();// 发送MQ消息if (status==1) {// 上架rabbitTemplate.convertAndSend("item.direct","item.up",id);}else{// 下架rabbitTemplate.convertAndSend("item.direct","item.down",id);}}
5)search-service监听消息
@Slf4j
@Component
public class ItemListener {@Autowiredprivate ItemClient itemClient;@Autowiredprivate RestHighLevelClient client;// 监听上架@RabbitListener(queues = "up.queue")public void listenUpQueue(Long id) {try {log.info("上架:{}", id);// 查询数据库Item item = itemClient.findById(id);// 转为es实体ItemDoc itemDoc = new ItemDoc(item);// 转为jsonString json = JSON.toJSONString(itemDoc);// 保存到索引库// 1.创建requestIndexRequest request = new IndexRequest("item").id(id.toString());// 2.准备DSLrequest.source(json, XContentType.JSON);// 3.发送请求client.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}// 监听下架@RabbitListener(queues = "down.queue")public void listenDownQueue(Long id) {log.info("下架:{}", id);try {// 1.创建requestDeleteRequest request = new DeleteRequest("item", id.toString());// 2.发送请求client.delete(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}
}
6)编写fegin中间件里ItemClient实现根据id查询商品对象
@FeignClient("itemservice")
public interface ItemClient {// 查询商品数据@GetMapping("/item/list")public PageDTO list(@RequestParam(value = "page", defaultValue = "1") Integer pageNum,@RequestParam(value = "size", defaultValue = "5") Integer pageSize);// 根据id查询@GetMapping("/item/{id}")public Item findById(@PathVariable("id") Long id);
}

完成索引库代码同步

ok了,写完了

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...