Implementation added.
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/social-networking/social-data-indexer-se-plugin@122844 82a268e6-3cf1-43bd-a215-b396298e98cf
parent 5c553cf939
commit 827997c133

pom.xml | 87
pom.xml
@@ -1,42 +1,76 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.gcube.tools</groupId>
		<artifactId>maven-parent</artifactId>
		<version>1.0.0</version>
	</parent>
	<groupId>org.gcube.socialnetworking</groupId>
	<artifactId>social-data-indexer-se-plugin</artifactId>
	<version>1.0.0-SNAPSHOT</version>
	<name>Social Data Indexer Smart Executor Plugin</name>
	<description>Social Data Indexer Smart Executor Plugin</description>

	<scm>
		<connection>scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/${serviceClass}/${project.artifactId}</connection>
		<developerConnection>scm:https://svn.d4science.research-infrastructures.eu/gcube/trunk/${serviceClass}/${project.artifactId}</developerConnection>
		<url>https://svn.d4science.research-infrastructures.eu/gcube/trunk/${serviceClass}/${project.artifactId}</url>
	</scm>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<metaInfDirectory>src/main/resources/META-INF/services</metaInfDirectory>
		<distroDirectory>distro</distroDirectory>
		<serviceClass>social-networking</serviceClass>
		<elasticSearchVersion>2.2.0</elasticSearchVersion>
		<guavaVersion>18.0</guavaVersion>
	</properties>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.gcube.distribution</groupId>
				<artifactId>maven-portal-bom</artifactId>
				<version>LATEST</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<dependencies>
		<dependency>
			<groupId>org.gcube.portal</groupId>
			<artifactId>social-networking-library</artifactId>
			<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>${elasticSearchVersion}</version>
		</dependency>
		<dependency>
			<groupId>com.google.guava</groupId>
			<artifactId>guava</artifactId>
			<version>${guavaVersion}</version>
		</dependency>
		<dependency>
			<groupId>com.ning</groupId>
			<artifactId>compress-lzf</artifactId>
			<version>1.0.3</version>
		</dependency>
		<dependency>
			<groupId>org.gcube.resources.discovery</groupId>
			<artifactId>ic-client</artifactId>
			<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
		</dependency>
		<dependency>
			<groupId>org.gcube.common.portal</groupId>
			<artifactId>portal-manager</artifactId>
			<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
		</dependency>
		<dependency>
			<groupId>org.gcube.vremanagement</groupId>
			<artifactId>smart-executor-api</artifactId>
@@ -47,23 +81,20 @@
			<artifactId>slf4j-api</artifactId>
			<scope>provided</scope>
		</dependency>

		<!-- Test Dependency -->
		<dependency>
			<groupId>com.netflix.astyanax</groupId>
			<artifactId>astyanax</artifactId>
			<version>1.56.26</version>
		</dependency>
		<!-- Test Dependency -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.0.13</version>
			<scope>test</scope>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
SocialDataIndexerPlugin.java
@@ -3,38 +3,381 @@
 */
package org.gcube.socialnetworking.socialdataindexer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.gcube.portal.databook.server.DBCassandraAstyanaxImpl;
import org.gcube.portal.databook.server.DatabookStore;
import org.gcube.portal.databook.shared.Comment;
import org.gcube.portal.databook.shared.Feed;
import org.gcube.portal.databook.shared.FeedType;
import org.gcube.socialnetworking.socialdataindexer.ex.BulkInsertionFailedException;
import org.gcube.socialnetworking.socialdataindexer.utils.ElasticSearchRunningCluster;
import org.gcube.socialnetworking.socialdataindexer.utils.IndexFields;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;

/**
 * This plugin synchronizes and indexes the data of the Cassandra cluster into the elasticsearch engine.
 * @author Costantino Perciante at ISTI-CNR
 * (costantino.perciante@isti.cnr.it)
 *
 */
public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDeclaration>{

	// Logger
	private static Logger _log = LoggerFactory.getLogger(SocialDataIndexerPlugin.class);

	// the cluster name
	private String clusterName;

	// list of hosts to contact
	private List<String> hostsToContact;

	// list of port numbers (one per host)
	private List<Integer> portNumbers;

	// the elasticsearch client
	private TransportClient client;

	// connection to cassandra
	private DatabookStore store;

	public SocialDataIndexerPlugin(SocialDataIndexerPluginDeclaration pluginDeclaration){

		super(pluginDeclaration);
		_log.debug("Constructor");

	}

	/**{@inheritDoc}*/
	@Override
	public void launch(Map<String, Object> inputs) throws Exception {

		String scope = null;

		// retrieve the scope from inputs, if any
		if(inputs.containsKey("scope"))
			scope = (String) inputs.get("scope");

		// connection to cassandra
		if(scope != null)
			store = new DBCassandraAstyanaxImpl(scope);
		else
			store = new DBCassandraAstyanaxImpl();

		// retrieve the ElasticSearch endpoint and set hosts/port numbers
		ElasticSearchRunningCluster elasticCluster = new ElasticSearchRunningCluster(scope);

		// save info
		clusterName = elasticCluster.getClusterName();
		hostsToContact = elasticCluster.getHosts();
		portNumbers = elasticCluster.getPorts();

		_log.debug("Creating elasticsearch client connection for hosts = " + hostsToContact + ", ports = " + portNumbers
				+ " and cluster's name = " + clusterName);

		// set the cluster's name to check and the sniff property to true.
		// Cluster's name: each node must have this name.
		// Sniff property: allows the client to recover the cluster's structure.
		// Look at https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html
		Settings settings = Settings.settingsBuilder()
				.put("cluster.name", this.clusterName) // force unique cluster's name check
				//.put("client.transport.sniff", true) -> unneeded since we pass all the cluster's nodes
				.build();

		// build the client
		client = TransportClient.builder().settings(settings).build();

		// add the nodes to contact
		for (int i = 0; i < hostsToContact.size(); i++){
			try {

				client.addTransportAddress(
						new InetSocketTransportAddress(
								InetAddress.getByName(hostsToContact.get(i)), portNumbers.get(i))
						);

			} catch (UnknownHostException e) {

				_log.debug("Error while adding " + hostsToContact.get(i) + ":" + portNumbers.get(i) + " as host to be contacted.");

			}
		}

		_log.debug("Connection to ElasticSearch cluster done.");

		_log.debug("Synchronization thread starts running");

		long init = System.currentTimeMillis();

		// for each VRE, we build an index reachable under the path /social/feeds/vreID/.. and /social/comments/vreID/
		// and a general index /social/feeds/ /social/comments.
		List<String> vreIds = null;
		try {

			vreIds = store.getAllVREIds();

		} catch (ConnectionException e) {
			_log.error("Unable to retrieve vres' ids", e);
			throw e;
		}

		// save feeds & comments
		for (String vreID : vreIds) {

			try{
				// get its feeds
				List<Feed> feeds = store.getAllFeedsByVRE(vreID);

				_log.debug("List of feeds of vre " + vreID + " retrieved.");

				// save feeds (per-VRE type and general type)
				addFeedsInBulk(feeds, vreID);
				addFeedsInBulk(feeds, null);

				// get feeds' comments
				List<Comment> comments = new ArrayList<Comment>();
				for (Feed feed : feeds) {

					List<Comment> commentsFeed = store.getAllCommentByFeed(feed.getKey());
					comments.addAll(commentsFeed);

				}

				// save comments (per-VRE type and general type)
				addCommentsInBulk(comments, vreID);
				addCommentsInBulk(comments, null);

			}catch(Exception e){

				_log.debug("Exception while saving feeds/comments into the index", e);

			}

		}

		long end = System.currentTimeMillis();
		_log.debug("Synchronization thread ends running. It took " + (end - init) + " milliseconds, " +
				"that is " + (double)(end - init)/(1000.0 * 60.0) + " minutes.");

		// close connection
		client.close();

	}

	/**{@inheritDoc}*/
	@Override
	protected void onStop() throws Exception {

		_log.debug("onStop()");
		Thread.currentThread().interrupt();
	}

	/**
	 * Add feeds to the elasticsearch index using the bulk api.
	 * @param feeds the feeds to index
	 * @param vreID the VRE the feeds belong to (null for the general index type)
	 * @throws BulkInsertionFailedException
	 */
	private void addFeedsInBulk(List<Feed> feeds, String vreID) throws BulkInsertionFailedException {

		// look at https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.2/java-docs-bulk-processor.html
		_log.debug("Starting bulk insert feeds operation in vre = " + vreID);
		BulkProcessor bulkProcessor = BulkProcessor.builder(
				client,
				new BulkProcessor.Listener() {
					@Override
					public void beforeBulk(long executionId,
							BulkRequest request) {

						_log.debug("[FEEDS] Executing bulk request with id " + executionId
								+ " and number of requests " + request.numberOfActions());

					}

					@Override
					public void afterBulk(long executionId,
							BulkRequest request,
							BulkResponse response) {

						_log.debug("[FEEDS] Executed bulk request with id " + executionId
								+ " and number of requests " + request.numberOfActions() + ". Has it failures? " + response.hasFailures());

					}

					@Override
					public void afterBulk(long executionId,
							BulkRequest request,
							Throwable failure) {

						_log.debug("[FEEDS] Bulk request with id " + executionId
								+ " and number of requests " + request.numberOfActions() + " failed! Reason: " + failure.getMessage());

					}
				})
				.setBulkActions(1000)
				.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
				.setFlushInterval(TimeValue.timeValueSeconds(5))
				.setConcurrentRequests(1)
				.setBackoffPolicy(
						BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
				.build();

		// build up the bulk request
		for (Feed feed : feeds) {

			String feedUUID = feed.getKey();
			String feedVRE = feed.getVreid();
			String feedDescription = feed.getDescription();
			FeedType feedType = feed.getType();

			// prepare the json object
			Map<String, Object> jsonFeed = new HashMap<String, Object>();
			jsonFeed.put(IndexFields.FEED_TEXT, feedDescription);
			jsonFeed.put(IndexFields.FEED_TYPE, feedType);
			jsonFeed.put(IndexFields.FEED_VRE_ID, feedVRE);

			IndexRequest ind;

			if(vreID == null){

				//_log.debug("Saving feed in " + IndexFields.INDEX_NAME + "/" + IndexFields.FEED_TABLE);
				ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.FEED_TABLE, feedUUID)
						.source(jsonFeed);

			}
			else{

				//_log.debug("Saving feed in " + IndexFields.INDEX_NAME + "/" + IndexFields.FEED_TABLE + vreID);
				ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.FEED_TABLE + vreID, feedUUID)
						.source(jsonFeed);

			}

			// add to the bulkprocessor
			bulkProcessor.add(ind);

		}

		try {

			// flush data and wait for termination
			bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
		} catch (InterruptedException e) {
			_log.error("[FEEDS] bulk processor failed!", e);
			throw new BulkInsertionFailedException("[FEEDS] bulk processor failed!");
		}

	}

	/**
	 * Add comments to the elasticsearch index using the bulk api.
	 * @param comments the comments to index
	 * @param vreID the VRE the comments belong to (null for the general index type)
	 * @throws BulkInsertionFailedException
	 */
	private void addCommentsInBulk(List<Comment> comments, String vreID) throws BulkInsertionFailedException {

		// look at https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.2/java-docs-bulk-processor.html
		_log.debug("Starting bulk insert comments operation in vre = " + vreID);
		BulkProcessor bulkProcessor = BulkProcessor.builder(
				client,
				new BulkProcessor.Listener() {
					@Override
					public void beforeBulk(long executionId,
							BulkRequest request) {

						_log.debug("[COMMENTS] Executing bulk request with id " + executionId
								+ " and number of requests " + request.numberOfActions());

					}

					@Override
					public void afterBulk(long executionId,
							BulkRequest request,
							BulkResponse response) {

						_log.debug("[COMMENTS] Executed bulk request with id " + executionId
								+ " and number of requests " + request.numberOfActions() + ". Has it failures? " + response.hasFailures());

					}

					@Override
					public void afterBulk(long executionId,
							BulkRequest request,
							Throwable failure) {

						_log.debug("[COMMENTS] Bulk request with id " + executionId
								+ " and number of requests " + request.numberOfActions() + " failed! Reason: " + failure.getMessage());

					}
				})
				.setBulkActions(1000)
				.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
				.setFlushInterval(TimeValue.timeValueSeconds(5))
				.setConcurrentRequests(1)
				.setBackoffPolicy(
						BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
				.build();

		// build up the bulk request
		for (Comment comment : comments) {

			String commentUUID = comment.getKey();
			String parentId = comment.getFeedid();
			String description = comment.getText();

			// prepare the json object
			Map<String, Object> jsonComment = new HashMap<String, Object>();
			jsonComment.put(IndexFields.COMMENT_PARENT_ID, parentId);
			jsonComment.put(IndexFields.COMMENT_TEXT, description);

			// write
			IndexRequest ind;

			if(vreID == null){

				//_log.debug("Saving comment in " + IndexFields.INDEX_NAME + "/" + IndexFields.COMMENT_TABLE);
				ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.COMMENT_TABLE, commentUUID)
						.source(jsonComment);

			}
			else{

				//_log.debug("Saving comment in " + IndexFields.INDEX_NAME + "/" + IndexFields.COMMENT_TABLE + vreID);
				ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.COMMENT_TABLE + vreID, commentUUID)
						.source(jsonComment);

			}

			// add to the bulkprocessor
			bulkProcessor.add(ind);

		}

		try {
			// flush data and wait for termination
			bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
		} catch (InterruptedException e) {
			_log.error("[COMMENTS] bulk processor failed!", e);
			throw new BulkInsertionFailedException("[COMMENTS] bulk processor failed!");
		}
	}
}
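For reference, a minimal sketch of how the plugin above could be exercised by hand, outside the smart executor. It is illustrative only and not part of this commit: the "gcube" scope value, the LaunchSketch class name and the plain main method are assumptions, and the class is placed in the plugin's own package so the types shown in the diff resolve.

package org.gcube.socialnetworking.socialdataindexer;

import java.util.HashMap;
import java.util.Map;

public class LaunchSketch {

	public static void main(String[] args) throws Exception {

		// illustrative inputs; in production the smart executor builds this map
		Map<String, Object> inputs = new HashMap<String, Object>();
		inputs.put("scope", "gcube"); // hypothetical infrastructure scope

		SocialDataIndexerPlugin plugin =
				new SocialDataIndexerPlugin(new SocialDataIndexerPluginDeclaration());

		// reads feeds/comments from Cassandra and bulk-indexes them, then closes the ES client
		plugin.launch(inputs);
	}
}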
SocialDataIndexerPluginDeclaration.java
@@ -12,32 +12,39 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The social data indexer plugin declaration class.
 * @author Costantino Perciante at ISTI-CNR
 * (costantino.perciante@isti.cnr.it)
 *
 */
public class SocialDataIndexerPluginDeclaration implements PluginDeclaration {

	/**
	 * Logger
	 */
	private static Logger logger = LoggerFactory.getLogger(SocialDataIndexerPluginDeclaration.class);

	/**
	 * Plugin name used by the Executor to retrieve this class
	 */
	public static final String NAME = "social-data-indexer-plugin";
	public static final String DESCRIPTION = "The social-data-indexer-plugin has the role to index data contained"
			+ " in the Cassandra cluster using an elasticsearch index to support full-text search.";
	public static final String VERSION = "1.0.0";

	/**{@inheritDoc}*/
	@Override
	public void init() {
		logger.debug(String.format("%s initialized", SocialDataIndexerPluginDeclaration.class.getSimpleName()));
	}

	/**{@inheritDoc}*/
	@Override
	public String getName() {
		return NAME;
	}

	/**{@inheritDoc}*/
	@Override
	public String getDescription() {

@@ -49,7 +56,7 @@ public class SocialDataIndexerPluginDeclaration implements PluginDeclaration {
	public String getVersion() {
		return VERSION;
	}

	/**{@inheritDoc}*/
	@Override
	public Map<String, String> getSupportedCapabilities() {

@@ -63,5 +70,14 @@ public class SocialDataIndexerPluginDeclaration implements PluginDeclaration {
	public Class<? extends Plugin<? extends PluginDeclaration>> getPluginImplementation() {
		return SocialDataIndexerPlugin.class;
	}

	@Override
	public String toString(){
		return String.format("%s : %s - %s - %s - %s - %s",
				this.getClass().getSimpleName(),
				getName(), getVersion(), getDescription(),
				getSupportedCapabilities(),
				getPluginImplementation().getClass().getSimpleName());
	}

}
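The pom's metaInfDirectory property points at src/main/resources/META-INF/services, which suggests the declaration above is exposed through the standard Java ServiceLoader mechanism. A hypothetical registration file is sketched below; its exact name (the fully qualified PluginDeclaration interface) is an assumption inferred from the org.gcube.vremanagement.executor.plugin.Plugin import and is not shown in this commit.

# assumed file: src/main/resources/META-INF/services/org.gcube.vremanagement.executor.plugin.PluginDeclaration
org.gcube.socialnetworking.socialdataindexer.SocialDataIndexerPluginDeclaration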
BulkInsertionFailedException.java
@@ -0,0 +1,23 @@
package org.gcube.socialnetworking.socialdataindexer.ex;

/**
 * BulkInsertionFailedException class: such an error is thrown if the indexing process fails.
 * @author Costantino Perciante at ISTI-CNR
 * (costantino.perciante@isti.cnr.it)
 *
 */
public class BulkInsertionFailedException extends Exception {

	private static final long serialVersionUID = -7707293515649267047L;

	private static final String DEFAULT_MESSAGE = "Unable to insert data into the index";

	public BulkInsertionFailedException(){
		super(DEFAULT_MESSAGE);
	}

	public BulkInsertionFailedException(String message) {
		super(message);
	}

}
NoElasticSearchRuntimeResourceException.java
@@ -0,0 +1,24 @@
package org.gcube.socialnetworking.socialdataindexer.ex;

/**
 * Exception thrown when no elasticsearch cluster can be found in the infrastructure.
 * @author Costantino Perciante at ISTI-CNR
 * (costantino.perciante@isti.cnr.it)
 *
 */
public class NoElasticSearchRuntimeResourceException extends Exception {

	private static final long serialVersionUID = -40748130477807648L;

	private static final String DEFAULT_MESSAGE = "No ElasticSearch cluster instance for this scope!";

	public NoElasticSearchRuntimeResourceException(){
		super(DEFAULT_MESSAGE);
	}

	public NoElasticSearchRuntimeResourceException(String message) {
		super(message);
	}

}
ServiceEndPointException.java
@@ -0,0 +1,22 @@
package org.gcube.socialnetworking.socialdataindexer.ex;

/**
 * Exception thrown when it is not possible to retrieve information from the ServiceEndpoint
 * related to ElasticSearch.
 * @author Costantino Perciante at ISTI-CNR
 * (costantino.perciante@isti.cnr.it)
 *
 */
public class ServiceEndPointException extends Exception {

	private static final long serialVersionUID = 5378333924429281681L;

	private static final String DEFAULT_MESSAGE = "Unable to retrieve information from ElasticSearch endpoint!";

	public ServiceEndPointException(){
		super(DEFAULT_MESSAGE);
	}

	public ServiceEndPointException(String string) {
		super(string);
	}
}
TooManyRunningClustersException.java
@@ -0,0 +1,23 @@
package org.gcube.socialnetworking.socialdataindexer.ex;

/**
 * Exception thrown when too many elasticsearch clusters are found in this scope.
 * @author Costantino Perciante at ISTI-CNR
 * (costantino.perciante@isti.cnr.it)
 *
 */
public class TooManyRunningClustersException extends Exception {

	private static final long serialVersionUID = -4112724774153676227L;

	private static final String DEFAULT_MESSAGE = "Too many ElasticSearch cluster instances for this scope!";

	public TooManyRunningClustersException(){
		super(DEFAULT_MESSAGE);
	}

	public TooManyRunningClustersException(String message) {
		super(message);
	}

}
ElasticSearchRunningCluster.java
@@ -0,0 +1,133 @@
package org.gcube.socialnetworking.socialdataindexer.utils;

import static org.gcube.resources.discovery.icclient.ICFactory.clientFor;
import static org.gcube.resources.discovery.icclient.ICFactory.queryFor;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.gcube.common.portal.PortalContext;
import org.gcube.common.resources.gcore.ServiceEndpoint;
import org.gcube.common.resources.gcore.ServiceEndpoint.AccessPoint;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.resources.discovery.client.api.DiscoveryClient;
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
import org.gcube.socialnetworking.socialdataindexer.ex.NoElasticSearchRuntimeResourceException;
import org.gcube.socialnetworking.socialdataindexer.ex.ServiceEndPointException;
import org.gcube.socialnetworking.socialdataindexer.ex.TooManyRunningClustersException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Retrieves information about the elasticsearch instance running in the infrastructure.
 * @author Costantino Perciante at ISTI-CNR
 * (costantino.perciante@isti.cnr.it)
 *
 */
public class ElasticSearchRunningCluster {

	// logger
	private static final Logger _log = LoggerFactory.getLogger(ElasticSearchRunningCluster.class);

	// properties
	private final static String RUNTIME_RESOURCE_NAME = "SocialPortalDataIndex";
	private final static String PLATFORM_NAME = "ElasticSearch";

	// retrieved data
	private List<String> hosts = new ArrayList<String>();
	private List<Integer> ports = new ArrayList<Integer>();
	private String clusterName;

	public ElasticSearchRunningCluster(String infrastructure) throws Exception{

		try {

			List<ServiceEndpoint> resources = getConfigurationFromIS(infrastructure);

			if (resources.size() > 1) {
				_log.error("Too many Runtime Resources having name " + RUNTIME_RESOURCE_NAME + " in this scope");
				throw new TooManyRunningClustersException("There exists more than one Runtime Resource in this scope having name "
						+ RUNTIME_RESOURCE_NAME + " and Platform " + PLATFORM_NAME + ". Only one is allowed per infrastructure.");
			}
			else if (resources.size() == 0){
				_log.error("There is no Runtime Resource having name " + RUNTIME_RESOURCE_NAME + " and Platform " + PLATFORM_NAME + " in this scope.");
				throw new NoElasticSearchRuntimeResourceException();
			}
			else {

				try{

					_log.debug(resources.toString());
					for (ServiceEndpoint res : resources) {

						Iterator<AccessPoint> accessPointIterator = res.profile().accessPoints().iterator();

						while (accessPointIterator.hasNext()) {
							ServiceEndpoint.AccessPoint accessPoint = (ServiceEndpoint.AccessPoint) accessPointIterator
									.next();

							// add this host
							hosts.add(accessPoint.address().split(":")[0]);

							// save the port
							int port = Integer.parseInt(accessPoint.address().split(":")[1]);
							ports.add(port);

							// save the name of the cluster (this should be unique)
							clusterName = accessPoint.name();

						}
					}
				}catch(Exception e ){

					_log.error(e.toString());
					throw new ServiceEndPointException();
				}
			}
		} catch (Exception e) {
			_log.error(e.toString());
			throw e;
		}

	}

	/**
	 * Retrieve endpoint information from the IS.
	 * @param infrastructure the infrastructure name (if null or empty, the portal context is used)
	 * @return the list of elasticsearch service endpoints
	 * @throws Exception
	 */
	private List<ServiceEndpoint> getConfigurationFromIS(String infrastructure) throws Exception{

		PortalContext context = PortalContext.getConfiguration();
		String scope = "/";
		if(infrastructure != null && !infrastructure.isEmpty())
			scope += infrastructure;
		else
			scope += context.getInfrastructureName();

		String currScope = ScopeProvider.instance.get();
		ScopeProvider.instance.set(scope);
		SimpleQuery query = queryFor(ServiceEndpoint.class);
		query.addCondition("$resource/Profile/Name/text() eq '"+ RUNTIME_RESOURCE_NAME +"'");
		query.addCondition("$resource/Profile/Platform/Name/text() eq '"+ PLATFORM_NAME +"'");
		DiscoveryClient<ServiceEndpoint> client = clientFor(ServiceEndpoint.class);
		List<ServiceEndpoint> toReturn = client.submit(query);
		ScopeProvider.instance.set(currScope);
		return toReturn;

	}

	public List<String> getHosts() {
		return hosts;
	}

	public List<Integer> getPorts() {
		return ports;
	}

	public String getClusterName() {
		return clusterName;
	}

}
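For reference, the wiring that SocialDataIndexerPlugin#launch performs with the discovery class above, condensed into a stand-alone sketch. It is illustrative only and not part of this commit: the "gcube" scope value and the DiscoverySketch class are assumptions, while the elasticsearch 2.2 transport-client calls mirror those already shown in launch().

import java.net.InetAddress;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.gcube.socialnetworking.socialdataindexer.utils.ElasticSearchRunningCluster;

public class DiscoverySketch {

	public static void main(String[] args) throws Exception {

		// resolve the running cluster for an infrastructure scope ("gcube" is illustrative)
		ElasticSearchRunningCluster cluster = new ElasticSearchRunningCluster("gcube");

		// same settings and transport addresses used by SocialDataIndexerPlugin#launch
		Settings settings = Settings.settingsBuilder()
				.put("cluster.name", cluster.getClusterName())
				.build();
		TransportClient client = TransportClient.builder().settings(settings).build();
		for (int i = 0; i < cluster.getHosts().size(); i++) {
			client.addTransportAddress(new InetSocketTransportAddress(
					InetAddress.getByName(cluster.getHosts().get(i)),
					cluster.getPorts().get(i)));
		}

		System.out.println("Connected nodes: " + client.connectedNodes());
		client.close();
	}
}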
IndexFields.java
@@ -0,0 +1,29 @@
package org.gcube.socialnetworking.socialdataindexer.utils;

/**
 * The fields used to build up the index.
 * @author Costantino Perciante at ISTI-CNR
 * (costantino.perciante@isti.cnr.it)
 *
 */
public class IndexFields {

	// name of the index
	public static final String INDEX_NAME = "social";

	// table for comments
	public static final String COMMENT_TABLE = "comments";

	// table for feeds
	public static final String FEED_TABLE = "feeds";

	// comment table's fields
	public static final String COMMENT_TEXT = "description";
	public static final String COMMENT_PARENT_ID = "parent_id";

	// feed table's fields
	public static final String FEED_TEXT = "description";
	public static final String FEED_TYPE = "type";
	public static final String FEED_VRE_ID = "vre_id";

}
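A rough sketch of how the fields above could be queried once the plugin has populated the index. It is not part of this commit: the SearchSketch class is hypothetical and it assumes an already connected elasticsearch 2.2 client together with the standard QueryBuilders API.

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.gcube.socialnetworking.socialdataindexer.utils.IndexFields;

public class SearchSketch {

	// full-text search over the feeds type; "client" is an already connected elasticsearch client
	public static SearchResponse searchFeeds(Client client, String text) {
		return client.prepareSearch(IndexFields.INDEX_NAME)
				.setTypes(IndexFields.FEED_TABLE)
				.setQuery(QueryBuilders.matchQuery(IndexFields.FEED_TEXT, text))
				.setSize(10)
				.execute()
				.actionGet();
	}
}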
Tests.java
@@ -0,0 +1,21 @@
package org.gcube.socialnetworking.socialdataindexer;

import org.gcube.socialnetworking.socialdataindexer.ex.ServiceEndPointException;
import org.gcube.socialnetworking.socialdataindexer.utils.ElasticSearchRunningCluster;
import org.junit.Test;

public class Tests {

	@Test
	public void retrieveISInformation() {

		try {
			ElasticSearchRunningCluster esrc = new ElasticSearchRunningCluster("gcube");
			System.err.println(esrc.getClusterName() + ", " + esrc.getHosts() + ", " + esrc.getPorts());
		} catch (ServiceEndPointException e) {
			System.err.println(e.toString());
		} catch (Exception e) {
			System.err.println(e.toString());
		}
	}

}
@@ -1,16 +0,0 @@
<configuration>

	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
		<encoder>
			<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{0}: %msg%n</pattern>
		</encoder>
	</appender>

	<logger name="org.gcube" level="WARN" />
	<logger name="org.gcube.informationsystem.sweeper" level="TRACE" />

	<root level="WARN">
		<appender-ref ref="STDOUT" />
	</root>

</configuration>