diff --git a/dhp-common/src/main/java/eu/dnetlib/scholexplorer/relation/RelInfo.java b/dhp-common/src/main/java/eu/dnetlib/scholexplorer/relation/RelInfo.java new file mode 100644 index 0000000000..ff88cda4c1 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/scholexplorer/relation/RelInfo.java @@ -0,0 +1,24 @@ +package eu.dnetlib.scholexplorer.relation; + +import java.io.Serializable; + +public class RelInfo implements Serializable { + private String original; + private String inverse; + + public String getOriginal() { + return original; + } + + public void setOriginal(String original) { + this.original = original; + } + + public String getInverse() { + return inverse; + } + + public void setInverse(String inverse) { + this.inverse = inverse; + } +} diff --git a/dhp-common/src/main/java/eu/dnetlib/scholexplorer/relation/RelationMapper.java b/dhp-common/src/main/java/eu/dnetlib/scholexplorer/relation/RelationMapper.java new file mode 100644 index 0000000000..647c117896 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/scholexplorer/relation/RelationMapper.java @@ -0,0 +1,19 @@ +package eu.dnetlib.scholexplorer.relation; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; + +import java.io.Serializable; +import java.util.HashMap; + +public class RelationMapper extends HashMap implements Serializable { + + public static RelationMapper load() throws Exception { + + final String json = IOUtils.toString(RelationMapper.class.getResourceAsStream("relations.json")); + + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(json, RelationMapper.class); + } + +} diff --git a/dhp-common/src/main/resources/eu/dnetlib/scholexplorer/relation/relations.json b/dhp-common/src/main/resources/eu/dnetlib/scholexplorer/relation/relations.json new file mode 100644 index 0000000000..98e8daa18c --- /dev/null +++ b/dhp-common/src/main/resources/eu/dnetlib/scholexplorer/relation/relations.json @@ -0,0 +1,158 @@ +{ + "cites":{ + "original":"Cites", + "inverse":"IsCitedBy" + }, + "compiles":{ + "original":"Compiles", + "inverse":"IsCompiledBy" + }, + "continues":{ + "original":"Continues", + "inverse":"IsContinuedBy" + }, + "derives":{ + "original":"IsSourceOf", + "inverse":"IsDerivedFrom" + }, + "describes":{ + "original":"Describes", + "inverse":"IsDescribedBy" + }, + "documents":{ + "original":"Documents", + "inverse":"IsDocumentedBy" + }, + "hasmetadata":{ + "original":"HasMetadata", + "inverse":"IsMetadataOf" + }, + "hasassociationwith":{ + "original":"HasAssociationWith", + "inverse":"HasAssociationWith" + }, + "haspart":{ + "original":"HasPart", + "inverse":"IsPartOf" + }, + "hasversion":{ + "original":"HasVersion", + "inverse":"IsVersionOf" + }, + "iscitedby":{ + "original":"IsCitedBy", + "inverse":"Cites" + }, + "iscompiledby":{ + "original":"IsCompiledBy", + "inverse":"Compiles" + }, + "iscontinuedby":{ + "original":"IsContinuedBy", + "inverse":"Continues" + }, + "isderivedfrom":{ + "original":"IsDerivedFrom", + "inverse":"IsSourceOf" + }, + "isdescribedby":{ + "original":"IsDescribedBy", + "inverse":"Describes" + }, + "isdocumentedby":{ + "original":"IsDocumentedBy", + "inverse":"Documents" + }, + "isidenticalto":{ + "original":"IsIdenticalTo", + "inverse":"IsIdenticalTo" + }, + "ismetadatafor":{ + "original":"IsMetadataFor", + "inverse":"IsMetadataOf" + }, + "ismetadataof":{ + "original":"IsMetadataOf", + "inverse":"IsMetadataFor" + }, + "isnewversionof":{ + "original":"IsNewVersionOf", + "inverse":"IsPreviousVersionOf" + }, + "isobsoletedby":{ + "original":"IsObsoletedBy", + "inverse":"Obsoletes" + }, + "isoriginalformof":{ + "original":"IsOriginalFormOf", + "inverse":"IsVariantFormOf" + }, + "ispartof":{ + "original":"IsPartOf", + "inverse":"HasPart" + }, + "ispreviousversionof":{ + "original":"IsPreviousVersionOf", + "inverse":"IsNewVersionOf" + }, + "isreferencedby":{ + "original":"IsReferencedBy", + "inverse":"References" + }, + "isrelatedto":{ + "original":"IsRelatedTo", + "inverse":"IsRelatedTo" + }, + "isrequiredby":{ + "original":"IsRequiredBy", + "inverse":"Requires" + }, + "isreviewedby":{ + "original":"IsReviewedBy", + "inverse":"Reviews" + }, + "issourceof":{ + "original":"IsSourceOf", + "inverse":"IsDerivedFrom" + }, + "issupplementedby":{ + "original":"IsSupplementedBy", + "inverse":"IsSupplementTo" + }, + "issupplementto":{ + "original":"IsSupplementTo", + "inverse":"IsSupplementedBy" + }, + "isvariantformof":{ + "original":"IsVariantFormOf", + "inverse":"IsOriginalFormOf" + }, + "isversionof":{ + "original":"IsVersionOf", + "inverse":"HasVersion" + }, + "obsoletes":{ + "original":"Obsoletes", + "inverse":"IsObsoletedBy" + }, + "references":{ + "original":"References", + "inverse":"IsReferencedBy" + }, + "requires":{ + "original":"Requires", + "inverse":"IsRequiredBy" + }, + "related":{ + "original":"IsRelatedTo", + "inverse":"IsRelatedTo" + }, + "reviews":{ + "original":"Reviews", + "inverse":"IsReviewedBy" + }, + "unknown":{ + "original":"Unknown", + "inverse":"Unknown" + } +} \ No newline at end of file diff --git a/dhp-common/src/test/java/eu/dnetlib/scholexplorer/relation/RelationMapperTest.java b/dhp-common/src/test/java/eu/dnetlib/scholexplorer/relation/RelationMapperTest.java new file mode 100644 index 0000000000..db6f4429a2 --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/scholexplorer/relation/RelationMapperTest.java @@ -0,0 +1,15 @@ +package eu.dnetlib.scholexplorer.relation; + +import org.apache.commons.io.IOUtils; +import org.junit.Test; + +public class RelationMapperTest { + + @Test + public void testLoadRels() throws Exception{ + + RelationMapper relationMapper = RelationMapper.load(); + relationMapper.keySet().forEach(System.out::println); + + } +} diff --git a/dhp-common/src/test/resources/eu/dnetlib/scholexplorer/relation/relations.json b/dhp-common/src/test/resources/eu/dnetlib/scholexplorer/relation/relations.json new file mode 100644 index 0000000000..98e8daa18c --- /dev/null +++ b/dhp-common/src/test/resources/eu/dnetlib/scholexplorer/relation/relations.json @@ -0,0 +1,158 @@ +{ + "cites":{ + "original":"Cites", + "inverse":"IsCitedBy" + }, + "compiles":{ + "original":"Compiles", + "inverse":"IsCompiledBy" + }, + "continues":{ + "original":"Continues", + "inverse":"IsContinuedBy" + }, + "derives":{ + "original":"IsSourceOf", + "inverse":"IsDerivedFrom" + }, + "describes":{ + "original":"Describes", + "inverse":"IsDescribedBy" + }, + "documents":{ + "original":"Documents", + "inverse":"IsDocumentedBy" + }, + "hasmetadata":{ + "original":"HasMetadata", + "inverse":"IsMetadataOf" + }, + "hasassociationwith":{ + "original":"HasAssociationWith", + "inverse":"HasAssociationWith" + }, + "haspart":{ + "original":"HasPart", + "inverse":"IsPartOf" + }, + "hasversion":{ + "original":"HasVersion", + "inverse":"IsVersionOf" + }, + "iscitedby":{ + "original":"IsCitedBy", + "inverse":"Cites" + }, + "iscompiledby":{ + "original":"IsCompiledBy", + "inverse":"Compiles" + }, + "iscontinuedby":{ + "original":"IsContinuedBy", + "inverse":"Continues" + }, + "isderivedfrom":{ + "original":"IsDerivedFrom", + "inverse":"IsSourceOf" + }, + "isdescribedby":{ + "original":"IsDescribedBy", + "inverse":"Describes" + }, + "isdocumentedby":{ + "original":"IsDocumentedBy", + "inverse":"Documents" + }, + "isidenticalto":{ + "original":"IsIdenticalTo", + "inverse":"IsIdenticalTo" + }, + "ismetadatafor":{ + "original":"IsMetadataFor", + "inverse":"IsMetadataOf" + }, + "ismetadataof":{ + "original":"IsMetadataOf", + "inverse":"IsMetadataFor" + }, + "isnewversionof":{ + "original":"IsNewVersionOf", + "inverse":"IsPreviousVersionOf" + }, + "isobsoletedby":{ + "original":"IsObsoletedBy", + "inverse":"Obsoletes" + }, + "isoriginalformof":{ + "original":"IsOriginalFormOf", + "inverse":"IsVariantFormOf" + }, + "ispartof":{ + "original":"IsPartOf", + "inverse":"HasPart" + }, + "ispreviousversionof":{ + "original":"IsPreviousVersionOf", + "inverse":"IsNewVersionOf" + }, + "isreferencedby":{ + "original":"IsReferencedBy", + "inverse":"References" + }, + "isrelatedto":{ + "original":"IsRelatedTo", + "inverse":"IsRelatedTo" + }, + "isrequiredby":{ + "original":"IsRequiredBy", + "inverse":"Requires" + }, + "isreviewedby":{ + "original":"IsReviewedBy", + "inverse":"Reviews" + }, + "issourceof":{ + "original":"IsSourceOf", + "inverse":"IsDerivedFrom" + }, + "issupplementedby":{ + "original":"IsSupplementedBy", + "inverse":"IsSupplementTo" + }, + "issupplementto":{ + "original":"IsSupplementTo", + "inverse":"IsSupplementedBy" + }, + "isvariantformof":{ + "original":"IsVariantFormOf", + "inverse":"IsOriginalFormOf" + }, + "isversionof":{ + "original":"IsVersionOf", + "inverse":"HasVersion" + }, + "obsoletes":{ + "original":"Obsoletes", + "inverse":"IsObsoletedBy" + }, + "references":{ + "original":"References", + "inverse":"IsReferencedBy" + }, + "requires":{ + "original":"Requires", + "inverse":"IsRequiredBy" + }, + "related":{ + "original":"IsRelatedTo", + "inverse":"IsRelatedTo" + }, + "reviews":{ + "original":"Reviews", + "inverse":"IsReviewedBy" + }, + "unknown":{ + "original":"Unknown", + "inverse":"Unknown" + } +} \ No newline at end of file diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java index 5cf0883beb..03122983dc 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.schema.oaf; -import java.util.List; +import java.util.*; +import java.util.stream.Collectors; public class Relation extends Oaf { @@ -63,4 +64,22 @@ public class Relation extends Oaf { public void setCollectedFrom(List collectedFrom) { this.collectedFrom = collectedFrom; } + + public void mergeFrom(Relation other) { + this.mergeOAFDataInfo(other); + if (other.getCollectedFrom() == null || other.getCollectedFrom().size() == 0) + return; + if (collectedFrom == null && other.getCollectedFrom() != null) { + collectedFrom = other.getCollectedFrom(); + return; + } + if (other.getCollectedFrom() != null) { + collectedFrom.addAll(other.getCollectedFrom()); + + collectedFrom = new ArrayList<>(collectedFrom + .stream() + .collect(Collectors.toMap(KeyValue::toComparableString, x -> x, (x1, x2) -> x1)) + .values()); + } + } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIDataset.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIDataset.java index df124395fc..10aafaa4c3 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIDataset.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIDataset.java @@ -11,6 +11,8 @@ import java.util.Map; public class DLIDataset extends Dataset { + private String originalObjIdentifier; + private List dlicollectedfrom; private String completionStatus; @@ -31,6 +33,14 @@ public class DLIDataset extends Dataset { this.dlicollectedfrom = dlicollectedfrom; } + public String getOriginalObjIdentifier() { + return originalObjIdentifier; + } + + public void setOriginalObjIdentifier(String originalObjIdentifier) { + this.originalObjIdentifier = originalObjIdentifier; + } + @Override public void mergeFrom(OafEntity e) { super.mergeFrom(e); diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIPublication.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIPublication.java index f0b5d0bd62..ebd56eaa91 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIPublication.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIPublication.java @@ -7,6 +7,9 @@ import java.io.Serializable; import java.util.*; public class DLIPublication extends Publication implements Serializable { + + private String originalObjIdentifier; + private List dlicollectedfrom; private String completionStatus; @@ -27,6 +30,14 @@ public class DLIPublication extends Publication implements Serializable { this.dlicollectedfrom = dlicollectedfrom; } + public String getOriginalObjIdentifier() { + return originalObjIdentifier; + } + + public void setOriginalObjIdentifier(String originalObjIdentifier) { + this.originalObjIdentifier = originalObjIdentifier; + } + @Override public void mergeFrom(OafEntity e) { super.mergeFrom(e); diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java index 52c9983f0c..9f48ce521d 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java @@ -13,11 +13,9 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; import scala.Tuple2; import java.io.IOException; @@ -45,42 +43,31 @@ public class SparkPropagateRelationsJob { final String targetRelPath = parser.get("targetRelPath"); - final Dataset df = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class)); + final Dataset merge = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class)).where("relClass == 'merges'"); + + final Dataset rels= spark.read().load(relationPath).as(Encoders.bean(Relation.class)); + final Dataset firstJoin = rels.joinWith(merge, merge.col("target").equalTo(rels.col("source")), "left_outer") + .map((MapFunction, Relation>) r -> { + final Relation mergeRelation = r._2(); + final Relation relation = r._1(); - final JavaPairRDD mergedIds = df - .where("relClass == 'merges'") - .select(df.col("source"),df.col("target")) - .distinct() - .toJavaRDD() - .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(1), r.getString(0))); + if(mergeRelation!= null) + relation.setSource(mergeRelation.getSource()); + return relation; + }, Encoders.bean(Relation.class)); + final Dataset secondJoin = firstJoin.joinWith(merge, merge.col("target").equalTo(firstJoin.col("target")), "left_outer") + .map((MapFunction, Relation>) r -> { + final Relation mergeRelation = r._2(); + final Relation relation = r._1(); + if (mergeRelation != null ) + relation.setTarget(mergeRelation.getSource()); + return relation; + }, Encoders.bean(Relation.class)); - final JavaRDD sourceEntity = sc.textFile(relationPath); - JavaRDD newRels = sourceEntity.mapToPair( - (PairFunction) s -> - new Tuple2<>(DHPUtils.getJPathString(SOURCEJSONPATH, s), s)) - .leftOuterJoin(mergedIds) - .map((Function>>, String>) v1 -> { - if (v1._2()._2().isPresent()) { - return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.SOURCE); - } - return v1._2()._1(); - }) - .mapToPair( - (PairFunction) s -> - new Tuple2<>(DHPUtils.getJPathString(TARGETJSONPATH, s), s)) - .leftOuterJoin(mergedIds) - .map((Function>>, String>) v1 -> { - if (v1._2()._2().isPresent()) { - return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.TARGET); - } - return v1._2()._1(); - }).filter(SparkPropagateRelationsJob::containsDedup) - .repartition(500); - - newRels.union(sourceEntity).repartition(1000).saveAsTextFile(targetRelPath, GzipCodec.class); + secondJoin.write().mode(SaveMode.Overwrite).save(targetRelPath); } private static boolean containsDedup(final String json) { diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java index 1381633e54..3ea7982d10 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java @@ -15,11 +15,9 @@ import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; import scala.Tuple2; import java.io.IOException; @@ -55,18 +53,7 @@ public class SparkUpdateEntityJob { .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(0), "d")); final JavaRDD sourceEntity = sc.textFile(entityPath); - if ("relation".equalsIgnoreCase(entity)) { - sourceEntity.mapToPair( - (PairFunction) s -> - new Tuple2<>(DHPUtils.getJPathString(SOURCEJSONPATH, s), s)) - .leftOuterJoin(mergedIds) - .map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), Relation.class) : k._2()._1()) - .mapToPair((PairFunction) s -> new Tuple2<>(DHPUtils.getJPathString(TARGETJSONPATH, s), s)) - .leftOuterJoin(mergedIds) - .map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), Relation.class) : k._2()._1()) - .saveAsTextFile(destination, GzipCodec.class); - } else { - final JavaRDD dedupEntity = sc.textFile(dedupRecordPath); + final JavaRDD dedupEntity = sc.textFile(dedupRecordPath); JavaPairRDD entitiesWithId = sourceEntity.mapToPair((PairFunction) s -> new Tuple2<>(DHPUtils.getJPathString(IDJSONPATH, s), s)); Class mainClass; switch (entity) { @@ -83,19 +70,12 @@ public class SparkUpdateEntityJob { throw new IllegalArgumentException("Illegal type " + entity); } - JavaRDD map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), mainClass) : k._2()._1()); - - map.union(dedupEntity).saveAsTextFile(destination, GzipCodec.class); - } - } - private static String updateDeletedByInference(final String json, final Class clazz) { - final ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); try { diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml index 995ef076a2..ddbf39e5fe 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml @@ -26,7 +26,7 @@ - + @@ -132,7 +132,7 @@ -mtyarn-cluster --mergeRelPath${targetPath}/${entity}/mergeRel --relationPath${sourcePath}/relation - --targetRelPath${targetPath}/${entity}/relation_propagated + --targetRelPath${targetPath}/${entity}/updated_relation @@ -160,35 +160,35 @@ --dedupRecordPath${targetPath}/${entity}/dedup_records --targetPath${targetPath}/${entity}/updated_record - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - Update ${entity} set deleted by Inference - eu.dnetlib.dedup.SparkUpdateEntityJob - dhp-dedup-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - ${sparkExtraOPT} - - -mtyarn-cluster - --entityPath${targetPath}/${entity}/relation_propagated - --mergeRelPath${targetPath}/${entity}/mergeRel - --entityrelation - --dedupRecordPath${targetPath}/${entity}/dedup_records - --targetPath${targetPath}/${entity}/updated_relation - + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java index fb1be554ba..a7b7cb8c84 100644 --- a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java +++ b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java @@ -21,15 +21,19 @@ public class SparkCreateDedupTest { } + + + @Test - @Ignore - public void createSimRelsTest() throws Exception { - SparkCreateSimRels.main(new String[] { + public void PropagateRelationsTest() throws Exception { + SparkPropagateRelationsJob.main(new String[] { "-mt", "local[*]", - "-s", "/Users/miconis/dumps", - "-e", entity, - "-c", ArgumentApplicationParser.compressArgument(configuration), - "-t", "/tmp/dedup", + + + "-ep", "/Users/sandro/Downloads/scholix/graph/relation", + "-mr", "/Users/sandro/Downloads/scholix/dedupGraphWD/publication/mergeRel", + "-mt", "local[*]", + "-t", "/Users/sandro/Downloads/scholix/dedupGraphWD/publication/rel_fixed", }); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGenerateSimRel.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGenerateSimRel.java new file mode 100644 index 0000000000..33bcb1e5dd --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGenerateSimRel.java @@ -0,0 +1,56 @@ +package eu.dnetlib.dhp.graph.scholexplorer; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.graph.SparkGraphImporterJob; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.DHPUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +public class SparkScholexplorerGenerateSimRel { + + final static String IDJSONPATH = "$.id"; + final static String OBJIDPATH = "$.originalObjIdentifier"; + + + + public static void generateDataFrame(final SparkSession spark, final JavaSparkContext sc, final String inputPath, final String targetPath) { + + + final JavaPairRDD datasetSimRel = sc.textFile(inputPath+"/dataset/*") + .mapToPair((PairFunction) k -> + new Tuple2<>(DHPUtils.getJPathString(IDJSONPATH, k),DHPUtils.getJPathString(OBJIDPATH, k))) + .filter(t -> + !StringUtils.substringAfter(t._1(), "|") + .equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::"))) + .distinct(); + + final JavaPairRDD publicationSimRel = sc.textFile(inputPath+"/publication/*") + .mapToPair((PairFunction) k -> + new Tuple2<>(DHPUtils.getJPathString(IDJSONPATH, k),DHPUtils.getJPathString(OBJIDPATH, k))) + .filter(t -> + !StringUtils.substringAfter(t._1(), "|") + .equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::"))) + .distinct(); + + JavaRDD simRel = datasetSimRel.union(publicationSimRel).map(s -> { + final Relation r = new Relation(); + r.setSource(s._1()); + r.setTarget(s._2()); + r.setRelType("similar"); + return r; + } + ); + spark.createDataset(simRel.rdd(), Encoders.bean(Relation.class)).distinct().write() + .mode(SaveMode.Overwrite).save(targetPath+"/pid_simRel"); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporter.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporter.java index 33c2696223..d6023435cb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporter.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerGraphImporter.java @@ -6,6 +6,7 @@ import eu.dnetlib.dhp.graph.SparkGraphImporterJob; import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser; import eu.dnetlib.dhp.graph.scholexplorer.parser.PublicationScholexplorerParser; import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -29,15 +30,17 @@ public class SparkScholexplorerGraphImporter { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final String inputPath = parser.get("sourcePath"); + RelationMapper relationMapper = RelationMapper.load(); + sc.sequenceFile(inputPath, IntWritable.class, Text.class).map(Tuple2::_2).map(Text::toString).repartition(500) .flatMap((FlatMapFunction) record -> { switch (parser.get("entity")) { case "dataset": final DatasetScholexplorerParser d = new DatasetScholexplorerParser(); - return d.parseObject(record).iterator(); + return d.parseObject(record,relationMapper).iterator(); case "publication": final PublicationScholexplorerParser p = new PublicationScholexplorerParser(); - return p.parseObject(record).iterator(); + return p.parseObject(record,relationMapper).iterator(); default: throw new IllegalArgumentException("wrong values of entities"); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java index 54496671f1..d3c257fc68 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java @@ -12,16 +12,23 @@ import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown; import eu.dnetlib.dhp.utils.DHPUtils; import net.minidev.json.JSONArray; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.*; import scala.Tuple2; +import scala.collection.JavaConverters; +import sun.rmi.log.ReliableLog; +import javax.xml.crypto.Data; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -41,6 +48,8 @@ public class SparkScholexplorerMergeEntitiesJob { parser.parseArgument(args); final SparkSession spark = SparkSession .builder() + .config(new SparkConf() + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")) .appName(SparkGraphImporterJob.class.getSimpleName()) .master(parser.get("master")) .getOrCreate(); @@ -102,21 +111,54 @@ public class SparkScholexplorerMergeEntitiesJob { }).saveAsTextFile(targetPath, GzipCodec.class); break; case "relation": - union.mapToPair((PairFunction) f -> { + + SparkScholexplorerGenerateSimRel.generateDataFrame(spark, sc, inputPath.replace("/relation",""),targetPath.replace("/relation","") ); + RDD rdd = union.mapToPair((PairFunction) f -> { final String source = getJPathString(SOURCEJSONPATH, f); final String target = getJPathString(TARGETJSONPATH, f); final String reltype = getJPathString(RELJSONPATH, f); ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source, reltype, target)), mapper.readValue(f, Relation.class)); + return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source.toLowerCase(), reltype.toLowerCase(), target.toLowerCase())), mapper.readValue(f, Relation.class)); }).reduceByKey((a, b) -> { - a.mergeOAFDataInfo(b); + a.mergeFrom(b); return a; - }).map(item -> { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(item._2()); - }).saveAsTextFile(targetPath, GzipCodec.class); - break; + }).map(Tuple2::_2).rdd(); + + spark.createDataset(rdd, Encoders.bean(Relation.class)).write().mode(SaveMode.Overwrite).save(targetPath); + Dataset rel_ds =spark.read().load(targetPath).as(Encoders.bean(Relation.class)); + + System.out.println("LOADING PATH :"+targetPath.replace("/relation","")+"/pid_simRel"); + Datasetsim_ds =spark.read().load(targetPath.replace("/relation","")+"/pid_simRel").as(Encoders.bean(Relation.class)); + + TargetFunction tf = new TargetFunction(); + + Dataset ids = sim_ds.map(tf, Encoders.bean(Relation.class)); + + + final Dataset firstJoin = rel_ds + .joinWith(ids, ids.col("target") + .equalTo(rel_ds.col("source")), "left_outer") + .map((MapFunction, Relation>) s -> + { + if (s._2() != null) { + s._1().setSource(s._2().getSource()); + } + return s._1(); + } + , Encoders.bean(Relation.class)); + + + Dataset secondJoin = firstJoin.joinWith(ids, ids.col("target").equalTo(firstJoin.col("target")),"left_outer") + .map((MapFunction, Relation>) s -> + { + if (s._2() != null) { + s._1().setTarget(s._2().getSource()); + } + return s._1(); + } + , Encoders.bean(Relation.class)); + secondJoin.write().mode(SaveMode.Overwrite).save(targetPath+"_fixed"); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/TargetFunction.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/TargetFunction.java new file mode 100644 index 0000000000..31a554a63b --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/TargetFunction.java @@ -0,0 +1,15 @@ +package eu.dnetlib.dhp.graph.scholexplorer; + + +import eu.dnetlib.dhp.schema.oaf.Relation; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.api.java.function.MapFunction; + +public class TargetFunction implements MapFunction { + @Override + public Relation call(Relation relation) throws Exception { + final String type = StringUtils.substringBefore(relation.getSource(), "|"); + relation.setTarget(String.format("%s|%s", type, StringUtils.substringAfter(relation.getTarget(),"::"))); + return relation; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java index 5277f794b6..6f3aa68d23 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java @@ -6,6 +6,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.utils.DHPUtils; +import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -21,7 +22,7 @@ public abstract class AbstractScholexplorerParser { final static Pattern pattern = Pattern.compile("10\\.\\d{4,9}/[-._;()/:A-Z0-9]+$", Pattern.CASE_INSENSITIVE); private List datasetSubTypes = Arrays.asList("dataset", "software", "film", "sound", "physicalobject", "audiovisual", "collection", "other", "study", "metadata"); - public abstract List parseObject(final String record); + public abstract List parseObject(final String record, final RelationMapper relMapper); protected Map getAttributes(final XMLStreamReader parser) { final Map attributesMap = new HashMap<>(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java index 3a671e6a14..21545092b9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java @@ -10,6 +10,8 @@ import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown; import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo; import eu.dnetlib.dhp.parser.utility.VtdUtilityParser.Node; +import eu.dnetlib.scholexplorer.relation.RelInfo; +import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; @@ -21,7 +23,7 @@ import java.util.stream.Collectors; public class DatasetScholexplorerParser extends AbstractScholexplorerParser { @Override - public List parseObject(String record) { + public List parseObject(String record, final RelationMapper relationMapper) { try { final DLIDataset parsedObject = new DLIDataset(); final VTDGen vg = new VTDGen(); @@ -40,7 +42,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']"))); - + parsedObject.setOriginalObjIdentifier(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']")); parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']")); final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']"); @@ -145,9 +147,20 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { final String relatedPid = n.getTextValue(); final String relatedPidType = n.getAttributes().get("relatedIdentifierType"); final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown"); - final String relationSemantic = n.getAttributes().get("relationType"); - final String inverseRelation = n.getAttributes().get("inverseRelationType"); + String relationSemantic = n.getAttributes().get("relationType"); + String inverseRelation = n.getAttributes().get("inverseRelationType"); final String targetId = generateId(relatedPid, relatedPidType, relatedType); + + if (relationMapper.containsKey(relationSemantic.toLowerCase())) + { + RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase()); + relationSemantic = relInfo.getOriginal(); + inverseRelation = relInfo.getInverse(); + } + else { + relationSemantic = "Unknown"; + inverseRelation = "Unknown"; + } r.setTarget(targetId); r.setRelType(relationSemantic); r.setRelClass("datacite"); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java index 45ef2066bb..d5cf94a775 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java @@ -8,6 +8,8 @@ import eu.dnetlib.dhp.parser.utility.VtdUtilityParser.Node; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo; +import eu.dnetlib.scholexplorer.relation.RelInfo; +import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; @@ -19,7 +21,7 @@ import java.util.stream.Collectors; public class PublicationScholexplorerParser extends AbstractScholexplorerParser { @Override - public List parseObject(final String record) { + public List parseObject(final String record, final RelationMapper relationMapper) { try { final List result = new ArrayList<>(); final DLIPublication parsedObject = new DLIPublication(); @@ -63,6 +65,8 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser final String sourceId = generateId(currentPid.getValue(), currentPid.getQualifier().getClassid(), "publication"); parsedObject.setId(sourceId); + parsedObject.setOriginalObjIdentifier(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']")); + String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']"); List collectedFromNodes = @@ -125,9 +129,19 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser final String relatedPid = n.getTextValue(); final String relatedPidType = n.getAttributes().get("relatedIdentifierType"); final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown"); - final String relationSemantic = n.getAttributes().get("relationType"); - final String inverseRelation = n.getAttributes().get("inverseRelationType"); + String relationSemantic = n.getAttributes().get("relationType"); + String inverseRelation = "Unknown"; final String targetId = generateId(relatedPid, relatedPidType, relatedType); + + if (relationMapper.containsKey(relationSemantic.toLowerCase())) + { + RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase()); + relationSemantic = relInfo.getOriginal(); + inverseRelation = relInfo.getInverse(); + } + else { + relationSemantic = "Unknown"; + } r.setTarget(targetId); r.setRelType(relationSemantic); r.setCollectedFrom(parsedObject.getCollectedfrom()); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/workflow.xml index d04e76b2a0..44c6004e2d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + sourcePath diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/generate_sim_rel_scholix_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/generate_sim_rel_scholix_parameters.json new file mode 100644 index 0000000000..34f0d6776a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/generate_sim_rel_scholix_parameters.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the result data", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/relations.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/relations.json new file mode 100644 index 0000000000..98e8daa18c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/relations.json @@ -0,0 +1,158 @@ +{ + "cites":{ + "original":"Cites", + "inverse":"IsCitedBy" + }, + "compiles":{ + "original":"Compiles", + "inverse":"IsCompiledBy" + }, + "continues":{ + "original":"Continues", + "inverse":"IsContinuedBy" + }, + "derives":{ + "original":"IsSourceOf", + "inverse":"IsDerivedFrom" + }, + "describes":{ + "original":"Describes", + "inverse":"IsDescribedBy" + }, + "documents":{ + "original":"Documents", + "inverse":"IsDocumentedBy" + }, + "hasmetadata":{ + "original":"HasMetadata", + "inverse":"IsMetadataOf" + }, + "hasassociationwith":{ + "original":"HasAssociationWith", + "inverse":"HasAssociationWith" + }, + "haspart":{ + "original":"HasPart", + "inverse":"IsPartOf" + }, + "hasversion":{ + "original":"HasVersion", + "inverse":"IsVersionOf" + }, + "iscitedby":{ + "original":"IsCitedBy", + "inverse":"Cites" + }, + "iscompiledby":{ + "original":"IsCompiledBy", + "inverse":"Compiles" + }, + "iscontinuedby":{ + "original":"IsContinuedBy", + "inverse":"Continues" + }, + "isderivedfrom":{ + "original":"IsDerivedFrom", + "inverse":"IsSourceOf" + }, + "isdescribedby":{ + "original":"IsDescribedBy", + "inverse":"Describes" + }, + "isdocumentedby":{ + "original":"IsDocumentedBy", + "inverse":"Documents" + }, + "isidenticalto":{ + "original":"IsIdenticalTo", + "inverse":"IsIdenticalTo" + }, + "ismetadatafor":{ + "original":"IsMetadataFor", + "inverse":"IsMetadataOf" + }, + "ismetadataof":{ + "original":"IsMetadataOf", + "inverse":"IsMetadataFor" + }, + "isnewversionof":{ + "original":"IsNewVersionOf", + "inverse":"IsPreviousVersionOf" + }, + "isobsoletedby":{ + "original":"IsObsoletedBy", + "inverse":"Obsoletes" + }, + "isoriginalformof":{ + "original":"IsOriginalFormOf", + "inverse":"IsVariantFormOf" + }, + "ispartof":{ + "original":"IsPartOf", + "inverse":"HasPart" + }, + "ispreviousversionof":{ + "original":"IsPreviousVersionOf", + "inverse":"IsNewVersionOf" + }, + "isreferencedby":{ + "original":"IsReferencedBy", + "inverse":"References" + }, + "isrelatedto":{ + "original":"IsRelatedTo", + "inverse":"IsRelatedTo" + }, + "isrequiredby":{ + "original":"IsRequiredBy", + "inverse":"Requires" + }, + "isreviewedby":{ + "original":"IsReviewedBy", + "inverse":"Reviews" + }, + "issourceof":{ + "original":"IsSourceOf", + "inverse":"IsDerivedFrom" + }, + "issupplementedby":{ + "original":"IsSupplementedBy", + "inverse":"IsSupplementTo" + }, + "issupplementto":{ + "original":"IsSupplementTo", + "inverse":"IsSupplementedBy" + }, + "isvariantformof":{ + "original":"IsVariantFormOf", + "inverse":"IsOriginalFormOf" + }, + "isversionof":{ + "original":"IsVersionOf", + "inverse":"HasVersion" + }, + "obsoletes":{ + "original":"Obsoletes", + "inverse":"IsObsoletedBy" + }, + "references":{ + "original":"References", + "inverse":"IsReferencedBy" + }, + "requires":{ + "original":"Requires", + "inverse":"IsRequiredBy" + }, + "related":{ + "original":"IsRelatedTo", + "inverse":"IsRelatedTo" + }, + "reviews":{ + "original":"Reviews", + "inverse":"IsReviewedBy" + }, + "unknown":{ + "original":"Unknown", + "inverse":"Unknown" + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java index e87bc89136..ead2ddf226 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser; import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.io.IOUtils; import org.junit.Test; @@ -15,11 +16,11 @@ public class ScholexplorerParserTest { @Test - public void testDataciteParser() throws IOException { + public void testDataciteParser() throws Exception { String xml = IOUtils.toString(this.getClass().getResourceAsStream("dmf.xml")); DatasetScholexplorerParser p = new DatasetScholexplorerParser(); - List oaves = p.parseObject(xml); + List oaves = p.parseObject(xml, RelationMapper.load()); ObjectMapper m = new ObjectMapper(); m.enable(SerializationFeature.INDENT_OUTPUT); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJobTest.java new file mode 100644 index 0000000000..0ab51f6f61 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJobTest.java @@ -0,0 +1,18 @@ +package eu.dnetlib.dhp.graph.scholexplorer; + +import org.junit.Ignore; +import org.junit.Test; + +public class SparkScholexplorerMergeEntitiesJobTest { + + @Test + @Ignore + public void testMerge() throws Exception { + SparkScholexplorerMergeEntitiesJob.main(new String[]{ + "-mt", "local[*]", + "-e", "relation", + "-s", "file:///Users/sandro/Downloads/scholix/relation", + "-t", "file:///Users/sandro/Downloads/scholix/relation"} + ); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/scholexplorer/t.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/scholexplorer/t.xml new file mode 100644 index 0000000000..abc5621f84 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/scholexplorer/t.xml @@ -0,0 +1,305 @@ + +
+ + + + + +
+ + InfoSpace Deduplication using Spark + InfoSpace Deduplication using Spark + + InfoSpace Deduplication + 35 + + + executeOozieJobICM + /user/sandro.labruzzo/scholix/ + IIS + true + true + true + true + true + dedup-dli-dataset + d1e24272-939d-4216-ad58-22abe90b7fb4_RGVkdXBDb25maWd1cmF0aW9uRFNSZXNvdXJjZXMvRGVkdXBDb25maWd1cmF0aW9uRFNSZXNvdXJjZVR5cGU= + dedup-dli-unknown + + + + import PMF Publications to HDFS DIR + + + + + + + + + + + + + + + + + + + + + Run M/R import Job + + + + + + + + + + + + + + + + + + + import PMF Publications to HDFS DIR + + + + + + + + + + + + + + + + + + + + + + Run M/R import Job + + + + + + + + + + + + + + + + + + + import PMF Publications to HDFS DIR + + + + + + + + + + + + + + + + + + + + + Run M/R import Job + + + + + + + + + + + + + + + + + + + + import PMF Publications to HDFS DIR + + + + + + + + + + + + + + + + + + + + + + Run M/R import Job + + + + + + + + + + + + + + + + + + + + Run M/R import Job + + + + + + + + + + + + + + + + + + + Run M/R import Job + + + + + + + + + + + + + + + + + + + Run M/R import Job + + + + + + + + + + + + + + + + + + + Run M/R import Job + + + + + + + + + + + + + + + + + + + + import PMF Publications to HDFS DIR + + + + + + + + + + + + + + + + + + + + + + + 29 5 22 ? * * + 10080 + + + wf_20200311_132512_626 + 2020-03-11T13:50:54+00:00 + FAILURE + eu.dnetlib.rmi.data.hadoop.HadoopServiceException: hadoop job: 0004121-190920055838013-oozie-oozi-W failed with status: KILLED, oozie log: 2020-03-11 13:38:02,044 INFO org.apache.oozie.service.JPAService: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[] No results found 2020-03-11 13:38:02,095 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@:start:] Start action [0004121-190920055838013-oozie-oozi-W@:start:] with user-retry state : userRetryCount [0], userRetryMax [0], userRetryInterval [10] 2020-03-11 13:38:02,119 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@:start:] [***0004121-190920055838013-oozie-oozi-W@:start:***]Action status=DONE 2020-03-11 13:38:02,119 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@:start:] [***0004121-190920055838013-oozie-oozi-W@:start:***]Action updated in DB! 2020-03-11 13:38:02,241 INFO org.apache.oozie.service.JPAService: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@:start:] No results found 2020-03-11 13:38:02,307 INFO org.apache.oozie.command.wf.WorkflowNotificationXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@:start:] No Notification URL is defined. Therefore nothing to notify for job 0004121-190920055838013-oozie-oozi-W@:start: 2020-03-11 13:38:02,307 INFO org.apache.oozie.command.wf.WorkflowNotificationXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[] No Notification URL is defined. Therefore nothing to notify for job 0004121-190920055838013-oozie-oozi-W 2020-03-11 13:38:02,370 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@DeleteTargetPath] Start action [0004121-190920055838013-oozie-oozi-W@DeleteTargetPath] with user-retry state : userRetryCount [0], userRetryMax [0], userRetryInterval [10] 2020-03-11 13:38:02,444 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@DeleteTargetPath] [***0004121-190920055838013-oozie-oozi-W@DeleteTargetPath***]Action status=DONE 2020-03-11 13:38:02,474 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@DeleteTargetPath] [***0004121-190920055838013-oozie-oozi-W@DeleteTargetPath***]Action updated in DB! 2020-03-11 13:38:02,595 INFO org.apache.oozie.service.JPAService: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@DeleteTargetPath] No results found 2020-03-11 13:38:02,707 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] Start action [0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] with user-retry state : userRetryCount [0], userRetryMax [0], userRetryInterval [10] 2020-03-11 13:38:05,274 INFO org.apache.oozie.action.hadoop.SparkActionExecutor: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] checking action, hadoop job ID [job_1568959071843_15753] status [RUNNING] 2020-03-11 13:38:05,295 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] [***0004121-190920055838013-oozie-oozi-W@MergeDLIEntities***]Action status=RUNNING 2020-03-11 13:38:05,295 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] [***0004121-190920055838013-oozie-oozi-W@MergeDLIEntities***]Action updated in DB! 2020-03-11 13:38:05,344 INFO org.apache.oozie.command.wf.WorkflowNotificationXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] No Notification URL is defined. Therefore nothing to notify for job 0004121-190920055838013-oozie-oozi-W@MergeDLIEntities 2020-03-11 13:38:05,355 INFO org.apache.oozie.command.wf.WorkflowNotificationXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@DeleteTargetPath] No Notification URL is defined. Therefore nothing to notify for job 0004121-190920055838013-oozie-oozi-W@DeleteTargetPath 2020-03-11 13:48:07,901 INFO org.apache.oozie.action.hadoop.SparkActionExecutor: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] checking action, hadoop job ID [job_1568959071843_15753] status [RUNNING] 2020-03-11 13:50:50,514 INFO org.apache.oozie.servlet.CallbackServlet: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] callback for action [0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] 2020-03-11 13:50:50,922 INFO org.apache.oozie.action.hadoop.SparkActionExecutor: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] Hadoop Jobs launched : [job_1568959071843_15754] 2020-03-11 13:50:50,952 INFO org.apache.oozie.action.hadoop.SparkActionExecutor: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] action completed, external ID [job_1568959071843_15753] 2020-03-11 13:50:50,973 WARN org.apache.oozie.action.hadoop.SparkActionExecutor: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] Launcher ERROR, reason: Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Application application_1568959071843_15754 finished with failed status 2020-03-11 13:50:50,995 WARN org.apache.oozie.action.hadoop.SparkActionExecutor: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] Launcher exception: Application application_1568959071843_15754 finished with failed status org.apache.spark.SparkException: Application application_1568959071843_15754 finished with failed status at org.apache.spark.deploy.yarn.Client.run(Client.scala:1171) at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1608) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) at org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:178) at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:90) at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:81) at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:57) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:235) at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:459) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 2020-03-11 13:50:51,041 INFO org.apache.oozie.command.wf.ActionEndXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] ERROR is considered as FAILED for SLA 2020-03-11 13:50:51,094 INFO org.apache.oozie.service.JPAService: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] No results found 2020-03-11 13:50:51,115 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@Kill] Start action [0004121-190920055838013-oozie-oozi-W@Kill] with user-retry state : userRetryCount [0], userRetryMax [0], userRetryInterval [10] 2020-03-11 13:50:51,116 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@Kill] [***0004121-190920055838013-oozie-oozi-W@Kill***]Action status=DONE 2020-03-11 13:50:51,116 INFO org.apache.oozie.command.wf.ActionStartXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[sandro.labruzzo] GROUP[-] TOKEN[] APP[Infospace Merge Entities] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@Kill] [***0004121-190920055838013-oozie-oozi-W@Kill***]Action updated in DB! 2020-03-11 13:50:51,273 INFO org.apache.oozie.command.wf.WorkflowNotificationXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@Kill] No Notification URL is defined. Therefore nothing to notify for job 0004121-190920055838013-oozie-oozi-W@Kill 2020-03-11 13:50:51,303 INFO org.apache.oozie.command.wf.WorkflowNotificationXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[] No Notification URL is defined. Therefore nothing to notify for job 0004121-190920055838013-oozie-oozi-W 2020-03-11 13:50:51,277 INFO org.apache.oozie.command.wf.WorkflowNotificationXCommand: SERVER[iis-cdh5-test-m3.ocean.icm.edu.pl] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0004121-190920055838013-oozie-oozi-W] ACTION[0004121-190920055838013-oozie-oozi-W@MergeDLIEntities] No Notification URL is defined. Therefore nothing to notify for job 0004121-190920055838013-oozie-oozi-W@MergeDLIEntities + + +
\ No newline at end of file