Skip to content
This repository was archived by the owner on Nov 12, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,17 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.falcon.model.FalconDataModelGenerator;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.falcon.bridge.FalconBridge;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.atlas.Util.EventUtil;
import org.apache.falcon.atlas.event.FalconEvent;
import org.apache.falcon.atlas.publisher.FalconEventPublisher;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,6 +71,13 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {

private static ConfigurationStore STORE;

private static enum Operation {
ADD,
UPDATE,
}

private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();

static {
try {
// initialize the async facility to process hook calls. We don't
Expand Down Expand Up @@ -144,150 +136,87 @@ public void run() {
}
}

@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}

private void fireAndForget(FalconEvent event) throws Exception {
LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());

notifyEntities(getAuthenticatedUser(), createEntities(event));
}
Operation op = getOperation(event.getOperation());

private String getAuthenticatedUser() {
String user = null;
try {
user = CurrentUser.getAuthenticatedUser();
} catch (IllegalArgumentException e) {
LOG.warn("Failed to get user from CurrentUser.getAuthenticatedUser");
}
return getUser(user, null);
}
switch (op) {
case ADD:
LOG.info("fireAndForget user:{}, ugi: {}", event.getUser(), event.getUgi());
messages.add(new HookNotification.EntityCreateRequest(getAuthenticatedUser(event.getUser()), createEntities(event)));
break;

private List<Referenceable> createEntities(FalconEvent event) throws Exception {
switch (event.getOperation()) {
case ADD_PROCESS:
return createProcessInstance((Process) event.getEntity(), event.getUser(), event.getTimestamp());
case UPDATE:
LOG.info("fireAndForget user:{}, ugi: {}", event.getUser(), event.getUgi());
messages.add(new HookNotification.EntityUpdateRequest(getAuthenticatedUser(event.getUser()), createEntities(event)));
break;
}
notifyEntities(messages);
}

return null;
private String getAuthenticatedUser(final String user) {
return getUser(user, null);
}

/**
+ * Creates process entity
+ *
+ * @param event process entity event
+ * @return process instance reference
+ */
public List<Referenceable> createProcessInstance(Process process, String user, long timestamp) throws Exception {
LOG.info("Creating process Instance : {}", process.getName());

// The requirement is for each cluster, create a process entity with name
// clustername.processname
private List<Referenceable> createEntities(FalconEvent event) throws Exception {
List<Referenceable> entities = new ArrayList<>();

if (process.getClusters() != null) {

for (Cluster processCluster : process.getClusters().getClusters()) {
org.apache.falcon.entity.v0.cluster.Cluster cluster = STORE.get(EntityType.CLUSTER, processCluster.getName());

List<Referenceable> inputs = new ArrayList<>();
if (process.getInputs() != null) {
for (Input input : process.getInputs().getInputs()) {
List<Referenceable> clusterInputs = getInputOutputEntity(cluster, input.getFeed());
if (clusterInputs != null) {
entities.addAll(clusterInputs);
inputs.add(clusterInputs.get(clusterInputs.size() - 1));
}
}
}

List<Referenceable> outputs = new ArrayList<>();
if (process.getOutputs() != null) {
for (Output output : process.getOutputs().getOutputs()) {
List<Referenceable> clusterOutputs = getInputOutputEntity(cluster, output.getFeed());
if (clusterOutputs != null) {
entities.addAll(clusterOutputs);
outputs.add(clusterOutputs.get(clusterOutputs.size() - 1));
}
}
}

if (!inputs.isEmpty() || !outputs.isEmpty()) {
Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
processEntity.set(FalconDataModelGenerator.NAME, String.format("%s", process.getName(),
cluster.getName()));
processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s@%s", process.getName(),
cluster.getName()));
processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
if (!inputs.isEmpty()) {
processEntity.set(FalconDataModelGenerator.INPUTS, inputs);
}
if (!outputs.isEmpty()) {
processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
}
processEntity.set(FalconDataModelGenerator.USER, user);

if (StringUtils.isNotEmpty(process.getTags())) {
processEntity.set(FalconDataModelGenerator.TAGS,
EventUtil.convertKeyValueStringToMap(process.getTags()));
}
entities.add(processEntity);
}
switch (event.getOperation()) {
case ADD_CLUSTER:
case UPDATE_CLUSTER:
entities.add(FalconBridge.createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity(), event.getUser(),
event.getTimestamp()));
LOG.info("Sent to kafka");
break;

}
case ADD_PROCESS:
case UPDATE_PROCESS:
entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE,
event.getUser(), event.getTimestamp()));
break;

case ADD_FEED:
case UPDATE_FEED:
entities.addAll(FalconBridge.createFeedEntity((Feed) event.getEntity(), STORE,
event.getUser(), event.getTimestamp()));
break;

case ADD_REPLICATION_FEED:
case UPDATE_REPLICATION_FEED:
entities.addAll(FalconBridge.createReplicationFeedEntity((Feed) event.getEntity(), STORE,
event.getUser(), event.getTimestamp()));
break;

default:
throw new Exception("Falcon operation " + event.getOperation() + " is not valid or supported");
}

return entities;
}

private List<Referenceable> getInputOutputEntity(org.apache.falcon.entity.v0.cluster.Cluster cluster, String feedName) throws Exception {
Feed feed = STORE.get(EntityType.FEED, feedName);
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());

final CatalogTable table = getTable(feedCluster, feed);
if (table != null) {
CatalogStorage storage = new CatalogStorage(cluster, table);
return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(),
storage.getTable().toLowerCase());
}
private static Operation getOperation(final FalconEvent.OPERATION op) throws Exception {
switch (op) {
case ADD_CLUSTER:
case ADD_FEED:
case ADD_PROCESS:
case ADD_REPLICATION_FEED:
return Operation.ADD;

return null;
}
case UPDATE_CLUSTER:
case UPDATE_FEED:
case UPDATE_PROCESS:
case UPDATE_REPLICATION_FEED:
return Operation.UPDATE;

private CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
// check if table is overridden in cluster
if (cluster.getTable() != null) {
return cluster.getTable();
default:
throw new Exception("Falcon operation " + op + " is not valid or supported");
}

return feed.getTable();
}

private Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
throws Exception {
Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
dbRef.set(HiveDataModelGenerator.NAME, dbName);
dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
return dbRef;
}

private List<Referenceable> createHiveTableInstance(String clusterName, String dbName, String tableName) throws Exception {
List<Referenceable> entities = new ArrayList<>();
Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
entities.add(dbRef);

Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
tableRef.set(HiveDataModelGenerator.NAME,
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
tableRef.set(HiveDataModelGenerator.TABLE_NAME, tableName);
tableRef.set(HiveDataModelGenerator.DB, dbRef);
entities.add(tableRef);

return entities;
}

@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
}

Loading