master #11
|
@ -61,12 +61,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
|
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
|
||||||
log.info("resultType: {}", resultType);
|
log.info("resultType: {}", resultType);
|
||||||
|
|
||||||
final Boolean writeUpdates = Optional
|
|
||||||
.ofNullable(parser.get("writeUpdate"))
|
|
||||||
.map(Boolean::valueOf)
|
|
||||||
.orElse(Boolean.TRUE);
|
|
||||||
log.info("writeUpdate: {}", writeUpdates);
|
|
||||||
|
|
||||||
final Boolean saveGraph = Optional
|
final Boolean saveGraph = Optional
|
||||||
.ofNullable(parser.get("saveGraph"))
|
.ofNullable(parser.get("saveGraph"))
|
||||||
.map(Boolean::valueOf)
|
.map(Boolean::valueOf)
|
||||||
|
@ -93,7 +87,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
outputPath,
|
outputPath,
|
||||||
resultClazz,
|
resultClazz,
|
||||||
resultType,
|
resultType,
|
||||||
writeUpdates,
|
|
||||||
saveGraph);
|
saveGraph);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -106,7 +99,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
String outputPath,
|
String outputPath,
|
||||||
Class<? extends Result> resultClazz,
|
Class<? extends Result> resultClazz,
|
||||||
String resultType,
|
String resultType,
|
||||||
Boolean writeUpdates,
|
|
||||||
Boolean saveGraph) {
|
Boolean saveGraph) {
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
@ -124,10 +116,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
broadcast_datasourceorganizationassoc)
|
broadcast_datasourceorganizationassoc)
|
||||||
.as(Encoders.bean(ResultOrganizationSet.class));
|
.as(Encoders.bean(ResultOrganizationSet.class));
|
||||||
|
|
||||||
if (writeUpdates) {
|
|
||||||
createUpdateForRelationWrite(potentialUpdates, outputPath + "/" + resultType);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (saveGraph) {
|
if (saveGraph) {
|
||||||
getNewRelations(
|
getNewRelations(
|
||||||
spark
|
spark
|
||||||
|
@ -229,48 +217,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
|
||||||
Encoders.bean(DatasourceOrganization.class));
|
Encoders.bean(DatasourceOrganization.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void createUpdateForRelationWrite(
|
|
||||||
Dataset<ResultOrganizationSet> toupdaterelation, String outputPath) {
|
|
||||||
toupdaterelation
|
|
||||||
.flatMap(
|
|
||||||
s -> {
|
|
||||||
List<Relation> relationList = new ArrayList<>();
|
|
||||||
List<String> orgs = s.getOrganizationSet();
|
|
||||||
String resId = s.getResultId();
|
|
||||||
for (String org : orgs) {
|
|
||||||
relationList
|
|
||||||
.add(
|
|
||||||
getRelation(
|
|
||||||
org,
|
|
||||||
resId,
|
|
||||||
RELATION_ORGANIZATION_RESULT_REL_CLASS,
|
|
||||||
RELATION_RESULTORGANIZATION_REL_TYPE,
|
|
||||||
RELATION_RESULTORGANIZATION_SUBREL_TYPE,
|
|
||||||
PROPAGATION_DATA_INFO_TYPE,
|
|
||||||
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
|
|
||||||
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
|
|
||||||
relationList
|
|
||||||
.add(
|
|
||||||
getRelation(
|
|
||||||
resId,
|
|
||||||
org,
|
|
||||||
RELATION_RESULT_ORGANIZATION_REL_CLASS,
|
|
||||||
RELATION_RESULTORGANIZATION_REL_TYPE,
|
|
||||||
RELATION_RESULTORGANIZATION_SUBREL_TYPE,
|
|
||||||
PROPAGATION_DATA_INFO_TYPE,
|
|
||||||
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
|
|
||||||
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
|
|
||||||
}
|
|
||||||
return relationList.iterator();
|
|
||||||
},
|
|
||||||
Encoders.bean(Relation.class))
|
|
||||||
.toJSON()
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.option("compression", "gzip")
|
|
||||||
.text(outputPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static org.apache.spark.sql.Dataset<ResultOrganizationSet> organizationPropagationAssoc(
|
private static org.apache.spark.sql.Dataset<ResultOrganizationSet> organizationPropagationAssoc(
|
||||||
SparkSession spark,
|
SparkSession spark,
|
||||||
Broadcast<org.apache.spark.sql.Dataset<DatasourceOrganization>> broadcast_datasourceorganizationassoc) {
|
Broadcast<org.apache.spark.sql.Dataset<DatasourceOrganization>> broadcast_datasourceorganizationassoc) {
|
||||||
|
|
Loading…
Reference in New Issue