forked from D-Net/dnet-hadoop
reorganizing parameter names in the provision workflow
This commit is contained in:
parent
6b5f9ca9cb
commit
82e8341f50
|
@ -53,5 +53,10 @@
|
|||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-actionmanager-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
package eu.dnetlib.dhp.actionmanager;
|
||||
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import eu.dnetlib.actionmanager.rmi.ActionManagerException;
|
||||
import eu.dnetlib.actionmanager.set.ActionManagerSet;
|
||||
import eu.dnetlib.actionmanager.set.ActionManagerSet.ImpactTypes;
|
||||
import eu.dnetlib.dhp.actionmanager.partition.PartitionActionSetsByPayloadTypeJob;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.Element;
|
||||
import org.dom4j.io.SAXReader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.io.StringReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ISClient implements Serializable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PartitionActionSetsByPayloadTypeJob.class);
|
||||
|
||||
private static final String INPUT_ACTION_SET_ID_SEPARATOR = ",";
|
||||
|
||||
public static List<String> getLatestRawsetPaths(String isLookupUrl, String setIds) {
|
||||
|
||||
ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
ISClient isClient = new ISClient();
|
||||
List<String> ids = Lists.newArrayList(Splitter.on(INPUT_ACTION_SET_ID_SEPARATOR)
|
||||
.omitEmptyStrings()
|
||||
.trimResults()
|
||||
.split(setIds));
|
||||
|
||||
return ids.stream()
|
||||
.map(id -> isClient.getSet(isLookup, id))
|
||||
.map(as -> as.getPathToLatest())
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
||||
private ActionManagerSet getSet(ISLookUpService isLookup, final String setId) {
|
||||
|
||||
final String q = "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') "
|
||||
+ "where $x//SET/@id = '" + setId + "' return $x";
|
||||
|
||||
try {
|
||||
final String basePath = getBasePathHDFS(isLookup);
|
||||
final String setProfile = isLookup.getResourceProfileByQuery(q);
|
||||
return getActionManagerSet(basePath, setProfile);
|
||||
} catch (ISLookUpException | ActionManagerException e) {
|
||||
throw new RuntimeException("Error accessing Sets, using query: " + q);
|
||||
}
|
||||
}
|
||||
|
||||
private ActionManagerSet getActionManagerSet(final String basePath, final String profile) throws ActionManagerException {
|
||||
final SAXReader reader = new SAXReader();
|
||||
final ActionManagerSet set = new ActionManagerSet();
|
||||
|
||||
try {
|
||||
final Document doc = reader.read(new StringReader(profile));
|
||||
|
||||
set.setId(doc.valueOf("//SET/@id").trim());
|
||||
set.setName(doc.valueOf("//SET").trim());
|
||||
set.setImpact(ImpactTypes.valueOf(doc.valueOf("//IMPACT").trim()));
|
||||
set.setLatest(doc.valueOf("//RAW_SETS/LATEST/@id"), doc.valueOf("//RAW_SETS/LATEST/@creationDate"), doc.valueOf("//RAW_SETS/LATEST/@lastUpdate"));
|
||||
set.setDirectory(doc.valueOf("//SET/@directory"));
|
||||
final List expiredNodes = doc.selectNodes("//RAW_SETS/EXPIRED");
|
||||
if (expiredNodes != null) {
|
||||
for (int i = 0; i < expiredNodes.size(); i++) {
|
||||
Element ex = (Element) expiredNodes.get(i);
|
||||
set.addExpired(ex.attributeValue("id"), ex.attributeValue("creationDate"), ex.attributeValue("lastUpdate"));
|
||||
}
|
||||
}
|
||||
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append(basePath);
|
||||
sb.append("/");
|
||||
sb.append(doc.valueOf("//SET/@directory"));
|
||||
sb.append("/");
|
||||
sb.append(doc.valueOf("//RAW_SETS/LATEST/@id"));
|
||||
set.setPathToLatest(sb.toString());
|
||||
|
||||
return set;
|
||||
} catch (Exception e) {
|
||||
throw new ActionManagerException("Error creating set from profile: " + profile, e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getBasePathHDFS(ISLookUpService isLookup) throws ActionManagerException {
|
||||
return queryServiceProperty(isLookup, "basePath");
|
||||
}
|
||||
|
||||
private String queryServiceProperty(ISLookUpService isLookup, final String propertyName) throws ActionManagerException {
|
||||
final String q = "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ActionManagerServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
|
||||
+ propertyName + "']/@value/string()";
|
||||
log.debug("quering for service property: " + q);
|
||||
try {
|
||||
final List<String> value = isLookup.quickSearchProfile(q);
|
||||
return Iterables.getOnlyElement(value);
|
||||
} catch (ISLookUpException e) {
|
||||
String msg = "Error accessing service profile, using query: " + q;
|
||||
log.error(msg, e);
|
||||
throw new ActionManagerException(msg, e);
|
||||
} catch (NoSuchElementException e) {
|
||||
String msg = "missing service property: " + propertyName;
|
||||
log.error(msg, e);
|
||||
throw new ActionManagerException(msg, e);
|
||||
} catch (IllegalArgumentException e) {
|
||||
String msg = "found more than one service property: " + propertyName;
|
||||
log.error(msg, e);
|
||||
throw new ActionManagerException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,5 +1,6 @@
|
|||
package eu.dnetlib.dhp.actionmanager.partition;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.ISClient;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
|
@ -40,8 +41,6 @@ public class PartitionActionSetsByPayloadTypeJob {
|
|||
StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty())
|
||||
));
|
||||
|
||||
private static final String INPUT_ACTION_SET_PATHS_SEPARATOR = ",";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String jsonConfiguration = IOUtils.toString(
|
||||
PromoteActionPayloadForGraphTableJob.class
|
||||
|
@ -55,21 +54,25 @@ public class PartitionActionSetsByPayloadTypeJob {
|
|||
.orElse(Boolean.TRUE);
|
||||
logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String inputActionSetPaths = parser.get("inputActionSetPaths");
|
||||
logger.info("inputActionSetPaths: {}", inputActionSetPaths);
|
||||
String inputActionSetIds = parser.get("inputActionSetIds");
|
||||
logger.info("inputActionSetIds: {}", inputActionSetIds);
|
||||
|
||||
String outputPath = parser.get("outputPath");
|
||||
logger.info("outputPath: {}", outputPath);
|
||||
|
||||
String isLookupUrl = parser.get("isLookupUrl");
|
||||
logger.info("isLookupUrl: {}", isLookupUrl);
|
||||
|
||||
List<String> inputActionSetPaths = ISClient.getLatestRawsetPaths(isLookupUrl, inputActionSetIds);
|
||||
logger.info("inputActionSetPaths: {}", String.join(",", inputActionSetPaths));
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
readAndWriteActionSetsFromPaths(spark,
|
||||
Arrays.asList(inputActionSetPaths.split(INPUT_ACTION_SET_PATHS_SEPARATOR)),
|
||||
outputPath);
|
||||
readAndWriteActionSetsFromPaths(spark, inputActionSetPaths, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -121,8 +121,8 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
logger.info("Reading graph table from path: {}", path);
|
||||
return spark
|
||||
.read()
|
||||
.textFile(path)
|
||||
.map((MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz), Encoders.bean(rowClazz));
|
||||
.parquet(path)
|
||||
.as(Encoders.bean(rowClazz));
|
||||
}
|
||||
|
||||
private static <A extends Oaf> Dataset<A> readActionPayload(SparkSession spark,
|
||||
|
|
|
@ -6,9 +6,9 @@
|
|||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "iasp",
|
||||
"paramLongName": "inputActionSetPaths",
|
||||
"paramDescription": "comma separated list of action sets to partition by payload type",
|
||||
"paramName": "iasi",
|
||||
"paramLongName": "inputActionSetIds",
|
||||
"paramDescription": "comma separated list of action set ids to partition by payload type",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
|
@ -16,5 +16,11 @@
|
|||
"paramLongName": "outputPath",
|
||||
"paramDescription": "root output location for partitioned action sets",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "is",
|
||||
"paramLongName": "isLookupUrl",
|
||||
"paramDescription": "URL of the isLookUp Service",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -41,8 +41,12 @@
|
|||
<description>root location of input materialized graph</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>inputActionSetPaths</name>
|
||||
<description>comma separated list of action sets to promote</description>
|
||||
<name>isLookupUrl</name>
|
||||
<description>URL of the ISLookupService</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>inputActionSetIds</name>
|
||||
<description>comma separated list of action set ids to promote</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>outputGraphRootPath</name>
|
||||
|
@ -121,8 +125,9 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--inputActionSetPaths</arg><arg>${inputActionSetPaths}</arg>
|
||||
<arg>--inputActionSetIds</arg><arg>${inputActionSetIds}</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/action_payload_by_type</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
</spark>
|
||||
<ok to="ForkPromote"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
<parameters>
|
||||
<property>
|
||||
<name>graphBasePath</name>
|
||||
<name>graphOutputPath</name>
|
||||
<description>the target path to store raw graph</description>
|
||||
</property>
|
||||
<property>
|
||||
|
@ -343,7 +343,7 @@
|
|||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphBasePath}/graph_raw</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
|
@ -369,7 +369,7 @@
|
|||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphBasePath}/graph_raw</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
|
@ -395,7 +395,7 @@
|
|||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphBasePath}/graph_raw</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
|
@ -421,7 +421,7 @@
|
|||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphBasePath}/graph_raw</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
|
@ -447,7 +447,7 @@
|
|||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphBasePath}/graph_raw</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
|
@ -473,7 +473,7 @@
|
|||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphBasePath}/graph_raw</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
|
@ -499,7 +499,7 @@
|
|||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphBasePath}/graph_raw</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
|
@ -525,7 +525,7 @@
|
|||
</spark-opts>
|
||||
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphBasePath}/graph_raw</arg>
|
||||
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
|
||||
</spark>
|
||||
<ok to="wait_merge"/>
|
||||
|
|
6
pom.xml
6
pom.xml
|
@ -293,6 +293,12 @@
|
|||
<artifactId>dnet-actionmanager-common</artifactId>
|
||||
<version>6.0.5</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-actionmanager-api</artifactId>
|
||||
<version>[4.0.1,5.0.0)</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-openaire-data-protos</artifactId>
|
||||
|
|
Loading…
Reference in New Issue