diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java index fa0ab94ee..1a0d98182 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/PropagationConstant.java @@ -1,88 +1,95 @@ package eu.dnetlib.dhp; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; import eu.dnetlib.dhp.schema.oaf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.PairFunction; -import scala.Tuple2; - import java.io.IOException; import java.util.*; -import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; public class PropagationConstant { public static final String INSTITUTIONAL_REPO_TYPE = "pubsrepository::institutional"; - public final static String PROPAGATION_DATA_INFO_TYPE = "propagation"; + public static final String PROPAGATION_DATA_INFO_TYPE = "propagation"; public static final String TRUE = "true"; + public static final String DNET_COUNTRY_SCHEMA = "dnet:countries"; + public static final String DNET_SCHEMA_NAME = "dnet:provenanceActions"; + public static final String DNET_SCHEMA_ID = "dnet:provenanceActions"; - public final static String DNET_COUNTRY_SCHEMA = "dnet:countries"; - public final static String DNET_SCHEMA_NAME = "dnet:provenanceActions"; - public final static String DNET_SCHEMA_ID = "dnet:provenanceActions"; + public static final String PROPAGATION_COUNTRY_INSTREPO_CLASS_ID = "country:instrepos"; + public static final String PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME = + "Propagation of country to result collected from datasources of type institutional repositories"; - public final static String PROPAGATION_COUNTRY_INSTREPO_CLASS_ID = "country:instrepos"; - public final static String PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME = "Propagation of country to result collected from datasources of type institutional repositories"; + public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID = + "result:organization:instrepo"; + public static final String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME = + "Propagation of affiliation to result collected from datasources of type institutional repository"; - public final static String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID = "result:organization:instrepo"; - public final static String PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME = "Propagation of affiliation to result collected from datasources of type institutional repository"; + public static final String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID = + "result:project:semrel"; + public static final String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME = + "Propagation of result to project through semantic relation"; - public final static String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID = "result:project:semrel"; - public final static String PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME = "Propagation of result to project through semantic relation"; + public static final String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID = + "result:community:semrel"; + public static final String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME = + " Propagation of result belonging to community through semantic relation"; - public final static String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID = "result:community:semrel"; - public final static String PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME = " Propagation of result belonging to community through semantic relation"; + public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID = + "result:community:organization"; + public static final String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME = + " Propagation of result belonging to community through organization"; - public final static String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID = "result:community:organization"; - public final static String PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME = " Propagation of result belonging to community through organization"; + public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = + "authorpid:result"; + public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = + "Propagation of authors pid to result through semantic relations"; - public final static String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "propagation:orcid:result"; - public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of ORCID through result linked by isSupplementedBy or isSupplementTo semantic relations"; + public static final String RELATION_DATASOURCE_ORGANIZATION_REL_CLASS = "provides"; - public final static String RELATION_DATASOURCEORGANIZATION_REL_TYPE = "datasourceOrganization"; - public final static String RELATION_DATASOURCEORGANIZATION_SUBREL_TYPE = "provision"; - public final static String RELATION_ORGANIZATION_DATASOURCE_REL_CLASS = "isProvidedBy"; - public final static String RELATION_DATASOURCE_ORGANIZATION_REL_CLASS = "provides"; - - public final static String RELATION_RESULTORGANIZATION_REL_TYPE = "resultOrganization"; - public final static String RELATION_RESULTORGANIZATION_SUBREL_TYPE = "affiliation"; - public final static String RELATION_ORGANIZATION_RESULT_REL_CLASS = "isAuthorInstitutionOf"; - public final static String RELATION_RESULT_ORGANIZATION_REL_CLASS = "hasAuthorInstitution"; + public static final String RELATION_RESULTORGANIZATION_REL_TYPE = "resultOrganization"; + public static final String RELATION_RESULTORGANIZATION_SUBREL_TYPE = "affiliation"; + public static final String RELATION_ORGANIZATION_RESULT_REL_CLASS = "isAuthorInstitutionOf"; + public static final String RELATION_RESULT_ORGANIZATION_REL_CLASS = "hasAuthorInstitution"; public static final String RELATION_RESULTRESULT_REL_TYPE = "resultResult"; - public static final String RELATION_RESULTRESULT_SUBREL_TYPE = "supplement"; public static final String RELATION_RESULTPROJECT_REL_TYPE = "resultProject"; public static final String RELATION_RESULTPROJECT_SUBREL_TYPE = "outcome"; public static final String RELATION_RESULT_PROJECT_REL_CLASS = "isProducedBy"; public static final String RELATION_PROJECT_RESULT_REL_CLASS = "produces"; - - public static final String RELATION_RESULT_REPRESENTATIVERESULT_REL_CLASS = "isMergedIn"; public static final String RELATION_REPRESENTATIVERESULT_RESULT_CLASS = "merges"; - - public static final String RELATION_ORGANIZATIONORGANIZATION_REL_TYPE = "organizationOrganization"; - - public static final String RELATION_DEDUPORGANIZATION_SUBREL_TYPE = "dedup"; - public static final String PROPAGATION_AUTHOR_PID = "ORCID"; - public static Country getCountry(String country){ + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static Country getCountry(String classid, String classname) { Country nc = new Country(); - nc.setClassid(country); - nc.setClassname(country); + nc.setClassid(classid); + nc.setClassname(classname); nc.setSchemename(DNET_COUNTRY_SCHEMA); nc.setSchemeid(DNET_COUNTRY_SCHEMA); - nc.setDataInfo(getDataInfo(PROPAGATION_DATA_INFO_TYPE, PROPAGATION_COUNTRY_INSTREPO_CLASS_ID, PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME)); + nc.setDataInfo( + getDataInfo( + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_COUNTRY_INSTREPO_CLASS_ID, + PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME)); return nc; } - public static DataInfo getDataInfo(String inference_provenance, String inference_class_id, String inference_class_name){ + public static DataInfo getDataInfo( + String inference_provenance, String inference_class_id, String inference_class_name) { DataInfo di = new DataInfo(); di.setInferred(true); di.setDeletedbyinference(false); @@ -101,8 +108,15 @@ public class PropagationConstant { return pa; } - - public static Relation getRelation(String source, String target, String rel_class, String rel_type, String subrel_type, String inference_provenance, String inference_class_id, String inference_class_name){ + public static Relation getRelation( + String source, + String target, + String rel_class, + String rel_type, + String subrel_type, + String inference_provenance, + String inference_class_id, + String inference_class_name) { Relation r = new Relation(); r.setSource(source); r.setTarget(target); @@ -111,107 +125,17 @@ public class PropagationConstant { r.setSubRelType(subrel_type); r.setDataInfo(getDataInfo(inference_provenance, inference_class_id, inference_class_name)); return r; -} - - public static PairFunction toPair() { - return e -> new Tuple2<>( e.getSourceId(), e); - } - public static JavaPairRDD getResultResultSemRel(List allowedsemrel, JavaRDD relations) { - return relations - .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())) - .map(r -> { - TypedRow tr = new TypedRow(); - tr.setSourceId(r.getSource()); - tr.setTargetId(r.getTarget()); - return tr; - }) - .mapToPair(toPair()); - } - - - public static String getConstraintList(String text, List constraints){ + public static String getConstraintList(String text, List constraints) { String ret = " and (" + text + constraints.get(0) + "'"; - for (int i =1; i < constraints.size(); i++){ - ret += " OR " + text + constraints.get(i) + "'"; + for (int i = 1; i < constraints.size(); i++) { + ret += " OR " + text + constraints.get(i) + "'"; } ret += ")"; return ret; } - - public static List getTypedRowsDatasourceResult(OafEntity oaf) { - List lst = new ArrayList<>(); - Set datasources_provenance = new HashSet<>(); - List instanceList = null; - String type = ""; - if (oaf.getClass() == Publication.class) { - instanceList = ((Publication) oaf).getInstance(); - type = "publication"; - } - if (oaf.getClass() == Dataset.class){ - instanceList = ((Dataset)oaf).getInstance(); - type = "dataset"; - } - - if (oaf.getClass() == Software.class){ - instanceList = ((Software)oaf).getInstance(); - type = "software"; - } - - if (oaf.getClass() == OtherResearchProduct.class){ - instanceList = ((OtherResearchProduct)oaf).getInstance(); - type = "otherresearchproduct"; - } - - - for (Instance i : instanceList) { - datasources_provenance.add(i.getCollectedfrom().getKey()); - datasources_provenance.add(i.getHostedby().getKey()); - } - for (String dsId : datasources_provenance) { - TypedRow tr = new TypedRow(); - tr.setSourceId(dsId); - tr.setTargetId(oaf.getId()); - tr.setType(type); - lst.add(tr); - } - return lst; - } - - public static void updateResultForCommunity(JavaPairRDD results, JavaPairRDD toupdateresult, String outputPath, String type, String class_id, String class_name) { - results.leftOuterJoin(toupdateresult) - .map(p -> { - Result r = p._2()._1(); - if (p._2()._2().isPresent()){ - Set communityList = p._2()._2().get().getAccumulator(); - for(Context c: r.getContext()){ - if (communityList.contains(c.getId())){ - //verify if the datainfo for this context contains propagation - if (!c.getDataInfo().stream().map(di -> di.getInferenceprovenance()).collect(Collectors.toSet()).contains(PROPAGATION_DATA_INFO_TYPE)){ - c.getDataInfo().add(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name)); - //community id already in the context of the result. Remove it from the set that has to be added - communityList.remove(c.getId()); - } - } - } - List cc = r.getContext(); - for(String cId: communityList){ - Context context = new Context(); - context.setId(cId); - context.setDataInfo(Arrays.asList(getDataInfo(PROPAGATION_DATA_INFO_TYPE, class_id, class_name))); - cc.add(context); - } - r.setContext(cc); - } - return r; - }) - .map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath+"/"+type); - } - public static void createOutputDirs(String outputPath, FileSystem fs) throws IOException { if (fs.exists(new Path(outputPath))) { fs.delete(new Path(outputPath), true); @@ -219,4 +143,61 @@ public class PropagationConstant { fs.mkdirs(new Path(outputPath)); } + public static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + public static Boolean isSparkSessionManaged(ArgumentApplicationParser parser) { + return Optional.ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + } + + public static Boolean isTest(ArgumentApplicationParser parser) { + return Optional.ofNullable(parser.get("isTest")) + .map(Boolean::valueOf) + .orElse(Boolean.FALSE); + } + + public static void createCfHbforresult(SparkSession spark) { + String query; + query = + "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb " + + "FROM ( SELECT id, instance " + + "FROM result " + + " WHERE datainfo.deletedbyinference = false) ds " + + "LATERAL VIEW EXPLODE(instance) i AS inst"; + org.apache.spark.sql.Dataset cfhb = spark.sql(query); + cfhb.createOrReplaceTempView("cfhb"); + } + + public static org.apache.spark.sql.Dataset readPathEntity( + SparkSession spark, String inputEntityPath, Class resultClazz) { + + return spark.read() + .textFile(inputEntityPath) + .map( + (MapFunction) + value -> OBJECT_MAPPER.readValue(value, resultClazz), + Encoders.bean(resultClazz)); + } + + public static org.apache.spark.sql.Dataset readRelations( + SparkSession spark, String inputPath) { + return spark.read() + .textFile(inputPath) + .map( + (MapFunction) + value -> OBJECT_MAPPER.readValue(value, Relation.class), + Encoders.bean(Relation.class)); + } + + public static org.apache.spark.sql.Dataset readResultCommunityList( + SparkSession spark, String possibleUpdatesPath) { + return spark.read() + .textFile(possibleUpdatesPath) + .map( + value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class), + Encoders.bean(ResultCommunityList.class)); + } } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/QueryInformationSystem.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/QueryInformationSystem.java index b1ec7726e..a33919d19 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/QueryInformationSystem.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/QueryInformationSystem.java @@ -3,21 +3,17 @@ package eu.dnetlib.dhp; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; - import java.util.List; - public class QueryInformationSystem { - private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" + - " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']" + - " and $x//CONFIGURATION/context/param[./@name='status']/text() != 'hidden'" + - " return $x//CONFIGURATION/context/@id/string()"; + private static final String XQUERY = + "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" + + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']" + + " and $x//CONFIGURATION/context/param[./@name='status']/text() != 'hidden'" + + " return $x//CONFIGURATION/context/@id/string()"; public static List getCommunityList(final String isLookupUrl) throws ISLookUpException { ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); return isLookUp.quickSearchProfile(XQUERY); - } - - } diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java deleted file mode 100644 index 56d519509..000000000 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/TypedRow.java +++ /dev/null @@ -1,94 +0,0 @@ -package eu.dnetlib.dhp; - - -import eu.dnetlib.dhp.schema.oaf.Author; - -import java.io.Serializable; -import java.util.*; - -public class TypedRow implements Serializable { - private String sourceId; - private String targetId; - private String type; - private String value; - private Set accumulator; - private List authors; - - public List getAuthors() { - return authors; - } - - public TypedRow setAuthors(List authors) { - this.authors = authors; - return this; - } - - public void addAuthor(Author a){ - if(authors == null){ - authors = new ArrayList<>(); - } - authors.add(a); - } - - public Set getAccumulator() { - return accumulator; - } - - public TypedRow setAccumulator(Set accumulator) { - this.accumulator = accumulator; - return this; - } - - - public void addAll(Collection toadd){ - if(accumulator == null){ - accumulator = new HashSet<>(); - } - this.accumulator.addAll(toadd); - } - - - public void add(String a){ - if (accumulator == null){ - accumulator = new HashSet<>(); - } - accumulator.add(a); - } - - public Iterator getAccumulatorIterator(){ - return accumulator.iterator(); - } - - public String getValue() { - return value; - } - - public TypedRow setValue(String value) { - this.value = value; - return this; - } - - public String getSourceId() { - return sourceId; - } - public TypedRow setSourceId(String sourceId) { - this.sourceId = sourceId; - return this; - } - public String getTargetId() { - return targetId; - } - public TypedRow setTargetId(String targetId) { - this.targetId = targetId; - return this; - } - - public String getType() { - return type; - } - public TypedRow setType(String type) { - this.type = type; - return this; - } - -}