diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java index dd5af6998..d3741d3e8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/GetDatasourceFromCountry.java @@ -65,7 +65,6 @@ public class GetDatasourceFromCountry implements Serializable { conf, isSparkSessionManaged, spark -> { - getDatasourceFromCountry(spark, country, inputPath, workingPath); }); } @@ -83,7 +82,6 @@ public class GetDatasourceFromCountry implements Serializable { (FilterFunction) o -> !o.getDataInfo().getDeletedbyinference() && o.getCountry().getClassid().length() > 0 && o.getCountry().getClassid().equals(country)); - ; // filtering of the relations taking the non deleted by inference and those with IsProvidedBy as relclass Dataset relation = spark @@ -97,7 +95,7 @@ public class GetDatasourceFromCountry implements Serializable { !rel.getDataInfo().getDeletedbyinference()); organization - .joinWith(relation, organization.col("id").equalTo(relation.col("target")), "left") + .joinWith(relation, organization.col("id").equalTo(relation.col("target"))) .map((MapFunction, String>) t2 -> t2._2().getSource(), Encoders.STRING()) .write() .mode(SaveMode.Overwrite)