forked from D-Net/dnet-hadoop
[Enrichment Step] get rid of hive
This commit is contained in:
parent
a6c26a9e0e
commit
157e6bf5e1
|
@ -63,12 +63,6 @@ public class SparkBulkTagJob {
|
||||||
final String resultClassName = parser.get("resultTableName");
|
final String resultClassName = parser.get("resultTableName");
|
||||||
log.info("resultTableName: {}", resultClassName);
|
log.info("resultTableName: {}", resultClassName);
|
||||||
|
|
||||||
final Boolean saveGraph = Optional
|
|
||||||
.ofNullable(parser.get("saveGraph"))
|
|
||||||
.map(Boolean::valueOf)
|
|
||||||
.orElse(Boolean.TRUE);
|
|
||||||
log.info("saveGraph: {}", saveGraph);
|
|
||||||
|
|
||||||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
|
|
|
@ -40,4 +40,13 @@ public class AutoritativeAuthor {
|
||||||
this.orcid = orcid;
|
this.orcid = orcid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static AutoritativeAuthor newInstance(String name, String surname, String fullname, String orcid) {
|
||||||
|
AutoritativeAuthor aa = new AutoritativeAuthor();
|
||||||
|
aa.name = name;
|
||||||
|
aa.surname = surname;
|
||||||
|
aa.fullname = fullname;
|
||||||
|
aa.orcid = orcid;
|
||||||
|
return aa;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,17 +2,22 @@
|
||||||
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.apache.spark.sql.sources.v2.reader.InputPartition;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -20,8 +25,11 @@ import com.google.gson.Gson;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
public class PrepareResultOrcidAssociationStep1 {
|
public class PrepareResultOrcidAssociationStep1 {
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class);
|
private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class);
|
||||||
|
@ -51,13 +59,15 @@ public class PrepareResultOrcidAssociationStep1 {
|
||||||
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
final List<String> allowedsemrel = Arrays.asList(parser.get("allowedsemrels").split(";"));
|
||||||
log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
|
log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
|
||||||
|
|
||||||
|
final List<String> allowedPids = Arrays.asList(parser.get("allowedpids").split(";"));
|
||||||
|
log.info("allowedPids: {}", new Gson().toJson(allowedPids));
|
||||||
|
|
||||||
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
|
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
|
||||||
log.info("resultType: {}", resultType);
|
log.info("resultType: {}", resultType);
|
||||||
|
|
||||||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
|
||||||
|
|
||||||
String inputRelationPath = inputPath + "/relation";
|
String inputRelationPath = inputPath + "/relation";
|
||||||
log.info("inputRelationPath: {}", inputRelationPath);
|
log.info("inputRelationPath: {}", inputRelationPath);
|
||||||
|
@ -68,57 +78,82 @@ public class PrepareResultOrcidAssociationStep1 {
|
||||||
String outputResultPath = outputPath + "/" + resultType;
|
String outputResultPath = outputPath + "/" + resultType;
|
||||||
log.info("outputResultPath: {}", outputResultPath);
|
log.info("outputResultPath: {}", outputResultPath);
|
||||||
|
|
||||||
runWithSparkHiveSession(
|
runWithSparkSession(
|
||||||
conf,
|
conf,
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
removeOutputDir(spark, outputPath);
|
removeOutputDir(spark, outputPath);
|
||||||
prepareInfo(
|
prepareInfo(
|
||||||
spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel);
|
spark, inputPath, outputPath, resultType, resultClazz, allowedsemrel, allowedPids);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <R extends Result> void prepareInfo(
|
private static <R extends Result> void prepareInfo(
|
||||||
SparkSession spark,
|
SparkSession spark,
|
||||||
String inputRelationPath,
|
String inputPath,
|
||||||
String inputResultPath,
|
String outputPath,
|
||||||
String outputResultPath,
|
String resultType,
|
||||||
Class<R> resultClazz,
|
Class<R> resultClazz,
|
||||||
List<String> allowedsemrel) {
|
List<String> allowedsemrel,
|
||||||
|
List<String> allowedPids) {
|
||||||
|
|
||||||
Dataset<Relation> relation = readPath(spark, inputRelationPath, Relation.class);
|
final String inputResultPath = inputPath + "/" + resultType;
|
||||||
relation.createOrReplaceTempView("relation");
|
readPath(spark, inputPath + "/relation", Relation.class)
|
||||||
|
.filter(
|
||||||
|
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
|
||||||
|
&& allowedsemrel.contains(r.getRelClass().toLowerCase()))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath + "/relationSubset");
|
||||||
|
|
||||||
|
Dataset<Relation> relation = readPath(spark, outputPath + "/relationSubset", Relation.class);
|
||||||
|
|
||||||
log.info("Reading Graph table from: {}", inputResultPath);
|
log.info("Reading Graph table from: {}", inputResultPath);
|
||||||
Dataset<R> result = readPath(spark, inputResultPath, resultClazz);
|
readPath(spark, inputResultPath, resultClazz)
|
||||||
result.createOrReplaceTempView("result");
|
.filter(
|
||||||
|
(FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() && !r.getDataInfo().getInvisible())
|
||||||
|
.filter((FilterFunction<R>) r -> r.getAuthor().stream().anyMatch(a -> hasAllowedPid(a, allowedPids)))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath + "/resultSubset");
|
||||||
|
|
||||||
String query = "SELECT target resultId, author authorList"
|
Dataset<R> result = readPath(spark, outputPath + "/resultSubset", resultClazz);
|
||||||
+ " FROM (SELECT id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author "
|
|
||||||
+ " FROM ( "
|
|
||||||
+ " SELECT DISTINCT id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid "
|
|
||||||
+ " FROM result "
|
|
||||||
+ " LATERAL VIEW EXPLODE (author) a AS MyT "
|
|
||||||
+ " LATERAL VIEW EXPLODE (MyT.pid) p AS MyP "
|
|
||||||
+ " WHERE lower(MyP.qualifier.classid) = '" + ModelConstants.ORCID + "' or "
|
|
||||||
+ " lower(MyP.qualifier.classid) = '" + ModelConstants.ORCID_PENDING + "') tmp "
|
|
||||||
+ " GROUP BY id) r_t "
|
|
||||||
+ " JOIN ("
|
|
||||||
+ " SELECT source, target "
|
|
||||||
+ " FROM relation "
|
|
||||||
+ " WHERE datainfo.deletedbyinference = false "
|
|
||||||
+ getConstraintList(" lower(relclass) = '", allowedsemrel)
|
|
||||||
+ " ) rel_rel "
|
|
||||||
+ " ON source = id";
|
|
||||||
|
|
||||||
log.info("executedQuery: {}", query);
|
result
|
||||||
spark
|
.joinWith(relation, result.col("id").equalTo(relation.col("source")))
|
||||||
.sql(query)
|
.map((MapFunction<Tuple2<R, Relation>, ResultOrcidList>) t2 -> {
|
||||||
.as(Encoders.bean(ResultOrcidList.class))
|
ResultOrcidList rol = new ResultOrcidList();
|
||||||
|
rol.setResultId(t2._2().getTarget());
|
||||||
|
List<AutoritativeAuthor> aal = new ArrayList<>();
|
||||||
|
t2._1().getAuthor().stream().forEach(a -> {
|
||||||
|
a.getPid().stream().forEach(p -> {
|
||||||
|
if (allowedPids.contains(p.getQualifier().getClassid().toLowerCase())) {
|
||||||
|
aal
|
||||||
|
.add(
|
||||||
|
AutoritativeAuthor
|
||||||
|
.newInstance(a.getName(), a.getSurname(), a.getFullname(), p.getValue()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return rol;
|
||||||
|
}, Encoders.bean(ResultOrcidList.class))
|
||||||
.write()
|
.write()
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.json(outputResultPath);
|
.json(outputPath + "/" + resultType);
|
||||||
|
;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean hasAllowedPid(Author a, List<String> allowedPids) {
|
||||||
|
Optional<List<StructuredProperty>> oPid = Optional.ofNullable(a.getPid());
|
||||||
|
if (!oPid.isPresent()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return oPid.get().stream().anyMatch(p -> allowedPids.contains(p.getQualifier().getClassid().toLowerCase()));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,12 +71,6 @@ public class SparkResultToOrganizationFromIstRepoJob {
|
||||||
final String resultClassName = parser.get("resultTableName");
|
final String resultClassName = parser.get("resultTableName");
|
||||||
log.info("resultTableName: {}", resultClassName);
|
log.info("resultTableName: {}", resultClassName);
|
||||||
|
|
||||||
final Boolean saveGraph = Optional
|
|
||||||
.ofNullable(parser.get("saveGraph"))
|
|
||||||
.map(Boolean::valueOf)
|
|
||||||
.orElse(Boolean.TRUE);
|
|
||||||
log.info("saveGraph: {}", saveGraph);
|
|
||||||
|
|
||||||
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
|
@ -86,15 +80,15 @@ public class SparkResultToOrganizationFromIstRepoJob {
|
||||||
conf,
|
conf,
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
if (saveGraph) {
|
|
||||||
execPropagation(
|
execPropagation(
|
||||||
spark,
|
spark,
|
||||||
datasourceorganization,
|
datasourceorganization,
|
||||||
alreadylinked,
|
alreadylinked,
|
||||||
inputPath,
|
inputPath,
|
||||||
outputPath,
|
outputPath,
|
||||||
resultClazz);
|
resultClazz);
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "out",
|
"paramName": "out",
|
||||||
"paramLongName": "workingPath",
|
"paramLongName": "outputPath",
|
||||||
"paramDescription": "the path used to store temporary output files",
|
"paramDescription": "the path used to store temporary output files",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
|
|
|
@ -12,9 +12,9 @@
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName":"h",
|
"paramName":"ap",
|
||||||
"paramLongName":"hive_metastore_uris",
|
"paramLongName":"allowedpids",
|
||||||
"paramDescription": "the hive metastore uris",
|
"paramDescription": "the allowed pid type to be used for propagation",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -41,6 +41,10 @@
|
||||||
<name>pathMap</name>
|
<name>pathMap</name>
|
||||||
<description>the json path associated to each selection field</description>
|
<description>the json path associated to each selection field</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>blacklist</name>
|
||||||
|
<description>list of datasources in blacklist for the affiliation from instrepo propagation</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>hiveDbName</name>
|
<name>hiveDbName</name>
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
<description>the json path associated to each selection field</description>
|
<description>the json path associated to each selection field</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>workingPath</name>
|
<name>outputPath</name>
|
||||||
<description>the output path</description>
|
<description>the output path</description>
|
||||||
</property>
|
</property>
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
|
@ -225,7 +225,7 @@
|
||||||
--conf spark.dynamicAllocation.enabled=true
|
--conf spark.dynamicAllocation.enabled=true
|
||||||
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
|
<arg>--sourcePath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
|
<arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="fork-join-exec-propagation"/>
|
<ok to="fork-join-exec-propagation"/>
|
||||||
|
|
|
@ -0,0 +1,236 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
|
||||||
|
|
||||||
|
import com.google.gson.Gson;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.Row;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||||
|
|
||||||
|
public class PrepareStep1Test {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(PrepareStep1Test.class);
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
private static Path workingDir;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
workingDir = Files.createTempDirectory(PrepareStep1Test.class.getSimpleName());
|
||||||
|
log.info("using work dir {}", workingDir);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.setAppName(PrepareStep1Test.class.getSimpleName());
|
||||||
|
|
||||||
|
conf.setMaster("local[*]");
|
||||||
|
conf.set("spark.driver.host", "localhost");
|
||||||
|
conf.set("spark.ui.enabled", "false");
|
||||||
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||||
|
|
||||||
|
spark = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(PrepareStep1Test.class.getSimpleName())
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void afterAll() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
|
spark.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void noUpdateTest() throws Exception {
|
||||||
|
//7 relationi fra issupplementedby e issupplementto
|
||||||
|
|
||||||
|
final String sourcePath = getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparestep1")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
PrepareResultOrcidAssociationStep1
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||||
|
"-sourcePath", sourcePath,
|
||||||
|
"-resultTableName", Dataset.class.getCanonicalName(),
|
||||||
|
"-outputPath", workingDir.toString() + "/preparedInfo",
|
||||||
|
"-allowedsemrels", "IsSupplementedBy;IsSupplementTo",
|
||||||
|
"-allowedpids","orcid;orcid_pending"
|
||||||
|
});
|
||||||
|
|
||||||
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<ResultOrcidList> tmp = sc
|
||||||
|
.textFile(workingDir.toString() + "/preparedInfo")
|
||||||
|
.map(item -> OBJECT_MAPPER.readValue(item, ResultOrcidList.class));
|
||||||
|
|
||||||
|
System.out.println("***************** COUNT ********************* \n" + tmp.count());
|
||||||
|
tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s));
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void oneUpdateTest() throws Exception {
|
||||||
|
SparkOrcidToResultFromSemRelJob
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-isTest",
|
||||||
|
Boolean.TRUE.toString(),
|
||||||
|
"-isSparkSessionManaged",
|
||||||
|
Boolean.FALSE.toString(),
|
||||||
|
"-sourcePath",
|
||||||
|
getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/oneupdate")
|
||||||
|
.getPath(),
|
||||||
|
"-hive_metastore_uris",
|
||||||
|
"",
|
||||||
|
"-resultTableName",
|
||||||
|
"eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||||
|
"-outputPath",
|
||||||
|
workingDir.toString() + "/dataset",
|
||||||
|
"-possibleUpdatesPath",
|
||||||
|
getClass()
|
||||||
|
.getResource(
|
||||||
|
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
|
||||||
|
.getPath()
|
||||||
|
});
|
||||||
|
|
||||||
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<Dataset> tmp = sc
|
||||||
|
.textFile(workingDir.toString() + "/dataset")
|
||||||
|
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||||
|
|
||||||
|
// tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s));
|
||||||
|
|
||||||
|
Assertions.assertEquals(10, tmp.count());
|
||||||
|
|
||||||
|
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
|
||||||
|
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
|
||||||
|
|
||||||
|
verificationDataset.createOrReplaceTempView("dataset");
|
||||||
|
|
||||||
|
String query = "select id, MyT.name name, MyT.surname surname, MyP.value pid, MyP.qualifier.classid pidType "
|
||||||
|
+ "from dataset "
|
||||||
|
+ "lateral view explode(author) a as MyT "
|
||||||
|
+ "lateral view explode(MyT.pid) p as MyP "
|
||||||
|
+ "where MyP.datainfo.inferenceprovenance = 'propagation'";
|
||||||
|
|
||||||
|
org.apache.spark.sql.Dataset<Row> propagatedAuthors = spark.sql(query);
|
||||||
|
|
||||||
|
Assertions.assertEquals(1, propagatedAuthors.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1,
|
||||||
|
propagatedAuthors
|
||||||
|
.filter(
|
||||||
|
"id = '50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e' "
|
||||||
|
+ "and name = 'Vajinder' and surname = 'Kumar' and pidType = '" +
|
||||||
|
|
||||||
|
ModelConstants.ORCID_PENDING + "'")
|
||||||
|
.count());
|
||||||
|
|
||||||
|
Assertions.assertEquals(1, propagatedAuthors.filter("pid = '0000-0002-8825-3517'").count());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void twoUpdatesTest() throws Exception {
|
||||||
|
SparkOrcidToResultFromSemRelJob
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-isTest",
|
||||||
|
Boolean.TRUE.toString(),
|
||||||
|
"-isSparkSessionManaged",
|
||||||
|
Boolean.FALSE.toString(),
|
||||||
|
"-sourcePath",
|
||||||
|
getClass()
|
||||||
|
.getResource(
|
||||||
|
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/twoupdates")
|
||||||
|
.getPath(),
|
||||||
|
"-hive_metastore_uris",
|
||||||
|
"",
|
||||||
|
"-resultTableName",
|
||||||
|
"eu.dnetlib.dhp.schema.oaf.Dataset",
|
||||||
|
"-outputPath",
|
||||||
|
workingDir.toString() + "/dataset",
|
||||||
|
"-possibleUpdatesPath",
|
||||||
|
getClass()
|
||||||
|
.getResource(
|
||||||
|
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
|
||||||
|
.getPath()
|
||||||
|
});
|
||||||
|
|
||||||
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<Dataset> tmp = sc
|
||||||
|
.textFile(workingDir.toString() + "/dataset")
|
||||||
|
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||||
|
|
||||||
|
Assertions.assertEquals(10, tmp.count());
|
||||||
|
|
||||||
|
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
|
||||||
|
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
|
||||||
|
|
||||||
|
verificationDataset.createOrReplaceTempView("dataset");
|
||||||
|
|
||||||
|
String query = "select id, MyT.name name, MyT.surname surname, MyP.value pid, MyP.qualifier.classid pidType "
|
||||||
|
+ "from dataset "
|
||||||
|
+ "lateral view explode(author) a as MyT "
|
||||||
|
+ "lateral view explode(MyT.pid) p as MyP "
|
||||||
|
+ "where MyP.datainfo.inferenceprovenance = 'propagation'";
|
||||||
|
|
||||||
|
org.apache.spark.sql.Dataset<Row> propagatedAuthors = spark.sql(query);
|
||||||
|
|
||||||
|
Assertions.assertEquals(2, propagatedAuthors.count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1, propagatedAuthors.filter("name = 'Marc' and surname = 'Schmidtmann'").count());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1, propagatedAuthors.filter("name = 'Ruediger' and surname = 'Beckhaus'").count());
|
||||||
|
|
||||||
|
query = "select id, MyT.name name, MyT.surname surname, MyP.value pid ,MyP.qualifier.classid pidType "
|
||||||
|
+ "from dataset "
|
||||||
|
+ "lateral view explode(author) a as MyT "
|
||||||
|
+ "lateral view explode(MyT.pid) p as MyP ";
|
||||||
|
|
||||||
|
org.apache.spark.sql.Dataset<Row> authorsExplodedPids = spark.sql(query);
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
2, authorsExplodedPids.filter("name = 'Marc' and surname = 'Schmidtmann'").count());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1,
|
||||||
|
authorsExplodedPids
|
||||||
|
.filter(
|
||||||
|
"name = 'Marc' and surname = 'Schmidtmann' and pidType = 'MAG Identifier'")
|
||||||
|
.count());
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,18 @@
|
||||||
|
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"isSupplementedBy","relType":"datasourceOrganization","source":"50|57a035e5b1ae::0637d444355058eb76ab6d7a842aa8b4","subRelType":"provision","target":"50|475c1990cbb2::02d3c300ac2d07135a6208159c512f62","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::21f8a223b9925c2f87c404096080b046","value":"Registry of Research Data Repository"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"isSupplementedBy","relType":"datasourceOrganization","source":"50|57a035e5b1ae::01894f77220771428abaecbfa2bcc8f7","subRelType":"provision","target":"50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::6ac933301a3933c8a22ceebea7000326","value":"Academy of Finland"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"isSupplementTo","relType":"projectOrganization","source":"50|475c1990cbb2::02d3c300ac2d07135a6208159c512f62","subRelType":"participation","target":"50|57a035e5b1ae::0637d444355058eb76ab6d7a842aa8b4","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::6ac933301a3933c8a22ceebea7000326","value":"Academy of Finland"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"isSupplementTo","relType":"projectOrganization","source":"50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217","subRelType":"participation","target":"50|57a035e5b1ae::01894f77220771428abaecbfa2bcc8f7","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::457528c43fabd74e212db2ed61101075","value":"Agence Nationale de la Recherche"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"isSupplementedBy","relType":"projectOrganization","source":"50|57a035e5b1ae::07b10647d24e46073785210d4715f4e9","subRelType":"participation","target":"50|475c1990cbb2::699e01797642d72238c502ffcae18277","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::457528c43fabd74e212db2ed61101075","value":"Agence Nationale de la Recherche"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"IsSupplementedBy","relType":"projectOrganization","source":"50|57a035e5b1ae::0cee1d69f1cab270c382eaa853bcf4dc","subRelType":"participation","target":"50|475c1990cbb2::b778659ec5014f3db4c4e03c7907a69d","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::457528c43fabd74e212db2ed61101075","value":"Agence Nationale de la Recherche"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"IsSupplementTo","relType":"projectOrganization","source":"50|57a035e5b1ae::0d428b3119b0c822270df15058029172","subRelType":"participation","target":"50|475c1990cbb2::c8172336a860b66965e8d43a5494de2c","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::b30dac7baac631f3da7c2bb18dd9891f","value":"CORDA - COmmon Research DAta Warehouse"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"hasParticipant","relType":"projectOrganization","source":"40|corda_______::27b677f5d4a8b3a1159dba624016dc70","subRelType":"participation","target":"20|corda_______::0790e5c820c6a795d2b7524415cefb53","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::b30dac7baac631f3da7c2bb18dd9891f","value":"CORDA - COmmon Research DAta Warehouse"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"hasParticipant","relType":"projectOrganization","source":"40|corda_______::b5db617bb0f475b49584f5ee5120227c","subRelType":"participation","target":"20|corda_______::16220fe1781e3beb748872d31aa7f789","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::b30dac7baac631f3da7c2bb18dd9891f","value":"CORDA - COmmon Research DAta Warehouse"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"hasParticipant","relType":"projectOrganization","source":"40|corda_______::2907ce789238006cbe07f3e89820c9df","subRelType":"participation","target":"20|corda_______::43edcb7ca35d487ec357959e05c7ed7b","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::b30dac7baac631f3da7c2bb18dd9891f","value":"CORDA - COmmon Research DAta Warehouse"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"hasParticipant","relType":"projectOrganization","source":"40|corda_______::d185f413b046d7a7b15808388dad71a5","subRelType":"participation","target":"20|corda_______::46ac0acd65a3c66b10842bf291be9660","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::b30dac7baac631f3da7c2bb18dd9891f","value":"CORDA - COmmon Research DAta Warehouse"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"hasParticipant","relType":"projectOrganization","source":"40|corda_______::9c454e23267b520b621199fd4a79e3a6","subRelType":"participation","target":"20|corda_______::86fa29ae6a36610616e1691e1283f807","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::b30dac7baac631f3da7c2bb18dd9891f","value":"CORDA - COmmon Research DAta Warehouse"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"hasParticipant","relType":"projectOrganization","source":"40|corda_______::e40925978874b5f57378f301370e1293","subRelType":"participation","target":"20|corda_______::88e4a05f9c42a4830ffdd51663ed4538","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::0362fcdb3076765d9c0041ad331553e8","value":"OpenOrgs Database"}],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.990"},"lastupdatetimestamp":1649252022894,"properties":[],"relClass":"merges","relType":"organizationOrganization","source":"20|pending_org_::5a01343420bc742ec1891cd98c36a258","subRelType":"dedup","target":"20|corda_______::a7468d48c5f0517ec67a2a9163af7150","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::b30dac7baac631f3da7c2bb18dd9891f","value":"CORDA - COmmon Research DAta Warehouse"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"hasParticipant","relType":"projectOrganization","source":"40|corda_______::531cfba3fa5e10f6be1e42e3c54cc95f","subRelType":"participation","target":"20|corda_______::b2233c6930da222c40e78302385a277d","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::b30dac7baac631f3da7c2bb18dd9891f","value":"CORDA - COmmon Research DAta Warehouse"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"hasParticipant","relType":"projectOrganization","source":"40|corda_______::48cb178c2561829bc2eedd787c052d48","subRelType":"participation","target":"20|corda_______::cd8ad1c4f710b667b74362c1674b92e6","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::b30dac7baac631f3da7c2bb18dd9891f","value":"CORDA - COmmon Research DAta Warehouse"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"hasParticipant","relType":"projectOrganization","source":"40|corda_______::795be98a5ba4c9190a32fc56033a9540","subRelType":"participation","target":"20|corda_______::f2323f9ed70f0f3a93fdfbb92f715e0e","validated":false}
|
||||||
|
{"collectedfrom":[{"key":"10|openaire____::b30dac7baac631f3da7c2bb18dd9891f","value":"CORDA - COmmon Research DAta Warehouse"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"lastupdatetimestamp":1649252022977,"properties":[],"relClass":"hasParticipant","relType":"projectOrganization","source":"40|corda_______::ca5b255e4b2ef49ff424e0019962591c","subRelType":"participation","target":"20|corda_______::f2323f9ed70f0f3a93fdfbb92f715e0e","validated":false}
|
Loading…
Reference in New Issue