diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index d74b55475..1d5b35cff 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -111,12 +111,12 @@ public class SparkAtomicActionUsageJob implements Serializable { resultModel .joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left") .map((MapFunction, UsageStatsResultModel>) t2 -> { + UsageStatsResultModel usrm = t2._1(); if(Optional.ofNullable(t2._2()).isPresent()) usrm.setDatasourceId(usrm.getDatasourceId() + "||" + t2._2().getOfficialname().getValue()); else usrm.setDatasourceId(usrm.getDatasourceId() + "||NO_MATCH_FOUND"); return usrm; - return usrm; }, Encoders.bean(UsageStatsResultModel.class)) .write() .mode(SaveMode.Overwrite)