diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/model/DOAJModel.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/model/DOAJModel.java
index 4b5dc22a61..c3b6f1f30f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/model/DOAJModel.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/model/DOAJModel.java
@@ -2,6 +2,7 @@
 package eu.dnetlib.dhp.oa.graph.hostedbymap.model;
 
 import java.io.Serializable;
+import java.util.List;
 
 import com.opencsv.bean.CsvBindByName;
 
@@ -17,7 +18,17 @@ public class DOAJModel implements Serializable {
 	private String eissn;
 
 	@CsvBindByName(column = "Review process")
-	private String reviewProcess;
+	private List<String> reviewProcess;
+
+	private Integer oaStart;
+
+	public Integer getOaStart() {
+		return oaStart;
+	}
+
+	public void setOaStart(Integer oaStart) {
+		this.oaStart = oaStart;
+	}
 
 	public String getJournalTitle() {
 		return journalTitle;
@@ -43,11 +54,11 @@ public class DOAJModel implements Serializable {
 		this.eissn = eissn;
 	}
 
-	public String getReviewProcess() {
+	public List<String> getReviewProcess() {
 		return reviewProcess;
 	}
 
-	public void setReviewProcess(String reviewProcess) {
+	public void setReviewProcess(List<String> reviewProcess) {
 		this.reviewProcess = reviewProcess;
 	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkProduceHostedByMap.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkProduceHostedByMap.scala
index 8d8965866f..bdf0861ae2 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkProduceHostedByMap.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkProduceHostedByMap.scala
@@ -2,9 +2,10 @@ package eu.dnetlib.dhp.oa.graph.hostedbymap
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.common.HdfsSupport
 import eu.dnetlib.dhp.oa.graph.hostedbymap.model.{DOAJModel, UnibiGoldModel}
 import eu.dnetlib.dhp.schema.oaf.Datasource
-import org.apache.commons.io.IOUtils
+import org.apache.commons.io.{FileUtils, IOUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.compress.GzipCodec
@@ -13,7 +14,8 @@ import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
 import org.json4s.DefaultFormats
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.PrintWriter
+import java.io.{File, PrintWriter}
+import scala.collection.JavaConverters._
 
 object SparkProduceHostedByMap {
 
@@ -34,7 +36,9 @@ object SparkProduceHostedByMap {
           openaire.journal_id,
           "",
           "",
-          isOpenAccess
+          isOpenAccess,
+          -1,
+          List[String]()
         )
       case Constants.EISSN =>
         HostedByItemType(
@@ -43,7 +47,9 @@
           "",
           openaire.journal_id,
           "",
-          isOpenAccess
+          isOpenAccess,
+          -1,
+          List[String]()
         )
       case Constants.ISSNL =>
         HostedByItemType(
@@ -52,7 +58,9 @@
           "",
           "",
           openaire.journal_id,
-          isOpenAccess
+          isOpenAccess,
+          -1,
+          List[String]()
         )
 
       // catch the default with a variable so you can print it
@@ -77,34 +85,36 @@ object SparkProduceHostedByMap {
     issn: String,
     eissn: String,
     issnl: String,
-    oa: Boolean
+    oa: Boolean,
+    oaDate: Int,
+    reviewProcess: List[String]
   ): HostedByItemType = {
     if (issn != null) {
       if (eissn != null) {
         if (issnl != null) {
-          HostedByItemType(id, officialname, issn, eissn, issnl, oa)
+          HostedByItemType(id, officialname, issn, eissn, issnl, oa, oaDate, reviewProcess)
         } else {
-          HostedByItemType(id, officialname, issn, eissn, "", oa)
+          HostedByItemType(id, officialname, issn, eissn, "", oa, oaDate, reviewProcess)
         }
       } else {
         if (issnl != null) {
-          HostedByItemType(id, officialname, issn, "", issnl, oa)
+          HostedByItemType(id, officialname, issn, "", issnl, oa, oaDate, reviewProcess)
         } else {
-          HostedByItemType(id, officialname, issn, "", "", oa)
+          HostedByItemType(id, officialname, issn, "", "", oa, oaDate, reviewProcess)
         }
       }
     } else {
       if (eissn != null) {
         if (issnl != null) {
-          HostedByItemType(id, officialname, "", eissn, issnl, oa)
+          HostedByItemType(id, officialname, "", eissn, issnl, oa, oaDate, reviewProcess)
         } else {
-          HostedByItemType(id, officialname, "", eissn, "", oa)
+          HostedByItemType(id, officialname, "", eissn, "", oa, oaDate, reviewProcess)
         }
       } else {
         if (issnl != null) {
-          HostedByItemType(id, officialname, "", "", issnl, oa)
+          HostedByItemType(id, officialname, "", "", issnl, oa, oaDate, reviewProcess)
        } else {
-          HostedByItemType("", "", "", "", "", oa)
+          HostedByItemType("", "", "", "", "", oa, oaDate, reviewProcess)
         }
       }
     }
@@ -119,10 +129,12 @@ object SparkProduceHostedByMap {
         dats.getJournal.getIssnPrinted,
         dats.getJournal.getIssnOnline,
         dats.getJournal.getIssnLinking,
-        false
+        false,
+        -1,
+        List[String]()
       )
     }
-    HostedByItemType("", "", "", "", "", false)
+    HostedByItemType("", "", "", "", "", false, -1, List[String]())
   }
 
   def oaHostedByDataset(spark: SparkSession, datasourcePath: String): Dataset[HostedByItemType] = {
@@ -148,7 +160,9 @@ object SparkProduceHostedByMap {
       gold.getIssn,
       "",
       gold.getIssnL,
-      true
+      true,
+      -1,
+      List[String]()
     )
   }
 
@@ -171,14 +185,27 @@ object SparkProduceHostedByMap {
   }
 
   def doajToHostedbyItemType(doaj: DOAJModel): HostedByItemType = {
-
+    if (doaj.getOaStart == null) {
+      return getHostedByItemType(
+        Constants.DOAJ,
+        doaj.getJournalTitle,
+        doaj.getIssn,
+        doaj.getEissn,
+        "",
+        true,
+        -1,
+        doaj.getReviewProcess.asScala.toList
+      )
+    }
     return getHostedByItemType(
       Constants.DOAJ,
       doaj.getJournalTitle,
       doaj.getIssn,
       doaj.getEissn,
       "",
-      true
+      true,
+      doaj.getOaStart,
+      doaj.getReviewProcess.asScala.toList
     )
   }
 
@@ -256,6 +283,8 @@ object SparkProduceHostedByMap {
 
     logger.info("Getting the Datasources")
 
+    HdfsSupport.remove(outputPath, spark.sparkContext.hadoopConfiguration)
+
     Aggregators
       .explodeHostedByItemType(
         oaHostedByDataset(spark, datasourcePath)
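Note: every updated call site above now passes two extra arguments (oaDate: Int and reviewProcess: List[String]), so the HostedByItemType case class itself must gain the corresponding fields. That definition is not part of the hunks shown here; the snippet below is only a minimal sketch of what the extended case class would presumably look like, with the names of the pre-existing fields inferred from the positional arguments used above and therefore assumptions:

    // Hypothetical sketch, not part of this patch: only oaDate and reviewProcess
    // are new; the other field names are assumed from the call sites above.
    case class HostedByItemType(
      id: String,
      officialname: String,
      issn: String,
      eissn: String,
      lissn: String,
      openAccess: Boolean,
      oaDate: Int,                  // -1 is used above as the "unknown" sentinel
      reviewProcess: List[String]   // DOAJ "Review process" values
    )

Giving the two new fields default values (e.g. oaDate: Int = -1, reviewProcess: List[String] = List()) would be an alternative that leaves the existing constructor calls untouched.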