Miriam Baglioni 2020-07-17 12:39:27 +02:00
parent da0d22e367
commit e1e024c013
38 changed files with 146 additions and 146 deletions

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-actionset-remapping</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-aggregation</artifactId>
<version>1.2.4-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
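The new dhp-actionset-remapping module declares spark-core and spark-sql (versions managed by the dhp-workflows parent) alongside dhp-common and dhp-schemas. A minimal sketch of how the Spark entry points below bootstrap, assuming only the runWithSparkSession helper from dhp-common that they all import (the class name here is illustrative, not part of the commit):

import org.apache.spark.SparkConf;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

public class JobSketch { // hypothetical class, for illustration only
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        // when false (as in the tests), the caller owns the SparkSession
        // lifecycle instead of the helper
        Boolean isSparkSessionManaged = Boolean.TRUE;
        runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            // read inputs, transform, write to the output path
        });
    }
}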

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.common;
import java.io.Serializable;
import java.util.List;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.common;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.common;
import java.io.Serializable;
@ -12,7 +12,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
public class common implements Serializable {
public class Common implements Serializable {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
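The utility class is renamed from common to Common, following Java naming conventions. Its full body is not shown in this diff, but a plausible reconstruction, inferred from its call sites (removeOutputDir, readPath, OBJECT_MAPPER) and the HdfsSupport import above, would be:

import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.common.HdfsSupport;

public class Common implements Serializable {

    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // drops the output directory before a job writes, via the dhp-common helper
    public static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    // reads newline-delimited JSON and maps every line onto the given bean class
    public static <R> Dataset<R> readPath(SparkSession spark, String inputPath, Class<R> clazz) {
        return spark
            .read()
            .textFile(inputPath)
            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }
}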

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.common;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.common;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@ -57,7 +57,7 @@ public class PrepareInfo implements Serializable {
conf,
isSparkSessionManaged,
spark -> {
common.removeOutputDir(spark, outputPath);
Common.removeOutputDir(spark, outputPath);
exec(
spark, asInputPath, inputGraphPath, outputPath,
actionSetList.stream().map(as -> {
@ -74,7 +74,7 @@ public class PrepareInfo implements Serializable {
private static void exec(SparkSession spark, String asInputPath, String graphInputPath, String outputPath,
List<ActionSet> actionSets) {
Dataset<Relation> relation = common.readPath(spark, graphInputPath + "/relation", Relation.class);
Dataset<Relation> relation = Common.readPath(spark, graphInputPath + "/relation", Relation.class);
actionSets
.forEach(
@ -104,7 +104,7 @@ public class PrepareInfo implements Serializable {
.createDataset(
sc
.sequenceFile(asInputPath, Text.class, Text.class)
.map(a -> common.OBJECT_MAPPER.readValue(a._2().toString(), AtomicAction.class))
.map(a -> Common.OBJECT_MAPPER.readValue(a._2().toString(), AtomicAction.class))
.map(aa -> getAsResultInfo(aa))
.filter(Objects::nonNull)
.rdd(),
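The snippet above is the standard action-set read: the payload is a Hadoop SequenceFile of (class name, JSON) Text pairs, each deserialized back into an AtomicAction. Isolated as a hedged sketch (method and variable names are illustrative):

import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.schema.action.AtomicAction;

public class ReadActionSetSketch {
    public static JavaRDD<AtomicAction> readActionSet(SparkSession spark, String asInputPath) {
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        return sc
            .sequenceFile(asInputPath, Text.class, Text.class)
            // _2() holds the JSON payload; _1() the class name of the wrapped entity
            .map(pair -> Common.OBJECT_MAPPER.readValue(pair._2().toString(), AtomicAction.class));
    }
}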

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.common;
import java.io.Serializable;
import java.util.List;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.common;
import java.io.Serializable;
import java.util.List;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.common;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@ -28,7 +28,7 @@ public class SparkExpandResultInfo implements Serializable {
.toString(
SparkExpandResultInfo.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/remapping/input_expand_parameters.json"));
"/eu/dnetlib/dhp/actionset/common/input_expand_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
@ -51,15 +51,15 @@ public class SparkExpandResultInfo implements Serializable {
conf,
isSparkSessionManaged,
spark -> {
common.removeOutputDir(spark, outputPath);
Common.removeOutputDir(spark, outputPath);
exec(spark, inputPath, outputPath);
});
}
private static void exec(SparkSession spark, String inputPath, String outputPath) {
Dataset<RelationMerges> rel = common.readPath(spark, inputPath + "/relation", RelationMerges.class);
Dataset<RelationMerges> rel = Common.readPath(spark, inputPath + "/relation", RelationMerges.class);
Dataset<ASResultInfo> asResultInfo = common.readPath(spark, inputPath + "/actionset", ASResultInfo.class);
Dataset<ASResultInfo> asResultInfo = Common.readPath(spark, inputPath + "/actionset", ASResultInfo.class);
asResultInfo
.joinWith(rel, asResultInfo.col("id").equalTo(rel.col("dedupId")), "left")
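The left join above matches action-set records against the dedup merge relations. The rest of the expansion is not shown in this hunk; a hedged sketch of the likely flatten step, assuming RelationMerges exposes a getMerges() list and ASResultInfo uses standard bean accessors (both assumptions, not confirmed by this diff):

// assumes: java.util.Collections, java.util.stream.Collectors, scala.Tuple2,
// org.apache.spark.api.java.function.FlatMapFunction, org.apache.spark.sql.{Encoders, SaveMode}
asResultInfo
    .joinWith(rel, asResultInfo.col("id").equalTo(rel.col("dedupId")), "left")
    .flatMap((FlatMapFunction<Tuple2<ASResultInfo, RelationMerges>, ASResultInfo>) t -> {
        ASResultInfo info = t._1();
        if (t._2() == null) {
            // not a dedup record: keep it as it is
            return Collections.singletonList(info).iterator();
        }
        // replicate the dedup-level info onto every merged result id
        return t._2().getMerges().stream() // getMerges() is an assumed getter
            .map(mergedId -> {
                ASResultInfo expanded = new ASResultInfo();
                expanded.setId(mergedId); // setters assumed as bean accessors
                expanded.setType(info.getType());
                expanded.setValue(info.getValue());
                return expanded;
            })
            .collect(Collectors.toList())
            .iterator();
    }, Encoders.bean(ASResultInfo.class))
    .write()
    .mode(SaveMode.Overwrite)
    .option("compression", "gzip")
    .json(outputPath);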

View File

@ -1,27 +1,16 @@
package eu.dnetlib.dhp.actionmanager.remapping;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
package eu.dnetlib.dhp.actionset.common;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
public class SparkPrepareInfo implements Serializable {
@ -32,7 +21,7 @@ public class SparkPrepareInfo implements Serializable {
.toString(
SparkPrepareInfo.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/remapping/input_prepare_parameters.json"));
"/eu/dnetlib/dhp/actionset/common/input_prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
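All Spark entry points in this module share the same prologue: load the parameter definitions from a JSON resource, parse the CLI arguments, and read the optional isSparkSessionManaged flag. A sketch of the full pattern, with the Optional-based default following the convention used across dhp-workflows (the flag handling itself is outside this hunk):

String jsonConfiguration = IOUtils
    .toString(
        SparkPrepareInfo.class
            .getResourceAsStream(
                "/eu/dnetlib/dhp/actionset/common/input_prepare_parameters.json"));

final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);

// optional flag; defaults to true so the helper manages the SparkSession,
// while the tests pass "-isSparkSessionManaged false" and reuse a local session
Boolean isSparkSessionManaged = Optional
    .ofNullable(parser.get("isSparkSessionManaged"))
    .map(Boolean::valueOf)
    .orElse(Boolean.TRUE);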

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.stable;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.stable;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@ -22,6 +22,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import eu.dnetlib.dhp.actionset.common.ASResultInfo;
import eu.dnetlib.dhp.actionset.common.Common;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.*;
@ -36,7 +38,7 @@ public class SparkRedistributeIISRelations implements Serializable {
.toString(
SparkRedistributeIISRelations.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/remapping/input_redistribute_parameters.json"));
"/eu/dnetlib/dhp/actionset/stable/input_redistribute_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
@ -60,9 +62,9 @@ public class SparkRedistributeIISRelations implements Serializable {
conf,
isSparkSessionManaged,
spark -> {
common.removeOutputDir(spark, outputPath);
Dataset<ResultPid> resultPidDataset = common.readPath(spark, inputPath, ResultPid.class);
Dataset<ASResultInfo> asResultInfoDataset = common.readPath(spark, asInputPath, ASResultInfo.class);
Common.removeOutputDir(spark, outputPath);
Dataset<ResultPid> resultPidDataset = Common.readPath(spark, inputPath, ResultPid.class);
Dataset<ASResultInfo> asResultInfoDataset = Common.readPath(spark, asInputPath, ASResultInfo.class);
execRelation(spark, asResultInfoDataset.filter("type = 'relation'"), resultPidDataset, outputPath);
});
}
@ -129,6 +131,6 @@ public class SparkRedistributeIISRelations implements Serializable {
String st = "";
System.out.println(st);
return new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(common.OBJECT_MAPPER.writeValueAsString(aa)));
new Text(Common.OBJECT_MAPPER.writeValueAsString(aa)));
}
}
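getTextTextTuple2 serializes each AtomicAction into the (class name, JSON) Text pair that action sets are stored as. The write itself is outside this hunk; the usual counterpart to the sequenceFile read shown earlier would be something like the following (dataset variable name illustrative):

// assumes: import org.apache.hadoop.mapred.SequenceFileOutputFormat;
relationDataset // Dataset<Relation> produced by execRelation; name illustrative
    .toJavaRDD()
    .mapToPair(r -> getTextTextTuple2(r))
    .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);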

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.stable;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@ -22,6 +22,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import eu.dnetlib.dhp.actionset.common.ASResultInfo;
import eu.dnetlib.dhp.actionset.common.Common;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Context;
@ -30,16 +32,16 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
public class SparkRedistributeIISResult implements Serializable {
public class SparkRedistributeIISResults implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkRedistributeIISResult.class);
private static final Logger log = LoggerFactory.getLogger(SparkRedistributeIISResults.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkRedistributeIISResult.class
SparkRedistributeIISResults.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/remapping/input_redistribute_parameters.json"));
"/eu/dnetlib/dhp/actionset/stable/input_redistribute_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
@ -66,9 +68,9 @@ public class SparkRedistributeIISResult implements Serializable {
conf,
isSparkSessionManaged,
spark -> {
common.removeOutputDir(spark, outputPath);
Dataset<ResultPid> resultPidDataset = common.readPath(spark, inputPath, ResultPid.class);
Dataset<ASResultInfo> asResultInfoDataset = common.readPath(spark, asInputPath, ASResultInfo.class);
Common.removeOutputDir(spark, outputPath);
Dataset<ResultPid> resultPidDataset = Common.readPath(spark, inputPath, ResultPid.class);
Dataset<ASResultInfo> asResultInfoDataset = Common.readPath(spark, asInputPath, ASResultInfo.class);
execResult(spark, asResultInfoDataset.filter("type = 'result'"), resultPidDataset, outputPath);
// execRelation(spark, asResultInfoDataset.filter("type = 'relation'"), resultPidDataset,
// outputPathRelation);
@ -120,6 +122,6 @@ public class SparkRedistributeIISResult implements Serializable {
private static Tuple2<Text, Text> getTextTextTuple2(Result r) throws JsonProcessingException {
AtomicAction aa = new AtomicAction(Result.class, r);
return new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(common.OBJECT_MAPPER.writeValueAsString(aa)));
new Text(Common.OBJECT_MAPPER.writeValueAsString(aa)));
}
}

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset.stable;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@ -14,6 +14,7 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.actionset.common.Common;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
@ -26,7 +27,7 @@ public class SparkSelectResults implements Serializable {
.toString(
SparkSelectResults.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/remapping/input_select_parameters.json"));
"/eu/dnetlib/dhp/actionset/stable/input_select_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
@ -53,7 +54,7 @@ public class SparkSelectResults implements Serializable {
conf,
isSparkSessionManaged,
spark -> {
common.removeOutputDir(spark, outputPath);
Common.removeOutputDir(spark, outputPath);
run(spark, inputPath, outputPath);
});
}
@ -66,7 +67,7 @@ public class SparkSelectResults implements Serializable {
private static <R extends Result> void exec(SparkSession spark, String inputPath, String outputPath,
Class<R> resultClazz) {
Dataset<R> result = common.readPath(spark, inputPath, resultClazz);
Dataset<R> result = Common.readPath(spark, inputPath, resultClazz);
result.createOrReplaceTempView("result");
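With the result Dataset registered as the "result" view, the selection presumably runs a Spark SQL projection to produce the ResultPid records (id plus persistent identifiers) consumed by the redistribute steps. The actual statement is not part of this diff; an illustrative sketch only:

// illustrative query: the real SQL is not shown in this diff
Dataset<ResultPid> resultPid = spark
    .sql("SELECT id, pid FROM result") // pid assumed to be the entity-level pid list
    .as(Encoders.bean(ResultPid.class));

resultPid
    .write()
    .mode(SaveMode.Overwrite)
    .option("compression", "gzip")
    .json(outputPath);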

View File

@ -51,4 +51,8 @@
<name>sparkExecutorCores</name>
<value>1</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -30,8 +30,10 @@
</kill>
<action name="deleteoutputpath">
<fs>
<delete path='${outputPath}'/>
<mkdir path='${outputPath}'/>
<delete path='${relationOutputPath}'/>
<mkdir path='${relationOutputPath}'/>
<delete path='${resultOutputPath}'/>
<mkdir path='${resultOutputPath}'/>
<delete path='${workingDir}'/>
<mkdir path='${workingDir}'/>
</fs>
@ -45,8 +47,8 @@
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareInfo</name>
<class>eu.dnetlib.dhp.actionmanager.remapping.SparkPrepareInfo</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<class>eu.dnetlib.dhp.actionset.common.SparkPrepareInfo</class>
<jar>dhp-actionset-remapping-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -59,7 +61,7 @@
</spark-opts>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSets</arg><arg>${actionSets}</arg>
<arg>--inputPath</arg><arg>${inputPath</arg>
<arg>--inputPath</arg><arg>${inputPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="prepare_next"/>
@ -76,8 +78,8 @@
<master>yarn</master>
<mode>cluster</mode>
<name>ExpandRelation</name>
<class>eu.dnetlib.dhp.actionmanager.remapping.SparkExpandResultInfo</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<class>eu.dnetlib.dhp.actionset.common.SparkExpandResultInfo</class>
<jar>dhp-actionset-remapping-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -100,8 +102,8 @@
<master>yarn</master>
<mode>cluster</mode>
<name>SelectResults</name>
<class>eu.dnetlib.dhp.actionmanager.remapping.SparkSelectResults</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<class>eu.dnetlib.dhp.actionset.stable.SparkSelectResults</class>
<jar>dhp-actionset-remapping-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -131,8 +133,8 @@
<master>yarn</master>
<mode>cluster</mode>
<name>RedistributeRelation</name>
<class>eu.dnetlib.dhp.actionmanager.remapping.SparkRedistributeIISRelations</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<class>eu.dnetlib.dhp.actionset.stable.SparkRedistributeIISRelations</class>
<jar>dhp-actionset-remapping-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -145,7 +147,7 @@
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/selectedResults</arg>
<arg>--asInputPath</arg><arg>${workingDir}/expandedActionSet</arg>
<arg>--outputPath</arg><arg>${relationOutputPath></arg>
<arg>--outputPath</arg><arg>${relationOutputPath}</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
@ -156,8 +158,8 @@
<master>yarn</master>
<mode>cluster</mode>
<name>SelectResults</name>
<class>eu.dnetlib.dhp.actionmanager.remapping.SparkRedistributeIISResult</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<class>eu.dnetlib.dhp.actionset.stable.SparkRedistributeIISResult</class>
<jar>dhp-actionset-remapping-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -170,7 +172,7 @@
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/selectedResults</arg>
<arg>--asInputPath</arg><arg>${workingDir}/expandedActionSet</arg>
<arg>--outputPath</arg><arg>${resultOutputPath></arg>
<arg>--outputPath</arg><arg>${resultOutputPath}</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>

View File

@ -1,12 +1,11 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.neethi.Assertion;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@ -23,15 +22,13 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob;
import eu.dnetlib.dhp.actionset.common.ASResultInfo;
import eu.dnetlib.dhp.actionset.common.SparkExpandResultInfo;
public class ExpandResultInfoTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = PrepareInfoTest.class
.getClassLoader();
private static SparkSession spark;
private static final String FAKE_ISLOOKUP = "http://beta.services.openaire.eu/";
@ -76,7 +73,7 @@ public class ExpandResultInfoTest {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step2/preparedInfo").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step2/preparedInfo").getPath(),
"-outputPath",
workingDir.toString() + "/expandedActionSet"
});
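The tests drive each job through main() with -isSparkSessionManaged false, so they own a local SparkSession themselves. A sketch of the shared harness, mirroring the SparkConf visible in RedistributeIISLinksTest further down (the teardown follows the same convention, not shown in this hunk):

@BeforeAll
public static void beforeAll() throws IOException {
    workingDir = Files.createTempDirectory(ExpandResultInfoTest.class.getSimpleName());

    SparkConf conf = new SparkConf();
    conf.setAppName(ExpandResultInfoTest.class.getSimpleName());
    conf.setMaster("local[*]"); // local mode, all cores
    conf.set("spark.driver.host", "localhost");

    spark = SparkSession
        .builder()
        .appName(ExpandResultInfoTest.class.getSimpleName())
        .config(conf)
        .getOrCreate();
}

@AfterAll
public static void afterAll() throws IOException {
    // clean up the temp dir and release the session
    FileUtils.deleteDirectory(workingDir.toFile());
    spark.stop();
}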

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset;
import java.io.IOException;
import java.nio.file.Files;
@ -22,6 +22,11 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.actionset.common.ASResultInfo;
import eu.dnetlib.dhp.actionset.common.ActionSet;
import eu.dnetlib.dhp.actionset.common.PrepareInfo;
import eu.dnetlib.dhp.actionset.common.RelationMerges;
@Disabled
public class PrepareInfoTest {
@ -187,8 +192,8 @@ public class PrepareInfoTest {
public void testRelationsAS() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1").getPath(),
new Gson().fromJson("[\"iis-referenced-projects-main\"]", List.class),
actionSetList);
@ -286,8 +291,8 @@ public class PrepareInfoTest {
public void testInitiative() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1").getPath(),
new Gson().fromJson("[\"iis-researchinitiative\"]", List.class),
actionSetList);
@ -355,8 +360,8 @@ public class PrepareInfoTest {
public void testCommunities() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1").getPath(),
new Gson().fromJson("[\"iis-communities\"]", List.class),
actionSetList);
@ -416,8 +421,8 @@ public class PrepareInfoTest {
public void testCovid19() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1").getPath(),
new Gson().fromJson("[\"iis-covid-19\"]", List.class),
actionSetList);
@ -462,8 +467,8 @@ public class PrepareInfoTest {
public void testAll() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1").getPath(),
new Gson()
.fromJson(
"[\"iis-researchinitiative\",\"iis-referenced-projects-main\",\"iis-communities\",\"iis-covid-19\"]",
@ -497,8 +502,8 @@ public class PrepareInfoTest {
public void testRelationMerged() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionset/step1").getPath(),
new Gson().fromJson("[]", List.class),
actionSetList);

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset;
import java.io.IOException;
import java.nio.file.Files;
@ -22,30 +22,29 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.remapping.SparkRedistributeIISResult;
import eu.dnetlib.dhp.actionset.stable.SparkRedistributeIISRelations;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
public class RedistributeIISResultTest {
public class RedistributeIISLinksTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = RedistributeIISResultTest.class
.getClassLoader();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(RedistributeIISResultTest.class);
.getLogger(RedistributeIISLinksTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(RedistributeIISResultTest.class.getSimpleName());
.createTempDirectory(RedistributeIISLinksTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(RedistributeIISResultTest.class.getSimpleName());
conf.setAppName(RedistributeIISLinksTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
@ -56,7 +55,7 @@ public class RedistributeIISResultTest {
spark = SparkSession
.builder()
.appName(RedistributeIISResultTest.class.getSimpleName())
.appName(RedistributeIISLinksTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@ -76,13 +75,13 @@ public class RedistributeIISResultTest {
Boolean.FALSE.toString(),
"-asInputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step4/actionset")
.getResource("/eu/dnetlib/dhp/actionset/step4/actionset")
.getPath(),
"-outputPath",
workingDir.toString() + "/relationActionSet",
"-inputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step4/result")
.getResource("/eu/dnetlib/dhp/actionset/step4/result")
.getPath()
});
@ -156,13 +155,13 @@ public class RedistributeIISResultTest {
Boolean.FALSE.toString(),
"-asInputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step4/actionset")
.getResource("/eu/dnetlib/dhp/actionset/step4/actionset")
.getPath(),
"-outputPath",
workingDir.toString() + "/resultActionSet",
"-inputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step4/result")
.getResource("/eu/dnetlib/dhp/actionset/step4/result")
.getPath()
});
@ -179,50 +178,6 @@ public class RedistributeIISResultTest {
verificationDataset.createOrReplaceTempView("verificationDataset");
// Assertions.assertEquals(0, verificationDataset.filter("substr(id,1,8) = '50|dedup'").count());
//
// Assertions.assertEquals(3, verificationDataset.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a'").count());
// Assertions.assertEquals(3, verificationDataset.filter("id = '50|od_______166::779de9b3a2d224779be52fae43b5fc80'").count());
// Assertions.assertEquals(3, verificationDataset.filter("id = '50|od_______165::779de9b3a2d224779be52fae43b5fc80'").count());
// Assertions.assertEquals(3, verificationDataset.filter("id = '50|od______3515::779de9b3a2d224779be52fae43b5fc80'").count());
//
// Assertions.assertEquals(2, verificationDataset.filter("id = '50|doiboost____::78329557c23bee513963ebf295d1434d'").count());
// Assertions.assertEquals(2, verificationDataset.filter("id = '50|doiboost____::8978b9b797294da5306950a94a58d98c'").count());
// Assertions.assertEquals(2, verificationDataset.filter("id = '50|doiboost____::fb2c70723d74f45329640255a959333d'").count());
// Assertions.assertEquals(2, verificationDataset.filter("id = '50|base_oa_____::fb2c70723d74f45329640255a959333d'").count());
//
// Assertions.assertEquals(5, verificationDataset.filter("id = '50|_____OmicsDI::039dbb63f11b19dc15113b34ebceb0d2' " +
// "or id = '50|_____OmicsDI::05f133acca27d72866c6720a95515f57' or " +
// "id = '50|_____OmicsDI::2d508eba981699a30e969d1ab5a068b8' or " +
// "id = '50|datacite____::00bddedc38dc045780dc84c27bc8fecd' or " +
// "id = '50|datacite____::00f7f89392fa75e944dc8d329e9e8024'").count());
//
//
// verificationDataset.createOrReplaceTempView("verificationDataset");
//
// Dataset<Row> verify = spark.sql(("SELECT id, type, val.value value, val.trust trust, val.inference_provenance prov " +
// "FROM verificationDataset " +
// "LATERAL VIEW EXPLODE(value) v as val"));
//
// Assertions.assertEquals(25, verify.count());
//
// Assertions.assertEquals(20, verify.filter("type = 'relation'").count());
// Assertions.assertEquals(5, verify.filter("type = 'result'").count());
//
// Assertions.assertEquals(1, verify.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' " +
// "and value = '40|rcuk________::8dec51859e6b66cd040670b432b9e59c' and " +
// "prov = 'iis::document_referencedProjects' and " +
// "trust = '0.897'").count());
//
// Assertions.assertEquals(1, verify.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' " +
// "and value = '40|rcuk________::5e312e08bd65f126d7d79b3d1d677eb3' and " +
// "prov = 'iis::document_referencedProjects' and " +
// "trust = '0.897'").count());
//
// Assertions.assertEquals(1, verify.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' " +
// "and value = '40|corda_______::6d500f8fceb2bb81b0750820469e1cd8' and " +
// "prov = 'iis::document_referencedProjects' and " +
// "trust = '0.7085'").count());
}
}
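After each job run, the tests read the output back and assert over it; the verification block above, now largely commented out, follows this shape. A hedged sketch of the read-back step for the relation action set, with the expected count left symbolic (not a value taken from this diff):

JavaRDD<Relation> tmp = JavaSparkContext
    .fromSparkContext(spark.sparkContext())
    .sequenceFile(workingDir.toString() + "/relationActionSet", Text.class, Text.class)
    // each record is a (class name, JSON) pair wrapping an AtomicAction
    .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
    .map(aa -> (Relation) aa.getPayload());

Assertions.assertEquals(expectedRelations, tmp.count()); // expectedRelations is illustrative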

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.remapping;
package eu.dnetlib.dhp.actionset;
import java.io.IOException;
import java.nio.file.Files;
@ -21,12 +21,12 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionset.stable.ResultPid;
import eu.dnetlib.dhp.actionset.stable.SparkSelectResults;
public class SelectResultTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = SelectResultTest.class
.getClassLoader();
private static SparkSession spark;
private static Path workingDir;
@ -71,7 +71,7 @@ public class SelectResultTest {
Boolean.FALSE.toString(),
"-inputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step3")
.getResource("/eu/dnetlib/dhp/actionset/step3")
.getPath(),
"-outputPath",
workingDir.toString() + "/selectedResults"