forked from D-Net/dnet-hadoop
refactoring
This commit is contained in:
parent
e5a177f0a7
commit
1b0e0bd1b5
|
@ -1,88 +1,95 @@
|
|||
package eu.dnetlib.dhp;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
public class PropagationConstant {
|
||||
public static final String INSTITUTIONAL_REPO_TYPE = "pubsrepository::institutional";
|
||||
|
||||
public final static String PROPAGATION_DATA_INFO_TYPE = "propagation";
|
||||
public static final String PROPAGATION_DATA_INFO_TYPE = "propagation";
|
||||
|
||||
public static final String TRUE = "true";
|
||||
|
||||
public static final String DNET_COUNTRY_SCHEMA = "dnet:countries";
|
||||
public static final String DNET_SCHEMA_NAME = "dnet:provenanceActions";
|
||||
public static final String DNET_SCHEMA_ID = "dnet:provenanceActions";
|
||||
|
||||
public final static String DNET_COUNTRY_SCHEMA = "dnet:countries";
|
||||
public final static String DNET_SCHEMA_NAME = "dnet:provenanceActions";
|
||||
public final static String DNET_SCHEMA_ID = "dnet:provenanceActions";
|
||||
public static final String PROPAGATION_COUNTRY_INSTREPO_CLASS_ID = "country:instrepos";
|
||||
public static final String PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME =
|
||||
"Propagation of country to result collected from datasources of type institutional repositories";
|
||||
|
||||
public final static String PROPAGATION_COUNTRY_INSTREPO_CLASS_ID = "country:instrepos";
|
||||
public final static String PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME = "Propagation of country to result collected from datasources of type institutional repositories";
|
||||
public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID =
|
||||
"result:organization:instrepo";
|
||||
public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME =
|
||||
"Propagation of affiliation to result collected from datasources of type institutional repository";
|
||||
|
||||
public final static String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID = "result:organization:instrepo";
|
||||
public final static String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME = "Propagation of affiliation to result collected from datasources of type institutional repository";
|
||||
public static final String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID =
|
||||
"result:project:semrel";
|
||||
public static final String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME =
|
||||
"Propagation of result to project through semantic relation";
|
||||
|
||||
public final static String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID = "result:project:semrel";
|
||||
public final static String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME = "Propagation of result to project through semantic relation";
|
||||
public static final String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID =
|
||||
"result:community:semrel";
|
||||
public static final String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME =
|
||||
" Propagation of result belonging to community through semantic relation";
|
||||
|
||||
public final static String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID = "result:community:semrel";
|
||||
public final static String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME = " Propagation of result belonging to community through semantic relation";
|
||||
public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID =
|
||||
"result:community:organization";
|
||||
public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME =
|
||||
" Propagation of result belonging to community through organization";
|
||||
|
||||
public final static String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID = "result:community:organization";
|
||||
public final static String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME = " Propagation of result belonging to community through organization";
|
||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID =
|
||||
"authorpid:result";
|
||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME =
|
||||
"Propagation of authors pid to result through semantic relations";
|
||||
|
||||
public final static String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "propagation:orcid:result";
|
||||
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of ORCID through result linked by isSupplementedBy or isSupplementTo semantic relations";
|
||||
public static final String RELATION_DATASOURCE_ORGANIZATION_REL_CLASS = "provides";
|
||||
|
||||
public final static String RELATION_DATASOURCEORGANIZATION_REL_TYPE = "datasourceOrganization";
|
||||
public final static String RELATION_DATASOURCEORGANIZATION_SUBREL_TYPE = "provision";
|
||||
public final static String RELATION_ORGANIZATION_DATASOURCE_REL_CLASS = "isProvidedBy";
|
||||
public final static String RELATION_DATASOURCE_ORGANIZATION_REL_CLASS = "provides";
|
||||
|
||||
public final static String RELATION_RESULTORGANIZATION_REL_TYPE = "resultOrganization";
|
||||
public final static String RELATION_RESULTORGANIZATION_SUBREL_TYPE = "affiliation";
|
||||
public final static String RELATION_ORGANIZATION_RESULT_REL_CLASS = "isAuthorInstitutionOf";
|
||||
public final static String RELATION_RESULT_ORGANIZATION_REL_CLASS = "hasAuthorInstitution";
|
||||
public static final String RELATION_RESULTORGANIZATION_REL_TYPE = "resultOrganization";
|
||||
public static final String RELATION_RESULTORGANIZATION_SUBREL_TYPE = "affiliation";
|
||||
public static final String RELATION_ORGANIZATION_RESULT_REL_CLASS = "isAuthorInstitutionOf";
|
||||
public static final String RELATION_RESULT_ORGANIZATION_REL_CLASS = "hasAuthorInstitution";
|
||||
|
||||
public static final String RELATION_RESULTRESULT_REL_TYPE = "resultResult";
|
||||
public static final String RELATION_RESULTRESULT_SUBREL_TYPE = "supplement";
|
||||
|
||||
public static final String RELATION_RESULTPROJECT_REL_TYPE = "resultProject";
|
||||
public static final String RELATION_RESULTPROJECT_SUBREL_TYPE = "outcome";
|
||||
public static final String RELATION_RESULT_PROJECT_REL_CLASS = "isProducedBy";
|
||||
public static final String RELATION_PROJECT_RESULT_REL_CLASS = "produces";
|
||||
|
||||
|
||||
public static final String RELATION_RESULT_REPRESENTATIVERESULT_REL_CLASS = "isMergedIn";
|
||||
public static final String RELATION_REPRESENTATIVERESULT_RESULT_CLASS = "merges";
|
||||
|
||||
|
||||
public static final String RELATION_ORGANIZATIONORGANIZATION_REL_TYPE = "organizationOrganization";
|
||||
|
||||
public static final String RELATION_DEDUPORGANIZATION_SUBREL_TYPE = "dedup";
|
||||
|
||||
public static final String PROPAGATION_AUTHOR_PID = "ORCID";
|
||||
|
||||
public static Country getCountry(String country){
|
||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static Country getCountry(String classid, String classname) {
|
||||
Country nc = new Country();
|
||||
nc.setClassid(country);
|
||||
nc.setClassname(country);
|
||||
nc.setClassid(classid);
|
||||
nc.setClassname(classname);
|
||||
nc.setSchemename(DNET_COUNTRY_SCHEMA);
|
||||
nc.setSchemeid(DNET_COUNTRY_SCHEMA);
|
||||
nc.setDataInfo(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_COUNTRY_INSTREPO_CLASS_ID, PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME));
|
||||
nc.setDataInfo(
|
||||
getDataInfo(
|
||||
PROPAGATION_DATA_INFO_TYPE,
|
||||
PROPAGATION_COUNTRY_INSTREPO_CLASS_ID,
|
||||
PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME));
|
||||
return nc;
|
||||
}
|
||||
|
||||
public static DataInfo getDataInfo(String inference_provenance, String inference_class_id, String inference_class_name){
|
||||
public static DataInfo getDataInfo(
|
||||
String inference_provenance, String inference_class_id, String inference_class_name) {
|
||||
DataInfo di = new DataInfo();
|
||||
di.setInferred(true);
|
||||
di.setDeletedbyinference(false);
|
||||
|
@ -101,8 +108,15 @@ public class PropagationConstant {
|
|||
return pa;
|
||||
}
|
||||
|
||||
|
||||
public static Relation getRelation(String source, String target, String rel_class, String rel_type, String subrel_type, String inference_provenance, String inference_class_id, String inference_class_name){
|
||||
public static Relation getRelation(
|
||||
String source,
|
||||
String target,
|
||||
String rel_class,
|
||||
String rel_type,
|
||||
String subrel_type,
|
||||
String inference_provenance,
|
||||
String inference_class_id,
|
||||
String inference_class_name) {
|
||||
Relation r = new Relation();
|
||||
r.setSource(source);
|
||||
r.setTarget(target);
|
||||
|
@ -111,107 +125,17 @@ public class PropagationConstant {
|
|||
r.setSubRelType(subrel_type);
|
||||
r.setDataInfo(getDataInfo(inference_provenance, inference_class_id, inference_class_name));
|
||||
return r;
|
||||
}
|
||||
|
||||
public static PairFunction<TypedRow, String, TypedRow> toPair() {
|
||||
return e -> new Tuple2<>( e.getSourceId(), e);
|
||||
|
||||
}
|
||||
|
||||
public static JavaPairRDD<String, TypedRow> getResultResultSemRel(List<String> allowedsemrel, JavaRDD<Relation> relations) {
|
||||
return relations
|
||||
.filter(r -> !r.getDataInfo().getDeletedbyinference())
|
||||
.filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType()))
|
||||
.map(r -> {
|
||||
TypedRow tr = new TypedRow();
|
||||
tr.setSourceId(r.getSource());
|
||||
tr.setTargetId(r.getTarget());
|
||||
return tr;
|
||||
})
|
||||
.mapToPair(toPair());
|
||||
}
|
||||
|
||||
|
||||
public static String getConstraintList(String text, List<String> constraints){
|
||||
public static String getConstraintList(String text, List<String> constraints) {
|
||||
String ret = " and (" + text + constraints.get(0) + "'";
|
||||
for (int i =1; i < constraints.size(); i++){
|
||||
for (int i = 1; i < constraints.size(); i++) {
|
||||
ret += " OR " + text + constraints.get(i) + "'";
|
||||
}
|
||||
ret += ")";
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
public static List<TypedRow> getTypedRowsDatasourceResult(OafEntity oaf) {
|
||||
List<TypedRow> lst = new ArrayList<>();
|
||||
Set<String> datasources_provenance = new HashSet<>();
|
||||
List<Instance> instanceList = null;
|
||||
String type = "";
|
||||
if (oaf.getClass() == Publication.class) {
|
||||
instanceList = ((Publication) oaf).getInstance();
|
||||
type = "publication";
|
||||
}
|
||||
if (oaf.getClass() == Dataset.class){
|
||||
instanceList = ((Dataset)oaf).getInstance();
|
||||
type = "dataset";
|
||||
}
|
||||
|
||||
if (oaf.getClass() == Software.class){
|
||||
instanceList = ((Software)oaf).getInstance();
|
||||
type = "software";
|
||||
}
|
||||
|
||||
if (oaf.getClass() == OtherResearchProduct.class){
|
||||
instanceList = ((OtherResearchProduct)oaf).getInstance();
|
||||
type = "otherresearchproduct";
|
||||
}
|
||||
|
||||
|
||||
for (Instance i : instanceList) {
|
||||
datasources_provenance.add(i.getCollectedfrom().getKey());
|
||||
datasources_provenance.add(i.getHostedby().getKey());
|
||||
}
|
||||
for (String dsId : datasources_provenance) {
|
||||
TypedRow tr = new TypedRow();
|
||||
tr.setSourceId(dsId);
|
||||
tr.setTargetId(oaf.getId());
|
||||
tr.setType(type);
|
||||
lst.add(tr);
|
||||
}
|
||||
return lst;
|
||||
}
|
||||
|
||||
public static void updateResultForCommunity(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type, String class_id, String class_name) {
|
||||
results.leftOuterJoin(toupdateresult)
|
||||
.map(p -> {
|
||||
Result r = p._2()._1();
|
||||
if (p._2()._2().isPresent()){
|
||||
Set<String> communityList = p._2()._2().get().getAccumulator();
|
||||
for(Context c: r.getContext()){
|
||||
if (communityList.contains(c.getId())){
|
||||
//verify if the datainfo for this context contains propagation
|
||||
if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){
|
||||
c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name));
|
||||
//community id already in the context of the result. Remove it from the set that has to be added
|
||||
communityList.remove(c.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
List<Context> cc = r.getContext();
|
||||
for(String cId: communityList){
|
||||
Context context = new Context();
|
||||
context.setId(cId);
|
||||
context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)));
|
||||
cc.add(context);
|
||||
}
|
||||
r.setContext(cc);
|
||||
}
|
||||
return r;
|
||||
})
|
||||
.map(p -> new ObjectMapper().writeValueAsString(p))
|
||||
.saveAsTextFile(outputPath+"/"+type);
|
||||
}
|
||||
|
||||
public static void createOutputDirs(String outputPath, FileSystem fs) throws IOException {
|
||||
if (fs.exists(new Path(outputPath))) {
|
||||
fs.delete(new Path(outputPath), true);
|
||||
|
@ -219,4 +143,61 @@ public class PropagationConstant {
|
|||
fs.mkdirs(new Path(outputPath));
|
||||
}
|
||||
|
||||
public static void removeOutputDir(SparkSession spark, String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
||||
public static Boolean isSparkSessionManaged(ArgumentApplicationParser parser) {
|
||||
return Optional.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
}
|
||||
|
||||
public static Boolean isTest(ArgumentApplicationParser parser) {
|
||||
return Optional.ofNullable(parser.get("isTest"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.FALSE);
|
||||
}
|
||||
|
||||
public static void createCfHbforresult(SparkSession spark) {
|
||||
String query;
|
||||
query =
|
||||
"SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb "
|
||||
+ "FROM ( SELECT id, instance "
|
||||
+ "FROM result "
|
||||
+ " WHERE datainfo.deletedbyinference = false) ds "
|
||||
+ "LATERAL VIEW EXPLODE(instance) i AS inst";
|
||||
org.apache.spark.sql.Dataset<Row> cfhb = spark.sql(query);
|
||||
cfhb.createOrReplaceTempView("cfhb");
|
||||
}
|
||||
|
||||
public static <R extends Result> org.apache.spark.sql.Dataset<R> readPathEntity(
|
||||
SparkSession spark, String inputEntityPath, Class<R> resultClazz) {
|
||||
|
||||
return spark.read()
|
||||
.textFile(inputEntityPath)
|
||||
.map(
|
||||
(MapFunction<String, R>)
|
||||
value -> OBJECT_MAPPER.readValue(value, resultClazz),
|
||||
Encoders.bean(resultClazz));
|
||||
}
|
||||
|
||||
public static org.apache.spark.sql.Dataset<Relation> readRelations(
|
||||
SparkSession spark, String inputPath) {
|
||||
return spark.read()
|
||||
.textFile(inputPath)
|
||||
.map(
|
||||
(MapFunction<String, Relation>)
|
||||
value -> OBJECT_MAPPER.readValue(value, Relation.class),
|
||||
Encoders.bean(Relation.class));
|
||||
}
|
||||
|
||||
public static org.apache.spark.sql.Dataset<ResultCommunityList> readResultCommunityList(
|
||||
SparkSession spark, String possibleUpdatesPath) {
|
||||
return spark.read()
|
||||
.textFile(possibleUpdatesPath)
|
||||
.map(
|
||||
value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class),
|
||||
Encoders.bean(ResultCommunityList.class));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,21 +3,17 @@ package eu.dnetlib.dhp;
|
|||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
||||
public class QueryInformationSystem {
|
||||
private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" +
|
||||
" where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']" +
|
||||
" and $x//CONFIGURATION/context/param[./@name='status']/text() != 'hidden'" +
|
||||
" return $x//CONFIGURATION/context/@id/string()";
|
||||
private static final String XQUERY =
|
||||
"for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')"
|
||||
+ " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']"
|
||||
+ " and $x//CONFIGURATION/context/param[./@name='status']/text() != 'hidden'"
|
||||
+ " return $x//CONFIGURATION/context/@id/string()";
|
||||
|
||||
public static List<String> getCommunityList(final String isLookupUrl) throws ISLookUpException {
|
||||
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
return isLookUp.quickSearchProfile(XQUERY);
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -1,94 +0,0 @@
|
|||
package eu.dnetlib.dhp;
|
||||
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
|
||||
public class TypedRow implements Serializable {
|
||||
private String sourceId;
|
||||
private String targetId;
|
||||
private String type;
|
||||
private String value;
|
||||
private Set<String> accumulator;
|
||||
private List<Author> authors;
|
||||
|
||||
public List<Author> getAuthors() {
|
||||
return authors;
|
||||
}
|
||||
|
||||
public TypedRow setAuthors(List<Author> authors) {
|
||||
this.authors = authors;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void addAuthor(Author a){
|
||||
if(authors == null){
|
||||
authors = new ArrayList<>();
|
||||
}
|
||||
authors.add(a);
|
||||
}
|
||||
|
||||
public Set<String> getAccumulator() {
|
||||
return accumulator;
|
||||
}
|
||||
|
||||
public TypedRow setAccumulator(Set<String> accumulator) {
|
||||
this.accumulator = accumulator;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public void addAll(Collection<String> toadd){
|
||||
if(accumulator == null){
|
||||
accumulator = new HashSet<>();
|
||||
}
|
||||
this.accumulator.addAll(toadd);
|
||||
}
|
||||
|
||||
|
||||
public void add(String a){
|
||||
if (accumulator == null){
|
||||
accumulator = new HashSet<>();
|
||||
}
|
||||
accumulator.add(a);
|
||||
}
|
||||
|
||||
public Iterator<String> getAccumulatorIterator(){
|
||||
return accumulator.iterator();
|
||||
}
|
||||
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public TypedRow setValue(String value) {
|
||||
this.value = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getSourceId() {
|
||||
return sourceId;
|
||||
}
|
||||
public TypedRow setSourceId(String sourceId) {
|
||||
this.sourceId = sourceId;
|
||||
return this;
|
||||
}
|
||||
public String getTargetId() {
|
||||
return targetId;
|
||||
}
|
||||
public TypedRow setTargetId(String targetId) {
|
||||
this.targetId = targetId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
public TypedRow setType(String type) {
|
||||
this.type = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue