Changed logic to save to S3 directly

This commit is contained in:
Miriam Baglioni 2020-08-03 18:06:18 +02:00
parent 627c1dc73a
commit aa9f3d9698
8 changed files with 118 additions and 25 deletions

View File

@ -24,7 +24,7 @@ import eu.dnetlib.dhp.schema.oaf.*;
public class DumpProducts implements Serializable {
public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap,
public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, String communityMapPath,
Class<? extends OafEntity> inputClazz,
Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result> outputClazz,
boolean graph) {
@ -36,19 +36,60 @@ public class DumpProducts implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execDump(spark, inputPath, outputPath, communityMap, inputClazz, outputClazz, graph);// , dumpClazz);
execDump(spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, graph);// ,
// dumpClazz);
});
}
// public void run(Boolean isSparkSessionManaged, String inputPath, String outputPath, CommunityMap communityMap,
// Class<? extends OafEntity> inputClazz,
// Class<? extends eu.dnetlib.dhp.schema.dump.oaf.Result> outputClazz,
// boolean graph) {
//
// SparkConf conf = new SparkConf();
//
// runWithSparkSession(
// conf,
// isSparkSessionManaged,
// spark -> {
// Utils.removeOutputDir(spark, outputPath);
// execDump(spark, inputPath, outputPath, communityMap, inputClazz, outputClazz, graph);// ,
// // dumpClazz);
// });
// }
// public static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(
// SparkSession spark,
// String inputPath,
// String outputPath,
// CommunityMap communityMap,
// Class<I> inputClazz,
// Class<O> outputClazz,
// boolean graph) {
//
// // CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
//
// Utils
// .readPath(spark, inputPath, inputClazz)
// .map(value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz))
// .filter(Objects::nonNull)
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .json(outputPath);
//
// }
public static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(
SparkSession spark,
String inputPath,
String outputPath,
CommunityMap communityMap,
String communityMapPath,
Class<I> inputClazz,
Class<O> outputClazz,
boolean graph) throws ClassNotFoundException {
boolean graph) {
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
Utils
.readPath(spark, inputPath, inputClazz)

View File

@ -7,8 +7,10 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.oa.graph.dump.graph.Constants;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@ -40,4 +42,10 @@ public class Utils {
"%s|%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX,
DHPUtils.md5(id));
}
public static CommunityMap getCommunityMap(SparkSession spark, String communityMapPath) {
return new Gson().fromJson(spark.read().textFile(communityMapPath).collectAsList().get(0), CommunityMap.class);
}
}

View File

@ -100,7 +100,7 @@ public class CreateContextEntities implements Serializable {
protected <R extends ResearchInitiative> void writeEntity(final R r) {
try {
writer.write(Utils.OBJECT_MAPPER.writeValueAsString(r));
//log.info("writing context : {}", new Gson().toJson(r));
// log.info("writing context : {}", new Gson().toJson(r));
writer.newLine();
} catch (final Exception e) {
throw new RuntimeException(e);

View File

@ -35,13 +35,19 @@ public class DumpGraphEntities implements Serializable {
String inputPath,
String outputPath,
Class<? extends OafEntity> inputClazz,
CommunityMap communityMap) {
String communityMapPath) {
// CommunityMap communityMap) {
SparkConf conf = new SparkConf();
switch (ModelSupport.idPrefixMap.get(inputClazz)) {
case "50":
DumpProducts d = new DumpProducts();
d.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, Result.class, true);
d
.run(
isSparkSessionManaged, inputPath, outputPath, communityMapPath, inputClazz, Result.class,
true);
// d.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, Result.class, true);
break;
case "40":
runWithSparkSession(

View File

@ -8,13 +8,17 @@ import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.google.gson.Gson;
import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.oa.graph.dump.zenodo.Community;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
@ -28,6 +32,24 @@ import eu.dnetlib.dhp.schema.oaf.Result;
public class Extractor implements Serializable {
// public void run(Boolean isSparkSessionManaged,
// String inputPath,
// String outputPath,
// Class<? extends Result> inputClazz,
// String communityMapPath) {
//
// SparkConf conf = new SparkConf();
//
// runWithSparkSession(
// conf,
// isSparkSessionManaged,
// spark -> {
// Utils.removeOutputDir(spark, outputPath);
// extractRelationResult(
// spark, inputPath, outputPath, inputClazz, Utils.getCommunityMap(spark, communityMapPath));
// });
// }
public void run(Boolean isSparkSessionManaged,
String inputPath,
String outputPath,
@ -41,7 +63,8 @@ public class Extractor implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
extractRelationResult(spark, inputPath, outputPath, inputClazz, communityMap);
extractRelationResult(
spark, inputPath, outputPath, inputClazz, communityMap);
});
}

View File

@ -56,24 +56,33 @@ public class Process implements Serializable {
String nodeType = ModelSupport.idPrefixEntity.get(ds.substring(0, 2));
String contextId = Utils.getContextId(ci.getId());
relationList.add(Relation.newInstance(
Node.newInstance(contextId, eu.dnetlib.dhp.schema.dump.oaf.graph.Constants.CONTEXT_ENTITY),
Node.newInstance(ds, nodeType),
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
.newInstance(
relationList
.add(
Relation
.newInstance(
Node
.newInstance(
contextId, eu.dnetlib.dhp.schema.dump.oaf.graph.Constants.CONTEXT_ENTITY),
Node.newInstance(ds, nodeType),
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
.newInstance(
eu.dnetlib.dhp.oa.graph.dump.graph.Constants.USER_CLAIM,
eu.dnetlib.dhp.oa.graph.dump.graph.Constants.DEFAULT_TRUST)
));
eu.dnetlib.dhp.oa.graph.dump.graph.Constants.DEFAULT_TRUST)));
relationList.add(Relation.newInstance(Node.newInstance(ds, nodeType),
Node.newInstance(contextId, eu.dnetlib.dhp.schema.dump.oaf.graph.Constants.CONTEXT_ENTITY),
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
.newInstance(
relationList
.add(
Relation
.newInstance(
Node.newInstance(ds, nodeType),
Node
.newInstance(
contextId, eu.dnetlib.dhp.schema.dump.oaf.graph.Constants.CONTEXT_ENTITY),
RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP),
Provenance
.newInstance(
eu.dnetlib.dhp.oa.graph.dump.graph.Constants.USER_CLAIM,
eu.dnetlib.dhp.oa.graph.dump.graph.Constants.DEFAULT_TRUST)
));
eu.dnetlib.dhp.oa.graph.dump.graph.Constants.DEFAULT_TRUST)));
});

View File

@ -42,6 +42,8 @@ public class SparkDumpEntitiesJob implements Serializable {
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String communityMapPath = parser.get("communityMapPath");
final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl);
@ -52,7 +54,8 @@ public class SparkDumpEntitiesJob implements Serializable {
CommunityMap communityMap = queryInformationSystem.getCommunityMap();
DumpGraphEntities dg = new DumpGraphEntities();
dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap);
dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMapPath);
// dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap);
}

View File

@ -42,6 +42,8 @@ public class SparkExtractRelationFromEntities implements Serializable {
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
// final String communityMapPath = parser.get("communityMapPath");
final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl);
@ -52,6 +54,7 @@ public class SparkExtractRelationFromEntities implements Serializable {
CommunityMap communityMap = queryInformationSystem.getCommunityMap();
Extractor extractor = new Extractor();
// extractor.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMapPath);
extractor.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap);
}