Need help with search_after and PIT for deep pagination

okay so i need help. i am currently working on a project where millions of documents have to be displayed. Currently, there are 50 documents on each page, and there are 200 pages, totaling 10,000 documents. however, i want the user to click next page or the arrow and it allows them to go to page 201, 202, 300, 400, etc, until it reaches the end of the documents. yes I know it is a lot, but it is required.

i am currently using Elastic Search for this. as well as Python and javascript. however, what i have right now is not working. i read Elasticsearch documentation and it says that i need to use the search_after parameter along with the point in time. however, i keep getting errors with what i have. these error messages are making me crazy. below is what I currently have so far.

@blueprint.route("/browse-all", methods=["GET"])
def browse_all():
    """
    This view function handles GET requests for the Browse All page.
    A query is made for the initial page of certificates shown.

    :return: Template for the browse all page and a list of certificates to display.
             Redirect to public.view_certificate if only one certificate is returned.
    """
    # Get default year range/values
    try:
        cached_year_range = get_year_range()
        if cached_year_range and cached_year_range.year_min and cached_year_range.year_max:
            default_year_range_value = f"{cached_year_range.year_min} - {cached_year_range.year_max}"
        else:
            raise ValueError("Invalid year range")
    except Exception as e:
        current_app.logger.error(f"Error getting year range: {str(e)}")
        default_year_range_value = "1855 - 1949"  # Fallback range

    form = BrowseAllForm()
    page = request.args.get('page', 1, type=int)
    page_size = 50

    # Set up variables for search
    q_list = []

    for key, value in request.args.items():
        if key != 'page' and value:
            if key == "year_range":
                year_range = [int(year) for year in value.split() if year.isdigit()]
                if len(year_range) == 2:
                    q_list.append(Q("range", year={"gte": year_range[0], "lte": year_range[1]}))
            elif key in ["first_name", "last_name"]:
                q_list.append(Q("multi_match", query=value.capitalize(), fields=[key, f"spouse_{key}"]))
            elif key == "certificate_type" and value == "marriage":
                q_list.append(Q("terms", cert_type=[certificate_types.MARRIAGE, certificate_types.MARRIAGE_LICENSE]))
            else:
                q_list.append(Q("match", **{key: value}))

    q = Q("bool", must=q_list) if q_list else Q("match_all")

    # Define the sort order
    sort_order = [{"_doc": "asc"}]

    # Create a new PIT for each request
    pit = es.open_point_in_time(index="certificates", keep_alive="1m")
    pit_id = pit['id']

    # Get search_after from request arguments
    search_after = request.args.get('search_after')
    if search_after:
        search_after = search_after.split(',')

    # Construct the search body
    body = {
        "size": page_size,
        "query": q.to_dict(),
        "sort": sort_order,
        "pit": {
            "id": pit_id,
            "keep_alive": "1m"
        }
    }

    if search_after and page > 1:
        body["search_after"] = search_after

    # Perform the search
    try:
        response = es.search(body=body)
    except Exception as e:
        current_app.logger.error(f"Elasticsearch error: {str(e)}")
        return render_template("public/error.html", error_message="An error occurred while searching. Please try again.")

    # Get the total count
    count = response["hits"]["total"]["value"]

    # If only one certificate is returned, go directly to the view certificate page
    if count == 1:
        es.close_point_in_time(id=pit_id)
        return redirect(url_for("public.view_certificate", certificate_id=response["hits"]["hits"][0]["_id"]))

    # Process hits to include image URLs
    processed_hits = []
    for hit in response['hits']['hits']:
        processed_hit = hit['_source']
        processed_hit['_id'] = hit['_id']
        if 'image_url' not in processed_hit:
            processed_hit['image_url'] = url_for('static', filename='images/placeholder.png')
        processed_hits.append(processed_hit)

    # Set form data from previous form submissions
    for field in form:
        if field.name in request.args:
            field.data = request.args[field.name]

    # Simplified remove_filters logic
    remove_filters = {}
    for key, value in request.args.items():
        if key not in ['page', 'search_after'] and value:
            if not (key == "year_range" and value == default_year_range_value):
                label = value
                if key == "certificate_type":
                    label = certificate_types.CERTIFICATE_TYPE_VALUES.get(value, value)
                elif key == "county":
                    label = counties.COUNTY_VALUES.get(value, value)
                
                args_without_filter = request.args.copy()
                args_without_filter.pop(key)
                new_url = url_for('public.browse_all', **args_without_filter)
                remove_filters[key] = (label, new_url)

    # Define pagination
    pagination = Pagination(page=page, total=count, per_page=page_size, css_framework="bootstrap4")
    
    next_page = page + 1 if len(response['hits']['hits']) == page_size else None

    # Prepare next_args for the next page URL
    next_args = request.args.copy()
    if next_page:
        next_args['page'] = next_page
        if response['hits']['hits']:
            next_args['search_after'] = ','.join(map(str, response['hits']['hits'][-1]['sort']))
    else:
        next_args.pop('page', None)
        next_args.pop('search_after', None)

    # Close the PIT
    es.close_point_in_time(id=pit_id)

    return render_template(
        "public/browse_all.html",
        form=form,
        certificates=processed_hits,
        pagination=pagination,
        remove_filters=remove_filters,
        num_results=count,
        next_page=next_page,
        next_args=next_args,
        certificate_types=certificate_types,
        counties=counties,
        min=min,
        default_year_range_value=default_year_range_value
    )

From Elastic Search to Elasticsearch

Removed docker

Welcome!

What are the errors?

1 Like

It’s a variety of different errors. undefined errors, syntax errors, jenja errors, Elasticsearch errors. I try and fix one error and then the other just comes up. It’s like a loop of errors for me.

I’m currently off my PC right now, it’s late here so unfortunately I cannot show the exact errors. However, just a basic rundown is that I have millions of documents/certificates that need to be shown. I am currently using ES 7.16.1 / 7.17 (one of those two) and I have 50 documents on the page, and 200 pages that can be shown.

But i do want the user to be able to go past page 200, such as 201,300,401, etc. Until there are no more documents. I understand I have to use search_after and tried my best but it doesn’t work.

This is not possible AFAIK with PIT.

You can use:

  • the size and from parameters to display by default up to 10000 records to your users. If you want to change this limit, you can change index.max_result_window setting but be aware of the consequences (ie memory).
  • the search after feature to do deep pagination.
  • the Scroll API if you want to extract a resultset to be consumed by another tool later. (Not recommended anymore)

I currently have search_after in my code but keep getting errors. Its in the code i sent above.

There's no way to help without seeing the errors.

1 Like

Below is my vrp/public/views.py file. And the section where there is the browse_all. I reimplemented it right now and did some tweaks. I only used search_after here as well. Now when I do docker-compose up flask-dev and go to the link, I am able to see the first 50 documents aka page 1. However, when I click next, I get the following error message. " TypeError
TypeError: jinja2.runtime.Context.call() got multiple values for keyword argument 'page'".

• Why does the first page say 10,000 documents? I want it to say everything.
• When I click next that error comes up to go to the next page.

@blueprint.route("/browse-all", methods=["GET"])
def browse_all():
    """
    This view function handles GET requests for the Browse All page.
    A query is made for the initial page of certificates shown.

    :return: Template for the browse all page and a list of certificates to display.
             Redirect to public.view_certificate if only one certificate is returned.
    """
    # Get default year range/values
    cached_year_range = get_year_range()
    default_year_range_value = "{} - {}".format(cached_year_range.year_min, cached_year_range.year_max)

    form = BrowseAllForm()
    page = request.args.get('page', 1, type=int)

    # Set up variables for search
    size = 50
    q_list = []

    for key, value in [
        ("cert_type", request.args.get("certificate_type", "")),
        ("number", request.args.get("number", "").lstrip("0")),
        ("county", request.args.get("county", "")),
        ("year", request.args.get("year", "")),
        ("year_range", request.args.get("year_range", "")),
        ("first_name", request.args.get("first_name", "")),
        ("last_name", request.args.get("last_name", ""))
    ]:
        if value:
            if key == "year_range":
                year_range = [int(year) for year in value.split() if year.isdigit()]
                q_list.append(Q("range", year={"gte": year_range[0], "lte": year_range[1]}))
            elif key in ["first_name", "last_name"]:
                q_list.append(Q("multi_match", query=value.capitalize(), fields=[key, "spouse_"+key]))
            # Marriage certificates and marriage licenses are considered the same record type to the user
            elif key == "cert_type" and value == "marriage":
                q_list.append(Q("terms", cert_type=[certificate_types.MARRIAGE, certificate_types.MARRIAGE_LICENSE]))
            else:
                q_list.append(Q("match", **{key: value}))
    q = Q("bool", must=q_list)

    # Create Search object
    s = Search(using=es, index="certificates").query(q)

    # Add sort
    s = s.sort("cert_type", "year", "last_name", "county", "_id")

    # Apply search_after if it exists and we're not on the first page
    if 'search_after' in session and page > 1:
        s = s.extra(search_after=session['search_after'])

    # Set size
    s = s[:size]

    # Execute search
    res = s.execute()

    # Store search_after values for next page
    if len(res) > 0:
        session['search_after'] = list(res[-1].meta.sort)  # Convert AttrList to a regular list
    else:
        session.pop('search_after', None)

    # Get total count (this will be approximate for large result sets)
    count = res.hits.total.value

    # If only one certificate is returned, go directly to the view certificate page
    if count == 1:
        return redirect(url_for("public.view_certificate", certificate_id=res[0].id))

    # Set form data from previous form submissions
    form.certificate_type.data = request.args.get("certificate_type", "")
    form.county.data = request.args.get("county", "")
    form.year_range.data = request.args.get("year_range", "")
    form.year.data = request.args.get("year", "")
    form.number.data = request.args.get("number", "")
    form.last_name.data = request.args.get("last_name", "")
    form.first_name.data = request.args.get("first_name", "")

    # Handle filter labels and URLs
    remove_filters = {}
    current_args = request.args.to_dict()
    for key, value in request.args.items():
        if key != "page" and value:
            if not (key == "year_range" and value == default_year_range_value):
                # Handle filter label
                if key == "certificate_type":
                    value = certificate_types.CERTIFICATE_TYPE_VALUES.get(value)
                elif key == "county":
                    value = counties.COUNTY_VALUES.get(value)
                elif key == "number":
                    value = "Certificate Number: {}".format(value)
                elif key == "first_name":
                    value = "First Name: {}".format(value)
                elif key == "last_name":
                    value = "Last Name: {}".format(value)

                # Handle new URL
                current_args.pop(key)
                new_url = "{}?{}".format(request.base_url, urlencode(current_args))

                remove_filters[key] = (value, new_url)
        current_args = request.args.to_dict()

    # Custom pagination for search_after
    total_pages = (count + size - 1) // size
    has_prev = page > 1
    has_next = page < total_pages

    return render_template("public/browse_all.html",
                           form=form,
                           year_range_min=cached_year_range.year_min,
                           year_range_max=cached_year_range.year_max,
                           year_min_value=year_range[0] if request.args.get("year_range", "") else cached_year_range.year_min,
                           year_max_value=year_range[1] if request.args.get("year_range", "") else cached_year_range.year_max,
                           certificates=res,
                           page=page,
                           has_prev=has_prev,
                           has_next=has_next,
                           num_results=format(count, ",d"),
                           remove_filters=remove_filters,
                           certificate_types=certificate_types)

And this is my vro/search/utils.py where Elasticsearch is being called and creating and requesting docs.

import psycopg2
from elasticsearch.helpers import bulk

from vro.constants import certificate_types
from vro.extensions import es
from vro.settings import DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, DATABASE_HOST, DATABASE_PORT, DATABASE_SSLMODE


def recreate():
    """Deletes then recreates the index"""
    es.indices.delete('*', ignore=[400, 404])
    create_index()
    num_success, _ = bulk(es, create_docs(), chunk_size=5000)
    print("Successfully created %s certificates docs." % num_success)

def create_index():
    """Creates indices """
    es.indices.create(
        index='certificates',
        body={
            "settings": {
                "index": {
                    "sort.field": ["cert_type", "year", "last_name", "county"],
                    "sort.order": ["asc", "asc", "asc", "asc"],
                    "sort.missing": ["_last", "_last", "_last", "_last"]
                }
            },
            "mappings": {
                "properties": {
                    "cert_type": {
                        "type": 'keyword',
                    },
                    "number": {
                        "type": 'keyword',
                    },
                    "county": {
                        "type": 'keyword',
                    },
                    "year": {
                        "type": "date",
                        "format": "yyyy"
                    },
                    "first_name": {
                        "type": "keyword"
                    },
                    "last_name": {
                        "type": "keyword"
                    },
                    "full_name": {
                        "type": "keyword"
                    },
                    "display_string": {
                        "type": "keyword"
                    },
                    "spouse_first_name": {
                        "type": "keyword"
                    },
                    "spouse_last_name": {
                        "type": "keyword"
                    },
                    "spouse_name": {
                        "type": "keyword"
                    }
                }
            }

        }
    )

def create_docs():
    """Creates elasticsearch request docs for every certificate"""
    if not es:
        return

    conn = psycopg2.connect(
        dbname=DATABASE_NAME,
        user=DATABASE_USER,
        password=DATABASE_PASSWORD,
        host=DATABASE_HOST,
        port=DATABASE_PORT,
        sslmode=DATABASE_SSLMODE,
    )
    cursor = conn.cursor("fetch_large_result")

    cursor.execute("""
        SELECT certificates.id,
            certificates.type,
            certificates.number,
            certificates.county,
            certificates.year,
            certificates.first_name,
            certificates.last_name,
            certificates.filename,
            certificates.soundex,
            json_agg(
                json_build_object(
                    'first_name', marriage_data_1.first_name,
                    'last_name', marriage_data_1.last_name,
                    'soundex', marriage_data_1.soundex
                )
            ) AS items
        FROM certificates
            LEFT OUTER JOIN marriage_data AS marriage_data_1 ON certificates.id = marriage_data_1.certificate_id
        WHERE certificates.filename IS NOT NULL
        GROUP BY certificates.id limit 500000;
    """)

    count = 0

    while True:
        certificates = cursor.fetchmany(size=1000)

        if not certificates:
            break

        for c in certificates:
            if c[5] is not None:
                name = "{} {}".format(c[5], c[6])
            elif c[1] == "marriage_license":
                name = "Not Indexed"
            else:
                name = c[6]

            spouse_name = None
            spouse_first_name = None
            spouse_last_name = None

            # Get spouse metadata to store in index
            if c[1] == certificate_types.MARRIAGE:
                spouse_list = c[9]
                for spouse in spouse_list:
                    if spouse["soundex"] != c[8] or spouse["first_name"] != c[5]:
                        spouse_first_name = spouse["first_name"]
                        spouse_last_name = spouse["last_name"]
                        if spouse_first_name is not None:
                            spouse_name = "{} {}".format(spouse_first_name, spouse_last_name)
                        else:
                            spouse_name = spouse_last_name

            # Check for special character in number
            number = c[2]
            if c[2][-1].isalpha():
                unspaced_number = c[2].replace(" ", "")
                spaced_number = unspaced_number.replace(c[2][-1], " " + c[2][-1])
                number = [unspaced_number, spaced_number]

            yield {
                "_index": "certificates",
                "_id": c[0],
                "_source": {
                    "id": c[0],
                    "cert_type": c[1],
                    "number": number,
                    "county": c[3],
                    "year": c[4],
                    "first_name": c[5],
                    "last_name": c[6],
                    "full_name": name,
                    "display_string": c[7][:-4],
                    "spouse_first_name": spouse_first_name,
                    "spouse_last_name": spouse_last_name,
                    "spouse_name": spouse_name
                }
            }

            count = count + 1
            print("COUNT: ", count)

def delete_doc(certificate_id):
    """Delete a specific doc in the index"""
    es.delete(index="certificates",
              id=certificate_id)

Why does the first page say 10,000 documents? I want it to say everything.

It's because you are not reading the whole response object. The response tells you that it's greater or equals to 10000 docs. You can get the exact number if the end user really cares about that number, by setting: track_total_hits. See The search API | Elasticsearch Guide [8.15] | Elastic. Most of the time, the user does not need the exact number of docs. Think of Google for example. You don't see that number, and you can start iterate over the first 10 pages at first... So when you don't need an exact number of hits above 10000, we optimize the response by just saying "more than 10000"...

I can't tell anything from that Python error. It's not an elasticsearch error.
If you can somehow print the http error which is coming back from Elasticsearch, we might help.

I’m currently on my way to college right now so I won’t be free until 5-6pm est. Thank you for your quick responses though, I really appreciate it.

Regarding my code layout and using search_after, does that look correct to you?

I don't read Python code. So I can't comment on that.

If you need help, first try to use the Kibana developper console and reproduce the issue.
Then once you know what you are doing, you can translate it to Python code.

1 Like