changed the way to read the file with info on resource or relation. From sequenceFile to textFile

This commit is contained in:
Miriam Baglioni 2020-03-17 16:32:05 +01:00
parent b4652d018c
commit 67ea3cf3ed
1 changed files with 21 additions and 45 deletions

View File

@ -6,6 +6,8 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@ -14,6 +16,7 @@ import org.apache.spark.sql.*;
import scala.Tuple2;
import java.io.File;
import java.io.IOException;
import java.util.*;
import static eu.dnetlib.dhp.PropagationConstant.*;
@ -34,45 +37,43 @@ public class SparkCountryPropagationJob {
final String inputPath = parser.get("sourcePath");
final String outputPath = "/tmp/provision/propagation/countrytoresultfrominstitutionalrepositories";
File directory = new File(outputPath);
createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
if(!directory.exists()){
directory.mkdirs();
}
List<String> whitelist = Arrays.asList(parser.get("whitelist").split(";"));
List<String> allowedtypes = Arrays.asList(parser.get("allowedtypes").split(";"));
JavaPairRDD<String, TypedRow> organizations = sc.sequenceFile(inputPath + "/organization", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Organization.class))
JavaPairRDD<String, TypedRow> organizations = sc.textFile(inputPath + "/organization")
.map(item -> new ObjectMapper().readValue(item, Organization.class))
.filter(org -> !org.getDataInfo().getDeletedbyinference())
.map(org -> new TypedRow().setSourceId(org.getId()).setValue(org.getCountry().getClassid()))
.mapToPair(toPair());
JavaPairRDD<String, TypedRow> organization_datasource = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class))
JavaPairRDD<String, TypedRow> organization_datasource =
sc.textFile(inputPath + "/relation")
.map(item -> new ObjectMapper().readValue(item, Relation.class))
.filter(r -> !r.getDataInfo().getDeletedbyinference())
.filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelClass()) && RELATION_ORGANIZATION_DATASOURCE_REL_CLASS.equals(r.getRelType()))
.map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
.mapToPair(toPair()); //id is the organization identifier
JavaPairRDD<String, TypedRow> datasources = sc.sequenceFile(inputPath + "/datasource", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Datasource.class))
JavaPairRDD<String, TypedRow> datasources = sc.textFile(inputPath + "/datasource")
.map(item -> new ObjectMapper().readValue(item, Datasource.class))
.filter(ds -> whitelist.contains(ds.getId()) || allowedtypes.contains(ds.getDatasourcetype().getClassid()))
.map(ds -> new TypedRow().setSourceId(ds.getId()))
.mapToPair(toPair());
JavaRDD<Publication> publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class));
JavaRDD<Dataset> datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class));
JavaRDD<Software> software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class));
JavaRDD<OtherResearchProduct> other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class));
JavaRDD<Publication> publications = sc.textFile(inputPath + "/publication")
.map(item -> new ObjectMapper().readValue(item, Publication.class));
JavaRDD<Dataset> datasets = sc.textFile(inputPath + "/dataset")
.map(item -> new ObjectMapper().readValue(item, Dataset.class));
JavaRDD<Software> software = sc.textFile(inputPath + "/software")
.map(item -> new ObjectMapper().readValue(item, Software.class));
JavaRDD<OtherResearchProduct> other = sc.textFile(inputPath + "/otherresearchproduct")
.map(item -> new ObjectMapper().readValue(item, OtherResearchProduct.class));
JavaPairRDD<String, TypedRow> datasource_results = publications
.map(oaf -> getTypedRowsDatasourceResult(oaf))
@ -147,6 +148,8 @@ public class SparkCountryPropagationJob {
}
private static void updateResult(JavaPairRDD<String, Result> results, JavaPairRDD<String, TypedRow> toupdateresult, String outputPath, String type) {
results.leftOuterJoin(toupdateresult)
.map(c -> {
@ -177,33 +180,6 @@ public class SparkCountryPropagationJob {
private static JavaPairRDD<String, TypedRow> getResults(JavaSparkContext sc , String inputPath){
return
sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class))
.filter(ds -> !ds.getDataInfo().getDeletedbyinference())
.map(oaf -> new TypedRow().setType("dataset").setSourceId(oaf.getId()))
.mapToPair(toPair())
.union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class))
.filter(o -> !o.getDataInfo().getDeletedbyinference())
.map(oaf -> new TypedRow().setType("otherresearchproduct").setSourceId(oaf.getId()))
.mapToPair(toPair()))
.union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class))
.filter(s -> !s.getDataInfo().getDeletedbyinference())
.map(oaf -> new TypedRow().setType("software").setSourceId(oaf.getId()))
.mapToPair(toPair()))
.union(sc.sequenceFile(inputPath + "/publication", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class))
.filter(p -> !p.getDataInfo().getDeletedbyinference())
.map(oaf -> new TypedRow().setType("publication").setSourceId(oaf.getId()))
.mapToPair(toPair()));
}