diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java new file mode 100644 index 000000000..465206251 --- /dev/null +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.falcon.bridge; + +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.falcon.model.FalconDataModelGenerator; +import org.apache.atlas.falcon.model.FalconDataTypes; +import org.apache.atlas.fs.model.FSDataTypes; +import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; +import org.apache.atlas.hive.model.HiveDataModelGenerator; +import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.atlas.Util.EventUtil; +import org.apache.falcon.entity.CatalogStorage; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.FileSystemStorage; +import org.apache.falcon.entity.ProcessHelper; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.CatalogTable; +import org.apache.falcon.entity.v0.feed.ClusterType; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Location; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.process.Cluster; +import org.apache.falcon.entity.v0.process.Input; +import org.apache.falcon.entity.v0.process.Output; +import org.apache.falcon.entity.v0.process.Workflow; +import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A Bridge Utility to register Falcon entities metadata to Atlas. 
+ */ +public class FalconBridge { + private static final Logger LOG = LoggerFactory.getLogger(FalconBridge.class); + private static final String DATASET_STR = "-dataset"; + private static final String REPLICATED_STR = "-replicated"; + + /** + * Creates cluster entity + * + * @param cluster ClusterEntity + * @return cluster instance reference + */ + public static Referenceable createClusterEntity(final org.apache.falcon.entity.v0.cluster.Cluster cluster, + final String user, + final Date timestamp) throws Exception { + LOG.info("Creating cluster Entity : {}", cluster.getName()); + + Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER_ENTITY.getName()); + + clusterRef.set(FalconDataModelGenerator.NAME, String.format("%s", cluster.getName())); + clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s", cluster.getName())); + + clusterRef.set(FalconDataModelGenerator.TIMESTAMP, timestamp); + clusterRef.set(FalconDataModelGenerator.COLO, cluster.getColo()); + + clusterRef.set(FalconDataModelGenerator.USER, user); + + + if (StringUtils.isNotEmpty(cluster.getTags())) { + clusterRef.set(FalconDataModelGenerator.TAGS, + EventUtil.convertKeyValueStringToMap(cluster.getTags())); + } + + return clusterRef; + } + + private static Referenceable createFeedDataset(Feed feed, Referenceable clusterReferenceable, + String user, + Date timestamp) throws Exception { + LOG.info("Creating feed dataset: {}", feed.getName()); + + Referenceable datasetReferenceable = new Referenceable(FalconDataTypes.FALCON_FEED_DATASET.getName()); + String feedName = getFeedDatasetName(feed.getName(), (String) clusterReferenceable.get(FalconDataModelGenerator.NAME)); + datasetReferenceable.set(FalconDataModelGenerator.NAME, feedName); + datasetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedName); + datasetReferenceable.set(FalconDataModelGenerator.TIMESTAMP, timestamp); + + datasetReferenceable.set(FalconDataModelGenerator.STOREDIN, 
clusterReferenceable); + datasetReferenceable.set(FalconDataModelGenerator.USER, user); + + if (StringUtils.isNotEmpty(feed.getTags())) { + datasetReferenceable.set(FalconDataModelGenerator.TAGS, + EventUtil.convertKeyValueStringToMap(feed.getTags())); + } + + if (feed.getGroups() != null) { + datasetReferenceable.set(FalconDataModelGenerator.GROUPS, feed.getGroups()); + } + + return datasetReferenceable; + } + + public static List createFeedEntity(Feed feed, + ConfigurationStore falconStore, String user, + Date timestamp) throws Exception { + LOG.info("Creating feed : {}", feed.getName()); + + List entities = new ArrayList<>(); + + if (feed.getClusters() != null) { + for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) { + org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER, + feedCluster.getName()); + // set cluster + Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo()); + entities.add(clusterReferenceable); + + // input as hive_table or hdfs_path, output as falcon_feed dataset + List inputs = new ArrayList<>(); + List inputReferenceables = getInputEntities(cluster, + (Feed) falconStore.get(EntityType.FEED, feed.getName())); + if (inputReferenceables != null) { + entities.addAll(inputReferenceables); + inputs.add(inputReferenceables.get(inputReferenceables.size() - 1)); + } + + List outputs = new ArrayList<>(); + Referenceable outputReferenceable = createFeedDataset(feed, clusterReferenceable, user, timestamp); + if (outputReferenceable != null) { + entities.add(outputReferenceable); + outputs.add(outputReferenceable); + } + + if (!inputs.isEmpty() || !outputs.isEmpty()) { + Referenceable feedReferenceable = new Referenceable(FalconDataTypes.FALCON_FEED_ENTITY.getName()); + feedReferenceable.set(FalconDataModelGenerator.NAME, String.format("%s", feed.getName())); + feedReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
String.format("%s@%s", feed.getName(), + cluster.getName())); + + feedReferenceable.set(FalconDataModelGenerator.TIMESTAMP, timestamp); + if (!inputs.isEmpty()) { + feedReferenceable.set(FalconDataModelGenerator.INPUTS, inputs); + } + if (!outputs.isEmpty()) { + feedReferenceable.set(FalconDataModelGenerator.OUTPUTS, outputs); + } + + feedReferenceable.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable); + feedReferenceable.set(FalconDataModelGenerator.USER, user); + entities.add(feedReferenceable); + } + } + + } + return entities; + } + + public static List createReplicationFeedEntity(Feed feed, + ConfigurationStore falconStore, String user, + Date timestamp) throws Exception { + LOG.info("Creating replication feed : {}", feed.getName()); + + List entities = new ArrayList<>(); + // input as falcon_feed in source cluster, output as falcon_feed in target cluster + List inputs = new ArrayList<>(); + List outputs = new ArrayList<>(); + if (feed.getClusters() != null) { + for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) { + + org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER, + feedCluster.getName()); + + Referenceable clusterEntity = getClusterEntityReference(cluster.getName(), cluster.getColo()); + entities.add(clusterEntity); + + + if (ClusterType.SOURCE == feedCluster.getType()) { + + Referenceable inputReferenceable = createFeedDataset(feed, clusterEntity, user, timestamp); + if (inputReferenceable != null) { + entities.add(inputReferenceable); + inputs.add(inputReferenceable); + } + } + + if (ClusterType.TARGET == feedCluster.getType()) { + Referenceable outputReferenceable = createFeedDataset(feed, clusterEntity, user, timestamp); + if (outputReferenceable != null) { + entities.add(outputReferenceable); + outputs.add(outputReferenceable); + } + } + } + + if (!inputs.isEmpty() || !outputs.isEmpty()) { + Referenceable replicationFeedReferenceable = new 
Referenceable(FalconDataTypes + .FALCON_REPLICATION_FEED_ENTITY.getName()); + + replicationFeedReferenceable.set(FalconDataModelGenerator.NAME, String.format("%s", feed.getName() + REPLICATED_STR)); + replicationFeedReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s", + feed.getName() + REPLICATED_STR)); + + replicationFeedReferenceable.set(FalconDataModelGenerator.TIMESTAMP, timestamp); + if (!inputs.isEmpty()) { + replicationFeedReferenceable.set(FalconDataModelGenerator.INPUTS, inputs); + } + if (!outputs.isEmpty()) { + replicationFeedReferenceable.set(FalconDataModelGenerator.OUTPUTS, outputs); + } + + replicationFeedReferenceable.set(FalconDataModelGenerator.USER, user); + entities.add(replicationFeedReferenceable); + } + + } + return entities; + } + + /** + * + * Creates process entity + * + * + * + * @param process process entity + * + * @param falconStore config store + * + * @param user falcon user + * + * @param timestamp timestamp of entity + * + * @return process instance reference + * + + */ + public static List createProcessEntity(org.apache.falcon.entity.v0.process.Process process, + ConfigurationStore falconStore, String user, + Date timestamp) throws Exception { + LOG.info("Creating process Entity : {}", process.getName()); + + // The requirement is for each cluster, create a process entity with name + // clustername.processname + List entities = new ArrayList<>(); + + if (process.getClusters() != null) { + + for (Cluster processCluster : process.getClusters().getClusters()) { + org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER, processCluster.getName()); + Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo()); + entities.add(clusterReferenceable); + + List inputs = new ArrayList<>(); + if (process.getInputs() != null) { + for (Input input : process.getInputs().getInputs()) { + Referenceable inputReferenceable = 
getFeedDataSetReference(getFeedDatasetName(input.getFeed(), + cluster.getName()), clusterReferenceable); + entities.add(inputReferenceable); + inputs.add(inputReferenceable); + } + } + + List outputs = new ArrayList<>(); + if (process.getOutputs() != null) { + for (Output output : process.getOutputs().getOutputs()) { + Referenceable outputReferenceable = getFeedDataSetReference(getFeedDatasetName(output.getFeed(), + cluster.getName()), clusterReferenceable); + entities.add(outputReferenceable); + outputs.add(outputReferenceable); + } + } + + if (!inputs.isEmpty() || !outputs.isEmpty()) { + + Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName()); + processEntity.set(FalconDataModelGenerator.NAME, String.format("%s", process.getName())); + processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s@%s", process.getName(), + cluster.getName())); + processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp); + + if (!inputs.isEmpty()) { + processEntity.set(FalconDataModelGenerator.INPUTS, inputs); + } + if (!outputs.isEmpty()) { + processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs); + } + + // set cluster + processEntity.set(FalconDataModelGenerator.RUNSON, clusterReferenceable); + + // Set user + processEntity.set(FalconDataModelGenerator.USER, user); + + if (StringUtils.isNotEmpty(process.getTags())) { + processEntity.set(FalconDataModelGenerator.TAGS, + EventUtil.convertKeyValueStringToMap(process.getTags())); + } + + if (process.getPipelines() != null) { + processEntity.set(FalconDataModelGenerator.PIPELINES, process.getPipelines()); + } + + processEntity.set(FalconDataModelGenerator.WFPROPERTIES, getProcessEntityWFProperties(process.getWorkflow(), + process.getName())); + + entities.add(processEntity); + } + + } + } + return entities; + } + + private static List getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster, + Feed feed) throws Exception { + 
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName()); + + final CatalogTable table = getTable(feedCluster, feed); + if (table != null) { + CatalogStorage storage = new CatalogStorage(cluster, table); + return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(), + storage.getTable().toLowerCase()); + } else { + List locations = FeedHelper.getLocations(feedCluster, feed); + Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA); + final String pathUri = normalize(dataLocation.getPath()); + LOG.info("Registering DFS Path {} ", pathUri); + return fillHDFSDataSet(pathUri, cluster.getName()); + } + } + + private static CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) { + // check if table is overridden in cluster + if (cluster.getTable() != null) { + return cluster.getTable(); + } + + return feed.getTable(); + } + + private static List fillHDFSDataSet(final String pathUri, final String clusterName) { + List entities = new ArrayList<>(); + Referenceable ref = new Referenceable(FSDataTypes.HDFS_PATH().toString()); + ref.set("path", pathUri); +// Path path = new Path(pathUri); +// ref.set("name", path.getName()); + //TODO - Fix after ATLAS-542 to shorter Name + ref.set(FalconDataModelGenerator.NAME, pathUri); + ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri); + ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); + entities.add(ref); + return entities; + } + + private static Referenceable createHiveDatabaseInstance(String clusterName, String dbName) + throws Exception { + Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); + dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); + dbRef.set(HiveDataModelGenerator.NAME, dbName); + dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName)); + return dbRef; + } + + private static List 
createHiveTableInstance(String clusterName, String dbName, + String tableName) throws Exception { + List entities = new ArrayList<>(); + Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName); + entities.add(dbRef); + + Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); + tableRef.set(HiveDataModelGenerator.NAME, + HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); + tableRef.set(HiveDataModelGenerator.TABLE_NAME, tableName); + tableRef.set(HiveDataModelGenerator.DB, dbRef); + entities.add(tableRef); + + return entities; + } + + private static Referenceable getClusterEntityReference(final String clusterName, + final String colo) { + LOG.info("Getting reference for entity {}", clusterName); + Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER_ENTITY.getName()); + clusterRef.set(FalconDataModelGenerator.NAME, String.format("%s", clusterName)); + clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName); + clusterRef.set(FalconDataModelGenerator.COLO, colo); + return clusterRef; + } + + + private static Referenceable getFeedDataSetReference(final String feedDatasetName, + Referenceable clusterReference) { + LOG.info("Getting reference for entity {}", feedDatasetName); + Referenceable feedDatasetRef = new Referenceable(FalconDataTypes.FALCON_FEED_DATASET.getName()); + feedDatasetRef.set(FalconDataModelGenerator.NAME, feedDatasetName); + feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedDatasetName); + feedDatasetRef.set(FalconDataModelGenerator.STOREDIN, clusterReference); + return feedDatasetRef; + } + + private static Map getProcessEntityWFProperties(final Workflow workflow, + final String processName) { + Map wfProperties = new HashMap<>(); + wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), + ProcessHelper.getProcessWorkflowName(workflow.getName(), processName)); + 
wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), + workflow.getVersion()); + wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), + workflow.getEngine().value()); + + return wfProperties; + } + + private static String getFeedDatasetName(final String feedName, final String clusterName) { + return String.format("%s@%s", feedName + DATASET_STR, clusterName); + } + + private static String normalize(final String str) { + if (StringUtils.isBlank(str)) { + return null; + } + return str.toLowerCase().trim(); + } +} diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index 97ee1a2e4..de8afc65f 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -21,32 +21,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Guice; import com.google.inject.Injector; -import org.apache.atlas.AtlasClient; -import org.apache.atlas.AtlasConstants; -import org.apache.atlas.falcon.model.FalconDataModelGenerator; -import org.apache.atlas.falcon.model.FalconDataTypes; -import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; -import org.apache.atlas.hive.model.HiveDataModelGenerator; -import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.falcon.bridge.FalconBridge; import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; -import org.apache.commons.lang.StringUtils; -import org.apache.falcon.atlas.Util.EventUtil; import org.apache.falcon.atlas.event.FalconEvent; import org.apache.falcon.atlas.publisher.FalconEventPublisher; -import 
org.apache.falcon.entity.CatalogStorage; -import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Cluster; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Output; import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.security.CurrentUser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +71,13 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { private static ConfigurationStore STORE; + private static enum Operation { + ADD, + UPDATE, + } + + private List messages = new ArrayList<>(); + static { try { // initialize the async facility to process hook calls. We don't @@ -144,150 +136,87 @@ public void run() { } } + @Override + protected String getNumberOfRetriesPropertyKey() { + return HOOK_NUM_RETRIES; + } + private void fireAndForget(FalconEvent event) throws Exception { LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation()); - notifyEntities(getAuthenticatedUser(), createEntities(event)); - } + Operation op = getOperation(event.getOperation()); - private String getAuthenticatedUser() { - String user = null; - try { - user = CurrentUser.getAuthenticatedUser(); - } catch (IllegalArgumentException e) { - LOG.warn("Failed to get user from CurrentUser.getAuthenticatedUser"); - } - return getUser(user, null); - } + switch (op) { + case ADD: + LOG.info("fireAndForget user:{}, ugi: {}", event.getUser(), event.getUgi()); + messages.add(new HookNotification.EntityCreateRequest(getAuthenticatedUser(event.getUser()), createEntities(event))); + break; - private List createEntities(FalconEvent event) throws Exception { - switch (event.getOperation()) { - case ADD_PROCESS: - return 
createProcessInstance((Process) event.getEntity(), event.getUser(), event.getTimestamp()); + case UPDATE: + LOG.info("fireAndForget user:{}, ugi: {}", event.getUser(), event.getUgi()); + messages.add(new HookNotification.EntityUpdateRequest(getAuthenticatedUser(event.getUser()), createEntities(event))); + break; } + notifyEntities(messages); + } - return null; + private String getAuthenticatedUser(final String user) { + return getUser(user, null); } - /** - + * Creates process entity - + * - + * @param event process entity event - + * @return process instance reference - + */ - public List createProcessInstance(Process process, String user, long timestamp) throws Exception { - LOG.info("Creating process Instance : {}", process.getName()); - - // The requirement is for each cluster, create a process entity with name - // clustername.processname + private List createEntities(FalconEvent event) throws Exception { List entities = new ArrayList<>(); - if (process.getClusters() != null) { - - for (Cluster processCluster : process.getClusters().getClusters()) { - org.apache.falcon.entity.v0.cluster.Cluster cluster = STORE.get(EntityType.CLUSTER, processCluster.getName()); - - List inputs = new ArrayList<>(); - if (process.getInputs() != null) { - for (Input input : process.getInputs().getInputs()) { - List clusterInputs = getInputOutputEntity(cluster, input.getFeed()); - if (clusterInputs != null) { - entities.addAll(clusterInputs); - inputs.add(clusterInputs.get(clusterInputs.size() - 1)); - } - } - } - - List outputs = new ArrayList<>(); - if (process.getOutputs() != null) { - for (Output output : process.getOutputs().getOutputs()) { - List clusterOutputs = getInputOutputEntity(cluster, output.getFeed()); - if (clusterOutputs != null) { - entities.addAll(clusterOutputs); - outputs.add(clusterOutputs.get(clusterOutputs.size() - 1)); - } - } - } - - if (!inputs.isEmpty() || !outputs.isEmpty()) { - Referenceable processEntity = new 
Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName()); - processEntity.set(FalconDataModelGenerator.NAME, String.format("%s", process.getName(), - cluster.getName())); - processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s@%s", process.getName(), - cluster.getName())); - processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp); - if (!inputs.isEmpty()) { - processEntity.set(FalconDataModelGenerator.INPUTS, inputs); - } - if (!outputs.isEmpty()) { - processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs); - } - processEntity.set(FalconDataModelGenerator.USER, user); - - if (StringUtils.isNotEmpty(process.getTags())) { - processEntity.set(FalconDataModelGenerator.TAGS, - EventUtil.convertKeyValueStringToMap(process.getTags())); - } - entities.add(processEntity); - } + switch (event.getOperation()) { + case ADD_CLUSTER: + case UPDATE_CLUSTER: + entities.add(FalconBridge.createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity(), event.getUser(), + event.getTimestamp())); + LOG.info("Sent to kafka"); + break; - } + case ADD_PROCESS: + case UPDATE_PROCESS: + entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE, + event.getUser(), event.getTimestamp())); + break; + + case ADD_FEED: + case UPDATE_FEED: + entities.addAll(FalconBridge.createFeedEntity((Feed) event.getEntity(), STORE, + event.getUser(), event.getTimestamp())); + break; + + case ADD_REPLICATION_FEED: + case UPDATE_REPLICATION_FEED: + entities.addAll(FalconBridge.createReplicationFeedEntity((Feed) event.getEntity(), STORE, + event.getUser(), event.getTimestamp())); + break; + + default: + throw new Exception("Falcon operation " + event.getOperation() + " is not valid or supported"); } return entities; } - private List getInputOutputEntity(org.apache.falcon.entity.v0.cluster.Cluster cluster, String feedName) throws Exception { - Feed feed = STORE.get(EntityType.FEED, feedName); - 
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName()); - - final CatalogTable table = getTable(feedCluster, feed); - if (table != null) { - CatalogStorage storage = new CatalogStorage(cluster, table); - return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(), - storage.getTable().toLowerCase()); - } + private static Operation getOperation(final FalconEvent.OPERATION op) throws Exception { + switch (op) { + case ADD_CLUSTER: + case ADD_FEED: + case ADD_PROCESS: + case ADD_REPLICATION_FEED: + return Operation.ADD; - return null; - } + case UPDATE_CLUSTER: + case UPDATE_FEED: + case UPDATE_PROCESS: + case UPDATE_REPLICATION_FEED: + return Operation.UPDATE; - private CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) { - // check if table is overridden in cluster - if (cluster.getTable() != null) { - return cluster.getTable(); + default: + throw new Exception("Falcon operation " + op + " is not valid or supported"); } - - return feed.getTable(); - } - - private Referenceable createHiveDatabaseInstance(String clusterName, String dbName) - throws Exception { - Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); - dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); - dbRef.set(HiveDataModelGenerator.NAME, dbName); - dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName)); - return dbRef; - } - - private List createHiveTableInstance(String clusterName, String dbName, String tableName) throws Exception { - List entities = new ArrayList<>(); - Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName); - entities.add(dbRef); - - Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); - tableRef.set(HiveDataModelGenerator.NAME, - HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); - 
tableRef.set(HiveDataModelGenerator.TABLE_NAME, tableName); - tableRef.set(HiveDataModelGenerator.DB, dbRef); - entities.add(tableRef); - - return entities; - } - - @Override - protected String getNumberOfRetriesPropertyKey() { - return HOOK_NUM_RETRIES; } } diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java index 397dea498..7f44161bf 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java @@ -58,14 +58,19 @@ public class FalconDataModelGenerator { public static final String NAME = "name"; public static final String TIMESTAMP = "timestamp"; + public static final String COLO = "collocated"; public static final String USER = "owned-by"; public static final String TAGS = "tag-classification"; + public static final String GROUPS = "grouped-as"; + public static final String PIPELINES = "pipeline"; + public static final String WFPROPERTIES = "wf-properties"; + public static final String RUNSON = "runs-on"; + public static final String STOREDIN = "stored-in"; // multiple inputs and outputs for process public static final String INPUTS = "inputs"; public static final String OUTPUTS = "outputs"; - public FalconDataModelGenerator() { classTypeDefinitions = new HashMap<>(); enumTypeDefinitionMap = new HashMap<>(); @@ -74,7 +79,13 @@ public FalconDataModelGenerator() { public void createDataModel() throws AtlasException { LOG.info("Generating the Falcon Data Model"); + + // classes + createClusterEntityClass(); createProcessEntityClass(); + createFeedEntityClass(); + createFeedDatasetClass(); + createReplicationFeedEntityClass(); } @@ -103,26 +114,101 @@ private ImmutableList> getTraitTypeDefinit return ImmutableList.of(); } + private void createClusterEntityClass() throws 
AtlasException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false, + null), + new AttributeDefinition(COLO, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, + null), + new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, + null), + // map of tags + new AttributeDefinition(TAGS, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), + Multiplicity.OPTIONAL, false, null),}; + + HierarchicalTypeDefinition definition = + new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_CLUSTER_ENTITY.getName(), null, + ImmutableSet.of(AtlasClient.INFRASTRUCTURE_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(FalconDataTypes.FALCON_CLUSTER_ENTITY.getName(), definition); + LOG.debug("Created definition for {}", FalconDataTypes.FALCON_CLUSTER_ENTITY.getName()); + } + + private void createFeedEntityClass() throws AtlasException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, + null), + new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER_ENTITY.getName(), Multiplicity.REQUIRED, + false, null), + new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, + null)}; + + HierarchicalTypeDefinition definition = + new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED_ENTITY.getName(), null, + ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(FalconDataTypes.FALCON_FEED_ENTITY.getName(), definition); + LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_ENTITY.getName()); + } + + private void createFeedDatasetClass() throws AtlasException { + AttributeDefinition[] attributeDefinitions = new 
AttributeDefinition[]{ + new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false, + null), + new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER_ENTITY.getName(), Multiplicity.REQUIRED, + false, null), + new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, + null), + new AttributeDefinition(GROUPS, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), + // map of tags + new AttributeDefinition(TAGS, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), + Multiplicity.OPTIONAL, false, null),}; + + HierarchicalTypeDefinition definition = + new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED_DATASET.getName(), null, + ImmutableSet.of(AtlasClient.DATA_SET_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(FalconDataTypes.FALCON_FEED_DATASET.getName(), definition); + LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_DATASET.getName()); + } + + + private void createReplicationFeedEntityClass() throws AtlasException { + AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ + new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, + null), + new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, + null)}; + + HierarchicalTypeDefinition definition = + new HierarchicalTypeDefinition<>(ClassType.class, + FalconDataTypes.FALCON_REPLICATION_FEED_ENTITY.getName(), null, + ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); + classTypeDefinitions.put(FalconDataTypes.FALCON_REPLICATION_FEED_ENTITY.getName(), definition); + LOG.debug("Created definition for {}", FalconDataTypes.FALCON_REPLICATION_FEED_ENTITY.getName()); + } private void createProcessEntityClass() throws AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new 
AttributeDefinition(TIMESTAMP, DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false, + new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, null), + new AttributeDefinition(RUNSON, FalconDataTypes.FALCON_CLUSTER_ENTITY.getName(), Multiplicity.REQUIRED, + false, null), new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), // map of tags new AttributeDefinition(TAGS, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), + Multiplicity.OPTIONAL, false, null), + new AttributeDefinition(PIPELINES, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), + // wf properties + new AttributeDefinition(WFPROPERTIES, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), Multiplicity.OPTIONAL, false, null),}; HierarchicalTypeDefinition definition = new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), null, - ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); + ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); classTypeDefinitions.put(FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), definition); LOG.debug("Created definition for {}", FalconDataTypes.FALCON_PROCESS_ENTITY.getName()); } - public String getModelAsJson() throws AtlasException { createDataModel(); return getDataModelAsJSON(); diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java index f1f350b18..ab8c1be41 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java @@ -22,8 +22,11 @@ * Falcon Data Types for model and bridge. 
*/ public enum FalconDataTypes { - - + // Classes + FALCON_CLUSTER_ENTITY("falcon_cluster"), + FALCON_FEED_ENTITY("falcon_feed"), + FALCON_FEED_DATASET("falcon_feed_dataset"), + FALCON_REPLICATION_FEED_ENTITY("falcon_replication_feed"), FALCON_PROCESS_ENTITY("falcon_process"), ; diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java index 7f67407c2..44bf46fae 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java +++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java @@ -22,8 +22,6 @@ import org.apache.falcon.FalconException; import org.apache.falcon.security.CurrentUser; import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; @@ -33,7 +31,6 @@ * Falcon event util */ public final class EventUtil { - private static final Logger LOG = LoggerFactory.getLogger(EventUtil.class); private EventUtil() {} @@ -55,7 +52,6 @@ public static Map convertKeyValueStringToMap(final String keyVal return keyValueMap; } - public static UserGroupInformation getUgi() throws FalconException { UserGroupInformation ugi; try { diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java index e587e7305..8ba08a8c1 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java +++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java @@ -21,6 +21,8 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.hadoop.security.UserGroupInformation; +import java.util.Date; + /** * Falcon event to interface with Atlas Service. 
*/ @@ -40,7 +42,14 @@ public FalconEvent(String doAsUser, UserGroupInformation ugi, OPERATION falconOp } public enum OPERATION { - ADD_PROCESS, UPDATE_PROCESS + ADD_CLUSTER, + UPDATE_CLUSTER, + ADD_FEED, + UPDATE_FEED, + ADD_REPLICATION_FEED, + UPDATE_REPLICATION_FEED, + ADD_PROCESS, + UPDATE_PROCESS, } public String getUser() { @@ -55,8 +64,8 @@ public OPERATION getOperation() { return operation; } - public long getTimestamp() { - return timestamp; + public Date getTimestamp() { + return new Date(timestamp); } public Entity getEntity() { diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java index 373846dfb..60af2e4f7 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java +++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java @@ -19,6 +19,7 @@ package org.apache.falcon.atlas.service; import org.apache.atlas.falcon.hook.FalconHook; +import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.atlas.Util.EventUtil; import org.apache.falcon.atlas.event.FalconEvent; @@ -26,7 +27,8 @@ import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.entity.v0.feed.ClusterType; +import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.service.ConfigurationChangeListener; import org.apache.falcon.service.FalconService; import org.apache.hadoop.security.UserGroupInformation; @@ -67,12 +69,20 @@ public void destroy() throws FalconException { public void onAdd(Entity entity) throws FalconException { EntityType entityType = entity.getEntityType(); switch (entityType) { + case CLUSTER: + addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER); + break; 
case PROCESS: - addProcessEntity((Process) entity, FalconEvent.OPERATION.ADD_PROCESS); + addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS); + break; + case FEED: + FalconEvent.OPERATION operation = isReplicationFeed((Feed) entity) ? + FalconEvent.OPERATION.ADD_REPLICATION_FEED : + FalconEvent.OPERATION.ADD_FEED; + addEntity(entity, operation); break; - default: - LOG.debug("Entity type not processed " + entityType); + default: + LOG.debug("Entity type not processed {}", entityType); } } @@ -84,12 +94,20 @@ public void onRemove(Entity entity) throws FalconException { public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { EntityType entityType = newEntity.getEntityType(); switch (entityType) { + case CLUSTER: + addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER); + break; case PROCESS: - addProcessEntity((Process) newEntity, FalconEvent.OPERATION.UPDATE_PROCESS); + addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS); + break; + case FEED: + FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity) ? + FalconEvent.OPERATION.UPDATE_REPLICATION_FEED : + FalconEvent.OPERATION.UPDATE_FEED; + addEntity(newEntity, operation); break; - default: - LOG.debug("Entity type not processed " + entityType); + default: + LOG.debug("Entity type not processed {}", entityType); } } @@ -99,8 +117,8 @@ public void onReload(Entity entity) throws FalconException { onAdd(entity); } - private void addProcessEntity(Process entity, FalconEvent.OPERATION operation) throws FalconException { - LOG.info("Adding process entity to Atlas: {}", entity.getName()); + private void addEntity(Entity entity, FalconEvent.OPERATION operation) throws FalconException { + LOG.info("Adding {} entity to Atlas: {}", entity.getEntityType().name(), entity.getName()); try { String user = entity.getACL() != null ?
entity.getACL().getOwner() : @@ -112,4 +130,20 @@ private void addProcessEntity(Process entity, FalconEvent.OPERATION operation) t throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex); } } + + private static boolean isReplicationFeed(final Feed entity) { + String srcCluster = null; + String tgtCluster = null; + + // Get the clusters + for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : entity.getClusters().getClusters()) { + if (ClusterType.SOURCE == feedCluster.getType()) { + srcCluster = feedCluster.getName(); + } else if (ClusterType.TARGET == feedCluster.getType()) { + tgtCluster = feedCluster.getName(); + } + } + + return StringUtils.isNotBlank(srcCluster) && StringUtils.isNotBlank(tgtCluster); + } } diff --git a/docs/src/site/twiki/Bridge-Falcon.twiki b/docs/src/site/twiki/Bridge-Falcon.twiki index 4f5e6764c..9e7a570be 100644 --- a/docs/src/site/twiki/Bridge-Falcon.twiki +++ b/docs/src/site/twiki/Bridge-Falcon.twiki @@ -3,6 +3,10 @@ ---++ Falcon Model The default falcon modelling is available in org.apache.atlas.falcon.model.FalconDataModelGenerator. It defines the following types: +falcon_cluster(ClassType) - super types [Infrastructure] - attributes [timestamp, collocated, owned-by, tags] +falcon_feed(ClassType) - super types [Process] - attributes [timestamp, stored-in, tags] +falcon_feed_dataset(ClassType) - super types [DataSet] - attributes [timestamp, stored-in, owned-by, groups, tags] +falcon_replication_feed(ClassType) - super types [Process] - attributes [timestamp, owned-by] falcon_process(ClassType) - super types [Process] - attributes [timestamp, owned-by, tags] @@ -10,6 +14,10 @@ One falcon_process entity is created for every cluster that the falcon process i The entities are created and de-duped using unique qualified name. They provide namespace and can be used for querying/lineage as well. 
The unique attributes are: * falcon_process - attribute name - <process name>@<cluster name> + * falcon_cluster - attribute name - <cluster name> + * falcon_feed - attribute name - <feed name>@<cluster name> + * falcon_feed_dataset - attribute name - <feed name>-dataset@<cluster name> + * falcon_replication_feed - attribute name - <feed name>-replicated@<cluster name> ---++ Falcon Hook Falcon supports listeners on falcon entity submission. This is used to add entities in Atlas using the model defined in org.apache.atlas.falcon.model.FalconDataModelGenerator. @@ -33,5 +41,4 @@ Refer [[Configuration][Configuration]] for notification related configurations ---++ Limitations - * Only the process entity creation is currently handled. This model will be expanded to include all Falcon metadata * In falcon cluster entity, cluster name used should be uniform across components like hive, falcon, sqoop etc. If used with ambari, ambari cluster name should be used for cluster entity