refactoring

This commit is contained in:
Miriam Baglioni 2020-08-07 17:41:56 +02:00
parent 9675af7965
commit 26d2ad6ebb
1 changed file with 10 additions and 51 deletions

View File

@ -8,47 +8,22 @@ import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.google.gson.Gson;
import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.Community;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Node;
import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Result;
public class Extractor implements Serializable {
// public void run(Boolean isSparkSessionManaged,
// String inputPath,
// String outputPath,
// Class<? extends Result> inputClazz,
// String communityMapPath) {
//
// SparkConf conf = new SparkConf();
//
// runWithSparkSession(
// conf,
// isSparkSessionManaged,
// spark -> {
// Utils.removeOutputDir(spark, outputPath);
// extractRelationResult(
// spark, inputPath, outputPath, inputClazz, Utils.getCommunityMap(spark, communityMapPath));
// });
// }
public void run(Boolean isSparkSessionManaged,
String inputPath,
@ -68,30 +43,6 @@ public class Extractor implements Serializable {
});
}
// private static void extractRelationProjects(SparkSession spark, String inputPath, String outputPath){
// Utils.readPath(spark, inputPath, Project.class)
// .flatMap((FlatMapFunction<Project, Relation>) project ->{
// List<Relation> relList = new ArrayList<>();
// Optional.ofNullable(project.getCollectedfrom())
// .ifPresent(cfl ->
// cfl.forEach(cf -> {
// Provenance provenance = Provenance.newInstance(cf.getDataInfo().getProvenanceaction().getClassname(),
// cf.getDataInfo().getTrust());
//
// relList.add(getRelation(project.getId(), cf.getKey(),
// Constants.PROJECT_ENTITY, Constants.DATASOURCE_ENTITY, Constants.IS_FUNDED_BY,
// Constants.FUNDINGS, provenance));
// relList.add(getRelation(cf.getKey(), project.getId(),
// Constants.DATASOURCE_ENTITY, Constants.PROJECT_ENTITY, Constants.FUNDS,
// Constants.FUNDINGS, provenance));
// }));
// return relList.iterator();
// }, Encoders.bean(Relation.class))
// .write()
// .option("Compression", "gzip")
// .mode(SaveMode.Append)
// .json(outputPath);
// }
private <R extends Result> void extractRelationResult(SparkSession spark,
String inputPath,
@ -192,8 +143,16 @@ public class Extractor implements Serializable {
.newInstance(
paction.getClassid(),
dinfo.getTrust()))
.orElse(Provenance.newInstance(Constants.HARVESTED, Constants.DEFAULT_TRUST)))
.orElse(Provenance.newInstance(Constants.HARVESTED, Constants.DEFAULT_TRUST));
.orElse(
Provenance
.newInstance(
eu.dnetlib.dhp.oa.graph.dump.Constants.HARVESTED,
eu.dnetlib.dhp.oa.graph.dump.Constants.DEFAULT_TRUST)))
.orElse(
Provenance
.newInstance(
eu.dnetlib.dhp.oa.graph.dump.Constants.HARVESTED,
eu.dnetlib.dhp.oa.graph.dump.Constants.DEFAULT_TRUST));
Relation r = getRelation(
value.getId(),
cf.getKey(), Constants.RESULT_ENTITY, Constants.DATASOURCE_ENTITY,