How to stop Logstash with PHP exec


(Henri) #1

Hi all, I use PHP to prepare the input data, then call PHP's exec() to run Logstash with a conf file, like this:

The PHP file:

<?php

// Fetch each town hall's document ID together with the mayor's name.
$sql4="SELECT pm1.document_id, pm2.nom_maire FROM `cm_documentid` as pm1 LEFT join cm_datas as pm2 ON pm1.CODGEO = pm2.CODGEO WHERE `document_id` LIKE '%mairie%' ORDER BY `pm1`.`CODGEO` ASC";
$result4 = $wpdb->get_results($sql4, 'ARRAY_A' );

// Rewrite the CSV file that Logstash will ingest, one row per result.
unlink('sp_maires.csv');
$fp3 = fopen("sp_maires.csv", "w");
foreach($result4 as $result41):
    fputcsv($fp3, $result41);
endforeach;
fclose($fp3);

// Start Logstash with the conf file; exec() blocks until the process exits.
echo exec('sudo /usr/share/logstash/bin/logstash -f /home/dev/public_html/datas/sp_maires.conf');
?>

And the .conf file:

input {
    file {
        path => "/home/dev/public_html/sp_maire.csv"
        start_position => beginning
        sincedb_path => "/dev/null"
    }
}
filter {
    csv {
        columns => [
            "identifiant","nom_maire"
        ]
        separator => ","
        skip_empty_columns => true
    }
}
output {
    elasticsearch {
        hosts => "http://localhost:9200"
        index => "my_index"
        document_id => "%{identifiant}"
        timeout => 30
        workers => 1
        doc_as_upsert => true
        action => "update"
    }

    stdout { codec => rubydebug }
}

The problem is that Logstash doesn't stop after finishing the job, and if I start too many Logstash jobs, Elasticsearch stops...

The question is: how can I stop the Logstash process once the job is done?

Thanks


(Magnus Bäck) #2

Logstash's file input doesn't have a notion of "done". It's meant to continuously monitor log files. You can use the stdin input instead (and rewrite your script to pass the files to Logstash via stdin), but then you won't be able to keep track of how much of the data has been processed (i.e. if you restart Logstash it'll process all the data from the beginning).
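
As a rough sketch of the stdin approach, the configuration from the question could be adapted as follows. The columns, index name, and host are copied from the original conf file. Everything else is an assumption, not a tested setup.

```
# Sketch only: read CSV lines from standard input instead of watching a file.
input {
    stdin { }
}
filter {
    csv {
        # Column names taken from the configuration in the question.
        columns => ["identifiant", "nom_maire"]
        separator => ","
        skip_empty_columns => true
    }
}
output {
    elasticsearch {
        hosts => "http://localhost:9200"
        index => "my_index"
        document_id => "%{identifiant}"
        doc_as_upsert => true
        action => "update"
    }
}
```

With a pipeline like this, the calling script would redirect the CSV file into the Logstash process's standard input. Once the input reaches end-of-file, the stdin input finishes and Logstash shuts down by itself.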

Another option could be to point Logstash to a directory where files are placed and have it continuously monitor that directory for new files. When your PHP script has a file it wants to have parsed it just copies the file into that directory. Then you'll never need more than one Logstash process running at a time.


(Henri) #3

Thanks Magnus, that's very clear. I understand the Logstash process now!

Do you know if I could create several Logstash instances, such as one for updates, another for deleting old files, etc.? Or must I put the update and the delete in the same .conf file?

Thanks a lot for your feedback


(Magnus Bäck) #4

You want to delete documents from ES based on something you figure out in your PHP script? Perhaps you should configure Logstash to monitor two directories and have two file inputs; one for updates and one for deletions.

file {
  path => ["/path/to/updates/*.txt"]
}

file {
  path => ["/path/to/deletions/*.txt"]
  tags => ["delete"]
}

Events originating from files in the deletions directory will be tagged "delete", so you can add conditionals to the rest of your configuration to treat those events differently.
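
A minimal sketch of such a conditional in the output section might look like this. It assumes each event carries an `identifiant` field holding the document ID; the index name and host are borrowed from the first post purely for illustration.

```
output {
    if "delete" in [tags] {
        # Events read from the deletions directory remove the document.
        elasticsearch {
            hosts => "http://localhost:9200"
            index => "my_index"
            document_id => "%{identifiant}"
            action => "delete"
        }
    } else {
        # All other events create or update the document.
        elasticsearch {
            hosts => "http://localhost:9200"
            index => "my_index"
            document_id => "%{identifiant}"
            doc_as_upsert => true
            action => "update"
        }
    }
}
```

The same `if "delete" in [tags]` test can be used in the filter section when the two kinds of files have different columns.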


(Henri) #5

Indeed, this is the right way to do it.

Thanks!


(Henri) #6

Hi Magnus,

I changed my script, but with my new conf file Logstash doesn't do anything. Maybe I made an error...

input {
    file {
        path => "/home/2803/public_html/datas/updates/*.csv"
    }
    file {
        path => "/home/2803/public_html/datas/deletions/*.csv"
        tags => ["delete"]
    }
}
filter {
    if "delete" in [tags] {
        csv {
            columns => [
                "id","date","document_id","CODGEO","dateupdate"
            ]
            separator => ","
            skip_empty_columns => true
        }
    } else {
        csv {
            columns => [
                "miseajour","nom","identifiant","CODGEO","street","cp","nomcommune","streetpostale","cppostale","nomcommunepostale","latpostale","lngpostale","tel","fax","email","siteweb","horaires","lat","lng","dept","DEPET","provider"
            ]
            separator => ","
            skip_empty_columns => true
        }
        mutate {
            add_field => {
                "[location][lat]" => "%{lat}"
                "[location][lon]" => "%{lng}"
            }
            remove_field => [ "lat", "lng" ]
        }
        mutate {
            convert => {
                "[location][lat]" => "float"
                "[location][lon]" => "float"
            }
        }
    }
}
output {
    if "delete" in [tags] {
        elasticsearch {
            hosts => "http://localhost:9200"
            index => "sirene"
            document_id => "%{document_id}"
            timeout => 30
            workers => 1
            doc_as_upsert => true
            action => "delete"
        }
    } else {
        elasticsearch {
            hosts => "http://localhost:9200"
            index => "sirene"
            document_id => "%{identifiant}"
            timeout => 30
            workers => 1
            doc_as_upsert => true
            action => "update"
        }
    }
    stdout { codec => rubydebug }
}

Thanks again!

Update: I forgot to add

start_position => beginning
sincedb_path => "/dev/null"	

And now it works.
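
For reference, the input section with those two settings in place would presumably look like the sketch below. Adding them to both file inputs is an assumption, since the post does not say which input was changed.

```
input {
    file {
        path => "/home/2803/public_html/datas/updates/*.csv"
        # Read files the input has not seen before from the first line,
        # rather than only picking up lines appended later.
        start_position => "beginning"
        # Do not persist read positions between Logstash runs.
        sincedb_path => "/dev/null"
    }
    file {
        path => "/home/2803/public_html/datas/deletions/*.csv"
        tags => ["delete"]
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
```

By default the file input starts at the end of each file, so CSV files that are already complete when Logstash discovers them produce no events. One trade-off to keep in mind: with `sincedb_path` set to `/dev/null`, read positions are not remembered, so restarting Logstash reprocesses every file still present in those directories.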

