refactoring
This commit is contained in:
parent
55ea485783
commit
4c9bc4c3a5
|
@ -82,14 +82,16 @@ public class SparkAtomicActionUsageJob implements Serializable {
|
|||
});
|
||||
}
|
||||
|
||||
private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName, String resultAttributeName, String datasourceAttributeName) {
|
||||
private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName,
|
||||
String resultAttributeName, String datasourceAttributeName) {
|
||||
spark
|
||||
.sql(
|
||||
String
|
||||
.format(
|
||||
"select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " +
|
||||
"from %s.%s group by %s, %s",
|
||||
resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName, datasourceAttributeName))
|
||||
resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName,
|
||||
datasourceAttributeName))
|
||||
.as(Encoders.bean(UsageStatsResultModel.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
|
@ -131,20 +133,22 @@ public class SparkAtomicActionUsageJob implements Serializable {
|
|||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
|
||||
|
||||
}
|
||||
|
||||
public static Measure newMeasureInstance(String id) {
|
||||
Measure m = new Measure();
|
||||
m.setId(id);
|
||||
m.setUnit(new ArrayList<>());
|
||||
return m;
|
||||
}
|
||||
|
||||
private static Dataset<Result> getFinalIndicatorsResult(SparkSession spark, String inputPath) {
|
||||
|
||||
return readPath(spark, inputPath, UsageStatsResultModel.class)
|
||||
.groupByKey((MapFunction<UsageStatsResultModel, String>) usm -> usm.getId(), Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, UsageStatsResultModel, Result>) (k,it) -> {
|
||||
.mapGroups((MapGroupsFunction<String, UsageStatsResultModel, Result>) (k, it) -> {
|
||||
Result r = new Result();
|
||||
r.setId("50|" + k);
|
||||
//id = download or view and unit = list of key value pairs
|
||||
// id = download or view and unit = list of key value pairs
|
||||
Measure download = newMeasureInstance("downloads");
|
||||
Measure view = newMeasureInstance("views");
|
||||
UsageStatsResultModel first = it.next();
|
||||
|
@ -178,8 +182,14 @@ public class SparkAtomicActionUsageJob implements Serializable {
|
|||
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"");
|
||||
download.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo));
|
||||
view.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo));
|
||||
download
|
||||
.getUnit()
|
||||
.add(
|
||||
OafMapperUtils
|
||||
.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo));
|
||||
view
|
||||
.getUnit()
|
||||
.add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo));
|
||||
}
|
||||
|
||||
private static Dataset<Project> getFinalIndicatorsProject(SparkSession spark, String inputPath) {
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.usagestats;
|
||||
|
||||
/**
|
||||
* @author miriam.baglioni
|
||||
* @Date 30/06/23
|
||||
*/
|
||||
public class UsageStatsResultModel extends UsageStatsModel{
|
||||
private String datasourceId ;
|
||||
public class UsageStatsResultModel extends UsageStatsModel {
|
||||
private String datasourceId;
|
||||
|
||||
public String getDatasourceId() {
|
||||
return datasourceId;
|
||||
|
|
|
@ -8,7 +8,6 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Measure;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.oaf.Measure;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
|
@ -144,7 +144,8 @@ public class SparkAtomicActionCountJobTest {
|
|||
"Inferred by OpenAIRE",
|
||||
u.getDataInfo().getProvenanceaction().getClassname()))));
|
||||
|
||||
tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
|
||||
tmp
|
||||
.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
|
||||
.foreach(
|
||||
r -> ((OafEntity) r.getPayload())
|
||||
.getMeasures()
|
||||
|
@ -169,37 +170,60 @@ public class SparkAtomicActionCountJobTest {
|
|||
.equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
|
||||
.count());
|
||||
|
||||
OafEntity entity = (OafEntity) tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).first()
|
||||
OafEntity entity = (OafEntity) tmp
|
||||
.filter(
|
||||
aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
|
||||
.first()
|
||||
.getPayload();
|
||||
|
||||
entity
|
||||
.getMeasures()
|
||||
.stream()
|
||||
.forEach(
|
||||
m -> Assertions.assertEquals(3, m.getUnit().size() ));
|
||||
m -> Assertions.assertEquals(3, m.getUnit().size()));
|
||||
|
||||
Measure downloads = entity.getMeasures()
|
||||
Measure downloads = entity
|
||||
.getMeasures()
|
||||
.stream()
|
||||
.filter(m -> m.getId().equals("downloads"))
|
||||
.findFirst()
|
||||
.get();
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
String.valueOf(0),
|
||||
downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
String.valueOf(0),
|
||||
downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
String.valueOf(1),
|
||||
downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
|
||||
|
||||
Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
|
||||
Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
|
||||
Assertions.assertEquals(String.valueOf(1), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
|
||||
|
||||
Measure views = entity.getMeasures()
|
||||
Measure views = entity
|
||||
.getMeasures()
|
||||
.stream()
|
||||
.filter(m -> m.getId().equals("views"))
|
||||
.findFirst()
|
||||
.get();
|
||||
|
||||
Assertions.assertEquals(String.valueOf(5), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
|
||||
Assertions.assertEquals(String.valueOf(1), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
|
||||
Assertions.assertEquals(String.valueOf(3), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
String.valueOf(5),
|
||||
views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
String.valueOf(1),
|
||||
views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
String.valueOf(3),
|
||||
views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
|
||||
|
||||
tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
|
||||
tmp
|
||||
.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
|
||||
.foreach(
|
||||
r -> ((OafEntity) r.getPayload())
|
||||
.getMeasures()
|
||||
|
@ -214,7 +238,6 @@ public class SparkAtomicActionCountJobTest {
|
|||
"count",
|
||||
u.getKey()))));
|
||||
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"0",
|
||||
|
@ -413,6 +436,7 @@ public class SparkAtomicActionCountJobTest {
|
|||
.get(0)
|
||||
.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testMatch() {
|
||||
String usageScoresPath = getClass()
|
||||
|
@ -490,7 +514,8 @@ public class SparkAtomicActionCountJobTest {
|
|||
"Inferred by OpenAIRE",
|
||||
u.getDataInfo().getProvenanceaction().getClassname()))));
|
||||
|
||||
tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
|
||||
tmp
|
||||
.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
|
||||
.foreach(
|
||||
r -> ((OafEntity) r.getPayload())
|
||||
.getMeasures()
|
||||
|
@ -505,7 +530,8 @@ public class SparkAtomicActionCountJobTest {
|
|||
"count",
|
||||
u.getKey()))));
|
||||
|
||||
tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|"))
|
||||
tmp
|
||||
.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|"))
|
||||
.foreach(
|
||||
r -> ((OafEntity) r.getPayload())
|
||||
.getMeasures()
|
||||
|
@ -520,7 +546,8 @@ public class SparkAtomicActionCountJobTest {
|
|||
"10|fake1",
|
||||
u.getKey()))));
|
||||
|
||||
tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
|
||||
tmp
|
||||
.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
|
||||
.foreach(
|
||||
r -> ((OafEntity) r.getPayload())
|
||||
.getMeasures()
|
||||
|
|
Loading…
Reference in New Issue