Hi
I have two CSV files. s1.csv contains the following data:
1,2,3
1,2,3
2,3,4
And s2.csv contains the following data:
1,4
2,5
The column names are specified in the configuration files: A, B, C for s1 and Aa, Bb for s2.
Now I want to add a new field called "Bbb" to each s1 event and copy the "Bb" column from the s2 file into it by matching A = Aa, so that I get the following result:
1,2,3,4
1,2,3,4
2,3,4,5
logstash-sample1.conf has the following configuration:
# Input: read /home/workload/s1.csv with the file input and set type "a" on each event.
input {
file {
path => "/home/workload/s1.csv"
start_position => "beginning"
stat_interval => 30
# The type value is what the filter section's conditional tests.
type => "a"
}
}
# Filter: for events of type "a", run the csv filter with column names A, B, C
# and convert each of the three columns to an integer.
filter {
if ([type] == "a") {
csv {
columns => ["A","B","C"]
separator => ","
convert => {"A"=>"integer"}
convert => {"B"=>"integer"}
convert => {"C" => "integer"}
}
}
}
# Output: index each event into the "s1index" index on the Elasticsearch host 10.0.1.73,
# and also print it to stdout with the rubydebug codec.
output {
elasticsearch {
action => "index"
index => "s1index"
hosts => "10.0.1.73"
}
stdout {
codec=>"rubydebug"
}
}
And logstash-sample2.conf has the following configuration:
# Input: read /home/workload/s2.csv with the file input and set type "b" on each event.
input {
file {
path => "/home/workload/s2.csv"
start_position => "beginning"
stat_interval => 30
# The type value is what the filter section's conditional tests.
type => "b"
}
}
# Filter: for events of type "b", run the csv filter with column names Aa and Bb
# (both converted to integers), then query Elasticsearch for a related document.
filter {
if ([type] == "b") {
csv {
columns => ["Aa","Bb"]
separator => ","
convert => {"Aa"=>"integer"}
convert => {"Bb"=>"integer"}
}
}
# This lookup sits outside the type check above, so it is not restricted to type "b" events.
elasticsearch {
hosts => ["centrifyfw2"]
# Search s1index for a type "a" document whose A equals this event's Aa.
query => "_index:s1index AND type:a AND A:%{[Aa]} "
# NOTE(review): the plugin documentation writes this mapping as a hash
# (fields => { "B" => "Bbb" }); presumably the flat array form is what leads to the
# NoMethodError on nil in the reported log -- confirm against the installed plugin version.
# NOTE(review): this looks like it copies B from the matched s1 document into Bbb on the
# s2 event, which is the reverse of the stated goal (Bb from s2 onto s1 events) -- verify intent.
fields => ["B", "Bbb"]
}
}
# Output: print each event to stdout with the rubydebug codec; nothing is indexed here.
output {
stdout {
codec=>"rubydebug"
}
}
I am getting the following error:
Starting pipeline {:id=>"base", :pipeline_workers=>4, :batch_size=>125, :batch_delay=>5, :max_inflight=>500, :level=>:info}
Pipeline started {:level=>:info}
Logstash startup completed
Failed to query elasticsearch for previous event {:query=>"_index:s1index AND type:a AND A:1 ", :event=>#<LogStash::Event:0x193eed2b @metadata_accessors=#<LogStash::Util::Accessors:0x1de75774 @store={"path"=>"/home/workload/s2.csv"}, @lut={"[path]"=>[{"path"=>"/home/workload/s2.csv"}, "path"]}>, @cancelled=false, @data={"message"=>"1,4\r", "@version"=>"1", "@timestamp"=>"2016-04-13T13:17:38.436Z", "path"=>"/home/workload/s2.csv", "host"=>"centrifyfw2", "type"=>"b", "Aa"=>1, "Bb"=>4}, @metadata={"path"=>"/home/workload/s2.csv"}, @accessors=#<LogStash::Util::Accessors:0x184abbb @store={"message"=>"1,4\r", "@version"=>"1", "@timestamp"=>"2016-04-13T13:17:38.436Z", "path"=>"/home/workload/s2.csv", "host"=>"centrifyfw2", "type"=>"b", "Aa"=>1, "Bb"=>4}, @lut={"path"=>[{"message"=>"1,4\r", "@version"=>"1", "@timestamp"=>"2016-04-13T13:17:38.436Z", "path"=>"/home/workload/s2.csv", "host"=>"centrifyfw2", "type"=>"b", "Aa"=>1, "Bb"=>4}, "path"], "host"=>[{"message"=>"1,4\r", "@version"=>"1", "@timestamp"=>"2016-04-13T13:17:38.436Z", "path"=>"/home/workload/s2.csv", "host"=>"centrifyfw2", "type"=>"b", "Aa"=>1, "Bb"=>4}, "host"], "type"=>[{"message"=>"1,4\r", "@version"=>"1", "@timestamp"=>"2016-04-13T13:17:38.436Z", "path"=>"/home/workload/s2.csv", "host"=>"centrifyfw2", "type"=>"b", "Aa"=>1, "Bb"=>4}, "type"], "[type]"=>[{"message"=>"1,4\r", "@version"=>"1", "@timestamp"=>"2016-04-13T13:17:38.436Z", "path"=>"/home/workload/s2.csv", "host"=>"centrifyfw2", "type"=>"b", "Aa"=>1, "Bb"=>4}, "type"], "message"=>[{"message"=>"1,4\r", "@version"=>"1", "@timestamp"=>"2016-04-13T13:17:38.436Z", "path"=>"/home/workload/s2.csv", "host"=>"centrifyfw2", "type"=>"b", "Aa"=>1, "Bb"=>4}, "message"], "[Aa]"=>[{"message"=>"1,4\r", "@version"=>"1", "@timestamp"=>"2016-04-13T13:17:38.436Z", "path"=>"/home/workload/s2.csv", "host"=>"centrifyfw2", "type"=>"b", "Aa"=>1, "Bb"=>4}, "Aa"], "[Bb]"=>[{"message"=>"1,4\r", "@version"=>"1", "@timestamp"=>"2016-04-13T13:17:38.436Z", 
"path"=>"/home/workload/s2.csv", "host"=>"centrifyfw2", "type"=>"b", "Aa"=>1, "Bb"=>4}, "Bb"]}>>, :error=>#<NoMethodError: undefined method `start_with?' for nil:NilClass>, :level=>:warn}
{
"message" => "1,4\r",
"@version" => "1",
"@timestamp" => "2016-04-13T13:17:38.436Z",
"path" => "/home/workload/s2.csv",
"host" => "centrifyfw2",
"type" => "b",
"Aa" => 1,
"Bb" => 4
}
Failed to query ..
{
"message" => "2,5\r",
"@version" => "1",
"@timestamp" => "2016-04-13T13:17:38.603Z",
"path" => "/home/workload/s2.csv",
"host" => "centrifyfw2",
"type" => "b",
"Aa" => 2,
"Bb" => 5
}
How can I resolve it?