用户数据库:
tb_user:用户表,其中包含用户的详细信息
tb_address:用户地址表
商品数据库
tb_item:商品表
订单数据库
tb_order:用户订单表
tb_order_detail:订单详情表,主要是订单中包含的商品信息
tb_order_logistics:订单物流表,订单的收货人信息
feign-api:是通用的api模块,一些feign的客户端、实体类、工具类都可以放这里
item-service:商品微服务,负责商品的CRUD
order-service:订单微服务,负责订单的CRUD
search-service:搜索服务,负责es搜索和数据同步
user-service:用户微服务,负责用户的CRUD、用户地址的CRUD
项目的pom略...
页面分为后台仓库端,和商城首页,代码略,前端部署在了nginx上,nginx的部署配置截图如下
进入cmd窗口启动ngixn,启动成功后,效果如图:
这儿示例的单机模式,没用到集群
链接里是配置网关的模板:http://t.csdn.cn/Un29Z
步骤简单,如果你要借鉴的话,套自己公司模板就行。
由于mysql和es的特性,查询方面,由mysql同步给es,至于删除修改新增都是基于mysql。
因此业务接口包括:
分页查询商品(基于mysql)
根据id查询商品
新增商品
修改商品
根据id删除商品(直接删除,不做逻辑删除)
上架、下架商品
商品表,实体类略,提供给其他微服务的实体类略
把其他都略了,本篇主要讲海量查询的业务实现。
搜索相关业务包括:
设计索引库数据结构
完成数据导入
实现基本搜索功能
实现过滤项聚合功能
数据同步
应该把mysql表中的哪些字段放到ES中?
搜索时需要的字段 ,商品的标题、分类、品牌、价格
搜索后需要展示的字段: 图片,销量数、评价数
隐藏的字段:id、商品的创建时间
基本字段包括:
用于关键字全文检索的字段,比如All,里面包含name、brand、category信息
分类.品牌.价格.销量.idname.评价.数量.图片
PUT /item
{"mappings": {"properties": {"id":{"type": "long"},"name":{"type": "text","analyzer": "ik_max_word","copy_to": "all"},"price":{"type": "long"},"image":{"type": "keyword"},"category":{"type": "keyword","copy_to": "all"},"brand":{"type": "keyword","copy_to": "all"},"sold":{"type": "integer"},"commentCount":{"type": "integer"},"isAD":{"type": "boolean"},"all":{"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"}}}
}
设计文档对应的Java类:ItemDoc
@Data
@NoArgsConstructor
public class ItemDoc {private Long id;private String name;private Long price;private String image;private String category;private String brand;private Integer sold;private Integer commentCount;private Boolean isAD;public ItemDoc(Item item) {// 属性拷贝BeanUtils.copyProperties(item, this);}
}
要把数据库数据导入到elasticsearch中,包括下面几步:
将商品微服务中的分页查询商品接口定义为一个FeignClient,放到feign-api模块中
搜索服务编写一个业务,实现下面功能:
调用item-service提供的FeignClient,分页查询商品 PageDTO
将查询到的商品封装为一个ItemDoc对象,放入ItemDoc集合
将ItemDoc集合批量导入elasticsearch中
注意:数据库数据比较多,一下导入内存会炸裂哈哈哈,所以分页导入,一下三千条自设定。
1.//fegin里的分页查询接口
@FeignClient("itemservice")
public interface ItemClient {// 查询商品数据@GetMapping("/item/list")public PageDTO- list(@RequestParam(value = "page", defaultValue = "1") Integer pageNum,@RequestParam(value = "size", defaultValue = "5") Integer pageSize);
}
2.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PageDTO
{/*** 总条数*/private Long total;/*** 当前页数据*/private List list;
}
3.
@Data
public class Item {private Long id;//商品idprivate String name;//商品名称private Long price;//价格(分)private Integer stock;//库存数量private String image;//商品图片private String category;//分类名称private String brand;//品牌名称private String spec;//规格private Integer sold;//销量private Integer commentCount;//评论数private Integer status;//商品状态 1-正常,2-下架private Boolean isAD;// 是否是广告}
@SpringBootTest
public class ESTest {@Autowiredprivate ItemClient itemClient;@Autowiredprivate RestHighLevelClient client;@Testpublic void test() throws Exception {Integer pageNum = 1;// 循环while (true) {// 查询3000条PageDTO- page = itemClient.list(pageNum, 3000);List
- list = page.getList();// 跳出循环if (list.size() < 1) {break;}// 将数据同步到索引库// 1.创建requestBulkRequest request = new BulkRequest();// 2.准备DSLfor (Item item : list) {// 2-1 转为es实体ItemDoc itemDoc = new ItemDoc(item);// 2-2 转为jsonString json = JSON.toJSONString(itemDoc);// 2-3 新增文档request.add(new IndexRequest("item").id(itemDoc.getId().toString()).source(json, XContentType.JSON));}// 3.发送请求client.bulk(request, RequestOptions.DEFAULT);// 页码++pageNum++;}}
}
@RestController
@RequestMapping("/search")
public class SearchController {@Autowiredprivate SearchService searchService;// 分页条件查询@PostMapping("/list")public PageDTO search(@RequestBody RequestParams params) throws IOException {return searchService.search(params);}
@Service
public class SearchService {@Autowiredprivate RestHighLevelClient client;// 分页条件查询public PageDTO search(RequestParams params) throws IOException {// 1.创建requestSearchRequest request = new SearchRequest("item");// 2.准备DSL// 2-1 querybuildBasicQuery(params, request);// 2-2 分页Integer page = params.getPage();Integer size = params.getSize();request.source().from((page-1)*size).size(size);// 2-3 高亮request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));// 3.发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析响应return handleResponse(response);}// 抽取查询条件代码private void buildBasicQuery(RequestParams params, SearchRequest request) {String key = params.getKey();BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();if (StrUtil.isEmpty(key)) {boolQuery.must(QueryBuilders.matchAllQuery());}else{boolQuery.must(QueryBuilders.matchQuery("all", key));}// 品牌if (StrUtil.isNotEmpty(params.getBrand())) {boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));}// 分类if (StrUtil.isNotEmpty(params.getCategory())) {boolQuery.filter(QueryBuilders.termQuery("category", params.getCategory()));}// 价格if (params.getMinPrice() != null && params.getMaxPrice() != null) {boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()*100).lte(params.getMaxPrice()*100));}request.source().query(boolQuery);}private PageDTO handleResponse(SearchResponse response) {// 4.解析响应SearchHits searchHits = response.getHits();// 4.1 获取总记录数long total = searchHits.getTotalHits().value;// 4.2 获取文档数组List items = new ArrayList<>();SearchHit[] hits = searchHits.getHits();for (SearchHit hit : hits) {// 获取文档sourceString json = hit.getSourceAsString();ItemDoc hotelDoc = JSON.parseObject(json, ItemDoc.class);// 处理高亮Map highlightFields = hit.getHighlightFields();if (CollectionUtil.isNotEmpty(highlightFields)) {HighlightField highlightField = highlightFields.get("name");if (highlightField!=null) {String name = highlightField.getFragments()[0].string();hotelDoc.setName(name);}}items.add(hotelDoc);}return new PageDTO<>(total,items);}
@RestController
@RequestMapping("/search")
public class SearchController {@Autowiredprivate SearchService searchService;// 分页条件查询@PostMapping("/list")public PageDTO search(@RequestBody RequestParams params) throws IOException {return searchService.search(params);}// 聚合查询@PostMapping("/filters")public Map> filters(@RequestBody RequestParams params) throws IOException{return searchService.filters(params);}
}
// 聚合查询public Map> filters(RequestParams params) throws IOException {// 1.创建requestSearchRequest request = new SearchRequest("item");// 2.准备DSL参数// 2-1 querybuildBasicQuery(params, request);// 2-2 设置sizerequest.source().size(0);// 2-3 聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(20));request.source().aggregation(AggregationBuilders.terms("categoryAgg").field("category").size(20));// 3.发出请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析响应Map> result = new HashMap<>();Aggregations aggregations = response.getAggregations();// 4-1 根据品牌名称List brandAgg = getAggByName(aggregations, "brandAgg");result.put("brand", brandAgg);// 4-2 根据分类名称List cityAgg = getAggByName(aggregations, "categoryAgg");result.put("category", cityAgg);return result;}private List getAggByName(Aggregations aggregations, String aggName) {// 根据名称获取聚合结果Terms terms = aggregations.get(aggName);// 获取bucktesList extends Terms.Bucket> buckets = terms.getBuckets();// 遍历List list = new ArrayList<>();for (Terms.Bucket bucket : buckets) {// 获取聚合名称(key)String key = bucket.getKeyAsString();// 添加集合list.add(key);}// 返回return list;}
基于RabbitMQ实现数据库、elasticsearch的数据同步,要求如下:
商品上架时:search-service新增商品到elasticsearch
商品下架时:search-service删除elasticsearch中的商品
坐标配置复制模板见此链接:http://t.csdn.cn/YQAwY
消费者或者生产者都可以创建交换机。记得交给spring管理就行。
@Configuration
public class MQConfig {// 创建交换机@Beanpublic DirectExchange itemDirect() {return new DirectExchange("item.direct");}// 上架队列@Beanpublic Queue upQueue() {return new Queue("up.queue");}// 下架队列@Beanpublic Queue downQueue() {return new Queue("down.queue");}// 上架队列绑定交换机@Beanpublic Binding bindingUpQueue(DirectExchange itemDirect,Queue upQueue){return BindingBuilder.bind(upQueue).to(itemDirect).with("item.up");}// 下架队列绑定交换机@Beanpublic Binding bindingDownQueue(DirectExchange itemDirect,Queue downQueue){return BindingBuilder.bind(downQueue).to(itemDirect).with("item.down");}}
@Autowiredprivate RabbitTemplate rabbitTemplate;// 上下架@PutMapping("/status/{id}/{status}")public void updateStataus(@PathVariable("id") Long id,@PathVariable("status") Integer status) {// update tb_item set status = ? where id = ?;// 在mp通用service中支持链式编程 (修改局部字段)itemService.update().set("status", status).eq("id", id).update();// 发送MQ消息if (status==1) {// 上架rabbitTemplate.convertAndSend("item.direct","item.up",id);}else{// 下架rabbitTemplate.convertAndSend("item.direct","item.down",id);}}
@Slf4j
@Component
public class ItemListener {@Autowiredprivate ItemClient itemClient;@Autowiredprivate RestHighLevelClient client;// 监听上架@RabbitListener(queues = "up.queue")public void listenUpQueue(Long id) {try {log.info("上架:{}", id);// 查询数据库Item item = itemClient.findById(id);// 转为es实体ItemDoc itemDoc = new ItemDoc(item);// 转为jsonString json = JSON.toJSONString(itemDoc);// 保存到索引库// 1.创建requestIndexRequest request = new IndexRequest("item").id(id.toString());// 2.准备DSLrequest.source(json, XContentType.JSON);// 3.发送请求client.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}// 监听下架@RabbitListener(queues = "down.queue")public void listenDownQueue(Long id) {log.info("下架:{}", id);try {// 1.创建requestDeleteRequest request = new DeleteRequest("item", id.toString());// 2.发送请求client.delete(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}
}
@FeignClient("itemservice")
public interface ItemClient {// 查询商品数据@GetMapping("/item/list")public PageDTO- list(@RequestParam(value = "page", defaultValue = "1") Integer pageNum,@RequestParam(value = "size", defaultValue = "5") Integer pageSize);// 根据id查询@GetMapping("/item/{id}")public Item findById(@PathVariable("id") Long id);
}
完成索引库代码同步
ok了,写完了