Merge pull request 'Priority to records from delegated authorities' (#187) from delegated_authorities into beta
Reviewed-on: #187pull/192/head
commit
a70b0990c9
@ -0,0 +1,18 @@
|
||||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
</configuration>
|
@ -0,0 +1,298 @@
|
||||
<workflow-app name="Group Graph Entities" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>graphBasePath</name>
|
||||
<description>the input graph base path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>workingPath</name>
|
||||
<description>path of the working directory</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>graphOutputPath</name>
|
||||
<description>path of the output graph</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>mapreduce.job.queuename</name>
|
||||
<value>${queueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="group_entities"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="group_entities">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>group graph entities</name>
|
||||
<class>eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=15000
|
||||
</spark-opts>
|
||||
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
||||
</spark>
|
||||
<ok to="fork_dispatch_entities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_dispatch_entities">
|
||||
<path start="dispatch_datasource"/>
|
||||
<path start="dispatch_project"/>
|
||||
<path start="dispatch_organization"/>
|
||||
<path start="dispatch_publication"/>
|
||||
<path start="dispatch_dataset"/>
|
||||
<path start="dispatch_software"/>
|
||||
<path start="dispatch_otherresearchproduct"/>
|
||||
</fork>
|
||||
|
||||
<action name="dispatch_datasource">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Dispatch publications</name>
|
||||
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
|
||||
</spark>
|
||||
<ok to="wait_dispatch"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="dispatch_project">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Dispatch project</name>
|
||||
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
|
||||
</spark>
|
||||
<ok to="wait_dispatch"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="dispatch_organization">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Dispatch organization</name>
|
||||
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
|
||||
</spark>
|
||||
<ok to="wait_dispatch"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="dispatch_publication">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Dispatch publication</name>
|
||||
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
</spark>
|
||||
<ok to="wait_dispatch"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="dispatch_dataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Dispatch dataset</name>
|
||||
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
</spark>
|
||||
<ok to="wait_dispatch"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="dispatch_software">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Dispatch software</name>
|
||||
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
</spark>
|
||||
<ok to="wait_dispatch"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="dispatch_otherresearchproduct">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Dispatch otherresearchproduct</name>
|
||||
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
</spark>
|
||||
<ok to="wait_dispatch"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait_dispatch" to="copy_relation"/>
|
||||
|
||||
<action name="copy_relation">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<arg>${nameNode}/${graphBasePath}/relation</arg>
|
||||
<arg>${nameNode}/${graphOutputPath}/relation</arg>
|
||||
</distcp>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
@ -0,0 +1,144 @@
|
||||
|
||||
package eu.dnetlib.dhp.oa.graph.group;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.*;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
|
||||
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
|
||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||
public class GroupEntitiesSparkJobTest {
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static ObjectMapper mapper = new ObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
private static Path workingDir;
|
||||
private Path dataInputPath;
|
||||
|
||||
private Path groupEntityPath;
|
||||
private Path dispatchEntityPath;
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
|
||||
conf.setMaster("local");
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||
spark = SparkSession.builder().config(conf).getOrCreate();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void beforeEach() throws IOException, URISyntaxException {
|
||||
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
|
||||
groupEntityPath = workingDir.resolve("grouped_entity");
|
||||
dispatchEntityPath = workingDir.resolve("dispatched_entity");
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
spark.stop();
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(1)
|
||||
void testGroupEntities() throws Exception {
|
||||
GroupEntitiesSparkJob.main(new String[] {
|
||||
"-isSparkSessionManaged",
|
||||
Boolean.FALSE.toString(),
|
||||
"-graphInputPath",
|
||||
dataInputPath.toString(),
|
||||
"-outputPath",
|
||||
groupEntityPath.toString()
|
||||
});
|
||||
|
||||
Dataset<Result> output = spark
|
||||
.read()
|
||||
.textFile(groupEntityPath.toString())
|
||||
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
|
||||
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
|
||||
|
||||
assertEquals(
|
||||
1,
|
||||
output
|
||||
.filter(
|
||||
(FilterFunction<Result>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
|
||||
.equals(r.getId()) &&
|
||||
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
|
||||
.count());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(2)
|
||||
void testDispatchEntities() throws Exception {
|
||||
for (String type : Lists
|
||||
.newArrayList(
|
||||
Publication.class.getCanonicalName(), eu.dnetlib.dhp.schema.oaf.Dataset.class.getCanonicalName())) {
|
||||
String directory = StringUtils.substringAfterLast(type, ".").toLowerCase();
|
||||
DispatchEntitiesSparkJob.main(new String[] {
|
||||
"-isSparkSessionManaged",
|
||||
Boolean.FALSE.toString(),
|
||||
"-inputPath",
|
||||
groupEntityPath.toString(),
|
||||
"-outputPath",
|
||||
dispatchEntityPath.resolve(directory).toString(),
|
||||
"-graphTableClassName",
|
||||
type
|
||||
});
|
||||
}
|
||||
|
||||
Dataset<Result> output = spark
|
||||
.read()
|
||||
.textFile(
|
||||
DHPUtils
|
||||
.toSeq(
|
||||
HdfsSupport
|
||||
.listFiles(dispatchEntityPath.toString(), spark.sparkContext().hadoopConfiguration())))
|
||||
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
|
||||
|
||||
assertEquals(3, output.count());
|
||||
assertEquals(
|
||||
2,
|
||||
output
|
||||
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
|
||||
.filter((FilterFunction<String>) s -> s.equals("publication"))
|
||||
.count());
|
||||
assertEquals(
|
||||
1,
|
||||
output
|
||||
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
|
||||
.filter((FilterFunction<String>) s -> s.equals("dataset"))
|
||||
.count());
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue