[UsageCount] Addition of usage counts for projects and datasources. Extension of the action set created for results with new entities for projects and datasources. Extension of the test resource set and modification of the testing class.
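In short, each aggregated usage record (id, downloads, views) now becomes an entity of the matching OAF type, carrying the usual id prefix (50| for results, 40| for projects, 10| for datasources) and the two usage-count measures. A minimal sketch of that mapping, not part of the diff below; the helper name is hypothetical, while UsageStatsModel, getMeasure(...), the setters and the prefixes are taken from the commit itself (assuming the measures setter is exposed on OafEntity in the dhp-schemas snapshot referenced at the bottom):

    // Illustration only: generic form of what getFinalIndicatorsResult/Project/Datasource do below.
    // Entity classes come from eu.dnetlib.dhp.schema.oaf; withUsageCounts is a hypothetical name.
    private static <T extends OafEntity> T withUsageCounts(UsageStatsModel usm, T entity, String prefix) {
        entity.setId(prefix + usm.getId());                                  // "50|", "40|" or "10|" + original id
        entity.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));  // downloads and views measures
        return entity;
    }
    // e.g. withUsageCounts(usm, new Result(), "50|"), withUsageCounts(usm, new Project(), "40|"),
    //      withUsageCounts(usm, new Datasource(), "10|")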

Miriam Baglioni 2023-02-09 18:59:45 +01:00
parent d05ca53a14
commit 85e53fad00
9 changed files with 143 additions and 67 deletions

View File

@@ -9,6 +9,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import eu.dnetlib.dhp.schema.common.MainEntityType;
+import eu.dnetlib.dhp.schema.oaf.*;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -28,9 +30,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.action.AtomicAction;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Measure;
-import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 import scala.Tuple2;
@@ -76,41 +75,37 @@ public class SparkAtomicActionUsageJob implements Serializable {
             isSparkSessionManaged,
             spark -> {
                 removeOutputDir(spark, outputPath);
-                prepareResults(dbname, spark, workingPath);
+                prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats");
+                prepareData(dbname, spark, workingPath + "/projectDb", "project_stats");
+                prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats");
                 writeActionSet(spark, workingPath, outputPath);
             });
     }

-    public static void prepareResults(String db, SparkSession spark, String workingPath) {
+    private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName) {
         spark
             .sql(
                 "Select result_id, downloads, views " +
-                    "from " + db + ".usage_stats")
+                    "from " + dbname + "." + tableName)
             .as(Encoders.bean(UsageStatsModel.class))
             .write()
             .mode(SaveMode.Overwrite)
             .option("compression", "gzip")
             .json(workingPath);
     }

     public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {
-        readPath(spark, inputPath, UsageStatsModel.class)
-            .groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getResult_id(), Encoders.STRING())
-            .mapGroups((MapGroupsFunction<String, UsageStatsModel, Result>) (k, it) -> {
-                UsageStatsModel first = it.next();
-                it.forEachRemaining(us -> {
-                    first.setDownloads(first.getDownloads() + us.getDownloads());
-                    first.setViews(first.getViews() + us.getViews());
-                });
-                Result res = new Result();
-                res.setId("50|" + k);
-                res.setMeasures(getMeasure(first.getDownloads(), first.getViews()));
-                return res;
-            }, Encoders.bean(Result.class))
-            .toJavaRDD()
-            .map(p -> new AtomicAction(p.getClass(), p))
+        getFinalIndicatorsResult(spark, inputPath + "/usageDb")
+            .toJavaRDD()
+            .map(p -> new AtomicAction(p.getClass(), p))
+            .union(getFinalIndicatorsProject(spark, inputPath + "/projectDb")
+                .toJavaRDD()
+                .map(p -> new AtomicAction(p.getClass(), p)))
+            .union(getFinalIndicatorsDatasource(spark, inputPath + "/datasourceDb")
+                .toJavaRDD()
+                .map(p -> new AtomicAction(p.getClass(), p)))
             .mapToPair(
                 aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
                     new Text(OBJECT_MAPPER.writeValueAsString(aa))))
@@ -118,6 +113,54 @@ public class SparkAtomicActionUsageJob implements Serializable {
     }

+    private static Dataset<Result> getFinalIndicatorsResult(SparkSession spark, String inputPath) {
+        return getUsageStatsModelDataset(spark, inputPath)
+            .map((MapFunction<UsageStatsModel, Result>) usm -> {
+                Result r = new Result();
+                r.setId("50|" + usm.getId());
+                r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
+                return r;
+            }, Encoders.bean(Result.class));
+    }
+
+    private static Dataset<Project> getFinalIndicatorsProject(SparkSession spark, String inputPath) {
+        return getUsageStatsModelDataset(spark, inputPath)
+            .map((MapFunction<UsageStatsModel, Project>) usm -> {
+                Project r = new Project();
+                r.setId("40|" + usm.getId());
+                r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
+                return r;
+            }, Encoders.bean(Project.class));
+    }
+
+    private static Dataset<Datasource> getFinalIndicatorsDatasource(SparkSession spark, String inputPath) {
+        return getUsageStatsModelDataset(spark, inputPath)
+            .map((MapFunction<UsageStatsModel, Datasource>) usm -> {
+                Datasource r = new Datasource();
+                r.setId("10|" + usm.getId());
+                r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
+                return r;
+            }, Encoders.bean(Datasource.class));
+    }
+
+    private static Dataset<UsageStatsModel> getUsageStatsModelDataset(SparkSession spark, String inputPath) {
+        return readPath(spark, inputPath, UsageStatsModel.class)
+            .groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getId(), Encoders.STRING())
+            .mapGroups((MapGroupsFunction<String, UsageStatsModel, UsageStatsModel>) (k, it) -> {
+                UsageStatsModel first = it.next();
+                it.forEachRemaining(us -> {
+                    first.setDownloads(first.getDownloads() + us.getDownloads());
+                    first.setViews(first.getViews() + us.getViews());
+                });
+                first.setId(k);
+                return first;
+            }, Encoders.bean(UsageStatsModel.class));
+    }
+
     private static List<Measure> getMeasure(Long downloads, Long views) {
         DataInfo dataInfo = OafMapperUtils
             .dataInfo(

View File

@@ -4,16 +4,16 @@ package eu.dnetlib.dhp.actionmanager.usagestats;
 import java.io.Serializable;

 public class UsageStatsModel implements Serializable {
-    private String result_id;
+    private String id;
     private Long downloads;
     private Long views;

-    public String getResult_id() {
-        return result_id;
+    public String getId() {
+        return id;
     }

-    public void setResult_id(String result_id) {
-        this.result_id = result_id;
+    public void setId(String id) {
+        this.id = id;
     }

     public Long getDownloads() {

View File

@@ -89,7 +89,7 @@
                 <arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
                 <arg>--outputPath</arg><arg>${outputPath}</arg>
                 <arg>--usagestatsdb</arg><arg>${usagestatsdb}</arg>
-                <arg>--workingPath</arg><arg>${workingDir}/usageDb</arg>
+                <arg>--workingPath</arg><arg>${workingDir}</arg>
             </spark>
             <ok to="End"/>
             <error to="Kill"/>
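With --workingPath now pointing at ${workingDir} itself, the job writes its three intermediate datasets into subfolders of that directory before building the action set. A sketch of the resulting layout, derived from the prepareData calls above (folder and table names come from the commit; nothing else is implied about the cluster setup):

    ${workingDir}/usageDb        <- usage_stats       (results,     ids prefixed "50|")
    ${workingDir}/projectDb      <- project_stats     (projects,    ids prefixed "40|")
    ${workingDir}/datasourceDb   <- datasource_stats  (datasources, ids prefixed "10|")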

View File

@@ -8,6 +8,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.stream.Collectors;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
@@ -68,24 +69,26 @@ public class SparkAtomicActionCountJobTest {
     @Test
     void testMatch() {
         String usageScoresPath = getClass()
-            .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
+            .getResource("/eu/dnetlib/dhp/actionmanager/usagestats")
             .getPath();

         SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");

         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

-        JavaRDD<Result> tmp = sc
+        JavaRDD<AtomicAction> tmp = sc
             .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
-            .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class))
-            .map(aa -> (Result) aa.getPayload());
+            .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class));

-        Assertions.assertEquals(9, tmp.count());
+        Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count());
+        Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count());
+        Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count());

-        tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
+        tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size()));

         tmp
             .foreach(
-                r -> r
+                r -> ((OafEntity) r.getPayload())
                     .getMeasures()
                     .stream()
                     .forEach(
@@ -95,14 +98,14 @@ public class SparkAtomicActionCountJobTest {
                         .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));

         tmp
             .foreach(
-                r -> r
+                r -> ((OafEntity) r.getPayload())
                     .getMeasures()
                     .stream()
                     .forEach(
                         m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));

         tmp
             .foreach(
-                r -> r
+                r -> ((OafEntity) r.getPayload())
                     .getMeasures()
                     .stream()
                     .forEach(
@@ -113,7 +116,7 @@ public class SparkAtomicActionCountJobTest {
         tmp
             .foreach(
-                r -> r
+                r -> ((OafEntity) r.getPayload())
                     .getMeasures()
                     .stream()
                     .forEach(
@@ -127,7 +130,7 @@ public class SparkAtomicActionCountJobTest {
                             u.getDataInfo().getProvenanceaction().getClassid()))));

         tmp
             .foreach(
-                r -> r
+                r -> ((OafEntity) r.getPayload())
                     .getMeasures()
                     .stream()
                     .forEach(
@@ -142,7 +145,7 @@ public class SparkAtomicActionCountJobTest {
         tmp
             .foreach(
-                r -> r
+                r -> ((OafEntity) r.getPayload())
                     .getMeasures()
                     .stream()
                     .forEach(
@@ -157,12 +160,13 @@ public class SparkAtomicActionCountJobTest {
         Assertions
             .assertEquals(
-                1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count());
+                1, tmp.filter(r -> ((OafEntity) r.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count());

         Assertions
             .assertEquals(
                 "0",
                 tmp
+                    .map(r -> ((OafEntity) r.getPayload()))
                     .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
                     .collect()
                     .get(0)
@@ -178,7 +182,8 @@ public class SparkAtomicActionCountJobTest {
             .assertEquals(
                 "5",
                 tmp
+                    .map(r -> ((OafEntity) r.getPayload()))
                     .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
                     .collect()
                     .get(0)
                     .getMeasures()
@@ -194,7 +199,8 @@ public class SparkAtomicActionCountJobTest {
             .assertEquals(
                 "0",
                 tmp
+                    .map(r -> ((OafEntity) r.getPayload()))
                     .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
                     .collect()
                     .get(0)
                     .getMeasures()
@@ -209,7 +215,8 @@ public class SparkAtomicActionCountJobTest {
             .assertEquals(
                 "1",
                 tmp
+                    .map(r -> ((OafEntity) r.getPayload()))
                     .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
                     .collect()
                     .get(0)
                     .getMeasures()
@@ -225,7 +232,8 @@ public class SparkAtomicActionCountJobTest {
             .assertEquals(
                 "2",
                 tmp
+                    .map(r -> ((OafEntity) r.getPayload()))
                     .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
                     .collect()
                     .get(0)
                     .getMeasures()
@@ -240,7 +248,8 @@ public class SparkAtomicActionCountJobTest {
             .assertEquals(
                 "6",
                 tmp
+                    .map(r -> ((OafEntity) r.getPayload()))
                     .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
                     .collect()
                     .get(0)
                     .getMeasures()

View File

@@ -0,0 +1,12 @@
{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4}
{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1}
{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3}
{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3}
{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8}
{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2}
{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}

View File

@@ -0,0 +1,12 @@
{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4}
{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1}
{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3}
{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3}
{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8}
{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2}
{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}

View File

@@ -0,0 +1,12 @@
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4}
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1}
{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3}
{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3}
{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8}
{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2}
{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}

View File

@@ -1,12 +0,0 @@
{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4}
{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1}
{"result_id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"result_id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"result_id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3}
{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3}
{"result_id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"result_id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8}
{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2}
{"result_id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}

View File

@@ -807,7 +807,7 @@
         <mockito-core.version>3.3.3</mockito-core.version>
         <mongodb.driver.version>3.4.2</mongodb.driver.version>
         <vtd.version>[2.12,3.0)</vtd.version>
-        <dhp-schemas.version>[3.15.0]</dhp-schemas.version>
+        <dhp-schemas.version>[3.15.1-SNAPSHOT]</dhp-schemas.version>
         <dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
         <dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
         <dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>