[UsageCount] refactoring

This commit is contained in:
Miriam Baglioni 2022-05-09 14:43:27 +02:00
parent a056f59c6e
commit 89657a0b78
2 changed files with 14 additions and 14 deletions

View File

@ -9,7 +9,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@ -27,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Measure;
@ -109,12 +109,12 @@ public class SparkAtomicActionUsageJob implements Serializable {
res.setMeasures(getMeasure(first.getDownloads(), first.getViews()));
return res;
}, Encoders.bean(Result.class))
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
}

View File

@ -8,7 +8,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
@ -24,6 +23,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Result;
public class SparkAtomicActionCountJobTest {
@ -68,20 +68,20 @@ public class SparkAtomicActionCountJobTest {
@Test
void testMatch() {
String usageScoresPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
.getPath();
.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
.getPath();
SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Result> tmp = sc.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class))
.map(aa -> (Result) aa.getPayload());
JavaRDD<Result> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class))
.map(aa -> (Result) aa.getPayload());
Assertions.assertEquals(9, tmp.count());
tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
tmp
.foreach(