From fbc28ee8c35a4c30d4cc97d01557ab569cf907f4 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 7 Feb 2022 18:32:08 +0100 Subject: [PATCH 1/5] [OpenCitation] change the integration logic to consider dois with commas inside --- .../CreateActionSetSparkJob.java | 33 ++++-- .../actionmanager/opencitations/ReadCOCI.java | 111 ++++++++++++++++++ .../opencitations/model/COCI.java | 89 ++++++++++++++ .../input_readcoci_parameters.json | 36 ++++++ .../opencitations/oozie_app/workflow.xml | 27 +++++ .../CreateOpenCitationsASTest.java | 22 ++-- .../opencitations/ReadCOCITest.java | 94 +++++++++++++++ .../opencitations/COCI/input1/_SUCCESS | 0 ...b-77f8-4059-91c0-5521309823f8-c000.json.gz | Bin 0 -> 346 bytes .../opencitations/COCI/input2/_SUCCESS | 0 ...6-f472-40fa-985a-a4f3c74f9b53-c000.json.gz | Bin 0 -> 306 bytes .../opencitations/COCI/input3/_SUCCESS | 0 ...c-5b5c-4c65-92b7-7a6928da5cdb-c000.json.gz | Bin 0 -> 316 bytes .../opencitations/COCI/input4/_SUCCESS | 0 ...b-f97d-449d-bd08-04a9b935bfd2-c000.json.gz | Bin 0 -> 137 bytes .../opencitations/inputFiles/input4 | 2 + .../raw/AbstractMdRecordToOafMapper.java | 4 +- dhp-workflows/pom.xml | 2 +- 18 files changed, 394 insertions(+), 26 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/_SUCCESS create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/part-00000-b05c4abb-77f8-4059-91c0-5521309823f8-c000.json.gz create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/_SUCCESS create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/part-00000-6831e1e6-f472-40fa-985a-a4f3c74f9b53-c000.json.gz create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/_SUCCESS create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/part-00000-2ec9f31c-5b5c-4c65-92b7-7a6928da5cdb-c000.json.gz create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/_SUCCESS create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/part-00000-2ba3f17b-f97d-449d-bd08-04a9b935bfd2-c000.json.gz create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input4 diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index ea5fea96f..c16f8eeea 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -14,6 +14,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; @@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; @@ -83,10 +85,16 @@ public class CreateActionSetSparkJob implements Serializable { private static void extractContent(SparkSession spark, String inputPath, String outputPath, boolean shouldDuplicateRels) { spark - .sqlContext() - .createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) + .read() + .textFile(inputPath + "/*") + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, COCI.class), + Encoders.bean(COCI.class)) +// spark +// .sqlContext() +// .createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) .flatMap( - (FlatMapFunction) value -> createRelation(value, shouldDuplicateRels).iterator(), + (FlatMapFunction) value -> createRelation(value, shouldDuplicateRels).iterator(), Encoders.bean(Relation.class)) .filter((FilterFunction) value -> value != null) .toJavaRDD() @@ -98,15 +106,14 @@ public class CreateActionSetSparkJob implements Serializable { } - private static List createRelation(String value, boolean duplicate) { - String[] line = value.split(","); - if (!line[1].startsWith("10.")) { - return new ArrayList<>(); - } + private static List createRelation(COCI value, boolean duplicate) { + List relationList = new ArrayList<>(); - String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1])); - final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2])); + String citing = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting())); + final String cited = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited())); relationList .addAll( @@ -114,9 +121,11 @@ public class CreateActionSetSparkJob implements Serializable { citing, cited)); - if (duplicate && line[1].endsWith(".refs")) { + if (duplicate && value.getCiting().endsWith(".refs")) { citing = ID_PREFIX + IdentifierFactory - .md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs")))); + .md5( + CleaningFunctions + .normalizePidValue("doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs")))); relationList.addAll(getRelations(citing, cited)); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java new file mode 100644 index 000000000..7ac1f2de3 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -0,0 +1,111 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import static 
eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER; +import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class ReadCOCI implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(ReadCOCI.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + ReadCOCI.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String hdfsNameNode = parser.get("nameNode"); + log.info("nameNode: {}", hdfsNameNode); + + final String inputPath = parser.get("sourcePath"); + log.info("input path : {}", inputPath); + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + FileSystem fileSystem = FileSystem.get(conf); + SparkConf sconf = new SparkConf(); + + final String delimiter = Optional + .ofNullable(parser.get("delimiter")) + .orElse(DEFAULT_DELIMITER); + + runWithSparkSession( + sconf, + isSparkSessionManaged, + spark -> { + doRead( + spark, + fileSystem, + inputPath, + outputPath, + delimiter); + }); + } + + public static void doRead(SparkSession spark, FileSystem fileSystem, String inputPath, String outputPath, + String delimiter) throws IOException { + + RemoteIterator iterator = fileSystem + .listFiles( + new Path(inputPath), true); + + while (iterator.hasNext()) { + LocatedFileStatus fileStatus = iterator.next(); + + Path p = fileStatus.getPath(); + String p_string = p.toString(); + Dataset cociData = spark + .read() + .format("csv") + .option("sep", delimiter) + .option("inferSchema", "true") + .option("header", "true") + .option("quotes", "\"") + .load(p_string); + + cociData.map((MapFunction) row -> { + COCI coci = new COCI(); + coci.setOci(row.getString(0)); + coci.setCiting(row.getString(1)); + coci.setCited(row.getString(2)); + return coci; + }, Encoders.bean(COCI.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/" + p_string.substring(p_string.lastIndexOf("/") + 1)); + } + + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java new file mode 100644 index 000000000..a7b3330ea --- /dev/null +++ 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java @@ -0,0 +1,89 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations.model; + +import java.io.Serializable; + +import com.opencsv.bean.CsvBindByPosition; + +public class COCI implements Serializable { + @CsvBindByPosition(position = 0) +// @CsvBindByName(column = "doi") + private String oci; + + @CsvBindByPosition(position = 1) +// @CsvBindByName(column = "level1") + private String citing; + + @CsvBindByPosition(position = 2) +// @CsvBindByName(column = "level2") + private String cited; + + @CsvBindByPosition(position = 3) +// @CsvBindByName(column = "level3") + private String creation; + + @CsvBindByPosition(position = 4) + private String timespan; + + @CsvBindByPosition(position = 5) + private String journal_sc; + + @CsvBindByPosition(position = 6) + private String author_sc; + + public String getOci() { + return oci; + } + + public void setOci(String oci) { + this.oci = oci; + } + + public String getCiting() { + return citing; + } + + public void setCiting(String citing) { + this.citing = citing; + } + + public String getCited() { + return cited; + } + + public void setCited(String cited) { + this.cited = cited; + } + + public String getCreation() { + return creation; + } + + public void setCreation(String creation) { + this.creation = creation; + } + + public String getTimespan() { + return timespan; + } + + public void setTimespan(String timespan) { + this.timespan = timespan; + } + + public String getJournal_sc() { + return journal_sc; + } + + public void setJournal_sc(String journal_sc) { + this.journal_sc = journal_sc; + } + + public String getAuthor_sc() { + return author_sc; + } + + public void setAuthor_sc(String author_sc) { + this.author_sc = author_sc; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json new file mode 100644 index 000000000..14c20f762 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json @@ -0,0 +1,36 @@ +[ + { + "paramName": "sp", + "paramLongName": "sourcePath", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + + { + "paramName": "nn", + "paramLongName": "nameNode", + "paramDescription": "the hdfs name node", + "paramRequired": true + }, + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "the hdfs name node", + "paramRequired": false + }, + { + "paramName": "d", + "paramLongName": "delimiter", + "paramDescription": "the hdfs name node", + "paramRequired": false + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the hdfs name node", + "paramRequired": false + } +] + + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index d052791a3..7276d2d3e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -26,6 +26,7 @@ ${wf:conf('resumeFrom') eq 'DownloadDump'} 
${wf:conf('resumeFrom') eq 'ExtractContent'} + ${wf:conf('resumeFrom') eq 'ReadContent'} @@ -60,6 +61,32 @@ --inputFile${inputFile} --workingPath${workingPath} + + + + + + + yarn + cluster + Produces the AS for OC + eu.dnetlib.dhp.actionmanager.opencitations.ReadCOCI + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${workingPath}/COCI + --outputPath${workingDir}/COCI + --nameNode${nameNode} + --delimiter${delimiter} + diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java index 5153c412f..3e4ce750e 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java @@ -76,7 +76,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -99,7 +99,7 @@ public class CreateOpenCitationsASTest { .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(aa -> ((Relation) aa.getPayload())); - assertEquals(60, tmp.count()); + assertEquals(62, tmp.count()); // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); @@ -110,7 +110,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -131,7 +131,7 @@ public class CreateOpenCitationsASTest { .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(aa -> ((Relation) aa.getPayload())); - assertEquals(44, tmp.count()); + assertEquals(46, tmp.count()); // tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); @@ -142,7 +142,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -175,7 +175,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -215,7 +215,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -240,8 +240,8 @@ public class CreateOpenCitationsASTest { assertEquals("citation", r.getSubRelType()); assertEquals("resultResult", r.getRelType()); }); - assertEquals(22, tmp.filter(r -> 
r.getRelClass().equals("Cites")).count()); - assertEquals(22, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); + assertEquals(23, tmp.filter(r -> r.getRelClass().equals("Cites")).count()); + assertEquals(23, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count()); } @@ -250,7 +250,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob @@ -295,7 +295,7 @@ public class CreateOpenCitationsASTest { String inputPath = getClass() .getResource( - "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + "/eu/dnetlib/dhp/actionmanager/opencitations/COCI") .getPath(); CreateActionSetSparkJob diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java new file mode 100644 index 000000000..e1b9c4d23 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java @@ -0,0 +1,94 @@ + +package eu.dnetlib.dhp.actionmanager.opencitations; + +import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; +import eu.dnetlib.dhp.schema.oaf.Dataset; + +public class ReadCOCITest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(ReadCOCITest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(ReadCOCITest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(ReadCOCITest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(ReadCOCITest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testReadCOCI() throws Exception { + String inputPath = getClass() + .getResource( + "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") + .getPath(); + + ReadCOCI + .doRead( + spark, FileSystem.getLocal(new Configuration()), inputPath, + 
workingDir.toString() + "/COCI", DEFAULT_DELIMITER); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/COCI/*/") + .map(item -> OBJECT_MAPPER.readValue(item, COCI.class)); + + Assertions.assertEquals(23, tmp.count()); + + Assertions.assertEquals(1, tmp.filter(c -> c.getCiting().equals("10.1207/s15327647jcd3,4-01")).count()); + + Assertions.assertEquals(8, tmp.filter(c -> c.getCiting().indexOf(".refs") > -1).count()); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/_SUCCESS b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/_SUCCESS new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/part-00000-b05c4abb-77f8-4059-91c0-5521309823f8-c000.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input1/part-00000-b05c4abb-77f8-4059-91c0-5521309823f8-c000.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..c55dcd71c6171ba9cf8c61bb0d51852c8c5104e8 GIT binary patch literal 346 zcmV-g0j2&QiwFP!000000L7EdYQr!PgztR}pIWGt{gGsC-^RG1bs;GvITS+Qy|YRP z2}vluxE9E3X=HypjQ8C*?Ut|IOUD!$*Wx2`&K$(JL?Nn?Bw_yQo?SYv-;P?Mez9e$ zIR)YzwA2(_^f^vY5RO8EtJfm6)s-@(qU1Xnccp?gtf+>Az6~vG+PemWp%XGZ6|6~n zV?{-%)1g-Fz904%eO$1mAdJc8XyCp+wMO_@*)=j3SU)Z|!)jXZ;5=(2i&SRfc7vPo zES#ig65%Q;cC?*Io9))-a;t+gF0x?88q$B74hpxrrAJYvo2C=ltN;K2 literal 0 HcmV?d00001 diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/_SUCCESS b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/_SUCCESS new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/part-00000-6831e1e6-f472-40fa-985a-a4f3c74f9b53-c000.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input2/part-00000-6831e1e6-f472-40fa-985a-a4f3c74f9b53-c000.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..ae7886e8c57b098106a3c52959788b361cadc034 GIT binary patch literal 306 zcmV-20nPp&iwFP!000000KJq=PQx$|h4&mIE08dG^J9;P>y!%0f=UDlq>8&U<5Hvo z-NuRHjNcpkiyiwgZFXS%ZH}Z=hfkyk=neFG%cMfUxRMK z)=*>}SV9~A=srf((E7;a+EcBLV`*^<(Ff%U{DaK%^L}$z&1@}FmPXD>EEXA-*Ke*$uJhA& zSqgK}s#apRtuU1}+o1tyq}(|b2FFuF(~*1Jf0w6Hs;+b%oKK|;!aN7Q0BvPIpR5D` E0R117`Tzg` literal 0 HcmV?d00001 diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/_SUCCESS b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/_SUCCESS new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/part-00000-2ec9f31c-5b5c-4c65-92b7-7a6928da5cdb-c000.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input3/part-00000-2ec9f31c-5b5c-4c65-92b7-7a6928da5cdb-c000.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..8374019193373d40ded63283b49baf2bedbc8271 GIT binary patch literal 316 zcmV-C0mJ?uiwFP!000000L@cDPQ)+_yz`8nX*#x>xZyj?g319T7SKv7{?0gEAwWgq 
z)TKSNNyhfrGdXp;`DK6G-E@MKP0N505cBRjPUgq^s!t+P&7rF4d4G2x>S;F< z0%a89Q7fo$OkBrXVO()PUUPLBCF|5jMCO^IFal8)k%Q>&{p$3U7mLd9kc9mDnsH1F z^#qvIblimKf)Fb$sVu~cF(#9SMWL3{&D+-^pSjs;D{qI_Bx<>C%iTmLL*uW+XVjnU z76Vv?)^T$K#54m{F|P9($=c-t#?;1+bNhoa#V9TD|H_!WRcF~3cdi+es5w#sIM1pojL%bM;0 literal 0 HcmV?d00001 diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/_SUCCESS b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/_SUCCESS new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/part-00000-2ba3f17b-f97d-449d-bd08-04a9b935bfd2-c000.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input4/part-00000-2ba3f17b-f97d-449d-bd08-04a9b935bfd2-c000.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..0436b10ffee3c5aa7b224d814fd1b437aed14221 GIT binary patch literal 137 zcmb2|=3syT(>_N&CPNO_?{bS@OV&=A7PbEJOrKXy;=7FR#lF&Dae7euDSqMC=et_G z1^Sw%TxM2fzRJCMRic#7{#<*FM-x(dj|T1B=DOTO+3#+2c?*Y;l&kK%+H?Odx=M5^ og)Y*lQ<`a|`gxtiR_%m0?d<>T_9e~}<$Zsc{V1()); // NOT PRESENT IN MDSTORES r - .setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info)); + .setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info)); r - .setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info)); + .setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info)); r.setInstance(instances); r.setBestaccessright(OafMapperUtils.createBestAccessRights(instances)); diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index 53d029467..143178560 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -44,7 +44,7 @@ iis-releases iis releases plugin repository - http://maven.ceon.pl/artifactory/iis-releases + https://maven.ceon.pl/artifactory/iis-releases default From b071f8e4154d6100db1acfeff9f6ba34efabc21d Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 8 Feb 2022 15:37:28 +0100 Subject: [PATCH 2/5] [OpenCitation] change to extract in json format each folder just onece --- .../actionmanager/opencitations/ReadCOCI.java | 34 +++++--------- .../input_readcoci_parameters.json | 19 ++++---- .../opencitations/oozie_app/workflow.xml | 8 ++-- .../opencitations/ReadCOCITest.java | 47 +++++++++++++++++-- 4 files changed, 70 insertions(+), 38 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java index 7ac1f2de3..fd83f7072 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -41,18 +41,14 @@ public class ReadCOCI implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - final String hdfsNameNode = parser.get("nameNode"); - log.info("nameNode: {}", hdfsNameNode); - - final String inputPath = parser.get("sourcePath"); - log.info("input path : {}", inputPath); + final String[] inputFile = parser.get("inputFile").split(";"); + log.info("inputFile {}", inputFile.toString()); Boolean isSparkSessionManaged = 
isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", hdfsNameNode); + final String workingPath = parser.get("workingPath"); + log.info("workingPath {}", workingPath); - FileSystem fileSystem = FileSystem.get(conf); SparkConf sconf = new SparkConf(); final String delimiter = Optional @@ -65,25 +61,20 @@ public class ReadCOCI implements Serializable { spark -> { doRead( spark, - fileSystem, - inputPath, + workingPath, + inputFile, outputPath, delimiter); }); } - public static void doRead(SparkSession spark, FileSystem fileSystem, String inputPath, String outputPath, + private static void doRead(SparkSession spark, String workingPath, String[] inputFiles, + String outputPath, String delimiter) throws IOException { - RemoteIterator iterator = fileSystem - .listFiles( - new Path(inputPath), true); + for(String inputFile : inputFiles){ + String p_string = workingPath + "/" + inputFile ; - while (iterator.hasNext()) { - LocatedFileStatus fileStatus = iterator.next(); - - Path p = fileStatus.getPath(); - String p_string = p.toString(); Dataset cociData = spark .read() .format("csv") @@ -91,7 +82,8 @@ public class ReadCOCI implements Serializable { .option("inferSchema", "true") .option("header", "true") .option("quotes", "\"") - .load(p_string); + .load(p_string) + .repartition(100); cociData.map((MapFunction) row -> { COCI coci = new COCI(); @@ -103,7 +95,7 @@ public class ReadCOCI implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + "/" + p_string.substring(p_string.lastIndexOf("/") + 1)); + .json(outputPath + inputFile); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json index 14c20f762..b57cb5d9a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json @@ -1,17 +1,12 @@ [ { - "paramName": "sp", - "paramLongName": "sourcePath", + "paramName": "wp", + "paramLongName": "workingPath", "paramDescription": "the zipped opencitations file", "paramRequired": true }, - { - "paramName": "nn", - "paramLongName": "nameNode", - "paramDescription": "the hdfs name node", - "paramRequired": true - }, + { "paramName": "issm", "paramLongName": "isSparkSessionManaged", @@ -28,7 +23,13 @@ "paramName": "op", "paramLongName": "outputPath", "paramDescription": "the hdfs name node", - "paramRequired": false + "paramRequired": true + }, + { + "paramName": "if", + "paramLongName": "inputFile", + "paramDescription": "the hdfs name node", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index 7276d2d3e..aee2559ee 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -82,10 +82,10 @@ --conf 
spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --sourcePath${workingPath}/COCI - --outputPath${workingDir}/COCI - --nameNode${nameNode} + --workingPath${workingPath}/COCI + --outputPath${workingPath}/COCI_JSON --delimiter${delimiter} + --inputFile${inputFileCoci} @@ -108,7 +108,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --inputPath${workingPath}/COCI + --inputPath${workingPath}/COCI_JSON --outputPath${outputPath} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java index e1b9c4d23..27627f9f6 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java @@ -10,6 +10,7 @@ import java.nio.file.Path; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -73,15 +74,53 @@ public class ReadCOCITest { "/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles") .getPath(); + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input1") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir + "/COCI/input1")); + + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input2") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir + "/COCI/input2")); + + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input3") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir + "/COCI/input3")); + + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input4") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir + "/COCI/input4")); + ReadCOCI - .doRead( - spark, FileSystem.getLocal(new Configuration()), inputPath, - workingDir.toString() + "/COCI", DEFAULT_DELIMITER); + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-workingPath", + workingDir.toString() + "/COCI", + "-outputPath", + workingDir.toString() + "/COCI_json/", + "-inputFile", "input1;input2;input3;input4" + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/COCI/*/") + .textFile(workingDir.toString() + "/COCI_json/*/") .map(item -> OBJECT_MAPPER.readValue(item, COCI.class)); Assertions.assertEquals(23, tmp.count()); From 759ed519f251e976ce34571bd351713bb6c429a8 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 8 Feb 2022 16:15:34 +0100 Subject: [PATCH 3/5] [OpenCitation] added logic to avoid the genration of self citations relations --- .../CreateActionSetSparkJob.java | 24 ++++++++++-------- .../opencitations/ReadCOCITest.java | 11 ++++++-- 
.../opencitations/COCI/input5/_SUCCESS | 0 ...e-90e3-4791-821a-b84636bc13e2-c000.json.gz | Bin 0 -> 20 bytes ...e-90e3-4791-821a-b84636bc13e2-c000.json.gz | Bin 0 -> 108 bytes .../opencitations/inputFiles/input5 | 2 ++ 6 files changed, 24 insertions(+), 13 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input5/_SUCCESS create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input5/part-00000-d6d1dc6e-90e3-4791-821a-b84636bc13e2-c000.json.gz create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input5/part-00061-d6d1dc6e-90e3-4791-821a-b84636bc13e2-c000.json.gz create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input5 diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index c16f8eeea..4051bc6f0 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -115,18 +115,20 @@ public class CreateActionSetSparkJob implements Serializable { final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited())); - relationList - .addAll( - getRelations( - citing, - cited)); + if(!citing.equals(cited)){ + relationList + .addAll( + getRelations( + citing, + cited)); - if (duplicate && value.getCiting().endsWith(".refs")) { - citing = ID_PREFIX + IdentifierFactory - .md5( - CleaningFunctions - .normalizePidValue("doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs")))); - relationList.addAll(getRelations(citing, cited)); + if (duplicate && value.getCiting().endsWith(".refs")) { + citing = ID_PREFIX + IdentifierFactory + .md5( + CleaningFunctions + .normalizePidValue("doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs")))); + relationList.addAll(getRelations(citing, cited)); + } } return relationList; diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java index 27627f9f6..53af074e1 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCITest.java @@ -103,6 +103,13 @@ public class ReadCOCITest { .getPath()), new org.apache.hadoop.fs.Path(workingDir + "/COCI/input4")); + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles/input5") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir + "/COCI/input5")); + ReadCOCI .main( new String[] { @@ -112,7 +119,7 @@ public class ReadCOCITest { workingDir.toString() + "/COCI", "-outputPath", workingDir.toString() + "/COCI_json/", - "-inputFile", "input1;input2;input3;input4" + "-inputFile", "input1;input2;input3;input4;input5" }); @@ -123,7 +130,7 @@ public class ReadCOCITest { .textFile(workingDir.toString() + 
"/COCI_json/*/") .map(item -> OBJECT_MAPPER.readValue(item, COCI.class)); - Assertions.assertEquals(23, tmp.count()); + Assertions.assertEquals(24, tmp.count()); Assertions.assertEquals(1, tmp.filter(c -> c.getCiting().equals("10.1207/s15327647jcd3,4-01")).count()); diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input5/_SUCCESS b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input5/_SUCCESS new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input5/part-00000-d6d1dc6e-90e3-4791-821a-b84636bc13e2-c000.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input5/part-00000-d6d1dc6e-90e3-4791-821a-b84636bc13e2-c000.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..001322f84b053326cd87d0b2f1df13fddaa4da35 GIT binary patch literal 20 Rcmb2|=3syTW+=_T000et0JZ=C literal 0 HcmV?d00001 diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input5/part-00061-d6d1dc6e-90e3-4791-821a-b84636bc13e2-c000.json.gz b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/opencitations/COCI/input5/part-00061-d6d1dc6e-90e3-4791-821a-b84636bc13e2-c000.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..12968af39e578a2c9afae321fee5a918e8ad8ee1 GIT binary patch literal 108 zcmb2|=3sz;si$`HHW=`*T$pDVBW!W}XT%y|0WW4&5s!|$hB^%e#n Date: Tue, 8 Feb 2022 16:23:05 +0100 Subject: [PATCH 4/5] [OpenCitation] refactoring --- .../actionmanager/opencitations/CreateActionSetSparkJob.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index 4051bc6f0..f230a7fd7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -90,9 +90,6 @@ public class CreateActionSetSparkJob implements Serializable { .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, COCI.class), Encoders.bean(COCI.class)) -// spark -// .sqlContext() -// .createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING()) .flatMap( (FlatMapFunction) value -> createRelation(value, shouldDuplicateRels).iterator(), Encoders.bean(Relation.class)) From 1490867cc7886746b5b9925b9bd5a3ebd7499cb5 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 14 Feb 2022 14:52:12 +0100 Subject: [PATCH 5/5] [OpenCitation] cleaning of the COCI model --- .../opencitations/model/COCI.java | 50 +------------------ 1 file changed, 1 insertion(+), 49 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java index a7b3330ea..bad4a5a3b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/model/COCI.java @@ -6,30 
+6,12 @@ import java.io.Serializable; import com.opencsv.bean.CsvBindByPosition; public class COCI implements Serializable { - @CsvBindByPosition(position = 0) -// @CsvBindByName(column = "doi") private String oci; - @CsvBindByPosition(position = 1) -// @CsvBindByName(column = "level1") private String citing; - @CsvBindByPosition(position = 2) -// @CsvBindByName(column = "level2") private String cited; - @CsvBindByPosition(position = 3) -// @CsvBindByName(column = "level3") - private String creation; - - @CsvBindByPosition(position = 4) - private String timespan; - - @CsvBindByPosition(position = 5) - private String journal_sc; - - @CsvBindByPosition(position = 6) - private String author_sc; public String getOci() { return oci; @@ -55,35 +37,5 @@ public class COCI implements Serializable { this.cited = cited; } - public String getCreation() { - return creation; - } - - public void setCreation(String creation) { - this.creation = creation; - } - - public String getTimespan() { - return timespan; - } - - public void setTimespan(String timespan) { - this.timespan = timespan; - } - - public String getJournal_sc() { - return journal_sc; - } - - public void setJournal_sc(String journal_sc) { - this.journal_sc = journal_sc; - } - - public String getAuthor_sc() { - return author_sc; - } - - public void setAuthor_sc(String author_sc) { - this.author_sc = author_sc; - } + }
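
Editorial note (not part of the patch series above): the series replaces the earlier `value.split(",")` parsing of the COCI dump with a CSV read into the new COCI bean, so that DOIs containing commas (such as the value `10.1207/s15327647jcd3,4-01` asserted in ReadCOCITest) survive intact. Below is a minimal standalone sketch of the difference. It is an assumption-laden illustration, not code from these patches: the class name CommaDoiSplitSketch, the sample OCI identifier and the cited DOI are invented; the quote-aware helper only stands in for what the Spark CSV data source configured in ReadCOCI handles.

    public class CommaDoiSplitSketch {

        public static void main(String[] args) {
            // A COCI-style row: oci, citing, cited. The citing DOI contains a comma,
            // so the source CSV quotes it (oci and cited values are made up for illustration).
            String row = "oci-00001,\"10.1207/s15327647jcd3,4-01\",10.1000/sample";

            // Previous logic: a plain split on ',' ignores quoting and truncates the DOI.
            String[] naive = row.split(",");
            System.out.println(naive.length);   // 4 fields instead of the expected 3
            System.out.println(naive[1]);       // "10.1207/s15327647jcd3  (citing DOI cut at the comma)

            // A quote-aware parse, analogous to what the CSV read in ReadCOCI does,
            // keeps the quoted DOI in one piece.
            System.out.println(parseQuoted(row, ',').get(1)); // 10.1207/s15327647jcd3,4-01
        }

        // Tiny quote-aware splitter, only to make the sketch self-contained and runnable.
        static java.util.List<String> parseQuoted(String line, char sep) {
            java.util.List<String> fields = new java.util.ArrayList<>();
            StringBuilder current = new StringBuilder();
            boolean inQuotes = false;
            for (char c : line.toCharArray()) {
                if (c == '"') {
                    inQuotes = !inQuotes;
                } else if (c == sep && !inQuotes) {
                    fields.add(current.toString());
                    current.setLength(0);
                } else {
                    current.append(c);
                }
            }
            fields.add(current.toString());
            return fields;
        }
    }

With the naive split, the citing DOI is cut at its internal comma and every later column shifts by one, which is why the action set job now consumes the JSON produced by ReadCOCI (oci/citing/cited fields of the COCI bean) instead of re-splitting raw lines.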