新聞中心
1. 概覽
相信負(fù)責(zé)過“搜索服務(wù)”的伙伴,最害怕的一句話就是:“數(shù)據(jù)怎么又搜索不出來了!??!”。每當(dāng)收到這句話,都會心中一顫,因為面對幾千萬甚至幾億的索引數(shù)據(jù),我真的無從下手,不知道業(yè)務(wù)要搜索什么,也不知道是哪些數(shù)據(jù)出了問題….

1.1. 背景
目前,“搜索”已經(jīng)成為后端管理平臺的必備功能,在這個業(yè)務(wù)場景中,很多人都會基于 elasticsearch 強(qiáng)大的檢索能力構(gòu)建自己的搜索服務(wù)。但實際開發(fā)中,elasticsearch 的引入是非常小的一部分,往往大頭是索引模型的數(shù)據(jù)管理,在整個過程中,我們
- 需要根據(jù)業(yè)務(wù)需求構(gòu)建檢索模型和ES存儲模型;
- 需要從多個數(shù)據(jù)源中獲取數(shù)據(jù),并填充到檢索模型;
- 需要關(guān)注所有數(shù)據(jù)源的數(shù)據(jù)變化,并對變更數(shù)據(jù)進(jìn)行索引重建;
- 需要對不一致的數(shù)據(jù)進(jìn)行識別和處理…
如此繁瑣的事情,哪一環(huán)出現(xiàn)問題都會收到業(yè)務(wù)的投訴。
1.2. 目標(biāo)
對搜索場景中的最佳實踐進(jìn)行封裝,從而:
- 降低開發(fā)成本,開發(fā)人員將精力放在模型構(gòu)建上,拋開繁瑣的技術(shù)細(xì)節(jié);
- 對數(shù)據(jù)索引、關(guān)聯(lián)數(shù)據(jù)更新有很好的支持;
- 引入數(shù)據(jù)實時巡檢能力,對于數(shù)據(jù)不一致的情況進(jìn)行自動修復(fù);
- 引入天級對賬機(jī)制,保障數(shù)據(jù)的一致性;
2. 快速入門
2.1. 準(zhǔn)備環(huán)境
首先,增加對 spring data elasticsearch 的支持,具體 maven 坐標(biāo)如下:
org.springframework.boot
spring-boot-starter-data-elasticsearch
在 application.yml 中添加 es 的配置信息,具體如下:
spring:
elasticsearch:
uris: http://localhost:9200
connection-timeout: 10s
socket-timeout: 30s
新建 SpringESConfiguration 配置信息,指定 ES Repository 的包信息,居然如下:
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.geekhalo.lego.wide.es")
public class SpringESConfiguration {
}
最后,引入 lego-starter,具體如下:
com.geekhalo.lego lego-starter 0.1.14-wide-SNAPSHOT
至此,就完成了項目的準(zhǔn)備工具,可以著手構(gòu)建索引模型。
2.2. 構(gòu)建模型
構(gòu)造模型之前,需要構(gòu)建一個 Enum 用以管理模型中所有關(guān)聯(lián)數(shù)據(jù),具體如下:
public enum WideOrderType implements WideItemType{
ORDER, // 訂單主數(shù)據(jù)
USER, // 用戶數(shù)據(jù)
ADDRESS, // 用戶地址數(shù)據(jù)
PRODUCT // 購買商品數(shù)據(jù)
}
WideOrderType 枚舉實現(xiàn) WideItemType 接口,用于與框架進(jìn)行集成。
接下來,構(gòu)建待索引的寬表模型,具體如下:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(indexName = "wide_order")
public class WideOrder extends BindFromBasedWide{
@org.springframework.data.annotation.Id
private Long id;
@BindFrom(sourceClass = Order.class, field = "userId")
private Long userId;
@BindFrom(sourceClass = Order.class, field = "addressId")
private Long addressId;
@BindFrom(sourceClass = Order.class, field = "productId")
private Long productId;
@BindFrom(sourceClass = Order.class, field = "descr")
private String orderDescr;
@BindFrom(sourceClass = User.class, field = "name")
private String userName;
@BindFrom(sourceClass = Address.class, field = "detail")
private String addressDetail;
@BindFrom(sourceClass = Product.class, field = "name")
private String productName;
@BindFrom(sourceClass = Product.class, field = "price")
private Integer productPrice;
public WideOrder(Long orderId){
setId(orderId);
}
@Override
public Long getId(){
return id;
}
@Override
public boolean isValidate(){
return userId != null && addressId != null && productId != null;
}
@Override
public ListgetItemsKeyByType(WideOrderType wideOrderType){
switch (wideOrderType){
case ORDER:
return Collections.singletonList(new WideItemKey(wideOrderType, getId()));
case USER:
return Collections.singletonList(new WideItemKey(wideOrderType, getUserId()));
case ADDRESS:
return Collections.singletonList(new WideItemKey(wideOrderType, getAddressId()));
case PRODUCT:
return Collections.singletonList(new WideItemKey(wideOrderType, getProductId()));
}
return Collections.emptyList();
}
}
該模型有如下幾個特點:
- 存在很多屬性,是由多個表數(shù)據(jù)共同組成的“寬表”;
- 除 id 屬性外,其他屬性上都有 @BindFrom 注解,用于標(biāo)明該字段的數(shù)據(jù)是來自于哪個實體的那個字段;
- 繼承自 BindFromBasedWide
,其中 Long 為模型主鍵,WideOrderType 為剛建的枚舉,BindFromBasedWide 將根據(jù)字段上的 @BindFrom 注解自動完成 數(shù)據(jù)更新 和 數(shù)據(jù)比對; - Long getId() 方法返回模型的主鍵信息;
- boolean isValidate() 用于對數(shù)據(jù)的有效性進(jìn)行驗證,無效數(shù)據(jù)將不會進(jìn)行持久化處理
- List
getItemsKeyByType(WideOrderType wideOrderType) 根據(jù)關(guān)聯(lián)數(shù)據(jù)類型(WideOrderType)返回不同鍵信息,以進(jìn)行數(shù)據(jù)組裝;
至此,模型就建立完畢。
2.3. 數(shù)據(jù)提供器
有了模型后,我們需要構(gòu)建一些組件用于為“寬表”提供數(shù)據(jù),這就是 WideItemDataProvider 體系。
我們以 OrderProvider 為例,具體如下:
@Component
@org.springframework.core.annotation.Order(value = Ordered.HIGHEST_PRECEDENCE)
public class OrderProvider implements WideItemDataProvider{
@Autowired
private OrderDao orderDao;
@Override
public Listapply(List key){
return orderDao.findAllById(key);
}
@Override
public WideOrderType getSupportType(){
return WideOrderType.ORDER;
}
}
該類有如下特點:
- 實現(xiàn) WideItemDataProvider 接口,其中 WideOrderType 為剛剛定義的枚舉,Long 為 Order 模型的關(guān)聯(lián)鍵類型,Order 為要提供的數(shù)據(jù);
- List
apply(List key),根據(jù) key 獲得對應(yīng)的數(shù)據(jù); - WideOrderType getSupportType(),獲取該組件所支持的 關(guān)聯(lián)類型;
- @Component 標(biāo)記該類為 Spring 的托管 Bean;
- @Order(value = Ordered.HIGHEST_PRECEDENCE) 指定組件的順序,由于為 WideOrder 提供主數(shù)據(jù),優(yōu)先級調(diào)到最高;
每一類關(guān)聯(lián)數(shù)據(jù)都會提供自己的數(shù)據(jù)提供器,簡單看下 UserProvider 實現(xiàn),具體如下:
@Component
public class UserProvider implements WideItemDataProvider{
@Autowired
private UserDao userDao;
@Override
public Listapply(List key){
return userDao.findAllById(key);
}
@Override
public WideOrderType getSupportType(){
return WideOrderType.USER;
}
}
和 OrderProvider 沒有本質(zhì)區(qū)別,當(dāng)然,demo 中還提供了多種實現(xiàn),如:
- OrderProvider,提供訂單主數(shù)據(jù);
- UserProvider,提供用戶信息;
- AddressProvider,提供用戶地址信息;
- ProductProvider,提供商品信息;
2.4. 構(gòu)建寬表倉庫
數(shù)據(jù)都準(zhǔn)備好了,需要將 “寬表” 進(jìn)行持久化,將其放入最合適的存儲引擎,以便更好的處理查詢請求。
基于 ElasticsearchRepository 的 WideOrderRepository 具體如下:
@Repository
public class WideOrderRepository implements WideCommandRepository{
@Autowired
private WideOrderESDao wideOrderDao;
@Override
public void save(Listwides){
wideOrderDao.saveAll(wides);
}
@Override
public ListfindByIds(List masterIds){
return Lists.newArrayList(wideOrderDao.findAllById(masterIds));
}
@Override
publicvoid consumeByItem(WideOrderType wideOrderType, KEY key, Consumer wideConsumer){
switch (wideOrderType){
case PRODUCT:
this.wideOrderDao.findByProductId((Long) key).forEach(wideConsumer);
case ADDRESS:
this.wideOrderDao.findByAddressId((Long) key).forEach(wideConsumer);
case ORDER:
this.wideOrderDao.findById((Long) key).ifPresent(wideConsumer);
case USER:
this.wideOrderDao.findByUserId((Long) key).forEach(wideConsumer);
}
}
@Override
public boolean supportUpdateFor(WideOrderType wideOrderType){
return false;
}
@Override
publicvoid updateByItem(WideOrderType wideOrderType, KEY key, Consumer wideConsumer){
ConsumerupdateAndSave = wideConsumer.andThen(wideOrder -> wideOrderDao.save(wideOrder));
switch (wideOrderType){
case PRODUCT:
this.wideOrderDao.findByProductId((Long) key).forEach(updateAndSave);
case ADDRESS:
this.wideOrderDao.findByAddressId((Long) key).forEach(updateAndSave);
case ORDER:
this.wideOrderDao.findById((Long) key).ifPresent(updateAndSave);
case USER:
this.wideOrderDao.findByUserId((Long) key).forEach(updateAndSave);
}
}
@Override
publicvoid updateByItem(WideOrderType wideOrderType, KEY key, WideItemData item){
}
}
倉庫具有如下特征:
- 實現(xiàn) WideCommandRepository
接口,其中 Long 是模型主鍵(也是寬表主鍵),WideOrderType 是之前定義的枚舉,WideOrder 是寬表; - void save(List
wides) 提供批量保存方法; - List
findByIds(List masterIds) 提供根據(jù)主鍵批量查詢方法; - void consumeByItem(WideOrderType wideOrderType, KEY key, Consumer
wideConsumer),該方法主要用于數(shù)據(jù)巡檢,根據(jù)類型 和 鍵信息 從底層引擎中獲取數(shù)據(jù),并進(jìn)行部分比對,用于發(fā)現(xiàn)數(shù)據(jù)不一致情況; - boolean supportUpdateFor(WideOrderType wideOrderType),該實現(xiàn)用于判斷是否支持特定類型的批量更新,及依賴引擎能力批量對數(shù)據(jù)進(jìn)行更新操作;
- void updateByItem(WideOrderType wideOrderType, KEY key, WideItemData
item),supportUpdateFor 返回為 true 時,調(diào)用該方法,使用引擎的更新能力批量對數(shù)據(jù)進(jìn)行更新; - void updateByItem(WideOrderType wideOrderType, KEY key, Consumer
wideConsumer),supportUpdateFor 返回為 false 時,調(diào)用該方法,根據(jù) 類型 和 鍵信息 依次查詢所有數(shù)據(jù),在內(nèi)存中完成更新,并寫回存儲引擎;
所依賴的 WideOrderESDao 基于 ElasticsearchRepository 實現(xiàn),具體如下:
public interface WideOrderESDao extends ElasticsearchRepository{
ListfindByProductId(Long productId);
ListfindByAddressId(Long addressId);
ListfindByUserId(Long userId);
}
2.5. 配置&整合
所有組件都已準(zhǔn)備好,現(xiàn)在需要將他們整合在一起。
@Configuration
public class WideOrderConfiguration extends WideConfigurationSupport{
@Autowired
private WideOrderRepository wideOrderRepository;
@Autowired
private List>> wideItemDataProviders;
@Bean
public WideIndexServicecreateWideIndexService(){
return super.createWideIndexService();
}
@Bean
public WideOrderPatrolService wideOrderPatrolService(){
return new WideOrderPatrolService(createWidePatrolService());
}
@Bean
protected WideServicecreateWideService(
WideIndexServicewideIndexService,
WideOrderPatrolService wideOrderPatrolService){
return super.createWideService(wideIndexService, wideOrderPatrolService);
}
@Override
protected WideFactorygetWideFactory() {
return WideOrder::new;
}
@Override
protected WideCommandRepositorygetWideCommandRepository() {
return this.wideOrderRepository;
}
@Override
protected List>> getWideItemProviders() {
return this.wideItemDataProviders;
}
}
WideOrderConfiguration 具有如下特點:
- 繼承自 WideConfigurationSupport
,父類中存在大量的 createXXX 方法,可以大幅簡單代碼量; - 使用 WideOrderRepository 作為寬表的倉庫;
- 直接使用 Spring 容器中的所有 WideItemDataProvider 實現(xiàn);
- 使用定制的 WideOrderPatrolService,為巡檢增加延時支持;
其中自定義巡檢 WideOrderPatrolService 代碼如下:
public class WideOrderPatrolService implements WidePatrolService{
private final WidePatrolServicewidePatrolService;
public WideOrderPatrolService(WidePatrolServicewidePatrolService){
this.widePatrolService = widePatrolService;
}
@Override
@DelayBasedRocketMQ(topic = "wide_order_patrol", tag = "SingleIndex", consumerGroup = "order_patrol_group", delayLevel = 2)
public void index(Long aLong){
this.widePatrolService.index(aLong);
}
@Override
public void index(Listlongs){
WideOrderPatrolService wideOrderPatrolService = ((WideOrderPatrolService)AopContext.currentProxy());
longs.forEach(id -> wideOrderPatrolService.index(id));
}
@Override
publicvoid updateItem(WideOrderType wideOrderType, KEY key){
((WideOrderPatrolService)AopContext.currentProxy()).updateItem(wideOrderType, (Long) key);
}
@DelayBasedRocketMQ(topic = "wide_order_patrol", tag = "UpdateByItem", consumerGroup = "order_patrol_group", delayLevel = 2)
public void updateItem(WideOrderType wideOrderType, Long id){
this.widePatrolService.updateItem(wideOrderType, id);
}
@Override
public void setReindexConsumer(Consumer> consumer){
this.widePatrolService.setReindexConsumer(consumer);
}
}
WideOrderPatrolService 具體實現(xiàn)如下:
- 將大部分請求直接轉(zhuǎn)發(fā)給內(nèi)部的 widePatrolService 實例;
- 在索引和更新方法上增加了 @DelayBasedRocketMQ 注解,該注解使的方法擁有延時執(zhí)行的能力,如果對該注解感興趣可以翻找下之前的文章;
- 使用 AopContext 在類內(nèi)獲取 Proxy 對象并調(diào)用其方法,由于 AOP 實現(xiàn)的限制,在類中直接調(diào)用本類中的其他方法,不會觸發(fā)攔截器;
2.6. 實現(xiàn)效果
萬事具備只欠東風(fēng),寫個測試用例測試下功能。
2.6.1. 數(shù)據(jù)索引
首先,對數(shù)據(jù)進(jìn)行索引,示例如下:
// 保存 User
this.user = new User();
this.user.setName("測試");
this.userDao.save(this.user);
// 保存 Address
this.address = new Address();
this.address.setDetail("詳細(xì)地址");
this.address.setUserId(this.user.getId());
this.addressDao.save(this.address);
// 保存 Product
this.product = new Product();
this.product.setName("商品名稱");
this.product.setPrice(100);
this.productDao.save(this.product);
// 保存 Order
this.order = new Order();
this.order.setUserId(this.user.getId());
this.order.setAddressId(this.address.getId());
this.order.setProductId(this.product.getId());
this.order.setDescr("我的訂單");
this.orderDao.save(this.order);
// 進(jìn)行索引
this.wideOrderService.index(this.order.getId());
// 比對數(shù)據(jù)
Optionaloptional = wideOrderDao.findById(this.order.getId());
Assertions.assertTrue(optional.isPresent());
WideOrder wideOrder = optional.get();
Assertions.assertEquals(order.getId(), wideOrder.getId());
Assertions.assertEquals(order.getAddressId(), wideOrder.getAddressId());
Assertions.assertEquals(order.getProductId(), wideOrder.getProductId());
Assertions.assertEquals(order.getUserId(), wideOrder.getUserId());
Assertions.assertEquals(order.getDescr(), wideOrder.getOrderDescr());
Assertions.assertEquals(user.getName(), wideOrder.getUserName());
Assertions.assertEquals(address.getDetail(), wideOrder.getAddressDetail());
Assertions.assertEquals(product.getName(), wideOrder.getProductName());
Assertions.assertEquals(product.getPrice(), wideOrder.getProductPrice());
單測成功運(yùn)行后,數(shù)據(jù)已經(jīng)成功寫入到 ES,具體如下:
2.6.2. 數(shù)據(jù)更新
更新操作,具體單測如下:
// 更新訂單描述
this.order.setDescr("訂單詳情");
this.orderDao.save(this.order);
// 觸發(fā)索引更新
this.wideOrderService.updateOrder(this.order.getId());
// 驗證更新結(jié)果
Optionaloptional = wideOrderDao.findById(this.order.getId());
Assertions.assertTrue(optional.isPresent());
WideOrder wideOrder = optional.get();
Assertions.assertEquals(order.getId(), wideOrder.getId());
Assertions.assertEquals(order.getDescr(), wideOrder.getOrderDescr());
單測成功運(yùn)行后,數(shù)據(jù)已經(jīng)完成更新,ES 數(shù)據(jù)具體如下:
2.6.3. 數(shù)據(jù)巡檢
仔細(xì)觀察日志,會發(fā)現(xiàn)存在一組 Delay Task 日志,具體如下:
[ main] c.g.l.core.delay.DelayMethodInterceptor : success to sent Delay Task to RocketMQ for [126]
[MessageThread_2] c.g.l.c.w.s.SimpleWidePatrolService : id 126 is same
第一條日志是在提交索引時由主線程打印,向 RocketMQ 提交一個延時任務(wù),用于對 id 為 126 的數(shù)據(jù)進(jìn)行校驗;
第二條是時間達(dá)到后由 Message Consumer 線程打印,表明 DB 與 ES 中的數(shù)據(jù)是相同的;
如果巡檢時發(fā)現(xiàn)數(shù)據(jù)不同,將會自動對 126 進(jìn)行索引,從而保障兩者的一致性;
3. 設(shè)計&擴(kuò)展
3.1. 核心設(shè)計
整體架構(gòu)設(shè)計如下:
從功能角度,整體可分為如下幾部分:
- Index 索引部分。內(nèi)部可以看成是一個基于 檢索模型 的 Pipeline,從眾多數(shù)據(jù)提供器中獲取數(shù)據(jù),并寫入 檢索模型,最終將填充完數(shù)據(jù)的檢索模型寫入的 ES 進(jìn)行持久化存儲;
- Query 查詢部分。直接使用 ES 的 api 對成功索引的數(shù)據(jù)進(jìn)行查詢。
- 巡檢部分。在數(shù)據(jù)變更時,會自動增加一個延時任務(wù)用于數(shù)據(jù)比較,巡檢任務(wù)獲取變更數(shù)據(jù)后與ES存儲記錄進(jìn)行比較,如果數(shù)據(jù)不一致則向 Index 模塊重新提交索引任務(wù),對問題數(shù)據(jù)進(jìn)行再次索引,從而對數(shù)據(jù)進(jìn)行恢復(fù);
3.2. 功能擴(kuò)展
wide 為寬表提供了索引和巡檢能力支持,但在實際業(yè)務(wù)中需要處理多種情況,常見如下:
- 自動觸發(fā),這是系統(tǒng)核心流程之一,數(shù)據(jù)發(fā)生變化后,向 Index 提交新的索引任務(wù)。常見的實現(xiàn)策略有:
基于領(lǐng)域事件的索引。監(jiān)聽?wèi)?yīng)用程序發(fā)出的領(lǐng)域事件,從而觸發(fā)新數(shù)據(jù)的索引;
基于 binlog 的索引。MySQL 的變化全部記錄在 binlog 中,可以通過 canal 等框架將 binlog 進(jìn)行導(dǎo)出,用于觸發(fā)數(shù)據(jù)索引;
- 手工回溯,手工觸發(fā)索引流程,常見的場景有:
由于業(yè)務(wù)需要 ES 檢索模型發(fā)生變更,需要重跑歷史數(shù)據(jù);
系統(tǒng)故障導(dǎo)致數(shù)據(jù)不一致,通過手工觸發(fā)的方式對問題數(shù)據(jù)進(jìn)行修復(fù);
- 天級數(shù)據(jù)重建。每天凌晨對前一天的數(shù)據(jù)進(jìn)行索引重建,主要目的為:
避免錯誤在 ES 進(jìn)行累計,也就是在索引和巡檢兩個機(jī)制都不生效的情況下,對問題數(shù)據(jù)進(jìn)行修復(fù);
索引優(yōu)化,在數(shù)據(jù)完成重建后,可以調(diào)用 ES 索引優(yōu)化接口,對索引進(jìn)行合并,從而提升系統(tǒng)查詢性能;
4. 項目信息
項目倉庫地址
項目文檔地址
網(wǎng)頁標(biāo)題:「一招制敵」老板再也不用為“搜索不到數(shù)據(jù)”而操心了
URL分享:http://www.5511xx.com/article/dhshdoi.html


咨詢
建站咨詢
