How to get current/existing pipelines from ELK

I just became the owner of a very old ELK cluster and need to pull all of the configs from it to migrate to a cloud-hosted instance.
The system shows several pipelines in Kibana under Management > Logstash > Pipelines, but they are all unavailable; each one says "This pipeline wasn't created using centralized configuration management. It can't be managed or edited here."

Is there any way to get them exported or anything so I can create them on our new installation?
I have done a full snapshot and restored it, with global state, and these don't show up.

Second, is there a way to get a list of the users/roles that are assigned when the current auth method is Active Directory?
I've looked through the APIs for getting users, roles, and pipelines, but they don't return anything except the default settings.

This is what I am referring to:

[screenshot: the Pipelines page under Kibana Management > Logstash]

I have gone into the API to try to pull pipelines, and I don't think it's doing anything, as what I'm getting back looks to be all defaults and nothing related to what is listed in the GUI.

When running GET on _ingest/pipeline, this is what comes back:

{
  "xpack_monitoring_2" : {
    "description" : "This pipeline upgrades documents from the older version of the Monitoring API to the newer version (6) by fixing breaking changes in those older documents before they are indexed from the older version (2).",
    "version" : 6070299,
    "processors" : [
      {
        "script" : {
          "source" : "boolean legacyIndex = ctx._index == '.monitoring-data-2';if (legacyIndex || ctx._index.startsWith('.monitoring-es-2')) {if (ctx._type == 'cluster_info') {ctx._type = 'cluster_stats';ctx._id = null;} else if (legacyIndex || ctx._type == 'cluster_stats' || ctx._type == 'node') {String index = ctx._index;Object clusterUuid = ctx.cluster_uuid;Object timestamp = ctx.timestamp;ctx.clear();ctx._id = 'xpack_monitoring_2_drop_bucket';ctx._index = index;ctx._type = 'legacy_data';ctx.timestamp = timestamp;ctx.cluster_uuid = clusterUuid;}if (legacyIndex) {ctx._index = '<.monitoring-es-6-{now}>';}}"
        }
      },

If I do a GET on _ingest/processor/grok, I get this:

{
  "patterns" : {
    "BACULA_CAPACITY" : "%{INT}{1,3}(,%{INT}{3})*",
    "PATH" : "(?:%{UNIXPATH}|%{WINPATH})",
    "MONGO_SLOWQUERY" : "%{WORD} %{MONGO_WORDDASH:database}\\.%{MONGO_WORDDASH:collection} %{WORD}: %{MONGO_QUERY:query} %{WORD}:%{NONNEGINT:ntoreturn} %{WORD}:%{NONNEGINT:ntoskip} %{WORD}:%{NONNEGINT:nscanned}.*nreturned:%{NONNEGINT:nreturned}..+ (?<duration>[0-9]+)ms",
    "NAGIOS_EC_ENABLE_SVC_NOTIFICATIONS" : "ENABLE_SVC_NOTIFICATIONS",

and a whole lot more... which makes me think this might be them, but it's only one processor, and I don't know whether that's different from a pipeline or what, especially since it has to do with grok.

The last one I found was GET _nodes/stats/ingest?filter_path=nodes.*.ingest, which spits out the nodes and several other stats, but all of the counts are 0 and none of the names match what is listed on the Pipelines page; they only include the xpack_monitoring_2 stuff.

These are different things. There is no API to get those pipelines; you will need to connect to every Logstash server you have and copy the pipeline configuration files from it.
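If Logstash was installed from the official DEB/RPM packages, the pipeline definitions are usually plain .conf files on each server. These are the default locations (your installation may use different paths, so treat this as a starting point):

/etc/logstash/pipelines.yml   # lists each pipeline and the path to its config files
/etc/logstash/conf.d/*.conf   # default location of the pipeline config files

Copying those files off each Logstash server should give you everything you need to recreate the pipelines elsewhere.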

The _ingest/pipeline API is related to Elasticsearch Ingest Pipelines, which are a way to parse and transform data directly in Elasticsearch.
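For illustration, this is roughly what an ingest pipeline looks like; it is created through the REST API and runs inside Elasticsearch at index time. The pipeline name and field below are made up for the example:

PUT _ingest/pipeline/my-example-pipeline
{
  "description": "Example ingest pipeline that adds a field to every document it processes",
  "processors": [
    {
      "set": {
        "field": "ingested_by",
        "value": "my-example-pipeline"
      }
    }
  ]
}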

Elasticsearch Ingest Pipelines and Logstash Pipelines are completely different from each other.

The pipelines you shared are Logstash pipelines, probably coming from the monitoring part if I'm not wrong; it has been a couple of years since I used this version and a lot of things have changed.

What do you mean they are different?

The screenshot is from the Kibana UI, under Management > Logstash > Pipelines. You're saying the API calls are not for those, but for ones built into Elasticsearch?

So, the pipelines that I'm looking for would be considered parsers?
As an example, these are the 3 files in a parsers folder for some logs named BGCS:

00-kafka-input.conf

input {
  kafka {
    bootstrap_servers => "${ZOOKEEPER1},${ZOOKEEPER2},${ZOOKEEPER3}"
    topics => ["bgcs-logstash"]
    group_id => "bgcs-logstash"
    codec => "json"
    session_timeout_ms => "30000"
    max_poll_records => "250"
    consumer_threads => 4
    decorate_events => true
  }
}
filter {
  mutate {
    copy => { "[@metadata][kafka]" => "[metadata][kafka]" }
  }
}

10-bgcs-filter.conf

filter {
    if [type] == "bgcs" {
        grok {
            match => ["message", "(?m)\[(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME})\] %{WORD:level}\s+\[%{NOTSPACE:class}\] %{GREEDYDATA:Message}" ]
        }
        date {
            locale => "en"
            match => ["timestamp", "YY-MM-dd HH:mm:ss,SSS"]
        }
    }
    else if [type] == "WLP" {
        grok {
            match => ["message", "(?m)\[%{DATE:date} %{TIME:time} %{WORD:tz2}] %{WORD:ThreadID} %{NOTSPACE:ShortName}\s*%{WORD:EventType}\s*%{GREEDYDATA:Message}"]
            add_field => [ "timestamp", "%{date} %{time} %{tz2}" ]
        }
        grok {
            match => [ "Message", "(?<MessageCode>[A-Z]{4,5}[0-9]{4}[A-Z]):"]
            tag_on_failure => []
        }
        grok {
            match => [ "Message", "\(\(Tenant=%{NOTSPACE:tenant}\)\)"]
            tag_on_failure => []
        }
        grok {
            match => [ "Message", "\(\(User=%{NOTSPACE:user}\)\)"]
            tag_on_failure => []
        }
        mutate {
            gsub => ["timestamp","CEST","+0200"]
        }
        date {
            locale => "en"
            match => ["timestamp", "MM/dd/YY HH:mm:ss:SSS zzz", "M/d/YY HH:mm:ss:SSS zzz", "MM/d/YY HH:mm:ss:SSS zzz", "M/dd/YY HH:mm:ss:SSS zzz", "MM/dd/YY H:mm:ss:SSS zzz", "M/d/YY H:mm:ss:SSS zzz", "MM/d/YY H:mm:ss:SSS zzz", "M/dd/YY H:mm:ss:SSS zzz", "MM/dd/YY HH:mm:ss:SSS ZZZ", "M/d/YY HH:mm:ss:SSS ZZZ", "MM/d/YY HH:mm:ss:SSS ZZZ", "M/dd/YY HH:mm:ss:SSS ZZZ", "MM/dd/YY H:mm:ss:SSS ZZZ", "M/d/YY H:mm:ss:SSS ZZZ", "MM/d/YY H:mm:ss:SSS ZZZ", "M/dd/YY H:mm:ss:SSS ZZZ", "M/dd/YY HH:mm:ss:SSS Z", "M/dd/YY HH:mm:ss:SSS z"]
        }
    }
}

20-elasticsearch.output.conf

output{
  if [type] == "bgcs" {
      elasticsearch{
        document_type => "bgcs"
        hosts => ["${ELASTICSEARCH1}","${ELASTICSEARCH2}","${ELASTICSEARCH3}"]
        ssl => true
        cacert => "ca.der"
        index => "logstash-bgcs-%{+xxxx.ww}"
        user => "${ELASTICUSER}"
        password => "${ELASTICPASSWORD}"
        manage_template => false
      }
    }
    else if [type] == "WLP" {
         elasticsearch{
        document_type => "bgcs"
        hosts => ["${ELASTICSEARCH1}","${ELASTICSEARCH2}","${ELASTICSEARCH3}"]
        ssl => true
        cacert => "ca.der"
        index => "logstash-bgcs-wlp-%{+xxxx.ww}"
        user => "${ELASTICUSER}"
        password => "${ELASTICPASSWORD}"
        manage_template => false
      }
    }
}

From what I'm understanding (from you pointing me in the right direction, thanks again), each of these parser config files would need to be converted over to Elasticsearch parsers... with or without grok, depending on whether they use it.

Guess I get to go down the rabbit hole of how to convert the hosts, user, pass, and all the other stuff in those parsers into whatever can be used in Elastic Cloud.

Thanks again.

Yes, the API is just for the Ingest Pipelines, which are an Elasticsearch feature.

You are looking for Logstash pipelines, which are how Logstash receives, processes, and sends data. Logstash pipelines have 3 blocks:

  • input, which is the source of the data; in your example the input block has a kafka input, which means that Logstash is consuming data from a Kafka topic.
  • filter, which is where you transform your data: parse your messages and create fields from them, rename fields, enrich the data, etc.
  • output, which is where the data will go, i.e. where Logstash needs to send it; in your example it sends the data to Elasticsearch.

When you migrate from Logstash to Elasticsearch ingest pipelines, you only need to migrate what you have in the filter blocks; the input becomes whatever data Elasticsearch receives, and the output is Elasticsearch itself, writing to the specified index.

So yes, you will need to check every filter you have in every Logstash pipeline and see how you will migrate it to an ingest pipeline in Elasticsearch.
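As a rough sketch of what that migration can look like, the grok and date filters from the bgcs branch of your 10-bgcs-filter.conf could become an ingest pipeline along these lines. The pipeline name is made up, the grok pattern is copied from your filter, and the date formats may need adjusting, so test it with the _simulate API before relying on it:

PUT _ingest/pipeline/bgcs
{
  "description": "Rough equivalent of the bgcs branch of 10-bgcs-filter.conf",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "(?m)\\[(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME})\\] %{WORD:level}\\s+\\[%{NOTSPACE:class}\\] %{GREEDYDATA:Message}"
        ]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": ["yyyy-MM-dd HH:mm:ss,SSS", "yy-MM-dd HH:mm:ss,SSS"]
      }
    }
  ]
}

The Kafka input and the elasticsearch output don't carry over; whatever ships the data to Elastic Cloud has to write to the right index with this pipeline applied, either by passing pipeline=bgcs on the index request or by setting index.default_pipeline on the index.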
