diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml index b034528ba4..aa2078be5d 100644 --- a/dhp-workflows/dhp-actionmanager/pom.xml +++ b/dhp-workflows/dhp-actionmanager/pom.xml @@ -53,5 +53,10 @@ ${project.version} + + eu.dnetlib + dnet-actionmanager-api + + diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java new file mode 100644 index 0000000000..1cb1eb4bcf --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java @@ -0,0 +1,122 @@ +package eu.dnetlib.dhp.actionmanager; + +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import eu.dnetlib.actionmanager.rmi.ActionManagerException; +import eu.dnetlib.actionmanager.set.ActionManagerSet; +import eu.dnetlib.actionmanager.set.ActionManagerSet.ImpactTypes; +import eu.dnetlib.dhp.actionmanager.partition.PartitionActionSetsByPayloadTypeJob; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.dom4j.Document; +import org.dom4j.Element; +import org.dom4j.io.SAXReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; + +public class ISClient implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(PartitionActionSetsByPayloadTypeJob.class); + + private static final String INPUT_ACTION_SET_ID_SEPARATOR = ","; + + public static List getLatestRawsetPaths(String isLookupUrl, String setIds) { + + ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); + ISClient isClient = new ISClient(); + List ids = Lists.newArrayList(Splitter.on(INPUT_ACTION_SET_ID_SEPARATOR) + .omitEmptyStrings() + .trimResults() + .split(setIds)); + + return ids.stream() + .map(id -> isClient.getSet(isLookup, id)) + .map(as -> as.getPathToLatest()) + .collect(Collectors.toCollection(ArrayList::new)); + } + + private ActionManagerSet getSet(ISLookUpService isLookup, final String setId) { + + final String q = "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') " + + "where $x//SET/@id = '" + setId + "' return $x"; + + try { + final String basePath = getBasePathHDFS(isLookup); + final String setProfile = isLookup.getResourceProfileByQuery(q); + return getActionManagerSet(basePath, setProfile); + } catch (ISLookUpException | ActionManagerException e) { + throw new RuntimeException("Error accessing Sets, using query: " + q); + } + } + + private ActionManagerSet getActionManagerSet(final String basePath, final String profile) throws ActionManagerException { + final SAXReader reader = new SAXReader(); + final ActionManagerSet set = new ActionManagerSet(); + + try { + final Document doc = reader.read(new StringReader(profile)); + + set.setId(doc.valueOf("//SET/@id").trim()); + set.setName(doc.valueOf("//SET").trim()); + set.setImpact(ImpactTypes.valueOf(doc.valueOf("//IMPACT").trim())); + set.setLatest(doc.valueOf("//RAW_SETS/LATEST/@id"), doc.valueOf("//RAW_SETS/LATEST/@creationDate"), doc.valueOf("//RAW_SETS/LATEST/@lastUpdate")); + set.setDirectory(doc.valueOf("//SET/@directory")); + final List expiredNodes = doc.selectNodes("//RAW_SETS/EXPIRED"); + if (expiredNodes != null) { + for (int i = 0; i < expiredNodes.size(); i++) { + Element ex = (Element) expiredNodes.get(i); + set.addExpired(ex.attributeValue("id"), ex.attributeValue("creationDate"), ex.attributeValue("lastUpdate")); + } + } + + final StringBuilder sb = new StringBuilder(); + sb.append(basePath); + sb.append("/"); + sb.append(doc.valueOf("//SET/@directory")); + sb.append("/"); + sb.append(doc.valueOf("//RAW_SETS/LATEST/@id")); + set.setPathToLatest(sb.toString()); + + return set; + } catch (Exception e) { + throw new ActionManagerException("Error creating set from profile: " + profile, e); + } + } + + private String getBasePathHDFS(ISLookUpService isLookup) throws ActionManagerException { + return queryServiceProperty(isLookup, "basePath"); + } + + private String queryServiceProperty(ISLookUpService isLookup, final String propertyName) throws ActionManagerException { + final String q = "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ActionManagerServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='" + + propertyName + "']/@value/string()"; + log.debug("quering for service property: " + q); + try { + final List value = isLookup.quickSearchProfile(q); + return Iterables.getOnlyElement(value); + } catch (ISLookUpException e) { + String msg = "Error accessing service profile, using query: " + q; + log.error(msg, e); + throw new ActionManagerException(msg, e); + } catch (NoSuchElementException e) { + String msg = "missing service property: " + propertyName; + log.error(msg, e); + throw new ActionManagerException(msg, e); + } catch (IllegalArgumentException e) { + String msg = "found more than one service property: " + propertyName; + log.error(msg, e); + throw new ActionManagerException(msg, e); + } + } + + +} diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java index 003f6dc6c5..8ba3316267 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.actionmanager.partition; +import eu.dnetlib.dhp.actionmanager.ISClient; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -40,8 +41,6 @@ public class PartitionActionSetsByPayloadTypeJob { StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty()) )); - private static final String INPUT_ACTION_SET_PATHS_SEPARATOR = ","; - public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils.toString( PromoteActionPayloadForGraphTableJob.class @@ -55,21 +54,25 @@ public class PartitionActionSetsByPayloadTypeJob { .orElse(Boolean.TRUE); logger.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String inputActionSetPaths = parser.get("inputActionSetPaths"); - logger.info("inputActionSetPaths: {}", inputActionSetPaths); + String inputActionSetIds = parser.get("inputActionSetIds"); + logger.info("inputActionSetIds: {}", inputActionSetIds); String outputPath = parser.get("outputPath"); logger.info("outputPath: {}", outputPath); + String isLookupUrl = parser.get("isLookupUrl"); + logger.info("isLookupUrl: {}", isLookupUrl); + + List inputActionSetPaths = ISClient.getLatestRawsetPaths(isLookupUrl, inputActionSetIds); + logger.info("inputActionSetPaths: {}", String.join(",", inputActionSetPaths)); + SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); runWithSparkSession(conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - readAndWriteActionSetsFromPaths(spark, - Arrays.asList(inputActionSetPaths.split(INPUT_ACTION_SET_PATHS_SEPARATOR)), - outputPath); + readAndWriteActionSetsFromPaths(spark, inputActionSetPaths, outputPath); }); } diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java index abad4f2107..68bb35c2bb 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java @@ -121,8 +121,8 @@ public class PromoteActionPayloadForGraphTableJob { logger.info("Reading graph table from path: {}", path); return spark .read() - .textFile(path) - .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, rowClazz), Encoders.bean(rowClazz)); + .parquet(path) + .as(Encoders.bean(rowClazz)); } private static Dataset readActionPayload(SparkSession spark, diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/partition/partition_action_sets_by_payload_type_input_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/partition/partition_action_sets_by_payload_type_input_parameters.json index c2594ba496..ad58fe7546 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/partition/partition_action_sets_by_payload_type_input_parameters.json +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/partition/partition_action_sets_by_payload_type_input_parameters.json @@ -6,9 +6,9 @@ "paramRequired": false }, { - "paramName": "iasp", - "paramLongName": "inputActionSetPaths", - "paramDescription": "comma separated list of action sets to partition by payload type", + "paramName": "iasi", + "paramLongName": "inputActionSetIds", + "paramDescription": "comma separated list of action set ids to partition by payload type", "paramRequired": true }, { @@ -16,5 +16,11 @@ "paramLongName": "outputPath", "paramDescription": "root output location for partitioned action sets", "paramRequired": true + }, + { + "paramName": "is", + "paramLongName": "isLookupUrl", + "paramDescription": "URL of the isLookUp Service", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml index 4f54c0699e..25afc34c99 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml @@ -41,8 +41,12 @@ root location of input materialized graph - inputActionSetPaths - comma separated list of action sets to promote + isLookupUrl + URL of the ISLookupService + + + inputActionSetIds + comma separated list of action set ids to promote outputGraphRootPath @@ -121,8 +125,9 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --inputActionSetPaths${inputActionSetPaths} + --inputActionSetIds${inputActionSetIds} --outputPath${workingDir}/action_payload_by_type + --isLookupUrl${isLookupUrl} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 11c8f71a15..d33bb0211c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -2,7 +2,7 @@ - graphBasePath + graphOutputPath the target path to store raw graph @@ -343,7 +343,7 @@ --rawGraphPath${workingDir}/graph_raw --claimsGraphPath${workingDir}/graph_claims - --outputRawGaphPath${graphBasePath}/graph_raw + --outputRawGaphPath${graphOutputPath} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication @@ -369,7 +369,7 @@ --rawGraphPath${workingDir}/graph_raw --claimsGraphPath${workingDir}/graph_claims - --outputRawGaphPath${graphBasePath}/graph_raw + --outputRawGaphPath${graphOutputPath} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset @@ -395,7 +395,7 @@ --rawGraphPath${workingDir}/graph_raw --claimsGraphPath${workingDir}/graph_claims - --outputRawGaphPath${graphBasePath}/graph_raw + --outputRawGaphPath${graphOutputPath} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation @@ -421,7 +421,7 @@ --rawGraphPath${workingDir}/graph_raw --claimsGraphPath${workingDir}/graph_claims - --outputRawGaphPath${graphBasePath}/graph_raw + --outputRawGaphPath${graphOutputPath} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software @@ -447,7 +447,7 @@ --rawGraphPath${workingDir}/graph_raw --claimsGraphPath${workingDir}/graph_claims - --outputRawGaphPath${graphBasePath}/graph_raw + --outputRawGaphPath${graphOutputPath} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct @@ -473,7 +473,7 @@ --rawGraphPath${workingDir}/graph_raw --claimsGraphPath${workingDir}/graph_claims - --outputRawGaphPath${graphBasePath}/graph_raw + --outputRawGaphPath${graphOutputPath} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource @@ -499,7 +499,7 @@ --rawGraphPath${workingDir}/graph_raw --claimsGraphPath${workingDir}/graph_claims - --outputRawGaphPath${graphBasePath}/graph_raw + --outputRawGaphPath${graphOutputPath} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization @@ -525,7 +525,7 @@ --rawGraphPath${workingDir}/graph_raw --claimsGraphPath${workingDir}/graph_claims - --outputRawGaphPath${graphBasePath}/graph_raw + --outputRawGaphPath${graphOutputPath} --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project diff --git a/pom.xml b/pom.xml index 52c6015176..1d36d42f91 100644 --- a/pom.xml +++ b/pom.xml @@ -293,6 +293,12 @@ dnet-actionmanager-common 6.0.5 + + eu.dnetlib + dnet-actionmanager-api + [4.0.1,5.0.0) + + eu.dnetlib dnet-openaire-data-protos