wukong引擎源码分析之搜索——在docid有序的数组上用二分查找归并求交集;如果改用跳表,插入索引时会更快

搜索入口示例:searcher.Search(types.SearchRequest{Text: "百度中国"}),其实现如下:
// Search finds the documents that satisfy the search request. The original
// author documents this function as safe for concurrent use.
//
// The request is fanned out to every indexer shard; the per-shard results are
// read back from a channel, merged, optionally re-sorted, and truncated
// according to the rank options.
//
// When request.Timeout is positive it is interpreted as a number of
// milliseconds: collection stops at the deadline, output.Timeout is set and
// whatever results arrived in time are returned.
func (engine *Engine) Search(request types.SearchRequest) (output types.SearchResponse) {
    if !engine.initialized {
        log.Fatal("必须先初始化引擎")
    }

    // Use the per-request rank options when given, otherwise the engine defaults.
    var rankOptions types.RankOptions
    if request.RankOptions == nil {
        rankOptions = *engine.initOptions.DefaultRankOptions
    } else {
        rankOptions = *request.RankOptions
    }
    if rankOptions.ScoringCriteria == nil {
        rankOptions.ScoringCriteria = engine.initOptions.DefaultRankOptions.ScoringCriteria
    }

    // Collect the search tokens: segment request.Text (dropping stop tokens),
    // or fall back to the caller-supplied token list when Text is empty.
    // Kept as an empty, non-nil slice so output.Tokens is never nil.
    tokens := []string{}
    if request.Text != "" {
        for _, s := range engine.segmenter.Segment([]byte(request.Text)) {
            token := s.Token().Text()
            if !engine.stopTokens.IsStopToken(token) {
                tokens = append(tokens, token)
            }
        }
    } else {
        tokens = append(tokens, request.Tokens...)
    }

    // Channel on which results come back. It is buffered with one slot per
    // shard, so a shard that answers after a timeout can still send without
    // blocking.
    numShards := engine.initOptions.NumShards
    rankerReturnChannel := make(chan rankerReturnRequest, numShards)

    // Build the lookup request.
    lookupRequest := indexerLookupRequest{
        countDocsOnly:       request.CountDocsOnly,
        tokens:              tokens,
        labels:              request.Labels,
        docIds:              request.DocIds,
        options:             rankOptions,
        rankerReturnChannel: rankerReturnChannel,
        orderless:           request.Orderless,
    }

    // Send the lookup request to every indexer shard.
    for shard := 0; shard < numShards; shard++ {
        engine.indexerLookupChannels[shard] <- lookupRequest
    }

    // Read the per-shard output from the return channel.
    numDocs := 0
    rankOutput := types.ScoredDocuments{}
    isTimeout := false
    if request.Timeout <= 0 {
        // No timeout: wait for every shard.
        for shard := 0; shard < numShards; shard++ {
            rankerOutput := <-rankerReturnChannel
            if !request.CountDocsOnly {
                rankOutput = append(rankOutput, rankerOutput.docs...)
            }
            numDocs += rankerOutput.numDocs
        }
    } else {
        // One timer covers the whole collection phase. The loop condition
        // checks isTimeout because a bare break inside select would only
        // leave the select statement, not the loop.
        timer := time.NewTimer(time.Millisecond * time.Duration(request.Timeout))
        defer timer.Stop()
        for shard := 0; shard < numShards && !isTimeout; shard++ {
            select {
            case rankerOutput := <-rankerReturnChannel:
                if !request.CountDocsOnly {
                    rankOutput = append(rankOutput, rankerOutput.docs...)
                }
                numDocs += rankerOutput.numDocs
            case <-timer.C:
                isTimeout = true
            }
        }
    }

    // Re-sort the merged results across shards.
    if !request.CountDocsOnly && !request.Orderless {
        if rankOptions.ReverseOrder {
            sort.Sort(sort.Reverse(rankOutput))
        } else {
            sort.Sort(rankOutput)
        }
    }

    // Prepare the output. output.Docs is only filled when CountDocsOnly is false.
    output.Tokens = tokens
    if !request.CountDocsOnly {
        if request.Orderless {
            // Orderless results are returned whole; no offset truncation.
            output.Docs = rankOutput
        } else {
            // MaxOutputs == 0 means "no limit": return everything from the offset on.
            start := utils.MinInt(rankOptions.OutputOffset, len(rankOutput))
            end := len(rankOutput)
            if rankOptions.MaxOutputs != 0 {
                end = utils.MinInt(start+rankOptions.MaxOutputs, len(rankOutput))
            }
            output.Docs = rankOutput[start:end]
        }
    }
    output.NumDocs = numDocs
    output.Timeout = isTimeout
    return
}
索引器接受查找请求:
// indexerLookupWorker serves lookup requests for a single shard in an endless
// loop.
//
// Each request received on engine.indexerLookupChannels[shard] is run against
// engine.indexers[shard]. The worker then either answers directly on the
// request's rankerReturnChannel (count-only requests, empty results, or
// orderless requests) or forwards the matched documents to
// engine.rankerRankChannels[shard] for ranking.
func (engine *Engine) indexerLookupWorker(shard int) {
    for {
        // Blocks until a search hands this shard a request.
        req := <-engine.indexerLookupChannels[shard]

        // req.docIds is passed through as-is; when it is nil the indexer
        // receives nil.
        matched, total := engine.indexers[shard].Lookup(
            req.tokens, req.labels, req.docIds, req.countDocsOnly)

        switch {
        case req.countDocsOnly:
            // Only the count is wanted; no documents are returned.
            req.rankerReturnChannel <- rankerReturnRequest{numDocs: total}

        case len(matched) == 0:
            // Nothing matched on this shard: reply with an empty result.
            req.rankerReturnChannel <- rankerReturnRequest{}

        case req.orderless:
            // Orderless requests skip the ranker: convert the indexed
            // documents to scored documents and reply directly.
            var unranked []types.ScoredDocument
            for _, doc := range matched {
                unranked = append(unranked, types.ScoredDocument{
                    DocId:                 doc.DocId,
                    TokenSnippetLocations: doc.TokenSnippetLocations,
                    TokenLocations:        doc.TokenLocations,
                })
            }
            req.rankerReturnChannel <- rankerReturnRequest{
                docs:    unranked,
                numDocs: len(unranked),
            }

        default:
            // Hand the matches to this shard's ranker, which replies on the
            // same return channel.
            engine.rankerRankChannels[shard] <- rankerRankRequest{
                countDocsOnly:       req.countDocsOnly,
                docs:                matched,
                options:             req.options,
                rankerReturnChannel: req.rankerReturnChannel,
            }
        }
    }
}
lookup函数实现:
// 查找包含全部搜索键(AND操作)的文档
// 当docIds不为nil时仅从docIds指定的文档中查找
func (indexer *Indexer) Lookup(
    tokens []string, labels []string, docIds map[uint64]bool, countDocsOnly bool) (docs []types.IndexedDocument, numDocs int) {
    if indexer.initialized == false {
        log.Fatal("索引器尚未初始化")
    }

    indexer.DocInfosShard.RLock()
    defer indexer.DocInfosShard.RUnlock()

    if indexer.DocInfosShard.NumDocuments == 0 {
        return
    }
    numDocs = 0

    // 合并关键词和标签为搜索键
    keywords := make([]string, len(tokens)+len(labels))
    copy(keywords, tokens)
    copy(keywords[len(tokens):], labels)

    indexer.InvertedIndexShard.RLock()

    table := make([]*types.KeywordIndices, len(keywords))
    for i, keyword := range keywords {
        indices, found := indexer.InvertedIndexShard.InvertedIndex[keyword]
        if !found {
            // 当反向索引表中无此搜索键时直接返回
            indexer.InvertedIndexShard.RUnlock()
            return
        } else {
            // 否则加入反向表中
            table[i] = indices
        }
    // 当没有找到时直接返回
    if len(table) == 0 {
        indexer.InvertedIndexShard.RUnlock()
        return
    }

    // 归并查找各个搜索键出现文档的交集
    // 从后向前查保证先输出DocId较大文档
    indexPointers := make([]int, len(table))
    for iTable := 0; iTable < len(table); iTable++ {
        indexPointers[iTable] = indexer.getIndexLength(table[iTable]) - 1
    }
    // 平均文本关键词长度,用于计算BM25
    avgDocLength := indexer.InvertedIndexShard.TotalTokenLength / float32(indexer.DocInfosShard.NumDocuments)
    indexer.InvertedIndexShard.RUnlock()

    for ; indexPointers[0] >= 0; indexPointers[0]-- {
        // 以第一个搜索键出现的文档作为基准,并遍历其他搜索键搜索同一文档
        baseDocId := indexer.getDocId(table[0], indexPointers[0])

        // 全局范围查找目标文档是否存在
        if _, ok := indexer.DocInfosShard.DocInfos[baseDocId]; !ok {
            // if !IsDocExist(baseDocId) {
            // 文档信息中不存在反向索引文档时,跳过
            // 该情况由不对称删除操作所造成
            continue
        }

        if docIds != nil {
            _, found := docIds[baseDocId]
            if !found {
                continue
            }
        }
        iTable := 1
        found := true
        for ; iTable < len(table); iTable++ {
            // 二分法比简单的顺序归并效率高,也有更高效率的算法,
            // 但顺序归并也许是更好的选择,考虑到将来需要用链表重新实现
           // 以避免反向表添加新文档时的写锁。
            // TODO: 进一步研究不同求交集算法的速度和可扩展性。
            position, foundBaseDocId := indexer.searchIndex(table[iTable],
                0, indexPointers[iTable], baseDocId)
            if foundBaseDocId {
                indexPointers[iTable] = position
            } else {
                if position == 0 {
                    // 该搜索键中所有的文档ID都比baseDocId大,因此已经没有
                    // 继续查找的必要。
                    return
                } else {
                    // 继续下一indexPointers[0]的查找
                    indexPointers[iTable] = position - 1
                    found = false
                    break
                }
            }
        }

        if found {
            indexedDoc := types.IndexedDocument{}

            // 当为LocationsIndex时计算关键词紧邻距离
            if indexer.initOptions.IndexType == types.LocationsIndex {
                // 计算有多少关键词是带有距离信息的
                numTokensWithLocations := 0
                for i, t := range table[:len(tokens)] {
                    if len(t.Locations[indexPointers[i]]) > 0 {
                        numTokensWithLocations++
                    }
                }
                if numTokensWithLocations != len(tokens) {
                    if !countDocsOnly {
                        docs = append(docs, types.IndexedDocument{
                            DocId: baseDocId,
                        })
                    }
                    numDocs++
                    break
                }
                // 计算搜索键在文档中的紧邻距离
                tokenProximity, tokenLocations := computeTokenProximity(table[:len(tokens)], indexPointers, tokens)
                indexedDoc.TokenProximity = int32(tokenProximity)
                indexedDoc.TokenSnippetLocations = tokenLocations

                // 添加TokenLocations
                indexedDoc.TokenLocations = make([][]int, len(tokens))
                for i, t := range table[:len(tokens)] {
                    indexedDoc.TokenLocations[i] = t.Locations[indexPointers[i]]
                }
            }

            // 当为LocationsIndex或者FrequenciesIndex时计算BM25
            if indexer.initOptions.IndexType == types.LocationsIndex ||
                indexer.initOptions.IndexType == types.FrequenciesIndex {
                bm25 := float32(0)
                d := indexer.DocInfosShard.DocInfos[baseDocId].TokenLengths
                for i, t := range table[:len(tokens)] {
                    var frequency float32
                    if indexer.initOptions.IndexType == types.LocationsIndex {
                        frequency = float32(len(t.Locations[indexPointers[i]]))
                    } else {
                        frequency = t.Frequencies[indexPointers[i]]
                    }

                    // 计算BM25
                    if len(t.DocIds) > 0 && frequency > 0 && indexer.initOptions.BM25Parameters != nil && avgDocLength != 0 {
                        // 带平滑的idf
                        idf := float32(math.Log2(float64(indexer.DocInfosShard.NumDocuments)/float64(len(t.DocIds)) + 1))
                        k1 := indexer.initOptions.BM25Parameters.K1
                        b := indexer.initOptions.BM25Parameters.B
                        bm25 += idf * frequency * (k1 + 1) / (frequency + k1*(1-b+b*d/avgDocLength))
                    }
                }
                indexedDoc.BM25 = float32(bm25)
            }

            indexedDoc.DocId = baseDocId
            if !countDocsOnly {
                docs = append(docs, indexedDoc)
            }
            numDocs++
        }
    }
    return
}
?

相关内容推荐