How to get top distinct users that reply to a specific user?

I'm curious what combination of aggregations would be best for the following use case. I would like to rank the users who have replied to a collection of original tweets made by a given user, scored by the percentage of those tweets they replied to.

For example, the first stage would be getting all tweets made by a user (in this case, Donald Trump). For each tweet, I want to get a list of all users that have replied to it. I then want to see which users have replied to the greatest number of his original tweets.

I was thinking of starting with a terms aggregation, but when computing each user's reply percentage, I only want to count a user once per original tweet.

So if Trump made 3 tweets, and a user replied 6 times to his first tweet, once to his second tweet, and not at all to his third tweet, that user would have a score of 2/3 ≈ 66.7% (the fraction of the original tweets that the user replied to at least once).
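To make that concrete, here's a minimal sketch of the scoring I have in mind, in plain Python with made-up data:

from collections import Counter

# Hypothetical data: original tweet id -> screen names that replied (with repeats).
replies = {
    "tweet_1": ["alice"] * 6 + ["bob"],  # alice replied 6 times here
    "tweet_2": ["alice"],
    "tweet_3": [],                       # nobody replied
}

total_tweets = len(replies)

# Count each user at most once per original tweet, then rank.
scores = Counter(user for users in replies.values() for user in set(users))
for user, n in scores.most_common():
    print(f"{user}: {n}/{total_tweets} = {n / total_tweets:.1%}")
# alice: 2/3 = 66.7%, bob: 1/3 = 33.3%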

Would this involve using a pipeline aggregation? I don't care if a user replied multiple times to any one tweet; what I ultimately want is a ranking of users by how many of the original tweets they replied to at least once.

Thanks!

Ok, I think I've cracked this one. Since it's always nice to see an answer to a question even if the answer is from the original author, I'm going to answer my own question and invite comments, etc.

The Python code to do what I'm trying to do is below.

Basically, what this does is first select all tweets where the "in_reply_to_user_id" is equal to Trump's Twitter id. Since that field alone doesn't necessarily imply the tweet is a reply, I also filter on the "in_reply_to_status_id" field, restricting its range to the last three days via the tweet id. Tweet ids encode the epoch millisecond at which the tweet was made (this is part of Twitter's "Snowflake" scheme), which is what the "get_tweet_id_from_time" method exploits.
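As an aside, the conversion runs in reverse too, which is handy for sanity-checking (a quick sketch: 1288834974657 is the Snowflake epoch in epoch milliseconds, and the timestamp sits above the low 22 bits of the id):

def get_time_from_tweet_id(tweet_id):
    '''Inverse of get_tweet_id_from_time: recover the epoch millisecond'''
    return (int(tweet_id) >> 22) + 1288834974657

# Round trip, using get_tweet_id_from_time from the script below:
assert get_time_from_tweet_id(get_tweet_id_from_time(1500000000000)) == 1500000000000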

Using that selection, I then create a terms aggregation over all Twitter users that have replied to one of Trump's tweets. On that I add a cardinality sub-aggregation on the "in_reply_to_status_id" field, which gives the count of distinct tweets made by Trump that each user replied to.
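For reference, the request body the script ends up sending looks roughly like this (the gte value is the tweet id computed for "now minus three days"):

{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {"match": {"in_reply_to_user_id": 25073877}},
        {"range": {"in_reply_to_status_id": {"gte": "<tweet id for now - 3 days>"}}}
      ]
    }
  },
  "aggs": {
    "user.screen_name": {
      "terms": {
        "field": "user.screen_name.keyword",
        "collect_mode": "breadth_first",
        "size": 5000,
        "shard_size": 50010,
        "order": {"status_ids_count": "desc"}
      },
      "aggs": {
        "status_ids_count": {
          "cardinality": {"field": "in_reply_to_status_id"}
        }
      }
    }
  }
}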

Feel free to make any suggestions but this looks like it achieves what I'm after. Hopefully this helps someone else.

#!/usr/bin/env python3

import requests
import ujson as json
import time
from collections import defaultdict

def default_to_regular(d):
    '''Recursively convert nested defaultdicts to plain dicts'''
    if isinstance(d, defaultdict):
        d = {k: default_to_regular(v) for k, v in d.items()}
    return d

def add_aggregation(q):
    '''Create Aggregation Filter'''
    size = 5000
    q['aggs']['user.screen_name'] = defaultdict(lambda : defaultdict(dict))
    q['aggs']['user.screen_name']['terms'] = {}
    q['aggs']['user.screen_name']['terms']['field'] = 'user.screen_name.keyword'
    q['aggs']['user.screen_name']['terms']['collect_mode'] = 'breadth_first'
    q['aggs']['user.screen_name']['terms']['size'] = size
    q['aggs']['user.screen_name']['terms']['order'] = {'status_ids_count':'desc'}
    q['aggs']['user.screen_name']['terms']['shard_size'] = (size * 10) + 10
    q['aggs']['user.screen_name']['aggs'] = {}
    q['aggs']['user.screen_name']['aggs']['status_ids_count'] = {}
    q['aggs']['user.screen_name']['aggs']['status_ids_count']['cardinality'] = {'field':'in_reply_to_status_id'}

def get_tweet_id_from_time(epoch_milli):
    '''Convert millisecond epoch time to corresponding tweet id'''
    return (int(epoch_milli) - 1288834974657) << 22

def get_tweets(q):
    '''Fetch data from Elasticsearch cluster'''
    q = default_to_regular(q)
    headers = {"content-type": "application/json"}
    url = "http://dreamcast:9200/tweets/_search"
    r = requests.get(url, headers=headers, data=json.dumps(q))
    return r.json()

size = 0  # request no hits; we only want the aggregation results
trump_user_id = 25073877
q = defaultdict(lambda : defaultdict(dict))
q['size'] = size
q['query']['bool']['must'] = must = []
must.append({"match":{"in_reply_to_user_id": trump_user_id}})
# Only replies to tweets made within the last three days
must.append({"range":{"in_reply_to_status_id":{"gte":get_tweet_id_from_time((time.time()-(86400*3))*1000)}}})
add_aggregation(q)
data = get_tweets(q)

buckets = data['aggregations']['user.screen_name']['buckets']
data['aggregations']['user.screen_name'].pop('buckets', None)
print(data['aggregations']['user.screen_name'])  # remaining keys: doc count error bounds, etc.
for obj in buckets:
    print(obj)
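The buckets come back already ordered by the cardinality sub-aggregation, so turning them into the percentages from the original question is just a division (a sketch; total_originals would come from a separate count query against Trump's own tweets in the same window, which this script doesn't make):

# Hypothetical: number of original tweets in the three-day window.
total_originals = 30

for obj in buckets:  # already sorted by status_ids_count desc
    distinct = obj['status_ids_count']['value']
    print(f"{obj['key']}: replied to {distinct}/{total_originals} = {distinct / total_originals:.1%}")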

Looks about right. A couple of things to watch:

  1. Lower the precision_threshold on the cardinality aggregation if you experience memory issues. This trades accuracy for less space (see the snippet below).
  2. Check the reported accuracy in the search results. A low shard_size is better for memory but can mean some terms get overlooked; the reported error margins will reflect this. See this wizard for the options.
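For example, on the sub-aggregation in the script above (precision_threshold is a standard option of the cardinality aggregation; 1000 is just an illustrative value, the default is 3000):

q['aggs']['user.screen_name']['aggs']['status_ids_count']['cardinality'] = {
    'field': 'in_reply_to_status_id',
    'precision_threshold': 1000,  # lower -> less memory, lower accuracy
}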

Thanks Mark for the tips / suggestions! Much appreciated.

