diff --git a/distro/profile.xml b/distro/profile.xml index 3038258..940b176 100644 --- a/distro/profile.xml +++ b/distro/profile.xml @@ -1,12 +1,12 @@ - Library + Plugin {description} ${serviceClass} ${artifactId} - ${version} + 1.0.0 {description} @@ -17,7 +17,7 @@ ${artifactId} ${version} - Library + Plugin ${build.finalName}.jar diff --git a/pom.xml b/pom.xml index c30e74b..8cfaed6 100644 --- a/pom.xml +++ b/pom.xml @@ -90,6 +90,11 @@ astyanax 1.56.26 + + com.sun.mail + javax.mail + [1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT) + junit diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java index ae9a61d..c95d972 100644 --- a/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java +++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java @@ -10,14 +10,15 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.codehaus.jackson.map.ObjectMapper; import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -49,7 +50,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; public class SocialDataIndexerPlugin extends Plugin{ //Logger - private static Logger _log = LoggerFactory.getLogger(SocialDataIndexerPlugin.class); + private static Logger logger = LoggerFactory.getLogger(SocialDataIndexerPlugin.class); // the cluster name private String clusterName; @@ -69,35 +70,44 @@ public class SocialDataIndexerPlugin extends Plugin inputs) throws Exception { + public void launch(Map inputs){ String scope = null; // retrieve the scope from inputs, if any if(inputs.containsKey("scope")) scope = (String) inputs.get("scope"); + else{ + + logger.error("Scope variable is not set. Unable to continue"); + return; + } // connection to cassandra - if(scope != null) - store = new DBCassandraAstyanaxImpl(scope); - else - store = new DBCassandraAstyanaxImpl(); + store = new DBCassandraAstyanaxImpl(scope); - // retrieve ElasticSearch Endpoint and set hosts/port number - ElasticSearchRunningCluster elasticCluster = new ElasticSearchRunningCluster(scope); + // retrieve ElasticSearch Endpoint and set hosts/port numbers + ElasticSearchRunningCluster elasticCluster; + + try { + elasticCluster = new ElasticSearchRunningCluster(scope); + } catch (Exception e1) { + logger.error(e1.toString()); + return; + } // save info clusterName = elasticCluster.getClusterName(); hostsToContact = elasticCluster.getHosts(); portNumbers = elasticCluster.getPorts(); - _log.debug("Creating elasticsearch client connection for hosts = " + hostsToContact + ", ports = " + portNumbers + " and " + logger.debug("Creating elasticsearch client connection for hosts = " + hostsToContact + ", ports = " + portNumbers + " and " + " cluster's name = " + clusterName); // set cluster's name to check and the sniff property to true. @@ -123,27 +133,27 @@ public class SocialDataIndexerPlugin extends Plugin vreIds = null; try { vreIds = store.getAllVREIds(); } catch (ConnectionException e) { - _log.error("Unable to retrieve vres' ids", e); - throw e; + logger.error("Unable to retrieve vres' ids", e); + return; } // save feeds & comments @@ -152,48 +162,48 @@ public class SocialDataIndexerPlugin extends Plugin feeds = store.getAllFeedsByVRE(vreID); - _log.debug("Number of retrieved feeds is " + feeds.size() + " for vre " + vreID); + logger.debug("Number of retrieved feeds is " + feeds.size() + " for vre " + vreID); // enhance List enhanceFeeds = enhanceFeeds((ArrayList) feeds); // try to index - addEnhancedFeedsInBulk(enhanceFeeds, null); - addEnhancedFeedsInBulk(enhanceFeeds, vreID); + addEnhancedFeedsInBulk(enhanceFeeds); }catch(Exception e){ - _log.debug("Exception while saving feeds/comments into the index", e); - client.close(); - throw e; + logger.debug("Exception while saving feeds/comments into the index", e); + return; } } long end = System.currentTimeMillis(); - _log.debug("Synchronization thread ends running. It took " + (end - init) + " milliseconds " + + logger.debug("Synchronization thread ends running. It took " + (end - init) + " milliseconds " + " that is " + (double)(end - init)/(1000.0 * 60.0) + " minutes."); + } + @Override + protected void finalize(){ // close connection - client.close(); - + if(client != null) + client.close(); } /**{@inheritDoc}*/ @Override protected void onStop() throws Exception { - _log.debug("onStop()"); + logger.debug("onStop()"); Thread.currentThread().interrupt(); } /** * Add enhancedFeeds into the elasticsearch index. * @param enhanceFeeds - * @param vreID * @throws BulkInsertionFailedException */ - public void addEnhancedFeedsInBulk(List enhanceFeeds, String vreID) throws BulkInsertionFailedException { - _log.debug("Starting bulk insert enhanced feeds operation in vre = " + vreID); + private void addEnhancedFeedsInBulk(List enhanceFeeds) throws BulkInsertionFailedException { + logger.debug("Starting bulk insert enhanced feeds operation"); BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @@ -201,8 +211,7 @@ public class SocialDataIndexerPlugin extends Plugin attachments = new ArrayList(); if (isMultiFileUpload) { - _log.debug("Retriving attachments for feed with id="+feed.getKey()); + logger.debug("Retriving attachments for feed with id="+feed.getKey()); try { attachments = (ArrayList) store.getAttachmentsByFeedId(feed.getKey()); } catch (FeedIDNotFoundException e) { - _log.error("It looks like sth wrong with this feedid having attachments, could not find feedId = " + feed.getKey() + "\n" + e.getMessage()); + logger.error("It looks like sth wrong with this feedid having attachments, could not find feedId = " + feed.getKey() + "\n" + e.getMessage()); throw new Exception(); } } @@ -320,7 +328,7 @@ public class SocialDataIndexerPlugin extends Plugin getAllCommentsByFeed(String feedid) { - //_log.trace("Asking comments for " + feedid); + //logger.trace("Asking comments for " + feedid); ArrayList toReturn = (ArrayList) store.getAllCommentByFeed(feedid); Collections.sort(toReturn); return toReturn; diff --git a/src/test/java/org/gcube/socialnetworking/socialdataindexer/Tests.java b/src/test/java/org/gcube/socialnetworking/socialdataindexer/Tests.java index 6740a58..e5bb246 100644 --- a/src/test/java/org/gcube/socialnetworking/socialdataindexer/Tests.java +++ b/src/test/java/org/gcube/socialnetworking/socialdataindexer/Tests.java @@ -1,23 +1,38 @@ package org.gcube.socialnetworking.socialdataindexer; +import java.util.HashMap; +import java.util.Map; + import org.gcube.common.authorization.library.provider.SecurityTokenProvider; -import org.gcube.socialnetworking.social_data_indexing_common.ex.ServiceEndPointException; -import org.gcube.socialnetworking.social_data_indexing_common.utils.ElasticSearchRunningCluster; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Tests { + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(Tests.class); + + @Before + public void beforeTest(){ + // set security token + SecurityTokenProvider.instance.set("422d795b-d978-41d5-abac-b1c8be90a632"); + } + @Test - public void retrieveISInformation() { + public void testLaunch() { + logger.debug("Starting to test launch"); + Map inputs = new HashMap(); + inputs.put("scope", "gcube"); + SocialDataIndexerPlugin plugin = new SocialDataIndexerPlugin(null); + plugin.launch(inputs); //TODO: uncomment for testing purpose + logger.debug("-------------- launch test finished"); + } - try { + @After + public void after(){ - // set security token - SecurityTokenProvider.instance.set("422d795b-d978-41d5-abac-b1c8be90a632"); - ElasticSearchRunningCluster esrc = new ElasticSearchRunningCluster("gcube"); - System.err.println(esrc.getClusterName() + ", " + esrc.getHosts() + ", " + esrc.getPorts()); - } catch (ServiceEndPointException e) { - System.err.println(e.toString()); - } catch (Exception e) { - System.err.println(e.toString()); - } } }