Send data from twitter4j to Elasticsearch with async requests



Hello, I'm trying to send twitter4j data to Elasticsearch.

I can fetch all the tweets from Twitter. For each one I create an entity, add it to a list of entities, and then use the async method to send the tweets to Elasticsearch. However, I'm getting some errors:

public static JSONObject newTweetsJson(Twitter twitter) throws TwitterException, JSONException, IllegalStateException, IOException, InterruptedException {
    JSONObject json = new JSONObject();
    JSONArray array = new JSONArray();
    List<HttpEntity> listHttp = new ArrayList<>();
    int pageno = 1;
    List<Status> statuses = new ArrayList<>();

    // Page through the home timeline (100 tweets per page) until a page adds nothing new.
    while (true) {
        try {
            int size = statuses.size();
            Paging page = new Paging(pageno++, 100);
            statuses.addAll(twitter.getHomeTimeline(page));
            System.out.println("Status size " + statuses.size());

            if (statuses.size() == size)
                break;
        }
        catch (TwitterException e) {
            e.printStackTrace();
            // Stop paging on error (e.g. a rate limit) instead of requesting ever-higher page numbers.
            break;
        }
    }

    // Build one JSON item and one HTTP entity per tweet.
    for (Status tweet : statuses) {
        JSONObject item = new JSONObject();
        item.put("ID", tweet.getId());
        item.put("TweetLang", tweet.getLang());
        item.put("Text", tweet.getText());
        item.put("Screen_name", tweet.getUser().getScreenName());
        item.put("Language", tweet.getUser().getLang());
        item.put("Name", tweet.getUser().getName());
        item.put("Location", tweet.getUser().getLocation());

        // Arguments follow createHttpEntity's parameter order:
        // id, tweetLang, text, screenName, language, name, location
        HttpEntity ent = RestAPI.createHttpEntity(Long.toString(tweet.getId()), tweet.getLang(), tweet.getText(),
                tweet.getUser().getScreenName(), tweet.getUser().getLang(), tweet.getUser().getName(),
                tweet.getUser().getLocation());

        listHttp.add(ent);
        array.put(item);
    }

    System.out.println("Status size " + statuses.size());
    System.out.println("HTTP size " + listHttp.size());
    HttpEntity[] httpArray = new HttpEntity[listHttp.size()];
    System.out.println("Screen name: " + twitter.getScreenName());
    // Elasticsearch index names must be lowercase, so normalize the screen name before using it as the index.
    String index = twitter.getScreenName().toLowerCase(java.util.Locale.ROOT);
    RestAPI.putListOfTweets(index, "tweet", listHttp.toArray(httpArray));

    json.put("Tweets", array);
    return json;
}
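The paging loop in newTweetsJson ends once a page adds no new statuses. A slightly more defensive variant retries a failed page and gives up after a fixed number of consecutive failures, so a persistent error cannot keep it looping. This is only a sketch: PagingSketch, PageFetcher and fetchAll are hypothetical names, and PageFetcher stands in for a call such as twitter.getHomeTimeline(new Paging(page, 100)).

```java
import java.util.ArrayList;
import java.util.List;

public class PagingSketch {

    /** Hypothetical stand-in for one page request, e.g. page -> twitter.getHomeTimeline(new Paging(page, 100)). */
    interface PageFetcher<T> {
        List<T> fetch(int page) throws Exception;
    }

    /**
     * Fetches pages 1, 2, 3, ... until a page comes back empty.
     * A failed page is retried; after maxConsecutiveFailures failures in a row the loop stops.
     */
    static <T> List<T> fetchAll(PageFetcher<T> fetcher, int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be at least 1");
        }
        List<T> all = new ArrayList<>();
        int page = 1;
        int failures = 0;
        while (true) {
            List<T> batch;
            try {
                batch = fetcher.fetch(page);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop paging
                    break;
                }
                failures++;
                if (failures >= maxConsecutiveFailures) {
                    System.err.println("Giving up at page " + page + " after " + failures + " failures: " + e);
                    break;
                }
                continue; // retry the same page
            }
            failures = 0; // a successful page resets the failure count
            if (batch == null || batch.isEmpty()) {
                break; // no more results
            }
            all.addAll(batch);
            page++;
        }
        return all;
    }
}
```

The immediate retry is only for illustration; against a rate-limited API you would normally wait before retrying the same page.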

Creating an entity:

public static HttpEntity createHttpEntity(String id,String tweetLang, String text, String screenName, String language,
		String name, String location) throws JSONException{
		
		JSONObject obj = new JSONObject();

		obj.put("ID", id);
		obj.put("TweetLang", tweetLang);
		obj.put("Text", text);
		obj.put("Screen_name", screenName);
		obj.put("Language", language);
		obj.put("Name", name);
		obj.put("Location", location);

		final String jsonString = obj.toString();
		
		HttpEntity entity= new NStringEntity(
				 jsonString, ContentType.APPLICATION_JSON);
		return entity;
}

Async method:

public static void putListOfTweets(String index, String type, HttpEntity[] entityArray) throws IOException, InterruptedException {
    RestClient restClient = initAPI();
    try {
        int numRequests = entityArray.length;
        final CountDownLatch latch = new CountDownLatch(numRequests);
        // Submits one asynchronous POST per tweet, all at once.
        for (int i = 0; i < numRequests; i++) {
            restClient.performRequestAsync(
                    "POST",
                    "/" + index + "/" + type,
                    Collections.<String, String>emptyMap(),
                    entityArray[i],
                    new ResponseListener() {
                        @Override
                        public void onSuccess(Response response) {
                            System.out.println(response);
                            latch.countDown();
                        }

                        @Override
                        public void onFailure(Exception exception) {
                            // getMessage() can be null for some exceptions, which prints a bare "null";
                            // printing the exception itself would also show its type.
                            System.out.println(exception.getMessage());
                            latch.countDown();
                        }
                    });
        }
        // Wait for completion of all requests (successful or failed).
        latch.await();
    } finally {
        // Close the client even if waiting is interrupted.
        closeAPI(restClient);
    }
}
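putListOfTweets submits every request immediately, so a few thousand tweets mean a few thousand outstanding requests against a single local node. One common way to keep that under control is to cap the number of in-flight requests with a Semaphore; whether this is what causes the errors below is a guess, not something the logs confirm. The sketch uses plain java.util.concurrent; BoundedAsyncSketch, AsyncSender and sendAll are hypothetical names, and AsyncSender stands in for a wrapper around restClient.performRequestAsync whose ResponseListener calls onSuccess or onFailure.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class BoundedAsyncSketch {

    /** Hypothetical async request: must eventually call exactly one of the two callbacks. */
    interface AsyncSender<T> {
        void send(T item, Runnable onSuccess, Consumer<Exception> onFailure);
    }

    /**
     * Sends every item while never allowing more than maxInFlight requests to be pending.
     * Returns the number of failed requests once all of them have completed.
     */
    static <T> int sendAll(List<T> items, int maxInFlight, AsyncSender<T> sender) throws InterruptedException {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be at least 1");
        }
        Semaphore permits = new Semaphore(maxInFlight);
        CountDownLatch latch = new CountDownLatch(items.size());
        AtomicInteger failures = new AtomicInteger();

        for (T item : items) {
            permits.acquire(); // blocks the submitting thread while maxInFlight requests are pending
            try {
                sender.send(item,
                        () -> {
                            permits.release();
                            latch.countDown();
                        },
                        e -> {
                            failures.incrementAndGet();
                            permits.release();
                            latch.countDown();
                        });
            } catch (RuntimeException e) {
                // The request never started: give the permit back and count it as failed.
                failures.incrementAndGet();
                permits.release();
                latch.countDown();
            }
        }
        latch.await(); // wait until every request has completed
        return failures.get();
    }
}
```

The permit is acquired on the submitting thread and released from the callback, so the callbacks themselves never block; calling acquire() from inside a ResponseListener would risk stalling the client's I/O threads.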

1- It adds some tweets to Elasticsearch, but then it throws the following error:

Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Exception in thread "I/O dispatcher 3" java.lang.StackOverflowError
	at org.apache.http.ProtocolVersion.compareToVersion(ProtocolVersion.java:205)
	at org.apache.http.ProtocolVersion.lessEquals(ProtocolVersion.java:241)
	at org.apache.http.client.protocol.RequestExpectContinue.process(RequestExpectContinue.java:72)

2- I create one entity per tweet, but when I compare the sizes they are quite different. Shouldn't they be the same?

Status size 829
HTTP size 5258

This is solved. But now some requests print null, and others fail with "Connection refused":

null
null
null
null
null
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
null
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}

Can someone tell me what causes these errors?

UPDATE: I solved this problem by using a bulk request with the TransportClient. Is it possible to do the same with the RestClient?
Thank you
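Since the low-level RestClient can send an arbitrary request, one option is to call the _bulk endpoint directly. Its body is newline-delimited JSON: one action line followed by one source line per document, with a trailing newline. The sketch below only builds that body (and splits the tweets into batches) with plain Java; BulkBodySketch, Doc, buildBulkBody and chunk are hypothetical names, and each Doc's json is assumed to be the compact, single-line output of obj.toString() from createHttpEntity. The action lines omit _index and _type on the assumption that the request goes to /index/type/_bulk.

```java
import java.util.ArrayList;
import java.util.List;

public class BulkBodySketch {

    /** Hypothetical holder for one tweet: its id and its single-line JSON source. */
    static final class Doc {
        final long id;
        final String json;

        Doc(long id, String json) {
            this.id = id;
            this.json = json;
        }
    }

    /** Builds a _bulk body: an action line and a source line per document, each ending in '\n'. */
    static String buildBulkBody(List<Doc> docs) {
        StringBuilder sb = new StringBuilder();
        for (Doc d : docs) {
            if (d.json.indexOf('\n') >= 0) {
                throw new IllegalArgumentException("bulk source lines must be single-line JSON");
            }
            // Using the tweet id as _id makes re-runs overwrite documents instead of duplicating them.
            sb.append("{\"index\":{\"_id\":\"").append(d.id).append("\"}}\n");
            sb.append(d.json).append('\n');
        }
        return sb.toString();
    }

    /** Splits a list into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> chunk(List<T> items, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

With the 5.x-style low-level client used above, each batch body could then be sent with something like restClient.performRequest("POST", "/" + index + "/" + type + "/_bulk", Collections.<String, String>emptyMap(), new NStringEntity(body, ContentType.APPLICATION_JSON)). The Elasticsearch bulk documentation asks for the application/x-ndjson content type, so check what your version accepts. A bulk response can come back with HTTP 200 even when individual items failed, so its errors flag and per-item results are worth checking.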

