Error during bulk import while inserting around 200K documents from SQL Server

Hi team,
I am relatively new to the ELK world. I am writing a simple Python script to fetch around 200K records from a local SQL Server instance and index them into my local Elasticsearch instance running on my laptop. The script fails with a read timeout error after inserting around 22K rows (the exact number varies from run to run).
I am completely lost; any pointers would help.
Error message -
```
elasticsearch.exceptions.ConnectionTimeout: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host='localhost', port=9200): Read timed out. (read timeout=10))
```
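
From what I understand, the `read timeout=10` in the trace is the default request timeout of the elasticsearch-py client, and it can be raised (along with retrying timed-out requests) when the client is created. This is only a sketch of what I think I could try, with illustrative values:

```python
from elasticsearch import Elasticsearch

# Sketch only - values are illustrative, not tested.
# A longer per-request timeout plus retry-on-timeout, so a slow bulk request
# is retried instead of aborting the whole import.
es = Elasticsearch(
    timeout=60,            # per-request read timeout in seconds (client default is 10)
    max_retries=3,
    retry_on_timeout=True,
)
```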

Code snippet -
```python
from collections import deque
from http.client import REQUEST_TIMEOUT
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.dialects.mssql import NVARCHAR, BIGINT
import logging
from settings import *
import time

# Set up logging
logging.basicConfig(level=logging.DEBUG,
                    format=' %(asctime)s - %(levelname)s - %(message)s')

# Database connection string
conn_smi = (
    f"mssql+pyodbc://{SQL_DB_USER}:{SQL_DB_PASSWORD}@{SQL_DB_HOST}:{SQL_DB_PORT}/{SQL_DB_NAME}"
    f"?driver=ODBC Driver 17 for SQL Server"
)

# Main class.
class Instagram:
    def create_index(self):
        logging.info(f" SMI => {conn_smi}")
        sql_db_engine_smi = sa.create_engine(conn_smi)

        # Data type dictionary
        col_types_dict = {
            'Id': sa.types.BIGINT,
            'Name': sa.types.NVARCHAR(length=256),
            'Email': sa.types.NVARCHAR(length=256),
            'Category': sa.types.NVARCHAR(length=256),
            'ProfilePictureUrl': sa.types.NVARCHAR(length=2048),
            'HashTags': sa.types.NVARCHAR(length=4000),
            'UserId': sa.types.NVARCHAR(length=64),
            'UserName': sa.types.NVARCHAR(length=256),
            'Biography': sa.types.NVARCHAR(length=2048),
            'Followers': sa.types.FLOAT,
            'Following': sa.types.FLOAT,
            'EngagementRate': sa.types.REAL,
            'TotalEngagement': sa.types.REAL,
            'EngagementPerPost': sa.types.REAL,
            'Posts': sa.types.INT
        }

        str_sql = """SELECT ..............."""

        df_insta = pd.read_sql(sql=str_sql,
                               con=sql_db_engine_smi)

        # Loop through the data frame and yield one document per row.
        for index, record in df_insta.iterrows():
            insta_account = {}
            logging.info(f"Adding user => {record['UserId']}")
            insta_account['AccountId'] = int(record['Id'])
            insta_account['Name'] = str(record['Name'])
            insta_account['Email'] = str(record['Email'])
            insta_account['Category'] = str(record['Category'])
            insta_account['Profilepicturelink'] = str(record['ProfilePictureUrl'])
            insta_account['Hashtag'] = str(record['HashTags'])
            insta_account['Userid'] = str(record['UserId'])
            insta_account['Username'] = str(record['UserName'])
            insta_account['Biography'] = str(record['Biography'])
            insta_account['Followers'] = int(record['Followers'])
            insta_account['Following'] = int(record['Following'])
            insta_account['Engagementrate'] = float(record['EngagementRate'])
            insta_account['Totalengagement'] = float(record['TotalEngagement'])
            insta_account['Engagementperpost'] = float(record['EngagementPerPost'])
            insta_account['Posts'] = int(record['Posts'])

            yield insta_account


# Main program
if __name__ == "__main__":
    es = Elasticsearch(timeout=30)
    data_loader = Instagram()
    es.indices.delete(index="instagram", ignore=404)
    # Drain the parallel_bulk generator; deque(maxlen=0) discards the per-document results.
    deque(helpers.parallel_bulk(es, data_loader.create_index(), index="instagram",
                                chunk_size=300, thread_count=2, queue_size=1),
          maxlen=0)
    es.indices.refresh()
```

Welcome to our community! :smiley:

Please format your code/logs/config using the </> button, or markdown-style backticks. It makes things easier to read, which helps us help you :slight_smile:


A few more things -
I am trying to insert around 200K documents. Two of the fields are large: NVARCHAR(2048) and NVARCHAR(4000) respectively.
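
If it helps, I could also avoid pulling all 200K rows into a single DataFrame by reading the query result in chunks with pandas' `chunksize` parameter. A rough sketch of what I have in mind (the chunk size and the `generate_docs` helper name are just illustrative):

```python
import pandas as pd

def generate_docs(engine, str_sql):
    # Stream the query result in chunks of 5,000 rows instead of loading
    # all 200K rows at once; each row becomes one document for the bulk helper.
    for chunk in pd.read_sql(sql=str_sql, con=engine, chunksize=5000):
        for _, record in chunk.iterrows():
            yield {
                'AccountId': int(record['Id']),
                'Userid': str(record['UserId']),
                # ... remaining fields built exactly as in the snippet above ...
            }
```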
