Write an Elasticsearch plugin to calculate word similarity by co-occurrence

For about 10,000 candidate words, I want to calculate their similarities with a user-given query word based on the following strategy.

  1. Calculate the occurrences of each candidate word in each document. This may be performed offline.

  2. Find the documents that contain the query word, and for each of them:
    (1). Let C_i (i = 0, 1, ..., N) be the candidate words in the document; then for each occurrence of the query word, calculate the partial similarity with C_i as the inverse of their position distance.
    (2). Sum up all the partial similarities for C_i, giving the similarity in the current document.

  3. For each candidate word, sum up its similarities over all documents, giving the final similarity. (A sketch of this scoring follows.)
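To make the intended math concrete, here is a minimal sketch in plain Java (not Elasticsearch code; the class and method names are made up for illustration):

import java.util.HashMap;
import java.util.Map;

public class CooccurrenceSimilaritySketch {

    // Step 2: within one document, for every (query occurrence, candidate
    // occurrence) pair, add the inverse of the position distance.
    static double similarityInDocument(int[] queryPositions, int[] candidatePositions) {
        double sim = 0.0;
        for (int q : queryPositions) {
            for (int c : candidatePositions) {
                int distance = Math.abs(q - c);
                if (distance > 0) {
                    sim += 1.0 / distance;
                }
            }
        }
        return sim;
    }

    // Step 3: accumulate per-document similarities into the final score
    // for each candidate id.
    static void accumulate(Map<Integer, Double> finalScores, int candidateId, double docSim) {
        finalScores.merge(candidateId, docSim, Double::sum);
    }
}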

I want to implement this as an aggregation plugin. A query might look like this:

// The documents:
PUT /test/sim/1
{
  "text": "foo bar for far ..."
  "candidates":[
    {1, 10},       // The first candidate word appears at position 10.
    {1, 20},       // The first candidate word appears at position 20 also.
    {2, 30},       // The second candidate word appears at position 30.
    {5, 120},      // The fifth candidate word appears at position 120.
  ]
}

// the query:
GET /test/sim/_search
{
  "aggr":{
    "similarities":{
      "coocurrence_similarity":{
        "query-word": "foo"
      }
    }
  }
}

// expected result:
[
    { "candidate": 1, "similarity": 1.2 },
    { "candidate": 2, "similarity": 0.8 },
    { "candidate": 5, "similarity": 0.3 }
]

My questions:

  1. How can I get the occurrences of the query word in each document? (A sketch of the kind of thing I imagine is below.)
  2. How can I access the "candidates" array structurally in each document?
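For question 1, here is the kind of approach I imagine (just a sketch; termPositions is a hypothetical helper, and it assumes the text field is indexed with term vectors that include positions):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;

// Collect the positions of `term` in document `docId` from its stored term vector.
static List<Integer> termPositions(IndexReader reader, int docId, String field, String term) throws IOException {
    List<Integer> positions = new ArrayList<>();
    Terms termVector = reader.getTermVector(docId, field);
    if (termVector == null) {
        return positions; // no term vector stored for this doc/field
    }
    TermsEnum termsEnum = termVector.iterator();
    if (termsEnum.seekExact(new BytesRef(term))) {
        PostingsEnum postings = termsEnum.postings(null, PostingsEnum.POSITIONS);
        postings.nextDoc(); // a term vector reads like a one-document index
        for (int i = 0; i < postings.freq(); i++) {
            positions.add(postings.nextPosition());
        }
    }
    return positions;
}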

Thank you very much!

Can we back up and ask what business problem you are trying to solve before leaping into implementation details?

It looks like your similarity scoring relies on position information only, with no weighting for a candidate word's IDF. Some candidates will be rarer than others, I imagine, and if "the" were a candidate word it would statistically be the most likely to be positioned close to any given choice of query word. Anyhoo, as I say, let's start with a clearer definition of the problem scenario first, because right now this doesn't look easy or efficient to implement in its current form.
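For reference, a typical smoothed IDF weight looks something like this (one common formulation; the exact form is a design choice, not a prescription):

// Rarer candidate words get a larger weight.
static double idf(long totalDocs, long docsContainingTerm) {
    return Math.log(1.0 + (double) totalDocs / (1 + docsContainingTerm));
}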

Thanks for your reply!

We have collected more than 8,000,000 news articles, and the number grows by about 5,000 every day. Besides this, we have a fixed collection of candidate words (some domain-specific concepts, stock names for example) whose size is about 5,000. Each news article may mention some of the candidate words.

I'm implementing a recommendation-like algorithm which, for a user-given query phrase, finds the top N most relevant candidate words. The algorithm works as follows:

  • Tokenize the query phrase with some analyzer.
  • For each token, calculate the partial relevance value for each candidate word, weighted by its IDF.
  • Sum up the partial relevance values from all tokens, giving the result for the query and each candidate word.

For each token T, the partial relevance value V with each candidate word C is calculated as follows (see the sketch after this list):

  • Initialize: V = 0.
  • If T and C occur in the same sentence, V = V + 4; else
  • if T and C occur in the same paragraph, V = V + 2; else
  • if T and C occur in the same document, V = V + 1.
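As a plain-Java sketch of this rule (the regionOf helper is hypothetical; it assumes sentence_ends and paragraph_ends hold, in ascending order, the position of the last token of each sentence/paragraph):

import java.util.Arrays;

// Map a token position to the index of the sentence (or paragraph) that
// contains it, given the sorted array of region end positions.
static int regionOf(int position, int[] regionEnds) {
    int idx = Arrays.binarySearch(regionEnds, position);
    return idx >= 0 ? idx : -idx - 1; // insertion point = containing region
}

// The rule above, for one (token occurrence, candidate occurrence) pair
// already known to lie in the same document.
static int partialRelevance(int tokenPos, int candidatePos, int[] sentenceEnds, int[] paragraphEnds) {
    if (regionOf(tokenPos, sentenceEnds) == regionOf(candidatePos, sentenceEnds)) {
        return 4; // same sentence
    }
    if (regionOf(tokenPos, paragraphEnds) == regionOf(candidatePos, paragraphEnds)) {
        return 2; // same paragraph
    }
    return 1; // same document only
}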

I'm going to implement this via an Elasticsearch plugin, and along the way get an in-depth understanding of Elasticsearch. Currently I'm using ToParentBlockJoinQuery and SpanNearQuery, as in the following code (it may not fully implement the algorithm yet, but I'm working toward that goal):

private LeafCandidateState aggregateDocument(IndexSearcher searcher, GroupDocs<Integer> groupDocs, Spans spans, LeafCandidateState state) throws IOException {
    // Debug output: I expected these two doc ids to match, but they often don't.
    System.out.println("DocId from GroupDocs: " + groupDocs.groupValue);
    System.out.println("DocId from spans: " + spans.docID());

    Document parentDoc = searcher.doc(groupDocs.groupValue);

    // Print every occurrence (span) of the analyzed query phrase in this document.
    while (spans.nextStartPosition() != Spans.NO_MORE_POSITIONS) {
        System.out.println("\t" + spans.startPosition() + ", " + spans.endPosition());
    }

    // Walk the nested candidate_infos (child) docs of this parent and collect
    // each candidate's id and positions; the scoring itself is still to come.
    for (ScoreDoc scoreDoc : groupDocs.scoreDocs) {
        Document childDoc = searcher.doc(scoreDoc.doc);
        int candidateId = childDoc.getField(candidateIdField).numericValue().intValue();
        int[] positions = getOrderedDocumentFieldValues(childDoc, candidatePositionsField);

        state.increate(candidateId, 1);
    }

    return state;
}

private InternalAggregation buildLeafAggregation(LeafReaderContext leafReaderContext) throws IOException {
    SearchContext searchContext = context.searchContext();

    // A filter matching the nested candidate_infos (child) docs, and a bitset
    // identifying the root (parent) docs.
    ObjectMapper nestedObjectMapper = searchContext.getObjectMapper(candidateInfoPath);
    Query childFilter = nestedObjectMapper.nestedTypeFilter();
    BitSetProducer parentFilter = new QueryBitSetProducer(Queries.newNonNestedFilter());

    ToParentBlockJoinQuery joinQuery = new ToParentBlockJoinQuery(childFilter, parentFilter, ScoreMode.None);
    ToParentBlockJoinCollector collector = new ToParentBlockJoinCollector(Sort.RELEVANCE, 1000, false, false);

    // Resolve the search analyzer for the target text field.
    QueryShardContext queryShardContext = searchContext.getQueryShardContext();
    MappedFieldType fieldType = queryShardContext.fieldMapper(keywordTargetField);
    Analyzer analyzer = fieldType != null
        ? queryShardContext.getSearchAnalyzer(fieldType)
        : queryShardContext.getMapperService().searchAnalyzer();
    assert analyzer != null;

    // Analyze the query phrase into one SpanTermQuery per token.
    List<SpanTermQuery> termQueries = new ArrayList<>();
    try (TokenStream tokenStream = analyzer.tokenStream(keywordTargetField, new StringReader(keyword))) {
        TermToBytesRefAttribute termAtt = tokenStream.getAttribute(TermToBytesRefAttribute.class);
        tokenStream.reset();

        while (tokenStream.incrementToken()) {
            termQueries.add(new SpanTermQuery(new Term(keywordTargetField, termAtt.getBytesRef())));
        }

        tokenStream.end();
    }

    SpanNearQuery spanNearQuery = new SpanNearQuery(termQueries.toArray(new SpanQuery[termQueries.size()]), keywordSlop, true);

    // Match parents that contain both candidate children and the query phrase.
    BooleanQuery booleanQuery = new BooleanQuery.Builder()
        .add(joinQuery, BooleanClause.Occur.MUST)
        .add(spanNearQuery, BooleanClause.Occur.MUST)
        .build();

    IndexReader reader = leafReaderContext.reader();
    IndexSearcher searcher = new IndexSearcher(reader);
    searcher.search(booleanQuery, collector);

    TopGroups<Integer> topGroups = collector.getTopGroupsWithAllChildDocs(joinQuery, Sort.RELEVANCE, 0, 0, false);
    if (topGroups == null || topGroups.groups == null) {
        return buildEmptyAggregation();
    }

    SpanWeight weight = spanNearQuery.createWeight(searcher, false);
    Spans spans = weight.getSpans(leafReaderContext, SpanWeight.Postings.OFFSETS);

    LeafCandidateState state = new LeafCandidateState();

    // NOTE: this loop assumes spans.nextDoc() visits documents in the same
    // order as topGroups.groups, which my tests show is not true (see below).
    for (GroupDocs<Integer> groupDocs : topGroups.groups) {
        if (spans.nextDoc() == Spans.NO_MORE_DOCS) {
            break;
        }

        state = aggregateDocument(searcher, groupDocs, spans, state);
    }

    return new InternalRecomm(state.number, state.scores, pipelineAggregators(), metaData());
}

The index mapping and some test documents:

PUT /test
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  }, 
  "mappings": {
    "recomm_source": {
      "properties": {
        "id": {
          "type": "integer",
          "store": true
        },
        "text": {
          "type": "text",
          "term_vector": "with_positions_offsets"
        },
        "sentence_ends": {
          "type": "integer",
          "store": true
        },
        "paragraph_ends": {
          "type": "integer",
          "store": true
        },
        "candidate_infos": {
          "type": "nested",
          "properties": {
            "id": {
              "type": "integer",
              "store": true
            },
            "positions": {
              "type": "integer",
              "store": true
            }
          }
        }
      }
    }
  }
}

PUT /test/recomm_source/1
{
  "id": 1,
  "text": "one red dogs jump over the desk.",
  "sentence_ends": [0],
  "paragraph_ends": [0],
  "candidate_infos":[
    {
      "id": 0,
      "positions": [0, 3]
    },
    {
      "id": 1,
      "positions": [4, 7]
    }]
}

PUT /test/recomm_source/2
{
  "id": 2,
  "text": "two red dogs jump over the chair. wo red dogs jump over the chair.",
  "sentence_ends": [0],
  "paragraph_ends": [0],
  "candidate_infos":[
    {
      "id": 0,
      "positions": [0, 3]
    },
    {
      "id": 1,
      "positions": [4, 7]
    }]
}

PUT /test/recomm_source/3
{
  "id": 3,
  "text": "three red dogs jump over the tree.",
  "sentence_ends": [0],
  "paragraph_ends": [0],
  "candidate_infos":[
    {
      "id": 0,
      "positions": [0, 3]
    },
    {
      "id": 1,
      "positions": [4, 7]
    }]
}

PUT /test/recomm_source/4
{
  "id": 4,
  "text": "four red dogs jump over the river.",
  "sentence_ends": [0],
  "paragraph_ends": [0],
  "candidate_infos":[
    {
      "id": 0,
      "positions": [0, 3]
    },
    {
      "id": 1,
      "positions": [4, 7]
    }]
}

PUT /test/recomm_source/5
{
  "id": 5,
  "text": "five red dogs jump over the hill. five red dogs jump over the hill.",
  "sentence_ends": [0],
  "paragraph_ends": [0],
  "candidate_infos":[
    {
      "id": 0,
      "positions": [0, 3]
    },
    {
      "id": 1,
      "positions": [4, 7]
    }]
}

PUT /test/recomm_source/6
{
  "id": 6,
  "text": "six red dogs jump over the hill.",
  "sentence_ends": [0],
  "paragraph_ends": [0],
  "candidate_infos":[
    {
      "id": 0,
      "positions": [0, 3]
    },
    {
      "id": 1,
      "positions": [4, 7]
    }]
}

PUT /test/recomm_source/7
{
  "id": 7,
  "text": "seven red dogs jump over the hill.",
  "sentence_ends": [0],
  "paragraph_ends": [0],
  "candidate_infos":[
    {
      "id": 0,
      "positions": [0, 3]
    },
    {
      "id": 1,
      "positions": [4, 7]
    }]
}

PUT /test/recomm_source/8
{
  "id": 8,
  "text": "eight red dogs jump through the hill.",
  "sentence_ends": [0],
  "paragraph_ends": [0],
  "candidate_infos":[
    {
      "id": 0,
      "positions": [0, 3]
    },
    {
      "id": 1,
      "positions": [4, 7]
    }]
}

PUT /test/recomm_source/9
{
  "id": 9,
  "text": "nine red dogs jump through the hill.",
  "sentence_ends": [0],
  "paragraph_ends": [0],
  "candidate_infos":[
    {
      "id": 0,
      "positions": [0, 3]
    },
    {
      "id": 1,
      "positions": [4, 7]
    }]
}

In the buildLeafAggregation function, in the for (GroupDocs<Integer> groupDocs : topGroups.groups) loop, I expected spans.nextDoc() and topGroups.groups to visit documents in the same order, but some tests have proven that wrong.
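One workaround I'm considering (just a sketch, not tested): since the groups are relevance-sorted while Spans only moves forward in doc-id order, iterate the Spans once first and cache the match positions per document, then look them up for each group:

// Sketch (assumes java.util imports). Spans advances strictly in doc-id
// order, so collect all query-phrase positions first, keyed by doc id.
Map<Integer, List<Integer>> positionsByDoc = new HashMap<>();
while (spans.nextDoc() != Spans.NO_MORE_DOCS) {
    List<Integer> positions = new ArrayList<>();
    while (spans.nextStartPosition() != Spans.NO_MORE_POSITIONS) {
        positions.add(spans.startPosition());
    }
    positionsByDoc.put(spans.docID(), positions);
}

// Then visit the relevance-sorted groups and look positions up by parent doc id.
for (GroupDocs<Integer> groupDocs : topGroups.groups) {
    List<Integer> queryPositions =
        positionsByDoc.getOrDefault(groupDocs.groupValue, Collections.emptyList());
    // ...combine queryPositions with the candidate positions from the child docs...
}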

Would you please help me with this? Thank you very much, and sorry for my poor English!

It's probably worth taking a look at the existing significant_terms aggregation. It was designed with this use case in mind. This video provides some background on the stats that provide the useful signals.

Another thing to keep an eye on is the forthcoming significant_text aggregation which is designed to improve significant_terms specifically for use on free-text fields and includes the ability to de-duplicate noisy data typical of news sites.
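For example, a first experiment might look like this (a sketch only; significant_terms on an analyzed text field may need fielddata or a keyword sub-field, which is part of what significant_text is meant to address):

GET /test/recomm_source/_search
{
  "query": {
    "match": { "text": "dogs" }
  },
  "aggs": {
    "related_terms": {
      "significant_terms": { "field": "text" }
    }
  }
}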
