diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
index 3fa5fcbab..17482c019 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
@@ -94,7 +94,13 @@ public class AuthorMerger {
 				if (r.getPid() == null) {
 					r.setPid(new ArrayList<>());
 				}
-				r.getPid().add(a._1());
+
+				// TERRIBLE HACK: lists created with Arrays.asList are fixed-size, so calling
+				// add on them raises UnsupportedOperationException at
+				// java.util.AbstractList.add; copy into a resizable list first
+				final List<StructuredProperty> tmp = new ArrayList<>(r.getPid());
+				tmp.add(a._1());
+				r.setPid(tmp);
 			}
 		}
 	});
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index b27fc9267..d759f0d55 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -39,15 +39,15 @@ public class ModelConstants {
 	public static final String IS_SUPPLEMENT_TO = "isSupplementTo";
 	public static final String IS_SUPPLEMENTED_BY = "isSupplementedBy";
 	public static final String PART = "part";
-	public static final String IS_PART_OF = "IsPartOf";
-	public static final String HAS_PARTS = "HasParts";
+	public static final String IS_PART_OF = "isPartOf";
+	public static final String HAS_PARTS = "hasParts";
 	public static final String RELATIONSHIP = "relationship";
 	public static final String CITATION = "citation";
 	public static final String CITES = "cites";
-	public static final String IS_CITED_BY = "IsCitedBy";
+	public static final String IS_CITED_BY = "isCitedBy";
 	public static final String REVIEW = "review";
 	public static final String REVIEWS = "reviews";
-	public static final String IS_REVIEWED_BY = "IsReviewedBy";
+	public static final String IS_REVIEWED_BY = "isReviewedBy";
 
 	public static final String RESULT_PROJECT = "resultProject";
 	public static final String OUTCOME = "outcome";
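A note on the AuthorMerger hunk above: Arrays.asList returns a fixed-size view backed by the original array, so any structural mutation fails at runtime. A minimal sketch reproducing the failure and the defensive-copy fix the patch applies (standalone demo, String used in place of the pid type for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FixedSizeListDemo {
        public static void main(String[] args) {
            List<String> pids = Arrays.asList("doi", "orcid");
            try {
                pids.add("mag"); // throws: the list is a fixed-size array view
            } catch (UnsupportedOperationException e) {
                System.out.println("add() unsupported on Arrays.asList result");
            }
            // Defensive copy into a resizable ArrayList, as AuthorMerger now does
            List<String> copy = new ArrayList<>(pids);
            copy.add("mag"); // fine
            System.out.println(copy);
        }
    }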
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
index 7f0ca983f..b2d3253d5 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.project;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -11,6 +12,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
 import org.slf4j.Logger;
@@ -175,43 +177,54 @@ public class PrepareProgramme {
 				return csvProgramme;
 			});
 
-		prepareClassification(h2020Programmes);
+		// prepareClassification(h2020Programmes);
 
-		h2020Programmes
-			.map(csvProgramme -> OBJECT_MAPPER.writeValueAsString(csvProgramme))
+		JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+
+		JavaRDD<CSVProgramme> rdd = jsc.parallelize(prepareClassification(h2020Programmes), 1);
+		rdd
+			.map(csvProgramme -> {
+				String tmp = OBJECT_MAPPER.writeValueAsString(csvProgramme);
+				return tmp;
+			})
 			.saveAsTextFile(outputPath);
 
 	}
 
-	private static void prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) {
+	private static List<CSVProgramme> prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) {
 		Object[] codedescription = h2020Programmes
-			.map(value -> new Tuple2<>(value.getCode(), value.getTitle()))
+			.map(
+				value -> new Tuple2<>(value.getCode(),
+					new Tuple2<String, String>(value.getTitle(), value.getShortTitle())))
 			.collect()
 			.toArray();
 
 		for (int i = 0; i < codedescription.length - 1; i++) {
 			for (int j = i + 1; j < codedescription.length; j++) {
-				Tuple2<String, String> t2i = (Tuple2<String, String>) codedescription[i];
-				Tuple2<String, String> t2j = (Tuple2<String, String>) codedescription[j];
+				Tuple2<String, Tuple2<String, String>> t2i = (Tuple2<String, Tuple2<String, String>>) codedescription[i];
+				Tuple2<String, Tuple2<String, String>> t2j = (Tuple2<String, Tuple2<String, String>>) codedescription[j];
 				if (t2i._1().compareTo(t2j._1()) > 0) {
-					Tuple2<String, String> temp = t2i;
+					Tuple2<String, Tuple2<String, String>> temp = t2i;
 					codedescription[i] = t2j;
 					codedescription[j] = temp;
 				}
 			}
 		}
 
-		Map<String, String> map = new HashMap<>();
+		Map<String, Tuple2<String, String>> map = new HashMap<>();
 		for (int j = 0; j < codedescription.length; j++) {
-			Tuple2<String, String> entry = (Tuple2<String, String>) codedescription[j];
+			Tuple2<String, Tuple2<String, String>> entry = (Tuple2<String, Tuple2<String, String>>) codedescription[j];
 			String ent = entry._1();
 			if (ent.contains("Euratom-")) {
 				ent = ent.replace("-Euratom-", ".Euratom.");
 			}
 			String[] tmp = ent.split("\\.");
 			if (tmp.length <= 2) {
-				map.put(entry._1(), entry._2());
-
+				if (StringUtils.isEmpty(entry._2()._2())) {
+					map.put(entry._1(), new Tuple2<String, String>(entry._2()._1(), entry._2()._1()));
+				} else {
+					map.put(entry._1(), entry._2());
+				}
 			} else {
 				if (ent.endsWith(".")) {
 					ent = ent.substring(0, ent.length() - 1);
@@ -224,14 +237,14 @@ public class PrepareProgramme {
 						key = key.substring(0, key.length() - 1);
 					}
 				}
-				String current = entry._2();
+				String current = entry._2()._1();
 				if (!ent.contains("Euratom")) {
 
 					String parent;
 					String tmp_key = tmp[0] + ".";
 					for (int i = 1; i < tmp.length - 1; i++) {
 						tmp_key += tmp[i] + ".";
-						parent = map.get(tmp_key).toLowerCase().trim();
+						parent = map.get(tmp_key)._1().toLowerCase().trim();
 						if (parent.contains("|")) {
 							parent = parent.substring(parent.lastIndexOf("|") + 1).trim();
 						}
@@ -246,18 +259,29 @@ public class PrepareProgramme {
 					}
 				}
-				map.put(ent + ".", map.get(key) + " | " + current);
+				String shortTitle = entry._2()._2();
+				if (StringUtils.isEmpty(shortTitle)) {
+					shortTitle = current;
+				}
+				Tuple2<String, String> newEntry = new Tuple2<>(map.get(key)._1() + " | " + current,
+					map.get(key)._2() + " | " + shortTitle);
+				map.put(ent + ".", newEntry);
 			}
 		}
 
-		h2020Programmes.foreach(csvProgramme -> {
-			if (!csvProgramme.getCode().endsWith(".") && !csvProgramme.getCode().contains("Euratom")
-				&& !csvProgramme.getCode().equals("H2020-EC"))
-				csvProgramme.setClassification(map.get(csvProgramme.getCode() + "."));
-			else
-				csvProgramme.setClassification(map.get(csvProgramme.getCode()));
-		});
+		return h2020Programmes.map(csvProgramme -> {
+
+			String code = csvProgramme.getCode();
+			if (!code.endsWith(".") && !code.contains("Euratom")
+				&& !code.equals("H2020-EC"))
+				code += ".";
+
+			csvProgramme.setClassification(map.get(code)._1());
+			csvProgramme.setClassification_short(map.get(code)._2());
+
+			return csvProgramme;
+		}).collect();
 	}
 
 	public static <R> Dataset<R> readPath(
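The map built in prepareClassification accumulates, for each programme code, a " | "-separated path of labels: because codes are sorted lexicographically, a parent's path is always in the map before its children append to it. A condensed sketch of that idea, using code and label values that appear in the tests:

    import java.util.HashMap;
    import java.util.Map;

    public class ClassificationPathDemo {
        public static void main(String[] args) {
            // code -> full-label path, built parent-first
            Map<String, String> path = new HashMap<>();
            path.put("H2020-EU.3.", "Societal challenges");
            // a child appends its own title to the parent's path
            String parent = path.get("H2020-EU.3.");
            path.put("H2020-EU.3.4.", parent + " | Smart, Green And Integrated Transport");
            System.out.println(path.get("H2020-EU.3.4."));
            // -> Societal challenges | Smart, Green And Integrated Transport
        }
    }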
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
index f2375e799..a583b7bfa 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
@@ -9,7 +9,6 @@ import java.util.Objects;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
@@ -138,7 +137,8 @@ public class SparkAtomicActionJob {
 					pm.setCode(csvProject.getProgramme());
 					h2020classification.setClassification(ocsvProgramme.get().getClassification());
 					h2020classification.setH2020Programme(pm);
-					setLevelsAndProgramme(h2020classification, ocsvProgramme.get().getClassification());
+					setLevelsandProgramme(h2020classification, ocsvProgramme.get().getClassification_short());
+					// setProgramme(h2020classification, ocsvProgramme.get().getClassification());
 					pp.setH2020classification(Arrays.asList(h2020classification));
 
 					return pp;
@@ -177,8 +177,8 @@ public class SparkAtomicActionJob {
 
 	}
 
-	private static void setLevelsAndProgramme(H2020Classification h2020Classification, String classification) {
-		String[] tmp = classification.split(" \\| ");
+	private static void setLevelsandProgramme(H2020Classification h2020Classification, String classification_short) {
+		String[] tmp = classification_short.split(" \\| ");
 		h2020Classification.setLevel1(tmp[0]);
 		if (tmp.length > 1) {
 			h2020Classification.setLevel2(tmp[1]);
@@ -189,6 +189,12 @@ public class SparkAtomicActionJob {
 		h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
 	}
 
+//	private static void setProgramme(H2020Classification h2020Classification, String classification) {
+//		String[] tmp = classification.split(" \\| ");
+//
+//		h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
+//	}
+
 	public static <R> Dataset<R> readPath(
 		SparkSession spark, String inputPath, Class<R> clazz) {
 		return spark
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/CSVProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/CSVProgramme.java
index 6f9a59087..f991a4297 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/CSVProgramme.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/CSVProgramme.java
@@ -22,6 +22,15 @@ public class CSVProgramme implements Serializable {
 	private String shortTitle;
 	private String language;
 	private String classification;
+	private String classification_short;
+
+	public String getClassification_short() {
+		return classification_short;
+	}
+
+	public void setClassification_short(String classification_short) {
+		this.classification_short = classification_short;
+	}
 
 	public String getClassification() {
 		return classification;
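setLevelsandProgramme above derives the three classification levels and the programme description from the " | "-separated short classification. A standalone sketch of the same splitting rule, using the MSCA values asserted in the tests:

    public class LevelSplitDemo {
        public static void main(String[] args) {
            String classificationShort = "Excellent Science | Marie-Sklodowska-Curie Actions | MSCA Mobility";
            String[] tmp = classificationShort.split(" \\| ");
            String level1 = tmp[0];
            String level2 = tmp.length > 1 ? tmp[1] : null;
            String level3 = tmp.length > 2 ? tmp[2] : null;
            String programmeDescription = tmp[tmp.length - 1]; // always the last token
            System.out.println(level1 + " / " + level2 + " / " + level3 + " / " + programmeDescription);
        }
    }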
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
index fdc577687..59b536cd5 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
@@ -9,12 +9,14 @@ import java.util.List;
 import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import eu.dnetlib.dhp.actionmanager.project.httpconnector.CollectorServiceException;
 import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
 import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
 
+@Disabled
 public class EXCELParserTest {
 
 	private static Path workingDir;
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java
index c5801bccc..256dc0521 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java
@@ -92,6 +92,8 @@ public class PrepareH2020ProgrammeTest {
 
 		Assertions.assertEquals(0, verificationDataset.filter("classification = ''").count());
 
+		// tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme)));
+
 		Assertions
 			.assertEquals(
 				"Societal challenges | Smart, Green And Integrated Transport | CLEANSKY2 | IADP Fast Rotorcraft",
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'") .select("classification.level2") @@ -215,7 +215,7 @@ public class SparkUpdateProjectTest { .getString(0)); Assertions .assertEquals( - "Nurturing excellence by means of cross-border and cross-sector mobility", + "MSCA Mobility", execverification .filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'") .select("classification.level3") diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java index 8bcf08906..9370efdb3 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java @@ -6,8 +6,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.ssl.SSLContextBuilder; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; + +@Disabled public class HttpConnectorTest { private static final Log log = LogFactory.getLog(HttpConnectorTest.class); diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_classification_whole.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_classification_whole.json.gz deleted file mode 100644 index d40bac8c6..000000000 Binary files a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_classification_whole.json.gz and /dev/null differ diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz index 1afa73061..01e804ff5 100644 Binary files a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz differ diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/whole_programme.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/whole_programme.json.gz index 60c3bf05a..8b1982dee 100644 Binary files a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/whole_programme.json.gz and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/whole_programme.json.gz differ diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 772af010f..a29809fc0 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -1,13 +1,15 @@ package eu.dnetlib.doiboost import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset, Organization} +import 
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
index 8bcf08906..9370efdb3 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
@@ -6,8 +6,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.ssl.SSLContextBuilder;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+
+@Disabled
 public class HttpConnectorTest {
 
 	private static final Log log = LogFactory.getLog(HttpConnectorTest.class);
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_classification_whole.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_classification_whole.json.gz
deleted file mode 100644
index d40bac8c6..000000000
Binary files a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_classification_whole.json.gz and /dev/null differ
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz
index 1afa73061..01e804ff5 100644
Binary files a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz differ
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/whole_programme.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/whole_programme.json.gz
index 60c3bf05a..8b1982dee 100644
Binary files a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/whole_programme.json.gz and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/project/whole_programme.json.gz differ
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
index 772af010f..a29809fc0 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
@@ -1,13 +1,15 @@
 package eu.dnetlib.doiboost
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset, Organization}
+import eu.dnetlib.dhp.oa.merge.AuthorMerger
+import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset => OafDataset}
 import eu.dnetlib.doiboost.mag.ConversionUtil
 import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
+
 import scala.collection.JavaConverters._
 
 object SparkGenerateDoiBoost {
@@ -49,6 +51,7 @@ object SparkGenerateDoiBoost {
       val otherPub = item._2._2
       if (otherPub != null) {
         crossrefPub.mergeFrom(otherPub)
+        crossrefPub.setAuthor(AuthorMerger.mergeAuthor(crossrefPub.getAuthor, otherPub.getAuthor))
       }
     }
     crossrefPub
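The added line in SparkGenerateDoiBoost enriches the Crossref record's author list with the authors of the publication it was merged with. A minimal sketch of the call, assuming from the call site above that AuthorMerger.mergeAuthor takes two author lists and returns the enriched one:

    import java.util.ArrayList;
    import java.util.List;

    import eu.dnetlib.dhp.oa.merge.AuthorMerger;
    import eu.dnetlib.dhp.schema.oaf.Author;

    public class MergeAuthorDemo {
        public static void main(String[] args) {
            Author a = new Author();
            a.setFullname("Doe, John"); // crossref side, possibly without pids
            Author b = new Author();
            b.setFullname("Doe, John"); // other side, possibly carrying ORCID pids
            List<Author> crossref = new ArrayList<>();
            crossref.add(a);
            List<Author> other = new ArrayList<>();
            other.add(b);
            // enrich the crossref author list with information from the other list
            List<Author> merged = AuthorMerger.mergeAuthor(crossref, other);
            System.out.println(merged.size());
        }
    }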
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala
index 243719549..b3402ee9f 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/QueryTest.scala
@@ -38,6 +38,8 @@ class QueryTest {
 
   def myQuery(spark:SparkSession, sc:SparkContext): Unit = {
     implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
+
+
     val mapper = new ObjectMapper()
     mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
index d0df28b2d..822b16263 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkSplitOafTODLIEntities.scala
@@ -4,7 +4,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
 import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
 import eu.dnetlib.dhp.sx.ebi.EBIAggregator
-import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
 import org.apache.commons.io.IOUtils
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
 import org.slf4j.LoggerFactory
@@ -18,38 +17,38 @@ object SparkSplitOafTODLIEntities {
 
   }
 
-  def main(args: Array[String]): Unit = {
-    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
-    val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
-    parser.parseArgument(args)
-    val workingPath: String = parser.get("workingPath")
-    logger.info(s"Working dir path = $workingPath")
+  def extract_dataset(spark:SparkSession, workingPath:String) :Unit = {
+
+    implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
+    implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
+
+    val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf].repartition(4000)
+
+    val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset].repartition(1000)
+
+
+    OAFDataset
+      .filter(s => s != null && s.isInstanceOf[DLIDataset])
+      .map(s =>s.asInstanceOf[DLIDataset])
+      .union(ebi_dataset)
+      .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
+      .groupByKey(_._1)(Encoders.STRING)
+      .agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
+      .map(p => p._2)
+      .repartition(2000)
+      .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
+
+  }
+
+  def extract_publication(spark:SparkSession, workingPath:String) :Unit = {
 
     implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
     implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
-    implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
-    implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
-    implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
-
-
-
-    val spark:SparkSession = SparkSession
-      .builder()
-      .appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .master(parser.get("master"))
-      .getOrCreate()
-
-
     val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
 
-    val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset]
-    val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication]
-    val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation]
-
+    val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication].repartition(1000)
 
     OAFDataset
@@ -60,20 +59,17 @@ object SparkSplitOafTODLIEntities {
       .groupByKey(_._1)(Encoders.STRING)
      .agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
       .map(p => p._2)
-      .repartition(1000)
+      .repartition(2000)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
 
-    OAFDataset
-      .filter(s => s != null && s.isInstanceOf[DLIDataset])
-      .map(s =>s.asInstanceOf[DLIDataset])
-      .union(ebi_dataset)
-      .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
-      .groupByKey(_._1)(Encoders.STRING)
-      .agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
-      .map(p => p._2)
-      .repartition(1000)
-      .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
+  }
 
+  def extract_unknown(spark:SparkSession, workingPath:String) :Unit = {
+
+    implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
+    implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
+
+    val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
 
     OAFDataset
@@ -82,9 +78,18 @@ object SparkSplitOafTODLIEntities {
       .filter(s => s != null && s.isInstanceOf[DLIUnknown])
       .map(s =>s.asInstanceOf[DLIUnknown])
       .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, unkEncoder))
       .groupByKey(_._1)(Encoders.STRING)
       .agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
       .map(p => p._2)
-      .repartition(1000)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
 
+  }
+
+
+  def extract_relations(spark:SparkSession, workingPath:String) :Unit = {
+
+    implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
+    implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
+
+    val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
+    val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation].repartition(2000)
 
     OAFDataset
       .filter(s => s != null && s.isInstanceOf[Relation])
       .map(s =>s.asInstanceOf[Relation])
       .union(ebi_relation)
       .map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, relEncoder))
@@ -94,9 +99,35 @@ object SparkSplitOafTODLIEntities {
       .groupByKey(_._1)(Encoders.STRING)
       .agg(EBIAggregator.getRelationAggregator().toColumn)
       .map(p => p._2)
-      .repartition(1000)
+      .repartition(4000)
       .write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
 
+  }
+
+
+  def main(args: Array[String]): Unit = {
+    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
+    val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
+    parser.parseArgument(args)
+
+    val workingPath: String = parser.get("workingPath")
+    val entity:String = parser.get("entity")
+    logger.info(s"Working dir path = $workingPath")
+
+    val spark:SparkSession = SparkSession
+      .builder()
+      .appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .master(parser.get("master"))
+      .getOrCreate()
+
+
+    entity match {
+      case "publication" => extract_publication(spark, workingPath)
+      case "dataset" => extract_dataset(spark,workingPath)
+      case "relation" => extract_relations(spark, workingPath)
+      case "unknown" => extract_unknown(spark, workingPath)
+    }
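Each extract_* stage above follows the same shape: filter one entity type out of the mixed OAF dataset, union in the EBI records, then group by identifier and reduce duplicates with an aggregator. A plain-Java sketch of that group-and-merge idea (simplified record type and merge rule, not the Spark API):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class GroupAndMergeDemo {
        static class Rec {
            final String id;
            final String payload;
            Rec(String id, String payload) { this.id = id; this.payload = payload; }
        }

        // stand-in for EBIAggregator: keep the richer of two records with the same id
        static Rec merge(Rec a, Rec b) {
            return a.payload.length() >= b.payload.length() ? a : b;
        }

        public static void main(String[] args) {
            List<Rec> mixed = Arrays.asList(
                new Rec("60|abc", "short"),
                new Rec("60|abc", "much richer record"),
                new Rec("60|def", "unique"));
            Map<String, Rec> merged = mixed.stream()
                .collect(Collectors.toMap(r -> r.id, r -> r, GroupAndMergeDemo::merge));
            merged.values().forEach(r -> System.out.println(r.id + " -> " + r.payload));
        }
    }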
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json
index febcfc898..7878931af 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json
@@ -1,4 +1,5 @@
 [
   {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
-  {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true}
+  {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true},
+  {"paramName":"e", "paramLongName":"entity", "paramDescription": "the entity type to extract (publication, dataset, relation, unknown)", "paramRequired": true}
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml
index fabe7510b..9d06c42d6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml
@@ -14,30 +14,103 @@
 
-    <start to="ExtractDLIEntities"/>
+    <start to="ExtractDLIPublication"/>
 
     <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
-    <action name="ExtractDLIEntities">
+    <action name="ExtractDLIPublication">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
             <master>yarn-cluster</master>
             <mode>cluster</mode>
-            <name>Extract DLI Entities</name>
+            <name>Extract DLI Entities (Publication)</name>
             <class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.sql.shuffle.partitions=5000
                 ${sparkExtraOPT}
             </spark-opts>
             <arg>-mt</arg> <arg>yarn-cluster</arg>
             <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>-e</arg><arg>publication</arg>
         </spark>
-        <ok to="End"/>
+        <ok to="ExtractDLIDataset"/>
         <error to="Kill"/>
     </action>
+
+    <action name="ExtractDLIDataset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Extract DLI Entities (Dataset)</name>
+            <class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory ${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=5000
+                ${sparkExtraOPT}
+            </spark-opts>
+            <arg>-mt</arg> <arg>yarn-cluster</arg>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>-e</arg><arg>dataset</arg>
+        </spark>
+        <ok to="ExtractDLIUnknown"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ExtractDLIUnknown">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Extract DLI Entities (Unknown)</name>
+            <class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory ${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=5000
+                ${sparkExtraOPT}
+            </spark-opts>
+            <arg>-mt</arg> <arg>yarn-cluster</arg>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>-e</arg><arg>unknown</arg>
+        </spark>
+        <ok to="ExtractDLIRelation"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ExtractDLIRelation">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Extract DLI Entities (Relation)</name>
+            <class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory ${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=5000
+                ${sparkExtraOPT}
+            </spark-opts>
+            <arg>-mt</arg> <arg>yarn-cluster</arg>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>-e</arg><arg>relation</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala
index 4d83057f2..373683190 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/graph/SparkScholexplorerAggregationTest.scala
@@ -30,7 +30,7 @@ class SparkScholexplorerAggregationTest {
 
     implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
 
-    val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").getOrCreate()
+    val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()
 
     val ds: Dataset[DLIPublication] = spark.createDataset(spark.sparkContext.parallelize(s)).as[DLIPublication]
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
index 05ca7d4ce..b287e9c88 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/pom.xml
@@ -62,6 +62,10 @@
             <artifactId>dhp-schemas</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpmime</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.elasticsearch</groupId>
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/MakeTar.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/MakeTar.java
new file mode 100644
index 000000000..95bea74a2
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/MakeTar.java
@@ -0,0 +1,111 @@
+
+package eu.dnetlib.dhp.export.zenodo;
+
+import java.io.*;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class MakeTar implements Serializable {
+
+	private static final Logger log = LoggerFactory.getLogger(MakeTar.class);
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				MakeTar.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/export/input_maketar_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		final String outputPath = parser.get("targetPath");
+		log.info("hdfsPath: {}", outputPath);
+
+		final String hdfsNameNode = parser.get("nameNode");
+		log.info("nameNode: {}", hdfsNameNode);
+
+		final String inputPath = parser.get("sourcePath");
+		log.info("input path : {}", inputPath);
+
+		Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsNameNode);
+
+		FileSystem fileSystem = FileSystem.get(conf);
+
+		makeTArArchive(fileSystem, inputPath, outputPath);
+
+	}
+
+	public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath) throws IOException {
+
+		RemoteIterator<LocatedFileStatus> dir_iterator = fileSystem.listLocatedStatus(new Path(inputPath));
+
+		while (dir_iterator.hasNext()) {
+			LocatedFileStatus fileStatus = dir_iterator.next();
+
+			Path p = fileStatus.getPath();
+			String p_string = p.toString();
+			String entity = p_string.substring(p_string.lastIndexOf("/") + 1);
+
+			write(fileSystem, p_string, outputPath + "/" + entity + ".tar", entity);
+		}
+
+	}
+
+	private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name)
+		throws IOException {
+
+		Path hdfsWritePath = new Path(outputPath);
+		FSDataOutputStream fsDataOutputStream = null;
+		if (fileSystem.exists(hdfsWritePath)) {
+			fileSystem.delete(hdfsWritePath, true);
+
+		}
+		fsDataOutputStream = fileSystem.create(hdfsWritePath);
+
+		TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
+
+		RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+			.listFiles(
+				new Path(inputPath), true);
+
+		while (fileStatusListIterator.hasNext()) {
+			LocatedFileStatus fileStatus = fileStatusListIterator.next();
+
+			Path p = fileStatus.getPath();
+			String p_string = p.toString();
+			if (!p_string.endsWith("_SUCCESS")) {
+				String name = p_string.substring(p_string.lastIndexOf("/") + 1);
+				TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name + ".json.gz");
+				entry.setSize(fileStatus.getLen());
+				ar.putArchiveEntry(entry);
+
+				InputStream is = fileSystem.open(fileStatus.getPath());
+
+				BufferedInputStream bis = new BufferedInputStream(is);
+
+				int count;
+				byte data[] = new byte[1024];
+				while ((count = bis.read(data, 0, data.length)) != -1) {
+					ar.write(data, 0, count);
+				}
+				bis.close();
+				ar.closeArchiveEntry();
+
+			}
+
+		}
+
+		ar.close();
+	}
+
+}
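MakeTar streams every part file under a directory into one uncompressed tar entry per file, setting each entry's size before writing. A minimal local sketch of the same commons-compress pattern (local filesystem instead of HDFS; the input path is a placeholder and must exist):

    import java.io.BufferedInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
    import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;

    public class TarDemo {
        public static void main(String[] args) throws IOException {
            File source = new File("/tmp/part-00000.json.gz"); // any existing file
            try (TarArchiveOutputStream ar = new TarArchiveOutputStream(
                new FileOutputStream("/tmp/demo.tar"))) {
                TarArchiveEntry entry = new TarArchiveEntry("dataset/part-00000.json.gz");
                entry.setSize(source.length()); // entry size must be set before writing
                ar.putArchiveEntry(entry);
                try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(source))) {
                    byte[] data = new byte[1024];
                    int count;
                    while ((count = bis.read(data, 0, data.length)) != -1) {
                        ar.write(data, 0, count);
                    }
                }
                ar.closeArchiveEntry();
            }
        }
    }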
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/SendToZenodoHDFS.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/SendToZenodoHDFS.java
new file mode 100644
index 000000000..1dcbf6ccc
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/export/zenodo/SendToZenodoHDFS.java
@@ -0,0 +1,80 @@
+
+package eu.dnetlib.dhp.export.zenodo;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.api.MissingConceptDoiException;
+import eu.dnetlib.dhp.common.api.ZenodoAPIClient;
+
+public class SendToZenodoHDFS implements Serializable {
+
+	private static final Log log = LogFactory.getLog(SendToZenodoHDFS.class);
+
+	public static void main(final String[] args) throws Exception, MissingConceptDoiException {
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SendToZenodoHDFS.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/export/upload_zenodo.json")));
+
+		parser.parseArgument(args);
+
+		final String hdfsPath = parser.get("hdfsPath");
+		final String hdfsNameNode = parser.get("nameNode");
+		final String access_token = parser.get("accessToken");
+		final String connection_url = parser.get("connectionUrl");
+		final String metadata = parser.get("metadata");
+		final Boolean newDeposition = Boolean.valueOf(parser.get("newDeposition"));
+		final String concept_rec_id = Optional
+			.ofNullable(parser.get("conceptRecordId"))
+			.orElse(null);
+
+		Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsNameNode);
+
+		FileSystem fileSystem = FileSystem.get(conf);
+
+		RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+			.listFiles(
+				new Path(hdfsPath), true);
+		ZenodoAPIClient zenodoApiClient = new ZenodoAPIClient(connection_url, access_token);
+		if (newDeposition) {
+			zenodoApiClient.newDeposition();
+		} else {
+			if (concept_rec_id == null) {
+				throw new MissingConceptDoiException("No concept record id has been provided");
+			}
+			zenodoApiClient.newVersion(concept_rec_id);
+		}
+
+		while (fileStatusListIterator.hasNext()) {
+			LocatedFileStatus fileStatus = fileStatusListIterator.next();
+
+			Path p = fileStatus.getPath();
+			String p_string = p.toString();
+			if (!p_string.endsWith("_SUCCESS")) {
+				// String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
+				String name = p_string.substring(p_string.lastIndexOf("/") + 1);
+				log.info("Sending information for community: " + name);
+				FSDataInputStream inputStream = fileSystem.open(p);
+				zenodoApiClient.uploadIS(inputStream, name, fileStatus.getLen());
+
+			}
+
+		}
+
+		zenodoApiClient.sendMretadata(metadata);
+		zenodoApiClient.publish();
+
+	}
+
+}
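SendToZenodoHDFS drives the ZenodoAPIClient in a fixed order: open (or version) a deposition, upload each part file, attach metadata, publish. A condensed sketch of that call sequence, using only the methods visible above; the URL, token, and concept record id are placeholders, so this will not run against Zenodo as-is:

    import eu.dnetlib.dhp.common.api.ZenodoAPIClient;

    public class ZenodoUploadDemo {
        public static void main(String[] args) throws Exception {
            // placeholder URL/token; the real job reads them from its parameters
            ZenodoAPIClient client = new ZenodoAPIClient(
                "https://zenodo.org/api/deposit/depositions", "ACCESS_TOKEN");
            boolean newDeposition = false;
            if (newDeposition) {
                client.newDeposition(); // brand-new record
            } else {
                client.newVersion("1200252"); // new version of an existing concept record
            }
            // for every part file: client.uploadIS(inputStream, name, length);
            client.sendMretadata("{}"); // method name as exposed by the client
            client.publish();
        }
    }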
"paramName":"hdfsp", + "paramLongName":"hdfsPath", + "paramDescription": "the path of the folder tofind files to send to Zenodo", + "paramRequired": true + }, + { + "paramName": "nn", + "paramLongName": "nameNode", + "paramDescription": "the name node", + "paramRequired": true + }, + { + "paramName": "at", + "paramLongName": "accessToken", + "paramDescription": "the access token for the deposition", + "paramRequired": false + }, + { + "paramName":"cu", + "paramLongName":"connectionUrl", + "paramDescription": "the url to connect to deposit", + "paramRequired": false + }, + { + "paramName":"m", + "paramLongName":"metadata", + "paramDescription": "metadata associated to the deposition", + "paramRequired": false + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/config-default.xml new file mode 100644 index 000000000..3b9aaca2a --- /dev/null +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/config-default.xml @@ -0,0 +1,48 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + oozie.wf.rerun.failnodes + false + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + + + oozie.launcher.mapreduce.user.classpath.first + true + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/workflow.xml new file mode 100644 index 000000000..6d7056503 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/sx/zenodo/oozie_app/workflow.xml @@ -0,0 +1,53 @@ + + + + sourcePath + the source path + + + targetPath + the target path + + + metadata + the metadata + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.export.zenodo.MakeTar + -t${targetPath} + -n${nameNode} + -s${sourcePath} + + + + + + + + + eu.dnetlib.dhp.export.zenodo.SendToZenodoHDFS + --hdfsPath/user/dnet.scholexplorer/scholix/provision/scholix.tar/scholix-2020-10-16.tar + --nameNode${nameNode} + --accessTokenb6ddrY6b77WxcDEevn9gqVE5sL5sDNjdUijt75W3o7cQo5vpFFI48dMiu8Gv + --connectionUrlhttps://zenodo.org/api/deposit/depositions + --metadata${metadata} + --conceptRecordId1200252 + --newDepositionfalse + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala index cb04cf9e9..c62d169bc 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala +++ 
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala
index cb04cf9e9..c62d169bc 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/export/ExportDLITOOAFTest.scala
@@ -3,14 +3,15 @@ package eu.dnetlib.dhp.export
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
 
+import eu.dnetlib.dhp.provision.scholix.Scholix
+import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
 import eu.dnetlib.dhp.schema.oaf.Relation
 import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
-
 import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
 import org.junit.jupiter.api.Test
 
 import scala.io.Source
-
+import scala.collection.JavaConverters._
 
 class ExportDLITOOAFTest {
 
   val mapper = new ObjectMapper()
@@ -22,12 +23,27 @@ class ExportDLITOOAFTest {
 
   }
 
+  def extractDatasources(s:Scholix):List[String]= {
+    s.getTarget.getCollectedFrom.asScala.map(c => c.getProvider.getName)(collection.breakOut)
+  }
+
+
+  def extractDatasources(s:ScholixSummary):List[String] = {
+
+    s.getDatasources.asScala.map(c => c.getDatasourceName)(collection.breakOut)
+
+
+  }
+
+
   @Test
   def testMappingRele():Unit = {
 
     val r:Relation = new Relation
     r.setSource("60|fbff1d424e045eecf24151a5fe3aa738")
     r.setTarget("50|dedup_wf_001::ec409f09e63347d4e834087fe1483877")
+    r.setRelType("IsReferencedBy")
+
     val r1 =DLIToOAF.convertDLIRelation(r)
     println(r1.getSource, r1.getTarget)