Bulk insert issue with Elasticsearch.Net 6.0.2

Hi

I am using the following code in NEST with the low level client:

ElasticLowLevelClient objClient = new ElasticLowLevelClient(objSettings);

string strData = "{\"RollNo\": \"1\",\"Name\": \"Abc\"}";
JObject data = JObject.Parse(strData);

var objIndex = new
{
    index = new
    {
        _index = "strIndex",
        _type = "strType",
        _id = "strId"
    }
};

lstRowsToCache.Add(objIndex); // Adding to elastic
lstRowsToCache.Add(data);     // Not adding to elastic

indexResponse = objClient.Bulk<Stream>(lstRowsToCache);
  • Documents passed as anonymous types are accepted, but JObject documents are not.

Thanks
Aneesh L

Could you please elaborate on what the issue is?

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "test1",
        "_type": "test1",
        "_id": "1",
        "_score": 1,
        "_source": {
          "RollNo": [],
          "Name": []
        }
      }
    ]
  }
}

The data is saved in Elasticsearch like this: instead of the values, empty arrays are added.
Is there a problem with my parsing?
How can we add a string to Elasticsearch instead of JSON?

There are a couple of things incorrect here:

  1. objClient.Bulk<Stream>(lstRowsToCache); can't be using Elasticsearch.Net 6.0.2, because this version of the client does not support returning Stream from the bulk request. With the low level client, the response type should be one of VoidResponse, StringResponse or BytesResponse. It looks like you might be using an Elasticsearch.Net 5.x version.
  2. The default serializer within Elasticsearch.Net does not know how to serialize a Json.NET JObject to JSON because it has no dependency on Json.NET.
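This is also my reading of why you see empty arrays in _source rather than a serialization failure: a JObject looks like a dictionary of string keys to JToken values, and every JToken is itself IEnumerable<JToken>, so a serializer that knows nothing about Json.NET can walk the keys and then serialize each leaf JValue as an enumerable with zero children. A small sketch of that JToken behavior (assuming Newtonsoft.Json is referenced; how the client's bundled serializer handles it is an inference from your output, not documented behavior):

```csharp
using System;
using System.Linq;
using Newtonsoft.Json.Linq;

class Program
{
    static void Main()
    {
        var data = JObject.Parse("{\"RollNo\": \"1\",\"Name\": \"Abc\"}");

        // A JObject enumerates as name/value pairs...
        foreach (var property in data.Properties())
        {
            // ...and every JToken implements IEnumerable<JToken>. A leaf
            // JValue has no child tokens, so a serializer that only checks
            // for IEnumerable can write it out as an empty array.
            Console.WriteLine($"{property.Name}: {property.Value.Count()} child tokens");
        }
        // RollNo: 0 child tokens
        // Name: 0 child tokens
    }
}
```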

A minimal working example in 6.0.2 would be

var client = new ElasticLowLevelClient();
var data = new { RollNo = "1", Name = "Abc"};
var lstRowsToCache = new List<object>();

var objIndex = new
{
  index =
  new
  {
      _index = "strIndex",
      _type = "strType",
      _id = "strId"
  }
};

lstRowsToCache.Add(objIndex);
lstRowsToCache.Add(data); 

var indexResponse = client.Bulk<VoidResponse>(PostData.MultiJson(lstRowsToCache));

which uses an anonymous type for the document. This sends the following request to Elasticsearch

POST http://localhost:9200/_bulk
{"index":{"_index":"strIndex","_type":"strType","_id":"strId"}}
{"RollNo":"1","Name":"Abc"}

If you want to use JObject, you'll need to hook in a serializer that understands JObject, e.g. implement an IElasticsearchSerializer that uses Json.NET for serialization. A naive implementation may look like

private static void Main()
{
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var settings = new ConnectionConfiguration(pool, new JsonNetSerializer());
    var client = new ElasticLowLevelClient(settings);

    string strData = "{\"RollNo\": \"1\",\"Name\": \"Abc\"}";
    JObject data = JObject.Parse(strData);
    var lstRowsToCache = new List<object>();

    var objIndex = new
    {
      index =
      new
      {
          _index = "strIndex",
          _type = "strType",
          _id = "strId"
      }
    };
    
    lstRowsToCache.Add(objIndex);
    lstRowsToCache.Add(data);

    var indexResponse = client.Bulk<VoidResponse>(PostData.MultiJson(lstRowsToCache));
}

public class JsonNetSerializer : IElasticsearchSerializer
{
    private static JsonSerializer serializer = JsonSerializer.CreateDefault();
    private static JsonSerializer indentedSerializer = 
        JsonSerializer.CreateDefault(new JsonSerializerSettings
        {
            Formatting = Newtonsoft.Json.Formatting.Indented
        });
    
    public object Deserialize(Type type, Stream stream)
    {
        var streamReader = new StreamReader(stream);
        var reader = new JsonTextReader(streamReader);
        return serializer.Deserialize(reader, type);
    }

    public T Deserialize<T>(Stream stream)
    {
        return (T)Deserialize(typeof(T), stream);
    }

    public Task<object> DeserializeAsync(Type type, Stream stream, CancellationToken cancellationToken = default(CancellationToken))
    {
        var o = Deserialize(type, stream);
        return Task.FromResult(o);
    }

    public Task<T> DeserializeAsync<T>(Stream stream, CancellationToken cancellationToken = default(CancellationToken))
    {
        var o = Deserialize<T>(stream);
        return Task.FromResult(o);
    }

    public void Serialize<T>(T data, Stream stream, SerializationFormatting formatting = SerializationFormatting.Indented)
    {
        var writer = new StreamWriter(stream);
        if (formatting == SerializationFormatting.Indented)
            indentedSerializer.Serialize(writer, data, typeof(T));
        else
            serializer.Serialize(writer, data, typeof(T));    
            
        writer.Flush();
    }

    public Task SerializeAsync<T>(T data, Stream stream, SerializationFormatting formatting = SerializationFormatting.Indented, CancellationToken cancellationToken = default(CancellationToken))
    {
        Serialize(data, stream, formatting);
        return Task.CompletedTask;
    }
}

NOTE: the implementation does not wrap IDisposable types in using statements because the client library will close the stream.

We can't use the example with an anonymous type because our columns are dynamic.
We need to use JObject. But for Deserialize(Stream stream), what type do we need to pass?

Take a look at the last example I provided, which uses JObject and configures an IElasticsearchSerializer that uses Newtonsoft.Json.
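With that serializer configured, the type parameter for Deserialize<T> can simply be JObject, since Json.NET knows how to materialize one. For reading responses you also don't strictly need the serializer at all: ask the low level client for a StringResponse and parse the body with Json.NET yourself. A sketch, assuming a node on localhost and the index/type names from earlier:

```csharp
using System;
using Elasticsearch.Net;
using Newtonsoft.Json.Linq;

class Program
{
    static void Main()
    {
        var client = new ElasticLowLevelClient();

        // Request the raw response body as a string...
        var response = client.Search<StringResponse>("strIndex", "strType",
            PostData.String("{\"query\":{\"match_all\":{}}}"));

        // ...and hand it to Json.NET, so no document type is needed at all.
        var json = JObject.Parse(response.Body);
        foreach (var hit in json["hits"]["hits"])
            Console.WriteLine(hit["_source"]);
    }
}
```

This keeps the dynamic-columns case simple, at the cost of parsing the body yourself instead of letting the client deserialize it.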

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.