Elasticsearch as a primary database

I think Kafka does not support random access or filtering messages by time. How do you handle this issue?

Just so we're all clear: this is your product you are promoting.

There's no issue with that, but it's always good to be transparent when suggesting commercial solutions.

No, it's the client's responsibility to backoff and resend these requests. They would never be acknowledged by Elasticsearch.
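As a rough illustration of that client-side responsibility, here is a minimal Python sketch of retrying with exponential backoff. The `Rejected` exception and the `send` callable are hypothetical stand-ins for whatever your client raises and calls when Elasticsearch rejects a request (for example with HTTP 429); they are not part of any Elasticsearch library.

```python
import random
import time


class Rejected(Exception):
    """Hypothetical: raised by `send` when the request is rejected (e.g. HTTP 429)."""


def send_with_backoff(send, payload, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call send(payload) until it succeeds or max_attempts is exhausted.

    Waits base_delay * 2**attempt seconds (plus jitter) between attempts.
    Returns the number of attempts used; re-raises Rejected if every attempt fails.
    """
    for attempt in range(max_attempts):
        try:
            send(payload)
            return attempt + 1
        except Rejected:
            if attempt == max_attempts - 1:
                raise  # give up; the caller must keep the payload somewhere safe
            delay = base_delay * (2 ** attempt)
            # Jitter keeps many clients from retrying in lockstep.
            sleep(delay + random.uniform(0, delay / 2))
```

The key point is that the payload is never dropped by the retry loop itself: either it is eventually acknowledged, or the exception surfaces and the caller decides what to do with it.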

Write consistency `all` means that if not all replica shards are available at the start of an indexing request (up to a certain time window), Elasticsearch does not attempt to index the data at all and fails the request. If the write consistency check is met, the indexing operation is performed on the primary and sent to the replicas. If a replica then fails to acknowledge the indexing request, we fail the replica and indicate in the response to the client that the write was not successful on all replicas. There is no rollback here.

Also, it's important to note that write consistency has been replaced by wait_for_active_shards which we think clarifies that it is a pre-flight check.
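To make the "pre-flight check, no rollback" behavior concrete, here is a small Python sketch of what a client might do: set `wait_for_active_shards` on the request, then inspect the `_shards` section of the response to see whether any shard copy failed after the check passed. The base URL, index name, and the `_doc` path segment are assumptions for illustration (older versions put a type name in that position).

```python
from urllib.parse import urlencode


def index_url(base, index, doc_id, wait_for_active_shards="all"):
    """Build an index-request URL with the pre-flight shard check set."""
    query = urlencode({"wait_for_active_shards": wait_for_active_shards})
    # Assumption: `_doc` as the path segment; older versions use a type name here.
    return f"{base}/{index}/_doc/{doc_id}?{query}"


def failed_shard_copies(response):
    """Return how many shard copies failed, per the `_shards` section of the response."""
    return response.get("_shards", {}).get("failed", 0)
```

A nonzero result from `failed_shard_copies` means the document was written to the primary but not to every replica; nothing was undone, so the client decides whether that is acceptable.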

"Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position." from:

https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

The second parameter of seek is an offset. There are several ways to maintain timestamp -> offset mappings. None are precise, so reset the offset generously and allow the resulting duplicates to collide and overwrite each other.

(1) the Snapshot script can save the current offsets.

(2) call consumer.offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/Consumer.html#offsetsForTimes(java.util.Map)

(3) use KafkaBeat as described in part 2 of the excellent blogs by my friends Suyog and Tal.
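Option (2) combined with the "reset generously, let duplicates overwrite" advice might look roughly like the Python sketch below. It assumes a consumer object exposing `offsets_for_times()` and `seek()`, as kafka-python's `KafkaConsumer` does (the Python counterparts of the Java `offsetsForTimes` and `seek`). The `slack_ms` default and the content-hash document ID are illustrative choices, not something prescribed in this thread.

```python
import hashlib
import json


def rewind_to_time(consumer, partitions, timestamp_ms, slack_ms=60_000):
    """Seek each partition to the first offset at or after (timestamp_ms - slack_ms).

    Returns {partition: offset} for the partitions that were moved.
    """
    target = max(0, timestamp_ms - slack_ms)  # rewind generously
    found = consumer.offsets_for_times({tp: target for tp in partitions})
    moved = {}
    for tp, hit in found.items():
        if hit is None:  # no message at or after the target time
            continue
        consumer.seek(tp, hit.offset)
        moved[tp] = hit.offset
    return moved


def event_doc_id(event):
    """Derive a stable document ID from the event content.

    Re-indexing a replayed event with the same ID overwrites the earlier
    copy instead of creating a duplicate.
    """
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()
```

Because replayed events hash to the same ID, overshooting the rewind costs some extra indexing work but does not produce duplicate documents.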

This is not a commercial product; nothing for sale. It is a technique anyone can implement themselves. This solution does not require any plugins and uses the existing Elasticsearch Query DSL.

The relevance to this thread is that the web page shows great performance results for Elasticsearch Scroll programs for big datasets and large results sets.

These internal details do not matter to the customer. The point is that with Cassandra the client has to fix the problem itself. I said the operation is rolled back; I never said the data was changed and rolled back. Perhaps a better phrasing is "operation is rejected with no changes to any shards".

It is not very nice for someone to nitpick details of phrasing when it doesn't affect the validity of the point being made.

You stated "ES will reject indexing requests and lose data". That is not an "internal detail", it is a misleading statement that could be interpreted as saying that Elasticsearch is doing something wrong, or is implemented poorly when in point of fact it is part of any resilient system to apply backpressure when it is overloaded. It is on the client to respond appropriately to these situations.

Pointing this out is not nitpicking, it is a very important distinction, it's the difference between Elasticsearch did something wrong ("ES...lose data") and the client did something wrong (it needs to respond correctly to backpressure).

Similarly, my explanation of how write consistency works is not nitpicking, and it is absolutely not merely an internal detail. The parameter write_consistency impacts client-facing behavior (so clearly not internal), and it's poorly understood (which again is why we changed the name to wait_for_active_shards). If you say that Elasticsearch rolls an operation back it is evocative of behavior that people think they understand in databases (e.g., a transaction rollback) that simply does not apply here. It's not nitpicking to point this out.

It's natural in a technical discussion, nay any discussion, for important points to be expanded upon and corrected. It's how we engage, progress, and learn from each other. Your expectations are wrong if you think it should be otherwise.

Fair enough; I don't want to imply that Elasticsearch is doing something wrong. David's short answer could imply the same.

The parameter write_consistency impacts client-facing behavior, but could have a variety of implementations and the internals don't matter. I have been explaining how to design an ETL pipeline that does not lose data in the face of backpressure.

Unlike FileBeat, Logstash does not handle backpressure. When all Logstash instances are busy, the network router will still send events, but those will be lost. So, one could say that the Elastic Stack is doing something wrong and loses data.

I'm just saying that one can implement an ETL pipeline that handles indexing backpressure by using Kafka to buffer the incoming events for Logstash. This approach is endorsed by Elastic in the above mentioned two-part blog.

My conclusion is that, provided you follow these design suggestions, Elasticsearch is a good option for the gold-copy primary database for Time-Series Event systems, and I highly recommend it.

We've all been assuming that your project is inserting 50K rows/sec 24x7. But, this text sounds like the project is inserting 50K rows/sec for a time period, stops inserting, and then generates reports.

My large-company customers get 15K network-events per second 24x7 for the entire corporation, over 1 billion documents per day.

Is your project (1) 50K rows/sec 24x7 or (2) 50K rows/sec for a time period?

You are right, it is 50K rows/sec 24x7.

Wow, at 500 bytes/event that is 2TB/day. How many days do you want for a Retention Period? That will be the primary driver of cost.

This is not an accurate statement. Logstash has had a backpressure mechanism specifically to avoid data loss for many years.

For "the network router will still send events, but those will be lost" I assume you mean UDP syslog data? Your statement is not specific enough for me to really say with precision what you should expect.

The chosen communication protocol will dictate when and how data loss occurs. In a network partition (network outage, software failure, machine fault, software slowness, etc), UDP and TCP are both lossy under various scenarios. You can try to minimize these scenarios with buffering (Logstash's persistent queue feature, etc), but you cannot eliminate it without improving the communication protocol.

No application can prevent the OS or network from dropping UDP packets or prevent the OS from lying to the client about having successfully transmitted TCP payloads. It's good to design for your requirements, if you are able to do so; I know from experience that network appliances often don't give you a choice when it comes to accessing logs.

Yes, this is an issue.
For now, we use HP Vertica to store the data; its compression and encoding mechanisms reduce disk usage.
Still, this issue is important, so we keep the original data for 20 days and create summaries for older data.
For Elasticsearch we would need to estimate how many days of original (network packet) data we can keep, and for now I have no idea about this.

Sorry I was inaccurate. Yes, Logstash handles back pressure from Elasticsearch by suspending indexing requests.

And that is why I recommended Kafka. Logstash persistent queues have been GA for just a few weeks, so they haven't been an option for product teams.

These scenarios have nothing to do with log files. My customer makes the router and programs the embedded Linux VM. As the router does its job, it also sends HTTP Posts with event info such as "connection established".

We could have used the Logstash HTTP Input Plugin, but we wanted buffering. Until otherwise proven, Kafka is still the leading fire-hose drinker.

My customers have built many non-log Event systems using Elasticsearch:

  • Enterprise desktops, servers, phones and other devices send Security Events.
  • QA Automation uses hundreds of VMs in Build, Test, Destroy. The testing VMs send QA Events.
  • Network appliances send Network Events.
  • Advertising widgets send Ad Events (impression, click, action, ...)

Yes, you are right. Very roughly speaking, a single Elasticsearch node on an i2.4xlarge can handle 2TB of documents. With replication=1, you need two nodes per day of retained data. For your 20 days of retention, 40 data nodes and 3 dedicated master nodes would be a good data lake for this project. For Snapshots, you need 40TB of mountable storage just for one incrementally-maintained snapshot.

Elastic recommends max 50GB per shard. So, you need 40 primary shards per day. At this scale, query latency will be so long that only periodic jobs are feasible.
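The back-of-the-envelope sizing above can be reproduced with a few lines of Python. This is only a sketch under the same rough assumptions used in this thread (2TB of documents per node, 50GB maximum per shard, one replica); it ignores index overhead and compression, and the function and parameter names are made up for illustration.

```python
import math


def size_cluster(events_per_sec, bytes_per_event, retention_days,
                 replicas=1, node_capacity_bytes=2e12, max_shard_bytes=50e9):
    """Rough sizing from ingest rate; ignores index overhead and compression."""
    daily_bytes = events_per_sec * bytes_per_event * 86_400  # seconds per day
    copies = 1 + replicas  # primary plus replicas
    return {
        "daily_tb": daily_bytes / 1e12,
        "data_nodes": math.ceil(daily_bytes * copies * retention_days / node_capacity_bytes),
        "primary_shards_per_day": math.ceil(daily_bytes / max_shard_bytes),
        "snapshot_tb": daily_bytes * retention_days / 1e12,  # primaries only
    }
```

Calling `size_cluster(50_000, 500, 20)` uses the unrounded daily volume (about 2.16TB rather than 2TB), so its results come out slightly above the rounded 40-node and 40-shard figures quoted here. Retention is the parameter to vary first, since every output except shards per day scales linearly with it.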

Also see:

https://www.elastic.co/elasticon/conf/2017/sf/small-medium-or-large-evolve-your-elastic-stack-to-fit

https://www.elastic.co/elasticon/conf/2016/sf/quantitative-cluster-sizing

thanks this is very helpful

Is the 50K msg/sec for storing raw packets, or just for the metadata of the TCP/HTTP connections, like 5-tuples plus other attributes? We built a system (Kafka + ES) that can easily handle 60K metadata msg/sec.

I don't understand what you mean by metadata, but we need to save packet information such as the protocols at each layer, source and destination IPs and ports, and so on.
How many nodes do you have in your Elasticsearch cluster?
And how many shards does it have?
