Hello, I'm trying to send twitter4j data to elasticSearch.
I can get all the posts to twitter and then for each one I create an entity, add to an array of entities and use the ansyc method to send the posts to elastic, but I'm getting some errors:
public static JSONObject newTweetsJson(Twitter twitter) throws TwitterException, JSONException, IllegalStateException, IOException, InterruptedException {
JSONObject json= new JSONObject();
JSONArray array=new JSONArray();
List<HttpEntity> listHttp=new ArrayList<>();
int pageno = 1;
List<Status> statuses = new ArrayList<>();
while (true) {
try {
int size = statuses.size();
Paging page = new Paging(pageno++, 100);
statuses.addAll(twitter.getHomeTimeline(page));
System.out.println("Status size "+statuses.size());
if (statuses.size() == size)
break;
}
catch(TwitterException e) {
e.printStackTrace();
}
}
for (Status tweet : statuses) {
JSONObject item = new JSONObject();
item.put("ID", tweet.getId());
item.put("TweetLang", tweet.getLang());
item.put("Text", tweet.getText());
item.put("Screen_name", tweet.getUser().getScreenName());
item.put("Language", tweet.getUser().getLang());
item.put("Name", tweet.getUser().getName());
item.put("Location", tweet.getUser().getLocation());
HttpEntity ent=RestAPI.createHttpEntity(Long.toString(tweet.getId()), tweet.getLang(), tweet.getText(),
tweet.getUser().getScreenName(), tweet.getUser().getName(), tweet.getUser().getLocation(),
tweet.getUser().getLocation());
listHttp.add(ent);
array.put(item);
}
System.out.println("Status size "+statuses.size());
System.out.println("HTTP size "+listHttp.size());
HttpEntity[] httpArray = new HttpEntity[ listHttp.size() ];
System.out.println("Screen name: "+ twitter.getScreenName());
RestAPI.putListOfTweets(twitter.getScreenName(), "tweet", listHttp.toArray(httpArray));
json.put("Tweets", array);
return json;
}
******* create an entity ******
public static HttpEntity createHttpEntity(String id,String tweetLang, String text, String screenName, String language,
String name, String location) throws JSONException{
JSONObject obj = new JSONObject();
obj.put("ID", id);
obj.put("TweetLang", tweetLang);
obj.put("Text", text);
obj.put("Screen_name", screenName);
obj.put("Language", language);
obj.put("Name", name);
obj.put("Location", location);
final String jsonString = obj.toString();
HttpEntity entity= new NStringEntity(
jsonString, ContentType.APPLICATION_JSON);
return entity;
}
Async method ************************************
public static void putListOfTweets(String index,String type,HttpEntity[] entityArray) throws IOException, InterruptedException{
RestClient restClient = initAPI();
int numRequests = entityArray.length;
final CountDownLatch latch = new CountDownLatch(numRequests);
for (int i = 0; i < numRequests; i++) {
restClient.performRequestAsync(
"POST",
"/"+index+"/"+type,
Collections.<String, String>emptyMap(),
entityArray[i],
new ResponseListener() {
@Override
public void onSuccess(Response response) {
System.out.println(response);
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
System.out.println(exception.getMessage());
latch.countDown();
}
}
);
}
//wait for completion of all requests
latch.await();
closeAPI(restClient);
}
1- It adds some tweets to the elasticSearc, but then gives the following error:
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Exception in thread "I/O dispatcher 3" java.lang.StackOverflowError
at org.apache.http.ProtocolVersion.compareToVersion(ProtocolVersion.java:205)
at org.apache.http.ProtocolVersion.lessEquals(ProtocolVersion.java:241)
at org.apache.http.client.protocol.RequestExpectContinue.process(RequestExpectContinue.java:72)
2- For each tweet I create an entity, then compare the sizes and are quite different. Should not they be the same?
Status size 829
HTTP size 5258
this is solved. But now a get some null response:
null
null
null
null
null
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
null
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Connection refused: no further information
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Response{requestLine=POST /asdasdafsw/tweet HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
Can someone tell me why the error?
UPDATE: I resolve this problem with Bulk from transportClient. Is possible to do that with restClient?
Thank you