本片文章只是对之前写的文章的补充,
es与mysql之间的数据同步
http://t.csdn.cn/npHt4
补充一:
之前的文章对于交换机、队列、绑定,使用的是@bean,
而这里使用的是纯注解版
在消费方,声明交换机:
package com.hmall.search.mq;import com.hmall.search.service.IsearchService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.events.Event;/*** @author ning* @since 2022/12/12 0:16*/@Slf4j
@Component
public class ItemListener {@Autowiredprivate IsearchService isearchService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "up.queue"),//声明队列exchange = @Exchange(name = "item.topic",type = ExchangeTypes.TOPIC),//声明交换机key = "item.up"//声明绑定关系))private void listenItemUp(Long id){log.info("监听到上架消息:"+ id);isearchService.saveitById(id);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "down.queue"),exchange = @Exchange(name = "item.topic",type = ExchangeTypes.TOPIC),key = "item.down"))private void listenItemDown(Long id){log.info("监听到下架消息:"+ id);isearchService.deleteItemById(id);}}
补充二:
之前的文章是直接使用es操作数据,新增和修改,这样做不是很合适,而且没有遵守单一原则,所以这里使用feign远程调用其他模块的接口方法,
可以参考
http://t.csdn.cn/GqMVN
例如:需要根据id查询数据库,就把其他模块写完的根据id查询数据库的方法写到接口里
package com.hmall.client;import com.hmall.common.dto.Item;
import com.hmall.common.dto.PageDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;/*** 商品模块的远程调用** @author ning* @since 2022/12/9 18:39*/
//表示对应的是itemservice服务器
@FeignClient("itemservice")
public interface ItemClient {//分页查询@GetMapping("/item/list")public PageDTO- list(@RequestParam("page") Integer page, @RequestParam("size") Integer size);//根据id查询数据@GetMapping("/item/{id}")public Item selectById(@PathVariable("id") Long id);
}
以上接口中还有分页查询的内容,详情可以参考
使用分页导入的方式把大量数据从mysql导入es
http://t.csdn.cn/XECXD
//注入es的工具类@Autowiredprivate RestHighLevelClient client;//注入feign远程调用的接口@Autowiredprivate ItemClient itemClient;@Overridepublic void saveitById(Long id) {try {//使用feign的远程调用接口,查询数据库//查询一条商品数据,并转为jsonItem item = itemClient.selectById(id);ItemDoc itemDoc = new ItemDoc(item);String jsonString = JSON.toJSONString(itemDoc);//创建请求IndexRequest request = new IndexRequest("item").id(id.toString());//设置参数request.source(jsonString, XContentType.JSON);//执行请求client.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}