forked from D-Net/dnet-hadoop
minor modifications
This commit is contained in:
parent
418595fec2
commit
65a5d67b8b
|
@ -70,7 +70,8 @@ public class SparkResultToCommunityFromOrganizationJob2 {
|
||||||
if (isTest(parser)) {
|
if (isTest(parser)) {
|
||||||
removeOutputDir(spark, outputPath);
|
removeOutputDir(spark, outputPath);
|
||||||
}
|
}
|
||||||
execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath);
|
if(saveGraph)
|
||||||
|
execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,8 +58,7 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
final String resultClassName = parser.get("resultTableName");
|
final String resultClassName = parser.get("resultTableName");
|
||||||
log.info("resultTableName: {}", resultClassName);
|
log.info("resultTableName: {}", resultClassName);
|
||||||
|
|
||||||
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
|
|
||||||
log.info("resultType: {}", resultType);
|
|
||||||
|
|
||||||
final Boolean saveGraph = Optional
|
final Boolean saveGraph = Optional
|
||||||
.ofNullable(parser.get("saveGraph"))
|
.ofNullable(parser.get("saveGraph"))
|
||||||
|
@ -79,16 +78,15 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
if (isTest(parser)) {
|
if (isTest(parser)) {
|
||||||
removeOutputDir(spark, outputPath);
|
removeOutputDir(spark, outputPath);
|
||||||
}
|
}
|
||||||
execPropagation(
|
if(saveGraph)
|
||||||
spark,
|
execPropagation(
|
||||||
datasourceorganization,
|
spark,
|
||||||
alreadylinked,
|
datasourceorganization,
|
||||||
inputPath,
|
alreadylinked,
|
||||||
outputPath,
|
inputPath,
|
||||||
resultClazz,
|
outputPath,
|
||||||
resultType,
|
resultClazz);
|
||||||
saveGraph);
|
});
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void execPropagation(
|
private static void execPropagation(
|
||||||
|
@ -97,10 +95,7 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
String alreadylinked,
|
String alreadylinked,
|
||||||
String inputPath,
|
String inputPath,
|
||||||
String outputPath,
|
String outputPath,
|
||||||
Class<? extends Result> resultClazz,
|
Class<? extends Result> resultClazz) {
|
||||||
String resultType,
|
|
||||||
|
|
||||||
Boolean saveGraph) {
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
org.apache.spark.sql.Dataset<DatasourceOrganization> datasourceorganizationassoc = readAssocDatasourceOrganization(
|
org.apache.spark.sql.Dataset<DatasourceOrganization> datasourceorganizationassoc = readAssocDatasourceOrganization(
|
||||||
|
@ -117,7 +112,7 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
broadcast_datasourceorganizationassoc)
|
broadcast_datasourceorganizationassoc)
|
||||||
.as(Encoders.bean(ResultOrganizationSet.class));
|
.as(Encoders.bean(ResultOrganizationSet.class));
|
||||||
|
|
||||||
if (saveGraph) {
|
|
||||||
getNewRelations(
|
getNewRelations(
|
||||||
spark
|
spark
|
||||||
.read()
|
.read()
|
||||||
|
@ -133,7 +128,7 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
.mode(SaveMode.Append)
|
.mode(SaveMode.Append)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.text(outputPath);
|
.text(outputPath);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Dataset<Relation> getNewRelations(
|
private static Dataset<Relation> getNewRelations(
|
||||||
|
|
Loading…
Reference in New Issue