[UsageCount] split the count for result at the level of the datasource. for each indicator one unit is specified for each datasource contrinuting to that indicator value. The datasource key is the value of the key element in the unit for the measure, while the count for that datasource is in the value

This commit is contained in:
Miriam Baglioni 2023-06-30 18:39:30 +02:00
parent 9963fd6d29
commit 55ea485783
10 changed files with 604 additions and 128 deletions

View File

@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.actionmanager.Constants.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@ -14,6 +15,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
@ -73,13 +75,28 @@ public class SparkAtomicActionUsageJob implements Serializable {
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id");
prepareResultData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id", "repository_id");
prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id");
prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repositor_id");
prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repository_id");
writeActionSet(spark, workingPath, outputPath);
});
}
private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName, String resultAttributeName, String datasourceAttributeName) {
spark
.sql(
String
.format(
"select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " +
"from %s.%s group by %s, %s",
resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName, datasourceAttributeName))
.as(Encoders.bean(UsageStatsResultModel.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath);
}
private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName,
String attribute_name) {
spark
@ -114,16 +131,55 @@ public class SparkAtomicActionUsageJob implements Serializable {
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
}
public static Measure newMeasureInstance(String id) {
Measure m = new Measure();
m.setId(id);
m.setUnit(new ArrayList<>());
return m;
}
private static Dataset<Result> getFinalIndicatorsResult(SparkSession spark, String inputPath) {
return readPath(spark, inputPath, UsageStatsModel.class)
.map((MapFunction<UsageStatsModel, Result>) usm -> {
return readPath(spark, inputPath, UsageStatsResultModel.class)
.groupByKey((MapFunction<UsageStatsResultModel, String>) usm -> usm.getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, UsageStatsResultModel, Result>) (k,it) -> {
Result r = new Result();
r.setId("50|" + usm.getId());
r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
r.setId("50|" + k);
//id = download or view and unit = list of key value pairs
Measure download = newMeasureInstance("downloads");
Measure view = newMeasureInstance("views");
UsageStatsResultModel first = it.next();
addCountForDatasource(download, first, view);
it.forEachRemaining(usm -> {
addCountForDatasource(download, usm, view);
});
r.setMeasures(Arrays.asList(download, view));
return r;
}, Encoders.bean(Result.class));
}, Encoders.bean(Result.class))
// .map((MapFunction<UsageStatsResultModel, Result>) usm -> {
// Result r = new Result();
// r.setId("50|" + usm.getId());
// r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
// return r;
// }, Encoders.bean(Result.class));
;
}
private static void addCountForDatasource(Measure download, UsageStatsResultModel usm, Measure view) {
DataInfo dataInfo = OafMapperUtils
.dataInfo(
false,
UPDATE_DATA_INFO_TYPE,
true,
false,
OafMapperUtils
.qualifier(
UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID,
UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"");
download.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo));
view.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo));
}
private static Dataset<Project> getFinalIndicatorsProject(SparkSession spark, String inputPath) {

View File

@ -0,0 +1,17 @@
package eu.dnetlib.dhp.actionmanager.usagestats;
/**
* @author miriam.baglioni
* @Date 30/06/23
*/
public class UsageStatsResultModel extends UsageStatsModel{
private String datasourceId ;
public String getDatasourceId() {
return datasourceId;
}
public void setDatasourceId(String datasourceId) {
this.datasourceId = datasourceId;
}
}

View File

@ -8,6 +8,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.Measure;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
@ -66,10 +67,356 @@ public class SparkAtomicActionCountJobTest {
spark.stop();
}
@Test
void testUsageStatsDb2() {
String usageScoresPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test2")
.getPath();
SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<AtomicAction> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class));
// .map(aa -> (Result) aa.getPayload());
Assertions.assertEquals(7, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count());
Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count());
Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count());
tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size()));
tmp
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
tmp
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
tmp
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));
tmp
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"measure:usage_counts",
u.getDataInfo().getProvenanceaction().getClassid()))));
tmp
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"Inferred by OpenAIRE",
u.getDataInfo().getProvenanceaction().getClassname()))));
tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"count",
u.getKey()))));
Assertions
.assertEquals(
1,
tmp
.filter(
r -> ((OafEntity) r.getPayload())
.getId()
.equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
.count());
OafEntity entity = (OafEntity) tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).first()
.getPayload();
entity
.getMeasures()
.stream()
.forEach(
m -> Assertions.assertEquals(3, m.getUnit().size() ));
Measure downloads = entity.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.findFirst()
.get();
Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
Assertions.assertEquals(String.valueOf(1), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
Measure views = entity.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.findFirst()
.get();
Assertions.assertEquals(String.valueOf(5), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
Assertions.assertEquals(String.valueOf(1), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
Assertions.assertEquals(String.valueOf(3), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"count",
u.getKey()))));
Assertions
.assertEquals(
"0",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"5",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"0",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"1",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"2",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"6",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"0",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"5",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"0",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"1",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"2",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"6",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
}
@Test
void testMatch() {
String usageScoresPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/usagestats")
.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test1")
.getPath();
SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
@ -143,7 +490,37 @@ public class SparkAtomicActionCountJobTest {
"Inferred by OpenAIRE",
u.getDataInfo().getProvenanceaction().getClassname()))));
tmp
tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"count",
u.getKey()))));
tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|"))
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
.stream()
.forEach(
m -> m
.getUnit()
.stream()
.forEach(
u -> Assertions
.assertEquals(
"10|fake1",
u.getKey()))));
tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
.foreach(
r -> ((OafEntity) r.getPayload())
.getMeasures()
@ -465,5 +842,4 @@ public class SparkAtomicActionCountJobTest {
.get(0)
.getValue());
}
}

View File

@ -0,0 +1,9 @@
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5}
{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","datasourceId":"10|fake1","downloads":0,"views":1}
{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","datasourceId":"10|fake1","downloads":1,"views":3}
{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1}
{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6}
{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1}
{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3}
{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10}
{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3}

View File

@ -0,0 +1,9 @@
{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}

View File

@ -0,0 +1,9 @@
{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}

View File

@ -0,0 +1,9 @@
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5}
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake2","downloads":0,"views":1}
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake3","downloads":1,"views":3}
{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1}
{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6}
{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1}
{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3}
{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10}
{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3}

View File

@ -1,9 +0,0 @@
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}