forked from D-Net/dnet-hadoop
Hosted By Map - refactoring and deletion of not needed methods
This commit is contained in:
parent
8ba8c77f92
commit
1e952cccf6
|
@ -2,7 +2,7 @@ package eu.dnetlib.dhp.oa.graph.hostedbymap
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.{DatasourceInfo, EntityInfo}
|
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Journal, Publication}
|
import eu.dnetlib.dhp.schema.oaf.{Journal, Publication}
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
|
@ -17,8 +17,6 @@ import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
object SparkPrepareHostedByInfoToApply {
|
object SparkPrepareHostedByInfoToApply {
|
||||||
|
|
||||||
|
|
||||||
implicit val mapEncoderDSInfo: Encoder[DatasourceInfo] = Encoders.bean(classOf[DatasourceInfo])
|
|
||||||
implicit val mapEncoderPInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
|
implicit val mapEncoderPInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
|
||||||
|
|
||||||
def getList(id: String, j: Journal, name: String ) : List[EntityInfo] = {
|
def getList(id: String, j: Journal, name: String ) : List[EntityInfo] = {
|
||||||
|
@ -58,33 +56,6 @@ object SparkPrepareHostedByInfoToApply {
|
||||||
toEntityItem(c.keys.head, c.values.head)
|
toEntityItem(c.keys.head, c.values.head)
|
||||||
}
|
}
|
||||||
|
|
||||||
def explodeJournalInfo(input: DatasourceInfo): List[EntityInfo] = {
|
|
||||||
var lst : List[EntityInfo] = List()
|
|
||||||
if (input.getEissn != null && !input.getEissn.equals("")){
|
|
||||||
lst = EntityInfo.newInstance(input.getId, input.getEissn, input.getOfficialname, input.getOpenAccess) :: lst
|
|
||||||
}
|
|
||||||
if (input.getLissn != null && !input.getLissn.equals("")){
|
|
||||||
lst = EntityInfo.newInstance(input.getId, input.getLissn, input.getOfficialname, input.getOpenAccess) :: lst
|
|
||||||
}
|
|
||||||
if (input.getIssn != null && !input.getIssn.equals("")){
|
|
||||||
lst = EntityInfo.newInstance(input.getId, input.getIssn, input.getOfficialname, input.getOpenAccess) :: lst
|
|
||||||
}
|
|
||||||
|
|
||||||
lst
|
|
||||||
}
|
|
||||||
|
|
||||||
def joinDsandHBM(left:Dataset[DatasourceInfo], right:Dataset[HostedByItemType]): Dataset[EntityInfo] = {
|
|
||||||
left.joinWith(right,
|
|
||||||
left.col("id").equalTo(right.col("id")), "left")
|
|
||||||
.map(t2 => {
|
|
||||||
val dsi : DatasourceInfo = t2._1
|
|
||||||
if(t2._2 != null){
|
|
||||||
val hbi : HostedByItemType = t2._2
|
|
||||||
dsi.setOpenAccess(hbi.openAccess)
|
|
||||||
}
|
|
||||||
dsi
|
|
||||||
}).flatMap(explodeJournalInfo)
|
|
||||||
}
|
|
||||||
|
|
||||||
def toEntityItem(journal_id: String , hbi: HostedByItemType): EntityInfo = {
|
def toEntityItem(journal_id: String , hbi: HostedByItemType): EntityInfo = {
|
||||||
|
|
||||||
|
@ -123,7 +94,7 @@ object SparkPrepareHostedByInfoToApply {
|
||||||
|
|
||||||
val graphPath = parser.get("graphPath")
|
val graphPath = parser.get("graphPath")
|
||||||
|
|
||||||
val outputPath = parser.get("outputPath")
|
val outputPath = parser.get("preparedInfoPath")
|
||||||
val hostedByMapPath = parser.get("hostedByMapPath")
|
val hostedByMapPath = parser.get("hostedByMapPath")
|
||||||
|
|
||||||
|
|
||||||
|
@ -139,7 +110,7 @@ object SparkPrepareHostedByInfoToApply {
|
||||||
val hostedByInfo:Dataset[EntityInfo] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath)).map(toEntityInfo)
|
val hostedByInfo:Dataset[EntityInfo] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath)).map(toEntityInfo)
|
||||||
|
|
||||||
//STEP2: creare la mappa publication id issn, eissn, lissn esplosa
|
//STEP2: creare la mappa publication id issn, eissn, lissn esplosa
|
||||||
val resultInfoDataset:Dataset[EntityInfo] = prepareResultInfo(spark, "$graphPath/publication")
|
val resultInfoDataset:Dataset[EntityInfo] = prepareResultInfo(spark, graphPath + "/publication")
|
||||||
|
|
||||||
//STEP3: join resultInfo con hostedByInfo sul journal_id dal result con left
|
//STEP3: join resultInfo con hostedByInfo sul journal_id dal result con left
|
||||||
// e riduzione di tutti i result con lo stesso id in una unica entry con aggiunto l'id della datasource
|
// e riduzione di tutti i result con lo stesso id in una unica entry con aggiunto l'id della datasource
|
||||||
|
|
Loading…
Reference in New Issue