(Java) What is the most efficient way to write a huge amount of data to Elasticsearch?


(Wu Sheng) #1

This is what I am doing.

public static void main(String[] args) {
        try {
            TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("test_es"), 9300));
            IndexOperator operator = new IndexOperator(client);

            for (int i = 0; i < 1_000_000_000; i++) {
                IndexMetaCollection collection = new IndexMetaCollection();
                for (int j = 0; j < 100; j++) {
                    RequestSpan requestSpan = RequestSpan.newBuilder()
                            .setSpanType(1)
                            .setAddress(NetUtils.getLocalAddress().toString())
                            .setApplicationId("1")
                            .setCallType("1")
                            .setLevelId(0)
                            .setProcessNo(19287)
                            .setStartDate(System.currentTimeMillis())
                            .setTraceId(TraceId.newBuilder()
                                    .addSegments(201611)
                                    .addSegments(j)
                                    .addSegments(8504828)
                                    .addSegments(2277)
                                    .addSegments(53)
                                    .addSegments(3)
                                    .build())
                            .setUserId("1")
                            .setViewPointId("http://localhost:8080/wwww/test/helloWorld")
                            .setRouteKey(i)
                            .build();
                    IndexMetaInfo info = new IndexMetaInfo(new RequestSpanData(requestSpan), new DataFileNameDesc(), i, j);
                    collection.add(info);
                }

                operator.batchUpdate(collection);

                if (i % 100 == 0) {
                    System.out.println(" num=" + i + " ");
                }
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }

    public int batchUpdate(IndexMetaCollection metaInfos) {
        BulkRequestBuilder requestBuilder = client.prepareBulk();
        for (IndexMetaInfo indexMetaInfo : metaInfos) {
            try {
                requestBuilder.add(client.prepareIndex(INDEX_NAME, INDEX_TYPE).setSource(buildSource(indexMetaInfo)));
            } catch (Exception e) {
                logger.error("Failed to update index.", e);
                HealthCollector.getCurrentHeathReading("IndexOperator").updateData(HeathReading.ERROR, "Failed to " + "update index.");
            }
        }

        // get() blocks until the whole bulk request has been answered.
        BulkResponse bulkResponse = requestBuilder.get();
        if (bulkResponse.hasFailures()) {
            HealthCollector.getCurrentHeathReading("IndexOperator")
                    .updateData(HeathReading.ERROR, "Failed to " + "update index. Error message : " + bulkResponse.buildFailureMessage());
        }

        return metaInfos.size();
    }

I use BulkRequestBuilder to send a batch of documents to Elasticsearch, and I send the batches one by one. requestBuilder.get() seems to run synchronously: the call blocks until the bulk response comes back.

Is there a better way to do something like this?
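
One direction to consider, given as a rough sketch and not as a tested Elasticsearch solution: instead of waiting for every batch before building the next one, keep a small, fixed number of batches in flight. The example below uses only the JDK. `sendBulkAsync`, `MAX_IN_FLIGHT`, and `indexAll` are hypothetical names made up for illustration; `sendBulkAsync` merely simulates an asynchronous bulk call and would have to be replaced by a real one. As far as I know, the Elasticsearch Java client also ships a `BulkProcessor` helper that covers batching and concurrent requests, which may be worth checking before hand-rolling this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

public class BoundedBulkSketch {

    // Assumption: at most this many batches are pending at any time.
    static final int MAX_IN_FLIGHT = 4;

    // Hypothetical stand-in for an asynchronous bulk call.
    // It "indexes" the batch on another thread and reports how many items it saw.
    static CompletableFuture<Integer> sendBulkAsync(List<String> batch) {
        return CompletableFuture.supplyAsync(batch::size);
    }

    // Sends `batches` batches of `batchSize` items and returns the number of items acknowledged.
    static long indexAll(int batches, int batchSize) {
        Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
        AtomicLong indexed = new AtomicLong();

        for (int i = 0; i < batches; i++) {
            List<String> batch = new ArrayList<>(batchSize);
            for (int j = 0; j < batchSize; j++) {
                batch.add("doc-" + i + "-" + j);
            }

            // Blocks only when MAX_IN_FLIGHT batches are already pending.
            permits.acquireUninterruptibly();
            sendBulkAsync(batch).whenComplete((count, error) -> {
                try {
                    if (error == null) {
                        indexed.addAndGet(count);
                    }
                } finally {
                    permits.release(); // free the slot on success and on failure
                }
            });
        }

        // Taking every permit means all pending batches have completed.
        permits.acquireUninterruptibly(MAX_IN_FLIGHT);
        return indexed.get();
    }

    public static void main(String[] args) {
        System.out.println("indexed: " + indexAll(50, 100)); // prints "indexed: 5000"
    }
}
```

The cap matters: without it, a producer that is faster than the cluster would queue requests without limit and eventually run out of memory.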

In addition, there is this form:
requestBuilder.execute().addListener();

What is the difference between these two ways? Does addListener mean the call is asynchronous?
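
To make the two styles concrete, here is a small JDK-only sketch. It does not call Elasticsearch: `sendBulk`, `sendAndWait`, and `sendWithListener` are hypothetical helpers, and the 50 ms sleep is an assumed stand-in for a network round trip. To my understanding, `get()` on the request builder behaves like the blocking variant (it waits for the response), while `execute().addListener(...)` behaves like the callback variant (the call returns at once and the listener is invoked later with the response or the failure).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SyncVsAsyncSketch {

    // Hypothetical stand-in for one bulk request; completes with the number of items sent.
    static CompletableFuture<Integer> sendBulk(int items) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50); // simulated round trip (assumption)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return items;
        });
    }

    // Blocking style: the caller waits here until the response is available.
    static int sendAndWait(int items) {
        return sendBulk(items).join();
    }

    // Callback style: returns immediately; the listener runs when the response arrives.
    static void sendWithListener(int items, Consumer<Integer> listener) {
        sendBulk(items).thenAccept(listener);
    }

    public static void main(String[] args) {
        System.out.println("blocking result: " + sendAndWait(100));

        CompletableFuture<Integer> seen = new CompletableFuture<>();
        sendWithListener(100, seen::complete);
        System.out.println("caller continues without waiting");
        System.out.println("listener result: " + seen.join());
    }
}
```

With the callback style the calling thread is free to build the next batch while the previous one is still being processed, but failures also arrive in the listener, so error handling has to move there.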

