commit 1d6ac3715b
merge branch with beta
@@ -0,0 +1,73 @@ (new file: SparkCreateActionset.scala, package eu.dnetlib.dhp.actionmanager.scholix)

package eu.dnetlib.dhp.actionmanager.scholix

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}

import scala.io.Source

object SparkCreateActionset {

  def main(args: Array[String]): Unit = {
    val log: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(
      Source
        .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/generate_actionset.json"))
        .mkString)
    parser.parseArgument(args)

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()

    val sourcePath = parser.get("sourcePath")
    log.info(s"sourcePath -> $sourcePath")

    val targetPath = parser.get("targetPath")
    log.info(s"targetPath -> $targetPath")

    val workingDirFolder = parser.get("workingDirFolder")
    log.info(s"workingDirFolder -> $workingDirFolder")

    implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
    implicit val resultEncoders: Encoder[Result] = Encoders.kryo[Result]
    implicit val relationEncoders: Encoder[Relation] = Encoders.kryo[Relation]

    import spark.implicits._

    val relation = spark.read.load(s"$sourcePath/relation").as[Relation]

    log.info("extract source and target identifiers involved in relations")
    // keep only non-deleted relations that are not merge relations, then
    // collect the distinct set of entity identifiers they reference
    relation
      .filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
      .flatMap(r => List(r.getSource, r.getTarget))
      .distinct()
      .write.mode(SaveMode.Overwrite)
      .save(s"$workingDirFolder/id_relation")

    val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]

    log.info("save relation filtered")
    relation
      .filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
      .write.mode(SaveMode.Overwrite)
      .save(s"$workingDirFolder/actionSetOaf")

    log.info("saving entities")
    val entities: Dataset[(String, Result)] = spark.read
      .load(s"$sourcePath/entities/*")
      .as[Result]
      .map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))

    // keep only the entities that participate in at least one relation
    entities
      .joinWith(idRelation, entities("_1").equalTo(idRelation("value")))
      .map(p => p._1._2)
      .write.mode(SaveMode.Append)
      .save(s"$workingDirFolder/actionSetOaf")
  }

}
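
For reference, a minimal sketch of how this job could be launched locally. The argument names mirror the parser.get(...) keys above; the "--name value" convention and all paths are assumptions for illustration, not part of this commit:

// Hypothetical local run (not part of the commit); argument names mirror
// the parser.get(...) keys, the "--name value" convention is assumed.
object SparkCreateActionsetLocalRun {
  def main(args: Array[String]): Unit = {
    SparkCreateActionset.main(Array(
      "--master", "local[*]",
      "--sourcePath", "/tmp/graph",         // illustrative path
      "--targetPath", "/tmp/actionset",     // illustrative path
      "--workingDirFolder", "/tmp/working"  // illustrative path
    ))
  }
}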

@@ -0,0 +1,115 @@ (new file: PatchRelationsApplication.java, package eu.dnetlib.dhp.oa.graph.raw)

package eu.dnetlib.dhp.oa.graph.raw;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.raw.common.RelationIdMapping;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.FileNotFoundException;
import java.util.Optional;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

public class PatchRelationsApplication {

	private static final Logger log = LoggerFactory.getLogger(PatchRelationsApplication.class);

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	public static void main(final String[] args) throws Exception {
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					Optional.ofNullable(
						PatchRelationsApplication.class
							.getResourceAsStream(
								"/eu/dnetlib/dhp/oa/graph/patch_relations_parameters.json"))
						.orElseThrow(FileNotFoundException::new)));
		parser.parseArgument(args);

		final Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String graphBasePath = parser.get("graphBasePath");
		log.info("graphBasePath: {}", graphBasePath);

		final String workingDir = parser.get("workingDir");
		log.info("workingDir: {}", workingDir);

		final String idMappingPath = parser.get("idMappingPath");
		log.info("idMappingPath: {}", idMappingPath);

		final SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> patchRelations(spark, graphBasePath, workingDir, idMappingPath));
	}

	/**
	 * Substitutes the source/target identifiers in the set of relations available under graphBasePath, according
	 * to the old -> new identifier mapping provided by the dataset stored at idMappingPath, using workingDir as
	 * intermediate storage location.
	 *
	 * @param spark the SparkSession
	 * @param graphBasePath base graph path providing the set of relations to patch
	 * @param workingDir intermediate storage location
	 * @param idMappingPath dataset providing the old -> new identifier mapping
	 */
	private static void patchRelations(final SparkSession spark, final String graphBasePath,
		final String workingDir, final String idMappingPath) {

		final String relationPath = graphBasePath + "/relation";

		final Dataset<Relation> rels = Utils.readPath(spark, relationPath, Relation.class);
		final Dataset<RelationIdMapping> idMapping = Utils.readPath(spark, idMappingPath, RelationIdMapping.class);

		// 1st pass: patch the relation source identifiers
		final Dataset<Relation> bySource = rels
			.joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "left")
			.map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
				final Relation r = t._1();
				Optional.ofNullable(t._2())
					.map(RelationIdMapping::getNewId)
					.ifPresent(r::setSource);
				return r;
			}, Encoders.bean(Relation.class));

		// 2nd pass: patch the relation target identifiers; the join column is
		// resolved against the dataset produced by the first join, not against rels
		bySource
			.joinWith(idMapping, bySource.col("target").equalTo(idMapping.col("oldId")), "left")
			.map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
				final Relation r = t._1();
				Optional.ofNullable(t._2())
					.map(RelationIdMapping::getNewId)
					.ifPresent(r::setTarget);
				return r;
			}, Encoders.bean(Relation.class))
			.map(
				(MapFunction<Relation, String>) OBJECT_MAPPER::writeValueAsString,
				Encoders.STRING())
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.text(workingDir);

		// materialise the result in workingDir first: Spark cannot overwrite
		// relationPath in place while reading from it, so copy back afterwards
		spark.read().textFile(workingDir)
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.text(relationPath);
	}

}
@@ -0,0 +1,24 @@ (new file: RelationIdMapping.java, package eu.dnetlib.dhp.oa.graph.raw.common)

package eu.dnetlib.dhp.oa.graph.raw.common;

public class RelationIdMapping {

	private String oldId;

	private String newId;

	public String getOldId() {
		return oldId;
	}

	public void setOldId(final String oldId) {
		this.oldId = oldId;
	}

	public String getNewId() {
		return newId;
	}

	public void setNewId(final String newId) {
		this.newId = newId;
	}
}
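
PatchRelationsApplication deserialises each row of idMappingPath into this bean, so the mapping dataset needs exactly the two String properties oldId and newId. A minimal sketch of producing such a dataset for a local test, assuming Utils.readPath consumes JSON lines; the identifiers and path are illustrative, not from this commit:

// Hypothetical test fixture (not part of the commit): a tiny oldId -> newId
// mapping written as JSON lines, readable back into RelationIdMapping beans.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq(
  ("50|someds______::0001", "50|someds______::000x") // illustrative identifiers
).toDF("oldId", "newId")
  .write.mode("overwrite")
  .json("/tmp/idMapping") // illustrative path, passed to the job as idMappingPath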

@@ -0,0 +1,26 @@ (new file: patch_relations_parameters.json)

[
  {
    "paramName": "issm",
    "paramLongName": "isSparkSessionManaged",
    "paramDescription": "when true, the SparkSession is stopped after the job execution",
    "paramRequired": false
  },
  {
    "paramName": "g",
    "paramLongName": "graphBasePath",
    "paramDescription": "base graph path providing the set of relations to patch",
    "paramRequired": true
  },
  {
    "paramName": "w",
    "paramLongName": "workingDir",
    "paramDescription": "intermediate storage location",
    "paramRequired": true
  },
  {
    "paramName": "i",
    "paramLongName": "idMappingPath",
    "paramDescription": "dataset providing the old -> new identifier mapping",
    "paramRequired": true
  }
]
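
Wiring the four parameters together, a hedged invocation sketch; the "--paramLongName value" convention for ArgumentApplicationParser is assumed and the paths are illustrative:

// Hypothetical invocation (not part of the commit); the long parameter names
// come from the JSON above, paths are illustrative.
PatchRelationsApplication.main(Array(
  "--isSparkSessionManaged", "true",
  "--graphBasePath", "/tmp/graph",
  "--workingDir", "/tmp/working/relation_patched",
  "--idMappingPath", "/tmp/idMapping"
))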

@@ -0,0 +1,72 @@ (new file: XML record, enermaps)

<?xml version="1.0" encoding="UTF-8"?>
<record xmlns="http://datacite.org/schema/kernel-4"
    xmlns:dr="http://www.driver-repository.eu/namespace/dr" xmlns:oaf="http://namespace.openaire.eu/oaf">
    <oai:header xmlns="http://namespace.openaire.eu/"
        xmlns:dc="http://purl.org/dc/elements/1.1/"
        xmlns:dri="http://www.driver-repository.eu/namespace/dri"
        xmlns:oai="http://www.openarchives.org/OAI/2.0/"
        xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
        <dri:objIdentifier>enermaps____::04149ee428d07360314c2cb3ba95d41e</dri:objIdentifier>
        <dri:recordIdentifier>tgs00004</dri:recordIdentifier>
        <dri:dateOfCollection>2021-07-20T18:43:12.096+02:00</dri:dateOfCollection>
        <oaf:datasourceprefix>enermaps____</oaf:datasourceprefix>
    </oai:header>
    <metadata>
        <resource>
            <identifier identifierType="URL">https://ec.europa.eu/eurostat/web/products-datasets/-/tgs00004</identifier>
            <creators>
                <creator>
                    <creatorName>Statistical Office of the European Union (Eurostat)</creatorName>
                </creator>
            </creators>
            <titles>
                <title>
                    Regional GDP
                </title>
            </titles>
            <publisher>Statistical Office of the European Union (Eurostat)</publisher>
            <publicationYear>2020</publicationYear>
            <dates>
                <date dateType="Issued">2020-10-07</date>
            </dates>
            <resourceType resourceTypeGeneral="Dataset"/>
            <rightsList>
                <rights rightsURI="info:eu-repo/semantics/openAccess">OPEN</rights>
                <rights rightsURI="https://creativecommons.org/licenses/by/4.0/">Creative Commons Attribution 4.0 International</rights>
            </rightsList>
            <descriptions>
                <description descriptionType="Abstract" xml:lang="EN">GDP expressed in PPS (purchasing power standards) eliminates differences in price levels between countries. Calculations on a per inhabitant basis allow for the comparison of economies and regions significantly different in absolute size. GDP per inhabitant in PPS is the key variable for determining the eligibility of NUTS 2 regions in the framework of the European Unions structural policy.</description>
            </descriptions>
            <dr:CobjCategory type="dataset">0021</dr:CobjCategory>
            <oaf:dateAccepted>2020-10-07</oaf:dateAccepted>
            <oaf:accessrights>OPEN</oaf:accessrights>
            <oaf:license>Creative Commons Attribution 4.0 International</oaf:license>
            <oaf:hostedBy
                id="openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18" name="Unknown Repository"/>
            <oaf:collectedFrom id="enermaps____::db" name="Enermaps"/>
            <oaf:concept id="enermaps::selection::tgs00004"/>
        </resource>
    </metadata>
    <about xmlns="" xmlns:dc="http://purl.org/dc/elements/1.1/"
        xmlns:dri="http://www.driver-repository.eu/namespace/dri"
        xmlns:oai="http://www.openarchives.org/OAI/2.0/"
        xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
        <provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
            <originDescription altered="true" harvestDate="2021-07-20T18:43:12.096+02:00">
                <baseURL>https%3A%2F%2Flab.idiap.ch%2Fenermaps%2Fapi%2Fdatacite</baseURL>
                <identifier/>
                <datestamp/>
                <metadataNamespace/>
            </originDescription>
        </provenance>
        <oaf:datainfo>
            <oaf:inferred>false</oaf:inferred>
            <oaf:deletedbyinference>false</oaf:deletedbyinference>
            <oaf:trust>0.9</oaf:trust>
            <oaf:inferenceprovenance/>
            <oaf:provenanceaction classid="sysimport:crosswalk"
                classname="sysimport:crosswalk"
                schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
        </oaf:datainfo>
    </about>
</record>

@@ -0,0 +1,70 @@ (new file: XML record, jairo)

<?xml version="1.0" encoding="UTF-8"?>
<record xmlns:dc="http://purl.org/dc/elements/1.1/"
    xmlns:dr="http://www.driver-repository.eu/namespace/dr"
    xmlns:dri="http://www.driver-repository.eu/namespace/dri"
    xmlns:oaf="http://namespace.openaire.eu/oaf"
    xmlns:oai="http://www.openarchives.org/OAI/2.0/"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <header xmlns="http://namespace.openaire.eu/">
        <dri:objIdentifier>jairo_______::000012e58ed836576ef2a0d38b0f726f</dri:objIdentifier>
        <dri:recordIdentifier>oai:irdb.nii.ac.jp:01221:0000010198</dri:recordIdentifier>
        <dri:dateOfCollection/>
        <dri:mdFormat/>
        <dri:mdFormatInterpretation/>
        <dri:repositoryId/>
        <dr:objectIdentifier/>
        <dr:dateOfCollection>2021-05-10T11:31:09.424Z</dr:dateOfCollection>
        <dr:dateOfTransformation>2021-06-03T01:45:42.536Z</dr:dateOfTransformation>
        <oaf:datasourceprefix>jairo_______</oaf:datasourceprefix>
    </header>
    <metadata xmlns="http://namespace.openaire.eu/">
        <dc:title>多項式GCDを用いた復号法に関する研究</dc:title>
        <dc:creator>上原, 剛</dc:creator>
        <dc:creator>甲斐, 博</dc:creator>
        <dc:creator>野田, 松太郎</dc:creator>
        <dc:format>application/pdf</dc:format>
        <dc:identifier>http://hdl.handle.net/2433/25934</dc:identifier>
        <dc:language>jpn</dc:language>
        <dc:publisher>京都大学数理解析研究所</dc:publisher>
        <dc:subject classid="ndc" classname="ndc"
            schemeid="dnet:subject_classification_typologies" schemename="dnet:subject_classification_typologies">410</dc:subject>
        <dc:type>Departmental Bulletin Paper</dc:type>
        <dr:CobjCategory type="publication">0014</dr:CobjCategory>
        <oaf:dateAccepted>2004-10-01</oaf:dateAccepted>
        <oaf:projectid/>
        <oaf:collectedDatasourceid>openaire____::554c7c2873</oaf:collectedDatasourceid>
        <oaf:accessrights>OPEN</oaf:accessrights>
        <oaf:hostedBy id="openaire____::554c7c2873" name="JAIRO"/>
        <oaf:collectedFrom id="openaire____::554c7c2873" name="JAIRO"/>
        <oaf:identifier identifierType="handle">2433/25934</oaf:identifier>
        <oaf:identifier identifierType="ncid">AN00061013</oaf:identifier>
        <oaf:identifier identifierType="LandingPage">http://hdl.handle.net/2433/25934</oaf:identifier>
        <oaf:fulltext>http://repository.kulib.kyoto-u.ac.jp/dspace/bitstream/2433/25934/1/1395-16.pdf</oaf:fulltext>
        <oaf:journal ep="110" iss="" issn="1880-2818" sp="104" vol="1395">数理解析研究所講究録</oaf:journal>
    </metadata>
    <about>
        <provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
            <originDescription altered="true" harvestDate="2021-05-10T11:31:09.424Z">
                <baseURL>https%3A%2F%2Firdb.nii.ac.jp%2Foai</baseURL>
                <identifier>oai:irdb.nii.ac.jp:01221:0000010198</identifier>
                <datestamp>2021-04-13T13:36:29Z</datestamp>
                <metadataNamespace/>
                <originDescription altered="true" harvestDate="2021-04-13T13:36:29Z">
                    <baseURL>http://repository.kulib.kyoto-u.ac.jp/dspace-oai/request</baseURL>
                    <identifier>oai:repository.kulib.kyoto-u.ac.jp:2433/25934</identifier>
                    <datestamp>2012-07-12T14:15:41Z</datestamp>
                    <metadataNamespace>http://irdb.nii.ac.jp/oai</metadataNamespace>
                </originDescription>
            </originDescription>
        </provenance>
        <oaf:datainfo>
            <oaf:inferred>false</oaf:inferred>
            <oaf:deletedbyinference>false</oaf:deletedbyinference>
            <oaf:trust>0.9</oaf:trust>
            <oaf:inferenceprovenance/>
            <oaf:provenanceaction classid="sysimport:crosswalk:repository"
                classname="sysimport:crosswalk:repository"
                schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
        </oaf:datainfo>
    </about>
</record>

@@ -1,90 +0,0 @@ (deleted file: SparkCreateActionset.scala, package eu.dnetlib.dhp.sx.provision)

package eu.dnetlib.dhp.sx.provision

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
import org.apache.spark.{SparkConf, sql}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

import scala.io.Source

object SparkCreateActionset {

  def main(args: Array[String]): Unit = {
    val log: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/generate_actionset.json")).mkString)
    parser.parseArgument(args)

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .master(parser.get("master")).getOrCreate()

    val sourcePath = parser.get("sourcePath")
    log.info(s"sourcePath -> $sourcePath")

    val targetPath = parser.get("targetPath")
    log.info(s"targetPath -> $targetPath")

    val workingDirFolder = parser.get("workingDirFolder")
    log.info(s"workingDirFolder -> $workingDirFolder")

    implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
    implicit val resultEncoders: Encoder[Result] = Encoders.kryo[Result]
    implicit val relationEncoders: Encoder[Relation] = Encoders.kryo[Relation]

    import spark.implicits._

    val relation = spark.read.load(s"$sourcePath/relation").as[Relation]

    relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
      .flatMap(r => List(r.getSource, r.getTarget)).distinct().write.save(s"$workingDirFolder/id_relation")

    val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]

    log.info("extract source and target Identifier involved in relations")

    log.info("save relation filtered")

    relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
      .write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf")

    log.info("saving publication")

    val publication: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/publication").as[Result].map(p => (p.getId, p))

    publication
      .joinWith(idRelation, publication("_1").equalTo(idRelation("value")))
      .map(p => p._1._2)
      .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")

    log.info("saving dataset")
    val dataset: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/dataset").as[Result].map(p => (p.getId, p))
    dataset
      .joinWith(idRelation, publication("_1").equalTo(idRelation("value")))
      .map(p => p._1._2)
      .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")

    log.info("saving software")
    val software: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/software").as[Result].map(p => (p.getId, p))
    software
      .joinWith(idRelation, publication("_1").equalTo(idRelation("value")))
      .map(p => p._1._2)
      .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")

    log.info("saving Other Research product")
    val orp: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/otherresearchproduct").as[Result].map(p => (p.getId, p))
    orp
      .joinWith(idRelation, publication("_1").equalTo(idRelation("value")))
      .map(p => p._1._2)
      .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
  }

}
File diff suppressed because one or more lines are too long