diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala
index 7395b2450..d342d57b0 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala
@@ -2,7 +2,7 @@ package eu.dnetlib.dhp.oa.graph.hostedbymap
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.oa.graph.hostedbymap.model.{DatasourceInfo, EntityInfo}
+import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
 import eu.dnetlib.dhp.schema.oaf.{Journal, Publication}
 import org.apache.commons.io.IOUtils
 
@@ -17,8 +17,6 @@ import org.slf4j.{Logger, LoggerFactory}
 
 object SparkPrepareHostedByInfoToApply {
 
-
-  implicit val mapEncoderDSInfo: Encoder[DatasourceInfo] = Encoders.bean(classOf[DatasourceInfo])
   implicit val mapEncoderPInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
 
   def getList(id: String, j: Journal, name: String ) : List[EntityInfo] = {
@@ -58,33 +56,6 @@ object SparkPrepareHostedByInfoToApply {
     toEntityItem(c.keys.head, c.values.head)
   }
 
-  def explodeJournalInfo(input: DatasourceInfo): List[EntityInfo] = {
-    var lst : List[EntityInfo] = List()
-    if (input.getEissn != null && !input.getEissn.equals("")){
-      lst = EntityInfo.newInstance(input.getId, input.getEissn, input.getOfficialname, input.getOpenAccess) :: lst
-    }
-    if (input.getLissn != null && !input.getLissn.equals("")){
-      lst = EntityInfo.newInstance(input.getId, input.getLissn, input.getOfficialname, input.getOpenAccess) :: lst
-    }
-    if (input.getIssn != null && !input.getIssn.equals("")){
-      lst = EntityInfo.newInstance(input.getId, input.getIssn, input.getOfficialname, input.getOpenAccess) :: lst
-    }
-
-    lst
-  }
-
-  def joinDsandHBM(left:Dataset[DatasourceInfo], right:Dataset[HostedByItemType]): Dataset[EntityInfo] = {
-    left.joinWith(right,
-      left.col("id").equalTo(right.col("id")), "left")
-      .map(t2 => {
-        val dsi : DatasourceInfo = t2._1
-        if(t2._2 != null){
-          val hbi : HostedByItemType = t2._2
-          dsi.setOpenAccess(hbi.openAccess)
-        }
-        dsi
-      }).flatMap(explodeJournalInfo)
-  }
 
 
   def toEntityItem(journal_id: String , hbi: HostedByItemType): EntityInfo = {
@@ -123,7 +94,7 @@ object SparkPrepareHostedByInfoToApply {
 
     val graphPath = parser.get("graphPath")
 
-    val outputPath = parser.get("outputPath")
+    val outputPath = parser.get("preparedInfoPath")
     val hostedByMapPath = parser.get("hostedByMapPath")
 
 
@@ -139,7 +110,7 @@ object SparkPrepareHostedByInfoToApply {
     val hostedByInfo:Dataset[EntityInfo] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath)).map(toEntityInfo)
 
     //STEP2: create the exploded map from publication id to issn, eissn, lissn
-    val resultInfoDataset:Dataset[EntityInfo] = prepareResultInfo(spark, "$graphPath/publication")
+    val resultInfoDataset:Dataset[EntityInfo] = prepareResultInfo(spark, graphPath + "/publication")
 
     //STEP3: left join resultInfo with hostedByInfo on the journal_id from the result
     // and reduce all results with the same id into a single entry, adding the datasource id
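
Note on the last hunk: in Scala, "$graphPath/publication" without the s interpolator prefix is an ordinary string literal, so the removed line handed the literal text $graphPath/publication to prepareResultInfo instead of the resolved publication path; the concatenation in the added line fixes that. A minimal standalone sketch of the difference (the object name and sample path below are illustrative, not part of this PR):

object InterpolationCheck {
  def main(args: Array[String]): Unit = {
    val graphPath = "/tmp/graph"            // hypothetical base path, for illustration only
    println("$graphPath/publication")       // no `s` prefix: prints the literal $graphPath/publication
    println(s"$graphPath/publication")      // interpolated: prints /tmp/graph/publication
    println(graphPath + "/publication")     // plain concatenation, the form this PR adopts
  }
}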