[UsageCount] write the output as an action set, as it should be, and updated the tests accordingly

Miriam Baglioni 2022-05-09 12:51:35 +02:00
parent 658450d9a3
commit a056f59c6e
2 changed files with 21 additions and 17 deletions

SparkAtomicActionUsageJob.java

@@ -9,7 +9,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 
+import eu.dnetlib.dhp.schema.action.AtomicAction;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
@@ -29,6 +32,7 @@ import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Measure;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import scala.Tuple2;
 
 /**
  * created the Atomic Action for each type of results
@@ -73,7 +77,7 @@ public class SparkAtomicActionUsageJob implements Serializable {
 			spark -> {
 				removeOutputDir(spark, outputPath);
 				prepareResults(dbname, spark, workingPath);
-				prepareActionSet(spark, workingPath, outputPath);
+				writeActionSet(spark, workingPath, outputPath);
 			});
 	}
 
@@ -89,7 +93,7 @@ public class SparkAtomicActionUsageJob implements Serializable {
 			.json(workingPath);
 	}
 
-	public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath) {
+	public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {
 		readPath(spark, inputPath, UsageStatsModel.class)
 			.groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getResult_id(), Encoders.STRING())
 			.mapGroups((MapGroupsFunction<String, UsageStatsModel, Result>) (k, it) -> {
@@ -105,10 +109,13 @@ public class SparkAtomicActionUsageJob implements Serializable {
 				res.setMeasures(getMeasure(first.getDownloads(), first.getViews()));
 				return res;
 			}, Encoders.bean(Result.class))
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression", "gzip")
-			.json(outputPath);
+			.toJavaRDD()
+			.map(p -> new AtomicAction(p.getClass(), p))
+			.mapToPair(
+				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
 	}
 
 	private static List<Measure> getMeasure(Long downloads, Long views) {
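
The heart of the change is the tail of writeActionSet: instead of persisting the aggregated Result records as gzipped JSON, each record is now wrapped in an AtomicAction and saved as a Text/Text Hadoop SequenceFile whose key is the payload's canonical class name, the shape an action set is expected to have. Below is a minimal, self-contained sketch of that write pattern. The Action and Payload classes, the /tmp/actionSet path, and the driver setup are illustrative stand-ins for eu.dnetlib.dhp.schema.action.AtomicAction and eu.dnetlib.dhp.schema.oaf.Result, not the project's code.

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

import scala.Tuple2;

public class ActionSetWriteSketch {

	// Serializable stand-in for eu.dnetlib.dhp.schema.action.AtomicAction:
	// a (payload class, payload) pair with the bean accessors Jackson needs.
	public static class Action<T> implements Serializable {
		private Class<T> clazz;
		private T payload;

		public Action() {
		}

		public Action(Class<T> clazz, T payload) {
			this.clazz = clazz;
			this.payload = payload;
		}

		public Class<T> getClazz() {
			return clazz;
		}

		public void setClazz(Class<T> clazz) {
			this.clazz = clazz;
		}

		public T getPayload() {
			return payload;
		}

		public void setPayload(T payload) {
			this.payload = payload;
		}
	}

	// Toy payload standing in for eu.dnetlib.dhp.schema.oaf.Result.
	public static class Payload implements Serializable {
		public String id;

		public Payload() {
		}

		public Payload(String id) {
			this.id = id;
		}
	}

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("write-sketch").getOrCreate();
		JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

		List<Payload> payloads = Arrays.asList(new Payload("r1"), new Payload("r2"));

		sc
			.parallelize(payloads)
			// wrap each record in an action that remembers its payload class
			.map(p -> new Action<>(Payload.class, p))
			// key: canonical class name; value: the whole action serialized as JSON
			.mapToPair(aa -> new Tuple2<>(
				new Text(aa.getClazz().getCanonicalName()),
				new Text(OBJECT_MAPPER.writeValueAsString(aa))))
			// persist as a Hadoop SequenceFile of Text/Text pairs
			.saveAsHadoopFile("/tmp/actionSet", Text.class, Text.class, SequenceFileOutputFormat.class);

		spark.stop();
	}
}

Keying each value by the payload class presumably lets a downstream consumer route heterogeneous actions in one SequenceFile without parsing the JSON first.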

SparkAtomicActionCountJobTest.java

@@ -8,14 +8,12 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.stream.Collectors;
 
+import eu.dnetlib.dhp.schema.action.AtomicAction;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -26,8 +24,6 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob;
-import eu.dnetlib.dhp.schema.action.AtomicAction;
 
 import eu.dnetlib.dhp.schema.oaf.Result;
 
 public class SparkAtomicActionCountJobTest {
@@ -72,19 +68,20 @@ public class SparkAtomicActionCountJobTest {
 	@Test
 	void testMatch() {
 		String usageScoresPath = getClass()
 			.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
 			.getPath();
 
-		SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
+		SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
 
 		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
 		JavaRDD<Result> tmp = sc
-			.textFile(workingDir.toString() + "/actionSet")
-			.map(usm -> OBJECT_MAPPER.readValue(usm, Result.class));
+			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
+			.map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class))
+			.map(aa -> (Result) aa.getPayload());
 
 		Assertions.assertEquals(9, tmp.count());
 
 		tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
 		tmp
 			.foreach(
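
Mirroring the job, the updated test no longer reads plain JSON lines: it loads the SequenceFile, deserializes each value into an AtomicAction, and unwraps the Result payload before asserting on counts and measures. A companion sketch of that read-back path follows, reusing the hypothetical Action and Payload stand-ins from the sketch above; the path and class names are again assumptions, not the project's API.

import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ActionSetReadSketch {

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("read-sketch").getOrCreate();
		JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

		// load the Text/Text SequenceFile written by the job ...
		JavaRDD<ActionSetWriteSketch.Payload> results = sc
			.sequenceFile("/tmp/actionSet", Text.class, Text.class)
			// ... deserialize each value back into an action ...
			.map(t -> OBJECT_MAPPER.readValue(
				t._2.toString(),
				new TypeReference<ActionSetWriteSketch.Action<ActionSetWriteSketch.Payload>>() {
				}))
			// ... and unwrap the payload, as the test does with (Result) aa.getPayload()
			.map(ActionSetWriteSketch.Action::getPayload);

		System.out.println("records in the action set: " + results.count());
		spark.stop();
	}
}

Converting each Text value with toString() inside the map also sidesteps Hadoop's record-object reuse, which can bite when raw Text instances escape the read loop.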