Hive overwhelming Elasticsearch

If you are running 7.12 with Kibana, then you have Monitoring included for free - Stack Monitoring | Kibana Guide [8.11] | Elastic

Both of those, yes.

All I see in the data node log (my-cluster.log) is timeouts.
About the same for the master node. Is there a particular thing I should search for in the log?

(a specific phrase)

I'm going to try adding Kibana and see if that sheds light on anything. I'm not sure if it's a GC issue as much as it's like my Hadoop cluster is DDoSing the ES cluster. If I do a continuous ping when I start my Hive query, after a couple minutes one of the data nodes goes from 30ms to 3000ms (I'm pinging over a VPN, which is why it's 30ms at rest).

The Hadoop cluster and ES cluster are both on the same network.

I'll report back once I get Kibana going. If there's anything else I can dig up, let me know.

It's useful if you can post your logs; you may be missing something we would otherwise catch.

Any particular way to post a log file w/o blowing up the page?
I don't see an attach option.

Use gist/pastebin/etc. if they are too large to post.

Sorry for the delay; I got Kibana and Filebeat set up. It seems like Filebeat isn't pulling the ES log entries though, but here's what I get when the node drops off.

Here are the log entries from the elected master, and the node that dropped off:
master-gc.log
master-my-cluster.log
datanode-gc.log
datanode-my-cluster.log

Hopefully this helps. Let me know if I should adjust anything for Kibana/Filebeat or anything else.

thanks!

Your cluster is not healthy by the looks of things. There are tonnes of "master not discovered or elected yet" messages in your logs, which would suggest you have connection and/or configuration issues.

What do your elasticsearch.yml files look like?

I think those were from before I bootstrapped the cluster. Is there a way I can roll the logs and re-run the insert?

Here are the elasticsearch.yml files:


::MASTERNODE::
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
cluster.name: my-cluster
node.name: "es-m01"
node.data: false
node.master: true
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["es-m01","es-m02","es-m03","es-d01","es-d02"]
discovery.zen.minimum_master_nodes: 2
xpack.monitoring.collection.enabled: true
cluster.initial_master_nodes: ["es-m01"]

::DATANODES::
cluster.name: my-cluster
node.name: "es-d02"
node.data: true
node.master: false
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["es-m01","es-m02","es-m03","es-d01","es-d02"]
discovery.zen.minimum_master_nodes: 2
xpack.monitoring.collection.enabled: true

The only difference with the other nodes is that the other master nodes do not have the cluster.initial_master_nodes: ["es-m01"] setting, plus the node names.

What's the output from the _cat/nodes?v API? Just want to check that your cluster is ok :slight_smile:

ip           heap.percent ram.percent cpu load_1m load_5m load_15m node.role  master name
192.168.4.63           75          33   0    0.20    0.05     0.02 cdfhilrstw -      es-d01
192.168.4.60           56          39   1    0.00    0.05     0.01 ilmr       -      es-m01
192.168.4.64           36          33   0    0.02    0.03     0.00 cdfhilrstw -      es-d02
192.168.4.62           61          28   0    0.00    0.00     0.00 ilmr       -      es-m03
192.168.4.61           20          29   0    0.00    0.00     0.01 ilmr       *      es-m02

But when I do the insert, I usually drop one of the data nodes.

Odd thing is, if I do just one shard it works fine; it's only when I do more than one.

Hi Mark, just following up...

Does anyone have thoughts here? I'm kinda lost...

Any other solutions for loading several hundred million records quickly?

Is Hive using the _bulk API, and if so, what are the bulk request sizes that it is sending over?

I'm not sure; whatever is built into the elasticsearch-hadoop library.

But I have started playing with loading pipe-delimited files with the Elasticsearch Python library, and I seem to be running into the same problem.

For example, a file I'm trying to load now is about 11 million rows (pipe-delimited text file, only about 8 columns).

I'm doing a parallel_bulk and it's periodically timing out as well.

The last run I tried was 4 threads @ 10,000 docs per batch.
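(For rough scale, and this is only a back-of-the-envelope sketch with a placeholder file name: 10,000 rows per bulk can add up to a few MB per request, and several threads means several of those in flight at once.)

import os

# Back-of-the-envelope estimate of the bulk payload size:
# average line length times the number of rows per chunk.
path = 'input.psv'  # placeholder path to the pipe-delimited file
with open(path) as f:
    line_count = sum(1 for _ in f)
avg_line = os.path.getsize(path) / line_count
print(f"~{avg_line * 10000 / 1024 ** 2:.1f} MB per 10,000-row bulk request")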

In an Elasticsearch cluster it is the data nodes that do most of the work, so this is the node type you need to scale out in order to improve write performance. Based on your monitoring graph it looks like you have configured only 2GB of heap, which is not very much at all. If you are indexing a lot I am not surprised you are having issues.

What is the specification of the nodes in terms of hardware? What type of storage are you using? SSDs? Indexing is very IO intensive so for good performance you will need fast storage.
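If it helps, here is a minimal sketch (the host name is a placeholder; any node in the cluster will do) that pulls the configured heap and free disk per node from the _nodes/stats API with the elasticsearch-py client, so you can confirm what the nodes are actually running with:

from elasticsearch import Elasticsearch

# Placeholder host; any node in the cluster will answer nodes.stats.
es = Elasticsearch(hosts=['es-m01'], port=9200, timeout=30)

# jvm and fs stats show the configured heap and free disk per node.
stats = es.nodes.stats(metric='jvm,fs')
for node in stats['nodes'].values():
    heap_max_gb = node['jvm']['mem']['heap_max_in_bytes'] / 1024 ** 3
    heap_used_pct = node['jvm']['mem']['heap_used_percent']
    disk_free_gb = node['fs']['total']['free_in_bytes'] / 1024 ** 3
    print(f"{node['name']}: heap_max={heap_max_gb:.1f}GB, "
          f"heap_used={heap_used_pct}%, disk_free={disk_free_gb:.1f}GB")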

Since that screenshot, I've increased the heap to 10GB.
Both data nodes are dedicated data nodes only. Then I have 3 masters (dedicated as well).

All 5 instances are running on a Dell R820 using Ubuntu/KVM.
The server itself has 32 cores and 96GB of memory.

Each node is allocated 4 cores and 16GB of memory. The master servers and one data node are on a single SSD, and the other data node is on its own SSD.

Again, right now this is just a proof of concept. I just need to get a table to actually load from Hive into ES; once I get that working, I can start planning out real hardware requirements.

Here's a new screenshot from when I'm trying the Python insert:

I got the following exception from parallel_bulk: unavailable_shards_exception

Here's my Python script:

import csv
import os

from elasticsearch import Elasticsearch, helpers


def es_bulk(inputfile, thread_count, batch_size):
    es = Elasticsearch(
        hosts=['es-m02', 'es-m03', 'es-d01', 'es-d02'],
        port=9200,
        timeout=120
    )

    with open(inputfile) as f:
        reader = csv.DictReader(
            f,
            delimiter='|',
            fieldnames=['f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8']
        )
        # raise_on_error=False makes failed documents come back as
        # (success, info) tuples instead of raising BulkIndexError,
        # so the error file below actually gets written.
        for success, info in helpers.parallel_bulk(
                es, reader,
                thread_count=thread_count,
                chunk_size=batch_size,
                index='my.index',
                request_timeout=120,
                raise_on_error=False):
            if not success:
                errorfile = os.path.join(
                    os.path.dirname(inputfile), "error",
                    os.path.basename(inputfile))
                # append so earlier failures are not overwritten
                with open(errorfile, 'a') as ef:
                    print(info, file=ef)

Please note, it doesn't look like the error writing/logging is working, because it just barfs a bunch of JSON on the screen instead of outputting to the error file, so I'm not sure I'm doing that part correctly.

Also worth a note: Kibana and my elected master are on es-m01, which is why it's not in the hosts list.
Let me know if you see anything I can/should adjust.

If I run just one import, it seems to work fine, and if I wait a long period of time in between additional inserts, it seems okay too. But I have to wait like 30 minutes before I can insert another file, or it'll blow up.

Also found this in the recent log entries:

Recent Log Entries
Showing the most recent log entries for this cluster, up to 10 total log entries (Timestamp | Level | Type | Component | Node, message on the following line).

April 20, 2021 4:47:48 PM | INFO | Server | o.e.m.j.JvmGcMonitorService | es-d02
{"type": "server", "timestamp": "2021-04-20T16:47:48,791Z", "level": "INFO", "component": "o.e.m.j.JvmGcMonitorService", "cluster.name": "my-cluster", "node.name": "es-d02", "message": "[gc][70007] overhead, spent [262ms] collecting in the last [1s]", "cluster.uuid": "eUv12yWkRkmLIcYU2qFU2A", "node.id": "7frrjjyDRtykC5EKSqwRrw" }

April 20, 2021 4:40:18 PM | WARN | Server | o.e.t.TransportService | es-m01
Received response for a request that has timed out, sent [24.4s/24416ms] ago, timed out [9.6s/9607ms] ago, action [indices:monitor/stats[n]], node [{es-d01}{AQOeyh6hT7yv6caZUX1JPg}{WCzVZeWJSRGshvJsfgYL_g}{192.168.4.63}{192.168.4.63:9300}{cdfhilrstw}{ml.machine_memory=16819191808, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=10737418240, transform.node=true}], id [1187962]

April 20, 2021 4:40:08 PM | WARN | Server | o.e.c.InternalClusterInfoService | es-m01
failed to retrieve shard stats from node [AQOeyh6hT7yv6caZUX1JPg]: [es-d01][192.168.4.63:9300][indices:monitor/stats[n]] request_id [1187962] timed out after [14809ms]

April 20, 2021 4:40:00 PM | WARN | Server | o.e.t.TransportService | es-m01
Received response for a request that has timed out, sent [11.2s/11207ms] ago, timed out [1.2s/1200ms] ago, action [internal:coordination/fault_detection/follower_check], node [{es-d01}{AQOeyh6hT7yv6caZUX1JPg}{WCzVZeWJSRGshvJsfgYL_g}{192.168.4.63}{192.168.4.63:9300}{cdfhilrstw}{ml.machine_memory=16819191808, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=10737418240, transform.node=true}], id [1187924]

April 20, 2021 4:39:36 PM | WARN | Server | o.e.t.TransportService | es-m01
Received response for a request that has timed out, sent [10.6s/10607ms] ago, timed out [600ms/600ms] ago, action [internal:coordination/fault_detection/follower_check], node [{es-d01}{AQOeyh6hT7yv6caZUX1JPg}{WCzVZeWJSRGshvJsfgYL_g}{192.168.4.63}{192.168.4.63:9300}{cdfhilrstw}{ml.machine_memory=16819191808, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=10737418240, transform.node=true}], id [1187682]

April 20, 2021 4:39:34 PM | WARN | Server | o.e.t.TransportService | es-m01
Received response for a request that has timed out, sent [19.6s/19613ms] ago, timed out [9.6s/9607ms] ago, action [internal:coordination/fault_detection/follower_check], node [{es-d01}{AQOeyh6hT7yv6caZUX1JPg}{WCzVZeWJSRGshvJsfgYL_g}{192.168.4.63}{192.168.4.63:9300}{cdfhilrstw}{ml.machine_memory=16819191808, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=10737418240, transform.node=true}], id [1187587]

April 20, 2021 4:39:10 PM | WARN | Server | o.e.t.TransportService | es-m01
Received response for a request that has timed out, sent [10.8s/10809ms] ago, timed out [801ms/801ms] ago, action [internal:coordination/fault_detection/follower_check], node [{es-d01}{AQOeyh6hT7yv6caZUX1JPg}{WCzVZeWJSRGshvJsfgYL_g}{192.168.4.63}{192.168.4.63:9300}{cdfhilrstw}{ml.machine_memory=16819191808, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=10737418240, transform.node=true}], id [1187485]

April 20, 2021 4:38:51 PM | WARN | Server | o.e.t.TransportService | es-d01
Received response for a request that has timed out, sent [9.8s/9814ms] ago, timed out [0s/0ms] ago, action [internal:coordination/fault_detection/leader_check], node [{es-m01}{roWYC3m8S1ipqbvyNaEoxQ}{oCYL4tVXRNK3s0zG8bTiCw}{192.168.4.60}{192.168.4.60:9300}{ilmr}{ml.machine_memory=16819191808, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=10737418240, transform.node=false}], id [407915]

April 20, 2021 4:38:10 PM | WARN | Server | o.e.t.TransportService | es-d01
Received response for a request that has timed out, sent [19.4s/19458ms] ago, timed out [9.4s/9444ms] ago, action [internal:coordination/fault_detection/leader_check], node [{es-m01}{roWYC3m8S1ipqbvyNaEoxQ}{oCYL4tVXRNK3s0zG8bTiCw}{192.168.4.60}{192.168.4.60:9300}{ilmr}{ml.machine_memory=16819191808, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=10737418240, transform.node=false}], id [407686]

April 20, 2021 4:38:08 PM | WARN | Server | o.e.t.TransportService | es-d01
Received response for a request that has timed out, sent [28.4s/28452ms] ago, timed out [18.4s/18447ms] ago, action [internal:coordination/fault_detection/leader_check], node [{es-m01}{roWYC3m8S1ipqbvyNaEoxQ}{oCYL4tVXRNK3s0zG8bTiCw}{192.168.4.60}{192.168.4.60:9300}{ilmr}{ml.machine_memory=16819191808, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=10737418240, transform.node=false}], id [407664]

How many indexing threads are you using? What is your bulk size?

Are you sending all bulk requests directly to the data nodes?

2 threads, 10,000 batch size.
As for sending directly to the data nodes: no, I'm using the Elasticsearch object and giving it 4 of the 5 nodes. I can, though. I'm still trying to figure out the best way to do this; I'm very new to ES and semi-new to Python.

Any/all pointers are welcome :slight_smile:

Should I change the hosts to just the two data nodes?
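For what it's worth, here is a rough sketch of that idea combined with a gentler bulk pattern: hosts restricted to the two data nodes, a single-threaded streaming_bulk with smaller chunks, and retries so rejected documents back off instead of the whole request timing out. The index and field names are copied from the script above; the file name is a placeholder.

import csv

from elasticsearch import Elasticsearch, helpers

# Only the two data nodes; the dedicated masters stay out of the
# bulk/coordinating path.
es = Elasticsearch(hosts=['es-d01', 'es-d02'], port=9200, timeout=120)

with open('input.psv') as f:  # placeholder file name
    reader = csv.DictReader(
        f, delimiter='|',
        fieldnames=['f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8'])
    # Smaller chunks plus retries: documents rejected with HTTP 429
    # are retried with exponential backoff instead of the whole
    # request timing out.
    for ok, info in helpers.streaming_bulk(
            es, reader,
            index='my.index',
            chunk_size=1000,
            max_retries=5,
            initial_backoff=2,
            raise_on_error=False):
        if not ok:
            print(info)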

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.