I'm facing SocketTimeoutException
while retrieving/inserting data from/to elastic. This is happening when there are around 10-30 request/second
. These requests are combination of get/put.
Here is my elastic configuration(it's on azure):
-
3 master nodes
each of4GB RAM
-
2 data nodes
each of8GM RAM
- Azure load balancer which connects to above data node (seems only 9200 port is opened on it)
- Elastic Version: 7.2.0
- Rest High Level Client:
Index Information:
-
Index shards: 2
-
Index replica: 1
-
Index total fields: 10000
-
Size of index from kibana: Total-
27.2 MB
&Primaries: 12.2MB
-
Index structure:
{"dev-index":{"mappings":{"properties":{"dataObj": {"type":"object","enabled":false},"generatedID":{"type":"keyword"},"transNames": {"type":"keyword"}}}}}
-
Dynamic mapping is disabled.
Following is my elastic Config
file. Here I've two connection bean, one is for read & another for write to elastic.
ElasticConfig.java:
@Configuration
public class ElasticConfig {
private String elasticHost;
private int elasticPort;
private String elasticUser;
private String elasticPass;
private int timeout;
@Bean(destroyMethod = "close")
@Qualifier("readClient")
public RestHighLevelClient readClient(){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticUser, elasticPass));
RestClientBuilder builder = RestClient
.builder(new HttpHost(elasticHost, elasticPort))
.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build())
);
builder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(10000)
.setSocketTimeout(60000)
.setConnectionRequestTimeout(0)
);
RestHighLevelClient restClient = new RestHighLevelClient(builder);
return restClient;
}
@Bean(destroyMethod = "close")
@Qualifier("writeClient")
public RestHighLevelClient writeClient(){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticUser, elasticPass));
RestClientBuilder builder = RestClient
.builder(new HttpHost(elasticHost, elasticPort))
.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build())
);
builder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(10000)
.setSocketTimeout(60000)
.setConnectionRequestTimeout(0)
);
RestHighLevelClient restClient = new RestHighLevelClient(builder);
return restClient;
}
}
Here is the function which makes a call to elastic, if data is available in elastic it will take it else it will generate data & put into elastic.
public Object getData(Request request) {
DataObj elasticResult = elasticService.getData(request);
if(elasticResult!=null){
return elasticResult;
}
else{
//code to generate data
DataObj generatedData = getData();//some function which will generated data
//put above data into elastic by Async call.
elasticAsync.putData(generatedData);
return generatedData;
}
}
ElasticService.java getData Function:
@Service
public class ElasticService {
private String elasticIndex;
@Autowired
@Qualifier("readClient")
private RestHighLevelClient readClient;
public DataObj getData(Request request){
String generatedId = request.getGeneratedID();
GetRequest getRequest = new GetRequest()
.index(elasticIndex) //elastic index name
.id(generatedId); //retrieving by index id from elastic _id field (as key-value)
DataObj result = null;
try {
GetResponse response = readClient.get(getRequest, RequestOptions.DEFAULT);
if(response.isExists()) {
ObjectMapper objectMapper = new ObjectMapper();
result = objectMapper.readValue(response.getSourceAsString(), DataObj.class);
}
} catch (Exception e) {
LOGGER.error("Exception occurred during fetch from elastic !!!! " + ,e);
}
return result;
}
}
ElasticAsync.java Async Put Data Function:
@Service
public class ElasticAsync {
private static final Logger LOGGER = Logger.getLogger(ElasticAsync.class.getName());
private String elasticIndex;
@Autowired
@Qualifier("writeClient")
private RestHighLevelClient writeClient;
@Async
public void putData(DataObj generatedData){
ElasticVO updatedRequest = toElasticVO(generatedData);//ElasticVO matches to the structure of index given above.
try {
ObjectMapper objectMapper = new ObjectMapper();
String jsonString = objectMapper.writeValueAsString(updatedRequest);
IndexRequest request = new IndexRequest(elasticIndex);
request.id(generatedData.getGeneratedID());
request.source(jsonString, XContentType.JSON);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
request.timeout(TimeValue.timeValueSeconds(5));
IndexResponse indexResponse = writeClient.index(request, RequestOptions.DEFAULT);
LOGGER.info("response id: " + indexResponse.getId());
}
} catch (Exception e) {
LOGGER.error("Exception occurred during saving into elastic !!!!",e);
}
}
}
Here is the some part of the stack trace when exception is occurred during saving data into elastic:(will add in follow up comment)
I've gone through couple of stackoverflow
& elastic
related blogs where they have mentioned this issue could be due to RAM
& cluster
configuration of elastic. Then I've changed my shards from 5 to 2 as there were only two data nodes. Also increased ram of Data nodes from 4GB to 8GB, as I get to know that elastic will use only 50%
of total RAM
. The occurrences of exception have decreased but problem still persist.
What I'm missing from java/elastic configuration point of view which frequently throwing this kind of SocketTimeoutException
?