Tests changed and minor bug fixes

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/social-networking/social-data-indexer-se-plugin@124022 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Costantino Perciante 2016-02-09 17:58:55 +00:00
parent 01f3156344
commit 5410e5b971
4 changed files with 109 additions and 81 deletions

View File

@ -1,12 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<Resource>
<ID />
<Type>Library</Type>
<Type>Plugin</Type>
<Profile>
<Description>{description}</Description>
<Class>${serviceClass}</Class>
<Name>${artifactId}</Name>
<Version>${version}</Version>
<Version>1.0.0</Version>
<Packages>
<Software>
<Description>{description}</Description>
@ -17,7 +17,7 @@
<artifactId>${artifactId}</artifactId>
<version>${version}</version>
</MavenCoordinates>
<Type>Library</Type>
<Type>Plugin</Type>
<Files>
<File>${build.finalName}.jar</File>
</Files>

View File

@ -90,6 +90,11 @@
<artifactId>astyanax</artifactId>
<version>1.56.26</version>
</dependency>
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
<version>[1.0.0-SNAPSHOT, 2.0.0-SNAPSHOT)</version>
</dependency>
<!-- Test Dependency -->
<dependency>
<groupId>junit</groupId>

View File

@ -10,14 +10,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -49,7 +50,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDeclaration>{
//Logger
private static Logger _log = LoggerFactory.getLogger(SocialDataIndexerPlugin.class);
private static Logger logger = LoggerFactory.getLogger(SocialDataIndexerPlugin.class);
// the cluster name
private String clusterName;
@ -69,35 +70,44 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
public SocialDataIndexerPlugin(SocialDataIndexerPluginDeclaration pluginDeclaration){
super(pluginDeclaration);
_log.debug("Constructor");
logger.debug("Constructor");
}
/**{@inheritDoc}*/
@Override
public void launch(Map<String, Object> inputs) throws Exception {
public void launch(Map<String, Object> inputs){
String scope = null;
// retrieve the scope from inputs, if any
if(inputs.containsKey("scope"))
scope = (String) inputs.get("scope");
else{
logger.error("Scope variable is not set. Unable to continue");
return;
}
// connection to cassandra
if(scope != null)
store = new DBCassandraAstyanaxImpl(scope);
else
store = new DBCassandraAstyanaxImpl();
store = new DBCassandraAstyanaxImpl(scope);
// retrieve ElasticSearch Endpoint and set hosts/port number
ElasticSearchRunningCluster elasticCluster = new ElasticSearchRunningCluster(scope);
// retrieve ElasticSearch Endpoint and set hosts/port numbers
ElasticSearchRunningCluster elasticCluster;
try {
elasticCluster = new ElasticSearchRunningCluster(scope);
} catch (Exception e1) {
logger.error(e1.toString());
return;
}
// save info
clusterName = elasticCluster.getClusterName();
hostsToContact = elasticCluster.getHosts();
portNumbers = elasticCluster.getPorts();
_log.debug("Creating elasticsearch client connection for hosts = " + hostsToContact + ", ports = " + portNumbers + " and "
logger.debug("Creating elasticsearch client connection for hosts = " + hostsToContact + ", ports = " + portNumbers + " and "
+ " cluster's name = " + clusterName);
// set cluster's name to check and the sniff property to true.
@ -123,27 +133,27 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
} catch (UnknownHostException e) {
_log.debug("Error while adding " + hostsToContact.get(i) + ":" + portNumbers.get(i) + " as host to be contacted.");
logger.debug("Error while adding " + hostsToContact.get(i) + ":" + portNumbers.get(i) + " as host to be contacted.");
return;
}
}
_log.debug("Connection to ElasticSearch cluster done.");
logger.debug("Connection to ElasticSearch cluster done.");
_log.debug("Synchronization thread starts running");
logger.debug("Synchronization starts running");
long init = System.currentTimeMillis();
// for each VRE, we build a index reachable under the path /social/feeds/vreID/.. and /social/comments/vreID/
// and a general index /social/feeds/ /social/comments.
// we build an index reachable under the path /social/enhanced_feed
List<String> vreIds = null;
try {
vreIds = store.getAllVREIds();
} catch (ConnectionException e) {
_log.error("Unable to retrieve vres' ids", e);
throw e;
logger.error("Unable to retrieve vres' ids", e);
return;
}
// save feeds & comments
@ -152,48 +162,48 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
try{
List<Feed> feeds = store.getAllFeedsByVRE(vreID);
_log.debug("Number of retrieved feeds is " + feeds.size() + " for vre " + vreID);
logger.debug("Number of retrieved feeds is " + feeds.size() + " for vre " + vreID);
// enhance
List<EnhancedFeed> enhanceFeeds = enhanceFeeds((ArrayList<Feed>) feeds);
// try to index
addEnhancedFeedsInBulk(enhanceFeeds, null);
addEnhancedFeedsInBulk(enhanceFeeds, vreID);
addEnhancedFeedsInBulk(enhanceFeeds);
}catch(Exception e){
_log.debug("Exception while saving feeds/comments into the index", e);
client.close();
throw e;
logger.debug("Exception while saving feeds/comments into the index", e);
return;
}
}
long end = System.currentTimeMillis();
_log.debug("Synchronization thread ends running. It took " + (end - init) + " milliseconds " +
logger.debug("Synchronization thread ends running. It took " + (end - init) + " milliseconds " +
" that is " + (double)(end - init)/(1000.0 * 60.0) + " minutes.");
}
@Override
protected void finalize(){
// close connection
client.close();
if(client != null)
client.close();
}
/**{@inheritDoc}*/
@Override
protected void onStop() throws Exception {
_log.debug("onStop()");
logger.debug("onStop()");
Thread.currentThread().interrupt();
}
/**
* Add enhancedFeeds into the elasticsearch index.
* @param enhanceFeeds
* @param vreID
* @throws BulkInsertionFailedException
*/
public void addEnhancedFeedsInBulk(List<EnhancedFeed> enhanceFeeds, String vreID) throws BulkInsertionFailedException {
_log.debug("Starting bulk insert enhanced feeds operation in vre = " + vreID);
private void addEnhancedFeedsInBulk(List<EnhancedFeed> enhanceFeeds) throws BulkInsertionFailedException {
logger.debug("Starting bulk insert enhanced feeds operation");
BulkProcessor bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@ -201,8 +211,7 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
public void beforeBulk(long executionId,
BulkRequest request) {
_log.debug("[ENHANCED-FEEDS]Executing bulk request with id " + executionId
+ " and number of requests " + request.numberOfActions());
logger.debug("Going to execute new bulk composed of {} actions", request.numberOfActions());
}
@ -211,8 +220,18 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
BulkRequest request,
BulkResponse response) {
_log.debug("[ENHANCED-FEEDS]Executed bulk request with id " + executionId
+ " and number of requests " + request.numberOfActions() + ". Has it failures? " + response.hasFailures());
logger.debug("Executed bulk composed of {} actions", request.numberOfActions());
if (response.hasFailures()) {
logger.warn("There was failures while executing bulk", response.buildFailureMessage());
if (logger.isDebugEnabled()) {
for (BulkItemResponse item : response.getItems()) {
if (item.isFailed()) {
logger.debug("Error for {}/{}/{} for {} operation: {}", item.getIndex(),
item.getType(), item.getId(), item.getOpType(), item.getFailureMessage());
}
}
}
}
}
@ -221,21 +240,26 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
BulkRequest request,
Throwable failure) {
_log.debug("[ENHANCED-FEEDS]Bulk request with id " + executionId
+ " and number of requests " + request.numberOfActions() + " failed! Reason: " + failure.getMessage());
logger.error("Error executing bulk", failure);
if(failure instanceof NoNodeAvailableException){
throw new RuntimeException("No node available. Exiting...");
}
}
})
.setBulkActions(1000)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(1)
.setConcurrentRequests(0)
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
.build();
for (int i= 0 ; i < enhanceFeeds.size(); i++) {
String enhFeedUUID = null;
try{
EnhancedFeed enFeed = enhanceFeeds.get(i);
@ -243,41 +267,25 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(enFeed);
String enhFeedUUID = enFeed.getFeed().getKey();
enhFeedUUID = enFeed.getFeed().getKey();
IndexRequest ind;
if(vreID == null){
//_log.debug("Saving feed in " + IndexFields.INDEX_NAME + "/" + IndexFields.ENHANCED_FEEDS_TABLE);
ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhFeedUUID)
.source(json);
}
else{
//_log.debug("Saving feed in " + IndexFields.INDEX_NAME + "/" + IndexFields.ENHANCED_FEEDS_TABLE + vreID);
ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE + vreID, enhFeedUUID)
.source(json);
}
//logger.debug("Saving feed in " + IndexFields.INDEX_NAME + "/" + IndexFields.ENHANCED_FEEDS_TABLE);
IndexRequest ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhFeedUUID)
.source(json);
// add to the bulkprocessor
bulkProcessor.add(ind);
}catch(IOException ioE){
_log.error(ioE.toString());
logger.error("Skipping insert feed with id " + enhFeedUUID, ioE);
}
}
try {
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
_log.error("[ENHANCED_FEEDS] bulk processor failed!", e);
throw new BulkInsertionFailedException();
}
// close bulk operations (use close() if there is no concurrent request as this is the case)
bulkProcessor.close();
}
/**
@ -297,11 +305,11 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
// retrieve attachments
ArrayList<Attachment> attachments = new ArrayList<Attachment>();
if (isMultiFileUpload) {
_log.debug("Retriving attachments for feed with id="+feed.getKey());
logger.debug("Retriving attachments for feed with id="+feed.getKey());
try {
attachments = (ArrayList<Attachment>) store.getAttachmentsByFeedId(feed.getKey());
} catch (FeedIDNotFoundException e) {
_log.error("It looks like sth wrong with this feedid having attachments, could not find feedId = " + feed.getKey() + "\n" + e.getMessage());
logger.error("It looks like sth wrong with this feedid having attachments, could not find feedId = " + feed.getKey() + "\n" + e.getMessage());
throw new Exception();
}
}
@ -320,7 +328,7 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
* @return
*/
private ArrayList<Comment> getAllCommentsByFeed(String feedid) {
//_log.trace("Asking comments for " + feedid);
//logger.trace("Asking comments for " + feedid);
ArrayList<Comment> toReturn = (ArrayList<Comment>) store.getAllCommentByFeed(feedid);
Collections.sort(toReturn);
return toReturn;

View File

@ -1,23 +1,38 @@
package org.gcube.socialnetworking.socialdataindexer;
import java.util.HashMap;
import java.util.Map;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.socialnetworking.social_data_indexing_common.ex.ServiceEndPointException;
import org.gcube.socialnetworking.social_data_indexing_common.utils.ElasticSearchRunningCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Tests {
/**
* Logger
*/
private static Logger logger = LoggerFactory.getLogger(Tests.class);
@Before
public void beforeTest(){
// set security token
SecurityTokenProvider.instance.set("422d795b-d978-41d5-abac-b1c8be90a632");
}
@Test
public void retrieveISInformation() {
public void testLaunch() {
logger.debug("Starting to test launch");
Map<String, Object> inputs = new HashMap<String, Object>();
inputs.put("scope", "gcube");
SocialDataIndexerPlugin plugin = new SocialDataIndexerPlugin(null);
plugin.launch(inputs); //TODO: uncomment for testing purpose
logger.debug("-------------- launch test finished");
}
try {
@After
public void after(){
// set security token
SecurityTokenProvider.instance.set("422d795b-d978-41d5-abac-b1c8be90a632");
ElasticSearchRunningCluster esrc = new ElasticSearchRunningCluster("gcube");
System.err.println(esrc.getClusterName() + ", " + esrc.getHosts() + ", " + esrc.getPorts());
} catch (ServiceEndPointException e) {
System.err.println(e.toString());
} catch (Exception e) {
System.err.println(e.toString());
}
}
}