diff --git a/Influxer/Program.cs b/Influxer/Program.cs index 70ca730..859e9ba 100644 --- a/Influxer/Program.cs +++ b/Influxer/Program.cs @@ -1,4 +1,5 @@ //Copyright - Adarsha@AdysTech +//https://github.com/AdysTech/Influxer/blob/master/Influxer/Program.cs using System; using System.Collections.Generic; using System.IO; @@ -16,6 +17,20 @@ namespace AdysTech.Influxer { class Program { + enum Filters + { + None, + Measurement, + Field, + Columns + } + + enum FileFormats + { + Perfmon, + Generic + } + const string InfluxUrlSwitch = "-influx"; const string InfluxDBSwitch = "-dbname"; const string TagsSwitch = "-tags"; @@ -26,6 +41,8 @@ class Program const string InfluxPwdSwitch = "-pass"; const string FileFormatSwitch = "-format"; const string TableNameSwitch = "-table"; + const string FilterSwitch = "-filter"; + const string ColumnsSwitch = "-columns"; private static string influxUrl; private static string influxDB; @@ -35,9 +52,14 @@ class Program private static string inputFileName; private static char seperator = ','; private static string timeFormat = "MM/dd/yyyy HH:mm:ss.fff"; - private static string fileFormat = "perfmon"; + private static FileFormats fileFormat; private static string tableName; private static Regex pattern; + private static Filters filter; + private static string filteredColumns; + + private static char[] influxIdentifiers = new char[] { ' ', ';', '_', '(', ')', '%', '#', '.', '/', '[', ']', '{', '}', '"' }; + private static char[] whiteSpace = new char[] { '_' }; static int Main(string[] args) { @@ -55,6 +77,11 @@ static int Main(string[] args) Console.WriteLine ("-timeformat : Timeformat used by PerfMon (default MM/dd/yyyy hh:mm:ss.fff)"); Console.WriteLine ("-format : CSV File format PerfMon(default) and Generic are supported. For generic TableName is required"); Console.WriteLine ("-table : Influx Table name needed for generic fomat only"); + Console.WriteLine ("-filter : supported:measurement, field, columns."); + Console.WriteLine ("-columns : Comma seperated list of columns from input files"); + Console.WriteLine ("-filter-measurement or field is to restrict the input file to only measurements or fileds that already present in the database"); + Console.WriteLine ("-filter-columns is to restrict to only few columns from the input irrespective of existing data in database"); + Console.WriteLine ("-columns will be ignored in other cases. Replace any inline commas in columns names with a space!"); return 0; } @@ -86,6 +113,8 @@ static int Main(string[] args) if ( cmdArgs.ContainsKey (TagsSwitch) ) tags = cmdArgs[TagsSwitch].Replace (' ', '_'); + if ( cmdArgs.ContainsKey (TableNameSwitch) ) + tableName = cmdArgs[TableNameSwitch]; if ( cmdArgs.ContainsKey (InputFileNameSwitch) ) { @@ -116,23 +145,67 @@ static int Main(string[] args) if ( cmdArgs.ContainsKey (FileFormatSwitch) ) { - fileFormat = cmdArgs[FileFormatSwitch].ToLower (); - if ( !new[] { "perfmon", "generic" }.Contains (fileFormat) ) + if ( !Enum.TryParse (cmdArgs[FileFormatSwitch], true, out fileFormat) || !Enum.IsDefined (typeof (FileFormats), fileFormat) ) { - Console.WriteLine ("Not supported format{0}!!", fileFormat); + Console.WriteLine ("Not supported format{0}!!", cmdArgs[FileFormatSwitch]); + return 1; + } + } + { + fileFormat = FileFormats.Perfmon; + } + + if ( cmdArgs.ContainsKey (FilterSwitch) ) + { + if ( !Enum.TryParse (cmdArgs[FilterSwitch], true, out filter) || !Enum.IsDefined (typeof (Filters), filter) ) + { + Console.WriteLine ("Not supported filter:{0}!!", cmdArgs[FilterSwitch]); + return 1; + } + } + else + { + filter = Filters.None; + } + + if ( cmdArgs.ContainsKey (ColumnsSwitch) ) + { + if ( filter != Filters.Columns ) + { + Console.WriteLine ("Column filtering is supported only with -filter Columns!!"); + return 1; + } + if ( String.IsNullOrWhiteSpace (cmdArgs[ColumnsSwitch]) ) + { + Console.WriteLine ("Invalid Column filter!!"); + return 1; + } + + filteredColumns = cmdArgs[ColumnsSwitch]; + + try + { + var temp = ParsePerfMonFileHeader (filteredColumns, false); + if ( temp.Count == 0 ) + { + Console.WriteLine ("No columns filtered!!"); + return 1; + } + } + catch ( Exception e ) + { + Console.WriteLine ("Unable to parse column filters"); return 1; } } - if ( cmdArgs.ContainsKey (TableNameSwitch) ) - tableName = cmdArgs[TableNameSwitch]; #endregion try { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.Start(); - + Stopwatch stopwatch = new Stopwatch (); + stopwatch.Start (); + HttpClientHandler handler = new HttpClientHandler (); handler.UseDefaultCredentials = true; handler.PreAuthenticate = true; @@ -143,7 +216,7 @@ static int Main(string[] args) if ( !( String.IsNullOrWhiteSpace (influxDBUserName) && String.IsNullOrWhiteSpace (influxDBPassword) ) ) client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue ("Basic", Convert.ToBase64String (System.Text.ASCIIEncoding.ASCII.GetBytes (string.Format ("{0}:{1}", influxDBUserName, influxDBPassword)))); - if ( !VerifyDatabase (client, influxDB).Result ) + if ( !VerifyDatabaseAsync (client, influxDB).Result ) { Console.WriteLine ("Unable to create DB {0}", influxDB); return -1; @@ -152,19 +225,20 @@ static int Main(string[] args) var result = false; switch ( fileFormat ) { - case "perfmon": + case FileFormats.Perfmon: result = ProcessPerfMonLog (inputFileName, client).Result; break; - case "generic": + case FileFormats.Generic: if ( String.IsNullOrWhiteSpace (tableName) ) throw new ArgumentException ("Generic format needs TableName input"); result = ProcessGenericFile (inputFileName, tableName, client).Result; break; } - stopwatch.Stop(); + + stopwatch.Stop (); if ( result ) { - Console.WriteLine ("\n Finished!! Processed in {0}", stopwatch.Elapsed.ToString()); + Console.WriteLine ("\n Finished!! Processed in {0}", stopwatch.Elapsed.ToString ()); return 0; } @@ -178,7 +252,7 @@ static int Main(string[] args) return 0; } - private static async Task VerifyDatabase(HttpClient client, string DBName) + private static async Task VerifyDatabaseAsync(HttpClient client, string DBName) { var influxAddress = new Uri (influxUrl + "/query?"); @@ -190,6 +264,11 @@ private static async Task VerifyDatabase(HttpClient client, string DBName) return true; else { + if ( filter == Filters.Measurement || filter == Filters.Field ) + { + Console.WriteLine ("Measurement/Field filtering is not applicable for new database!!"); + filter = Filters.None; + } return await CreateInfluxDBAsync (client, DBName, influxAddress); } } @@ -221,6 +300,62 @@ private static async Task> GetInfluxDBNamesAsync(HttpClient client, return null; } + private static async Task>> GetInfluxDBStructureAsync(HttpClient client, Uri InfluxPath, string dbName) + { + var dbStructure = new Dictionary> (); + try + { + var builder = new UriBuilder (InfluxPath); + //builder.UserName = influxDBUserName; + //builder.Password = influxDBPassword; + builder.Query = await new FormUrlEncodedContent (new[] { + new KeyValuePair("u",influxDBUserName) , + new KeyValuePair("p", influxDBPassword) , + new KeyValuePair("db", dbName) , + new KeyValuePair("q", "SHOW FIELD KEYS") + }).ReadAsStringAsync (); + HttpResponseMessage response = await client.GetAsync (builder.Uri); + + if ( response.StatusCode == HttpStatusCode.OK ) + { + var content = await response.Content.ReadAsStringAsync (); + var values = Regex.Matches (content, "([a-zA-Z0-9_]+)").Cast ().Select (match => match.Value).ToList (); + string measurement; + //one pass loop through the entries in returned structure. Each new measurement starts with "name",measurement name, "columns","fieldKey","values",list of columns + //we will search for name, and once found grab measurement name, skip 3 lines, and grab column names + for ( int i = 0; i < values.Count; i++ ) + { + if ( values[i] != "name" ) + continue; + if ( values[i] == "name" ) + { + if ( ++i == values.Count ) + throw new InvalidDataException ("Invalid data returned from InfluxDB"); + //i is incremented + measurement = values[i]; + dbStructure.Add (measurement, new List ()); + for ( int j = i + 4; j < values.Count; j++ ) + { + if ( values[j] == "name" ) + { + i = j - 1; + break; + } + dbStructure[measurement].Add (values[j]); + } + } + } + } + } + catch ( HttpRequestException e ) + { + + } + + return dbStructure; + } + + private static async Task CreateInfluxDBAsync(HttpClient client, string dbName, Uri InfluxPath) { try @@ -231,7 +366,7 @@ private static async Task CreateInfluxDBAsync(HttpClient client, string db builder.Query = await new FormUrlEncodedContent (new[] { new KeyValuePair("u",influxDBUserName) , new KeyValuePair("p", influxDBPassword) , - new KeyValuePair("q", "CREATE DATABASE "+dbName) + new KeyValuePair("q", "CREATE DATABASE "+ dbName) }).ReadAsStringAsync (); HttpResponseMessage response = await client.GetAsync (builder.Uri); @@ -265,7 +400,7 @@ private static async Task PostToInfluxAsync(HttpClient client, Uri InfluxP else return false; } - catch ( HttpRequestException e ) + catch ( HttpRequestException ex ) { return false; } @@ -288,53 +423,60 @@ private static async Task ProcessPerfMonLog(string InputFileName, HttpClie var failedCount = 0; StringBuilder content = new StringBuilder (); - //Stopwatch stopwatch = new Stopwatch(); - //stopwatch.Start(); + Stopwatch stopwatch = new Stopwatch (); + stopwatch.Start (); var influxAddress = new Uri (influxUrl + "/write?db=" + influxDB + "&precision=s"); - List pecrfCounters = new List (); - IEnumerable> perfGourp = null; - var firstLine = File.ReadLines (InputFileName).FirstOrDefault (); - var columns = pattern.Split (firstLine.Replace ("\"", "")); - if ( !columns[0].Contains ("PDH-CSV") ) + var firstCol = firstLine.Substring (0, firstLine.IndexOf (',')); + if ( !firstCol.Contains ("PDH-CSV") ) throw new Exception ("Input file is not a Standard Perfmon csv file"); - var x = Regex.Matches (columns[0], "([-0-9]+)"); + var x = Regex.Matches (firstCol, "([-0-9]+)"); if ( x.Count > 0 ) minOffset = int.Parse (x[3].ToString ()); + //get the column headers - var column = 1; - var influxIdentifiers = new char[]{' ',';','_','(',')','%'}; - var whiteSpace = new char[]{'_'}; - pecrfCounters.AddRange (columns.Skip (1).Where (s => s.StartsWith ("\\")).Select (p => - p.Replace (influxIdentifiers, "_").Split ('\\')).Select (p => - new PerfmonCounter () - { - ColumnIndex = column++, - Host = p[2].Trim(whiteSpace), - PerformanceObject = p[3].Trim(whiteSpace), - CounterName = p[4].Trim(whiteSpace) - })); - perfGourp = pecrfCounters.GroupBy (p => p.PerformanceObject); + List pecrfCounters; + try + { + pecrfCounters = ParsePerfMonFileHeader (firstLine); + } + catch ( Exception ex ) + { + throw new InvalidDataException ("Unable to parse file headers", ex); + } + + var filterColumns = ParsePerfMonFileHeader (filteredColumns, false); + + Dictionary> dbStructure; + IEnumerable> perfGroup; + if ( filter != Filters.None ) + { + dbStructure = await GetInfluxDBStructureAsync (client, new Uri (influxUrl + "/query?"), influxDB); + perfGroup = FilterPerfmonLogColumns (pecrfCounters, filterColumns, dbStructure).GroupBy (p => p.PerformanceObject); + } + else + { + perfGroup = pecrfCounters.GroupBy (p => p.PerformanceObject); + } //Parallel.ForEach (File.ReadLines (inputFileName).Skip (1), (string line) => foreach ( var line in File.ReadLines (InputFileName).Skip (1) ) { try { - if ( await ProcessPerfmonLogLine (line, perfGourp, minOffset, pattern, client, influxAddress) ) - { - lineCount++; - Console.Write ("\r Processed {0} ", lineCount); - } + if ( !await ProcessPerfmonLogLine (line, perfGroup, minOffset, pattern, client, influxAddress) ) + failedCount++; + + lineCount++; + + if ( failedCount > 0 ) + Console.Write ("\r{0} Processed {1}, Failed - {2} ", stopwatch.Elapsed.ToString (@"hh\:mm\:ss"), lineCount, failedCount); else - { - failedCount++; lineCount++; - Console.Write ("\r Processed {0}, Failed - {1} ", lineCount, failedCount); - } + Console.Write ("\r{0} Processed {1} ", stopwatch.Elapsed.ToString (@"hh\:mm\:ss"), lineCount); } catch ( Exception e ) { @@ -345,7 +487,7 @@ private static async Task ProcessPerfMonLog(string InputFileName, HttpClie } - //stopwatch.Stop(); + stopwatch.Stop (); //Debug.WriteLine("Done Async Processing, Time elapsed: {0}", stopwatch.Elapsed); lineCount = 0; @@ -363,7 +505,40 @@ private static async Task ProcessPerfMonLog(string InputFileName, HttpClie return true; } - private static async Task ProcessPerfmonLogLine(string line, IEnumerable> perfGourp, int minOffset, Regex pattern, HttpClient client, Uri InfluxPath) + private static List ParsePerfMonFileHeader(string headerLine, bool quoted = true) + { + List perfCounters = new List (); + if ( String.IsNullOrWhiteSpace (headerLine) ) return perfCounters; + var columns = pattern.Split (headerLine); + var column = 1; + + perfCounters.AddRange (columns.Skip (quoted ? 1 : 0).Where (s => quoted ? s.StartsWith ("\"\\") : s.StartsWith ("\\")).Select (p => + p.Replace (influxIdentifiers, "_").Split ('\\')).Select (p => + new PerfmonCounter () + { + ColumnIndex = column++, + Host = p[2].Trim (whiteSpace), + PerformanceObject = p[3].Trim (whiteSpace), + CounterName = p[4].Trim (whiteSpace) + })); + return perfCounters; + } + + private static List FilterPerfmonLogColumns(List columns, List filterColumns, Dictionary> dbStructure) + { + switch ( filter ) + { + case Filters.Measurement: + return columns.Where (p => dbStructure.ContainsKey (p.PerformanceObject)).ToList (); + case Filters.Field: + return columns.Where (p => dbStructure[p.PerformanceObject].Contains (p.CounterName)).ToList (); + case Filters.Columns: + return columns.Where (p => filterColumns.Any (f => p.PerformanceObject == f.PerformanceObject && p.CounterName == f.CounterName)).ToList (); + } + return columns; + } + + private static async Task ProcessPerfmonLogLine(string line, IEnumerable> perfGroup, int minOffset, Regex pattern, HttpClient client, Uri InfluxPath) { StringBuilder content = new StringBuilder (); DateTime timeStamp; @@ -380,26 +555,30 @@ private static async Task ProcessPerfmonLogLine(string line, IEnumerable p.Host) ) { lineStartIndex = content.Length; content.AppendFormat ("{0},Host={1}", group.Key, hostGrp.Key); + if ( tags != null ) content.AppendFormat (",{0} ", tags); else content.Append (" "); var useCounter = false; + foreach ( var counter in hostGrp ) { if ( !String.IsNullOrWhiteSpace (columns[counter.ColumnIndex]) && Double.TryParse (columns[counter.ColumnIndex], out value) ) { content.AppendFormat ("{0}={1:0.00},", counter.CounterName, value); useCounter = true; + } } + if ( useCounter ) content.AppendFormat (" {0}\n", epoch); else @@ -452,16 +631,16 @@ private static async Task ProcessGenericFile(string InputFileName, string { try { - if ( await ProcessGenericLine (line, columnHeaders, pattern, client, influxAddress) ) - { - lineCount++; - Console.Write ("\r Processed {0} ", lineCount); - } - else - { - failedCount++; lineCount++; + if ( !await ProcessGenericLine (line, columnHeaders, pattern, client, influxAddress) ) + failedCount++; + + lineCount++; + + if ( failedCount > 0 ) Console.Write ("\r Processed {0}, Failed - {1} ", lineCount, failedCount); - } + else + Console.Write ("\r Processed {0} ", lineCount); + } catch ( Exception e ) {