package org.gcube.dataharvest;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;

import org.gcube.accounting.accounting.summary.access.AccountingDao;
import org.gcube.accounting.accounting.summary.access.model.ScopeDescriptor;
import org.gcube.accounting.accounting.summary.access.model.internal.Dimension;
import org.gcube.accounting.accounting.summary.access.model.update.AccountingRecord;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.scope.impl.ScopeBean;
import org.gcube.common.scope.impl.ScopeBean.Type;
import org.gcube.dataharvest.harvester.CatalogueAccessesHarvester;
import org.gcube.dataharvest.harvester.CoreServicesAccessesHarvester;
import org.gcube.dataharvest.harvester.JupyterAccessesHarvester;
import org.gcube.dataharvest.harvester.MethodInvocationHarvester;
import org.gcube.dataharvest.harvester.SocialInteractionsHarvester;
import org.gcube.dataharvest.harvester.VREAccessesHarvester;
import org.gcube.dataharvest.harvester.VREUsersHarvester;
import org.gcube.dataharvest.harvester.sobigdata.DataMethodDownloadHarvester;
import org.gcube.dataharvest.harvester.sobigdata.ResourceCatalogueHarvester;
import org.gcube.dataharvest.harvester.sobigdata.TagMeMethodInvocationHarvester;
import org.gcube.dataharvest.utils.AggregationType;
import org.gcube.dataharvest.utils.ContextAuthorization;
import org.gcube.dataharvest.utils.DateUtils;
import org.gcube.dataharvest.utils.Utils;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Smart Executor plugin that harvests accounting/usage measures (VRE accesses,
 * Jupyter accesses, social interactions, VRE users, catalogue and method
 * invocation statistics) for every context of the infrastructure over a given
 * time period and stores them as {@link AccountingRecord}s.
 *
 * @author Eric Perrone (ISTI - CNR)
 * @author Luca Frosini (ISTI - CNR)
 */
public class AccountingDashboardHarvesterPlugin extends Plugin {

    private static Logger logger = LoggerFactory.getLogger(AccountingDashboardHarvesterPlugin.class);

    private static final String PROPERTY_FILENAME = "config.properties";

    public static final String START_DATE_INPUT_PARAMETER = "startDate";
    public static final String MEASURE_TYPE_INPUT_PARAMETER = "measureType";
    public static final String RERUN_INPUT_PARAMETER = "reRun";
    public static final String GET_VRE_USERS_INPUT_PARAMETER = "getVREUsers";
    public static final String DRY_RUN_INPUT_PARAMETER = "dryRun";

    /**
     * Allows partial harvesting of data of the current period. This means that
     * in MONTHLY aggregation type the current month is harvested instead of the
     * previous month which is done when the month is completed. This allows the
     * portlet to display monthly data in the current month even if the data is
     * partial (till the current day).
     */
    public static final String PARTIAL_HARVESTING = "partialHarvesting";

    public static final String SO_BIG_DATA_VO = "/d4science.research-infrastructures.eu/SoBigData";
    public static final String SO_BIG_DATA_EU_VRE = "/d4science.research-infrastructures.eu/gCubeApps/SoBigData.eu";
    public static final String SO_BIG_DATA_IT_VRE = "/d4science.research-infrastructures.eu/gCubeApps/SoBigData.it";
    public static final String SO_BIG_DATA_CATALOGUE_CONTEXT = "/d4science.research-infrastructures.eu/SoBigData/ResourceCatalogue";
    public static final String TAGME_CONTEXT = "/d4science.research-infrastructures.eu/SoBigData/TagMe";

    public static final String TO_BE_SET = "TO BE SET";

    // Harvesting period boundaries, computed in launch() from the inputs.
    protected Date start;
    protected Date end;

    public AccountingDashboardHarvesterPlugin() {
        super();
    }

    // Per-thread (and child-thread) configuration properties loaded from
    // config.properties; shared with the harvesters via the thread-local.
    private static final InheritableThreadLocal<Properties> properties = new InheritableThreadLocal<Properties>() {

        @Override
        protected Properties initialValue() {
            return new Properties();
        }

    };

    public static InheritableThreadLocal<Properties> getProperties() {
        return properties;
    }

    /**
     * Looks up a {@link Dimension} by its id among the dimensions fetched from
     * the accounting backend; falls back to a synthetic dimension using the key
     * for every field when the id is unknown.
     *
     * @param key the dimension id
     * @return the known dimension or a newly created placeholder (never null)
     */
    public static Dimension getDimension(String key) {
        Dimension dimension = dimensions.get().get(key);
        if (dimension == null) {
            dimension = new Dimension(key, key, null, key);
        }
        return dimension;
    }

    // Per-thread cache of the dimensions known to the accounting backend,
    // keyed by dimension id. Populated in launch().
    protected static final InheritableThreadLocal<Map<String, Dimension>> dimensions = new InheritableThreadLocal<Map<String, Dimension>>() {

        @Override
        protected Map<String, Dimension> initialValue() {
            return new HashMap<>();
        }

    };

    /**
     * @param context the full context name
     * @return the {@link ScopeDescriptor} known for the context, or null when
     *         the context is not present in the backend cache
     */
    public static ScopeDescriptor getScopeDescriptor(String context) {
        return scopeDescriptors.get().get(context);
    }

    // Per-thread cache of the scope descriptors known to the accounting
    // backend, keyed by context full name. Populated in launch().
    public static final InheritableThreadLocal<Map<String, ScopeDescriptor>> scopeDescriptors = new InheritableThreadLocal<Map<String, ScopeDescriptor>>() {

        @Override
        protected Map<String, ScopeDescriptor> initialValue() {
            return new HashMap<>();
        }

    };

    public static ScopeDescriptor getScopeDescriptor() {
        return scopeDescriptor.get();
    }

    // Descriptor of the context currently being harvested.
    public static final InheritableThreadLocal<ScopeDescriptor> scopeDescriptor = new InheritableThreadLocal<ScopeDescriptor>() {

        @Override
        protected ScopeDescriptor initialValue() {
            return new ScopeDescriptor("", "");
        }

    };

    /**
     * Loads the plugin configuration from {@value #PROPERTY_FILENAME} on the
     * classpath. On any failure (missing file, read error) an empty
     * {@link Properties} is returned and a warning is logged, so the plugin
     * proceeds with defaults.
     *
     * @return the loaded (possibly empty) properties, never null
     * @throws IOException declared for API compatibility; failures are
     *         currently handled internally
     */
    public Properties getConfigParameters() throws IOException {
        Properties properties = new Properties();
        // try-with-resources: the original leaked the stream and could pass
        // null to Properties.load() when the resource was missing.
        try (InputStream input = AccountingDashboardHarvesterPlugin.class.getClassLoader()
                .getResourceAsStream(PROPERTY_FILENAME)) {
            if (input == null) {
                throw new IOException(PROPERTY_FILENAME + " not found in classpath");
            }
            properties.load(input);
            return properties;
        } catch (Exception e) {
            logger.warn("Unable to load {} file containing configuration properties. "
                    + "AccountingDataHarvesterPlugin will use defaults", PROPERTY_FILENAME);
        }
        return properties;
    }

    /**
     * Reads an optional boolean parameter from the plugin inputs.
     *
     * @param inputs the plugin input map
     * @param parameterName the parameter key
     * @param defaultValue value returned when the key is absent
     * @return the parameter value or the default
     * @throws IllegalArgumentException when the value is present but not a boolean
     */
    private static boolean getBooleanParameter(Map<String, Object> inputs, String parameterName,
            boolean defaultValue) {
        if (!inputs.containsKey(parameterName)) {
            return defaultValue;
        }
        try {
            return (boolean) inputs.get(parameterName);
        } catch (Exception e) {
            throw new IllegalArgumentException("'" + parameterName + "' must be a boolean");
        }
    }

    /**
     * Walks up the scope hierarchy until the enclosing INFRASTRUCTURE scope is
     * found.
     *
     * <p>Fixes an infinite loop in the original code, which repeatedly
     * re-evaluated {@code scopeBean.enclosingScope()} instead of climbing from
     * the current parent.</p>
     *
     * @param scopeBean the scope to start from (must not itself be the root)
     * @return the enclosing infrastructure scope
     */
    private static ScopeBean findEnclosingInfrastructure(ScopeBean scopeBean) {
        ScopeBean parent = scopeBean.enclosingScope();
        while (!parent.is(Type.INFRASTRUCTURE)) {
            parent = parent.enclosingScope();
        }
        return parent;
    }

    /** {@inheritDoc} */
    @Override
    public void launch(Map<String, Object> inputs) throws Exception {
        logger.debug("{} is starting", this.getClass().getSimpleName());

        if (inputs == null || inputs.isEmpty()) {
            throw new IllegalArgumentException("The plugin can only be launched providing valid input parameters");
        }

        if (!inputs.containsKey(MEASURE_TYPE_INPUT_PARAMETER)) {
            throw new IllegalArgumentException(
                    "Please set required parameter '" + MEASURE_TYPE_INPUT_PARAMETER + "'");
        }
        AggregationType aggregationType = AggregationType.valueOf((String) inputs.get(MEASURE_TYPE_INPUT_PARAMETER));

        boolean reRun = getBooleanParameter(inputs, RERUN_INPUT_PARAMETER, true);
        boolean getVREUsers = getBooleanParameter(inputs, GET_VRE_USERS_INPUT_PARAMETER, true);
        boolean dryRun = getBooleanParameter(inputs, DRY_RUN_INPUT_PARAMETER, true);
        boolean partialHarvesting = getBooleanParameter(inputs, PARTIAL_HARVESTING, false);

        if (inputs.containsKey(START_DATE_INPUT_PARAMETER)) {
            String startDateString = (String) inputs.get(START_DATE_INPUT_PARAMETER);
            start = DateUtils.UTC_DATE_FORMAT.parse(startDateString + " " + DateUtils.UTC);
        } else {
            start = DateUtils.getPreviousPeriod(aggregationType, partialHarvesting).getTime();
        }
        end = DateUtils.getEndDateFromStartDate(aggregationType, start, 1, partialHarvesting);

        logger.debug("Harvesting from {} to {} (ReRun:{} - GetVREUsers:{} - DryRun:{})", DateUtils.format(start),
                DateUtils.format(end), reRun, getVREUsers, dryRun);

        Properties properties = getConfigParameters();
        getProperties().set(properties);

        ContextAuthorization contextAuthorization = new ContextAuthorization();
        SortedSet<String> contexts = contextAuthorization.getContexts();

        // Contexts are sorted by full name, so the first is the root scope.
        String root = contexts.first();
        Utils.setContext(contextAuthorization.getTokenForContext(root));

        AccountingDao dao = AccountingDao.get();

        Set<ScopeDescriptor> scopeDescriptorSet = dao.getContexts();
        Map<String, ScopeDescriptor> scopeDescriptorMap = new HashMap<>();
        for (ScopeDescriptor scopeDescriptor : scopeDescriptorSet) {
            scopeDescriptorMap.put(scopeDescriptor.getId(), scopeDescriptor);
        }
        scopeDescriptors.set(scopeDescriptorMap);

        Set<Dimension> dimensionSet = dao.getDimensions();
        Map<String, Dimension> dimensionMap = new HashMap<>();
        for (Dimension dimension : dimensionSet) {
            dimensionMap.put(dimension.getId(), dimension);
        }
        dimensions.set(dimensionMap);

        ArrayList<AccountingRecord> accountingRecords = new ArrayList<>();

        String initialToken = SecurityTokenProvider.instance.get();

        // Google-Analytics-backed harvesters are created once (in the
        // infrastructure scope) and reused for every context.
        VREAccessesHarvester vreAccessesHarvester = null;
        JupyterAccessesHarvester jupyterAccessesHarvester = null;

        for (String context : contexts) {
            // Setting the token for the context
            Utils.setContext(contextAuthorization.getTokenForContext(context));

            ScopeBean scopeBean = new ScopeBean(context);

            ScopeDescriptor actualScopeDescriptor = scopeDescriptorMap.get(context);
            if (actualScopeDescriptor == null) {
                actualScopeDescriptor = new ScopeDescriptor(scopeBean.name(), context);
            }
            scopeDescriptor.set(actualScopeDescriptor);

            if (scopeBean.is(Type.INFRASTRUCTURE)) {
                try {
                    CatalogueAccessesHarvester catalogueHarvester = new CatalogueAccessesHarvester(start, end);
                    List<AccountingRecord> harvested = catalogueHarvester.getAccountingRecords();
                    accountingRecords.addAll(harvested);

                    CoreServicesAccessesHarvester coreServicesHarvester = new CoreServicesAccessesHarvester(start, end);
                    List<AccountingRecord> records = coreServicesHarvester.getAccountingRecords();
                    accountingRecords.addAll(records);
                } catch (Exception e) {
                    logger.error("Error harvesting {} for {}", CatalogueAccessesHarvester.class.getSimpleName(),
                            context, e);
                }
            }

            if (vreAccessesHarvester == null) {
                if (scopeBean.is(Type.INFRASTRUCTURE)) {
                    vreAccessesHarvester = new VREAccessesHarvester(start, end);
                } else {
                    // This code should never be used because the scopes are
                    // sorted by fullname (root first); kept as a fallback.
                    ScopeBean parent = findEnclosingInfrastructure(scopeBean);
                    // Setting the token for the infrastructure context
                    Utils.setContext(contextAuthorization.getTokenForContext(parent.toString()));
                    vreAccessesHarvester = new VREAccessesHarvester(start, end);
                    // Setting back the token for the context
                    Utils.setContext(contextAuthorization.getTokenForContext(context));
                }
            }

            if (jupyterAccessesHarvester == null) {
                if (scopeBean.is(Type.INFRASTRUCTURE)) {
                    jupyterAccessesHarvester = new JupyterAccessesHarvester(start, end);
                } else {
                    // Fallback mirroring the VRE accesses case above.
                    ScopeBean parent = findEnclosingInfrastructure(scopeBean);
                    // Setting the token for the infrastructure context
                    Utils.setContext(contextAuthorization.getTokenForContext(parent.toString()));
                    jupyterAccessesHarvester = new JupyterAccessesHarvester(start, end);
                    // Setting back the token for the context
                    Utils.setContext(contextAuthorization.getTokenForContext(context));
                }
            }

            // SoBigData accounting starts in April 2018; skip earlier periods.
            if ((context.startsWith(SO_BIG_DATA_VO) || context.startsWith(SO_BIG_DATA_EU_VRE)
                    || context.startsWith(SO_BIG_DATA_IT_VRE))
                    && start.before(DateUtils.getStartCalendar(2018, Calendar.APRIL, 1).getTime())) {
                logger.info("Not Harvesting for {} from {} to {}", context, DateUtils.format(start),
                        DateUtils.format(end));
            } else {
                try {
                    // Collecting Google Analytics Data for VREs Accesses
                    logger.info("Going to harvest VRE Accesses for {}", context);
                    List<AccountingRecord> harvested = vreAccessesHarvester.getAccountingRecords();
                    accountingRecords.addAll(harvested);
                } catch (Exception e) {
                    logger.error("Error harvesting VRE Accesses for {}", context, e);
                }

                try {
                    // Collecting Google Analytics Data for Jupyter Accesses
                    logger.info("Going to harvest Jupyter Accesses for {}", context);
                    List<AccountingRecord> harvested = jupyterAccessesHarvester.getAccountingRecords();
                    accountingRecords.addAll(harvested);
                } catch (Exception e) {
                    // Fixed copy-paste: original logged "VRE Accesses" here.
                    logger.error("Error harvesting Jupyter Accesses for {}", context, e);
                }

                try {
                    // Collecting info on social (posts, replies and likes)
                    logger.info("Going to harvest Social Interactions for {}", context);
                    SocialInteractionsHarvester socialHarvester = new SocialInteractionsHarvester(start, end);
                    List<AccountingRecord> harvested = socialHarvester.getAccountingRecords();
                    accountingRecords.addAll(harvested);
                } catch (Exception e) {
                    logger.error("Error harvesting Social Interactions for {}", context, e);
                }

                try {
                    // Collecting info on VRE users
                    if (getVREUsers) {
                        // Harvesting Users only for VREs (not for VO and ROOT
                        // which is the sum of the children contexts).
                        // The VREUsers can only be harvested for the last month
                        if (scopeBean.is(Type.VRE) && start
                                .equals(DateUtils.getPreviousPeriod(aggregationType, partialHarvesting).getTime())) {
                            logger.info("Going to harvest Context Users for {}", context);
                            VREUsersHarvester vreUsersHarvester = new VREUsersHarvester(start, end);
                            List<AccountingRecord> harvested = vreUsersHarvester.getAccountingRecords();
                            accountingRecords.addAll(harvested);
                        }
                    }
                } catch (Exception e) {
                    logger.error("Error harvesting Context Users for {}", context, e);
                }

                if (context.startsWith(SO_BIG_DATA_CATALOGUE_CONTEXT)) {
                    try {
                        // Collecting info on Resource Catalogue (Dataset,
                        // Application, Deliverables, Methods)
                        logger.info("Going to harvest Resource Catalogue Information for {}", context);
                        ResourceCatalogueHarvester resourceCatalogueHarvester = new ResourceCatalogueHarvester(start,
                                end, contexts);
                        List<AccountingRecord> harvested = resourceCatalogueHarvester.getAccountingRecords();
                        accountingRecords.addAll(harvested);
                    } catch (Exception e) {
                        logger.error("Error harvesting Resource Catalogue Information for {}", context, e);
                    }

                    try {
                        // Collecting info on Data/Method download
                        logger.info("Going to harvest Data Method Download for {}", context);
                        DataMethodDownloadHarvester dataMethodDownloadHarvester = new DataMethodDownloadHarvester(start,
                                end, contexts);
                        List<AccountingRecord> harvested = dataMethodDownloadHarvester.getAccountingRecords();
                        accountingRecords.addAll(harvested);
                    } catch (Exception e) {
                        logger.error("Error harvesting Data Method Download for {}", context, e);
                    }
                }

                if (context.startsWith(TAGME_CONTEXT)) {
                    try {
                        // Collecting info on TagMe method invocations
                        logger.info("Going to harvest Method Invocations for {}", context);
                        TagMeMethodInvocationHarvester tagMeMethodInvocationHarvester = new TagMeMethodInvocationHarvester(
                                start, end);
                        List<AccountingRecord> harvested = tagMeMethodInvocationHarvester.getAccountingRecords();
                        accountingRecords.addAll(harvested);
                    } catch (Exception e) {
                        logger.error("Error harvesting Method Invocations for {}", context, e);
                    }
                } else {
                    try {
                        // Collecting info on method invocations
                        logger.info("Going to harvest Method Invocations for {}", context);
                        MethodInvocationHarvester methodInvocationHarvester = new MethodInvocationHarvester(start, end);
                        List<AccountingRecord> harvested = methodInvocationHarvester.getAccountingRecords();
                        accountingRecords.addAll(harvested);
                    } catch (Exception e) {
                        logger.error("Error harvesting Method Invocations for {}", context, e);
                    }
                }
            }
        }

        // Restore the token the plugin was launched with.
        Utils.setContext(initialToken);

        logger.debug("Harvest Measures from {} to {} are {}", DateUtils.format(start), DateUtils.format(end),
                accountingRecords);

        if (!dryRun) {
            // Zero-length array: the original used new AccountingRecord[1],
            // which inserts a spurious null element when the list is empty.
            dao.insertRecords(accountingRecords.toArray(new AccountingRecord[0]));
        } else {
            logger.debug("Harvested measures are {}", accountingRecords);
        }

    }

    /** {@inheritDoc} */
    @Override
    protected void onStop() throws Exception {
        logger.debug("{} is stopping", this.getClass().getSimpleName());
    }

}