[Measures] put the logic in common, no need to change the schema

This commit is contained in:
Miriam Baglioni 2022-04-21 11:27:26 +02:00
parent 5295effc96
commit c304657d91
3 changed files with 20 additions and 19 deletions

View File

@ -391,4 +391,19 @@ public class OafMapperUtils {
}
return null;
}
public static KeyValue newKeyValueInstance (String key, String value, DataInfo dataInfo){
KeyValue kv = new KeyValue();
kv.setDataInfo(dataInfo);
kv.setKey(key);
kv.setValue(value);
return kv;
}
public static Measure newMeasureInstance(String id, String value, String key, DataInfo dataInfo) {
Measure m = new Measure();
m.setId(id);
m.setUnit(Arrays.asList(newKeyValueInstance(key, value, dataInfo)));
return m;
}
}

View File

@ -3,23 +3,14 @@ package eu.dnetlib.dhp.actionmanager.usagestats;
import static eu.dnetlib.dhp.actionmanager.Constants.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.cxf.wsdl.service.factory.MethodNameSoapActionServiceConfiguration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
@ -31,19 +22,15 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize;
import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Measure;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import lombok.val;
import scala.Tuple2;
/**
* created the Atomic Action for each tipe of results
@ -143,9 +130,8 @@ public class SparkAtomicActionUsageJob implements Serializable {
return Arrays
.asList(
Measure
.newInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo),
Measure.newInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo));
OafMapperUtils.newMeasureInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo),
OafMapperUtils.newMeasureInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo));
}

View File

@ -801,7 +801,7 @@
<mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version>
<dhp-schemas.version>[2.11.34-SNAPSHOT]</dhp-schemas.version>
<dhp-schemas.version>[2.10.32]</dhp-schemas.version>
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>