From 827997c13373fe0a83b970beb77a96f8bd25bc61 Mon Sep 17 00:00:00 2001
From: Costantino Perciante <costantino.perciante@isti.cnr.it>
Date: Thu, 4 Feb 2016 16:06:49 +0000
Subject: [PATCH] Implementation added.

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/social-networking/social-data-indexer-se-plugin@122844 82a268e6-3cf1-43bd-a215-b396298e98cf
---
 pom.xml                                       |  87 +++--
 .../SocialDataIndexerPlugin.java              | 367 +++++++++++++++++-
 .../SocialDataIndexerPluginDeclaration.java   |  32 +-
 .../ex/BulkInsertionFailedException.java      |  23 ++
 ...ElasticSearchRuntimeResourceException.java |  24 ++
 .../ex/ServiceEndPointException.java          |  22 ++
 .../ex/TooManyRunningClustersException.java   |  23 ++
 .../utils/ElasticSearchRunningCluster.java    | 133 +++++++
 .../socialdataindexer/utils/IndexFields.java  |  29 ++
 .../socialdataindexer/Tests.java              |  21 +
 src/test/resources/logback-test.xml           |  16 -
 11 files changed, 713 insertions(+), 64 deletions(-)
 create mode 100644 src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/BulkInsertionFailedException.java
 create mode 100644 src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/NoElasticSearchRuntimeResourceException.java
 create mode 100644 src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/ServiceEndPointException.java
 create mode 100644 src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/TooManyRunningClustersException.java
 create mode 100644 src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/ElasticSearchRunningCluster.java
 create mode 100644 src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/IndexFields.java
 create mode 100644 src/test/java/org/gcube/socialnetworking/socialdataindexer/Tests.java
 delete mode 100644 src/test/resources/logback-test.xml

diff --git a/pom.xml b/pom.xml
index 2d5a995..f632ee5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,42 +1,76 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.gcube.tools</groupId>
-		<artifactId>maven-parent</artifactId>
-		<version>1.0.0</version>
-	</parent>
-	<groupId>org.gcube.socialnetworking</groupId>
-	<artifactId>social-data-indexer-se-plugin</artifactId>
-	<version>1.0.0-SNAPSHOT</version>
-	<name>Social Data Indexer Smart Executor Plugin</name>
-	<description>Social Data Indexer Smart Executor Plugin</description>
-
-	<scm>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.gcube.tools</groupId>
+		<artifactId>maven-parent</artifactId>
+		<version>1.0.0</version>
+	</parent>
+	<groupId>org.gcube.socialnetworking</groupId>
+	<artifactId>social-data-indexer-se-plugin</artifactId>
+	<version>1.0.0-SNAPSHOT</version>
+	<name>Social Data Indexer Smart Executor Plugin</name>
+	<description>Social Data Indexer Smart Executor Plugin</description>
+
+	<scm>
 		<connection>scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/${serviceClass}/${project.artifactId}</connection>
 		<developerConnection>scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/${serviceClass}/${project.artifactId}</developerConnection>
 		<url>https://svn.d4science.research-infrastructures.eu/gcube/trunk/${serviceClass}/${project.artifactId}</url>
-	</scm>
+	</scm>
 
 	<properties>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 		<serviceProvidersDirectory>src/main/resources/META-INF/services</serviceProvidersDirectory>
 		<distroDirectory>distro</distroDirectory>
 		<serviceClass>social-networking</serviceClass>
+		<elasticSearchVersion>2.2.0</elasticSearchVersion>
+		<guavaVersion>18.0</guavaVersion>
 	</properties>
-
+
 	<dependencyManagement>
 		<dependencies>
 			<dependency>
 				<groupId>org.gcube.distribution</groupId>
-				<artifactId>maven-smartgears-bom</artifactId>
+				<artifactId>maven-portal-bom</artifactId>
 				<version>LATEST</version>
 				<type>pom</type>
 				<scope>import</scope>
 			</dependency>
 		</dependencies>
 	</dependencyManagement>
-
+
 	<dependencies>
+		<dependency>
+			<groupId>org.gcube.portal</groupId>
+			<artifactId>social-networking-library</artifactId>
+			<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<version>${elasticSearchVersion}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guavaVersion}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.ning</groupId>
+			<artifactId>compress-lzf</artifactId>
+			<version>1.0.3</version>
+		</dependency>
+		<dependency>
+			<groupId>org.gcube.resources.discovery</groupId>
+			<artifactId>ic-client</artifactId>
+			<version>[1.0.0-SNAPSHOT,2.0.0-SNAPSHOT)</version>
+		</dependency>
+		<dependency>
+			<groupId>org.gcube.common.portal</groupId>
+			<artifactId>portal-manager</artifactId>
+			<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
+		</dependency>
 		<dependency>
 			<groupId>org.gcube.vremanagement</groupId>
 			<artifactId>smart-executor-api</artifactId>
 		</dependency>
 		<dependency>
 			<groupId>org.slf4j</groupId>
 			<artifactId>slf4j-api</artifactId>
 			<scope>provided</scope>
 		</dependency>
-
-
+		<dependency>
+			<groupId>com.netflix.astyanax</groupId>
+			<artifactId>astyanax</artifactId>
+			<version>1.56.26</version>
+		</dependency>
+
 		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
 			<version>4.11</version>
 			<scope>test</scope>
 		</dependency>
-		<dependency>
-			<groupId>ch.qos.logback</groupId>
-			<artifactId>logback-classic</artifactId>
-			<version>1.0.13</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-
-	<build>
+	</dependencies>
+
+	<build>
 		<plugins>
 			<plugin>
 				<artifactId>maven-compiler-plugin</artifactId>
diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java
index aa2112c..511ce2a 100644
--- a/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java
+++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java
@@ -3,38 +3,381 @@
  */
 package org.gcube.socialnetworking.socialdataindexer;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.gcube.portal.databook.server.DBCassandraAstyanaxImpl;
+import org.gcube.portal.databook.server.DatabookStore;
+import org.gcube.portal.databook.shared.Comment;
+import org.gcube.portal.databook.shared.Feed;
+import org.gcube.portal.databook.shared.FeedType;
+import org.gcube.socialnetworking.socialdataindexer.ex.BulkInsertionFailedException;
+import org.gcube.socialnetworking.socialdataindexer.utils.ElasticSearchRunningCluster;
+import org.gcube.socialnetworking.socialdataindexer.utils.IndexFields;
 import org.gcube.vremanagement.executor.plugin.Plugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+/**
+ * This plugin synchronizes and indexes the social data stored in the Cassandra cluster into the ElasticSearch engine.
+ * @author Costantino Perciante at ISTI-CNR
+ * (costantino.perciante@isti.cnr.it)
+ *
+ */
 public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDeclaration> {
 
-	/**
-	 * Logger
-	 */
-	private static Logger logger = LoggerFactory.getLogger(SocialDataIndexerPlugin.class);
-
-	public SocialDataIndexerPlugin(SocialDataIndexerPluginDeclaration pluginDeclaration) {
+	// Logger
+	private static Logger _log = LoggerFactory.getLogger(SocialDataIndexerPlugin.class);
+
+	// the cluster name
+	private String clusterName;
+
+	// list of hosts to contact
+	private List<String> hostsToContact;
+
+	// port numbers of the hosts to contact
+	private List<Integer> portNumbers;
+
+	// the elasticsearch client
+	private TransportClient client;
+
+	// connection to cassandra
+	private DatabookStore store;
+
+	public SocialDataIndexerPlugin(SocialDataIndexerPluginDeclaration pluginDeclaration){
+
 		super(pluginDeclaration);
-		logger.debug("contructor");
+		_log.debug("Constructor");
+
 	}
-	
+
 	/**{@inheritDoc}*/
 	@Override
 	public void launch(Map<String, Object> inputs) throws Exception {
-		// TODO Auto-generated method stub
+
+		String scope = null;
+
+		// retrieve the scope from the inputs, if any
+		if(inputs.containsKey("scope"))
+			scope = (String) inputs.get("scope");
+
+		// connection to cassandra
+		if(scope != null)
+			store = new DBCassandraAstyanaxImpl(scope);
+		else
+			store = new DBCassandraAstyanaxImpl();
+
+		// retrieve the ElasticSearch endpoint and set the hosts/port numbers
+		ElasticSearchRunningCluster elasticCluster = new ElasticSearchRunningCluster(scope);
+
+		// save info
+		clusterName = elasticCluster.getClusterName();
+		hostsToContact = elasticCluster.getHosts();
+		portNumbers = elasticCluster.getPorts();
+
+		_log.debug("Creating elasticsearch client connection for hosts = " + hostsToContact + ", ports = " + portNumbers
+				+ " and cluster's name = " + clusterName);
+
+		// Set the cluster's name to check and the sniff property.
+		// Cluster's name: each node must have this name.
+		// Sniff property: allows the client to recover the cluster's structure.
+		// See https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html
+		Settings settings = Settings.settingsBuilder()
+				.put("cluster.name", this.clusterName) // force unique cluster's name check
+				//.put("client.transport.sniff", true) -> not needed, since we pass all the cluster's nodes
+				.build();
+
+		// build the client
+		client = TransportClient.builder().settings(settings).build();
+
+		// add the nodes to contact
+		for (int i = 0; i < hostsToContact.size(); i++){
+			try {
+
+				client.addTransportAddress(
+						new InetSocketTransportAddress(
+								InetAddress.getByName(hostsToContact.get(i)), portNumbers.get(i))
+						);
+
+			} catch (UnknownHostException e) {
+
+				_log.debug("Error while adding " + hostsToContact.get(i) + ":" + portNumbers.get(i) + " as host to be contacted.");
+
+			}
+		}
+
+		_log.debug("Connection to ElasticSearch cluster done.");
+
+		_log.debug("Synchronization thread starts running");
+
+		long init = System.currentTimeMillis();
+
+		// For each VRE we build an index reachable under the paths /social/feeds/vreID and /social/comments/vreID,
+		// plus the general indices /social/feeds and /social/comments.
+		List<String> vreIds = null;
+		try {
+
+			vreIds = store.getAllVREIds();
+
+		} catch (ConnectionException e) {
+			_log.error("Unable to retrieve vres' ids", e);
+			throw e;
+		}
+
+		// save feeds & comments
+		for (String vreID : vreIds) {
+
+			try{
+				// get its feeds
+				List<Feed> feeds = store.getAllFeedsByVRE(vreID);
+
+				_log.debug("List of feeds of vre " + vreID + " retrieved.");
+
+				// save feeds
+				addFeedsInBulk(feeds, vreID);
+				addFeedsInBulk(feeds, null);
+
+				// get the feeds' comments
+				List<Comment> comments = new ArrayList<Comment>();
+				for (Feed feed : feeds) {
+
+					List<Comment> commentsFeed = store.getAllCommentByFeed(feed.getKey());
+					comments.addAll(commentsFeed);
+
+				}
+
+				addCommentsInBulk(comments, vreID);
+				addCommentsInBulk(comments, null);
+
+			}catch(Exception e){
+
+				_log.debug("Exception while saving feeds/comments into the index", e);
+
+			}
+
+		}
+
+		long end = System.currentTimeMillis();
+		_log.debug("Synchronization thread ends running. It took " + (end - init) + " milliseconds,"
+				+ " that is " + (double)(end - init)/(1000.0 * 60.0) + " minutes.");
+
+		// close the connection
+		client.close();
 
 	}
 
 	/**{@inheritDoc}*/
 	@Override
 	protected void onStop() throws Exception {
-		logger.debug("onStop()");
-		
+		_log.debug("onStop()");
 		Thread.currentThread().interrupt();
 	}
-	
+
+	/**
+	 * Add feeds to the elasticsearch index using the bulk api.
+	 * @param feeds the feeds to index
+	 * @param vreID the VRE the feeds belong to (null to target the general index)
+	 * @throws BulkInsertionFailedException
+	 */
+	private void addFeedsInBulk(List<Feed> feeds, String vreID) throws BulkInsertionFailedException {
+
+		// see https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.2/java-docs-bulk-processor.html
+		_log.debug("Starting bulk insert feeds operation in vre = " + vreID);
+		BulkProcessor bulkProcessor = BulkProcessor.builder(
+				client,
+				new BulkProcessor.Listener() {
+					@Override
+					public void beforeBulk(long executionId,
+							BulkRequest request) {
+
+						_log.debug("[FEEDS]Executing bulk request with id " + executionId
+								+ " and number of requests " + request.numberOfActions());
+
+					}
+
+					@Override
+					public void afterBulk(long executionId,
+							BulkRequest request,
+							BulkResponse response) {
+
+						_log.debug("[FEEDS]Executed bulk request with id " + executionId
+								+ " and number of requests " + request.numberOfActions() + ". Any failures? " + response.hasFailures());
+
+					}
+
+					@Override
+					public void afterBulk(long executionId,
+							BulkRequest request,
+							Throwable failure) {
+
+						_log.debug("[FEEDS]Bulk request with id " + executionId
+								+ " and number of requests " + request.numberOfActions() + " failed! Reason: " + failure.getMessage());
+
+					}
+				})
+				.setBulkActions(1000)
+				.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
+				.setFlushInterval(TimeValue.timeValueSeconds(5))
+				.setConcurrentRequests(1)
+				.setBackoffPolicy(
+						BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
+				.build();
+
+		// build up the bulk request
+		for (Feed feed : feeds) {
+
+			String feedUUID = feed.getKey();
+			String feedVRE = feed.getVreid();
+			String feedDescription = feed.getDescription();
+			FeedType feedType = feed.getType();
+
+			// prepare the json object
+			Map<String, Object> jsonFeed = new HashMap<String, Object>();
+			jsonFeed.put(IndexFields.FEED_TEXT, feedDescription);
+			jsonFeed.put(IndexFields.FEED_TYPE, feedType);
+			jsonFeed.put(IndexFields.FEED_VRE_ID, feedVRE);
+
+			IndexRequest ind;
+
+			if(vreID == null){
+
+				//_log.debug("Saving feed in " + IndexFields.INDEX_NAME + "/" + IndexFields.FEED_TABLE);
+				ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.FEED_TABLE, feedUUID)
+						.source(jsonFeed);
+
+			}
+			else{
+
+				//_log.debug("Saving feed in " + IndexFields.INDEX_NAME + "/" + IndexFields.FEED_TABLE + vreID);
+				ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.FEED_TABLE + vreID, feedUUID)
+						.source(jsonFeed);
+
+			}
+
+			// add to the bulkprocessor
+			bulkProcessor.add(ind);
+
+		}
+
+		try {
+
+			// flush the remaining data and wait for termination
+			bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
+		} catch (InterruptedException e) {
+			_log.error("[FEEDS] bulk processor failed!", e);
+			throw new BulkInsertionFailedException("[FEEDS] bulk processor failed!");
+		}
+
+	}
+
+	/**
+	 * Add comments to the elasticsearch index using the bulk api.
+	 * @param comments the comments to index
+	 * @param vreID the VRE the comments belong to (null to target the general index)
+	 * @throws BulkInsertionFailedException
+	 */
+	private void addCommentsInBulk(List<Comment> comments, String vreID) throws BulkInsertionFailedException {
+
+		// see https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.2/java-docs-bulk-processor.html
+		_log.debug("Starting bulk insert comments operation in vre = " + vreID);
+		BulkProcessor bulkProcessor = BulkProcessor.builder(
+				client,
+				new BulkProcessor.Listener() {
+					@Override
+					public void beforeBulk(long executionId,
+							BulkRequest request) {
+
+						_log.debug("[COMMENTS]Executing bulk request with id " + executionId
+								+ " and number of requests " + request.numberOfActions());
+
+					}
+
+					@Override
+					public void afterBulk(long executionId,
+							BulkRequest request,
+							BulkResponse response) {
+
+						_log.debug("[COMMENTS]Executed bulk request with id " + executionId
+								+ " and number of requests " + request.numberOfActions() + ". Any failures? " + response.hasFailures());
+
+					}
+
+					@Override
+					public void afterBulk(long executionId,
+							BulkRequest request,
+							Throwable failure) {
+
+						_log.debug("[COMMENTS]Bulk request with id " + executionId
+								+ " and number of requests " + request.numberOfActions() + " failed! Reason: " + failure.getMessage());
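For readers unfamiliar with the bulk API used above, here is a minimal, self-contained sketch of the same BulkProcessor pattern against the Elasticsearch 2.x transport client. The host (localhost:9300), the cluster name and the document values are illustrative assumptions, not part of this patch.

import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

public class BulkIndexSketch {

	public static void main(String[] args) throws Exception {

		// hypothetical cluster name; in the plugin it comes from the ServiceEndpoint
		Settings settings = Settings.settingsBuilder()
				.put("cluster.name", "elasticsearch-cluster")
				.build();

		TransportClient client = TransportClient.builder().settings(settings).build()
				.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

		BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
			@Override
			public void beforeBulk(long executionId, BulkRequest request) {
				// called just before each flush; nothing to prepare here
			}

			@Override
			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
				System.out.println("bulk " + executionId + " done, failures: " + response.hasFailures());
			}

			@Override
			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
				System.err.println("bulk " + executionId + " failed: " + failure.getMessage());
			}
		})
		.setBulkActions(1000)                               // flush every 1000 requests...
		.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // ...or every 5 MB of payload...
		.setFlushInterval(TimeValue.timeValueSeconds(5))    // ...or every 5 seconds, whichever comes first
		.setConcurrentRequests(1)                           // allow one in-flight flush besides the caller
		.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
		.build();

		// queue a single document; the processor batches and flushes it
		Map<String, Object> doc = new HashMap<String, Object>();
		doc.put("description", "hello world");
		bulkProcessor.add(new IndexRequest("social", "feeds", "some-uuid").source(doc));

		// flush the outstanding requests and wait for their completion
		bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
		client.close();
	}
}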
+
+					}
+				})
+				.setBulkActions(1000)
+				.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
+				.setFlushInterval(TimeValue.timeValueSeconds(5))
+				.setConcurrentRequests(1)
+				.setBackoffPolicy(
+						BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
+				.build();
+
+		// build up the bulk request
+		for (Comment comment : comments) {
+
+			String commentUUID = comment.getKey();
+			String parentId = comment.getFeedid();
+			String description = comment.getText();
+
+			// prepare the json object
+			Map<String, Object> jsonComment = new HashMap<String, Object>();
+			jsonComment.put(IndexFields.COMMENT_PARENT_ID, parentId);
+			jsonComment.put(IndexFields.COMMENT_TEXT, description);
+
+			// write
+			IndexRequest ind;
+
+			if(vreID == null){
+
+				//_log.debug("Saving comment in " + IndexFields.INDEX_NAME + "/" + IndexFields.COMMENT_TABLE);
+				ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.COMMENT_TABLE, commentUUID)
+						.source(jsonComment);
+
+			}
+			else{
+
+				//_log.debug("Saving comment in " + IndexFields.INDEX_NAME + "/" + IndexFields.COMMENT_TABLE + vreID);
+				ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.COMMENT_TABLE + vreID, commentUUID)
+						.source(jsonComment);
+
+			}
+
+			// add to the bulkprocessor
+			bulkProcessor.add(ind);
+
+		}
+
+		try {
+			bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
+		} catch (InterruptedException e) {
+			_log.error("[COMMENTS] bulk processor failed!", e);
+			throw new BulkInsertionFailedException("[COMMENTS] bulk processor failed!");
+		}
+
+	}
 }
diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPluginDeclaration.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPluginDeclaration.java
index 5524fa3..6ea9855 100644
--- a/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPluginDeclaration.java
+++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPluginDeclaration.java
@@ -12,32 +12,39 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * The social data indexer plugin declaration class.
+ * @author Costantino Perciante at ISTI-CNR
+ * (costantino.perciante@isti.cnr.it)
+ *
+ */
 public class SocialDataIndexerPluginDeclaration implements PluginDeclaration {
 
 	/**
 	 * Logger
 	 */
 	private static Logger logger = LoggerFactory.getLogger(SocialDataIndexerPluginDeclaration.class);
-	
+
 	/**
 	 * Plugin name used by the Executor to retrieve this class
 	 */
-	public static final String NAME = "";
-	public static final String DESCRIPTION = "";
+	public static final String NAME = "social-data-indexer-plugin";
+	public static final String DESCRIPTION = "The social-data-indexer-plugin indexes the data contained"
+			+ " in the Cassandra cluster in an ElasticSearch index, to support full-text search.";
 	public static final String VERSION = "1.0.0";
-	
+
 	/**{@inheritDoc}*/
 	@Override
 	public void init() {
 		logger.debug(String.format("%s initialized", SocialDataIndexerPluginDeclaration.class.getSimpleName()));
 	}
-	
+
 	/**{@inheritDoc}*/
 	@Override
 	public String getName() {
 		return NAME;
 	}
-	
+
 	/**{@inheritDoc}*/
 	@Override
 	public String getDescription() {
@@ -49,7 +56,7 @@ public class SocialDataIndexerPluginDeclaration implements PluginDeclaration {
 	public String getVersion() {
 		return VERSION;
 	}
-	
+
 	/**{@inheritDoc}*/
 	@Override
 	public Map<String, String> getSupportedCapabilities() {
@@ -63,5 +70,14 @@
 	public Class<? extends Plugin<? extends PluginDeclaration>> getPluginImplementation() {
 		return SocialDataIndexerPlugin.class;
 	}
-	
+
+	@Override
+	public String toString(){
+		return String.format("%s : %s - %s - %s - %s - %s",
+				this.getClass().getSimpleName(),
+				getName(), getVersion(), getDescription(),
+				getSupportedCapabilities(),
+				getPluginImplementation().getSimpleName());
+	}
+
 }
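The pom sets serviceProvidersDirectory to src/main/resources/META-INF/services, which suggests the executor discovers this declaration through the standard java.util.ServiceLoader mechanism. Below is a hedged sketch of such a lookup, assuming a provider-configuration file named after the PluginDeclaration interface whose content is the fully qualified name of SocialDataIndexerPluginDeclaration; the host class name is made up for illustration.

import java.util.ServiceLoader;

import org.gcube.vremanagement.executor.plugin.PluginDeclaration;

public class DeclarationDiscoverySketch {

	public static void main(String[] args) {
		// iterate over every PluginDeclaration registered under META-INF/services
		ServiceLoader<PluginDeclaration> declarations = ServiceLoader.load(PluginDeclaration.class);
		for (PluginDeclaration declaration : declarations) {
			declaration.init();
			System.out.println(declaration.getName() + " - " + declaration.getVersion());
		}
	}
}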
diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/BulkInsertionFailedException.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/BulkInsertionFailedException.java
new file mode 100644
index 0000000..2e729d6
--- /dev/null
+++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/BulkInsertionFailedException.java
@@ -0,0 +1,23 @@
+package org.gcube.socialnetworking.socialdataindexer.ex;
+
+/**
+ * BulkInsertionFailedException class: thrown when the indexing process fails.
+ * @author Costantino Perciante at ISTI-CNR
+ * (costantino.perciante@isti.cnr.it)
+ *
+ */
+public class BulkInsertionFailedException extends Exception {
+
+	private static final long serialVersionUID = -7707293515649267047L;
+
+	private static final String DEFAULT_MESSAGE = "Unable to insert data into the index";
+
+	public BulkInsertionFailedException(){
+		super(DEFAULT_MESSAGE);
+	}
+
+	public BulkInsertionFailedException(String message) {
+		super(message);
+	}
+
+}
diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/NoElasticSearchRuntimeResourceException.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/NoElasticSearchRuntimeResourceException.java
new file mode 100644
index 0000000..92c185b
--- /dev/null
+++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/NoElasticSearchRuntimeResourceException.java
@@ -0,0 +1,24 @@
+package org.gcube.socialnetworking.socialdataindexer.ex;
+
+/**
+ * Thrown when no ElasticSearch cluster is found in the infrastructure.
+ * @author Costantino Perciante at ISTI-CNR
+ * (costantino.perciante@isti.cnr.it)
+ *
+ */
+public class NoElasticSearchRuntimeResourceException extends Exception {
+
+	private static final long serialVersionUID = -40748130477807648L;
+
+	private static final String DEFAULT_MESSAGE = "No ElasticSearch cluster instance for this scope!";
+
+	public NoElasticSearchRuntimeResourceException(){
+		super(DEFAULT_MESSAGE);
+	}
+
+	public NoElasticSearchRuntimeResourceException(String message) {
+		super(message);
+	}
+
+
+}
diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/ServiceEndPointException.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/ServiceEndPointException.java
new file mode 100644
index 0000000..3f84486
--- /dev/null
+++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/ServiceEndPointException.java
@@ -0,0 +1,22 @@
+package org.gcube.socialnetworking.socialdataindexer.ex;
+
+/**
+ * Thrown when it is not possible to retrieve information from the ServiceEndpoint
+ * related to ElasticSearch.
+ * @author Costantino Perciante at ISTI-CNR
+ * (costantino.perciante@isti.cnr.it)
+ *
+ */
+public class ServiceEndPointException extends Exception {
+
+	private static final long serialVersionUID = 5378333924429281681L;
+
+	private static final String DEFAULT_MESSAGE = "Unable to retrieve information from the ElasticSearch endpoint!";
+
+	public ServiceEndPointException(){
+		super(DEFAULT_MESSAGE);
+	}
+
+	public ServiceEndPointException(String string) {
+		super(string);
+	}
+}
diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/TooManyRunningClustersException.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/TooManyRunningClustersException.java
new file mode 100644
index 0000000..d2344a6
--- /dev/null
+++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/ex/TooManyRunningClustersException.java
@@ -0,0 +1,23 @@
+package org.gcube.socialnetworking.socialdataindexer.ex;
+
+/**
+ * Thrown when more than one ElasticSearch cluster is found in this scope.
+ * @author Costantino Perciante at ISTI-CNR
+ * (costantino.perciante@isti.cnr.it)
+ *
+ */
+public class TooManyRunningClustersException extends Exception {
+
+	private static final long serialVersionUID = -4112724774153676227L;
+
+	private static final String DEFAULT_MESSAGE = "Too many ElasticSearch cluster instances for this scope!";
+
+	public TooManyRunningClustersException(){
+		super(DEFAULT_MESSAGE);
+	}
+
+	public TooManyRunningClustersException(String message) {
+		super(message);
+	}
+
+}
diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/ElasticSearchRunningCluster.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/ElasticSearchRunningCluster.java
new file mode 100644
index 0000000..707c0b7
--- /dev/null
+++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/ElasticSearchRunningCluster.java
@@ -0,0 +1,133 @@
+package org.gcube.socialnetworking.socialdataindexer.utils;
+
+import static org.gcube.resources.discovery.icclient.ICFactory.clientFor;
+import static org.gcube.resources.discovery.icclient.ICFactory.queryFor;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.gcube.common.portal.PortalContext;
+import org.gcube.common.resources.gcore.ServiceEndpoint;
+import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
+import org.gcube.common.scope.api.ScopeProvider;
+import org.gcube.resources.discovery.client.api.DiscoveryClient;
+import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
+import org.gcube.socialnetworking.socialdataindexer.ex.NoElasticSearchRuntimeResourceException;
+import org.gcube.socialnetworking.socialdataindexer.ex.ServiceEndPointException;
+import org.gcube.socialnetworking.socialdataindexer.ex.TooManyRunningClustersException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retrieves the information about the running ElasticSearch cluster instance in the infrastructure.
+ * @author Costantino Perciante at ISTI-CNR
+ * (costantino.perciante@isti.cnr.it)
+ *
+ */
+public class ElasticSearchRunningCluster {
+
+	// logger
+	private static final Logger _log = LoggerFactory.getLogger(ElasticSearchRunningCluster.class);
+
+	// properties
+	private final static String RUNTIME_RESOURCE_NAME = "SocialPortalDataIndex";
+	private final static String PLATFORM_NAME = "ElasticSearch";
+
+	// retrieved data
+	private List<String> hosts = new ArrayList<String>();
+	private List<Integer> ports = new ArrayList<Integer>();
+	private String clusterName;
+
+	public ElasticSearchRunningCluster(String infrastructure) throws Exception{
+
+		try {
+
+			List<ServiceEndpoint> resources = getConfigurationFromIS(infrastructure);
+
+			if (resources.size() > 1) {
+				_log.error("Too many Runtime Resources having name " + RUNTIME_RESOURCE_NAME + " in this scope");
+				throw new TooManyRunningClustersException("There exists more than one Runtime Resource in this scope having name "
+						+ RUNTIME_RESOURCE_NAME + " and Platform " + PLATFORM_NAME + ". Only one is allowed per infrastructure.");
+			}
+			else if (resources.size() == 0){
+				_log.error("There is no Runtime Resource having name " + RUNTIME_RESOURCE_NAME + " and Platform " + PLATFORM_NAME + " in this scope.");
+				throw new NoElasticSearchRuntimeResourceException();
+			}
+			else {
+
+				try{
+
+					_log.debug(resources.toString());
+					for (ServiceEndpoint res : resources) {
+
+						Iterator<AccessPoint> accessPointIterator = res.profile().accessPoints().iterator();
+
+						while (accessPointIterator.hasNext()) {
+							ServiceEndpoint.AccessPoint accessPoint = (ServiceEndpoint.AccessPoint) accessPointIterator
+									.next();
+
+							// add this host
+							hosts.add(accessPoint.address().split(":")[0]);
+
+							// save the port
+							int port = Integer.parseInt(accessPoint.address().split(":")[1]);
+							ports.add(port);
+
+							// save the name of the cluster (this should be unique)
+							clusterName = accessPoint.name();
+
+						}
+					}
+				}catch(Exception e ){
+
+					_log.error(e.toString());
+					throw new ServiceEndPointException();
+				}
+			}
+		} catch (Exception e) {
+			_log.error(e.toString());
+			throw e;
+		}
+
+	}
+
+	/**
+	 * Retrieve the endpoint information from the IS.
+	 * @return list of service endpoints for elasticsearch
+	 * @throws Exception
+	 */
+	private List<ServiceEndpoint> getConfigurationFromIS(String infrastructure) throws Exception{
+
+		PortalContext context = PortalContext.getConfiguration();
+		String scope = "/";
+
+		if(infrastructure != null && !infrastructure.isEmpty())
+			scope += infrastructure;
+		else
+			scope += context.getInfrastructureName();
+
+		String currScope = ScopeProvider.instance.get();
+		ScopeProvider.instance.set(scope);
+
+		SimpleQuery query = queryFor(ServiceEndpoint.class);
+		query.addCondition("$resource/Profile/Name/text() eq '"+ RUNTIME_RESOURCE_NAME +"'");
+		query.addCondition("$resource/Profile/Platform/Name/text() eq '"+ PLATFORM_NAME +"'");
+
+		DiscoveryClient<ServiceEndpoint> client = clientFor(ServiceEndpoint.class);
+		List<ServiceEndpoint> toReturn = client.submit(query);
+
+		ScopeProvider.instance.set(currScope);
+		return toReturn;
+
+	}
+
+	public List<String> getHosts() {
+		return hosts;
+	}
+
+	public List<Integer> getPorts() {
+		return ports;
+	}
+
+	public String getClusterName() {
+		return clusterName;
+	}
+
+}
diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/IndexFields.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/IndexFields.java
new file mode 100644
index 0000000..1240753
--- /dev/null
+++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/IndexFields.java
@@ -0,0 +1,29 @@
+package org.gcube.socialnetworking.socialdataindexer.utils;
+
+/**
+ * The fields used to build up the index.
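To show what these field names are for, here is a hedged sketch of the kind of full-text query the index is meant to serve (Elasticsearch 2.x API); the Client is assumed to be built as in SocialDataIndexerPlugin, and the free-text string and result size are illustrative, not part of the patch.

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.gcube.socialnetworking.socialdataindexer.utils.IndexFields;

public class SearchSketch {

	// full-text search over the description field of the general feeds index
	public static void printMatchingFeeds(Client client, String freeText) {
		SearchResponse response = client.prepareSearch(IndexFields.INDEX_NAME)
				.setTypes(IndexFields.FEED_TABLE)
				.setQuery(QueryBuilders.matchQuery(IndexFields.FEED_TEXT, freeText))
				.setSize(10)
				.execute()
				.actionGet();

		for (SearchHit hit : response.getHits().getHits())
			System.out.println(hit.getId() + " -> " + hit.getSourceAsString());
	}
}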
+ * @author Costantino Perciante at ISTI-CNR
+ * (costantino.perciante@isti.cnr.it)
+ *
+ */
+public class IndexFields {
+
+	// name of the index
+	public static final String INDEX_NAME = "social";
+
+	// table for comments
+	public static final String COMMENT_TABLE = "comments";
+
+	// table for feeds
+	public static final String FEED_TABLE = "feeds";
+
+	// comment table's fields
+	public static final String COMMENT_TEXT = "description";
+	public static final String COMMENT_PARENT_ID = "parent_id";
+
+	// feed table's fields
+	public static final String FEED_TEXT = "description";
+	public static final String FEED_TYPE = "type";
+	public static final String FEED_VRE_ID = "vre_id";
+
+}
diff --git a/src/test/java/org/gcube/socialnetworking/socialdataindexer/Tests.java b/src/test/java/org/gcube/socialnetworking/socialdataindexer/Tests.java
new file mode 100644
index 0000000..5f173cb
--- /dev/null
+++ b/src/test/java/org/gcube/socialnetworking/socialdataindexer/Tests.java
@@ -0,0 +1,21 @@
+package org.gcube.socialnetworking.socialdataindexer;
+
+import org.gcube.socialnetworking.socialdataindexer.ex.ServiceEndPointException;
+import org.gcube.socialnetworking.socialdataindexer.utils.ElasticSearchRunningCluster;
+import org.junit.Test;
+
+public class Tests {
+
+	@Test
+	public void retrieveISInformation() {
+
+		try {
+			ElasticSearchRunningCluster esrc = new ElasticSearchRunningCluster("gcube");
+			System.err.println(esrc.getClusterName() + ", " + esrc.getHosts() + ", " + esrc.getPorts());
+		} catch (ServiceEndPointException e) {
+			System.err.println(e.toString());
+		} catch (Exception e) {
+			System.err.println(e.toString());
+		}
+	}
+
+}
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
deleted file mode 100644
index 9325e37..0000000
--- a/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<configuration>
-
-	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-		<encoder>
-			<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{0}: %msg%n</pattern>
-		</encoder>
-	</appender>
-
-	<root level="DEBUG">
-		<appender-ref ref="STDOUT" />
-	</root>
-
-
-</configuration>
\ No newline at end of file
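Finally, a hedged sketch of driving the plugin by hand with the single "scope" input that launch() reads; in production the smart executor, not user code, instantiates and runs the plugin, and the scope value below is an illustrative placeholder.

import java.util.HashMap;
import java.util.Map;

import org.gcube.socialnetworking.socialdataindexer.SocialDataIndexerPlugin;
import org.gcube.socialnetworking.socialdataindexer.SocialDataIndexerPluginDeclaration;

public class LaunchSketch {

	public static void main(String[] args) throws Exception {
		// the only input the plugin inspects is the (optional) "scope" key
		Map<String, Object> inputs = new HashMap<String, Object>();
		inputs.put("scope", "gcube"); // illustrative infrastructure name

		SocialDataIndexerPlugin plugin =
				new SocialDataIndexerPlugin(new SocialDataIndexerPluginDeclaration());
		plugin.launch(inputs);
	}
}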