diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs
index 29abf99..9896090 100644
--- a/.settings/org.eclipse.core.resources.prefs
+++ b/.settings/org.eclipse.core.resources.prefs
@@ -1,6 +1,5 @@
 eclipse.preferences.version=1
 encoding//src/main/java=UTF-8
 encoding//src/main/resources=UTF-8
-encoding//src/test/java=UTF-8
 encoding//src/test/resources=UTF-8
 encoding/<project>=UTF-8
diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java
new file mode 100644
index 0000000..671a7c9
--- /dev/null
+++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.java
@@ -0,0 +1,354 @@
+/**
+ *
+ */
+package org.gcube.socialnetworking.socialdataindexer;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.gcube.common.scope.api.ScopeProvider;
+import org.gcube.portal.databook.server.DBCassandraAstyanaxImpl;
+import org.gcube.portal.databook.server.DatabookStore;
+import org.gcube.portal.databook.shared.Attachment;
+import org.gcube.portal.databook.shared.Comment;
+import org.gcube.portal.databook.shared.EnhancedFeed;
+import org.gcube.portal.databook.shared.Feed;
+import org.gcube.socialnetworking.social_data_indexing_common.ex.BulkInsertionFailedException;
+import org.gcube.socialnetworking.social_data_indexing_common.utils.ElasticSearchRunningCluster;
+import org.gcube.socialnetworking.social_data_indexing_common.utils.IndexFields;
+import org.gcube.vremanagement.executor.plugin.Plugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The SocialDataIndexerPlugin synchronizes and indexes data coming from the Cassandra
+ * cluster into the Elasticsearch engine.
+ *
+ * @author Costantino Perciante at ISTI-CNR
+ * @author Luca Frosini (ISTI-CNR)
+ */
+public class SocialDataIndexerPlugin extends Plugin {
+
+	// Logger
+	private static Logger logger = LoggerFactory.getLogger(SocialDataIndexerPlugin.class);
+
+	private String clusterName;
+	private List<String> hostsToContact;
+	private List<Integer> portNumbers;
+
+	private TransportClient client;
+	private DatabookStore store;
+
+	private int count = 0;
+
+	public SocialDataIndexerPlugin(){
+		super();
+		logger.debug("{} constructor", this.getClass().getSimpleName());
+	}
+
+	/**
+	 * {@inheritDoc}
+	 * @throws Exception
+	 */
+	@Override
+	public void launch(Map<String, Object> inputs) throws Exception{
+
+		try{
+
+			logger.info("Reading scope from ScopeProvider");
+			String scope = ScopeProvider.instance.get();
+			logger.info("Scope read is " + scope);
+
+			// connect to Cassandra (remove the leading / from the scope)
+			store = new DBCassandraAstyanaxImpl(scope.replaceFirst("/", ""));
+
+			// retrieve the ElasticSearch endpoint and set hosts/port numbers (remove the leading / from the scope)
+			ElasticSearchRunningCluster elasticCluster = new ElasticSearchRunningCluster(scope.replaceFirst("/", ""));
+
+			// save info
+			clusterName = elasticCluster.getClusterName();
+			hostsToContact = elasticCluster.getHosts();
+			portNumbers = elasticCluster.getPorts();
+
+			logger.info("Creating elasticsearch client connection for hosts = " + hostsToContact
+					+ ", ports = " + portNumbers + " and cluster's name = " + clusterName);
+
+			// Set the cluster's name to check and the sniff property to true.
+			// Cluster's name: each node must have this name.
+			// Sniff property: allows the client to recover the cluster's structure.
+			// See https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html
+			Settings settings = Settings.settingsBuilder()
+					.put("cluster.name", this.clusterName) // force unique cluster's name check
+					.put("client.transport.sniff", true)
+					.build();
+
+			// build the client
+			client = TransportClient.builder().settings(settings).build();
+
+			// add the nodes to contact
+			int reachableHosts = 0;
+			for (int i = 0; i < hostsToContact.size(); i++){
+				try {
+					client.addTransportAddress(
+							new InetSocketTransportAddress(
+									InetAddress.getByName(hostsToContact.get(i)), portNumbers.get(i))
+							);
+					reachableHosts++;
+				} catch (UnknownHostException e) {
+					logger.error("Error while adding " + hostsToContact.get(i) + ":" + portNumbers.get(i) + " as host to be contacted.");
+				}
+			}
+
+			if(reachableHosts == 0){
+				logger.error("Unable to reach elasticsearch cluster. Exiting...");
+				return;
+			}
+
+			logger.info("Connection to ElasticSearch cluster done. Synchronization starts running...");
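+
+			// Note on the synchronization strategy implemented below: every document
+			// (re)indexed in this run is stamped with the same timestamp "init";
+			// once the whole reindex has completed, any document whose _timestamp is
+			// still lower than init no longer exists in Cassandra, so
+			// deleteDocumentsWithTimestampLowerThan(init) can safely prune it.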
+			final long init = System.currentTimeMillis();
+
+			// we build an index reachable under the path /social/enhanced_feed
+			List<String> vreIds = store.getAllVREIds();
+
+			// save feeds & comments
+			for (String vreID : vreIds) {
+				try{
+					List<Feed> feeds = store.getAllFeedsByVRE(vreID);
+					addEnhancedFeedsInBulk(feeds, init);
+					logger.info("Number of indexed feeds is " + feeds.size() + " for vre " + vreID);
+				}catch(Exception e){
+					logger.error("Exception while saving feeds/comments into the index for vre " + vreID, e);
+					continue;
+				}
+			}
+
+			logger.info("Inserted " + count + " docs into the index");
+
+			// refresh data (the index must be refreshed so that the new _timestamp values are visible)
+			client.admin().indices().prepareRefresh().execute().actionGet();
+
+			// delete documents with a timestamp lower than init
+			deleteDocumentsWithTimestampLowerThan(init);
+
+			long end = System.currentTimeMillis();
+			logger.info("Synchronization thread ends running. It took " + (end - init) + " milliseconds,"
+					+ " that is " + (double)(end - init)/(1000.0 * 60.0) + " minutes.");
+
+		}catch(Exception e){
+			logger.error("Error while synchronizing data.", e);
+			throw e;
+		}finally{
+			// close connection to elasticsearch
+			if(client != null){
+				logger.info("Closing connection to elasticsearch cluster. " + client.toString());
+				client.close();
+			}
+
+			// close connection to cassandra
+			if(store != null){
+				logger.info("Closing connection to cassandra nodes. " + store.toString());
+				store.closeConnection();
+			}
+		}
+	}
+
+	/**
+	 * Add feeds into the elasticsearch index.
+	 * @param feeds the feeds to index
+	 * @param init the timestamp that will be put in each document
+	 * @throws BulkInsertionFailedException
+	 */
+	private void addEnhancedFeedsInBulk(List<Feed> feeds, final long init) throws BulkInsertionFailedException {
+		logger.debug("Starting bulk insert enhanced feeds operation");
+		BulkProcessor bulkProcessor = BulkProcessor.builder(
+				client,
+				new BulkProcessor.Listener() {
+					@Override
+					public void beforeBulk(long executionId, BulkRequest request) {
+						logger.debug("Going to execute new bulk composed of {} actions", request.numberOfActions());
+					}
+
+					@Override
+					public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+						logger.debug("Executed bulk composed of {} actions", request.numberOfActions());
+						if (response.hasFailures()) {
+							logger.warn("There were failures while executing bulk: {}", response.buildFailureMessage());
+							if (logger.isDebugEnabled()) {
+								for (BulkItemResponse item : response.getItems()) {
+									if (item.isFailed()) {
+										logger.debug("Error for {}/{}/{} for {} operation: {}", item.getIndex(),
+												item.getType(), item.getId(), item.getOpType(), item.getFailureMessage());
+									}
+								}
+							}
+						}
+					}
+
+					@Override
+					public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+						logger.error("Error executing bulk", failure);
+						if(failure instanceof NoNodeAvailableException){
+							throw new RuntimeException("No node available. Exiting...");
+						}
+					}
+				})
+				.setBulkActions(1000)
+				.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
+				.setFlushInterval(TimeValue.timeValueSeconds(5))
+				.setConcurrentRequests(0)
+				.setBackoffPolicy(
+						BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
+				.build();
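+
+		// How the thresholds above interact: the BulkProcessor flushes a bulk as
+		// soon as 1000 actions are queued, 5 MB of payload accumulate, or 5 seconds
+		// pass since the last flush, whichever comes first. setConcurrentRequests(0)
+		// makes every flush synchronous, and a rejected bulk is retried with an
+		// exponential backoff starting at 50 ms, for at most 8 retries.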
+
+		// save feeds
+		for (Feed feed : feeds) {
+
+			String enhFeedUUID = null;
+			try{
+				// enhance and convert
+				String json = enhanceAndConvertToJson(feed);
+				enhFeedUUID = feed.getKey();
+				IndexRequest ind = new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, enhFeedUUID)
+						.timestamp(String.valueOf(init)) // set timestamp
+						.source(json); // add json object
+
+				// add
+				bulkProcessor.add(ind);
+				count++;
+			}catch(Exception e){
+				logger.error("Skip inserting feed with id " + enhFeedUUID, e);
+			}
+		}
+
+		// flush the pending actions and close the bulk processor
+		try {
+			bulkProcessor.awaitClose(60000, TimeUnit.MILLISECONDS);
+		} catch (InterruptedException e) {
+			logger.debug("Interrupted while waiting for awaitClose()", e);
+		}
+	}
+
+	/**
+	 * Build an enhanced feed and convert it to json.
+	 * @param feed the feed to enhance and convert
+	 * @return the json object
+	 * @throws Exception
+	 */
+	private String enhanceAndConvertToJson(Feed feed) throws Exception {
+
+		boolean isMultiFileUpload = feed.isMultiFileUpload();
+
+		// retrieve attachments
+		ArrayList<Attachment> attachments = new ArrayList<Attachment>();
+		if (isMultiFileUpload) {
+			logger.debug("Retrieving attachments for feed with id=" + feed.getKey());
+			attachments = (ArrayList<Attachment>) store.getAttachmentsByFeedId(feed.getKey());
+		}
+
+		// retrieve comments
+		ArrayList<Comment> comments = getAllCommentsByFeed(feed.getKey());
+
+		// build enhanced feed
+		EnhancedFeed enFeed = new EnhancedFeed(feed, false, false, comments, attachments);
+
+		// convert to json
+		ObjectMapper mapper = new ObjectMapper();
+		return mapper.writeValueAsString(enFeed);
+	}
+
+	/**
+	 * Retrieve and sort the comments of a given feed.
+	 * @param feedid the feed identifier
+	 * @return the sorted list of comments
+	 */
+	private ArrayList<Comment> getAllCommentsByFeed(String feedid) {
+		// logger.debug("Asking comments for " + feedid);
+		ArrayList<Comment> toReturn = (ArrayList<Comment>) store.getAllCommentByFeed(feedid);
+		Collections.sort(toReturn);
+		return toReturn;
+	}
+
+	/**
+	 * Delete disabled feeds from the index.
+	 * @param timestamp delete feeds whose _timestamp is lower than this value
+	 */
+	public void deleteDocumentsWithTimestampLowerThan(long timestamp) {
+
+		logger.debug("Removing docs with timestamp lower than " + timestamp);
+
+		// query on the _timestamp field
+		BoolQueryBuilder filter = QueryBuilders.boolQuery();
+		filter.must(QueryBuilders.matchAllQuery());
+		filter.filter(QueryBuilders.rangeQuery("_timestamp").gte(0).lt(timestamp));
+
+		SearchResponse scrollResp = client.prepareSearch(IndexFields.INDEX_NAME)
+				.setSize(100) // get at most 100 elements at a time
+				.setScroll(new TimeValue(60000)) // keep this query alive for 1 minute
+				.setQuery(filter)
+				.execute()
+				.actionGet();
+
+		int deletedDocs = 0;
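+
+		// Scan/scroll plus per-document delete: core delete-by-query is not
+		// available to this client (it moved to a separate plugin in Elasticsearch
+		// 2.x), so matching documents are paged 100 at a time and removed one by one.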
+
+		// scroll until no more hits are returned
+		while (true) {
+
+			for (SearchHit hit : scrollResp.getHits().getHits()) {
+				String docID = hit.getId();
+				DeleteResponse response = client.prepareDelete(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, docID).get();
+				// logger.debug("deleting doc with id = " + docID + "... found? " + response.isFound());
+				if(response.isFound())
+					deletedDocs++;
+			}
+
+			// ask for more enhanced feeds
+			scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
+
+			// break condition: no more hits are returned
+			if (scrollResp.getHits().getHits().length == 0) {
+				logger.debug("No more hits to delete");
+				break;
+			}
+		}
+
+		logger.info("Number of deleted documents is " + deletedDocs);
+	}
+
+	/** {@inheritDoc} */
+	@Override
+	protected void onStop() throws Exception {
+		logger.debug("onStop()");
+		Thread.currentThread().interrupt();
+	}
+}
diff --git a/src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/SendNotification.java b/src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/SendNotification.java
new file mode 100644
index 0000000..fbefd29
--- /dev/null
+++ b/src/main/java/org/gcube/socialnetworking/socialdataindexer/utils/SendNotification.java
@@ -0,0 +1,176 @@
+package org.gcube.socialnetworking.socialdataindexer.utils;
+
+import static org.gcube.resources.discovery.icclient.ICFactory.client;
+import static org.gcube.resources.discovery.icclient.ICFactory.queryFor;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
+import org.gcube.common.resources.gcore.GCoreEndpoint;
+import org.gcube.common.scope.api.ScopeProvider;
+import org.gcube.resources.discovery.client.api.DiscoveryClient;
+import org.gcube.resources.discovery.client.queries.api.SimpleQuery;
+import org.gcube.vremanagement.executor.plugin.PluginStateEvolution;
+import org.gcube.vremanagement.executor.plugin.PluginStateNotification;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Send a notification to the interested person.
+ * @author Costantino Perciante at ISTI-CNR (costantino.perciante@isti.cnr.it)
+ */
+public class SendNotification extends PluginStateNotification {
+
+	private Map<String, String> pluginInputs;
+	private static Logger logger = LoggerFactory.getLogger(SendNotification.class);
+	private static final String NOTIFY_METHOD = "2/notifications/notify-job-status";
+	private static final String resource = "jersey-servlet";
+	private static final String serviceName = "SocialNetworking";
+	private static final String serviceClass = "Portal";
+
+	// user to contact on exception via the social-networking ws
+	private final static String RECIPIENT_KEY = "recipient";
+
+	// service name
+	private final static String SERVICE_NAME = "Smart-Executor";
+
+	public SendNotification(Map<String, String> inputs) {
+		super(inputs);
+		this.pluginInputs = inputs;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void pluginStateEvolution(PluginStateEvolution pluginStateEvolution,
+			Exception exception) throws Exception {
+
+		switch(pluginStateEvolution.getPluginState()){
+
+		// case DONE:
+		case STOPPED:
+		case FAILED:
+		case DISCARDED:
+
+			// check what happened
+			String recipient = pluginInputs.get(RECIPIENT_KEY);
+			String basePath = discoverEndPoint();
+			logger.info("Recipient of the notification is " + recipient
+					+ ". Base path found for the notification service is " + basePath);
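+
+			// The notification is POSTed as json to the social-networking service;
+			// an illustrative request (values are examples only):
+			//   POST <basePath>/2/notifications/notify-job-status?gcube-token=<token>
+			//   {"job_id":"<uuid>", "recipient":"<user>", "job_name":"<job>",
+			//    "service_name":"Smart-Executor", "status":"FAILED", "status_message":"..."}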
+
+			if(basePath != null && recipient != null){
+
+				basePath = basePath.endsWith("/") ? basePath : basePath + "/";
+				basePath += NOTIFY_METHOD + "?gcube-token=" + SecurityTokenProvider.instance.get();
+				basePath = basePath.trim();
+
+				try(CloseableHttpClient httpClient = HttpClientBuilder.create().build()){
+					JSONObject obj = new JSONObject();
+					obj.put("job_id", pluginStateEvolution.getUUID().toString());
+					obj.put("recipient", recipient);
+					obj.put("job_name", pluginStateEvolution.getPluginDefinition().getName());
+					obj.put("service_name", SERVICE_NAME);
+					// if(pluginStateEvolution.getPluginState().equals(PluginState.DONE))
+					//	obj.put("status", "SUCCEEDED");
+					// else{
+					obj.put("status", "FAILED");
+					obj.put("status_message", "original status reported by " + SERVICE_NAME + " was "
+							+ pluginStateEvolution.getPluginState() + ". Exception is "
+							+ (exception != null ? exception.getMessage() : null));
+					// }
+					logger.debug("Request json is going to be " + obj.toJSONString());
+
+					HttpResponse response = performRequest(httpClient, basePath, obj.toJSONString());
+					int status = response.getStatusLine().getStatusCode();
+					logger.info("Response status is " + status + " and response message is " + response.getStatusLine().getReasonPhrase());
+
+					if(status == HttpURLConnection.HTTP_OK){
+						logger.info("Notification sent");
+					}
+					else if(status == HttpURLConnection.HTTP_MOVED_TEMP
+							|| status == HttpURLConnection.HTTP_MOVED_PERM
+							|| status == HttpURLConnection.HTTP_SEE_OTHER){
+
+						// redirect -> fetch the new location
+						Header[] locations = response.getHeaders("Location");
+						Header lastLocation = locations[locations.length - 1];
+						String realLocation = lastLocation.getValue();
+						logger.info("New location is " + realLocation);
+
+						response = performRequest(httpClient, realLocation, obj.toJSONString());
+						logger.info("Response status is " + response.getStatusLine().getStatusCode()
+								+ " and response message is " + response.getStatusLine().getReasonPhrase());
+					}else
+						logger.warn("Response status is " + response.getStatusLine().getStatusCode()
+								+ " and response message is " + response.getStatusLine().getReasonPhrase());
+				}catch (Exception e) {
+					logger.warn("Something failed when trying to notify the user", e);
+				}
+			}
+			break;
+		default:
+			logger.info("No notification is going to be sent, because the status of the plugin execution is "
+					+ pluginStateEvolution.getPluginState().name());
+		}
+	}
+
+	/**
+	 * Perform the post/json request.
+	 * @param httpClient the http client to use
+	 * @param path the url to contact
+	 * @param params the json payload
+	 * @return the http response
+	 * @throws ClientProtocolException
+	 * @throws IOException
+	 */
+	private static HttpResponse performRequest(CloseableHttpClient httpClient, String path, String params) throws ClientProtocolException, IOException{
+		HttpPost request = new HttpPost(path);
+		StringEntity paramsEntity = new StringEntity(params, ContentType.APPLICATION_JSON);
+		request.setEntity(paramsEntity);
+		return httpClient.execute(request);
+	}
+
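+	/*
+	 * The query below asks the gCube Information System for the GCoreEndpoint
+	 * of the SocialNetworking service (ServiceClass "Portal") in the current
+	 * scope, and extracts the URI of its "jersey-servlet" endpoint: that URI is
+	 * the REST base path the notification above is sent to.
+	 */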
+ */ + private static String discoverEndPoint(){ + String context = ScopeProvider.instance.get(); + String basePath = null; + try{ + + SimpleQuery query = queryFor(GCoreEndpoint.class); + query.addCondition(String.format("$resource/Profile/ServiceClass/text() eq '%s'",serviceClass)); + query.addCondition("$resource/Profile/DeploymentData/Status/text() eq 'ready'"); + query.addCondition(String.format("$resource/Profile/ServiceName/text() eq '%s'",serviceName)); + query.setResult("$resource/Profile/AccessPoint/RunningInstanceInterfaces//Endpoint[@EntryName/string() eq \""+resource+"\"]/text()"); + + DiscoveryClient client = client(); + List endpoints = client.submit(query); + if (endpoints == null || endpoints.isEmpty()) + throw new Exception("Cannot retrieve the GCoreEndpoint serviceName: "+serviceName +", serviceClass: " +serviceClass +", in scope: "+context); + + basePath = endpoints.get(0); + if(basePath==null) + throw new Exception("Endpoint:"+resource+", is null for serviceName: "+serviceName +", serviceClass: " +serviceClass +", in scope: "+context); + + logger.info("found entryname "+basePath+" for ckanResource: "+resource); + + }catch(Exception e){ + logger.error("Unable to retrieve such service endpoint information!", e); + } + + return basePath; + } + +}