Extraction valeurs champ message

Bonjour,

Je souhaite indexer des paquets réseau et en extraire certaines informations mais logstash envoi tout dans un champ unique.
Voici le code logstash :

input {
  kafka {
    bootstrap_servers => "172.16.238.204:9092"
    topics => ["ids"]
  }
}

filter {
  mutate {
    gsub => [ "message","(\\")", "" ]
    gsub => [ "message","(\")", "" ]
    gsub => [ "message","[{]", "" ]
    gsub => [ "message","[}]", "" ]
    gsub => [ "message","[,]", "" ]
    gsub => [ "message","[:]", "" ]
  }
  mutate {
    split => { "message" => " " }
  }
  grok {
    match => { "message" => "%{IP:ip_src}" }
  }
}

output {
  elasticsearch {
    index => "ids"
    hosts => ["172.16.238.203:9200"]
  }
}```

les paquets sont indexés comme suit :

timestamp, 1628782345484, layers, frame, frame_frame_interface_id, 0, frame_interface_id_frame_interface_name, eth0, frame_frame_encap_type, 1, frame_frame_time, Aug, 12, 2021, 113225.484168109, EDT, frame_frame_offset_shift, 0.000000000, frame_frame_time_epoch, 1628782345.484168109, frame_frame_time_delta, 0.000462296, frame_frame_time_delta_displayed, 0.000462296, frame_frame_time_relative, 1.597838613, frame_frame_number, 162, frame_frame_len, 66, frame_frame_cap_len, 66, frame_frame_marked, 0, frame_frame_ignored, 0, frame_frame_protocols, ethethertypeiptcp, eth, eth_eth_dst, 00155dee2d1b, eth_dst_eth_dst_resolved, Microsof_ee2d1b, eth_dst_eth_addr, 00155dee2d1b, eth_dst_eth_addr_resolved, Microsof_ee2d1b, eth_dst_eth_lg, 0, eth_dst_eth_ig, 0, eth_eth_src, 00155dee2d1a, eth_src_eth_src_resolved, Microsof_ee2d1a, eth_src_eth_addr, 00155dee2d1a, eth_src_eth_addr_resolved, Microsof_ee2d1a, eth_src_eth_lg, 0, eth_src_eth_ig, 0, eth_eth_type, 0x00000800, ip, ip_ip_version, 4, ip_ip_hdr_len, 20, ip_ip_dsfield, 0x00000000, ip_dsfield_ip_dsfield_dscp, 0, ip_dsfield_ip_dsfield_ecn, 0, ip_ip_len, 52, ip_ip_id, 0x000084a7, ip_ip_flags, 0x00004000, ip_flags_ip_flags_rb, 0, ip_flags_ip_flags_df, 1, ip_flags_ip_flags_mf, 0, ip_flags_ip_frag_offset, 0, ip_ip_ttl, 64, ip_ip_proto, 6, ip_ip_checksum, 0x00008063, ip_ip_checksum_status, 2, ip_ip_src, 172.16.238.203, ip_ip_addr, [172.16.238.203, 172.16.238.204], ip_ip_src_host, 172.16.238.203, ip_ip_host, [172.16.238.203, 172.16.238.204], ip_ip_dst, 172.16.238.204, ip_ip_dst_host, 172.16.238.204, tcp, tcp_tcp_srcport, 44708, tcp_tcp_dstport, 9092, tcp_tcp_port, [44708, 9092], tcp_tcp_stream, 1, tcp_tcp_len, 0, tcp_tcp_seq, 1788, tcp_tcp_nxtseq, 1788, tcp_tcp_ack, 542127, tcp_tcp_hdr_len, 32, tcp_tcp_flags, 0x00000010, tcp_flags_tcp_flags_res, 0, tcp_flags_tcp_flags_ns, 0, tcp_flags_tcp_flags_cwr, 0, tcp_flags_tcp_flags_ecn, 0, tcp_flags_tcp_flags_urg, 0, tcp_flags_tcp_flags_ack, 1, tcp_flags_tcp_flags_push, 0, tcp_flags_tcp_flags_reset, 0, tcp_flags_tcp_flags_syn, 0, tcp_flags_tcp_flags_fin, 0, tcp_flags_tcp_flags_str, \u00c2\u00b7\u00c2\u00b7\u00c2\u00b7\u00c2\u00b7\u00c2\u00b7\u00c2\u00b7\u00c2\u00b7A\u00c2\u00b7\u00c2\u00b7\u00c2\u00b7\u00c2\u00b7, tcp_tcp_window_size_value, 14480, tcp_tcp_window_size, 14480, tcp_tcp_window_size_scalefactor, -1, tcp_tcp_checksum, 0x00005a87, tcp_tcp_checksum_status, 2, tcp_tcp_urgent_pointer, 0, tcp_tcp_options, 0101080a8c59e7d6bb8e4ec4, tcp_options_tcp_options_nop, [01, 01], tcp_options_nop_tcp_option_kind, [1, 1], tcp_options_tcp_options_timestamp, 080a8c59e7d6bb8e4ec4, tcp_options_timestamp_tcp_option_kind, 8, tcp_options_timestamp_tcp_option_len, 10, tcp_options_timestamp_tcp_options_timestamp_tsval, 2354702294, tcp_options_timestamp_tcp_options_timestamp_tsecr, 3146665668, tcp_tcp_analysis, tcp_analysis_tcp_analysis_acks_frame, 161, tcp_analysis_tcp_analysis_ack_rtt, 0.000462296, tcp_text, Timestamps, text_tcp_time_relative, 1.251021314, text_tcp_time_delta, 0.000462296```

Je souhaiterais extraire les valeurs de certain champ. par ex la valeur d'ip_ip_len, ou celle de tcp_flags_tcp_flags_syn.

Merci

Bonjour,

Tout d'abord, je pense que l'utilisation de split n'est pas un très bon choix ici.
En l'occurence, comme le split est mis en place sur le caractère espace, vous allez séparer certaines données comme par exemple [01, 01] qui est la valeur de tcp_options_tcp_options_nop.

Je pense que le meilleur moyen de récupérer les données est de mettre en place un ou plusieurs pattern grok.

#Cas avec un pattern grok
grok {
  match => {
    "message" => "ip_ip_len, %{BASE10NUM:ip_ip_len}.*tcp_flags_tcp_flags_syn, %{BASE10NUM:tcp_flags_tcp_flags_syn}"
  }
}
#Cas avec plusieurs pattern grok
grok {
  match => {
    "message" => "ip_ip_len, %{BASE10NUM:ip_ip_len}"
  }
}

grok {
  match => {
    "message" => "tcp_flags_tcp_flags_syn, %{BASE10NUM:tcp_flags_tcp_flags_syn}"
  }
}

Cad.

Bonjour,

Je m'en suis sortir avec ceci qui fonctionne uniquement pour extraire les champs du protocole IP :

grok {
match => { "message" => "%{WORD:timestamp} %{NUMBER:date} %{WORD:couche} %{WORD:toto} %{IP:ip_src} %{WORD:toto} %{IP:ip_dst} %{WORD:toto} %{NUMBER:ip_flag_offset} %{WORD:toto} %{NUMBER:ip_flag_rb} %{WORD:toto} %{NUMBER:ip_flag_df} %{WORD:toto} %{NUMBER:ip_flag_mf} %{WORD:toto} %{NUMBER:ip_ttl} %{WORD:toto} %{NUMBER:ip_len}" }
}

mutate {
remove_field => ["toto"]
remove_field => ["date"]
remove_field => ["couche"]
remove_field => ["message"]
remove_field => ["timestamp"]
}
}

par contre les format de message n'est pas le même à chaque fois s'il s'agit d'udp, tcp, ssh, http, bref selon les protocoles utilisés.

Il faut que j'utilise un index par protocole (IP, TCP, ...) avec un GROK par index respectant le format de message ?

Bonjour,

Tous dépend de la façon dont vous voulez les traiter par la suite.
Personnellement, oui, j'aurai mis en place un index pour chaque type de valeur.
Je pense que le meilleur moyen de procéder est le suivant

input{
  kafka {
    bootstrap_servers => "172.16.238.204:9092"
    topics => ["ids"]
  }
}

filter {
# Si mon message contient "http_"
  if [message] =~ /http_/ {
    # Ajouter le tag HTTP
    mutate {
      add_tag => [ "HTTP" ]
    }
    # Utiliser la regex grok lui correspondant
    grok {
      match => { "message" => "TODO pattern pour HTTP" }
    }
  } else if [message] =~ /tcp_/ {
    mutate {
      add_tag => [ "TCP" ]
    }
    grok {
      match => { "message" => "TODO pattern pour TCP" }
    }
  } else if [message] =~ /udp_/ {
    mutate {
      add_tag => [ "UDP" ]
    }
    grok {
      match => { "message" => "TODO pattern pour UDP" }
    }
  } else if [message] =~ /ip_/ {
    mutate {
      add_tag => [ "IP" ]
    }
    grok {
      match => { "message" => "%{WORD:timestamp} %{NUMBER:date} %{WORD:couche} %{WORD:toto} %{IP:ip_src} %{WORD:toto} %{IP:ip_dst} %{WORD:toto} %{NUMBER:ip_flag_offset} %{WORD:toto} %{NUMBER:ip_flag_rb} %{WORD:toto} %{NUMBER:ip_flag_df} %{WORD:toto} %{NUMBER:ip_flag_mf} %{WORD:toto} %{NUMBER:ip_ttl} %{WORD:toto} %{NUMBER:ip_len}" }
    }
  }
}
output {
  # Selon le tag mis en place, utiliser l'index lui correspondant
  if "IP" in [tags] {
    elastcisearch {
      #TODO completer
      index => "logs-IP"
    }
  } else if "TCP" in [tags] {
    elastcisearch {
      #TODO completer
      index => "logs-TCP"
    }
  } else if "UDP" in [tags] {
    elastcisearch {
      #TODO completer
      index => "logs-UDP"
    }
  } else if "HTTP" in [tags] {
    elastcisearch {
      #TODO completer
      index => "logs-HTTP"
    }
  } else {
    elastcisearch {
      #TODO completer
      index => "logs-UNKNOWN"
    }
  }
}

C'est ce que j'ai fait un final. Par contre j'ai modifié en amont afin de créer plusieurs topic sous kafka ce qui simplifie les choses pour logstash.

input {
kafka {
bootstrap_servers => "172.16.xxx.xxx:9092"
topics => "_ip"
group_id => "_ip"
type => "ip"
}
kafka {
bootstrap_servers => "172.16.xxx.xxx:9092"
topics => "_tcp"
group_id => "_tcp"
type => "tcp"
}
kafka {
bootstrap_servers => "172.16.xxx.xxx:9092"
topics => "_udp"
group_id => "_udp"
type => "udp"
}
}

Par contre compliqué d'identifier le bon pattern
if [message] =~ /http_/ {
car les motifs reviennent dans plusieurs trames.

Bonjour,

Comme vous faite le travail en amont dans kafka, il n'y a plus besoin de faire de condition sur le contenu du message. Vous pouvez directement placer la condition sur le champ type que vous instancié dans l'input.

if [type] == "ip" {...

Type que vous pouvez ajouter à la fin de votre index pour différencier les données dans elasticsearch. Ce qui est plus efficace, en terme de performance, que la succession de if que j'avais pu mettre en exemple dans ma précédente réponse.

index => "index-%{[type]}"

Cad.