diff --git a/go.mod b/go.mod
index f501611257..e01eac022c 100644
--- a/go.mod
+++ b/go.mod
@@ -62,6 +62,8 @@ require (
 	github.com/swaggo/swag v1.6.7
 	github.com/ungerik/go-dry v0.0.0-20210209114055-a3e162a9e62e
 	github.com/urfave/cli/v2 v2.8.1
+	github.com/vektah/gqlparser/v2 v2.5.1
+	github.com/lib/pq v1.10.2
 	golang.org/x/net v0.15.0
 	google.golang.org/grpc v1.50.1
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0
diff --git a/sqle/server/auditplan/meta.go b/sqle/server/auditplan/meta.go
index 42b45f75df..42add55ac8 100644
--- a/sqle/server/auditplan/meta.go
+++ b/sqle/server/auditplan/meta.go
@@ -19,26 +19,27 @@ type Meta struct {
 }
 
 const (
-	TypeDefault               = "default"
-	TypeMySQLSlowLog          = "mysql_slow_log"
-	TypeMySQLMybatis          = "mysql_mybatis"
-	TypeMySQLSchemaMeta       = "mysql_schema_meta"
-	TypeMySQLProcesslist      = "mysql_processlist"
-	TypeAliRdsMySQLSlowLog    = "ali_rds_mysql_slow_log"
-	TypeAliRdsMySQLAuditLog   = "ali_rds_mysql_audit_log"
-	TypeHuaweiRdsMySQLSlowLog = "huawei_rds_mysql_slow_log"
-	TypeOracleTopSQL          = "oracle_top_sql"
-	TypeTiDBAuditLog          = "tidb_audit_log"
-	TypeAllAppExtract         = "all_app_extract"
-	TypeBaiduRdsMySQLSlowLog  = "baidu_rds_mysql_slow_log"
-	TypeSQLFile               = "sql_file"
+	TypeDefault              = "default"
+	TypeMySQLSlowLog         = "mysql_slow_log"
+	TypeMySQLMybatis         = "mysql_mybatis"
+	TypeMySQLSchemaMeta      = "mysql_schema_meta"
+	TypeMySQLProcesslist     = "mysql_processlist"
+	TypeAliRdsMySQLSlowLog   = "ali_rds_mysql_slow_log"
+	TypeAliRdsMySQLAuditLog  = "ali_rds_mysql_audit_log"
+	TypeOracleTopSQL         = "oracle_top_sql"
+	TypeTiDBAuditLog         = "tidb_audit_log"
+	TypeAllAppExtract        = "all_app_extract"
+	TypeBaiduRdsMySQLSlowLog = "baidu_rds_mysql_slow_log"
+	TypeSQLFile              = "sql_file"
+	TypePostgreSQLSchemaMeta = "Postgresql_schema_meta"
 )
 
 const (
-	InstanceTypeAll    = ""
-	InstanceTypeMySQL  = "MySQL"
-	InstanceTypeOracle = "Oracle"
-	InstanceTypeTiDB   = "TiDB"
+	InstanceTypeAll        = ""
+	InstanceTypeMySQL      = "MySQL"
+	InstanceTypeOracle     = "Oracle"
+	InstanceTypeTiDB       = "TiDB"
+	InstanceTypePostgreSQL = "PostgreSQL"
 )
 
 const (
@@ -371,6 +372,26 @@ var Metas = []Meta{
 		InstanceType: InstanceTypeAll,
 		CreateTask:   NewDefaultTask,
 	},
+	{
+		Type:         TypePostgreSQLSchemaMeta,
+		Desc:         "库表元数据",
+		InstanceType: InstanceTypePostgreSQL,
+		CreateTask:   NewPostgreSQLSchemaMetaTask,
+		Params: []*params.Param{
+			{
+				Key:   paramKeyCollectIntervalMinute,
+				Desc:  "采集周期(分钟)",
+				Value: "60",
+				Type:  params.ParamTypeInt,
+			},
+			{
+				Key:   "collect_view",
+				Desc:  "是否采集视图信息",
+				Value: "0",
+				Type:  params.ParamTypeBool,
+			},
+		},
+	},
 }
 
 var MetaMap = map[string]Meta{}
diff --git a/sqle/server/auditplan/task.go b/sqle/server/auditplan/task.go
index 132816207b..8bd32b593f 100644
--- a/sqle/server/auditplan/task.go
+++ b/sqle/server/auditplan/task.go
@@ -1,10 +1,11 @@
 package auditplan
 
 import (
 	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/actiontech/sqle/sqle/driver"
 	"net/http"
 	"strconv"
 	"strings"
@@ -1550,3 +1551,479 @@ func NewBaiduRdsMySQLSlowLogTask(entry *logrus.Entry, ap *model.AuditPlan) Task
 
 	return b
 }
+
+// escapePgStringLiteral doubles single quotes so that s can be embedded in a
+// single-quoted PostgreSQL string literal without terminating it early.
+func escapePgStringLiteral(s string) string {
+	return strings.ReplaceAll(s, "'", "''")
+}
+
+// PostgreSQLSchemaMetaTask collects PostgreSQL table and view DDL for an
+// audit plan.
+type PostgreSQLSchemaMetaTask struct {
+	*sqlCollector
+}
+
+// PostgreSQLSchema is one schema name returned by GetAllUserSchemas.
+type PostgreSQLSchema struct {
+	schemaName string
+}
+
+// PostgreSQLTablesAndViews is one row read from information_schema.tables.
+type PostgreSQLTablesAndViews struct {
+	schemaName string
+	tableName  string
+	tableType  string
+}
+
+// PostgreSQLViewInfo holds the "create or replace view" statement of one view.
+type PostgreSQLViewInfo struct {
+	schemaName string
+	tableName  string
+	viewSql    string
+}
+
+// PostgreSQLCreateTableSql holds the rebuilt table DDL and its index DDL.
+type PostgreSQLCreateTableSql struct {
+	createTableSql string
+	createIndexSql string
+}
+
+// PostgreSQLTableColumnInfo holds the aggregated column definitions of one table.
+type PostgreSQLTableColumnInfo struct {
+	schemaName string
+	tableName  string
+	columnSql  string
+}
+
+// PostgreSQLConstraint holds one constraint definition of a table.
+type PostgreSQLConstraint struct {
+	schemaName           string
+	tableName            string
+	constraintDefinition string
+}
+
+// PostgreSQLIndex holds one index definition of a table.
+type PostgreSQLIndex struct {
+	schemaName      string
+	tableName       string
+	indexName       string
+	indexDefinition string
+}
+
+// NewPostgreSQLSchemaMetaTask creates the task and installs collectorDo as the
+// SQL collector's do callback.
+func NewPostgreSQLSchemaMetaTask(entry *logrus.Entry, ap *model.AuditPlan) Task {
+	sqlCollector := newSQLCollector(entry, ap)
+	task := &PostgreSQLSchemaMetaTask{
+		sqlCollector,
+	}
+	sqlCollector.do = task.collectorDo
+	return task
+}
+
+// collectorDo rebuilds the CREATE TABLE / CREATE VIEW statements of the
+// configured database and overrides the SQLs stored for the audit plan.
+func (at *PostgreSQLSchemaMetaTask) collectorDo() {
+	if at.ap.InstanceName == "" {
+		at.logger.Warnf("instance is not configured")
+		return
+	}
+	if at.ap.InstanceDatabase == "" {
+		at.logger.Warnf("instance database is not configured")
+		return
+	}
+	instance, _, err := dms.GetInstanceInProjectByName(context.Background(), string(at.ap.ProjectId), at.ap.InstanceName)
+	if err != nil {
+		at.logger.Errorf("get pg instance in project by instanceName failed: %s\n", err)
+		return
+	}
+
+	pluginMgr := driver.GetPluginManager()
+	if !pluginMgr.IsOptionalModuleEnabled(instance.DbType, driverV2.OptionalModuleQuery) {
+		at.logger.Errorf("collect pg schema meta failed: %v",
+			driver.NewErrPluginAPINotImplement(driverV2.OptionalModuleQuery))
+		return
+	}
+	plugin, err := pluginMgr.OpenPlugin(at.logger, instance.DbType, &driverV2.Config{DSN: &driverV2.DSN{
+		Host:             instance.Host,
+		Port:             instance.Port,
+		User:             instance.User,
+		Password:         instance.Password,
+		AdditionalParams: instance.AdditionalParams,
+		DatabaseName:     at.ap.InstanceDatabase,
+	}})
+	if err != nil {
+		at.logger.Errorf("connect to instance fail, error: %v", err)
+		return
+	}
+	// Release only the plugin opened above; the plugin manager returned by
+	// GetPluginManager is not owned by this task and must not be stopped here.
+	defer plugin.Close(context.Background())
+
+	// Fetch the user schemas of the configured database.
+	schemas, err := at.GetAllUserSchemas(plugin, at.ap.InstanceDatabase)
+	if err != nil {
+		at.logger.Errorf("get databases=%s schemas fail: %s", at.ap.InstanceDatabase, err)
+		return
+	}
+
+	// Fetch tables and views; without them there is nothing to rebuild.
+	tablesAndViews, err := at.GetAllTablesAndViewsForPg(plugin, at.ap.InstanceDatabase)
+	if err != nil {
+		at.logger.Errorf("get all table and view fail, error: %s", err)
+		return
+	}
+
+	// Fetch column definitions; without them every CREATE TABLE would be
+	// empty, so do not override the stored SQLs with such statements.
+	columnsInfo, err := at.GetAllColumnsInfoForPg(plugin, at.ap.InstanceDatabase)
+	if err != nil {
+		at.logger.Errorf("get all columns information fail, error: %s", err)
+		return
+	}
+
+	// Fetch constraints; a failure is logged and collection continues.
+	constraints, err := at.GetAllConstraintsForPg(plugin)
+	if err != nil {
+		at.logger.Errorf("get all constraints fail, error: %s", err)
+	}
+
+	// Fetch indexes; a failure is logged and collection continues.
+	indexes, err := at.GetAllIndexesForPg(plugin)
+	if err != nil {
+		at.logger.Errorf("get all indexes fail, error: %s", err)
+	}
+
+	// View SQL is collected only when the collect_view param is true.
+	isCollectView := at.ap.Params.GetParam("collect_view").Bool()
+
+	var viewsSql []*PostgreSQLViewInfo
+	if isCollectView {
+		// Fetch the view creation SQL; a failure is logged and collection continues.
+		viewsSql, err = at.GetAllViewsSqlForPg(plugin, at.ap.InstanceDatabase)
+		if err != nil {
+			at.logger.Errorf("get all views sql fail, error: %s", err)
+		}
+	}
+
+	auditPlanSQLV2Slice := make([]*model.AuditPlanSQLV2, 0)
+	for _, schema := range schemas {
+		currentSchema := schema.schemaName
+		createTableSqls := make([]string, 0)
+		createViewSqls := make([]string, 0)
+		for _, tableOrView := range tablesAndViews {
+			if tableOrView.schemaName != currentSchema {
+				continue
+			}
+			tableOrViewName := tableOrView.tableName
+			switch tableOrView.tableType {
+			case "BASE TABLE", "SYSTEM VIEW": // handled as a table
+				createDDL := createTableSqlForPg(currentSchema, tableOrViewName, columnsInfo, constraints, indexes)
+				tableDDl := fmt.Sprintf("%s;%s", createDDL.createTableSql, createDDL.createIndexSql)
+				createTableSqls = append(createTableSqls, tableDDl)
+			case "VIEW": // handled as a view
+				if !isCollectView {
+					continue
+				}
+				for _, view := range viewsSql {
+					if view.schemaName != currentSchema || view.tableName != tableOrViewName {
+						continue
+					}
+					createViewSqls = append(createViewSqls, view.viewSql)
+				}
+			}
+		}
+
+		if len(createTableSqls) > 0 {
+			auditPlanSQLV2Slice = append(auditPlanSQLV2Slice, convertRawSQLToModelSQLs(createTableSqls, currentSchema)...)
+		}
+
+		if len(createViewSqls) > 0 {
+			auditPlanSQLV2Slice = append(auditPlanSQLV2Slice, convertRawSQLToModelSQLs(createViewSqls, currentSchema)...)
+		}
+	}
+
+	if len(auditPlanSQLV2Slice) > 0 {
+		err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, auditPlanSQLV2Slice)
+		if err != nil {
+			at.logger.Errorf("save table and view schema meta to storage fail, error: %s", err)
+		}
+	}
+}
+
+// createTableSqlForPg assembles the CREATE TABLE statement of
+// schema.tableOrViewName from the matching column and constraint entries, and
+// joins the matching index definitions, each followed by ";\n".
+func createTableSqlForPg(schema, tableOrViewName string, columnsInfo []*PostgreSQLTableColumnInfo, constraints []*PostgreSQLConstraint, indexes []*PostgreSQLIndex) *PostgreSQLCreateTableSql {
+	var tableDDL strings.Builder
+	fmt.Fprintf(&tableDDL, "CREATE TABLE %s.%s(", schema, tableOrViewName)
+	// Column definitions.
+	for _, columnInfo := range columnsInfo {
+		if columnInfo.schemaName != schema || columnInfo.tableName != tableOrViewName {
+			continue
+		}
+		tableDDL.WriteString(columnInfo.columnSql)
+	}
+	// Constraint definitions.
+	for _, constraintInfo := range constraints {
+		if constraintInfo.schemaName != schema || constraintInfo.tableName != tableOrViewName {
+			continue
+		}
+		fmt.Fprintf(&tableDDL, ",%s\n", constraintInfo.constraintDefinition)
+	}
+	tableDDL.WriteString(")")
+
+	// Index definitions.
+	var indexDDL strings.Builder
+	for _, indexInfo := range indexes {
+		if indexInfo.schemaName != schema || indexInfo.tableName != tableOrViewName {
+			continue
+		}
+		fmt.Fprintf(&indexDDL, "%s;\n", indexInfo.indexDefinition)
+	}
+	return &PostgreSQLCreateTableSql{
+		createTableSql: tableDDL.String(),
+		createIndexSql: indexDDL.String(),
+	}
+}
+
+// GetAllUserSchemas returns the schemas of database, excluding pg_* schemas and
+// information_schema. It returns an error when no schema is found.
+func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, database string) ([]*PostgreSQLSchema, error) {
+	result := make([]*PostgreSQLSchema, 0)
+	querySql := fmt.Sprintf(`
+		SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '%s'
+		AND schema_name NOT LIKE 'pg_%%' AND schema_name != 'information_schema' ORDER BY schema_name`, escapePgStringLiteral(database))
+	res, err := at.GetResult(plugin, querySql)
+	if err != nil {
+		return result, err
+	}
+	for _, value := range res {
+		if len(value) == 0 {
+			continue
+		}
+		result = append(result, &PostgreSQLSchema{value[0]})
+	}
+	if len(result) == 0 {
+		return result, fmt.Errorf("database=%s has no schema", database)
+	}
+	return result, nil
+}
+
+// GetAllTablesAndViewsForPg lists the entries of information_schema.tables for
+// database, excluding pg_* schemas and information_schema.
+func (at *PostgreSQLSchemaMetaTask) GetAllTablesAndViewsForPg(plugin driver.Plugin, database string) ([]*PostgreSQLTablesAndViews, error) {
+	querySql := fmt.Sprintf(`select table_schema, table_name, table_type from information_schema.tables
+		where table_catalog = '%s' and table_schema not like 'pg_%%' AND table_schema != 'information_schema'
+		ORDER BY table_name`, escapePgStringLiteral(database))
+	result := make([]*PostgreSQLTablesAndViews, 0)
+	ret, err := at.GetResult(plugin, querySql)
+	if err != nil {
+		return result, err
+	}
+	for _, value := range ret {
+		if len(value) < 3 {
+			return result, fmt.Errorf("get tables and views error, column length is not three")
+		}
+		result = append(result, &PostgreSQLTablesAndViews{
+			schemaName: value[0],
+			tableName:  value[1],
+			tableType:  value[2],
+		})
+	}
+	return result, nil
+}
+
+// GetAllColumnsInfoForPg returns, per table of database, its column definitions
+// joined in ordinal order. information_schema reports is_nullable as upper-case
+// 'YES'/'NO', so the NOT NULL check compares against 'NO'.
+func (at *PostgreSQLSchemaMetaTask) GetAllColumnsInfoForPg(plugin driver.Plugin, database string) ([]*PostgreSQLTableColumnInfo, error) {
+	columns := fmt.Sprintf(`select table_schema, table_name,
+		string_agg(
+			concat(
+				column_name, ' ',
+				case
+					when lower(data_type) in ('char', 'varchar', 'character', 'character varying') then concat(data_type, '(', coalesce(character_maximum_length, 0), ')')
+					when lower(data_type) in ('numeric', 'decimal') then concat(data_type, '(', coalesce(numeric_precision, 0), ',', coalesce(numeric_scale, 0), ')')
+					when lower(data_type) in ('integer', 'smallint', 'bigint', 'text') then data_type
+					else udt_name
+				end,
+				case
+					when column_default != '' then concat(' default ', column_default) else ''
+				end,
+				case
+					when is_nullable = 'NO' then ' not null' else ''
+				end
+			), ', ' order by ordinal_position
+		) as columns_sql
+		from information_schema.columns
+		where table_catalog = '%s' and table_schema not like 'pg_%%' and table_schema != 'information_schema'
+		group by table_schema, table_name`, escapePgStringLiteral(database))
+	result := make([]*PostgreSQLTableColumnInfo, 0)
+	ret, err := at.GetResult(plugin, columns)
+	if err != nil {
+		return result, err
+	}
+	for _, value := range ret {
+		if len(value) < 3 {
+			return result, fmt.Errorf("get column info error, column length is not three")
+		}
+		result = append(result, &PostgreSQLTableColumnInfo{
+			schemaName: value[0],
+			tableName:  value[1],
+			columnSql:  value[2],
+		})
+	}
+	return result, nil
+}
+
+// GetAllConstraintsForPg returns the constraint definitions found in
+// pg_constraint, excluding pg_* schemas and information_schema.
+func (at *PostgreSQLSchemaMetaTask) GetAllConstraintsForPg(plugin driver.Plugin) ([]*PostgreSQLConstraint, error) {
+	querySql := `select
+		n.nspname as schema_name,
+		c.relname as table_name,
+		concat ( 'constraint ', r.conname, ' ', pg_catalog.pg_get_constraintdef ( r.oid, true ) ) as constraint_definition
+	from
+		pg_catalog.pg_constraint r
+		join pg_catalog.pg_class c on c.oid = r.conrelid
+		join pg_catalog.pg_namespace n on n.oid = c.relnamespace
+	where
+		n.nspname not like'pg_%'
+		and n.nspname != 'information_schema'`
+	result := make([]*PostgreSQLConstraint, 0)
+	ret, err := at.GetResult(plugin, querySql)
+	if err != nil {
+		return result, err
+	}
+	for _, value := range ret {
+		if len(value) < 3 {
+			return result, fmt.Errorf("get constraint error, column length is not three")
+		}
+		result = append(result, &PostgreSQLConstraint{
+			schemaName:           value[0],
+			tableName:            value[1],
+			constraintDefinition: value[2],
+		})
+	}
+	return result, nil
+}
+
+// GetAllIndexesForPg returns the index definitions found in pg_indexes,
+// excluding pg_* schemas and information_schema.
+func (at *PostgreSQLSchemaMetaTask) GetAllIndexesForPg(plugin driver.Plugin) ([]*PostgreSQLIndex, error) {
+	querySql := `select schemaname, tablename, indexname, indexdef from pg_indexes
+		where schemaname not like 'pg_%' and schemaname != 'information_schema'`
+	result := make([]*PostgreSQLIndex, 0)
+	ret, err := at.GetResult(plugin, querySql)
+	if err != nil {
+		return result, err
+	}
+	for _, value := range ret {
+		if len(value) < 4 {
+			return result, fmt.Errorf("get index error, column length is not four")
+		}
+		result = append(result, &PostgreSQLIndex{
+			schemaName:      value[0],
+			tableName:       value[1],
+			indexName:       value[2],
+			indexDefinition: value[3],
+		})
+	}
+	return result, nil
+}
+
+// GetAllViewsSqlForPg returns a "create or replace view" statement for every
+// view of database, excluding pg_* schemas and information_schema.
+func (at *PostgreSQLSchemaMetaTask) GetAllViewsSqlForPg(plugin driver.Plugin, database string) ([]*PostgreSQLViewInfo, error) {
+	querySql := fmt.Sprintf(`select table_schema, table_name,
+		concat('create or replace view ', table_schema, '.', table_name, ' as ', view_definition) as create_view_statement
+		from information_schema.views
+		where table_catalog = '%s'
+		and table_schema not like 'pg_%%'
+		and table_schema != 'information_schema'
+		order by table_name`, escapePgStringLiteral(database))
+	result := make([]*PostgreSQLViewInfo, 0)
+	ret, err := at.GetResult(plugin, querySql)
+	if err != nil {
+		return result, err
+	}
+	for _, value := range ret {
+		if len(value) < 3 {
+			return result, fmt.Errorf("get view sql error, column length is not three")
+		}
+		result = append(result, &PostgreSQLViewInfo{
+			schemaName: value[0],
+			tableName:  value[1],
+			viewSql:    value[2],
+		})
+	}
+	return result, nil
+}
+
+// GetResult runs sql through plugin.Query with a two-minute timeout and returns
+// every non-empty row as a slice of its column values.
+func (at *PostgreSQLSchemaMetaTask) GetResult(plugin driver.Plugin, sql string) ([][]string, error) {
+	var ret [][]string
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
+	defer cancel()
+
+	result, err := plugin.Query(ctx, sql, &driverV2.QueryConf{TimeOutSecond: 120})
+	if err != nil {
+		at.logger.Errorf("plugin.Query() failed:%s\n", err)
+		return nil, err
+	}
+	for _, row := range result.Rows {
+		values := row.Values
+		if len(values) == 0 {
+			continue
+		}
+		valueArr := make([]string, 0, len(values))
+		for _, value := range values {
+			valueArr = append(valueArr, value.Value)
+		}
+		ret = append(ret, valueArr)
+	}
+	return ret, nil
+}
+
+// Audit loads the task with its instance for the audit plan and audits it
+// through baseTask.audit.
+func (at *PostgreSQLSchemaMetaTask) Audit() (*AuditResultResp, error) {
+	task, err := getTaskWithInstanceByAuditPlan(at.ap, at.persist)
+	if err != nil {
+		return nil, err
+	}
+	return at.baseTask.audit(task)
+}
+
+// GetSQLs queries the stored audit plan SQLs with args and returns the table
+// head, one row per SQL, and the count reported by the storage query.
+func (at *PostgreSQLSchemaMetaTask) GetSQLs(args map[string]interface{}) ([]Head, []map[string] /* head name */ string, uint64, error) {
+	auditPlanSQLs, count, err := at.persist.GetAuditPlanSQLsByReq(args)
+	if err != nil {
+		return nil, nil, count, err
+	}
+	head, rows := buildPostgreSQLSchemaMetaSQLsResult(auditPlanSQLs)
+	return head, rows, count, nil
+}
+
+// buildPostgreSQLSchemaMetaSQLsResult builds a single-column ("sql") head and
+// one row per audit plan SQL holding its SQLContent.
+func buildPostgreSQLSchemaMetaSQLsResult(auditPlanSQLs []*model.AuditPlanSQLListDetail) ([]Head, []map[string] /* head name */ string) {
+	head := []Head{
+		{
+			Name: "sql",
+			Desc: "SQL语句",
+			Type: "sql",
+		},
+	}
+	rows := make([]map[string]string, 0, len(auditPlanSQLs))
+	for _, sql := range auditPlanSQLs {
+		rows = append(rows, map[string]string{
+			"sql": sql.SQLContent,
+		})
+	}
+	return head, rows
+}