1. Overview

Anyone who has been responsible for a "search service" probably dreads one sentence above all: "Why can't the data be found by search again!!!". Every time I receive this message my heart skips a beat, because with tens or even hundreds of millions of indexed documents I honestly have no idea where to start: I know neither what the business was searching for nor which data went wrong.

"Search" has become a must-have feature of back-office management platforms, and many people build their own search service on top of the powerful retrieval capabilities of elasticsearch. In real development, however, introducing elasticsearch is only a very small part of the work; the bulk is usually the data management of the index model. The whole process is a long chain of tedious steps, and a problem in any one of them ends in a complaint from the business side.

The wide module encapsulates the best practices for this search scenario.

2. Quick start

2.1. Environment setup

First, add support for spring data elasticsearch. The maven coordinates are:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>

Add the es settings to application.yml:

    spring:
      elasticsearch:
        uris: http://localhost:9200
        connection-timeout: 10s

Create a SpringESConfiguration class that specifies the package of the ES repositories:

    @Configuration
    @EnableElasticsearchRepositories(basePackages = "com.geekhalo.lego.wide.es")
    public class SpringESConfiguration {
    }

Finally, add lego-starter:

    <dependency>
        <groupId>com.geekhalo.lego</groupId>
        <artifactId>lego-starter</artifactId>
    </dependency>

This completes the project preparation, and we can start building the index model.

Before building the model, define an Enum that lists every kind of associated data in the model:

    public enum WideOrderType implements WideItemType {
        ORDER,   // order master data
        USER,    // user data
        ADDRESS, // user address data
        PRODUCT  // purchased product data
    }

The WideOrderType enum implements the WideItemType interface so that it can be integrated with the framework.

Next, build the wide-table model to be indexed:

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Document(indexName = "wide_order")
    public class WideOrder extends BindFromBasedWide {
        @org.springframework.data.annotation.Id
        private Long id;

        // Association ids copied from the order master data
        @BindFrom(sourceClass = Order.class, field = "userId")
        private Long userId;
        @BindFrom(sourceClass = Order.class, field = "addressId")
        private Long addressId;
        @BindFrom(sourceClass = Order.class, field = "productId")
        private Long productId;
        @BindFrom(sourceClass = Order.class, field = "descr")
        private String orderDescr;

        // Fields copied from the associated data
        @BindFrom(sourceClass = User.class, field = "name")
        private String userName;
        @BindFrom(sourceClass = Address.class, field = "detail")
        private String addressDetail;
        @BindFrom(sourceClass = Product.class, field = "name")
        private String productName;
        @BindFrom(sourceClass = Product.class, field = "price")
        private Integer productPrice;

        public WideOrder(Long orderId) {
            setId(orderId);
        }

        @Override
        public Long getId() {
            return id;
        }

        @Override
        public boolean isValidate() {
            return userId != null && addressId != null && productId != null;
        }

        @Override
        public List getItemsKeyByType(WideOrderType wideOrderType) {
            switch (wideOrderType) {
                case ORDER:
                    return Collections.singletonList(new WideItemKey(wideOrderType, getId()));
                case USER:
                    return Collections.singletonList(new WideItemKey(wideOrderType, getUserId()));
                case ADDRESS:
                    return Collections.singletonList(new WideItemKey(wideOrderType, getAddressId()));
                case PRODUCT:
                    return Collections.singletonList(new WideItemKey(wideOrderType, getProductId()));
            }
            return Collections.emptyList();
        }
    }

As the code shows, the model extends BindFromBasedWide and is mapped to the wide_order index through @Document. Every @BindFrom field names the source class and the field its value is copied from. isValidate() requires the three association ids to be present, and getItemsKeyByType returns, for each WideOrderType, the key of the associated data.

The model is now complete.

With the model in place, we need components that supply data to the "wide table". This is the WideItemDataProvider family. Take OrderProvider as an example:

    @Component
    @org.springframework.core.annotation.Order(value = Ordered.HIGHEST_PRECEDENCE)
    public class OrderProvider implements WideItemDataProvider {
        @Autowired
        private OrderDao orderDao;

        // Loads all orders for the given ids in one batch
        @Override
        public List<Order> apply(List<Long> key) {
            return orderDao.findAllById(key);
        }

        @Override
        public WideOrderType getSupportType() {
            return WideOrderType.ORDER;
        }
    }

The class is a Spring @Component ordered with HIGHEST_PRECEDENCE, it loads orders in batch through OrderDao.findAllById, and it declares WideOrderType.ORDER as the type it supports.

Every kind of associated data has its own data provider. A quick look at UserProvider:

    @Component
    public class UserProvider implements WideItemDataProvider {
        @Autowired
        private UserDao userDao;

        @Override
        public List<User> apply(List<Long> key) {
            return userDao.findAllById(key);
        }

        @Override
        public WideOrderType getSupportType() {
            return WideOrderType.USER;
        }
    }

It does not differ from OrderProvider in any essential way. The demo also contains further implementations for the other associated data.

Once the data is ready, the "wide table" has to be persisted in the storage engine that suits it best, so that queries can be served well. WideOrderRepository, based on ElasticsearchRepository, looks like this:

    @Repository
    public class WideOrderRepository implements WideCommandRepository {
        @Autowired
        private WideOrderESDao wideOrderDao;

        @Override
        public void save(List<WideOrder> wides) {
            wideOrderDao.saveAll(wides);
        }

        @Override
        public List<WideOrder> findByIds(List<Long> masterIds) {
            return Lists.newArrayList(wideOrderDao.findAllById(masterIds));
        }

        @Override
        public <KEY> void consumeByItem(WideOrderType wideOrderType, KEY key, Consumer<WideOrder> wideConsumer) {
            // Each case ends with break so that only the matching lookup runs
            switch (wideOrderType) {
                case PRODUCT:
                    this.wideOrderDao.findByProductId((Long) key).forEach(wideConsumer);
                    break;
                case ADDRESS:
                    this.wideOrderDao.findByAddressId((Long) key).forEach(wideConsumer);
                    break;
                case ORDER:
                    this.wideOrderDao.findById((Long) key).ifPresent(wideConsumer);
                    break;
                case USER:
                    this.wideOrderDao.findByUserId((Long) key).forEach(wideConsumer);
                    break;
            }
        }

        @Override
        public boolean supportUpdateFor(WideOrderType wideOrderType) {
            return false;
        }

        @Override
        public <KEY> void updateByItem(WideOrderType wideOrderType, KEY key, Consumer<WideOrder> wideConsumer) {
            // Apply the change first, then write the wide row back to ES
            Consumer<WideOrder> updateAndSave = wideConsumer.andThen(wideOrder -> wideOrderDao.save(wideOrder));
            switch (wideOrderType) {
                case PRODUCT:
                    this.wideOrderDao.findByProductId((Long) key).forEach(updateAndSave);
                    break;
                case ADDRESS:
                    this.wideOrderDao.findByAddressId((Long) key).forEach(updateAndSave);
                    break;
                case ORDER:
                    this.wideOrderDao.findById((Long) key).ifPresent(updateAndSave);
                    break;
                case USER:
                    this.wideOrderDao.findByUserId((Long) key).forEach(updateAndSave);
                    break;
            }
        }

        @Override
        public <KEY> void updateByItem(WideOrderType wideOrderType, KEY key, WideItemData item) {
        }
    }

The repository delegates persistence to WideOrderESDao: save and findByIds work on batches, consumeByItem and updateByItem look up the affected wide rows by the type of the changed item, and supportUpdateFor returns false for every type.

The WideOrderESDao it depends on is implemented on top of ElasticsearchRepository:

    public interface WideOrderESDao extends ElasticsearchRepository<WideOrder, Long> {
        List<WideOrder> findByProductId(Long productId);

        List<WideOrder> findByAddressId(Long addressId);

        List<WideOrder> findByUserId(Long userId);
    }

All components are ready; now they have to be wired together.

    @Configuration
    public class WideOrderConfiguration extends WideConfigurationSupport {
        @Autowired
        private WideOrderRepository wideOrderRepository;

        // All WideItemDataProvider beans; the type arguments are elided here
        @Autowired
        private List<WideItemDataProvider> wideItemDataProviders;

        @Bean
        public WideIndexService createWideIndexService() {
            return super.createWideIndexService();
        }

        @Bean
        public WideOrderPatrolService wideOrderPatrolService() {
            return new WideOrderPatrolService(createWidePatrolService());
        }

        @Bean
        protected WideService createWideService(
                WideIndexService wideIndexService,
                WideOrderPatrolService wideOrderPatrolService) {
            return super.createWideService(wideIndexService, wideOrderPatrolService);
        }

        @Override
        protected WideFactory getWideFactory() {
            return WideOrder::new;
        }

        @Override
        protected WideCommandRepository getWideCommandRepository() {
            return this.wideOrderRepository;
        }

        // The signature of this overridden method is truncated in the source;
        // it returns the injected providers.
        @Override
        protected List ... {
            return this.wideItemDataProviders;
        }
    }

WideOrderConfiguration extends WideConfigurationSupport, injects the repository and all WideItemDataProvider beans, registers WideIndexService, WideOrderPatrolService and WideService as beans, and uses WideOrder::new as the WideFactory.

The custom patrol service WideOrderPatrolService looks like this:

    public class WideOrderPatrolService implements WidePatrolService {
        private final WidePatrolService widePatrolService;

        public WideOrderPatrolService(WidePatrolService widePatrolService) {
            this.widePatrolService = widePatrolService;
        }

        @Override
        @DelayBasedRocketMQ(topic = "wide_order_patrol", tag = "SingleIndex", consumerGroup = "order_patrol_group", delayLevel = 2)
        public void index(Long aLong) {
            this.widePatrolService.index(aLong);
        }

        @Override
        public void index(List<Long> longs) {
            // Go through the proxy so that @DelayBasedRocketMQ applies to every id
            WideOrderPatrolService wideOrderPatrolService = ((WideOrderPatrolService) AopContext.currentProxy());
            longs.forEach(id -> wideOrderPatrolService.index(id));
        }

        @Override
        public <KEY> void updateItem(WideOrderType wideOrderType, KEY key) {
            ((WideOrderPatrolService) AopContext.currentProxy()).updateItem(wideOrderType, (Long) key);
        }

        @DelayBasedRocketMQ(topic = "wide_order_patrol", tag = "UpdateByItem", consumerGroup = "order_patrol_group", delayLevel = 2)
        public void updateItem(WideOrderType wideOrderType, Long id) {
            this.widePatrolService.updateItem(wideOrderType, id);
        }

        @Override
        public void setReindexConsumer(Consumer consumer) {
            this.widePatrolService.setReindexConsumer(consumer);
        }
    }

WideOrderPatrolService wraps the WidePatrolService created by the framework. The methods annotated with @DelayBasedRocketMQ are sent as delayed messages on the wide_order_patrol topic, and the other methods call them through AopContext.currentProxy() so that the annotation is honored.

Everything is in place; let's write a test case to try it out.

2.6.1. Data indexing

First, index the data:

    // Save User
    this.user = new User();
    this.user.setName("测试");
    this.userDao.save(this.user);

    // Save Address
    this.address = new Address();
    this.address.setDetail("详细地址");
    this.address.setUserId(this.user.getId());
    this.addressDao.save(this.address);

    // Save Product
    this.product = new Product();
    this.product.setName("商品名称");
    this.product.setPrice(100);
    this.productDao.save(this.product);

    // Save Order
    this.order = new Order();
    this.order.setUserId(this.user.getId());
    this.order.setAddressId(this.address.getId());
    this.order.setProductId(this.product.getId());
    this.order.setDescr("我的订单");
    this.orderDao.save(this.order);

    // Index the order
    this.wideOrderService.index(this.order.getId());

    // Compare the indexed data with the source data
    Optional<WideOrder> optional = wideOrderDao.findById(this.order.getId());
    Assertions.assertTrue(optional.isPresent());
    WideOrder wideOrder = optional.get();
    Assertions.assertEquals(order.getId(), wideOrder.getId());
    Assertions.assertEquals(order.getAddressId(), wideOrder.getAddressId());
    Assertions.assertEquals(order.getProductId(), wideOrder.getProductId());
    Assertions.assertEquals(order.getUserId(), wideOrder.getUserId());
    Assertions.assertEquals(order.getDescr(), wideOrder.getOrderDescr());
    Assertions.assertEquals(user.getName(), wideOrder.getUserName());
    Assertions.assertEquals(address.getDetail(), wideOrder.getAddressDetail());
    Assertions.assertEquals(product.getName(), wideOrder.getProductName());

After the test runs successfully, the data has been written to ES.

2.6.2. Data update

The unit test for the update operation:

    // Update the order description
    this.order.setDescr("订单详情");
    this.orderDao.save(this.order);

    // Trigger the index update
    this.wideOrderService.updateOrder(this.order.getId());

    // Verify the result
    Optional<WideOrder> optional = wideOrderDao.findById(this.order.getId());
    Assertions.assertTrue(optional.isPresent());
    WideOrder wideOrder = optional.get();
    Assertions.assertEquals(order.getId(), wideOrder.getId());

After the test runs successfully, the data in ES has been updated.

2.6.3. Data patrol

A close look at the log reveals a group of Delay Task entries:

    [ main] c.g.l.core.delay.DelayMethodInterceptor : success to sent Delay Task to RocketMQ for [126]

The first entry is printed by the main thread when the index request is submitted: it sends a delayed task to RocketMQ that will verify the data with id 126.
The second entry is printed by the Message Consumer thread once the delay has elapsed and indicates that the data in the DB and in ES is identical.
If the patrol finds a difference, it automatically reindexes 126, which keeps the two stores consistent.

3. Design & extension

3.1. Core design

From a functional point of view, the overall design can be divided into several parts.

3.2. Feature extension

wide provides indexing and patrol support for wide tables, but real business has to deal with a variety of situations. Common ones are:

Indexing based on domain events. Listen to the domain events published by the application and use them to trigger the indexing of new data.
Indexing based on binlog. Every change in MySQL is recorded in the binlog, which can be exported with frameworks such as canal and used to trigger indexing.
Rerunning historical data when business requirements change the ES retrieval model.
Repairing problem data by manual trigger when a system failure has left the data inconsistent.
Preventing errors from accumulating in ES, that is, repairing problem data in cases where neither the indexing nor the patrol mechanism took effect.
Index optimization. After the data has been rebuilt, the ES index optimization API can be called to merge the index and improve query performance.

4. Project information

Project repository: https://gitee.com/litao851025/lego
Project documentation: https://gitee.com/litao851025/lego/wikis/support/Wide%20%E5%AE%BD%E8%A1%A8
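To make the provider idea from the quick start more concrete, here is a minimal, framework-free sketch of how a wide row can be filled from a batch-loading provider. It does not use lego at all: WideAssemblySketch, WideRow, bindUserNames and the in-memory user table are hypothetical names chosen for illustration, and the real framework may organize this step differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch; none of these names come from the lego project.
class WideAssemblySketch {

    /** A minimal wide row: one order plus the user name copied onto it. */
    static final class WideRow {
        final long orderId;
        final long userId;
        String userName; // filled from the user provider

        WideRow(long orderId, long userId) {
            this.orderId = orderId;
            this.userId = userId;
        }
    }

    /**
     * Collects the distinct user ids of all rows, loads them with one batch
     * call, then copies the name onto every row that references the user.
     */
    static void bindUserNames(List<WideRow> rows,
                              Function<List<Long>, Map<Long, String>> userProvider) {
        Set<Long> userIds = new LinkedHashSet<>();
        for (WideRow row : rows) {
            userIds.add(row.userId);
        }
        Map<Long, String> namesById = userProvider.apply(new ArrayList<>(userIds));
        for (WideRow row : rows) {
            row.userName = namesById.get(row.userId); // stays null if the user is missing
        }
    }

    /** Builds two rows that share one user and binds names from an in-memory table. */
    static List<WideRow> demo() {
        Map<Long, String> userTable = new HashMap<>();
        userTable.put(7L, "Alice");
        List<WideRow> rows = Arrays.asList(new WideRow(1L, 7L), new WideRow(2L, 7L));
        bindUserNames(rows, ids -> {
            Map<Long, String> found = new HashMap<>();
            for (Long id : ids) {
                if (userTable.containsKey(id)) {
                    found.put(id, userTable.get(id));
                }
            }
            return found;
        });
        return rows;
    }

    public static void main(String[] args) {
        for (WideRow row : demo()) {
            System.out.println(row.orderId + " -> " + row.userName);
        }
    }
}
```

The point of the batch signature (a list of keys in, a collection of records out) is that two orders sharing a user cost a single lookup, which mirrors the List-based apply method of the providers shown earlier.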
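The patrol behaviour described in the data patrol section (compare the DB with ES, reindex on mismatch) can be sketched in plain Java. This is only an illustration under simplifying assumptions: two in-memory maps stand in for the database and for ES, the check runs immediately instead of being delayed through RocketMQ, and PatrolSketch with its methods is a hypothetical name, not part of lego.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch; the maps stand in for the database and for ES.
class PatrolSketch {
    final Map<Long, String> database = new HashMap<>();    // source of truth
    final Map<Long, String> searchIndex = new HashMap<>(); // stand-in for the ES index

    /** Copies the current database value for the id into the index. */
    void index(long id) {
        String current = database.get(id);
        if (current == null) {
            searchIndex.remove(id);
        } else {
            searchIndex.put(id, current);
        }
    }

    /**
     * Compares both stores for the id and reindexes on a mismatch.
     * Returns true when a repair was needed.
     */
    boolean patrol(long id) {
        if (Objects.equals(database.get(id), searchIndex.get(id))) {
            return false; // both stores agree, nothing to do
        }
        index(id);
        return true;
    }

    public static void main(String[] args) {
        PatrolSketch sketch = new PatrolSketch();
        sketch.database.put(126L, "my order");
        sketch.index(126L);
        System.out.println("repair needed: " + sketch.patrol(126L));
        sketch.database.put(126L, "order details"); // change the DB without indexing
        System.out.println("repair needed: " + sketch.patrol(126L));
    }
}
```

In the article's setup the check is deferred by a delayed message, presumably so that the index write has had time to complete before the comparison runs; the sketch leaves that timing aspect out.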