DispatchEntitiesSparkJob: manage all entity types together, support filtering by dataInfo.invisible flag

This commit is contained in:
Giambattista Bloisi 2023-08-08 15:52:20 +02:00
parent c25ac21e5e
commit fab9920271
6 changed files with 137 additions and 454 deletions

View File

@ -11,25 +11,18 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.*;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
public class DispatchEntitiesSparkJob { public class DispatchEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
@ -54,11 +47,12 @@ public class DispatchEntitiesSparkJob {
String outputPath = parser.get("outputPath"); String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName"); boolean filterInvisible = Optional
log.info("graphTableClassName: {}", graphTableClassName); .ofNullable(parser.get("filterInvisible"))
.map(Boolean::valueOf)
.orElse(Boolean.FALSE);
@SuppressWarnings("unchecked") log.info("filterInvisible: {}", filterInvisible);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -66,32 +60,43 @@ public class DispatchEntitiesSparkJob {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
dispatchEntities(spark, inputPath, entityClazz, outputPath); dispatchEntities(spark, inputPath, outputPath, filterInvisible);
}); });
} }
private static <T extends Oaf> void dispatchEntities( private static void dispatchEntities(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,
Class<T> clazz, String outputPath,
String outputPath) { boolean filterInvisible) {
spark Dataset<String> df = spark.read().textFile(inputPath);
ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> {
String entityType = entry.getKey();
Class<?> clazz = entry.getValue();
if (!entityType.equalsIgnoreCase("relation")) {
Dataset<Row> entityDF = spark
.read() .read()
.textFile(inputPath) .schema(Encoders.bean(clazz).schema())
.filter((FilterFunction<String>) s -> isEntityType(s, clazz)) .json(
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) df
.filter((FilterFunction<String>) s -> s.startsWith(clazz.getName()))
.map( .map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), (MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"),
Encoders.bean(clazz)) Encoders.STRING()));
if (filterInvisible) {
entityDF = entityDF.filter("dataInfo.invisible != true");
}
entityDF
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath); .json(outputPath + "/" + entityType);
} }
});
private static <T extends Oaf> boolean isEntityType(final String s, final Class<T> clazz) {
return StringUtils.substringBefore(s, "|").equals(clazz.getName());
} }
} }

View File

@ -18,9 +18,9 @@
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "c", "paramName": "fi",
"paramLongName": "graphTableClassName", "paramLongName": "filterInvisible",
"paramDescription": "the graph entity class name", "paramDescription": "if true filters out invisible entities",
"paramRequired": true "paramRequired": true
} }
] ]

View File

@ -144,25 +144,15 @@
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg> <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark> </spark>
<ok to="fork_dispatch_entities"/> <ok to="dispatch_entities"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="fork_dispatch_entities"> <action name="dispatch_entities">
<path start="dispatch_datasource"/>
<path start="dispatch_project"/>
<path start="dispatch_organization"/>
<path start="dispatch_publication"/>
<path start="dispatch_dataset"/>
<path start="dispatch_software"/>
<path start="dispatch_otherresearchproduct"/>
</fork>
<action name="dispatch_datasource">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Dispatch publications</name> <name>Dispatch grouped entitities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class> <class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar> <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -176,164 +166,12 @@
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg> <arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg> <arg>--filterInvisible</arg><arg>true</arg>
</spark> </spark>
<ok to="wait_dispatch"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="dispatch_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch project</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch organization</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch publication</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch dataset</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch software</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch otherresearchproduct</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<join name="wait_dispatch" to="End"/>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -1,15 +1,25 @@
package eu.dnetlib.dhp.oa.dedup; package eu.dnetlib.dhp.oa.dedup;
import com.fasterxml.jackson.databind.DeserializationFeature; import static java.nio.file.Files.createTempDirectory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets; import static org.apache.spark.sql.functions.col;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import static org.apache.spark.sql.functions.count;
import eu.dnetlib.dhp.schema.common.ModelConstants; import static org.junit.jupiter.api.Assertions.*;
import eu.dnetlib.dhp.schema.oaf.*; import static org.mockito.Mockito.lenient;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import java.io.File;
import eu.dnetlib.pace.util.MapDocumentUtil; import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -26,26 +36,19 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2; import scala.Tuple2;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SparkDedupTest implements Serializable { public class SparkDedupTest implements Serializable {
@ -726,7 +729,10 @@ public class SparkDedupTest implements Serializable {
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
// append dangling relations to be cleaned up // append dangling relations to be cleaned up
Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testGraphBasePath + "/relation"); Dataset<Row> df_before = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(testGraphBasePath + "/relation");
Dataset<Row> df_input = df_before Dataset<Row> df_input = df_before
.unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a"))) .unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a")))
.unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a"))); .unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a")));
@ -742,7 +748,10 @@ public class SparkDedupTest implements Serializable {
new SparkCleanRelation(parser, spark).run(isLookUpService); new SparkCleanRelation(parser, spark).run(isLookUpService);
Dataset<Row> df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation"); Dataset<Row> df_after = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(testDedupGraphBasePath + "/relation");
assertNotEquals(df_before.count(), df_input.count()); assertNotEquals(df_before.count(), df_input.count());
assertNotEquals(df_input.count(), df_after.count()); assertNotEquals(df_input.count(), df_after.count());
@ -772,7 +781,10 @@ public class SparkDedupTest implements Serializable {
new SparkCleanRelation(parser, spark).run(isLookUpService); new SparkCleanRelation(parser, spark).run(isLookUpService);
Dataset<Row> df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation"); Dataset<Row> df_after = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(testDedupGraphBasePath + "/relation");
assertNotEquals(df_before.count(), df_after.count()); assertNotEquals(df_before.count(), df_after.count());
assertEquals(0, df_after.count()); assertEquals(0, df_after.count());

View File

@ -94,25 +94,15 @@
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg> <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark> </spark>
<ok to="fork_dispatch_entities"/> <ok to="dispatch_entities"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="fork_dispatch_entities"> <action name="dispatch_entities">
<path start="dispatch_datasource"/>
<path start="dispatch_project"/>
<path start="dispatch_organization"/>
<path start="dispatch_publication"/>
<path start="dispatch_dataset"/>
<path start="dispatch_software"/>
<path start="dispatch_otherresearchproduct"/>
</fork>
<action name="dispatch_datasource">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Dispatch publications</name> <name>Dispatch grouped entities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class> <class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -126,165 +116,13 @@
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg> <arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg> <arg>--filterInvisible</arg><arg>false</arg>
</spark> </spark>
<ok to="wait_dispatch"/> <ok to="delete_target_relation"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="dispatch_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch project</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch organization</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch publication</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch dataset</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch software</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch otherresearchproduct</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<join name="wait_dispatch" to="delete_target_relation"/>
<action name="delete_target_relation"> <action name="delete_target_relation">
<fs> <fs>
<delete path="${nameNode}/${graphOutputPath}/relation"/> <delete path="${nameNode}/${graphOutputPath}/relation"/>

View File

@ -1,14 +1,14 @@
package eu.dnetlib.dhp.oa.graph.group; package eu.dnetlib.dhp.oa.graph.group;
import static org.junit.jupiter.api.Assertions.*; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import eu.dnetlib.dhp.common.HdfsSupport;
import java.net.URISyntaxException; import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
import java.nio.file.Files; import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import java.nio.file.Path; import eu.dnetlib.dhp.schema.common.ModelSupport;
import java.nio.file.Paths; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -19,17 +19,13 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import com.fasterxml.jackson.databind.DeserializationFeature; import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper; import java.net.URISyntaxException;
import com.google.common.collect.Lists; import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import eu.dnetlib.dhp.common.HdfsSupport; import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class GroupEntitiesSparkJobTest { public class GroupEntitiesSparkJobTest {
@ -101,21 +97,16 @@ public class GroupEntitiesSparkJobTest {
@Test @Test
@Order(2) @Order(2)
void testDispatchEntities() throws Exception { void testDispatchEntities() throws Exception {
for (String type : Lists
.newArrayList(
Publication.class.getCanonicalName(), eu.dnetlib.dhp.schema.oaf.Dataset.class.getCanonicalName())) {
String directory = StringUtils.substringAfterLast(type, ".").toLowerCase();
DispatchEntitiesSparkJob.main(new String[] { DispatchEntitiesSparkJob.main(new String[] {
"-isSparkSessionManaged", "-isSparkSessionManaged",
Boolean.FALSE.toString(), Boolean.FALSE.toString(),
"-inputPath", "-inputPath",
groupEntityPath.toString(), groupEntityPath.toString(),
"-outputPath", "-outputPath",
dispatchEntityPath.resolve(directory).toString(), dispatchEntityPath.resolve(".").toString(),
"-graphTableClassName", "-filterInvisible",
type Boolean.TRUE.toString()
}); });
}
Dataset<Result> output = spark Dataset<Result> output = spark
.read() .read()
@ -140,5 +131,4 @@ public class GroupEntitiesSparkJobTest {
.filter((FilterFunction<String>) s -> s.equals("dataset")) .filter((FilterFunction<String>) s -> s.equals("dataset"))
.count()); .count());
} }
} }