[Enrichment Step] get rid of Hive

Miriam Baglioni 2022-04-13 11:48:03 +02:00
parent aecea5a095
commit d1519fa28f
29 changed files with 560 additions and 403 deletions
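The changes below follow one pattern: drop the Hive session and metastore (runWithSparkHiveSession, hive_metastore_uris, createOrReplaceTempView + spark.sql) and express the same selections with typed Dataset operations on a plain Spark session, persisting intermediate data as gzipped JSON under a working path. A minimal sketch of that pattern, modelled on the datasource selection in PrepareResultInstRepoAssociation further down; the wrapper class, method name and paths are illustrative, not code from the repository:

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;

import eu.dnetlib.dhp.schema.oaf.Datasource;

public class HiveFreeSelectionSketch {

    // before: datasource.createOrReplaceTempView("datasource");
    //         spark.sql("SELECT id FROM datasource WHERE datasourcetype.classid = '...' AND datainfo.deletedbyinference = false");
    // after:  the same selection as a typed filter, written as gzipped json under the working path
    public static void selectInstitutionalRepositories(Dataset<Datasource> datasource, String workingPath,
        String institutionalRepoType) {
        datasource
            .filter(
                (FilterFunction<Datasource>) ds -> !ds.getDataInfo().getDeletedbyinference() &&
                    ds.getDatasourcetype().getClassid().equals(institutionalRepoType))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(workingPath + "/datasource");
    }
}

The same shape recurs below for relations, results and the country/ORCID preparation steps.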

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.countrypropagation;
package eu.dnetlib.dhp;
import java.io.Serializable;

View File

@ -23,4 +23,5 @@ public class KeyValueSet implements Serializable {
public void setValueSet(ArrayList<String> valueSet) {
this.valueSet = valueSet;
}
}
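KeyValueSet is the small carrier bean the refactored jobs serialize in place of Hive query rows (e.g. the alreadyLinked output further down). A minimal usage sketch; the ids are made up, and the eu.dnetlib.dhp package and the setKey accessor are assumptions based on how the bean is used elsewhere in this diff:

import java.util.ArrayList;
import java.util.Arrays;

import eu.dnetlib.dhp.KeyValueSet; // package assumed

public class KeyValueSetSketch {

    public static KeyValueSet example() {
        // e.g. a result id mapped to the organization ids it is already linked to
        KeyValueSet kvs = new KeyValueSet();
        kvs.setKey("50|someResultId"); // illustrative id
        kvs.setValueSet(new ArrayList<>(Arrays.asList("20|orgA", "20|orgB"))); // illustrative ids
        return kvs;
    }
}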

View File

@ -4,22 +4,21 @@ package eu.dnetlib.dhp;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.*;
public class PropagationConstant {
@ -221,9 +220,28 @@ public class PropagationConstant {
.orElse(Boolean.FALSE);
}
public static void createCfHbforResult(SparkSession spark) {
org.apache.spark.sql.Dataset<Row> cfhb = spark.sql(cfHbforResultQuery);
cfhb.createOrReplaceTempView("cfhb");
// for each result, collects the distinct keys of collectedfrom (at the level of the result) and hostedby
// and produces a (resultId, key) pair for each distinct key associated to the result
public static <R extends Result> void createCfHbforResult(SparkSession spark, String inputPath, String outputPath,
Class<R> resultClazz) {
readPath(spark, inputPath, resultClazz)
.filter(
(FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
!r.getDataInfo().getInvisible())
.flatMap((FlatMapFunction<R, EntityEntityRel>) r -> {
Set<String> cfhb = r.getCollectedfrom().stream().map(cf -> cf.getKey()).collect(Collectors.toSet());
cfhb.addAll(r.getInstance().stream().map(i -> i.getHostedby().getKey()).collect(Collectors.toSet()));
return cfhb
.stream()
.map(value -> EntityEntityRel.newInstance(r.getId(), value))
.collect(Collectors.toList())
.iterator();
}, Encoders.bean(EntityEntityRel.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
public static <R> Dataset<R> readPath(
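The reworked createCfHbforResult above now takes an input path, an output path and the result class instead of materialising a temp view for spark.sql. A hypothetical invocation with placeholder paths (the real call sites are in the PrepareResultCountrySet diff further down):

import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class CreateCfHbSketch {

    public static void run(SparkSession spark, String sourcePath, String workingPath) {
        // reads the publications, keeps those neither deleted by inference nor invisible, and writes
        // one (resultId, collectedfrom/hostedby key) pair per distinct key as gzipped json
        PropagationConstant
            .createCfHbforResult(
                spark,
                sourcePath + "/publication",
                workingPath + "/resultCfHb/publication",
                Publication.class);
    }
}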

View File

@ -10,7 +10,6 @@ import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@ -19,8 +18,7 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.EntityEntityRel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Datasource;
@ -56,8 +54,8 @@ public class PrepareDatasourceCountryAssociation {
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
SparkConf conf = new SparkConf();
@ -65,13 +63,13 @@ public class PrepareDatasourceCountryAssociation {
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
removeOutputDir(spark, workingPath);
prepareDatasourceCountryAssociation(
spark,
Arrays.asList(parser.get("whitelist").split(";")),
Arrays.asList(parser.get("allowedtypes").split(";")),
inputPath,
outputPath);
workingPath + "/datasourceCountry");
});
}

View File

@ -2,17 +2,14 @@
package eu.dnetlib.dhp.countrypropagation;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
@ -23,6 +20,8 @@ import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.EntityEntityRel;
import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
@ -49,15 +48,18 @@ public class PrepareResultCountrySet {
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String datasourcecountrypath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", datasourcecountrypath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
log.info("resultType: {}", resultType);
String outputPath = workingPath + "/" + resultType; // parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String datasourcecountrypath = workingPath + "/datasourceCountry";// parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", datasourcecountrypath);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
@ -72,7 +74,7 @@ public class PrepareResultCountrySet {
inputPath,
outputPath,
datasourcecountrypath,
workingPath,
workingPath + "/resultCfHb/" + resultType,
resultClazz);
});
}
@ -85,31 +87,11 @@ public class PrepareResultCountrySet {
String workingPath,
Class<R> resultClazz) {
// selects all the results that are neither deleted by inference nor invisible
Dataset<R> result = readPath(spark, inputPath, resultClazz)
.filter(
(FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
!r.getDataInfo().getInvisible());
// for each result, collects the distinct keys of collectedfrom (at the level of the result) and hostedby
// and produces a (resultId, key) pair for each distinct key associated to the result
result.flatMap((FlatMapFunction<R, EntityEntityRel>) r -> {
Set<String> cfhb = r.getCollectedfrom().stream().map(cf -> cf.getKey()).collect(Collectors.toSet());
cfhb.addAll(r.getInstance().stream().map(i -> i.getHostedby().getKey()).collect(Collectors.toSet()));
return cfhb
.stream()
.map(value -> EntityEntityRel.newInstance(r.getId(), value))
.collect(Collectors.toList())
.iterator();
}, Encoders.bean(EntityEntityRel.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/resultCfHb");
PropagationConstant.createCfHbforResult(spark, inputPath, workingPath, resultClazz);
Dataset<DatasourceCountry> datasource_country = readPath(spark, datasourcecountrypath, DatasourceCountry.class);
Dataset<EntityEntityRel> cfhb = readPath(spark, workingPath + "/resultCfHb", EntityEntityRel.class);
Dataset<EntityEntityRel> cfhb = readPath(spark, workingPath, EntityEntityRel.class);
datasource_country
.joinWith(

View File

@ -47,8 +47,8 @@ public class SparkCountryPropagationJob {
String sourcePath = parser.get("sourcePath");
log.info("sourcePath: {}", sourcePath);
String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", preparedInfoPath);
String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
@ -67,7 +67,7 @@ public class SparkCountryPropagationJob {
execPropagation(
spark,
sourcePath,
preparedInfoPath,
workingPath,
outputPath,
resultClazz);
});

View File

@ -1,7 +1,9 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
public class AutoritativeAuthor {
import java.io.Serializable;
public class AutoritativeAuthor implements Serializable {
private String name;
private String surname;

View File

@ -13,6 +13,7 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@ -22,6 +23,7 @@ import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -57,8 +59,10 @@ public class PrepareResultOrcidAssociationStep1 {
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final List<String> allowedsemrel = Arrays.stream(parser.get("allowedsemrels").split(";"))
.map(s -> s.toLowerCase()).collect(Collectors.toList());
final List<String> allowedsemrel = Arrays
.stream(parser.get("allowedsemrels").split(";"))
.map(s -> s.toLowerCase())
.collect(Collectors.toList());
log.info("allowedSemRel: {}", new Gson().toJson(allowedsemrel));
@ -124,29 +128,32 @@ public class PrepareResultOrcidAssociationStep1 {
Dataset<R> result = readPath(spark, outputPath + "/resultSubset", resultClazz);
result.foreach((ForeachFunction<R>) r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
result
.joinWith(relation, result.col("id").equalTo(relation.col("source")))
.map((MapFunction<Tuple2<R, Relation>, ResultOrcidList>) t2 -> {
ResultOrcidList rol = new ResultOrcidList();
rol.setResultId(t2._2().getTarget());
List<AutoritativeAuthor> aal = new ArrayList<>();
t2._1().getAuthor().stream().forEach(a -> {
a.getPid().stream().forEach(p -> {
if (allowedPids.contains(p.getQualifier().getClassid().toLowerCase())) {
aal
.add(
AutoritativeAuthor
.newInstance(a.getName(), a.getSurname(), a.getFullname(), p.getValue()));
}
});
.joinWith(relation, result.col("id").equalTo(relation.col("source")))
.map((MapFunction<Tuple2<R, Relation>, ResultOrcidList>) t2 -> {
ResultOrcidList rol = new ResultOrcidList();
rol.setResultId(t2._2().getTarget());
List<AutoritativeAuthor> aal = new ArrayList<>();
t2._1().getAuthor().stream().forEach(a -> {
a.getPid().stream().forEach(p -> {
if (allowedPids.contains(p.getQualifier().getClassid().toLowerCase())) {
aal
.add(
AutoritativeAuthor
.newInstance(a.getName(), a.getSurname(), a.getFullname(), p.getValue()));
}
});
return rol;
}, Encoders.bean(ResultOrcidList.class)).write()
});
rol.setAuthorList(aal);
return rol;
}, Encoders.bean(ResultOrcidList.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "/" + resultType);
}
private static boolean hasAllowedPid(Author a, List<String> allowedPids) {

View File

@ -65,30 +65,31 @@ public class PrepareResultOrcidAssociationStep2 {
.union(readPath(spark, inputPath + "/software", ResultOrcidList.class));
resultOrcidAssoc
.groupByKey((MapFunction<ResultOrcidList, String>) rol -> rol.getResultId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, ResultOrcidList, ResultOrcidList>) (k, it) ->{
ResultOrcidList resultOrcidList = it.next();
if(it.hasNext())
{
.groupByKey((MapFunction<ResultOrcidList, String>) rol -> rol.getResultId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, ResultOrcidList, ResultOrcidList>) (k, it) -> {
ResultOrcidList resultOrcidList = it.next();
if (it.hasNext()) {
Set<String> orcid_set = new HashSet<>();
resultOrcidList.getAuthorList().stream().forEach(aa -> orcid_set.add(aa.getOrcid()));
it.forEachRemaining(val -> val
.getAuthorList()
.stream()
.forEach(
it
.forEachRemaining(
val -> val
.getAuthorList()
.stream()
.forEach(
aa -> {
if (!orcid_set.contains(aa.getOrcid())) {
resultOrcidList.getAuthorList().add(aa);
orcid_set.add(aa.getOrcid());
}
}));
}
return resultOrcidList;
},Encoders.bean(ResultOrcidList.class) )
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(outputPath);
}
return resultOrcidList;
}, Encoders.bean(ResultOrcidList.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
}

View File

@ -2,7 +2,6 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.List;
@ -61,7 +60,6 @@ public class SparkOrcidToResultFromSemRelJob {
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,

View File

@ -23,4 +23,11 @@ public class DatasourceOrganization implements Serializable {
public void setOrganizationId(String organizationId) {
this.organizationId = organizationId;
}
public static DatasourceOrganization newInstance(String datasourceId, String organizationId) {
DatasourceOrganization dso = new DatasourceOrganization();
dso.datasourceId = datasourceId;
dso.organizationId = organizationId;
return dso;
}
}

View File

@ -2,17 +2,17 @@
package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
@ -28,6 +28,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class PrepareResultInstRepoAssociation {
@ -49,14 +50,11 @@ public class PrepareResultInstRepoAssociation {
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String datasourceOrganizationPath = parser.get("datasourceOrganizationPath");
log.info("datasourceOrganizationPath {}: ", datasourceOrganizationPath);
final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
List<String> blacklist = Optional
.ofNullable(parser.get("blacklist"))
@ -64,82 +62,92 @@ public class PrepareResultInstRepoAssociation {
.orElse(new ArrayList<>());
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
runWithSparkHiveSession(
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
readNeededResources(spark, inputPath);
readNeededResources(spark, inputPath, workingPath, blacklist);
removeOutputDir(spark, datasourceOrganizationPath);
prepareDatasourceOrganization(spark, datasourceOrganizationPath, blacklist);
prepareDatasourceOrganization(spark, workingPath);
removeOutputDir(spark, alreadyLinkedPath);
prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath);
prepareAlreadyLinkedAssociation(spark, workingPath);
});
}
private static void readNeededResources(SparkSession spark, String inputPath) {
Dataset<Datasource> datasource = readPath(spark, inputPath + "/datasource", Datasource.class);
datasource.createOrReplaceTempView("datasource");
Dataset<Relation> relation = readPath(spark, inputPath + "/relation", Relation.class);
relation.createOrReplaceTempView("relation");
Dataset<Organization> organization = readPath(spark, inputPath + "/organization", Organization.class);
organization.createOrReplaceTempView("organization");
}
private static void prepareDatasourceOrganization(
SparkSession spark, String datasourceOrganizationPath, List<String> blacklist) {
final String blacklisted = blacklist
.stream()
.map(s -> " AND id != '" + s + "'")
.collect(Collectors.joining());
String query = "SELECT source datasourceId, target organizationId "
+ "FROM ( SELECT id "
+ "FROM datasource "
+ "WHERE datasourcetype.classid = '"
+ INSTITUTIONAL_REPO_TYPE
+ "' "
+ "AND datainfo.deletedbyinference = false " + blacklisted + " ) d "
+ "JOIN ( SELECT source, target "
+ "FROM relation "
+ "WHERE lower(relclass) = '"
+ ModelConstants.IS_PROVIDED_BY.toLowerCase()
+ "' "
+ "AND datainfo.deletedbyinference = false ) rel "
+ "ON d.id = rel.source ";
spark
.sql(query)
.as(Encoders.bean(DatasourceOrganization.class))
private static void readNeededResources(SparkSession spark, String inputPath, String workingPath,
List<String> blacklist) {
readPath(spark, inputPath + "/datasource", Datasource.class)
.filter(
(FilterFunction<Datasource>) ds -> !blacklist.contains(ds.getId()) &&
!ds.getDataInfo().getDeletedbyinference() &&
ds.getDatasourcetype().getClassid().equals(INSTITUTIONAL_REPO_TYPE))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(datasourceOrganizationPath);
.json(workingPath + "/datasource");
readPath(spark, inputPath + "/relation", Relation.class)
.filter(
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
(r.getRelClass().toLowerCase().equals(ModelConstants.IS_PROVIDED_BY.toLowerCase()) ||
r.getRelClass().toLowerCase().equals(ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase())))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/relation");
}
private static void prepareDatasourceOrganization(
SparkSession spark, String workingPath) {
Dataset<Datasource> datasource = readPath(spark, workingPath + "/datasource", Datasource.class);
Dataset<Relation> relation = readPath(spark, workingPath + "/relation", Relation.class)
.filter(
(FilterFunction<Relation>) r -> r
.getRelClass()
.toLowerCase()
.equals(ModelConstants.IS_PROVIDED_BY.toLowerCase()));
datasource
.joinWith(relation, datasource.col("id").equalTo(relation.col("source")))
.map(
(MapFunction<Tuple2<Datasource, Relation>, DatasourceOrganization>) t2 -> DatasourceOrganization
.newInstance(t2._2().getSource(), t2._2().getTarget()),
Encoders.bean(DatasourceOrganization.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/datasourceOrganization");
}
private static void prepareAlreadyLinkedAssociation(
SparkSession spark, String alreadyLinkedPath) {
String query = "Select source key, collect_set(target) valueSet "
+ "from relation "
+ "where datainfo.deletedbyinference = false "
+ "and lower(relClass) = '"
+ ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase()
+ "' "
+ "group by source";
SparkSession spark, String workingPath) {
readPath(spark, workingPath + "/relation", Relation.class)
.filter(
(FilterFunction<Relation>) r -> r
.getRelClass()
.toLowerCase()
.equals(ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase()))
.groupByKey((MapFunction<Relation, String>) r -> r.getSource(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Relation, KeyValueSet>) (k, it) -> {
Set<String> values = new HashSet<>();
KeyValueSet kvs = new KeyValueSet();
kvs.setKey(k);
values.add(it.next().getTarget());
it.forEachRemaining(r -> values.add(r.getTarget()));
kvs.setValueSet(new ArrayList<>(values));
return kvs;
}, Encoders.bean(KeyValueSet.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/alreadyLinked");
spark
.sql(query)
.as(Encoders.bean(KeyValueSet.class))
// TODO retry to stick with datasets
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
}
}
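The prepare step now leaves two gzipped JSON outputs under the working path instead of Hive-queryable views. A sketch of how a consumer would read them back as typed Datasets; it assumes the readPath helper and the packages of DatasourceOrganization and KeyValueSet, and does not reproduce the join actually performed by SparkResultToOrganizationFromIstRepoJob:

import static eu.dnetlib.dhp.PropagationConstant.readPath;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.KeyValueSet; // package assumed
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.DatasourceOrganization; // package assumed

public class ReadPreparedInfoSketch {

    public static void read(SparkSession spark, String workingPath) {
        // (datasourceId, organizationId) pairs for institutional repositories
        Dataset<DatasourceOrganization> dsOrg = readPath(
            spark, workingPath + "/datasourceOrganization", DatasourceOrganization.class);
        // for each result id, the organization ids it is already affiliated with
        Dataset<KeyValueSet> alreadyLinked = readPath(
            spark, workingPath + "/alreadyLinked", KeyValueSet.class);
    }
}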

View File

@ -118,7 +118,7 @@ public class SparkResultToOrganizationFromIstRepoJob {
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(inputPath.substring(0, inputPath.indexOf("/") + 1) + "relation");
.json(inputPath.substring(0, inputPath.lastIndexOf("/") + 1) + "relation");
}
private static FlatMapFunction<Tuple2<KeyValueSet, KeyValueSet>, Relation> createRelationFn() {
@ -157,12 +157,14 @@ public class SparkResultToOrganizationFromIstRepoJob {
Dataset<R> result = readPath(spark, inputPath, resultClazz);
result.createOrReplaceTempView("result");
Dataset<Row> cfhb = spark.sql("select distinct r.id, inst.collectedfrom.key cf, inst.hostedby.key hb "
+
"from result r " +
"lateral view explode(instance) i as inst " +
"where r.datainfo.deletedbyinference=false");
//createCfHbforResult(spark);
Dataset<Row> cfhb = spark
.sql(
"select distinct r.id, inst.collectedfrom.key cf, inst.hostedby.key hb "
+
"from result r " +
"lateral view explode(instance) i as inst " +
"where r.datainfo.deletedbyinference=false");
// createCfHbforResult(spark);
cfhb.createOrReplaceTempView("cfhb");
dsOrg.createOrReplaceTempView("rels");

View File

@ -18,8 +18,8 @@
"paramRequired": true
},
{
"paramName": "p",
"paramLongName": "preparedInfoPath",
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the path where prepared info have been stored",
"paramRequired": false
},

View File

@ -5,12 +5,6 @@
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName":"out",
"paramLongName":"outputPath",
"paramDescription": "the output path",
"paramRequired": true
},
{
"paramName":"w",
"paramLongName":"workingPath",
@ -23,12 +17,7 @@
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName": "p",
"paramLongName": "preparedInfoPath",
"paramDescription": "the path where prepared info have been stored",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",

View File

@ -6,21 +6,9 @@
"paramRequired": true
},
{
"paramName":"h",
"paramLongName":"hive_metastore_uris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName":"dop",
"paramLongName":"datasourceOrganizationPath",
"paramDescription": "path where to store/find association from datasource and organization",
"paramRequired": true
},
{
"paramName":"alp",
"paramLongName":"alreadyLinkedPath",
"paramDescription": "path where to store/find already linked results and organizations",
"paramName":"wp",
"paramLongName":"workingPath",
"paramDescription": "path where to store/find prepared/ filtered data",
"paramRequired": true
},
{

View File

@ -220,10 +220,10 @@
<name>sourcePath</name>
<value>${outputPath}</value>
</property>
<property>
<name>outputPath</name>
<value>${outputPath}</value>
</property>
<!-- <property>-->
<!-- <name>outputPath</name>-->
<!-- <value>${outputPath}</value>-->
<!-- </property>-->
</configuration>
</sub-workflow>
<ok to="community_organization" />

View File

@ -65,7 +65,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
<arg>--allowedtypes</arg><arg>${allowedtypes}</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/preparedInfo</arg>
<arg>--workingPath</arg><arg>${workingDir}/country</arg>
</spark>
<ok to="fork_join_prepare_result_country"/>
<error to="Kill"/>
@ -99,10 +99,8 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/publication</arg>
<arg>--workingPath</arg><arg>${workingDir}/country/workingP</arg>
<arg>--workingPath</arg><arg>${workingDir}/country</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
@ -129,10 +127,8 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/dataset</arg>
<arg>--workingPath</arg><arg>${workingDir}/country/workingD</arg>
<arg>--workingPath</arg><arg>${workingDir}/country</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
@ -159,10 +155,8 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/otherresearchproduct</arg>
<arg>--workingPath</arg><arg>${workingDir}/country/workingO</arg>
<arg>--workingPath</arg><arg>${workingDir}/country</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
@ -189,10 +183,8 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/software</arg>
<arg>--workingPath</arg><arg>${workingDir}/country/workingS</arg>
<arg>--workingPath</arg><arg>${workingDir}/country</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
@ -228,12 +220,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<<<<<<< HEAD:dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/workflow.xml
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/publication</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
=======
<arg>--preparedInfoPath</arg><arg>${workingDir}/publication</arg>
>>>>>>> beta:dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml
<arg>--workingPath</arg><arg>${workingDir}/country/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
</spark>
@ -262,12 +249,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<<<<<<< HEAD:dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/workflow.xml
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/dataset</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
=======
<arg>--preparedInfoPath</arg><arg>${workingDir}/dataset</arg>
>>>>>>> beta:dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml
<arg>--workingPath</arg><arg>${workingDir}/country/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
</spark>
@ -296,12 +278,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<<<<<<< HEAD:dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app/workflow.xml
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/otherresearchproduct</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
=======
<arg>--preparedInfoPath</arg><arg>${workingDir}/otherresearchproduct</arg>
>>>>>>> beta:dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml
<arg>--workingPath</arg><arg>${workingDir}/country/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
</spark>
@ -330,7 +307,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/software</arg>
<arg>--workingPath</arg><arg>${workingDir}/country/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
</spark>

View File

@ -112,10 +112,10 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--allowedpids</arg><arg>${allowedpids}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
@ -140,10 +140,10 @@
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--allowedpids</arg><arg>${allowedpids}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
@ -168,10 +168,10 @@
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--allowedpids</arg><arg>${allowedpids}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
@ -196,10 +196,10 @@
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcidprop/preparedInfo/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--allowedpids</arg><arg>${allowedpids}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
@ -263,7 +263,6 @@
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
</spark>
@ -294,7 +293,6 @@
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
</spark>
@ -325,7 +323,6 @@
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
</spark>
@ -356,7 +353,6 @@
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/preparedInfo/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
</spark>

View File

@ -45,9 +45,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--datasourceOrganizationPath</arg><arg>${workingDir}/affiliationInstRepo/preparedInfo/datasourceOrganization</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/affiliationInstRepo/preparedInfo/alreadyLinked</arg>
<arg>--workingPath</arg><arg>${workingDir}/affiliationInstRepo</arg>
<arg>--blacklist</arg><arg>${blacklist}</arg>
</spark>
<ok to="fork_join_apply_resulttoorganization_propagation"/>

View File

@ -4,10 +4,10 @@
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>sets the outputPath</description>
</property>
<!-- <property>-->
<!-- <name>outputPath</name>-->
<!-- <description>sets the outputPath</description>-->
<!-- </property>-->
</parameters>
<global>
@ -21,27 +21,27 @@
</configuration>
</global>
<start to="resume_from"/>
<start to="prepare_info"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="resume_from">
<switch>
<case to="prepare_info">${wf:conf('resumeFrom') eq 'PrepareInfo'}</case>
<default to="reset_outputpath"/> <!-- first action to be done when downloadDump is to be performed -->
</switch>
</decision>
<!-- <decision name="resume_from">-->
<!-- <switch>-->
<!-- <case to="prepare_info">${wf:conf('resumeFrom') eq 'PrepareInfo'}</case>-->
<!-- <default to="reset_outputpath"/> &lt;!&ndash; first action to be done when downloadDump is to be performed &ndash;&gt;-->
<!-- </switch>-->
<!-- </decision>-->
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="prepare_info"/>
<error to="Kill"/>
</action>
<!-- <action name="reset_outputpath">-->
<!-- <fs>-->
<!-- <delete path="${outputPath}"/>-->
<!-- <mkdir path="${outputPath}"/>-->
<!-- </fs>-->
<!-- <ok to="prepare_info"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="prepare_info">
@ -91,7 +91,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--relationPath</arg><arg>${workingDir}/affiliationSemanticRelation/preparedInfo/relation</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--outputPath</arg><arg>${sourcePath}</arg>
<arg>--leavesPath</arg><arg>${workingDir}/affiliationSemanticRelation/preparedInfo/leavesPath</arg>
<arg>--childParentPath</arg><arg>${workingDir}/affiliationSemanticRelation/preparedInfo/childParentPath</arg>
<arg>--resultOrgPath</arg><arg>${workingDir}/affiliationSemanticRelation/preparedInfo/resultOrgPath</arg>

View File

@ -5,7 +5,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.io.FileUtils;
import org.apache.neethi.Assertion;
import org.apache.spark.SparkConf;
@ -26,6 +25,8 @@ import com.google.gson.Gson;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareStep1Test {
@ -89,154 +90,118 @@ public class PrepareStep1Test {
Assertions.assertEquals(0, tmp.count());
Assertions.assertEquals(7, sc
.textFile(workingDir.toString() + "/preparedInfo/relationSubset")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class)).count());
Assertions
.assertEquals(
7, sc
.textFile(workingDir.toString() + "/preparedInfo/relationSubset")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
.count());
Assertions.assertEquals(0, sc
.textFile(workingDir.toString() + "/preparedInfo/resultSubset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)).count());
Assertions
.assertEquals(
0, sc
.textFile(workingDir.toString() + "/preparedInfo/resultSubset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class))
.count());
}
@Test
void oneUpdateTest() throws Exception {
SparkOrcidToResultFromSemRelJob
void matchTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparestep1")
.getPath();
PrepareResultOrcidAssociationStep1
.main(
new String[] {
"-isTest",
Boolean.TRUE.toString(),
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-sourcePath",
getClass()
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/oneupdate")
.getPath(),
"-hive_metastore_uris",
"",
"-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath",
workingDir.toString() + "/dataset",
"-possibleUpdatesPath",
getClass()
.getResource(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
.getPath()
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-resultTableName", Publication.class.getCanonicalName(),
"-outputPath", workingDir.toString() + "/preparedInfo",
"-allowedsemrels", "IsSupplementedBy;IsSupplementTo",
"-allowedpids", "orcid;orcid_pending"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
JavaRDD<ResultOrcidList> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo/publication")
.map(item -> OBJECT_MAPPER.readValue(item, ResultOrcidList.class));
// tmp.map(s -> new Gson().toJson(s)).foreach(s -> System.out.println(s));
Assertions.assertEquals(1, tmp.count());
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
verificationDataset.createOrReplaceTempView("dataset");
String query = "select id, MyT.name name, MyT.surname surname, MyP.value pid, MyP.qualifier.classid pidType "
+ "from dataset "
+ "lateral view explode(author) a as MyT "
+ "lateral view explode(MyT.pid) p as MyP "
+ "where MyP.datainfo.inferenceprovenance = 'propagation'";
org.apache.spark.sql.Dataset<Row> propagatedAuthors = spark.sql(query);
Assertions.assertEquals(1, propagatedAuthors.count());
tmp.foreach(e -> System.out.println(OBJECT_MAPPER.writeValueAsString(e)));
Assertions
.assertEquals(
1,
propagatedAuthors
.filter(
"id = '50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e' "
+ "and name = 'Vajinder' and surname = 'Kumar' and pidType = '" +
1, tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.count());
Assertions
.assertEquals(
1, tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.size());
Assertions
.assertEquals(
"0000-0002-5001-6911",
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.get(0)
.getOrcid());
Assertions
.assertEquals(
"Barbarić-Mikočević, Željka",
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.get(0)
.getFullname());
Assertions
.assertEquals(
"Željka",
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.get(0)
.getName());
Assertions
.assertEquals(
"Barbarić-Mikočević",
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.get(0)
.getSurname());
ModelConstants.ORCID_PENDING + "'")
Assertions
.assertEquals(
7, sc
.textFile(workingDir.toString() + "/preparedInfo/relationSubset")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
.count());
Assertions.assertEquals(1, propagatedAuthors.filter("pid = '0000-0002-8825-3517'").count());
}
@Test
void twoUpdatesTest() throws Exception {
SparkOrcidToResultFromSemRelJob
.main(
new String[] {
"-isTest",
Boolean.TRUE.toString(),
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-sourcePath",
getClass()
.getResource(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/twoupdates")
.getPath(),
"-hive_metastore_uris",
"",
"-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath",
workingDir.toString() + "/dataset",
"-possibleUpdatesPath",
getClass()
.getResource(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
.getPath()
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
verificationDataset.createOrReplaceTempView("dataset");
String query = "select id, MyT.name name, MyT.surname surname, MyP.value pid, MyP.qualifier.classid pidType "
+ "from dataset "
+ "lateral view explode(author) a as MyT "
+ "lateral view explode(MyT.pid) p as MyP "
+ "where MyP.datainfo.inferenceprovenance = 'propagation'";
org.apache.spark.sql.Dataset<Row> propagatedAuthors = spark.sql(query);
Assertions.assertEquals(2, propagatedAuthors.count());
Assertions
.assertEquals(
1, propagatedAuthors.filter("name = 'Marc' and surname = 'Schmidtmann'").count());
Assertions
.assertEquals(
1, propagatedAuthors.filter("name = 'Ruediger' and surname = 'Beckhaus'").count());
query = "select id, MyT.name name, MyT.surname surname, MyP.value pid ,MyP.qualifier.classid pidType "
+ "from dataset "
+ "lateral view explode(author) a as MyT "
+ "lateral view explode(MyT.pid) p as MyP ";
org.apache.spark.sql.Dataset<Row> authorsExplodedPids = spark.sql(query);
Assertions
.assertEquals(
2, authorsExplodedPids.filter("name = 'Marc' and surname = 'Schmidtmann'").count());
Assertions
.assertEquals(
1,
authorsExplodedPids
.filter(
"name = 'Marc' and surname = 'Schmidtmann' and pidType = 'MAG Identifier'")
1, sc
.textFile(workingDir.toString() + "/preparedInfo/resultSubset")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class))
.count());
}
}

View File

@ -0,0 +1,222 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareStep2Test {
private static final Logger log = LoggerFactory.getLogger(PrepareStep2Test.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(PrepareStep2Test.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(PrepareStep2Test.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
spark = SparkSession
.builder()
.appName(PrepareStep2Test.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testMatch() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/resultSubset")
.getPath();
PrepareResultOrcidAssociationStep2
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-outputPath", workingDir.toString() + "/preparedInfo/mergedOrcidAssoc"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ResultOrcidList> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo/mergedOrcidAssoc")
.map(item -> OBJECT_MAPPER.readValue(item, ResultOrcidList.class));
Assertions.assertEquals(1, tmp.count());
Assertions
.assertEquals(
1,
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.count());
Assertions
.assertEquals(
2, tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.size());
Assertions
.assertTrue(
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.stream()
.anyMatch(aa -> aa.getOrcid().equals("0000-0002-1234-5678")));
Assertions
.assertTrue(
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.stream()
.anyMatch(aa -> aa.getOrcid().equals("0000-0002-5001-6911")));
}
@Test
void matchTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparestep1")
.getPath();
PrepareResultOrcidAssociationStep1
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-resultTableName", Publication.class.getCanonicalName(),
"-outputPath", workingDir.toString() + "/preparedInfo",
"-allowedsemrels", "IsSupplementedBy;IsSupplementTo",
"-allowedpids", "orcid;orcid_pending"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ResultOrcidList> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo/publication")
.map(item -> OBJECT_MAPPER.readValue(item, ResultOrcidList.class));
Assertions.assertEquals(1, tmp.count());
tmp.foreach(e -> System.out.println(OBJECT_MAPPER.writeValueAsString(e)));
Assertions
.assertEquals(
1, tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.count());
Assertions
.assertEquals(
1, tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.size());
Assertions
.assertEquals(
"0000-0002-5001-6911",
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.get(0)
.getOrcid());
Assertions
.assertEquals(
"Barbarić-Mikočević, Željka",
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.get(0)
.getFullname());
Assertions
.assertEquals(
"Željka",
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.get(0)
.getName());
Assertions
.assertEquals(
"Barbarić-Mikočević",
tmp
.filter(rol -> rol.getResultId().equals("50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217"))
.collect()
.get(0)
.getAuthorList()
.get(0)
.getSurname());
Assertions
.assertEquals(
7, sc
.textFile(workingDir.toString() + "/preparedInfo/relationSubset")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
.count());
Assertions
.assertEquals(
1, sc
.textFile(workingDir.toString() + "/preparedInfo/resultSubset")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class))
.count());
}
}

View File

@ -0,0 +1 @@
{"resultId":"50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217","authorList":[{"name":"Željka","surname":"Barbarić-Mikočević","fullname":"Barbarić-Mikočević, Željka","orcid":"0000-0002-5001-6911"}]}

View File

@ -0,0 +1 @@
{"resultId":"50|475c1990cbb2::46b9f15a3e887ccb154a696c4e7e4217","authorList":[{"name":"Vesna","surname":"Džimbeg-Malčić","fullname":"Džimbeg-Malčić, Vesna","orcid":"0000-0002-1234-5678"}]}