Merge branch 'beta' into update_pivots_table

This commit is contained in:
Claudio Atzori 2024-01-22 16:37:22 +01:00
commit f76852f385
30 changed files with 696 additions and 160 deletions

View File

@ -0,0 +1,39 @@
package eu.dnetlib.dhp.common.api.context;
public class CategorySummary {
private String id;
private String label;
private boolean hasConcept;
public String getId() {
return id;
}
public String getLabel() {
return label;
}
public boolean isHasConcept() {
return hasConcept;
}
public CategorySummary setId(final String id) {
this.id = id;
return this;
}
public CategorySummary setLabel(final String label) {
this.label = label;
return this;
}
public CategorySummary setHasConcept(final boolean hasConcept) {
this.hasConcept = hasConcept;
return this;
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.ArrayList;
public class CategorySummaryList extends ArrayList<CategorySummary> {
}

View File

@ -0,0 +1,52 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.List;
public class ConceptSummary {
private String id;
private String label;
public boolean hasSubConcept;
private List<ConceptSummary> concepts;
public String getId() {
return id;
}
public String getLabel() {
return label;
}
public List<ConceptSummary> getConcepts() {
return concepts;
}
public ConceptSummary setId(final String id) {
this.id = id;
return this;
}
public ConceptSummary setLabel(final String label) {
this.label = label;
return this;
}
public boolean isHasSubConcept() {
return hasSubConcept;
}
public ConceptSummary setHasSubConcept(final boolean hasSubConcept) {
this.hasSubConcept = hasSubConcept;
return this;
}
public ConceptSummary setConcept(final List<ConceptSummary> concepts) {
this.concepts = concepts;
return this;
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.ArrayList;
public class ConceptSummaryList extends ArrayList<ConceptSummary> {
}

View File

@ -0,0 +1,50 @@
package eu.dnetlib.dhp.common.api.context;
public class ContextSummary {
private String id;
private String label;
private String type;
private String status;
public String getId() {
return id;
}
public String getLabel() {
return label;
}
public String getType() {
return type;
}
public String getStatus() {
return status;
}
public ContextSummary setId(final String id) {
this.id = id;
return this;
}
public ContextSummary setLabel(final String label) {
this.label = label;
return this;
}
public ContextSummary setType(final String type) {
this.type = type;
return this;
}
public ContextSummary setStatus(final String status) {
this.status = status;
return this;
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.ArrayList;
public class ContextSummaryList extends ArrayList<ContextSummary> {
}

View File

@ -124,8 +124,19 @@ public class PrepareFOSSparkJob implements Serializable {
FOSDataModel first) { FOSDataModel first) {
level1.add(first.getLevel1()); level1.add(first.getLevel1());
level2.add(first.getLevel2()); level2.add(first.getLevel2());
level3.add(first.getLevel3() + "@@" + first.getScoreL3()); if (Optional.ofNullable(first.getLevel3()).isPresent() &&
level4.add(first.getLevel4() + "@@" + first.getScoreL4()); !first.getLevel3().equalsIgnoreCase(NA) && !first.getLevel3().equalsIgnoreCase(NULL)
&& first.getLevel3() != null)
level3.add(first.getLevel3() + "@@" + first.getScoreL3());
else
level3.add(NULL);
if (Optional.ofNullable(first.getLevel4()).isPresent() &&
!first.getLevel4().equalsIgnoreCase(NA) &&
!first.getLevel4().equalsIgnoreCase(NULL) &&
first.getLevel4() != null)
level4.add(first.getLevel4() + "@@" + first.getScoreL4());
else
level4.add(NULL);
} }
} }

View File

@ -0,0 +1,84 @@
package eu.dnetlib.dhp;
import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged;
import static eu.dnetlib.dhp.PropagationConstant.readPath;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
* @author miriam.baglioni
* @Date 15/01/24
*/
public class MoveResult implements Serializable {
private static final Logger log = LoggerFactory.getLogger(MoveResult.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
MoveResult.class
.getResourceAsStream(
"/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
moveResults(spark, inputPath, outputPath);
});
}
public static <R extends Result> void moveResults(SparkSession spark, String inputPath, String outputPath) {
ModelSupport.entityTypes
.keySet()
.parallelStream()
.filter(e -> ModelSupport.isResult(e))
// .parallelStream()
.forEach(e -> {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Dataset<R> resultDataset = readPath(spark, inputPath + e.name(), resultClazz);
if (resultDataset.count() > 0) {
resultDataset
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
}
});
}
}

View File

@ -97,12 +97,6 @@ public class SparkCountryPropagationJob {
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(outputPath); .json(outputPath);
readPath(spark, outputPath, resultClazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(sourcePath);
} }
private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() { private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {

View File

@ -64,7 +64,7 @@ public class SparkResultToProjectThroughSemRelJob {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
} }
execPropagation( execPropagation(
spark, outputPath, alreadyLinkedPath, potentialUpdatePath, saveGraph); spark, outputPath, alreadyLinkedPath, potentialUpdatePath);
}); });
} }
@ -72,24 +72,23 @@ public class SparkResultToProjectThroughSemRelJob {
SparkSession spark, SparkSession spark,
String outputPath, String outputPath,
String alreadyLinkedPath, String alreadyLinkedPath,
String potentialUpdatePath, String potentialUpdatePath) {
Boolean saveGraph) {
Dataset<ResultProjectSet> toaddrelations = readPath(spark, potentialUpdatePath, ResultProjectSet.class); Dataset<ResultProjectSet> toaddrelations = readPath(spark, potentialUpdatePath, ResultProjectSet.class);
Dataset<ResultProjectSet> alreadyLinked = readPath(spark, alreadyLinkedPath, ResultProjectSet.class); Dataset<ResultProjectSet> alreadyLinked = readPath(spark, alreadyLinkedPath, ResultProjectSet.class);
if (saveGraph) { // if (saveGraph) {
toaddrelations toaddrelations
.joinWith( .joinWith(
alreadyLinked, alreadyLinked,
toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")), toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")),
"left_outer") "left_outer")
.flatMap(mapRelationRn(), Encoders.bean(Relation.class)) .flatMap(mapRelationRn(), Encoders.bean(Relation.class))
.write() .write()
.mode(SaveMode.Append) .mode(SaveMode.Append)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath); .json(outputPath);
} // }
} }
private static FlatMapFunction<Tuple2<ResultProjectSet, ResultProjectSet>, Relation> mapRelationRn() { private static FlatMapFunction<Tuple2<ResultProjectSet, ResultProjectSet>, Relation> mapRelationRn() {

View File

@ -76,29 +76,41 @@ public class SparkResultToCommunityFromOrganizationJob {
ModelSupport.entityTypes ModelSupport.entityTypes
.keySet() .keySet()
.parallelStream() .parallelStream()
.filter(e -> ModelSupport.isResult(e))
// .parallelStream()
.forEach(e -> { .forEach(e -> {
if (ModelSupport.isResult(e)) { // if () {
Class<R> resultClazz = ModelSupport.entityTypes.get(e); Class<R> resultClazz = ModelSupport.entityTypes.get(e);
removeOutputDir(spark, outputPath + e.name()); removeOutputDir(spark, outputPath + e.name());
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz); Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
result log.info("executing left join");
.joinWith( result
possibleUpdates, .joinWith(
result.col("id").equalTo(possibleUpdates.col("resultId")), possibleUpdates,
"left_outer") result.col("id").equalTo(possibleUpdates.col("resultId")),
.map(resultCommunityFn(), Encoders.bean(resultClazz)) "left_outer")
.write() .map(resultCommunityFn(), Encoders.bean(resultClazz))
.mode(SaveMode.Overwrite) .write()
.option("compression", "gzip") .mode(SaveMode.Overwrite)
.json(outputPath + e.name()); .option("compression", "gzip")
.json(outputPath + e.name());
readPath(spark, outputPath + e.name(), resultClazz) // log
.write() // .info(
.mode(SaveMode.Overwrite) // "reading results from " + outputPath + e.name() + " and copying them to " + inputPath
.option("compression", "gzip") // + e.name());
.json(inputPath + e.name()); // Dataset<R> tmp = readPath(spark, outputPath + e.name(), resultClazz);
} // if (tmp.count() > 0){
//
// tmp
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .json(inputPath + e.name());
// }
// }
}); });
} }
@ -115,11 +127,11 @@ public class SparkResultToCommunityFromOrganizationJob {
.map(Context::getId) .map(Context::getId)
.collect(Collectors.toList()); .collect(Collectors.toList());
@SuppressWarnings("unchecked") // @SuppressWarnings("unchecked")
R res = (R) ret.getClass().newInstance(); // R res = (R) ret.getClass().newInstance();
res.setId(ret.getId()); // res.setId(ret.getId());
List<Context> propagatedContexts = new ArrayList<>(); // List<Context> propagatedContexts = new ArrayList<>();
for (String cId : communitySet) { for (String cId : communitySet) {
if (!contextList.contains(cId)) { if (!contextList.contains(cId)) {
Context newContext = new Context(); Context newContext = new Context();
@ -133,11 +145,11 @@ public class SparkResultToCommunityFromOrganizationJob {
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME, PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS))); ModelConstants.DNET_PROVENANCE_ACTIONS)));
propagatedContexts.add(newContext); ret.getContext().add(newContext);
} }
} }
res.setContext(propagatedContexts); // res.setContext(propagatedContexts);
ret.mergeFrom(res); // ret.mergeFrom(res);
} }
return ret; return ret;
}; };

View File

@ -86,29 +86,30 @@ public class SparkResultToCommunityFromProject implements Serializable {
ModelSupport.entityTypes ModelSupport.entityTypes
.keySet() .keySet()
.parallelStream() .parallelStream()
.filter(e -> ModelSupport.isResult(e))
.forEach(e -> { .forEach(e -> {
if (ModelSupport.isResult(e)) { // if () {
removeOutputDir(spark, outputPath + e.name()); removeOutputDir(spark, outputPath + e.name());
Class<R> resultClazz = ModelSupport.entityTypes.get(e); Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz); Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
result result
.joinWith( .joinWith(
possibleUpdates, possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")), result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer") "left_outer")
.map(resultCommunityFn(), Encoders.bean(resultClazz)) .map(resultCommunityFn(), Encoders.bean(resultClazz))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath + e.name()); .json(outputPath + e.name());
readPath(spark, outputPath + e.name(), resultClazz) // readPath(spark, outputPath + e.name(), resultClazz)
.write() // .write()
.mode(SaveMode.Overwrite) // .mode(SaveMode.Overwrite)
.option("compression", "gzip") // .option("compression", "gzip")
.json(inputPath + e.name()); // .json(inputPath + e.name());
} // }
}); });
} }

View File

@ -101,11 +101,6 @@ public class SparkResultToCommunityThroughSemRelJob {
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath); .json(outputPath);
readPath(spark, outputPath, resultClazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath);
} }
private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> contextUpdaterFn() { private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> contextUpdaterFn() {
@ -115,11 +110,11 @@ public class SparkResultToCommunityThroughSemRelJob {
if (rcl.isPresent()) { if (rcl.isPresent()) {
Set<String> contexts = new HashSet<>(); Set<String> contexts = new HashSet<>();
ret.getContext().forEach(c -> contexts.add(c.getId())); ret.getContext().forEach(c -> contexts.add(c.getId()));
List<Context> contextList = rcl rcl
.get() .get()
.getCommunityList() .getCommunityList()
.stream() .stream()
.map( .forEach(
c -> { c -> {
if (!contexts.contains(c)) { if (!contexts.contains(c)) {
Context newContext = new Context(); Context newContext = new Context();
@ -133,19 +128,11 @@ public class SparkResultToCommunityThroughSemRelJob {
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME, PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS))); ModelConstants.DNET_PROVENANCE_ACTIONS)));
return newContext; ret.getContext().add(newContext);
} }
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
@SuppressWarnings("unchecked") });
R r = (R) ret.getClass().newInstance();
r.setId(ret.getId());
r.setContext(contextList);
ret.mergeFrom(r);
} }
return ret; return ret;

View File

@ -1,12 +1,12 @@
sourcePath=/tmp/beta_provision/graph/09_graph_dedup_enriched sourcePath=/tmp/beta_provision/graph/10_graph_orcid_enriched
resumeFrom=CountryPropagation resumeFrom=ResultProject
allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo
allowedsemrelsresultproject=isSupplementedBy;isSupplementTo allowedsemrelsresultproject=isSupplementedBy;isSupplementTo
allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo
datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|openaire____::fdb035c8b3e0540a8d9a561a6c44f4de;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14;10|openaire____::5b76240cc27a58c6f7ceef7d8c36660e;10|openaire____::172bbccecf8fca44ab6a6653e84cb92a;10|openaire____::149c6590f8a06b46314eed77bfca693f;10|eurocrisdris::a6026877c1a174d60f81fd71f62df1c1;10|openaire____::4692342f0992d91f9e705c26959f09e0;10|openaire____::8d529dbb05ec0284662b391789e8ae2a;10|openaire____::345c9d171ef3c5d706d08041d506428c;10|opendoar____::1c1d4df596d01da60385f0bb17a4a9e0;10|opendoar____::7a614fd06c325499f1680b9896beedeb;10|opendoar____::1ee3dfcd8a0645a25a35977997223d22;10|opendoar____::d296c101daa88a51f6ca8cfc1ac79b50;10|opendoar____::798ed7d4ee7138d49b8828958048130a;10|openaire____::c9d2209ecc4d45ba7b4ca7597acb88a2;10|eurocrisdris::c49e0fe4b9ba7b7fab717d1f0f0a674d;10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539;10|eurocrisdris::432ca599953ff50cd4eeffe22faf3e48 datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|openaire____::fdb035c8b3e0540a8d9a561a6c44f4de;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14;10|openaire____::5b76240cc27a58c6f7ceef7d8c36660e;10|openaire____::172bbccecf8fca44ab6a6653e84cb92a;10|openaire____::149c6590f8a06b46314eed77bfca693f;10|eurocrisdris::a6026877c1a174d60f81fd71f62df1c1;10|openaire____::4692342f0992d91f9e705c26959f09e0;10|openaire____::8d529dbb05ec0284662b391789e8ae2a;10|openaire____::345c9d171ef3c5d706d08041d506428c;10|opendoar____::1c1d4df596d01da60385f0bb17a4a9e0;10|opendoar____::7a614fd06c325499f1680b9896beedeb;10|opendoar____::1ee3dfcd8a0645a25a35977997223d22;10|opendoar____::d296c101daa88a51f6ca8cfc1ac79b50;10|opendoar____::798ed7d4ee7138d49b8828958048130a;10|openaire____::c9d2209ecc4d45ba7b4ca7597acb88a2;10|eurocrisdris::c49e0fe4b9ba7b7fab717d1f0f0a674d;10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539;10|eurocrisdris::432ca599953ff50cd4eeffe22faf3e48
#allowedtypes=pubsrepository::institutional #allowedtypes=pubsrepository::institutional
allowedtypes=Institutional allowedtypes=Institutional
outputPath=/tmp/miriam/enrichment_one_step outputPath=/tmp/miriam/graph/11_graph_orcid
pathMap ={"author":"$['author'][*]['fullname']", \ pathMap ={"author":"$['author'][*]['fullname']", \
"title":"$['title'][*]['value']",\ "title":"$['title'][*]['value']",\
"orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\ "orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\

View File

@ -45,10 +45,18 @@
</property> </property>
<property> <property>
<name>sparkExecutorMemory</name> <name>sparkExecutorMemory</name>
<value>6G</value> <value>5G</value>
</property> </property>
<property> <property>
<name>sparkExecutorCores</name> <name>sparkExecutorCores</name>
<value>1</value> <value>4</value>
</property>
<property>
<name>memoryOverhead</name>
<value>3G</value>
</property>
<property>
<name>partitions</name>
<value>3284</value>
</property> </property>
</configuration> </configuration>

View File

@ -12,6 +12,10 @@
<name>baseURL</name> <name>baseURL</name>
<description>The URL to access the community APIs</description> <description>The URL to access the community APIs</description>
</property> </property>
<property>
<name>startFrom></name>
<value>undelete</value>
</property>
</parameters> </parameters>
@ -26,12 +30,20 @@
</configuration> </configuration>
</global> </global>
<start to="reset_outputpath"/> <start to="startFrom"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<decision name="startFrom">
<switch>
<case to="exec_bulktag">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<action name="reset_outputpath"> <action name="reset_outputpath">
<fs> <fs>
<delete path="${workingDir}"/> <delete path="${workingDir}"/>
@ -45,7 +57,7 @@
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>bulkTagging-publication</name> <name>bulkTagging</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class> <class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar> <jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -53,6 +65,8 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${memoryOverhead}
--conf spark.sql.shuffle.partitions=${partitions}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -45,11 +45,11 @@
</property> </property>
<property> <property>
<name>sparkExecutorMemory</name> <name>sparkExecutorMemory</name>
<value>6G</value> <value>5G</value>
</property> </property>
<property> <property>
<name>sparkExecutorCores</name> <name>sparkExecutorCores</name>
<value>1</value> <value>4</value>
</property> </property>
<property> <property>
<name>spark2MaxExecutors</name> <name>spark2MaxExecutors</name>

View File

@ -12,6 +12,10 @@
<name>allowedtypes</name> <name>allowedtypes</name>
<description>the allowed types</description> <description>the allowed types</description>
</property> </property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters> </parameters>
<global> <global>
@ -25,7 +29,15 @@
</configuration> </configuration>
</global> </global>
<start to="reset_outputpath"/> <start to="resumeFrom"/>
<decision name="resumeFrom">
<switch>
<case to="prepare_datasource_country_association">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -61,7 +73,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg> <arg>--whitelist</arg><arg>${whitelist}</arg>
<arg>--allowedtypes</arg><arg>${allowedtypes}</arg> <arg>--allowedtypes</arg><arg>${allowedtypes}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg> <arg>--outputPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark> </spark>
<ok to="fork_prepare_result_country"/> <ok to="fork_prepare_result_country"/>
<error to="Kill"/> <error to="Kill"/>
@ -95,10 +107,10 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg> <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/publication</arg> <arg>--outputPath</arg><arg>${workingDir}/country/publication</arg>
<arg>--workingPath</arg><arg>${workingDir}/workingP</arg> <arg>--workingPath</arg><arg>${workingDir}/country/workingP</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark> </spark>
<ok to="wait_prepare"/> <ok to="wait_prepare"/>
<error to="Kill"/> <error to="Kill"/>
@ -125,10 +137,10 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg> <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg> <arg>--outputPath</arg><arg>${workingDir}/country/dataset</arg>
<arg>--workingPath</arg><arg>${workingDir}/workingD</arg> <arg>--workingPath</arg><arg>${workingDir}/country/workingD</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark> </spark>
<ok to="wait_prepare"/> <ok to="wait_prepare"/>
<error to="Kill"/> <error to="Kill"/>
@ -155,10 +167,10 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg> <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg> <arg>--outputPath</arg><arg>${workingDir}/country/otherresearchproduct</arg>
<arg>--workingPath</arg><arg>${workingDir}/workingO</arg> <arg>--workingPath</arg><arg>${workingDir}/country/workingO</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark> </spark>
<ok to="wait_prepare"/> <ok to="wait_prepare"/>
<error to="Kill"/> <error to="Kill"/>
@ -185,10 +197,10 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg> <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/software</arg> <arg>--outputPath</arg><arg>${workingDir}/country/software</arg>
<arg>--workingPath</arg><arg>${workingDir}/workingS</arg> <arg>--workingPath</arg><arg>${workingDir}/country/workingS</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark> </spark>
<ok to="wait_prepare"/> <ok to="wait_prepare"/>
<error to="Kill"/> <error to="Kill"/>
@ -221,12 +233,12 @@
--conf spark.speculation=false --conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg> <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/publication</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/country/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/publication</arg> <arg>--outputPath</arg><arg>${workingDir}/country/country/publication</arg>
</spark> </spark>
<ok to="wait"/> <ok to="wait"/>
<error to="Kill"/> <error to="Kill"/>
@ -253,9 +265,9 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg> <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/dataset</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/country/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/dataset</arg> <arg>--outputPath</arg><arg>${workingDir}/country/country/dataset</arg>
</spark> </spark>
<ok to="wait"/> <ok to="wait"/>
<error to="Kill"/> <error to="Kill"/>
@ -282,9 +294,9 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg> <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/otherresearchproduct</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/country/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/otherresearchproduct</arg> <arg>--outputPath</arg><arg>${workingDir}/country/country/otherresearchproduct</arg>
</spark> </spark>
<ok to="wait"/> <ok to="wait"/>
<error to="Kill"/> <error to="Kill"/>
@ -311,15 +323,49 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg> <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/software</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/country/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/software</arg> <arg>--outputPath</arg><arg>${workingDir}/country/country/software</arg>
</spark> </spark>
<ok to="wait"/> <ok to="wait"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait" to="reset_workingDir"/>
<join name="wait" to="move-results"/>
<action name="move-results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization - move results</name>
<class>eu.dnetlib.dhp.MoveResult</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/country/country/</arg>
<arg>--outputPath</arg><arg>${sourcePath}/</arg>
<!-- <arg>&#45;&#45;outputPath</arg><arg>/tmp/miriam/rescomm/</arg>-->
</spark>
<ok to="deleteWD"/>
<error to="Kill"/>
</action>
<decision name="deleteWD">
<switch>
<case to="End">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_workingDir"/>
</switch>
</decision>
<action name="reset_workingDir"> <action name="reset_workingDir">
<fs> <fs>
<delete path="${workingDir}"/> <delete path="${workingDir}"/>

View File

@ -4,7 +4,10 @@
<name>sourcePath</name> <name>sourcePath</name>
<description>the source path</description> <description>the source path</description>
</property> </property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters> </parameters>
<global> <global>
@ -18,7 +21,15 @@
</configuration> </configuration>
</global> </global>
<start to="reset_outputpath"/> <start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="prepare_info">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>

View File

@ -0,0 +1,22 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]

View File

@ -114,7 +114,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetOrcidAssoc</arg> <arg>--outputPath</arg><arg>${workingDir}/orcid/preparedInfo/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg> <arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark> </spark>
<ok to="wait"/> <ok to="wait"/>
@ -142,7 +142,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetOrcidAssoc</arg> <arg>--outputPath</arg><arg>${workingDir}/orcid/preparedInfo/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg> <arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark> </spark>
<ok to="wait"/> <ok to="wait"/>
@ -170,7 +170,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetOrcidAssoc</arg> <arg>--outputPath</arg><arg>${workingDir}/orcid/preparedInfo/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg> <arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark> </spark>
<ok to="wait"/> <ok to="wait"/>
@ -198,7 +198,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetOrcidAssoc</arg> <arg>--outputPath</arg><arg>${workingDir}/orcid/preparedInfo/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg> <arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark> </spark>
<ok to="wait"/> <ok to="wait"/>
@ -225,8 +225,8 @@
--conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/orcidprop</arg> <arg>--sourcePath</arg><arg>${workingDir}/orcid/orcidprop</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg> <arg>--outputPath</arg><arg>${workingDir}/orcid/orcidprop/mergedOrcidAssoc</arg>
</spark> </spark>
<ok to="fork-join-exec-propagation"/> <ok to="fork-join-exec-propagation"/>
<error to="Kill"/> <error to="Kill"/>
@ -261,7 +261,7 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg> <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/orcidprop/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg> <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg> <arg>--outputPath</arg><arg>${outputPath}/publication</arg>
@ -291,7 +291,7 @@
--conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts> </spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg> <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/orcidprop/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg> <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg> <arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
@ -321,7 +321,7 @@
--conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts> </spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg> <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/orcidprop/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg> <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg> <arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
@ -351,7 +351,7 @@
--conf spark.hadoop.mapreduce.map.speculative=false --conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false --conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts> </spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg> <arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/orcidprop/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg> <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg> <arg>--outputPath</arg><arg>${outputPath}/software</arg>

View File

@ -8,7 +8,10 @@
<name>allowedsemrels</name> <name>allowedsemrels</name>
<description>the allowed semantics </description> <description>the allowed semantics </description>
</property> </property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters> </parameters>
<global> <global>
@ -22,7 +25,14 @@
</configuration> </configuration>
</global> </global>
<start to="reset_outputpath"/> <start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="prepare_project_results_association">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -86,17 +96,9 @@
<arg>--potentialUpdatePath</arg><arg>${workingDir}/resultproject/preparedInfo/potentialUpdates</arg> <arg>--potentialUpdatePath</arg><arg>${workingDir}/resultproject/preparedInfo/potentialUpdates</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/resultproject/preparedInfo/alreadyLinked</arg> <arg>--alreadyLinkedPath</arg><arg>${workingDir}/resultproject/preparedInfo/alreadyLinked</arg>
</spark> </spark>
<ok to="reset_workingDir"/>
<error to="Kill"/>
</action>
<action name="reset_workingDir">
<fs>
<delete path="${workingDir}"/>
<mkdir path="${workingDir}"/>
</fs>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<end name="End"/>
<end name="End"/>
</workflow-app> </workflow-app>

View File

@ -8,6 +8,10 @@
<name>baseURL</name> <name>baseURL</name>
<description>the baseURL from where to reach the community APIs</description> <description>the baseURL from where to reach the community APIs</description>
</property> </property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters> </parameters>
<global> <global>
@ -21,7 +25,15 @@
</configuration> </configuration>
</global> </global>
<start to="reset_outputpath"/> <start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="prepare_result_communitylist">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -69,7 +81,7 @@
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>community2resultfromorganization-Publication</name> <name>community2resultfromorganization</name>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class> <class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar> <jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -88,6 +100,33 @@
<arg>--sourcePath</arg><arg>${sourcePath}/</arg> <arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--outputPath</arg><arg>${workingDir}/communityorganization/resulttocommunityfromorganization/</arg> <arg>--outputPath</arg><arg>${workingDir}/communityorganization/resulttocommunityfromorganization/</arg>
</spark> </spark>
<ok to="move-results"/>
<error to="Kill"/>
</action>
<action name="move-results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization - move results</name>
<class>eu.dnetlib.dhp.MoveResult</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/communityorganization/resulttocommunityfromorganization/</arg>
<arg>--outputPath</arg><arg>${sourcePath}/</arg>
<!-- <arg>&#45;&#45;outputPath</arg><arg>/tmp/miriam/rescomm/</arg>-->
</spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -8,6 +8,10 @@
<name>baseURL</name> <name>baseURL</name>
<description>the base URL to use to select the right community APIs</description> <description>the base URL to use to select the right community APIs</description>
</property> </property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters> </parameters>
<global> <global>
@ -21,7 +25,15 @@
</configuration> </configuration>
</global> </global>
<start to="reset_outputpath"/> <start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="prepare_result_communitylist">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -86,12 +98,37 @@
<arg>--sourcePath</arg><arg>${sourcePath}/</arg> <arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--outputPath</arg><arg>${workingDir}/communitythroughproject/</arg> <arg>--outputPath</arg><arg>${workingDir}/communitythroughproject/</arg>
</spark> </spark>
<ok to="move-results"/>
<error to="Kill"/>
</action>
<action name="move-results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>move results</name>
<class>eu.dnetlib.dhp.MoveResult</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/communitythroughproject/</arg>
<arg>--outputPath</arg><arg>${sourcePath}/</arg>
<!-- <arg>outputPath</arg><arg>/tmp/miriam/rescomm/</arg>-->
</spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -16,9 +16,21 @@
<name>outputPath</name> <name>outputPath</name>
<description>the output path</description> <description>the output path</description>
</property> </property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters> </parameters>
<start to="reset_outputpath"/> <start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="fork_prepare_assoc_step1">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -209,9 +221,9 @@
<jar>dhp-enrichment-${projectVersion}.jar</jar> <jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=6 --executor-cores=6
--executor-memory=5G --executor-memory=4G
--conf spark.executor.memoryOverhead=3g --conf spark.executor.memoryOverhead=5G
--conf spark.sql.shuffle.partitions=3284 --conf spark.sql.shuffle.partitions=15000
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -324,7 +336,34 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait2" to="End"/> <join name="wait2" to="move-results"/>
<action name="move-results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>move results</name>
<class>eu.dnetlib.dhp.MoveResult</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/communitysemrel/</arg>
<arg>--outputPath</arg><arg>${sourcePath}/</arg>
<!-- <arg>outputPath</arg><arg>/tmp/miriam/rescomm/</arg>-->
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/> <end name="End"/>

View File

@ -8,6 +8,10 @@
<name>blacklist</name> <name>blacklist</name>
<description>The list of institutional repositories that should not be used for the propagation</description> <description>The list of institutional repositories that should not be used for the propagation</description>
</property> </property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters> </parameters>
<global> <global>
@ -21,7 +25,15 @@
</configuration> </configuration>
</global> </global>
<start to="reset_outputpath"/> <start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="prepare_result_organization_association">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>

View File

@ -43,6 +43,17 @@
<arg>--graphPath</arg><arg>${graphPath}</arg> <arg>--graphPath</arg><arg>${graphPath}</arg>
<arg>--master</arg><arg>yarn</arg> <arg>--master</arg><arg>yarn</arg>
</spark> </spark>
<ok to="reset_outputpath"/>
<error to="Kill"/>
</action>
<action name="reset_outputpath">
<fs>
<delete path="${graphPath}/datasource"/>
<delete path="${graphPath}/organization"/>
<delete path="${graphPath}/project"/>
<delete path="${graphPath}/relation"/>
</fs>
<ok to="copy_datasource"/> <ok to="copy_datasource"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -62,8 +62,8 @@ public class XmlConverterJob {
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
final String isLookupUrl = parser.get("isLookupUrl"); final String contextApiBaseUrl = parser.get("contextApiBaseUrl");
log.info("isLookupUrl: {}", isLookupUrl); log.info("contextApiBaseUrl: {}", contextApiBaseUrl);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@ -71,7 +71,7 @@ public class XmlConverterJob {
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
convertToXml(spark, inputPath, outputPath, ContextMapper.fromIS(isLookupUrl)); convertToXml(spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl));
}); });
} }

View File

@ -1,18 +1,22 @@
package eu.dnetlib.dhp.oa.provision.utils; package eu.dnetlib.dhp.oa.provision.utils;
import java.io.Serializable; import java.io.*;
import java.io.StringReader; import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap; import java.util.HashMap;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Node; import org.dom4j.Node;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.jetbrains.annotations.NotNull;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import eu.dnetlib.dhp.common.api.context.*;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -23,6 +27,42 @@ public class ContextMapper extends HashMap<String, ContextDef> implements Serial
private static final String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return <entry id=\"{$x/@id}\" label=\"{$x/@label|$x/@name}\" name=\"{$x/name()}\" type=\"{$x/@type}\"/>"; private static final String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return <entry id=\"{$x/@id}\" label=\"{$x/@label|$x/@name}\" name=\"{$x/name()}\" type=\"{$x/@type}\"/>";
public static ContextMapper fromAPI(final String baseURL) throws Exception {
final ContextMapper contextMapper = new ContextMapper();
for (ContextSummary ctx : DNetRestClient.doGET(baseURL + "/contexts", ContextSummaryList.class)) {
contextMapper.put(ctx.getId(), new ContextDef(ctx.getId(), ctx.getLabel(), "context", ctx.getType()));
for (CategorySummary cat : DNetRestClient
.doGET(baseURL + "/context/" + ctx.getId(), CategorySummaryList.class)) {
contextMapper.put(cat.getId(), new ContextDef(cat.getId(), cat.getLabel(), "category", ""));
if (cat.isHasConcept()) {
for (ConceptSummary c : DNetRestClient
.doGET(baseURL + "/context/category/" + cat.getId(), ConceptSummaryList.class)) {
contextMapper.put(c.getId(), new ContextDef(c.getId(), c.getLabel(), "concept", ""));
if (c.isHasSubConcept()) {
for (ConceptSummary cs : c.getConcepts()) {
contextMapper.put(cs.getId(), new ContextDef(cs.getId(), cs.getLabel(), "concept", ""));
if (cs.isHasSubConcept()) {
for (ConceptSummary css : cs.getConcepts()) {
contextMapper
.put(
css.getId(),
new ContextDef(css.getId(), css.getLabel(), "concept", ""));
}
}
}
}
}
}
}
}
return contextMapper;
}
@Deprecated
public static ContextMapper fromIS(final String isLookupUrl) public static ContextMapper fromIS(final String isLookupUrl)
throws DocumentException, ISLookUpException, SAXException { throws DocumentException, ISLookUpException, SAXException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
@ -32,6 +72,7 @@ public class ContextMapper extends HashMap<String, ContextDef> implements Serial
return fromXml(sb.toString()); return fromXml(sb.toString());
} }
@Deprecated
public static ContextMapper fromXml(final String xml) throws DocumentException, SAXException { public static ContextMapper fromXml(final String xml) throws DocumentException, SAXException {
final ContextMapper contextMapper = new ContextMapper(); final ContextMapper contextMapper = new ContextMapper();

View File

@ -9,6 +9,10 @@
<name>isLookupUrl</name> <name>isLookupUrl</name>
<description>URL for the isLookup service</description> <description>URL for the isLookup service</description>
</property> </property>
<property>
<name>contextApiBaseUrl</name>
<description>context API URL</description>
</property>
<property> <property>
<name>relPartitions</name> <name>relPartitions</name>
<description>number or partitions for the relations Dataset</description> <description>number or partitions for the relations Dataset</description>
@ -589,7 +593,7 @@
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg> <arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml</arg> <arg>--outputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
</spark> </spark>
<ok to="should_index"/> <ok to="should_index"/>
<error to="Kill"/> <error to="Kill"/>