[stats wf] indicators across stats dbs & updates in the org ids #248

Closed
dimitris.pierrakos wants to merge 1742 commits from beta into beta2master_sept_2022
1 changed files with 1 additions and 3 deletions
Showing only changes of commit ece40adc09 - Show all commits

View File

@ -65,7 +65,6 @@ public class GetDatasourceFromCountry implements Serializable {
conf,
isSparkSessionManaged,
spark -> {
getDatasourceFromCountry(spark, country, inputPath, workingPath);
});
}
@ -83,7 +82,6 @@ public class GetDatasourceFromCountry implements Serializable {
(FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference() &&
o.getCountry().getClassid().length() > 0 &&
o.getCountry().getClassid().equals(country));
;
// filtering of the relations taking the non deleted by inference and those with IsProvidedBy as relclass
Dataset<Relation> relation = spark
@ -97,7 +95,7 @@ public class GetDatasourceFromCountry implements Serializable {
!rel.getDataInfo().getDeletedbyinference());
organization
.joinWith(relation, organization.col("id").equalTo(relation.col("target")), "left")
.joinWith(relation, organization.col("id").equalTo(relation.col("target")))
.map((MapFunction<Tuple2<Organization, Relation>, String>) t2 -> t2._2().getSource(), Encoders.STRING())
.write()
.mode(SaveMode.Overwrite)