diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java index bf48605d26..b27fc9267b 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java @@ -7,6 +7,7 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier; public class ModelConstants { + public static final String DNET_SUBJECT_TYPOLOGIES = "dnet:subject_classification_typologies"; public static final String DNET_RESULT_TYPOLOGIES = "dnet:result_typologies"; public static final String DNET_PUBLICATION_RESOURCE = "dnet:publication_resource"; public static final String DNET_ACCESS_MODES = "dnet:access_modes"; diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java index 9518efdb57..1759180d27 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java @@ -289,4 +289,12 @@ public class JsonPathTest { System.out.println("d = " + d); } + + @Test + public void testNull() throws Exception { + final Object p = null; + + System.out.println((String) p); + + } } diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java index d03cc25895..32b2503b2f 100644 --- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java @@ -6,6 +6,7 @@ import java.util.Collection; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; @@ -15,6 +16,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset; +import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.util.MapDocumentUtil; import scala.Tuple2; @@ -30,10 +33,16 @@ public class DedupRecordFactory { final DedupConfig dedupConf) { long ts = System.currentTimeMillis(); // - final JavaPairRDD inputJsonEntities = sc - .textFile(entitiesInputPath) + final JavaPairRDD inputJsonEntities = spark + .read() + .load(entitiesInputPath) + .as(Encoders.kryo(Oaf.class)) + .map( + (MapFunction) p -> new org.codehaus.jackson.map.ObjectMapper().writeValueAsString(p), + Encoders.STRING()) + .javaRDD() .mapToPair( - (PairFunction) it -> new Tuple2( + (PairFunction) it -> new Tuple2<>( MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it)); // : source is the dedup_id, target is the id of the mergedIn @@ -74,9 +83,9 @@ public class DedupRecordFactory { } } - private static Publication publicationMerger(Tuple2> e, final long ts) { + private static DLIPublication publicationMerger(Tuple2> e, final long ts) { - Publication p = new Publication(); // the result of the merge, to be returned at the end + DLIPublication p = new DLIPublication(); // the result of the merge, to be returned at the end p.setId(e._1()); @@ -110,9 +119,9 @@ public class DedupRecordFactory { return p; } - private static Dataset datasetMerger(Tuple2> e, final long ts) { + private static DLIDataset datasetMerger(Tuple2> e, final long ts) { - Dataset d = new Dataset(); // the result of the merge, to be returned at the end + DLIDataset d = new DLIDataset(); // the result of the merge, to be returned at the end d.setId(e._1()); diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java index f86410d292..8646ac7422 100644 --- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java @@ -9,18 +9,21 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.graphx.Edge; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; +import org.codehaus.jackson.map.ObjectMapper; import com.google.common.hash.Hashing; import eu.dnetlib.dedup.graph.ConnectedComponent; import eu.dnetlib.dedup.graph.GraphProcessor; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.util.MapDocumentUtil; @@ -42,7 +45,6 @@ public class SparkCreateConnectedComponent { .master(parser.get("master")) .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final String inputPath = parser.get("sourcePath"); final String entity = parser.get("entity"); final String targetPath = parser.get("targetPath"); @@ -50,8 +52,12 @@ public class SparkCreateConnectedComponent { // DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json"))); final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf")); - final JavaPairRDD vertexes = sc - .textFile(inputPath + "/" + entity) + final JavaPairRDD vertexes = spark + .read() + .load(inputPath + "/" + entity) + .as(Encoders.kryo(Oaf.class)) + .map((MapFunction) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING()) + .javaRDD() .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) .mapToPair( (PairFunction) s -> new Tuple2(getHashcode(s), s)); diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java index d87269f033..fa0ee1efb8 100644 --- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java @@ -4,10 +4,10 @@ package eu.dnetlib.dedup; import org.apache.commons.io.IOUtils; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.pace.config.DedupConfig; @@ -41,12 +41,19 @@ public class SparkCreateDedupRecord { DedupUtility.createEntityPath(sourcePath, entity), OafEntityType.valueOf(entity), dedupConf); - dedupRecord - .map( - r -> { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(r); - }) - .saveAsTextFile(dedupPath + "/" + entity + "/dedup_records"); + spark + .createDataset(dedupRecord.rdd(), Encoders.kryo(OafEntity.class)) + .write() + .mode(SaveMode.Overwrite) + .save(dedupPath + "/" + entity + "/dedup_records"); +// +// +// dedupRecord +// .map( +// r -> { +// ObjectMapper mapper = new ObjectMapper(); +// return mapper.writeValueAsString(r); +// }) +// .saveAsTextFile(dedupPath + "/" + entity + "/dedup_records"); } } diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java index 41fe911e7e..6a98b112ad 100644 --- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java @@ -7,10 +7,13 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; +import org.codehaus.jackson.map.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.model.MapDocument; @@ -46,8 +49,12 @@ public class SparkCreateSimRels { // DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf")); - JavaPairRDD mapDocument = sc - .textFile(inputPath + "/" + entity) + JavaPairRDD mapDocument = spark + .read() + .load(inputPath + "/" + entity) + .as(Encoders.kryo(Oaf.class)) + .map((MapFunction) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING()) + .javaRDD() .mapToPair( s -> { MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java index e3d4fdbe36..59c0693991 100644 --- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkPropagateRelationsJob.java @@ -14,16 +14,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation; import eu.dnetlib.dhp.utils.DHPUtils; import scala.Tuple2; public class SparkPropagateRelationsJob { - enum FieldType { - SOURCE, TARGET - } - - static final String SOURCEJSONPATH = "$.source"; - static final String TARGETJSONPATH = "$.target"; public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -39,7 +34,6 @@ public class SparkPropagateRelationsJob { .master(parser.get("master")) .getOrCreate(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final String relationPath = parser.get("relationPath"); final String mergeRelPath = parser.get("mergeRelPath"); final String targetRelPath = parser.get("targetRelPath"); @@ -50,63 +44,38 @@ public class SparkPropagateRelationsJob { .as(Encoders.bean(Relation.class)) .where("relClass == 'merges'"); - final Dataset rels = spark.read().load(relationPath).as(Encoders.bean(Relation.class)); + final Dataset rels = spark + .read() + .load(relationPath) + .as(Encoders.kryo(DLIRelation.class)) + .map( + (MapFunction) r -> r, + Encoders.bean(DLIRelation.class)); - final Dataset firstJoin = rels + final Dataset firstJoin = rels .joinWith(merge, merge.col("target").equalTo(rels.col("source")), "left_outer") .map( - (MapFunction, Relation>) r -> { + (MapFunction, DLIRelation>) r -> { final Relation mergeRelation = r._2(); - final Relation relation = r._1(); - + final DLIRelation relation = r._1(); if (mergeRelation != null) relation.setSource(mergeRelation.getSource()); return relation; }, - Encoders.bean(Relation.class)); + Encoders.bean(DLIRelation.class)); - final Dataset secondJoin = firstJoin + final Dataset secondJoin = firstJoin .joinWith(merge, merge.col("target").equalTo(firstJoin.col("target")), "left_outer") .map( - (MapFunction, Relation>) r -> { + (MapFunction, DLIRelation>) r -> { final Relation mergeRelation = r._2(); - final Relation relation = r._1(); + final DLIRelation relation = r._1(); if (mergeRelation != null) relation.setTarget(mergeRelation.getSource()); return relation; }, - Encoders.bean(Relation.class)); + Encoders.kryo(DLIRelation.class)); secondJoin.write().mode(SaveMode.Overwrite).save(targetRelPath); } - - private static boolean containsDedup(final String json) { - final String source = DHPUtils.getJPathString(SOURCEJSONPATH, json); - final String target = DHPUtils.getJPathString(TARGETJSONPATH, json); - - return source.toLowerCase().contains("dedup") || target.toLowerCase().contains("dedup"); - } - - private static String replaceField(final String json, final String id, final FieldType type) { - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - try { - Relation relation = mapper.readValue(json, Relation.class); - if (relation.getDataInfo() == null) - relation.setDataInfo(new DataInfo()); - relation.getDataInfo().setDeletedbyinference(false); - switch (type) { - case SOURCE: - relation.setSource(id); - return mapper.writeValueAsString(relation); - case TARGET: - relation.setTarget(id); - return mapper.writeValueAsString(relation); - default: - throw new IllegalArgumentException(""); - } - } catch (IOException e) { - throw new RuntimeException("unable to deserialize json relation: " + json, e); - } - } } diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala new file mode 100644 index 0000000000..1b29fdea42 --- /dev/null +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/sx/SparkUpdateEntityWithDedupInfo.scala @@ -0,0 +1,75 @@ +package eu.dnetlib.dedup.sx + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation} +import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown, OafUtils} +import org.apache.commons.io.IOUtils +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.LoggerFactory +import org.apache.spark.sql.functions.col + +object SparkUpdateEntityWithDedupInfo { + + def main(args: Array[String]): Unit = { + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityWithDedupInfo.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json"))) + val logger = LoggerFactory.getLogger(SparkUpdateEntityWithDedupInfo.getClass) + parser.parseArgument(args) + + val workingPath: String = parser.get("workingPath") + logger.info(s"Working dir path = $workingPath") + + implicit val oafEncoder: Encoder[OafEntity] = Encoders.kryo[OafEntity] + implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation]) + + implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication] + implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset] + implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown] + implicit val dlirelEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation] + + + val spark: SparkSession = SparkSession + .builder() + .appName(SparkUpdateEntityWithDedupInfo.getClass.getSimpleName) + .master(parser.get("master")) + .getOrCreate() + + + val entityPath = parser.get("entityPath") + val mergeRelPath = parser.get("mergeRelPath") + val dedupRecordPath = parser.get("dedupRecordPath") + val entity = parser.get("entity") + val destination = parser.get("targetPath") + + val mergedIds = spark.read.load(mergeRelPath).as[Relation] + .where("relClass == 'merges'") + .select(col("target")) + + + val entities: Dataset[(String, OafEntity)] = spark + .read + .load(entityPath).as[OafEntity] + .map(o => (o.getId, o))(Encoders.tuple(Encoders.STRING, oafEncoder)) + + + val finalDataset:Dataset[OafEntity] = entities.joinWith(mergedIds, entities("_1").equalTo(mergedIds("target")), "left") + .map(k => { + val e: OafEntity = k._1._2 + val t = k._2 + if (t != null && t.getString(0).nonEmpty) { + if (e.getDataInfo == null) { + e.setDataInfo(OafUtils.generateDataInfo()) + } + e.getDataInfo.setDeletedbyinference(true) + } + e + }) + + val dedupRecords :Dataset[OafEntity] = spark.read.load(dedupRecordPath).as[OafEntity] + + finalDataset.union(dedupRecords) + .repartition(1200).write + .mode(SaveMode.Overwrite).save(destination) + + } + +} diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml index 2f22bb764d..d1196bfb1b 100644 --- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/dedup/oozie_app/workflow.xml @@ -144,7 +144,7 @@ yarn-cluster cluster Update ${entity} and add DedupRecord - eu.dnetlib.dedup.sx.SparkUpdateEntityJob + eu.dnetlib.dedup.sx.SparkUpdateEntityWithDedupInfo dhp-dedup-scholexplorer-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index 7091d97403..ae1b379061 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -90,6 +90,7 @@ public class CleanGraphSparkJob { final CleaningRuleMap mapping = CleaningRuleMap.create(vocs); readTableFromPath(spark, inputPath, clazz) + .map((MapFunction) value -> fixVocabularyNames(value), Encoders.bean(clazz)) .map((MapFunction) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz)) .map((MapFunction) value -> fixDefaults(value), Encoders.bean(clazz)) .write() @@ -98,6 +99,65 @@ public class CleanGraphSparkJob { .json(outputPath); } + protected static T fixVocabularyNames(T value) { + if (value instanceof Datasource) { + // nothing to clean here + } else if (value instanceof Project) { + // nothing to clean here + } else if (value instanceof Organization) { + Organization o = (Organization) value; + if (Objects.nonNull(o.getCountry())) { + fixVocabName(o.getCountry(), ModelConstants.DNET_COUNTRY_TYPE); + } + } else if (value instanceof Relation) { + // nothing to clean here + } else if (value instanceof Result) { + + Result r = (Result) value; + + fixVocabName(r.getLanguage(), ModelConstants.DNET_LANGUAGES); + fixVocabName(r.getResourcetype(), ModelConstants.DNET_DATA_CITE_RESOURCE); + fixVocabName(r.getBestaccessright(), ModelConstants.DNET_ACCESS_MODES); + + if (Objects.nonNull(r.getSubject())) { + r.getSubject().forEach(s -> fixVocabName(s.getQualifier(), ModelConstants.DNET_SUBJECT_TYPOLOGIES)); + } + if (Objects.nonNull(r.getInstance())) { + for (Instance i : r.getInstance()) { + fixVocabName(i.getAccessright(), ModelConstants.DNET_ACCESS_MODES); + fixVocabName(i.getRefereed(), ModelConstants.DNET_REVIEW_LEVELS); + } + } + if (Objects.nonNull(r.getAuthor())) { + r.getAuthor().forEach(a -> { + if (Objects.nonNull(a.getPid())) { + a.getPid().forEach(p -> { + fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES); + }); + } + }); + } + if (value instanceof Publication) { + + } else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) { + + } else if (value instanceof OtherResearchProduct) { + + } else if (value instanceof Software) { + + } + } + + return value; + } + + private static void fixVocabName(Qualifier q, String vocabularyName) { + if (Objects.nonNull(q) && StringUtils.isBlank(q.getSchemeid())) { + q.setSchemeid(vocabularyName); + q.setSchemename(vocabularyName); + } + } + protected static T fixDefaults(T value) { if (value instanceof Datasource) { // nothing to clean here @@ -113,6 +173,9 @@ public class CleanGraphSparkJob { } else if (value instanceof Result) { Result r = (Result) value; + if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) { + r.setPublisher(null); + } if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) { r .setLanguage( diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index da2ba47237..87c935d835 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -44,6 +44,7 @@ import java.util.Date; import java.util.List; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.DbClient; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; +import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.DataInfo; @@ -113,6 +115,11 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i final String hdfsPath = parser.get("hdfsPath"); log.info("hdfsPath: {}", hdfsPath); + final String nsPrefixBlacklist = parser.get("nsPrefixBlacklist"); + log.info("nsPrefixBlacklist: {}", nsPrefixBlacklist); + + final Predicate verifyNamespacePrefix = new VerifyNsPrefixPredicate(nsPrefixBlacklist); + final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims"); log.info("processClaims: {}", processClaims); @@ -123,23 +130,25 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i smdbe.execute("queryClaims.sql", smdbe::processClaims); } else { log.info("Processing datasources..."); - smdbe.execute("queryDatasources.sql", smdbe::processDatasource); + smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix); log.info("Processing projects..."); if (dbSchema.equalsIgnoreCase("beta")) { - smdbe.execute("queryProjects.sql", smdbe::processProject); + smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix); } else { - smdbe.execute("queryProjects_production.sql", smdbe::processProject); + smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix); } log.info("Processing orgs..."); - smdbe.execute("queryOrganizations.sql", smdbe::processOrganization); + smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix); log.info("Processing relationsNoRemoval ds <-> orgs ..."); - smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization); + smdbe + .execute( + "queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization, verifyNamespacePrefix); log.info("Processing projects <-> orgs ..."); - smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization); + smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix); } log.info("All done."); } @@ -163,10 +172,20 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i } public void execute(final String sqlFile, final Function> producer) + throws Exception { + execute(sqlFile, producer, oaf -> true); + } + + public void execute(final String sqlFile, final Function> producer, + final Predicate predicate) throws Exception { final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile)); - final Consumer consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf)); + final Consumer consumer = rs -> producer.apply(rs).forEach(oaf -> { + if (predicate.test(oaf)) { + emitOaf(oaf); + } + }); dbClient.processResults(sql, consumer); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VerifyNsPrefixPredicate.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VerifyNsPrefixPredicate.java new file mode 100644 index 0000000000..1e99d298d7 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VerifyNsPrefixPredicate.java @@ -0,0 +1,62 @@ + +package eu.dnetlib.dhp.oa.graph.raw.common; + +import java.util.HashSet; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; + +import com.google.common.base.Splitter; + +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Relation; + +/** + * This predicate should be used to skip oaf objects using a blacklist of nsprefixes. + * + * @author michele + */ +public class VerifyNsPrefixPredicate implements Predicate { + + final Set invalids = new HashSet<>(); + + public VerifyNsPrefixPredicate(final String blacklist) { + if (StringUtils.isNotBlank(blacklist)) { + Splitter + .on(",") + .trimResults() + .omitEmptyStrings() + .split(blacklist) + .forEach(invalids::add); + } + } + + @Override + public boolean test(final Oaf oaf) { + if (oaf instanceof Datasource) { + return testValue(((Datasource) oaf).getNamespaceprefix().getValue()); + } else if (oaf instanceof OafEntity) { + return testValue(((OafEntity) oaf).getId()); + } else if (oaf instanceof Relation) { + return testValue(((Relation) oaf).getSource()) && testValue(((Relation) oaf).getTarget()); + } else { + return true; + } + } + + protected boolean testValue(final String s) { + if (StringUtils.isNotBlank(s)) { + for (final String invalid : invalids) { + if (Pattern.matches("^(\\d\\d\\|)?" + invalid + ".*$", s)) { + return false; + } + } + } + return true; + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala index 41fcd26362..be92b60ebe 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/EBIAggregator.scala @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.sx.ebi import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown} import org.apache.spark.sql.{Encoder, Encoders} import org.apache.spark.sql.expressions.Aggregator @@ -35,6 +36,88 @@ object EBIAggregator { } + + def getDLIUnknownAggregator(): Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown] = new Aggregator[(String, DLIUnknown), DLIUnknown, DLIUnknown]{ + + override def zero: DLIUnknown = new DLIUnknown() + + override def reduce(b: DLIUnknown, a: (String, DLIUnknown)): DLIUnknown = { + b.mergeFrom(a._2) + if (b.getId == null) + b.setId(a._2.getId) + b + } + + override def merge(wx: DLIUnknown, wy: DLIUnknown): DLIUnknown = { + wx.mergeFrom(wy) + if(wx.getId == null && wy.getId.nonEmpty) + wx.setId(wy.getId) + wx + } + override def finish(reduction: DLIUnknown): DLIUnknown = reduction + + override def bufferEncoder: Encoder[DLIUnknown] = + Encoders.kryo(classOf[DLIUnknown]) + + override def outputEncoder: Encoder[DLIUnknown] = + Encoders.kryo(classOf[DLIUnknown]) + } + + def getDLIDatasetAggregator(): Aggregator[(String, DLIDataset), DLIDataset, DLIDataset] = new Aggregator[(String, DLIDataset), DLIDataset, DLIDataset]{ + + override def zero: DLIDataset = new DLIDataset() + + override def reduce(b: DLIDataset, a: (String, DLIDataset)): DLIDataset = { + b.mergeFrom(a._2) + if (b.getId == null) + b.setId(a._2.getId) + b + } + + override def merge(wx: DLIDataset, wy: DLIDataset): DLIDataset = { + wx.mergeFrom(wy) + if(wx.getId == null && wy.getId.nonEmpty) + wx.setId(wy.getId) + wx + } + override def finish(reduction: DLIDataset): DLIDataset = reduction + + override def bufferEncoder: Encoder[DLIDataset] = + Encoders.kryo(classOf[DLIDataset]) + + override def outputEncoder: Encoder[DLIDataset] = + Encoders.kryo(classOf[DLIDataset]) + } + + + def getDLIPublicationAggregator(): Aggregator[(String, DLIPublication), DLIPublication, DLIPublication] = new Aggregator[(String, DLIPublication), DLIPublication, DLIPublication]{ + + override def zero: DLIPublication = new DLIPublication() + + override def reduce(b: DLIPublication, a: (String, DLIPublication)): DLIPublication = { + b.mergeFrom(a._2) + if (b.getId == null) + b.setId(a._2.getId) + b + } + + + override def merge(wx: DLIPublication, wy: DLIPublication): DLIPublication = { + wx.mergeFrom(wy) + if(wx.getId == null && wy.getId.nonEmpty) + wx.setId(wy.getId) + wx + } + override def finish(reduction: DLIPublication): DLIPublication = reduction + + override def bufferEncoder: Encoder[DLIPublication] = + Encoders.kryo(classOf[DLIPublication]) + + override def outputEncoder: Encoder[DLIPublication] = + Encoders.kryo(classOf[DLIPublication]) + } + + def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{ override def zero: Publication = new Publication() @@ -85,5 +168,27 @@ object EBIAggregator { } + def getDLIRelationAggregator(): Aggregator[(String, DLIRelation), DLIRelation, DLIRelation] = new Aggregator[(String, DLIRelation), DLIRelation, DLIRelation]{ + + override def zero: DLIRelation = new DLIRelation() + + override def reduce(b: DLIRelation, a: (String, DLIRelation)): DLIRelation = { + a._2 + } + + + override def merge(a: DLIRelation, b: DLIRelation): DLIRelation = { + if(b!= null) b else a + } + override def finish(reduction: DLIRelation): DLIRelation = reduction + + override def bufferEncoder: Encoder[DLIRelation] = + Encoders.kryo(classOf[DLIRelation]) + + override def outputEncoder: Encoder[DLIRelation] = + Encoders.kryo(classOf[DLIRelation]) + } + + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala index 897bbd5407..2d3f75b918 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkAddLinkUpdates.scala @@ -1,8 +1,9 @@ package eu.dnetlib.dhp.sx.ebi import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Instance, KeyValue, Oaf} +import eu.dnetlib.dhp.schema.oaf.{Author, Instance, Journal, KeyValue, Oaf, Publication, Dataset => OafDataset} import eu.dnetlib.dhp.schema.scholexplorer.OafUtils.createQualifier -import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIRelation, OafUtils, ProvenaceInfo} +import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, OafUtils, ProvenaceInfo} +import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal} import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.scholexplorer.relation.RelationMapper import org.apache.commons.io.IOUtils @@ -12,6 +13,7 @@ import org.json4s import org.json4s.DefaultFormats import org.json4s.JsonAST.{JField, JObject, JString} import org.json4s.jackson.JsonMethods.parse +import org.apache.spark.sql.functions._ import scala.collection.JavaConverters._ @@ -28,6 +30,64 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin } + + def journalToOAF(pj:PMJournal): Journal = { + val j = new Journal + j.setIssnPrinted(pj.getIssn) + j.setVol(pj.getVolume) + j.setName(pj.getTitle) + j.setIss(pj.getIssue) + j.setDataInfo(OafUtils.generateDataInfo()) + j + } + + + def pubmedTOPublication(input:PMArticle):DLIPublication = { + + + val dnetPublicationId = s"50|${DHPUtils.md5(s"${input.getPmid}::pmid")}" + + val p = new DLIPublication + p.setId(dnetPublicationId) + p.setDataInfo(OafUtils.generateDataInfo()) + p.setPid(List(OafUtils.createSP(input.getPmid.toLowerCase.trim, "pmid", "dnet:pid_types")).asJava) + p.setCompletionStatus("complete") + val pi = new ProvenaceInfo + pi.setId("dli_________::europe_pmc__") + pi.setName( "Europe PMC") + pi.setCompletionStatus("complete") + pi.setCollectionMode("collected") + p.setDlicollectedfrom(List(pi).asJava) + p.setCollectedfrom(List(generatePubmedDLICollectedFrom()).asJava) + + if (input.getAuthors != null && input.getAuthors.size() >0) { + var aths: List[Author] = List() + input.getAuthors.asScala.filter(a=> a!= null).foreach(a => { + val c = new Author + c.setFullname(a.getFullName) + c.setName(a.getForeName) + c.setSurname(a.getLastName) + aths = aths ::: List(c) + }) + if (aths.nonEmpty) + p.setAuthor(aths.asJava) + } + + + if (input.getJournal != null) + p.setJournal(journalToOAF(input.getJournal)) + p.setTitle(List(OafUtils.createSP(input.getTitle, "main title", "dnet:dataCite_title")).asJava) + p.setDateofacceptance(OafUtils.asField(input.getDate)) + val i = new Instance + i.setCollectedfrom(generatePubmedDLICollectedFrom()) + i.setDateofacceptance(p.getDateofacceptance) + i.setUrl(List(s"https://pubmed.ncbi.nlm.nih.gov/${input.getPmid}").asJava) + i.setInstancetype(createQualifier("0001", "Article", "dnet:publication_resource", "dnet:publication_resource")) + p.setInstance(List(i).asJava) + p + } + + def ebiLinksToOaf(input:(String, String)):List[Oaf] = { val pmid :String = input._1 val input_json :String = input._2 @@ -116,8 +176,16 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin val workingPath = parser.get("workingPath") implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) + implicit val oafpubEncoder: Encoder[Publication] = Encoders.kryo[Publication] implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation]) implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset]) + implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication]) + implicit val atEncoder: Encoder[Author] = Encoders.kryo(classOf[Author]) + implicit val strEncoder:Encoder[String] = Encoders.STRING + implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle]) + implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal]) + implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor]) + val ds:Dataset[(String,String)] = spark.read.load(s"$workingPath/baseline_links_updates").as[(String,String)](Encoders.tuple(Encoders.STRING, Encoders.STRING)) @@ -133,6 +201,46 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin oDataset.filter(p =>p.isInstanceOf[DLIDataset]).map(p => p.asInstanceOf[DLIDataset]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_dataset") + val idPublicationSolved:Dataset[String] = spark.read.load(s"$workingPath/baseline_links_updates").where(col("links").isNotNull).select("pmid").as[String] + val baseline:Dataset[(String, PMArticle)]= spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle].map(p=> (p.getPmid, p))(Encoders.tuple(strEncoder,PMEncoder)) + idPublicationSolved.joinWith(baseline, idPublicationSolved("pmid").equalTo(baseline("_1"))).map(k => pubmedTOPublication(k._2._2)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_publication") + + + val pmaDatasets = spark.read.load("/user/sandro.labruzzo/scholix/EBI/ebi_garr/baseline_dataset").as[PMArticle] + + pmaDatasets.map(p => pubmedTOPublication(p)).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_publication_all") + + val pubs: Dataset[(String,Publication)] = spark.read.load("/user/sandro.labruzzo/scholix/EBI/publication").as[Publication].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING,oafpubEncoder)) + val pubdate:Dataset[(String,DLIPublication)] = spark.read.load(s"$workingPath/baseline_publication_all").as[DLIPublication].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING,pubEncoder)) + + + + pubs.joinWith(pubdate, pubs("_1").equalTo(pubdate("_1"))).map(k => k._2._2).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_publication_ebi") + + + + val dt : Dataset[DLIDataset] = spark.read.load(s"$workingPath/dataset").as[DLIDataset] + val update : Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_dataset").as[DLIDataset] + + + dt.union(update).map(d => (d.getId,d))(Encoders.tuple(Encoders.STRING, datEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIDatasetAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset_ebi") + + + val rel: Dataset[DLIRelation] = spark.read.load(s"$workingPath/relation").as[DLIRelation] + val relupdate : Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_relation").as[DLIRelation] + + + rel.union(relupdate) + .map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIRelationAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite) + .save(s"$workingPath/baseline_relation_ebi") } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala index 60857f0fcf..008e3c99b4 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/SparkCreateEBIDataFrame.scala @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.ebi import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser} import eu.dnetlib.scholexplorer.relation.RelationMapper import org.apache.commons.io.IOUtils @@ -10,6 +11,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} import org.slf4j.{Logger, LoggerFactory} + import scala.collection.JavaConverters._ object SparkCreateEBIDataFrame { @@ -34,54 +36,51 @@ object SparkCreateEBIDataFrame { val relationMapper = RelationMapper.load implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) - implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset]) - implicit val pubEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication]) - implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) + implicit val datasetEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset]) + implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication]) + implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation]) - logger.info("Extract Publication and relation from publication_xml") - val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s => - { - new ObjectMapper().readValue(s, classOf[String]) - }).flatMap(s => { - val d = new PublicationScholexplorerParser - d.parseObject(s, relationMapper).asScala.iterator}) +// logger.info("Extract Publication and relation from publication_xml") +// val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s => +// { +// new ObjectMapper().readValue(s, classOf[String]) +// }).flatMap(s => { +// val d = new PublicationScholexplorerParser +// d.parseObject(s, relationMapper).asScala.iterator}) +// +// val mapper = new ObjectMapper() +// mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) +// spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf") +// +// logger.info("Extract Publication and relation from dataset_xml") +// val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s => +// { +// new ObjectMapper().readValue(s, classOf[String]) +// }).flatMap(s => { +// val d = new DatasetScholexplorerParser +// d.parseObject(s, relationMapper).asScala.iterator}) - val mapper = new ObjectMapper() - mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) - spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf") - - logger.info("Extract Publication and relation from dataset_xml") - val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s => - { - new ObjectMapper().readValue(s, classOf[String]) - }).flatMap(s => { - val d = new DatasetScholexplorerParser - d.parseObject(s, relationMapper).asScala.iterator}) - - spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf") - val dataset: Dataset[OafDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[OafDataset]).map(d => d.asInstanceOf[OafDataset]) - val publication: Dataset[Publication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Publication]).map(d => d.asInstanceOf[Publication]) - val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation]) +// spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf") + val dataset: Dataset[DLIDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIDataset]).map(d => d.asInstanceOf[DLIDataset]) + val publication: Dataset[DLIPublication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIPublication]).map(d => d.asInstanceOf[DLIPublication]) + val relations: Dataset[DLIRelation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIRelation]).map(d => d.asInstanceOf[DLIRelation]) publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder)) .groupByKey(_._1)(Encoders.STRING) - .agg(EBIAggregator.getPublicationAggregator().toColumn) + .agg(EBIAggregator.getDLIPublicationAggregator().toColumn) .map(p => p._2) .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication") dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder)) .groupByKey(_._1)(Encoders.STRING) - .agg(EBIAggregator.getDatasetAggregator().toColumn) + .agg(EBIAggregator.getDLIDatasetAggregator().toColumn) .map(p => p._2) .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset") relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder)) .groupByKey(_._1)(Encoders.STRING) - .agg(EBIAggregator.getRelationAggregator().toColumn) + .agg(EBIAggregator.getDLIRelationAggregator().toColumn) .map(p => p._2) .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation") - - - relations.map(r => (r.getSource, r.getTarget))(Encoders.tuple(Encoders.STRING,Encoders.STRING)) } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java index 4a21985424..e27c9adaad 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/ebi/model/PMAuthor.java @@ -25,7 +25,8 @@ public class PMAuthor implements Serializable { } public String getFullName() { - return String.format("%s, %s", this.foreName, this.lastName); + return String + .format("%s, %s", this.foreName != null ? this.foreName : "", this.lastName != null ? this.lastName : ""); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala new file mode 100644 index 0000000000..b36c6abeff --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala @@ -0,0 +1,106 @@ +package eu.dnetlib.dhp.sx.graph + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown} +import eu.dnetlib.dhp.sx.ebi.EBIAggregator +import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal} +import org.apache.commons.io.IOUtils +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.LoggerFactory + +object SparkSplitOafTODLIEntities { + + + def getKeyRelation(rel:DLIRelation):String = { + s"${rel.getSource}::${rel.getRelType}::${rel.getTarget}" + + + } + + def main(args: Array[String]): Unit = { + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json"))) + val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass) + parser.parseArgument(args) + + val workingPath: String = parser.get("workingPath") + logger.info(s"Working dir path = $workingPath") + + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication] + implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset] + implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown] + implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation] + + + + val spark:SparkSession = SparkSession + .builder() + .appName(SparkSplitOafTODLIEntities.getClass.getSimpleName) + .master(parser.get("master")) + .getOrCreate() + + + + + val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf] + + val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset] + val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication] + val ebi_relation:Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[DLIRelation] + + + + OAFDataset + .filter(s => s != null && s.isInstanceOf[DLIPublication]) + .map(s =>s.asInstanceOf[DLIPublication]) + .union(ebi_publication) + .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIPublicationAggregator().toColumn) + .map(p => p._2) + .repartition(1000) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication") + + OAFDataset + .filter(s => s != null && s.isInstanceOf[DLIDataset]) + .map(s =>s.asInstanceOf[DLIDataset]) + .union(ebi_dataset) + .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIDatasetAggregator().toColumn) + .map(p => p._2) + .repartition(1000) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset") + + + OAFDataset + .filter(s => s != null && s.isInstanceOf[DLIUnknown]) + .map(s =>s.asInstanceOf[DLIUnknown]) + .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, unkEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIUnknownAggregator().toColumn) + .map(p => p._2) + .repartition(1000) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown") + + + OAFDataset + .filter(s => s != null && s.isInstanceOf[DLIRelation]) + .map(s =>s.asInstanceOf[DLIRelation]) + .union(ebi_relation) + .map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(EBIAggregator.getDLIRelationAggregator().toColumn) + .map(p => p._2) + .repartition(1000) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation") + + + + + + + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala new file mode 100644 index 0000000000..f74a6043b7 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkXMLToOAFDataset.scala @@ -0,0 +1,73 @@ +package eu.dnetlib.dhp.sx.graph + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation} +import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser} +import eu.dnetlib.scholexplorer.relation.RelationMapper +import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.{IntWritable, Text} +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ + + +/** + * This new version of the Job read a sequential File containing XML stored in the aggregator and generates a Dataset OAF of heterogeneous + * entities like Dataset, Relation, Publication and Unknown + */ + +object SparkXMLToOAFDataset { + + + def main(args: Array[String]): Unit = { + val logger = LoggerFactory.getLogger(SparkXMLToOAFDataset.getClass) + val conf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(SparkXMLToOAFDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_graph_scholix_parameters.json"))) + parser.parseArgument(args) + val spark = + SparkSession + .builder() + .config(conf) + .appName(SparkXMLToOAFDataset.getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + val sc = spark.sparkContext + + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset] + implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication] + implicit val relationEncoder:Encoder[DLIRelation] = Encoders.kryo[DLIRelation] + + val relationMapper = RelationMapper.load + + val inputPath: String = parser.get("sourcePath") + val entity: String = parser.get("entity") + val targetPath = parser.get("targetPath") + + logger.info(s"Input path is $inputPath") + logger.info(s"Entity path is $entity") + logger.info(s"Target Path is $targetPath") + + val scholixRdd:RDD[Oaf] = sc.sequenceFile(inputPath, classOf[IntWritable], classOf[Text]) + .map(s => s._2.toString) + .flatMap(s => { + entity match { + case "publication" => + val p = new PublicationScholexplorerParser + val l =p.parseObject(s, relationMapper) + if (l != null) l.asScala else List() + case "dataset" => + val d = new DatasetScholexplorerParser + val l =d.parseObject(s, relationMapper) + if (l != null) l.asScala else List() + } + }).filter(s => s!= null) + spark.createDataset(scholixRdd).write.mode(SaveMode.Append).save(targetPath) + + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java index 60371fa536..11d9905ccd 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/parser/DatasetScholexplorerParser.java @@ -317,6 +317,15 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { .collect(Collectors.toList())); } + // TERRIBLE HACK TO AVOID EMPTY COLLECTED FROM + if (parsedObject.getDlicollectedfrom() == null) { + + final KeyValue cf = new KeyValue(); + cf.setKey("dli_________::europe_pmc__"); + cf.setValue("Europe PMC"); + parsedObject.setCollectedfrom(Collections.singletonList(cf)); + } + if (StringUtils.isNotBlank(resolvedURL)) { Instance i = new Instance(); i.setCollectedfrom(parsedObject.getCollectedfrom().get(0)); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json index 6dfef32db0..b23ac65464 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json @@ -40,5 +40,11 @@ "paramLongName": "dbschema", "paramDescription": "the database schema according to the D-Net infrastructure (beta or production)", "paramRequired": true + }, + { + "paramName": "nsbl", + "paramLongName": "nsPrefixBlacklist", + "paramDescription": "a blacklist of nsprefixes (comma separeted)", + "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index d8b61b5ead..d8146d9a22 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -43,7 +43,11 @@ isLookupUrl the address of the lookUp service - + + nsPrefixBlacklist + + a blacklist of nsprefixes (comma separeted) + sparkDriverMemory memory for driver process @@ -131,6 +135,7 @@ --isLookupUrl${isLookupUrl} --actionclaims --dbschema${dbSchema} + --nsPrefixBlacklist${nsPrefixBlacklist} @@ -182,6 +187,7 @@ --postgresPassword${postgresPassword} --isLookupUrl${isLookupUrl} --dbschema${dbSchema} + --nsPrefixBlacklist${nsPrefixBlacklist} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml index 66eaeeb263..4c319d0378 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml @@ -38,7 +38,11 @@ isLookupUrl the address of the lookUp service - + + nsPrefixBlacklist + + a blacklist of nsprefixes (comma separeted) + sparkDriverMemory memory for driver process @@ -113,6 +117,7 @@ --isLookupUrl${isLookupUrl} --actionclaims --dbschema${dbSchema} + --nsPrefixBlacklist${nsPrefixBlacklist} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml index eea8d0a5ab..29d4269ef8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml @@ -25,7 +25,11 @@ isLookupUrl the address of the lookUp service - + + nsPrefixBlacklist + + a blacklist of nsprefixes (comma separeted) + sparkDriverMemory memory for driver process @@ -99,6 +103,7 @@ --postgresPassword${postgresPassword} --isLookupUrl${isLookupUrl} --dbschema${dbSchema} + --nsPrefixBlacklist${nsPrefixBlacklist} @@ -117,6 +122,7 @@ --isLookupUrl${isLookupUrl} --dbschema${dbSchema} --actionclaims + --nsPrefixBlacklist${nsPrefixBlacklist} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml index 8684181526..9b68cfb053 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml @@ -28,6 +28,11 @@ isLookupUrl the address of the lookUp service + + nsPrefixBlacklist + + a blacklist of nsprefixes (comma separeted) + sparkDriverMemory memory for driver process @@ -67,6 +72,7 @@ -pguser${postgresUser} -pgpasswd${postgresPassword} -islookup${isLookupUrl} + --nsPrefixBlacklist${nsPrefixBlacklist} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml index cac3cc2bb0..17cd6c9a33 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/config-default.xml @@ -1,7 +1,7 @@ - + - - jobTracker - yarn - - - nameNode - hdfs://hadoop-rm1.garr-pa1.d4science.org:8020 - - - hive_metastore_uris - thrift://hadoop-edge3.garr-pa1.d4science.org:9083 - - - spark2YarnHistoryServerAddress - http://hadoop-rm2.garr-pa1.d4science.org:19888 - + + + + + + + + + + + + + + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml index a5035c56c6..7e63362428 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/ebi/oozie_app/workflow.xml @@ -18,7 +18,7 @@ - + @@ -48,6 +48,28 @@ + + + yarn-cluster + cluster + Create EBI DataSet + + eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=1000 + ${sparkExtraOPT} + + --workingPath${workingPath} + --masteryarn + + + + + yarn-cluster @@ -71,27 +93,7 @@ - - - yarn-cluster - cluster - Create EBI DataSet - eu.dnetlib.dhp.sx.ebi.SparkCreateEBIDataFrame - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=1000 - ${sparkExtraOPT} - - --workingPath${workingPath} - --masteryarn - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json index 1c02109d01..febcfc8981 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json @@ -1,7 +1,4 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the result data", "paramRequired": true}, - {"paramName":"td", "paramLongName":"targetDir", "paramDescription": "the name of the result data", "paramRequired": true}, - {"paramName":"e", "paramLongName":"entities", "paramDescription": "the entity type to be filtered", "paramRequired": true} + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml index d74d68663f..c94394b1ee 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml @@ -101,12 +101,17 @@ yarn-cluster cluster Import ${entity} and related entities - eu.dnetlib.dhp.sx.graph.SparkScholexplorerGraphImporter + eu.dnetlib.dhp.sx.graph.SparkXMLToOAFDataset dhp-graph-mapper-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} - -mt yarn-cluster + + --executor-memory ${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + ${sparkExtraOPT} + + -mt yarn --sourcePath${targetXMLPath} - --targetPath${targetEntityPath} + --targetPath${workingPath}/input/OAFDataset --entity${entity} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml index 46e2dc3f98..fabe7510bb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml @@ -1,16 +1,8 @@ - sourcePath - the source path - - - targetPath - the source path - - - targetDir - the name of the path + workingPath + the working path sparkDriverMemory @@ -20,32 +12,13 @@ sparkExecutorMemory memory for individual executor - - entities - the entities to be extracted - - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - - @@ -53,19 +26,18 @@ ${nameNode} yarn-cluster cluster - Extract ${entities} - eu.dnetlib.dhp.sx.graph.SparkExtractEntitiesJob + Extract DLI Entities + eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities dhp-graph-mapper-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 ${sparkExtraOPT} -mt yarn-cluster - --sourcePath${sourcePath} - --targetPath${targetPath} - --targetDir${targetDir} - --entities${entities} + --workingPath${workingPath} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java index 559a30b1ea..e1ef847c38 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java @@ -7,6 +7,8 @@ import static org.mockito.Mockito.lenient; import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.io.IOUtils; @@ -19,9 +21,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -62,10 +62,12 @@ public class CleaningFunctionTest { assertTrue(p_in instanceof Result); assertTrue(p_in instanceof Publication); - Publication p_out = OafCleaner.apply(p_in, mapping); + Publication p_out = OafCleaner.apply(CleanGraphSparkJob.fixVocabularyNames(p_in), mapping); assertNotNull(p_out); + assertNotNull(p_out.getPublisher()); + assertNull(p_out.getPublisher().getValue()); assertEquals("und", p_out.getLanguage().getClassid()); assertEquals("Undetermined", p_out.getLanguage().getClassname()); @@ -88,6 +90,16 @@ public class CleaningFunctionTest { Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out); assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid()); + assertNull(p_out.getPublisher()); + + getAuthorPids(p_defaults).forEach(pid -> { + System.out + .println( + String + .format( + "%s [%s - %s]", pid.getValue(), pid.getQualifier().getClassid(), + pid.getQualifier().getClassname())); + }); // TODO add more assertions to verity the cleaned values System.out.println(MAPPER.writeValueAsString(p_out)); @@ -97,7 +109,7 @@ public class CleaningFunctionTest { */ } - private Stream getAuthorPidTypes(Publication pub) { + private Stream getAuthorPidTypes(Result pub) { return pub .getAuthor() .stream() @@ -106,6 +118,14 @@ public class CleaningFunctionTest { .map(s -> s.getQualifier()); } + private Stream getAuthorPids(Result pub) { + return pub + .getAuthor() + .stream() + .map(a -> a.getPid()) + .flatMap(p -> p.stream()); + } + private List vocs() throws IOException { return IOUtils .readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index dcb4c49b4c..0a513f6337 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -276,6 +276,17 @@ public class MappersTest { System.out.println("***************"); } + + @Test + void testClaimDedup() throws IOException { + final String xml = IOUtils.toString(getClass().getResourceAsStream("oaf_claim_dedup.xml")); + final List list = new OafToOafMapper(vocs, false).processMdRecord(xml); + + System.out.println("***************"); + System.out.println(new ObjectMapper().writeValueAsString(list)); + System.out.println("***************"); + } + private void assertValidId(final String id) { assertEquals(49, id.length()); assertEquals('|', id.charAt(2)); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/common/VerifyNsPrefixPredicateTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/common/VerifyNsPrefixPredicateTest.java new file mode 100644 index 0000000000..a14fb4ae33 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/common/VerifyNsPrefixPredicateTest.java @@ -0,0 +1,92 @@ + +package eu.dnetlib.dhp.oa.graph.raw.common; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Relation; + +class VerifyNsPrefixPredicateTest { + + private VerifyNsPrefixPredicate predicate; + + @BeforeEach + void setUp() throws Exception { + predicate = new VerifyNsPrefixPredicate("corda,nsf,wt"); + } + + @Test + void testTestValue() { + assertFalse(predicate.testValue("corda__2020")); + assertFalse(predicate.testValue("nsf________")); + assertFalse(predicate.testValue("nsf")); + assertFalse(predicate.testValue("corda")); + assertFalse(predicate.testValue("10|corda_______::fjkdsfjksdhfksj")); + assertFalse(predicate.testValue("20|corda_______::fjkdsfjksdhfksj")); + + assertTrue(predicate.testValue("xxxxxx_____")); + assertTrue(predicate.testValue("10|xxxxxx_____::sdasdasaddasad")); + + assertTrue(predicate.testValue(null)); + assertTrue(predicate.testValue("")); + } + + @Test + void testTest_ds_true() { + final Field prefix = new Field<>(); + prefix.setValue("xxxxxx______"); + + final Datasource ds = new Datasource(); + ds.setNamespaceprefix(prefix); + + assertTrue(predicate.test(ds)); + } + + @Test + void testTest_ds_false() { + final Field prefix = new Field<>(); + prefix.setValue("corda__2020"); + + final Datasource ds = new Datasource(); + ds.setNamespaceprefix(prefix); + + assertFalse(predicate.test(ds)); + } + + @Test + void testTest_rel_true() { + final Relation rel = new Relation(); + rel.setSource("10|yyyyyy______:sdfsfsffsdfs"); + rel.setTarget("10|xxxxxx______:sdfsfsffsdfs"); + assertTrue(predicate.test(rel)); + } + + @Test + void testTest_rel_false() { + final Relation rel = new Relation(); + rel.setSource("10|corda_______:sdfsfsffsdfs"); + rel.setTarget("10|xxxxxx______:sdfsfsffsdfs"); + assertFalse(predicate.test(rel)); + } + + @Test + void testTest_proj_true() { + final Project p = new Project(); + p.setId("10|xxxxxx______:sdfsfsffsdfs"); + assertTrue(predicate.test(p)); + } + + @Test + void testTest_proj_false() { + final Project p = new Project(); + p.setId("10|corda_____:sdfsfsffsdfs"); + assertFalse(predicate.test(p)); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json index 5d0c0d1ed8..f51eed067d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json @@ -59,6 +59,28 @@ "schemename": "dnet:pid_types" }, "value": "qwerty" + }, + { + "dataInfo": { + "deletedbyinference": false, + "inferenceprovenance": "", + "inferred": false, + "invisible": false, + "provenanceaction": { + "classid": "sysimport:crosswalk:datasetarchive", + "classname": "sysimport:crosswalk:datasetarchive", + "schemeid": "dnet:provenanceActions", + "schemename": "dnet:provenanceActions" + }, + "trust": "0.9" + }, + "qualifier": { + "classid": "ORCID", + "classname": "ORCID", + "schemeid": "", + "schemename": "" + }, + "value": "asdasd" } ], "rank": 2, @@ -186,6 +208,9 @@ } ], "bestaccessright": null, + "publisher": { + "value": null + }, "collectedfrom": [ { "key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747", diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_claim_dedup.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_claim_dedup.xml new file mode 100644 index 0000000000..95457fb701 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_claim_dedup.xml @@ -0,0 +1,182 @@ + + + dedup_wf_001::534276867e917fe9efe0cca10e363457 + 2020-08-02T22:55:40.866Z + openaire____ + 2020-08-02T23:53:04.582Z + + + ATLAS (IHEF, IoP, FNWI) + Doğuş Üniversitesi, Fen Edebiyat Fakültesi, Fizik Bölümü + TR3959 + Doğuş Üniversitesi, Fen Edebiyat Fakültesi, Fizik Bölümü + TR3959 + urn:issn:1748-0221 + VOLUME=7;ISSUE=1;ISSN=1748-0221;TITLE=Journal of Instrumentation + ATLAS Collaboration Mitsou, Vasiliki Fiorini, Luca Ros Martínez, Eduardo Castillo + Giménez, María Victoria Fuster Verdú, Juan A. García García, Carmen Cabrera Urbán, + Susana Martí García, Salvador Salt Cairols, José Lacasta Llácer, Carlos Valls Ferrer, + Juan Antonio Higón Rodríguez, Emilio Ferrer Soria, Antonio González de la Hoz, Santiago + Kaci, Mohammed Hernández Jiménez, Yesenia Villaplana Pérez, Miguel 2012 A study of the + material in the ATLAS inner detector using secondary hadronic interactions Journal Of + Instrumentation 7 P01013 1 41 + Journal of Instrumentation, 7(1) + Aad, G; Abbott, B; Abdallah, J; Abdelalim, AA; Abdesselam, A; Abdinov, O;  et + al.(2012). A study of the material in the ATLAS inner detector using secondary hadronic + interactions. Journal of Instrumentation, 7(1). doi: 10.1088/1748-0221/7/01/P01013. UC + Santa Cruz: Retrieved from: http://www.escholarship.org/uc/item/05j2j2br + Journal of Instrumentation, 7 + VOLUME=7;ISSN=1748-0221;TITLE=Journal of Instrumentation + 1748-0221 + Journal of Instrumentation 7, P01013 (2012). + doi:10.1088/1748-0221/7/01/P01013 + A measurement of the material in the ATLAS inner detector using secondary hadronic + interactions + und + Detector modelling and simulations I + (interaction of radiation with matter, interaction of photons with matter, interaction + of hadrons with matter, etc); Particle tracking detectors (Solid-state detectors); Si + microstrip and pad detectors; Large detector systems for particle and astroparticle + physics + of photons with matter, interaction of + hadrons with matter, etc) + Particle Physics - Experiment + Detector Modelling and + Simulations + Detector modelling and simulations I + (interaction of radiation with matter, interaction of photons with matter, interaction + of hadrons with matter, etc) + Large detector systems for particle and + astroparticle physics + Detector modelling and simulations I + (interaction of radiation with matter, interaction + Large Detector Systems + 530 + Science & Technology + :Ciências Físicas [Ciências + Naturais] + High Energy Physics - + Experiment + Detectors de radiació + Física nuclear + ddc:610 + Si microstrip and pad + detectors + Particle tracking detectors (Solid-state + detectors) + Col·lisions (Física nuclear) + Particle Tracking Detectors + IOP Publishing + application/pdf + application/pdf + application/pdf + application/pdf + application/pdf + application/pdf + 2016-05-02 + The ATLAS inner detector is used to reconstruct secondary vertices due to + hadronic interactions of primary collision products, so probing the location and amount + of material in the inner region of ATLAS. Data collected in 7 TeV pp collisions at the + LHC, with a minimum bias trigger, are used for comparisons with simulated events. The + reconstructed secondary vertices have spatial resolutions ranging from ~ 200μm to 1 mm. + The overall material description in the simulation is validated to within an + experimental uncertainty of about 7%. This will lead to a better understanding of the + reconstruction of various objects such as tracks, leptons, jets, and missing transverse + momentum. We acknowledge the support of ANPCyT, Argentina; YerPhI, Armenia; ARC, + Australia; BMWF, Austria; ANAS, Azerbaijan; SSTC, Belarus; CNPq and FAPESP, Brazil; + NSERC, NRC and CFI, Canada; CERN; CONICYT, Chile; CAS, MOST and NSFC, China; + COLCIENCIAS, Colombia; MSMT CR, MPO CR and VSC CR, Czech Republic; DNRF, DNSRC and + Lundbeck Foundation, Denmark; ARTEMIS, European Union; IN2P3-CNRS, CEA-DSM/IRFU, France; + GNAS, Georgia; BMBF, DFG, HGF, MPG and AvH Foundation, Germany; GSRT, Greece; ISF, + MINERVA, GIF, DIP and Benoziyo Center, Israel; INFN, Italy; MEXT and JSPS, Japan; CNRST, + Morocco; FOM and NWO, Netherlands; RCN, Norway; MNiSW, Poland; GRICES and FCT, Portugal; + MERYS (MECTS), Romania; MES of Russia and ROSATOM, Russian Federation; JINR; MSTD, + Serbia; MSSR, Slovakia; ARRS and MVZT, Slovenia; DST/NRF, South Africa; MICINN, Spain; + SRC and Wallenberg Foundation, Sweden; SER, SNSF and Cantons of Bern and Geneva, + Switzerland; NSC, Taiwan; TAEK, Turkey; STFC, the Royal Society and Leverhulme Trust, + United Kingdom; DOE and NSF, United States of America. + info:eu-repo/semantics/publishedVersion + NARCIS + DSpace@Dogus + Lancaster EPrints + CERN Document Server + DESY Publication Database + OpenAIRE + Publikationenserver der Georg-August-Universität Göttingen + arXiv.org e-Print Archive + CORE (RIOXX-UK Aggregator) + eScholarship - University of California + Universidade do Minho: RepositoriUM + Dokuz Eylul University Open Archive System + Repositori d'Objectes Digitals per a l'Ensenyament la Recerca i la + Cultura + info:eu-repo/semantics/altIdentifier/doi/10.1088/1748-0221/7/01/P01013 + info:eu-repo/semantics/altIdentifier/doi/10.1088/1748-0221/7/01/P01013. + Article + http://hdl.handle.net/11376/1605 + Article + http://www.escholarship.org/uc/item/05j2j2br + Unknown + http://cds.cern.ch/record/1394292 + Article + http://eprints.lancs.ac.uk/68235/ + Article + http://hdl.handle.net/10550/36188 + Article + http://eprints.gla.ac.uk/65933/1/65933.pdf + Preprint + http://arxiv.org/abs/1110.6191 + Article + http://dare.uva.nl/personal/pure/en/publications/a-study-of-the-material-in-the-atlas-inner-detector-using-secondary-hadronic-interactions(6b7667e2-04e2-4a66-92a8-ff4edbf61a17).html + Article + http://hdl.handle.net/1822/48768 + Article + http://resolver.sub.uni-goettingen.de/purl?gs-1/12231 + Article + http://bib-pubdb1.desy.de/search?p=id:%22PHPPUBDB-21212%22 + http://bib-pubdb1.desy.de/record/96807/files/CERN-PH-EP-2011-147_1110.6191v2.pdf + http://bib-pubdb1.desy.de//record/96807/files/CERN-PH-EP-2011-147_1110.6191v2.pdf + http://bib-pubdb1.desy.de/record/96807 + Article + http://arxiv.org/abs/1110.6191 + Article + http://hdl.handle.net/11376/1605 + http://dx.doi.org/10.1088/1748-0221/7/01/P01013 + Article + http://hdl.handle.net/2066/93736 + ATLAS Collaboration + 0001 + 0002 + 2016-05-02 + OPEN + und + + + + + + + + + file%3A%2F%2F%2Fsrv%2Fclaims%2Frecords%2Fpublication%2Fopenaire + + + + + + + false + false + 0.9 + + + + +