From fbc28ee8c35a4c30d4cc97d01557ab569cf907f4 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 7 Feb 2022 18:32:08 +0100 Subject: [PATCH] [OpenCitation] change the integration logic to consider dois with commas inside --- .../CreateActionSetSparkJob.java | 33 ++++-- .../actionmanager/opencitations/ReadCOCI.java | 111 ++++++++++++++++++ .../opencitations/model/COCI.java | 89 ++++++++++++++ .../input_readcoci_parameters.json | 36 ++++++ .../opencitations/oozie_app/workflow.xml | 27 +++++ .../CreateOpenCitationsASTest.java | 22 ++-- .../opencitations/ReadCOCITest.java | 94 +++++++++++++++ .../opencitations/COCI/input1/_SUCCESS | 0 ...b-77f8-4059-91c0-5521309823f8-c000.json.gz | Bin 0 -> 346 bytes .../opencitations/COCI/input2/_SUCCESS | 0 ...6-f472-40fa-985a-a4f3c74f9b53-c000.json.gz | Bin 0 -> 306 bytes .../opencitations/COCI/input3/_SUCCESS | 0 ...c-5b5c-4c65-92b7-7a6928da5cdb-c000.json.gz | Bin 0 -> 316 bytes .../opencitations/COCI/input4/_SUCCESS | 0 ...b-f97d-449d-bd08-04a9b935bfd2-c000.json.gz | Bin 0 -> 137 bytes .../opencitations/inputFiles/input4 | 2 + .../raw/AbstractMdRecordToOafMapper.java | 4 +- dhp-workflows/pom.xml | 2 +- 18 files changed, 394 insertions(+), 26 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/_SUCCESS create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/part-00000-b05c4abb-77f8-4059-91c0-5521309823f8-c000.json.gz create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/_SUCCESS create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/part-00000-6831e1e6-f472-40fa-985a-a4f3c74f9b53-c000.json.gz create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/_SUCCESS create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/part-00000-2ec9f31c-5b5c-4c65-92b7-7a6928da5cdb-c000.json.gz create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/_SUCCESS create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/part-00000-2ba3f17b-f97d-449d-bd08-04a9b935bfd2-c000.json.gz create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input4 diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index ea5fea96f..c16f8eeea 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -14,6 +14,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; @@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; @@ -83,10 +85,16 @@ public class CreateActionSetSparkJob implements Serializable { private static void extractContent(SparkSession spark, String inputPath, String outputPath, boolean shouldDuplicateRels) { spark - .sqlContext() - .createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) + .read() + .textFile(inputPath + "/*") + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, COCI.class), + Encoders.bean(COCI.class)) +// spark +// .sqlContext() +// .createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) .flatMap( - (FlatMapFunction) value -> createRelation(value, shouldDuplicateRels).iterator(), + (FlatMapFunction) value -> createRelation(value, shouldDuplicateRels).iterator(), Encoders.bean(Relation.class)) .filter((FilterFunction) value -> value != null) .toJavaRDD() @@ -98,15 +106,14 @@ public class CreateActionSetSparkJob implements Serializable { } - private static List createRelation(String value, boolean duplicate) { - String[] line = value.split(","); - if (!line[1].startsWith("10.")) { - return new ArrayList<>(); - } + private static List createRelation(COCI value, boolean duplicate) { + List relationList = new ArrayList<>(); - String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1])); - final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2])); + String citing = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting())); + final String cited = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited())); relationList .addAll( @@ -114,9 +121,11 @@ public class CreateActionSetSparkJob implements Serializable { citing, cited)); - if (duplicate && line[1].endsWith(".refs")) { + if (duplicate && value.getCiting().endsWith(".refs")) { citing = ID_PREFIX + IdentifierFactory - .md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs")))); + .md5( + CleaningFunctions + .normalizePidValue("doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs")))); relationList.addAll(getRelations(citing, cited)); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java new file mode 100644 index 000000000..7ac1f2de3 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -0,0 +1,111 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER; +import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class ReadCOCI implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(ReadCOCI.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + ReadCOCI.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String hdfsNameNode = parser.get("nameNode"); + log.info("nameNode: {}", hdfsNameNode); + + final String inputPath = parser.get("sourcePath"); + log.info("input path : {}", inputPath); + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + FileSystem fileSystem = FileSystem.get(conf); + SparkConf sconf = new SparkConf(); + + final String delimiter = Optional + .ofNullable(parser.get("delimiter")) + .orElse(DEFAULT_DELIMITER); + + runWithSparkSession( + sconf, + isSparkSessionManaged, + spark -> { + doRead( + spark, + fileSystem, + inputPath, + outputPath, + delimiter); + }); + } + + public static void doRead(SparkSession spark, FileSystem fileSystem, String inputPath, String outputPath, + String delimiter) throws IOException { + + RemoteIterator iterator = fileSystem + .listFiles( + new Path(inputPath), true); + + while (iterator.hasNext()) { + LocatedFileStatus fileStatus = iterator.next(); + + Path p = fileStatus.getPath(); + String p_string = p.toString(); + Dataset cociData = spark + .read() + .format("csv") + .option("sep", delimiter) + .option("inferSchema", "true") + .option("header", "true") + .option("quotes", "\"") + .load(p_string); + + cociData.map((MapFunction) row -> { + COCI coci = new COCI(); + coci.setOci(row.getString(0)); + coci.setCiting(row.getString(1)); + coci.setCited(row.getString(2)); + return coci; + }, Encoders.bean(COCI.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/" + p_string.substring(p_string.lastIndexOf("/") + 1)); + } + + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java new file mode 100644 index 000000000..a7b3330ea --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java @@ -0,0 +1,89 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations.model; + +import java.io.Serializable; + +import com.opencsv.bean.CsvBindByPosition; + +public class COCI implements Serializable { + @CsvBindByPosition(position = 0) +// @CsvBindByName(column = "doi") + private String oci; + + @CsvBindByPosition(position = 1) +// @CsvBindByName(column = "level1") + private String citing; + + @CsvBindByPosition(position = 2) +// @CsvBindByName(column = "level2") + private String cited; + + @CsvBindByPosition(position = 3) +// @CsvBindByName(column = "level3") + private String creation; + + @CsvBindByPosition(position = 4) + private String timespan; + + @CsvBindByPosition(position = 5) + private String journal_sc; + + @CsvBindByPosition(position = 6) + private String author_sc; + + public String getOci() { + return oci; + } + + public void setOci(String oci) { + this.oci = oci; + } + + public String getCiting() { + return citing; + } + + public void setCiting(String citing) { + this.citing = citing; + } + + public String getCited() { + return cited; + } + + public void setCited(String cited) { + this.cited = cited; + } + + public String getCreation() { + return creation; + } + + public void setCreation(String creation) { + this.creation = creation; + } + + public String getTimespan() { + return timespan; + } + + public void setTimespan(String timespan) { + this.timespan = timespan; + } + + public String getJournal_sc() { + return journal_sc; + } + + public void setJournal_sc(String journal_sc) { + this.journal_sc = journal_sc; + } + + public String getAuthor_sc() { + return author_sc; + } + + public void setAuthor_sc(String author_sc) { + this.author_sc = author_sc; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json new file mode 100644 index 000000000..14c20f762 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json @@ -0,0 +1,36 @@ +[ + { + "paramName": "sp", + "paramLongName": "sourcePath", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + + { + "paramName": "nn", + "paramLongName": "nameNode", + "paramDescription": "the hdfs name node", + "paramRequired": true + }, + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "the hdfs name node", + "paramRequired": false + }, + { + "paramName": "d", + "paramLongName": "delimiter", + "paramDescription": "the hdfs name node", + "paramRequired": false + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the hdfs name node", + "paramRequired": false + } +] + + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index d052791a3..7276d2d3e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -26,6 +26,7 @@ ${wf:conf('resumeFrom') eq 'DownloadDump'} ${wf:conf('resumeFrom') eq 'ExtractContent'} + ${wf:conf('resumeFrom') eq 'ReadContent'} @@ -60,6 +61,32 @@ --inputFile${inputFile} --workingPath${workingPath} + + + + + + + yarn + cluster + Produces the AS for OC + eu.dnetlib.dhp.actionmanager.opencitations.ReadCOCI + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${workingPath}/COCI + --outputPath${workingDir}/COCI + --nameNode${nameNode} + --delimiter${delimiter} + diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java index 5153c412f..3e4ce750e 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java @@ -76,7 +76,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -99,7 +99,7 @@ public class CreateOpenCitationsASTest { .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(aa -> ((Relation) aa.getPayload())); - assertEquals(60, tmp.count()); + assertEquals(62, tmp.count()); // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); @@ -110,7 +110,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -131,7 +131,7 @@ public class CreateOpenCitationsASTest { .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(aa -> ((Relation) aa.getPayload())); - assertEquals(44, tmp.count()); + assertEquals(46, tmp.count()); // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); @@ -142,7 +142,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -175,7 +175,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -215,7 +215,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -240,8 +240,8 @@ public class CreateOpenCitationsASTest { assertEquals("citation", r.getSubRelType()); assertEquals("resultResult", r.getRelType()); }); - assertEquals(22, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); - assertEquals(22, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); + assertEquals(23, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); + assertEquals(23, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); } @@ -250,7 +250,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -295,7 +295,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java new file mode 100644 index 000000000..e1b9c4d23 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java @@ -0,0 +1,94 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; +import eu.dnetlib.dhp.schema.oaf.Dataset; + +public class ReadCOCITest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(ReadCOCITest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(ReadCOCITest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(ReadCOCITest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(ReadCOCITest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testReadCOCI() throws Exception { + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + .getPath(); + + ReadCOCI + .doRead( + spark, FileSystem.getLocal(new Configuration()), inputPath, + workingDir.toString() + "/COCI", DEFAULT_DELIMITER); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/COCI/*/") + .map(item -> OBJECT_MAPPER.readValue(item, COCI.class)); + + Assertions.assertEquals(23, tmp.count()); + + Assertions.assertEquals(1, tmp.filter(c -> c.getCiting().equals("10.1207/s15327647jcd3,4-01")).count()); + + Assertions.assertEquals(8, tmp.filter(c -> c.getCiting().indexOf(".refs") > -1).count()); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/_SUCCESS b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/_SUCCESS new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/part-00000-b05c4abb-77f8-4059-91c0-5521309823f8-c000.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/part-00000-b05c4abb-77f8-4059-91c0-5521309823f8-c000.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..c55dcd71c6171ba9cf8c61bb0d51852c8c5104e8 GIT binary patch literal 346 zcmV-g0j2&QiwFP!000000L7EdYQr!PgztR}pIWGt{gGsC-^RG1bs;GvITS+Qy|YRP z2}vluxE9E3X=HypjQ8C*?Ut|IOUD!$*Wx2`&K$(JL?Nn?Bw_yQo?SYv-;P?Mez9e$ zIR)YzwA2(_^f^vY5RO8EtJfm6)s-@(qU1Xnccp?gtf+>Az6~vG+PemWp%XGZ6|6~n zV?{-%)1g-Fz904%eO$1mAdJc8XyCp+wMO_@*)=j3SU)Z|!)jXZ;5=(2i&SRfc7vPo zES#ig65%Q;cC?*Io9))-a;t+gF0x?88q$B74hpxrrAJYvo2C=ltN;K2 literal 0 HcmV?d00001 diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/_SUCCESS b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/_SUCCESS new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/part-00000-6831e1e6-f472-40fa-985a-a4f3c74f9b53-c000.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/part-00000-6831e1e6-f472-40fa-985a-a4f3c74f9b53-c000.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..ae7886e8c57b098106a3c52959788b361cadc034 GIT binary patch literal 306 zcmV-20nPp&iwFP!000000KJq=PQx$|h4&mIE08dG^J9;P>y!%0f=UDlq>8&U<5Hvo z-NuRHjNcpkiyiwgZFXS%ZH}Z=hfkyk=neFG%cMfUxRMK z)=*>}SV9~A=srf((E7;a+EcBLV`*^<(Ff%U{DaK%^L}$z&1@}FmPXD>EEXA-*Ke*$uJhA& zSqgK}s#apRtuU1}+o1tyq}(|b2FFuF(~*1Jf0w6Hs;+b%oKK|;!aN7Q0BvPIpR5D` E0R117`Tzg` literal 0 HcmV?d00001 diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/_SUCCESS b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/_SUCCESS new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/part-00000-2ec9f31c-5b5c-4c65-92b7-7a6928da5cdb-c000.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/part-00000-2ec9f31c-5b5c-4c65-92b7-7a6928da5cdb-c000.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..8374019193373d40ded63283b49baf2bedbc8271 GIT binary patch literal 316 zcmV-C0mJ?uiwFP!000000L@cDPQ)+_yz`8nX*#x>xZyj?g319T7SKv7{?0gEAwWgq z)TKSNNyhfrGdXp;`DK6G-E@MKP0N505cBRjPUgq^s!t+P&7rF4d4G2x>S;F< z0%a89Q7fo$OkBrXVO()PUUPLBCF|5jMCO^IFal8)k%Q>&{p$3U7mLd9kc9mDnsH1F z^#qvIblimKf)Fb$sVu~cF(#9SMWL3{&D+-^pSjs;D{qI_Bx<>C%iTmLL*uW+XVjnU z76Vv?)^T$K#54m{F|P9($=c-t#?;1+bNhoa#V9TD|H_!WRcF~3cdi+es5w#sIM1pojL%bM;0 literal 0 HcmV?d00001 diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/_SUCCESS b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/_SUCCESS new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/part-00000-2ba3f17b-f97d-449d-bd08-04a9b935bfd2-c000.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/part-00000-2ba3f17b-f97d-449d-bd08-04a9b935bfd2-c000.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..0436b10ffee3c5aa7b224d814fd1b437aed14221 GIT binary patch literal 137 zcmb2|=3syT(>_N&CPNO_?{bS@OV&=A7PbEJOrKXy;=7FR#lF&Dae7euDSqMC=et_G z1^Sw%TxM2fzRJCMRic#7{#<*FM-x(dj|T1B=DOTO+3#+2c?*Y;l&kK%+H?Odx=M5^ og)Y*lQ<`a|`gxtiR_%m0?d<>T_9e~}<$Zsc{V1()); // NOT PRESENT IN MDSTORES r - .setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info)); + .setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info)); r - .setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info)); + .setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info)); r.setInstance(instances); r.setBestaccessright(OafMapperUtils.createBestAccessRights(instances)); diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index 53d029467..143178560 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -44,7 +44,7 @@ iis-releases iis releases plugin repository - http://maven.ceon.pl/artifactory/iis-releases + https://maven.ceon.pl/artifactory/iis-releases default