diff --git a/server/pom.xml b/server/pom.xml
index 74fa888f..7bd4df0b 100755
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -24,6 +24,8 @@
     UTF-8
     no-exclusions
     local
+    <scala.version>2.10</scala.version>
+    <spark.version>1.3.0</spark.version>
@@ -444,6 +446,48 @@
       <version>7.0.6</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.version}</artifactId>
+      <version>${spark.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>jcl-over-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-mllib_${scala.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-graphx_${scala.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka_${scala.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
diff --git a/server/resources/common/log4j.properties b/server/resources/common/log4j.properties
old mode 100755
new mode 100644
index 4e3a5f72..715ef103
--- a/server/resources/common/log4j.properties
+++ b/server/resources/common/log4j.properties
@@ -7,20 +7,10 @@ net.playtxt.logbase.base=${catalina.home}/logs/seldon-server/base
 net.playtxt.logbase.actions=${catalina.home}/logs/seldon-server/actions
 
 # The default category logs debug and above to the file appender.
-log4j.rootLogger=ERROR, api
+log4j.rootLogger=DEBUG, api
 
-log4j.logger.io.seldon.api.state.zk.ZkClientConfigHandler=INFO
-log4j.logger.io.seldon.api.state.zk.ZkGlobalConfigHandler=INFO
-log4j.logger.io.seldon.ar.AssocRuleManager=INFO
-log4j.logger.io.seldon.cc.UserClusterManager=INFO
-log4j.logger.io.seldon.memcache.DogpileHandler=INFO
-log4j.logger.io.seldon.memcache.MemCachePeer=INFO
-log4j.logger.io.seldon.mf.MfFeaturesManager=INFO
-log4j.logger.io.seldon.recommendation.model.ModelManager=INFO
-log4j.logger.io.seldon.tags.UserTagAffinityManager=INFO
-
-log4j.logger.org.springframework=INFO
+log4j.logger.org.springframework=DEBUG
 
 # The API Stats log
 log4j.additivity.APILogger=false
@@ -45,7 +35,7 @@ log4j.category.org.apache.axis=INFO, api
 
 # Info and above from jpox
 log4j.additivity.datanucleus=false
-log4j.category.datanucleus=ERROR, jpox
+log4j.category.datanucleus=DEBUG, jpox
 
 # Appender used for API record keeping
 log4j.appender.api=org.apache.log4j.DailyRollingFileAppender
@@ -55,7 +45,6 @@ log4j.appender.api.layout=org.apache.log4j.PatternLayout
 log4j.appender.api.layout.ConversionPattern=%d %p %t %C{1} [%X{consumer}] [%X{user}] [%X{item}] - %m%n
 
-
 # Appender used for jpox
 log4j.appender.jpox=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.jpox.File=${net.playtxt.logbase.base}/jpox.log
@@ -64,24 +53,24 @@ log4j.appender.jpox.layout=org.apache.log4j.PatternLayout
 log4j.appender.jpox.layout.ConversionPattern=%d %p %t - %m%n
 
 # DataNucleus Categories
-log4j.category.DataNucleus.JDO=ERROR, jpox
-log4j.category.DataNucleus.Cache=ERROR, jpox
-log4j.category.DataNucleus.Datastore=ERROR, jpox
-log4j.category.DataNucleus.Connection=ERROR, jpox
-log4j.category.DataNucleus.MetaData=ERROR, jpox
-log4j.category.DataNucleus.General=ERROR, jpox
-log4j.category.DataNucleus.Utility=ERROR, jpox
-log4j.category.DataNucleus.Transaction=ERROR, jpox
-log4j.category.DataNucleus.ClassLoading=ERROR, jpox
-log4j.category.DataNucleus.Plugin=ERROR, jpox
-log4j.category.DataNucleus.Store.Poid=ERROR, jpox
-log4j.category.DataNucleus.RDBMS=ERROR, jpox
-log4j.category.DataNucleus.Enhancer=ERROR, jpox
+log4j.category.DataNucleus.JDO=DEBUG, jpox
+log4j.category.DataNucleus.Cache=DEBUG, jpox
+log4j.category.DataNucleus.Datastore=DEBUG, jpox
+log4j.category.DataNucleus.Connection=DEBUG, jpox
+log4j.category.DataNucleus.MetaData=DEBUG, jpox
+log4j.category.DataNucleus.General=DEBUG, jpox
+log4j.category.DataNucleus.Utility=DEBUG, jpox
+log4j.category.DataNucleus.Transaction=DEBUG, jpox
+log4j.category.DataNucleus.ClassLoading=DEBUG, jpox
+log4j.category.DataNucleus.Plugin=DEBUG, jpox
+log4j.category.DataNucleus.Store.Poid=DEBUG, jpox
+log4j.category.DataNucleus.RDBMS=DEBUG, jpox
+log4j.category.DataNucleus.Enhancer=DEBUG, jpox
 log4j.category.DataNucleus.SchemaTool=DEBUG, jpox
-log4j.category.DataNucleus.Persistence=ERROR, jpox
-log4j.category.DataNucleus.Query=ERROR, jpox
-log4j.category.DataNucleus.Lifecycle=ERROR, jpox
-log4j.category.DataNucleus.Reachability=ERROR, jpox
+log4j.category.DataNucleus.Persistence=DEBUG, jpox
+log4j.category.DataNucleus.Query=DEBUG, jpox
+log4j.category.DataNucleus.Lifecycle=DEBUG, jpox
+log4j.category.DataNucleus.Reachability=DEBUG, jpox
 
 log4j.category.com.danga.MemCached=WARN
@@ -92,7 +81,7 @@ log4j.category.com.danga.MemCached=WARN
 
 #
 log4j.category.ApiStatsLogger=DEBUG, apistats
-log4j.additivity.ApiStatsLogger=false
+log4j.additivity.ApiStatsLogger=true
 log4j.appender.apistats=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.apistats.File=${net.playtxt.logbase.base}/restapi.log
 log4j.appender.apistats.DatePattern='.'yyyy-MM-dd
@@ -151,4 +140,3 @@ log4j.appender.events.File=${net.playtxt.logbase.actions}/events.log
 log4j.appender.events.DatePattern='.'yyyy-MM-dd
 log4j.appender.events.layout=org.apache.log4j.PatternLayout
 log4j.appender.events.layout.ConversionPattern=%m%n
-
diff --git a/server/src/io/seldon/general/ItemDemographic_PK.java b/server/src/io/seldon/general/ItemDemographic_PK.java
new file mode 100644
index 00000000..b383783f
--- /dev/null
+++ b/server/src/io/seldon/general/ItemDemographic_PK.java
@@ -0,0 +1,40 @@
+package io.seldon.general;
+
+import java.io.Serializable;
+
+public class ItemDemographic_PK implements Serializable {
+
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1L;
+
+    public int demoId;
+    public long itemId;
+
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj != null) {
+            if (obj instanceof ItemDemographic_PK) {
+                ItemDemographic_PK target = (ItemDemographic_PK) obj;
+                if (this.demoId == target.demoId &&
+                        this.itemId == target.itemId) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+        return super.equals(obj);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s-%s", demoId, itemId);
+    }
+}
diff --git a/server/src/io/seldon/general/UserDimension_PK.java b/server/src/io/seldon/general/UserDimension_PK.java
new file mode 100644
index 00000000..3a75ff9c
--- /dev/null
+++ b/server/src/io/seldon/general/UserDimension_PK.java
@@ -0,0 +1,38 @@
+package io.seldon.general;
+
+import java.io.Serializable;
+
+public class UserDimension_PK implements Serializable {
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1L;
+    public int dimId;
+    public long userId;
+
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj != null) {
+            if (obj instanceof UserDimension_PK) {
+                UserDimension_PK target = (UserDimension_PK) obj;
+                if (this.dimId == target.dimId &&
+                        this.userId == target.userId) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+        return super.equals(obj);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s-%s", dimId, userId);
+    }
+}
diff --git a/server/src/io/seldon/naivebayes/NaiveBayesData.java b/server/src/io/seldon/naivebayes/NaiveBayesData.java
new file mode 100644
index 00000000..836c6e1e
--- /dev/null
+++ b/server/src/io/seldon/naivebayes/NaiveBayesData.java
@@ -0,0 +1,9 @@
+package io.seldon.naivebayes;
+
+/**
+ * Created by Vincent on 2015-7-15.
+ */
+public class NaiveBayesData {
+    public String[] attributeNames;
+    public NaiveBayesModelData modelData;
+}
diff --git a/server/src/io/seldon/naivebayes/NaiveBayesManager.java b/server/src/io/seldon/naivebayes/NaiveBayesManager.java
new file mode 100644
index 00000000..31fa851f
--- /dev/null
+++ b/server/src/io/seldon/naivebayes/NaiveBayesManager.java
@@ -0,0 +1,90 @@
+package io.seldon.naivebayes;
+
+import io.seldon.mf.PerClientExternalLocationListener;
+import io.seldon.resources.external.ExternalResourceStreamer;
+import io.seldon.resources.external.NewResourceNotifier;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.io.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+@Component
+public class NaiveBayesManager implements PerClientExternalLocationListener {
+
+    private static Logger logger = Logger.getLogger(NaiveBayesManager.class);
+    private final ConcurrentMap<String, NaiveBayesStore> clientStores = new ConcurrentHashMap<>();
+    private NewResourceNotifier notifier;
+    private final ExternalResourceStreamer featuresFileHandler;
+    public static final String NB_NEW_LOC_PATTERN = "naivebayes";
+    private final Executor executor = Executors.newFixedThreadPool(5);
+
+    @Autowired
+    public NaiveBayesManager(ExternalResourceStreamer featuresFileHandler,
+                             NewResourceNotifier notifier) {
+        logger.info(String.format(
+                "Construct NaiveBayesManager with: featuresFileHandler=%s, notifier=%s",
+                featuresFileHandler, notifier));
+        this.featuresFileHandler = featuresFileHandler;
+        this.notifier = notifier;
+        this.notifier.addListener(NB_NEW_LOC_PATTERN, this);
+    }
+
+    @PostConstruct
+    public void init() {
+
+    }
+
+    public void reloadFeatures(final String location, final String client) {
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                logger.info("Reloading naive bayes features for client: " + client);
+                try {
+                    InputStream modelDataInputStream =
+                            featuresFileHandler.getResourceStream(location + "/NaiveBayesModel.json");
+                    InputStreamReader modelDataInputStreamReader =
+                            new InputStreamReader(modelDataInputStream);
+                    BufferedReader modelDataBufferedReader = new BufferedReader(modelDataInputStreamReader);
+                    String modelData = modelDataBufferedReader.readLine();
+                    logger.info("Naive bayes model data load completed!");
+                    if (StringUtils.isNotEmpty(modelData)) {
+                        logger.info("Create naive bayes model...");
+                        NaiveBayesStore store = NaiveBayesStore.createNaiveBayesStore(modelData);
+                        logger.info("Naive bayes model created!");
+                        clientStores.put(client, store);
+                    } else {
+                        logger.warn("Naive bayes features is empty!");
+                    }
+                    modelDataBufferedReader.close();
+                } catch (FileNotFoundException e) {
+                    logger.error("Couldn't reloadFeatures for client " + client, e);
+                } catch (IOException e) {
+                    logger.error("Couldn't reloadFeatures for client " + client, e);
+                }
+            }
+        });
+    }
+
+    public NaiveBayesStore getClientStore(String client) {
+        return clientStores.get(client);
+    }
+
+    @Override
+    public void newClientLocation(String client, String configValue,
+                                  String configKey) {
+        reloadFeatures(configValue, client);
+    }
+
+    @Override
+    public void clientLocationDeleted(String client, String nodePattern) {
+        clientStores.remove(client);
+    }
+
+}
diff --git a/server/src/io/seldon/naivebayes/NaiveBayesModelData.java b/server/src/io/seldon/naivebayes/NaiveBayesModelData.java
new file mode 100644
index 00000000..9a695b13
--- /dev/null
+++ b/server/src/io/seldon/naivebayes/NaiveBayesModelData.java
@@ -0,0 +1,10 @@
+package io.seldon.naivebayes;
+
+/**
+ * Created by Vincent on 2015-7-15.
+ */
+public class NaiveBayesModelData {
+    public double[] labels;
+    public double[] pi;
+    public double[][] theta;
+}
diff --git a/server/src/io/seldon/naivebayes/NaiveBayesRecommender.java b/server/src/io/seldon/naivebayes/NaiveBayesRecommender.java
new file mode 100644
index 00000000..c72f5fee
--- /dev/null
+++ b/server/src/io/seldon/naivebayes/NaiveBayesRecommender.java
@@ -0,0 +1,105 @@
+package io.seldon.naivebayes;
+
+import io.seldon.api.APIException;
+import io.seldon.clustering.recommender.ItemRecommendationAlgorithm;
+import io.seldon.clustering.recommender.ItemRecommendationResultSet;
+import io.seldon.clustering.recommender.RecommendationContext;
+import io.seldon.db.jdo.JDOFactory;
+import io.seldon.general.UserAttributePeer;
+import io.seldon.general.UserPeer;
+import io.seldon.general.jdo.SqlUserAttributePeer;
+import io.seldon.general.jdo.SqlUserPeer;
+import org.apache.log4j.Logger;
+import org.apache.spark.mllib.linalg.DenseVector;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.jdo.PersistenceManager;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Component
+public class NaiveBayesRecommender implements ItemRecommendationAlgorithm {
+    private static Logger logger = Logger.getLogger(NaiveBayesRecommender.class);
+    private static final String name = NaiveBayesRecommender.class.getSimpleName();
+
+    @Autowired
+    private NaiveBayesManager naiveBayesManager;
+
+    private PersistenceManager getPersistenceManager(String client) {
+        PersistenceManager pm = JDOFactory.get().getPersistenceManager(client);
+        if (pm == null) {
+            throw new APIException(APIException.INTERNAL_DB_ERROR);
+        }
+        return pm;
+    }
+
+    private UserPeer getUserPeer(String client) {
+        PersistenceManager pm = getPersistenceManager(client);
+        return new SqlUserPeer(pm);
+    }
+
+    private UserAttributePeer getUserAttributePeer(String client) {
+        PersistenceManager pm = getPersistenceManager(client);
+        return new SqlUserAttributePeer(pm);
+    }
+
+    @Override
+    public ItemRecommendationResultSet recommend(String clientId, Long userId,
+                                                 Set<Integer> dimensions, int maxRecsCount,
+                                                 RecommendationContext ctxt, List<Long> recentItemInteractions) {
+        UserAttributePeer userAttributePeer = getUserAttributePeer(clientId);
+
+        Map<String, String> userAttributesNameMap = userAttributePeer.getUserAttributesName(userId);
+
+        for (String key : userAttributesNameMap.keySet()) {
+            String value = userAttributesNameMap.get(key);
+            logger.info(String.format("userAttributesNameMap: %s=%s", key, value));
+        }
+
+        NaiveBayesStore store = naiveBayesManager.getClientStore(clientId);
+        if (store == null) {
+            logger.warn("No naive bayes model loaded for client " + clientId);
+            return new ItemRecommendationResultSet(
+                    new ArrayList<ItemRecommendationResultSet.ItemRecommendationResult>(), name);
+        }
+
+        String[] storeAttributeNames = store.getAttributeNames();
+
+        // Build the feature vector in the attribute order the model was trained with;
+        // missing or empty attributes default to 0.
+        double[] userAttributeValueArray = new double[storeAttributeNames.length];
+        for (int index = 0; index < userAttributeValueArray.length; index++) {
+            String attributeName = storeAttributeNames[index];
+            String attributeValue = userAttributesNameMap.get(attributeName);
+            userAttributeValueArray[index] =
+                    (attributeValue == null || attributeValue.isEmpty()) ? 0d : Double.parseDouble(attributeValue);
+        }
+
+        StringBuilder sb = new StringBuilder(1024);
+        for (int index = 0; index < userAttributeValueArray.length; index++) {
+            if (index > 0) {
+                sb.append(",");
+            }
+            sb.append(String.format("%f", userAttributeValueArray[index]));
+        }
+        logger.debug(String.format("UserAttributeValueArray: (%s)", sb.toString()));
+
+        // The model's predicted label is used directly as the recommended item id.
+        DenseVector userVector = new DenseVector(userAttributeValueArray);
+        long itemId = Math.round(store.getModel().predict(userVector));
+        logger.info(String.format("ItemId: %d", itemId));
+
+        List<ItemRecommendationResultSet.ItemRecommendationResult> recommendations = new ArrayList<>();
+        recommendations.add(new ItemRecommendationResultSet.ItemRecommendationResult(itemId, 1f));
+        return new ItemRecommendationResultSet(recommendations, name);
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+}
diff --git a/server/src/io/seldon/naivebayes/NaiveBayesStore.java b/server/src/io/seldon/naivebayes/NaiveBayesStore.java
new file mode 100644
index 00000000..322939d9
--- /dev/null
+++ b/server/src/io/seldon/naivebayes/NaiveBayesStore.java
@@ -0,0 +1,44 @@
+package io.seldon.naivebayes;
+
+import org.apache.log4j.Logger;
+import org.apache.spark.mllib.classification.NaiveBayesModel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class NaiveBayesStore {
+
+    private static Logger logger = Logger.getLogger(NaiveBayesStore.class);
+
+    private NaiveBayesModel model;
+    private NaiveBayesData naiveBayesData;
+
+    public static NaiveBayesStore createNaiveBayesStore(String data) {
+
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            NaiveBayesData naiveBayesData = mapper.readValue(data, NaiveBayesData.class);
+            NaiveBayesStore naiveBayesStore = new NaiveBayesStore(naiveBayesData);
+            return naiveBayesStore;
+        } catch (Throwable t) {
+            logger.error(null, t);
+        }
+        return null;
+    }
+
+    public NaiveBayesStore(NaiveBayesData data) {
+        logger.info("NaiveBayesStore construct...");
+        this.naiveBayesData = data;
+        this.model = new NaiveBayesModel(naiveBayesData.modelData.labels,
+                naiveBayesData.modelData.pi, naiveBayesData.modelData.theta);
+        logger.info("NaiveBayesStore construct completed!");
+    }
+
+    public String[] getAttributeNames() {
+        return naiveBayesData.attributeNames;
+    }
+
+    public NaiveBayesModel getModel() {
+        return this.model;
+    }
+
+}