Send info from twitter4j to elasticSeach with Async

Hello, I'm trying to send twitter4j data to elasticSearch.

I can get all the posts to twitter and then for each one I create an entity, add to an array of entities and use the ansyc method to send the posts to elastic, but I'm getting some errors:

public static JSONObject newTweetsJson(Twitter twitter) throws TwitterException, JSONException, IllegalStateException, IOException, InterruptedException { 
			JSONObject json= new JSONObject();
	        JSONArray array=new JSONArray();
	        List<HttpEntity> listHttp=new ArrayList<>();
	        int pageno = 1;
	        List<Status> statuses = new ArrayList<>();
	        while (true) {

	          try {

	            int size = statuses.size(); 
	            Paging page = new Paging(pageno++, 100);
	            statuses.addAll(twitter.getHomeTimeline(page));
	            System.out.println("Status size "+statuses.size());
	           
	            if (statuses.size() == size)
	              break;
	          }
	          catch(TwitterException e) {

	            e.printStackTrace();
	          }
	        }
	         for (Status tweet : statuses) {
					JSONObject item = new JSONObject();
					item.put("ID", tweet.getId());		     			
					item.put("TweetLang", tweet.getLang());		     		
					item.put("Text", tweet.getText());		     			
					item.put("Screen_name", tweet.getUser().getScreenName());		     			
					item.put("Language", tweet.getUser().getLang());		     			
					item.put("Name", tweet.getUser().getName());
					item.put("Location", tweet.getUser().getLocation());
					HttpEntity ent=RestAPI.createHttpEntity(Long.toString(tweet.getId()), tweet.getLang(), tweet.getText(),
							tweet.getUser().getScreenName(), tweet.getUser().getName(), tweet.getUser().getLocation(),
							tweet.getUser().getLocation());
					
					listHttp.add(ent);
					array.put(item);
				}

	        System.out.println("Status size "+statuses.size());
	        System.out.println("HTTP size "+listHttp.size());
	        HttpEntity[] httpArray = new HttpEntity[ listHttp.size() ];
	        System.out.println("Screen name: "+ twitter.getScreenName());
	        RestAPI.putListOfTweets(twitter.getScreenName(), "tweet", listHttp.toArray(httpArray));
	        
			json.put("Tweets", array);
	        return json;  
	    }

******* create an entity ******

public static HttpEntity createHttpEntity(String id,String tweetLang, String text, String screenName, String language,
		String name, String location) throws JSONException{
		
		JSONObject obj = new JSONObject();

		obj.put("ID", id);
		obj.put("TweetLang", tweetLang);
		obj.put("Text", text);
		obj.put("Screen_name", screenName);
		obj.put("Language", language);
		obj.put("Name", name);
		obj.put("Location", location);

		final String jsonString = obj.toString();
		
		HttpEntity entity= new NStringEntity(
				 jsonString, ContentType.APPLICATION_JSON);
		return entity;
}

Async method ************************************

public static void putListOfTweets(String index,String type,HttpEntity[] entityArray) throws IOException, InterruptedException{
			RestClient restClient = initAPI();
			int numRequests = entityArray.length;
			final CountDownLatch latch = new CountDownLatch(numRequests);
			for (int i = 0; i < numRequests; i++) {
				restClient.performRequestAsync(
				"POST",
				"/"+index+"/"+type,
				Collections.<String, String>emptyMap(),
				entityArray[i],
				new ResponseListener() {
				@Override
				public void onSuccess(Response response) {
					System.out.println(response);
					latch.countDown();
				}
				@Override
				public void onFailure(Exception exception) {
				     System.out.println(exception.getMessage());
					 latch.countDown();
				}
				}
			    );
			}
			//wait for completion of all requests
			latch.await();
			closeAPI(restClient);
		}		

1- It adds some tweets to the elasticSearc, but then gives the following error:

Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Exception in thread "I/O dispatcher 3" java.lang.StackOverflowError
	at org.apache.http.ProtocolVersion.compareToVersion(ProtocolVersion.java:205)
	at org.apache.http.ProtocolVersion.lessEquals(ProtocolVersion.java:241)
	at org.apache.http.client.protocol.RequestExpectContinue.process(RequestExpectContinue.java:72)

2- For each tweet I create an entity, then compare the sizes and are quite different. Should not they be the same?

Status size 829
HTTP size 5258

this is solved. But now a get some null response:

null
null
null
null
null
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
null
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}

Can someone tell me why the error?

UPDATE: I resolve this problem with Bulk from transportClient. Is possible to do that with restClient?
Thank you

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.