added SendNotification class to send notifications on failure

git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/social-networking/social-data-indexer-se-plugin@142412 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
Costantino Perciante 2017-02-08 18:34:05 +00:00
parent 58a3e89c56
commit 507493cd2e
4 changed files with 144 additions and 20 deletions

View File

@ -25,6 +25,7 @@
<serviceClass>social-networking</serviceClass>
<elasticSearchVersion>2.2.0</elasticSearchVersion>
<guavaVersion>18.0</guavaVersion>
<apache.http.version>4.1.2</apache.http.version>
<astyanaxVersion>2.0.2</astyanaxVersion>
</properties>
@ -119,6 +120,12 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${apache.http.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -53,19 +53,11 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
//Logger
private static Logger logger = LoggerFactory.getLogger(SocialDataIndexerPlugin.class);
// the cluster name
private String clusterName;
// list of hosts to contact
private List<String> hostsToContact;
// private port number
private List<Integer> portNumbers;
// the elasticsearch client
private TransportClient client;
// connection to cassandra
private DatabookStore store;
private int count = 0;
@ -77,9 +69,10 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
}
/**{@inheritDoc}*/
/**{@inheritDoc}
* @throws Exception */
@Override
public void launch(Map<String, Object> inputs){
public void launch(Map<String, Object> inputs) throws Exception{
try{
@ -89,11 +82,6 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
logger.info("Scope read is " + scope);
if(scope == null || scope.isEmpty()){
logger.error("Running scope is missing! Exiting");
return;
}
// connection to cassandra (remove the first / from the scope)
store = new DBCassandraAstyanaxImpl(scope.replaceFirst("/", ""));
@ -171,9 +159,8 @@ public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDecla
" that is " + (double)(end - init)/(1000.0 * 60.0) + " minutes.");
}catch(Exception e){
logger.error("Error while synchronizing data.", e);
throw e;
}finally{
// close connection to elasticsearch
if(client != null){

View File

@ -29,9 +29,8 @@ public class SocialDataIndexerPluginDeclaration implements PluginDeclaration {
* Plugin name used by the Executor to retrieve this class
*/
public static final String NAME = "social-data-indexer-plugin";
public static final String DESCRIPTION = "The social-data-indexer-plugin has the role to index data contained"
+ " in the Cassandra cluster using an elasticsearch index to support full-text search.";
public static final String VERSION = "1.1.1";
public static final String DESCRIPTION = "The social-data-indexer-plugin has the role to index data contained into a Cassandra cluster using an Elasticsearch index to support full-text search.";
public static final String VERSION = "1.1.2";
/**{@inheritDoc}*/
@Override

View File

@ -0,0 +1,131 @@
package org.gcube.socialnetworking.socialdataindexer.utils;
import static org.gcube.resources.discovery.icclient.ICFactory.client;
import static org.gcube.resources.discovery.icclient.ICFactory.queryFor;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.resources.gcore.GCoreEndpoint;
import org.gcube.common.scope.api.ScopeProvider;
import org.gcube.resources.discovery.client.api.DiscoveryClient;
import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Send a notification to the interested person.
* @author Costantino Perciante at ISTI-CNR (costantino.perciante@isti.cnr.it)
*/
public class SendNotification extends PluginStateNotification {
private Map<String, String> pluginInputs;
private static Logger logger = LoggerFactory.getLogger(SendNotification.class);
private static final String NOTIFY_METHOD = "/2/notifications/notify-job-status";
private static final String resource = "jersey-servlet";
private static final String serviceName = "SocialNetworking";
private static final String serviceClass = "Portal";
// user to contact on exception via social-networking ws
private final static String RECIPIENT_KEY = "recipient";
// service name
private final static String SERVICE_NAME = "Smart Executor";
public SendNotification(Map<String, String> inputs) {
super(inputs);
this.pluginInputs = inputs;
}
@SuppressWarnings("unchecked")
@Override
public void pluginStateEvolution(PluginStateEvolution pluginStateEvolution,
Exception exception) throws Exception {
switch(pluginStateEvolution.getPluginState()){
case STOPPED:
case FAILED:
case DISCARDED:
// check what happened
String recipient = pluginInputs.get(RECIPIENT_KEY);
String basePath = discoverEndPoint();
logger.info("Recipient of the notification is " + recipient + ". Base path found for the notification service is " + basePath);
if(basePath != null && recipient != null){
basePath = basePath.contains("http:") ? basePath.replace("http", "https") : basePath;
HttpPost request = new HttpPost(basePath + NOTIFY_METHOD);
DefaultHttpClient httpClient = new DefaultHttpClient();
JSONObject obj = new JSONObject();
obj.put("job_id", pluginStateEvolution.getUuid());
obj.put("recipient", recipient);
obj.put("job_name", pluginStateEvolution.getPluginDeclaration().getName());
obj.put("service_name", SERVICE_NAME);
obj.put("status", "FAILED");
obj.put("status_message", "original status reported by " + SERVICE_NAME + " was " + pluginStateEvolution.getPluginState()
+ ". Exception is " + exception != null ? exception.getMessage() : null);
logger.debug("Request json is going to be " + obj.toJSONString());
try{
request.addHeader("gcube-token", SecurityTokenProvider.instance.get());
StringEntity params = new StringEntity(obj.toJSONString());
request.setEntity(params);
HttpResponse response = httpClient.execute(request);
logger.info(" " + response.getStatusLine().getStatusCode() + " and response message is " + response.getStatusLine().getReasonPhrase());
}catch (Exception ex) {
logger.error("Error while sending notification ", ex);
}finally{
if(httpClient != null)
httpClient.getConnectionManager().shutdown();
}
}
break;
default: logger.info("No notification is going to be sent, because the status of the plugin execution is " + pluginStateEvolution.getPluginState().name());
}
}
/**
* Discover the social networking service base path.
* @return base path of the service.
*/
private static String discoverEndPoint(){
String context = ScopeProvider.instance.get();
String basePath = null;
try{
SimpleQuery query = queryFor(GCoreEndpoint.class);
query.addCondition(String.format("$resource/Profile/ServiceClass/text() eq '%s'",serviceClass));
query.addCondition("$resource/Profile/DeploymentData/Status/text() eq 'ready'");
query.addCondition(String.format("$resource/Profile/ServiceName/text() eq '%s'",serviceName));
query.setResult("$resource/Profile/AccessPoint/RunningInstanceInterfaces//Endpoint[@EntryName/string() eq \""+resource+"\"]/text()");
DiscoveryClient<String> client = client();
List<String> endpoints = client.submit(query);
if (endpoints == null || endpoints.isEmpty())
throw new Exception("Cannot retrieve the GCoreEndpoint serviceName: "+serviceName +", serviceClass: " +serviceClass +", in scope: "+context);
basePath = endpoints.get(0);
if(basePath==null)
throw new Exception("Endpoint:"+resource+", is null for serviceName: "+serviceName +", serviceClass: " +serviceClass +", in scope: "+context);
logger.info("found entyname "+basePath+" for ckanResource: "+resource);
}catch(Exception e){
logger.error("Unable to retrieve such service endpoint information!", e);
}
return basePath;
}
}