Hi team,
I am relatively new to the ELK world. I am writing a simple Python script to fetch around 200K records from a local instance of SQL Server and index them in my local ELK instance running on my laptop. The script fails with a read timeout error every time after inserting around 22K rows or so (this number varies every time).
I am completely lost, any pointers would help.
Error message -
Elasticsearch.exceptions.ConnectionTimeout: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host='localhost', port=9200): Read timed out. (read timeout=10))
Code snippet -
<
from collections import deque
from http.client import REQUEST_TIMEOUT
import Elasticsearch
from Elasticsearch import helpers
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.dialects.mssql import NVARCHAR, BIGINT
import logging
from settings import *
import time
Set logging
logging.basicConfig(level=logging.DEBUG,
format=' %(asctime)s - %(levelname)s - %(message)s')
Database connect
conn_smi = f"mssql+pyodbc://{SQL_DB_USER}:{SQL_DB_PASSWORD}@{SQL_DB_HOST}:{SQL_DB_PORT}/{SQL_DB_NAME}"
f"?driver=ODBC Driver 17 for SQL Server"
Main class.
class Instagram:
def create_index(self):
logging.info(f" SMI => {conn_smi}")
sql_db_engine_smi = sa.create_engine(conn_smi)
# Data type dictionary
col_types_dict={
'Id':sa.types.BIGINT,
'Name':sa.types.NVARCHAR(length=256),
'Email':sa.types.NVARCHAR(length=256),
'Category':sa.types.NVARCHAR(length=256),
'ProfilePictureUrl':sa.types.NVARCHAR(length=2048),
'HashTags':sa.types.NVARCHAR(length=4000),
'UserId':sa.types.NVARCHAR(length=64),
'UserName':sa.types.NVARCHAR(length=256),
'Biography':sa.types.NVARCHAR(length=2048),
'Followers':sa.types.FLOAT,
'Following':sa.types.FLOAT,
'EngagementRate':sa.types.REAL,
'TotalEngagement':sa.types.REAL,
'EngagementPerPost':sa.types.REAL,
'Posts':sa.types.INT
}
str_sql = """SELECT ..............."""
df_insta = pd.read_sql(sql=str_sql,
con=sql_db_engine_smi)
# Loop through the data frame.
for index, record in df_insta.iterrows():
insta_account = {}
logging.info(f"Adding user => {record['UserId']}")
insta_account['AccountId'] = int(record['Id'])
insta_account['Name'] = str(record['Name'])
insta_account['Email'] = str(record['Email'])
insta_account['Category'] = str(record['Category'])
insta_account['Profilepicturelink'] = str(record['ProfilePictureUrl'])
insta_account['Hashtag'] = str(record['HashTags'])
insta_account['Userid'] = str(record['UserId'])
insta_account['Username'] = str(record['UserName'])
insta_account['Biography'] = str(record['Biography'])
insta_account['Followers'] = int(record['Followers'])
insta_account['Following'] = int(record['Following'])
insta_account['Engagementrate'] = float(record['EngagementRate'])
insta_account['Totalengagement'] = float(record['TotalEngagement'])
insta_account['Engagementperpost'] = float(record['EngagementPerPost'])
insta_account['Posts'] = int(record['Posts'])
yield insta_account
***************************************************************************
Main Program
if name == "main":
es = Elasticsearch.Elasticsearch(timeout=30)
data_loader = Instagram()
es.indices.delete(index="instagram",ignore=404)
deque(helpers.parallel_bulk(es,data_loader.create_index(),index="instagram",chunk_size=300,thread_count=2,queue_size=1), maxlen=0)
es.indices.refresh()
***************************************************************************
/>