[graph cleaning] WIP: refactoring of the cleaning stages

This commit is contained in:
Claudio Atzori 2023-03-16 17:23:36 +01:00
parent 518618f1a9
commit 6d3d18d8b5
5 changed files with 268 additions and 279 deletions

View File

@ -38,6 +38,124 @@ public class GraphCleaningFunctions extends CleaningFunctions {
public static final int TITLE_FILTER_RESIDUAL_LENGTH = 5;
public static <T extends Oaf> T cleanContext(T value, String contextId, String verifyParam) {
if (ModelSupport.isSubClass(value, Result.class)) {
final Result res = (Result) value;
if (res
.getTitle()
.stream()
.filter(
t -> t
.getQualifier()
.getClassid()
.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid()))
.noneMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))) {
return (T) res;
}
res
.setContext(
res
.getContext()
.stream()
.filter(
c -> !c.getId().split("::")[0]
.equalsIgnoreCase(contextId))
.collect(Collectors.toList()));
return (T) res;
} else {
return value;
}
}
public static <T extends Oaf> T cleanCountry(T value, String[] verifyParam, Set<String> hostedBy,
String collectedfrom, String country) {
if (ModelSupport.isSubClass(value, Result.class)) {
final Result res = (Result) value;
if (res.getInstance().stream().anyMatch(i -> hostedBy.contains(i.getHostedby().getKey())) ||
!res.getCollectedfrom().stream().anyMatch(cf -> cf.getValue().equals(collectedfrom))) {
return (T) res;
}
List<StructuredProperty> ids = getPidsAndAltIds(res).collect(Collectors.toList());
if (ids
.stream()
.anyMatch(
p -> p
.getQualifier()
.getClassid()
.equals(PidType.doi.toString()) && pidInParam(p.getValue(), verifyParam))) {
res
.setCountry(
res
.getCountry()
.stream()
.filter(
c -> toTakeCountry(c, country))
.collect(Collectors.toList()));
}
return (T) res;
} else {
return value;
}
}
private static <T extends Result> Stream<StructuredProperty> getPidsAndAltIds(T r) {
final Stream<StructuredProperty> resultPids = Optional
.ofNullable(r.getPid())
.map(Collection::stream)
.orElse(Stream.empty());
final Stream<StructuredProperty> instancePids = Optional
.ofNullable(r.getInstance())
.map(
instance -> instance
.stream()
.flatMap(
i -> Optional
.ofNullable(i.getPid())
.map(Collection::stream)
.orElse(Stream.empty())))
.orElse(Stream.empty());
final Stream<StructuredProperty> instanceAltIds = Optional
.ofNullable(r.getInstance())
.map(
instance -> instance
.stream()
.flatMap(
i -> Optional
.ofNullable(i.getAlternateIdentifier())
.map(Collection::stream)
.orElse(Stream.empty())))
.orElse(Stream.empty());
return Stream
.concat(
Stream.concat(resultPids, instancePids),
instanceAltIds);
}
private static boolean pidInParam(String value, String[] verifyParam) {
for (String s : verifyParam)
if (value.startsWith(s))
return true;
return false;
}
private static boolean toTakeCountry(Country c, String country) {
// If dataInfo is not set, or dataInfo.inferenceprovenance is not set or not present then it cannot be
// inserted via propagation
if (!Optional.ofNullable(c.getDataInfo()).isPresent())
return true;
if (!Optional.ofNullable(c.getDataInfo().getInferenceprovenance()).isPresent())
return true;
return !(c
.getClassid()
.equalsIgnoreCase(country) &&
c.getDataInfo().getInferenceprovenance().equals("propagation"));
}
public static <T extends Oaf> T fixVocabularyNames(T value) {
if (value instanceof Datasource) {
// nothing to clean here

View File

@ -3,7 +3,10 @@ package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@ -17,12 +20,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -61,6 +68,24 @@ public class CleanGraphSparkJob {
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
String contextId = parser.get("contextId");
log.info("contextId: {}", contextId);
String verifyParam = parser.get("verifyParam");
log.info("verifyParam: {}", verifyParam);
String datasourcePath = parser.get("hostedBy");
log.info("datasourcePath: {}", datasourcePath);
String country = parser.get("country");
log.info("country: {}", country);
String[] verifyCountryParam = parser.get("verifyCountryParam").split(";");
log.info("verifyCountryParam: {}", verifyCountryParam);
String collectedfrom = parser.get("collectedfrom");
log.info("collectedfrom: {}", collectedfrom);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
@ -72,7 +97,9 @@ public class CleanGraphSparkJob {
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
cleanGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
cleanGraphTable(
spark, vocs, inputPath, entityClazz, outputPath, contextId, verifyParam, datasourcePath, country,
verifyCountryParam, collectedfrom);
});
}
@ -81,7 +108,15 @@ public class CleanGraphSparkJob {
VocabularyGroup vocs,
String inputPath,
Class<T> clazz,
String outputPath) {
String outputPath, String contextId, String verifyParam, String datasourcePath, String country,
String[] verifyCountryParam, String collectedfrom) {
Set<String> hostedBy = Sets
.newHashSet(
spark
.read()
.textFile(datasourcePath)
.collectAsList());
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
@ -90,6 +125,13 @@ public class CleanGraphSparkJob {
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> GraphCleaningFunctions.cleanup(value, vocs), Encoders.bean(clazz))
.filter((FilterFunction<T>) GraphCleaningFunctions::filter)
.map(
(MapFunction<T, T>) value -> GraphCleaningFunctions.cleanContext(value, contextId, verifyParam),
Encoders.bean(clazz))
.map(
(MapFunction<T, T>) value -> GraphCleaningFunctions
.cleanCountry(value, verifyCountryParam, hostedBy, collectedfrom, country),
Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")

View File

@ -83,12 +83,37 @@
</property>
</parameters>
<start to="fork_clean_graph"/>
<start to="select_datasourceId_from_country"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="select_datasourceId_from_country">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Select datasource ID from country</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.GetDatasourceFromCountry</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--country</arg><arg>${country}</arg>
</spark>
<ok to="fork_clean_graph"/>
<error to="Kill"/>
</action>
<fork name="fork_clean_graph">
<path start="clean_publication"/>
<path start="clean_dataset"/>
@ -121,6 +146,12 @@
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -147,6 +178,12 @@
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -173,6 +210,12 @@
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -199,6 +242,12 @@
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -225,6 +274,12 @@
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -251,6 +306,12 @@
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -277,6 +338,12 @@
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
@ -303,286 +370,18 @@
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<join name="wait_clean" to="clean_context"/>
<decision name="clean_context">
<switch>
<case to="fork_clean_context">${wf:conf('shouldClean') eq true}</case>
<default to="End"/>
</switch>
</decision>
<fork name="fork_clean_context">
<path start="clean_publication_context"/>
<path start="clean_dataset_context"/>
<path start="clean_otherresearchproduct_context"/>
<path start="clean_software_context"/>
</fork>
<action name="clean_publication_context">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean publications context</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/publication</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<action name="clean_dataset_context">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean datasets Context</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/dataset</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<action name="clean_otherresearchproduct_context">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean otherresearchproducts context</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/otherresearchproduct</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<action name="clean_software_context">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean softwares context</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/software</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<join name="wait_clean_context" to="select_datasourceId_from_country"/>
<action name="select_datasourceId_from_country">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Select datasource ID from country</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.GetDatasourceFromCountry</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--country</arg><arg>${country}</arg>
</spark>
<ok to="fork_clean_country"/>
<error to="Kill"/>
</action>
<fork name="fork_clean_country">
<path start="clean_publication_country"/>
<path start="clean_dataset_country"/>
<path start="clean_otherresearchproduct_country"/>
<path start="clean_software_country"/>
</fork>
<action name="clean_publication_country">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean publication country</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/publication</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean_country"/>
<error to="Kill"/>
</action>
<action name="clean_dataset_country">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean dataset country</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/dataset</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean_country"/>
<error to="Kill"/>
</action>
<action name="clean_otherresearchproduct_country">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean otherresearchproduct country</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/otherresearchproduct</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean_country"/>
<error to="Kill"/>
</action>
<action name="clean_software_country">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean software country</name>
<class>eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--workingDir</arg><arg>${workingDir}/working/software</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
</spark>
<ok to="wait_clean_country"/>
<error to="Kill"/>
</action>
<join name="wait_clean_country" to="should_patch_datasource_ids"/>
<join name="wait_clean" to="should_patch_datasource_ids"/>
<decision name="should_patch_datasource_ids">
<switch>

View File

@ -28,5 +28,35 @@
"paramLongName": "graphTableClassName",
"paramDescription": "class name moelling the graph table",
"paramRequired": true
},
{
"paramName": "ci",
"paramLongName": "contextId",
"paramDescription": "the id of the context to be removed",
"paramRequired": true
},
{
"paramName": "c",
"paramLongName": "country",
"paramDescription": "the id of the context to be removed",
"paramRequired": true
},
{
"paramName": "vfc",
"paramLongName": "verifyCountryParam",
"paramDescription": "the parameter to be verified to remove the country",
"paramRequired": true
},
{
"paramName": "cf",
"paramLongName": "collectedfrom",
"paramDescription": "the collectedfrom value for which we should apply the cleaning",
"paramRequired": true
},
{
"paramName": "hb",
"paramLongName": "hostedBy",
"paramDescription": "the set of datasources having the specified country in the graph searched for in the hostedby of the results",
"paramRequired": true
}
]