Merge pull request 'DispatchEntitiesSparkJob: manage all entity types together, support filtering by dataInfo.invisible flag' (#329) from dispatch_filter_invisible_entities into beta

Reviewed-on: D-Net/dnet-hadoop#329
This commit is contained in:
Miriam Baglioni 2023-08-10 12:56:18 +02:00
commit 35b8deb2c6
6 changed files with 142 additions and 456 deletions

View File

@ -11,25 +11,18 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.*;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
public class DispatchEntitiesSparkJob { public class DispatchEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
@ -54,11 +47,8 @@ public class DispatchEntitiesSparkJob {
String outputPath = parser.get("outputPath"); String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName"); boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
log.info("graphTableClassName: {}", graphTableClassName); log.info("filterInvisible: {}", filterInvisible);
@SuppressWarnings("unchecked")
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -66,32 +56,43 @@ public class DispatchEntitiesSparkJob {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
dispatchEntities(spark, inputPath, entityClazz, outputPath); dispatchEntities(spark, inputPath, outputPath, filterInvisible);
}); });
} }
private static <T extends Oaf> void dispatchEntities( private static void dispatchEntities(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,
Class<T> clazz, String outputPath,
String outputPath) { boolean filterInvisible) {
spark Dataset<String> df = spark.read().textFile(inputPath);
.read()
.textFile(inputPath) ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> {
.filter((FilterFunction<String>) s -> isEntityType(s, clazz)) String entityType = entry.getKey();
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) Class<?> clazz = entry.getValue();
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), if (!entityType.equalsIgnoreCase("relation")) {
Encoders.bean(clazz)) Dataset<Row> entityDF = spark
.write() .read()
.mode(SaveMode.Overwrite) .schema(Encoders.bean(clazz).schema())
.option("compression", "gzip") .json(
.json(outputPath); df
.filter((FilterFunction<String>) s -> s.startsWith(clazz.getName()))
.map(
(MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"),
Encoders.STRING()));
if (filterInvisible) {
entityDF = entityDF.filter("dataInfo.invisible != true");
}
entityDF
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + entityType);
}
});
} }
private static <T extends Oaf> boolean isEntityType(final String s, final Class<T> clazz) {
return StringUtils.substringBefore(s, "|").equals(clazz.getName());
}
} }

View File

@ -18,9 +18,9 @@
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "c", "paramName": "fi",
"paramLongName": "graphTableClassName", "paramLongName": "filterInvisible",
"paramDescription": "the graph entity class name", "paramDescription": "if true filters out invisible entities",
"paramRequired": true "paramRequired": true
} }
] ]

View File

@ -12,6 +12,10 @@
<name>graphOutputPath</name> <name>graphOutputPath</name>
<description>path of the output graph</description> <description>path of the output graph</description>
</property> </property>
<property>
<name>filterInvisible</name>
<description>whether filter out invisible entities after merge</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -24,7 +28,6 @@
<name>sparkExecutorCores</name> <name>sparkExecutorCores</name>
<description>number of cores used by single executor</description> <description>number of cores used by single executor</description>
</property> </property>
<property> <property>
<name>oozieActionShareLibForSpark2</name> <name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description> <description>oozie action sharelib for spark 2.*</description>
@ -144,25 +147,15 @@
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg> <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark> </spark>
<ok to="fork_dispatch_entities"/> <ok to="dispatch_entities"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="fork_dispatch_entities"> <action name="dispatch_entities">
<path start="dispatch_datasource"/>
<path start="dispatch_project"/>
<path start="dispatch_organization"/>
<path start="dispatch_publication"/>
<path start="dispatch_dataset"/>
<path start="dispatch_software"/>
<path start="dispatch_otherresearchproduct"/>
</fork>
<action name="dispatch_datasource">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Dispatch publications</name> <name>Dispatch grouped entitities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class> <class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar> <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -176,164 +169,12 @@
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg> <arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg> <arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
</spark> </spark>
<ok to="wait_dispatch"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="dispatch_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch project</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch organization</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch publication</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch dataset</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch software</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch otherresearchproduct</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<join name="wait_dispatch" to="End"/>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -1,15 +1,25 @@
package eu.dnetlib.dhp.oa.dedup; package eu.dnetlib.dhp.oa.dedup;
import com.fasterxml.jackson.databind.DeserializationFeature; import static java.nio.file.Files.createTempDirectory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets; import static org.apache.spark.sql.functions.col;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import static org.apache.spark.sql.functions.count;
import eu.dnetlib.dhp.schema.common.ModelConstants; import static org.junit.jupiter.api.Assertions.*;
import eu.dnetlib.dhp.schema.oaf.*; import static org.mockito.Mockito.lenient;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import java.io.File;
import eu.dnetlib.pace.util.MapDocumentUtil; import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -26,26 +36,19 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2; import scala.Tuple2;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SparkDedupTest implements Serializable { public class SparkDedupTest implements Serializable {
@ -723,26 +726,32 @@ public class SparkDedupTest implements Serializable {
@Order(8) @Order(8)
void testCleanBaseRelations() throws Exception { void testCleanBaseRelations() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
// append dangling relations to be cleaned up // append dangling relations to be cleaned up
Dataset<Row> df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testGraphBasePath + "/relation"); Dataset<Row> df_before = spark
Dataset<Row> df_input =df_before .read()
.unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a"))) .schema(Encoders.bean(Relation.class).schema())
.unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a"))); .json(testGraphBasePath + "/relation");
Dataset<Row> df_input = df_before
.unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a")))
.unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a")));
df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp"); df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp");
parser parser
.parseArgument( .parseArgument(
new String[]{ new String[] {
"--graphBasePath", testGraphBasePath, "--graphBasePath", testGraphBasePath,
"--inputPath", testGraphBasePath + "/relation", "--inputPath", testGraphBasePath + "/relation",
"--outputPath", testDedupGraphBasePath + "/relation" "--outputPath", testDedupGraphBasePath + "/relation"
}); });
new SparkCleanRelation(parser, spark).run(isLookUpService); new SparkCleanRelation(parser, spark).run(isLookUpService);
Dataset<Row> df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation"); Dataset<Row> df_after = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(testDedupGraphBasePath + "/relation");
assertNotEquals(df_before.count(), df_input.count()); assertNotEquals(df_before.count(), df_input.count());
assertNotEquals(df_input.count(), df_after.count()); assertNotEquals(df_input.count(), df_after.count());
@ -753,7 +762,7 @@ public class SparkDedupTest implements Serializable {
@Order(9) @Order(9)
void testCleanDedupedRelations() throws Exception { void testCleanDedupedRelations() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( ArgumentApplicationParser parser = new ArgumentApplicationParser(
classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json"));
String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation"; String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation";
@ -763,16 +772,19 @@ public class SparkDedupTest implements Serializable {
df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false); df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false);
parser parser
.parseArgument( .parseArgument(
new String[]{ new String[] {
"--graphBasePath", testGraphBasePath, "--graphBasePath", testGraphBasePath,
"--inputPath", inputRelPath, "--inputPath", inputRelPath,
"--outputPath", testDedupGraphBasePath + "/relation" "--outputPath", testDedupGraphBasePath + "/relation"
}); });
new SparkCleanRelation(parser, spark).run(isLookUpService); new SparkCleanRelation(parser, spark).run(isLookUpService);
Dataset<Row> df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation"); Dataset<Row> df_after = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(testDedupGraphBasePath + "/relation");
assertNotEquals(df_before.count(), df_after.count()); assertNotEquals(df_before.count(), df_after.count());
assertEquals(0, df_after.count()); assertEquals(0, df_after.count());

View File

@ -12,6 +12,10 @@
<name>graphOutputPath</name> <name>graphOutputPath</name>
<description>path of the output graph</description> <description>path of the output graph</description>
</property> </property>
<property>
<name>filterInvisible</name>
<description>whether filter out invisible entities after merge</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -94,25 +98,15 @@
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg> <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark> </spark>
<ok to="fork_dispatch_entities"/> <ok to="dispatch_entities"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="fork_dispatch_entities"> <action name="dispatch_entities">
<path start="dispatch_datasource"/>
<path start="dispatch_project"/>
<path start="dispatch_organization"/>
<path start="dispatch_publication"/>
<path start="dispatch_dataset"/>
<path start="dispatch_software"/>
<path start="dispatch_otherresearchproduct"/>
</fork>
<action name="dispatch_datasource">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Dispatch publications</name> <name>Dispatch grouped entities</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class> <class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar> <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -126,165 +120,13 @@
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg> <arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg> <arg>--outputPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg> <arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
</spark> </spark>
<ok to="wait_dispatch"/> <ok to="delete_target_relation"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="dispatch_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch project</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch organization</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch publication</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch dataset</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch software</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="dispatch_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch otherresearchproduct</name>
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<join name="wait_dispatch" to="delete_target_relation"/>
<action name="delete_target_relation"> <action name="delete_target_relation">
<fs> <fs>
<delete path="${nameNode}/${graphOutputPath}/relation"/> <delete path="${nameNode}/${graphOutputPath}/relation"/>

View File

@ -1,14 +1,14 @@
package eu.dnetlib.dhp.oa.graph.group; package eu.dnetlib.dhp.oa.graph.group;
import static org.junit.jupiter.api.Assertions.*; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import eu.dnetlib.dhp.common.HdfsSupport;
import java.net.URISyntaxException; import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
import java.nio.file.Files; import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import java.nio.file.Path; import eu.dnetlib.dhp.schema.common.ModelSupport;
import java.nio.file.Paths; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -19,17 +19,13 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import com.fasterxml.jackson.databind.DeserializationFeature; import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper; import java.net.URISyntaxException;
import com.google.common.collect.Lists; import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import eu.dnetlib.dhp.common.HdfsSupport; import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class GroupEntitiesSparkJobTest { public class GroupEntitiesSparkJobTest {
@ -101,21 +97,16 @@ public class GroupEntitiesSparkJobTest {
@Test @Test
@Order(2) @Order(2)
void testDispatchEntities() throws Exception { void testDispatchEntities() throws Exception {
for (String type : Lists DispatchEntitiesSparkJob.main(new String[] {
.newArrayList( "-isSparkSessionManaged",
Publication.class.getCanonicalName(), eu.dnetlib.dhp.schema.oaf.Dataset.class.getCanonicalName())) { Boolean.FALSE.toString(),
String directory = StringUtils.substringAfterLast(type, ".").toLowerCase(); "-inputPath",
DispatchEntitiesSparkJob.main(new String[] { groupEntityPath.toString(),
"-isSparkSessionManaged", "-outputPath",
Boolean.FALSE.toString(), dispatchEntityPath.resolve(".").toString(),
"-inputPath", "-filterInvisible",
groupEntityPath.toString(), Boolean.TRUE.toString()
"-outputPath", });
dispatchEntityPath.resolve(directory).toString(),
"-graphTableClassName",
type
});
}
Dataset<Result> output = spark Dataset<Result> output = spark
.read() .read()
@ -140,5 +131,4 @@ public class GroupEntitiesSparkJobTest {
.filter((FilterFunction<String>) s -> s.equals("dataset")) .filter((FilterFunction<String>) s -> s.equals("dataset"))
.count()); .count());
} }
} }