forked from antonis.lempesis/dnet-hadoop
Merge branch 'beta' into 7096-fileGZip-collector-plugin
This commit is contained in:
commit
30105f0722
|
@ -391,4 +391,19 @@ public class OafMapperUtils {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static KeyValue newKeyValueInstance(String key, String value, DataInfo dataInfo) {
|
||||||
|
KeyValue kv = new KeyValue();
|
||||||
|
kv.setDataInfo(dataInfo);
|
||||||
|
kv.setKey(key);
|
||||||
|
kv.setValue(value);
|
||||||
|
return kv;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Measure newMeasureInstance(String id, String value, String key, DataInfo dataInfo) {
|
||||||
|
Measure m = new Measure();
|
||||||
|
m.setId(id);
|
||||||
|
m.setUnit(Arrays.asList(newKeyValueInstance(key, value, dataInfo)));
|
||||||
|
return m;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ public class Constants {
|
||||||
public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
|
public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
|
||||||
public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
|
public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
|
||||||
public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg";
|
public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg";
|
||||||
|
public static final String UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID = "measure:usage_counts";
|
||||||
|
public static final String UPDATE_KEY_USAGE_COUNTS = "count";
|
||||||
|
|
||||||
public static final String FOS_CLASS_ID = "FOS";
|
public static final String FOS_CLASS_ID = "FOS";
|
||||||
public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";
|
public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";
|
||||||
|
|
|
@ -0,0 +1,149 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.usagestats;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.actionmanager.Constants.*;
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Measure;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* created the Atomic Action for each type of results
|
||||||
|
*/
|
||||||
|
public class SparkAtomicActionUsageJob implements Serializable {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionUsageJob.class);
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
public static <I extends Result> void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
String jsonConfiguration = IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkAtomicActionUsageJob.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath {}: ", outputPath);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||||
|
|
||||||
|
final String dbname = parser.get("usagestatsdb");
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
|
||||||
|
runWithSparkHiveSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
removeOutputDir(spark, outputPath);
|
||||||
|
prepareResults(dbname, spark, workingPath);
|
||||||
|
prepareActionSet(spark, workingPath, outputPath);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void prepareResults(String db, SparkSession spark, String workingPath) {
|
||||||
|
spark
|
||||||
|
.sql(
|
||||||
|
"Select result_id, downloads, views " +
|
||||||
|
"from " + db + ".usage_stats")
|
||||||
|
.as(Encoders.bean(UsageStatsModel.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(workingPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath) {
|
||||||
|
readPath(spark, inputPath, UsageStatsModel.class)
|
||||||
|
.groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getResult_id(), Encoders.STRING())
|
||||||
|
.mapGroups((MapGroupsFunction<String, UsageStatsModel, Result>) (k, it) -> {
|
||||||
|
UsageStatsModel first = it.next();
|
||||||
|
it.forEachRemaining(us -> {
|
||||||
|
first.setDownloads(first.getDownloads() + us.getDownloads());
|
||||||
|
first.setViews(first.getViews() + us.getViews());
|
||||||
|
});
|
||||||
|
|
||||||
|
Result res = new Result();
|
||||||
|
res.setId("50|" + k);
|
||||||
|
|
||||||
|
res.setMeasures(getMeasure(first.getDownloads(), first.getViews()));
|
||||||
|
return res;
|
||||||
|
}, Encoders.bean(Result.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Measure> getMeasure(Long downloads, Long views) {
|
||||||
|
DataInfo dataInfo = OafMapperUtils
|
||||||
|
.dataInfo(
|
||||||
|
false,
|
||||||
|
UPDATE_DATA_INFO_TYPE,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
OafMapperUtils
|
||||||
|
.qualifier(
|
||||||
|
UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID,
|
||||||
|
UPDATE_CLASS_NAME,
|
||||||
|
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||||
|
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||||
|
"");
|
||||||
|
|
||||||
|
return Arrays
|
||||||
|
.asList(
|
||||||
|
OafMapperUtils
|
||||||
|
.newMeasureInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo),
|
||||||
|
OafMapperUtils.newMeasureInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void removeOutputDir(SparkSession spark, String path) {
|
||||||
|
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <R> Dataset<R> readPath(
|
||||||
|
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||||
|
return spark
|
||||||
|
.read()
|
||||||
|
.textFile(inputPath)
|
||||||
|
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.usagestats;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class UsageStatsModel implements Serializable {
|
||||||
|
private String result_id;
|
||||||
|
private Long downloads;
|
||||||
|
private Long views;
|
||||||
|
|
||||||
|
public String getResult_id() {
|
||||||
|
return result_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResult_id(String result_id) {
|
||||||
|
this.result_id = result_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getDownloads() {
|
||||||
|
return downloads;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDownloads(Long downloads) {
|
||||||
|
this.downloads = downloads;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getViews() {
|
||||||
|
return views;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setViews(Long views) {
|
||||||
|
this.views = views;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,32 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "issm",
|
||||||
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
"paramDescription": "when true will stop SparkSession after job execution",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "hmu",
|
||||||
|
"paramLongName": "hive_metastore_uris",
|
||||||
|
"paramDescription": "the URI for the hive metastore",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "o",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the path of the new ActionSet",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "sdb",
|
||||||
|
"paramLongName": "usagestatsdb",
|
||||||
|
"paramDescription": "the name of the db to be used",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "wp",
|
||||||
|
"paramLongName": "workingPath",
|
||||||
|
"paramDescription": "the workingPath where to save the content of the usage_stats table",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,30 @@
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>jobTracker</name>
|
||||||
|
<value>yarnRM</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nameNode</name>
|
||||||
|
<value>hdfs://nameservice1</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.use.system.libpath</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hiveMetastoreUris</name>
|
||||||
|
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hiveJdbcUrl</name>
|
||||||
|
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hiveDbName</name>
|
||||||
|
<value>openaire</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,99 @@
|
||||||
|
<workflow-app name="UsageStatsCounts" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
<parameters>
|
||||||
|
<property>
|
||||||
|
<name>outputPath</name>
|
||||||
|
<description>the path where to store the actionset</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>usagestatsdb</name>
|
||||||
|
<description>the name of the db to be used</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkDriverMemory</name>
|
||||||
|
<description>memory for driver process</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorMemory</name>
|
||||||
|
<description>memory for individual executor</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorCores</name>
|
||||||
|
<description>number of cores used by single executor</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozieActionShareLibForSpark2</name>
|
||||||
|
<description>oozie action sharelib for spark 2.*</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2ExtraListeners</name>
|
||||||
|
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||||
|
<description>spark 2.* extra listeners classname</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2SqlQueryExecutionListeners</name>
|
||||||
|
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||||
|
<description>spark 2.* sql query execution listeners classname</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2YarnHistoryServerAddress</name>
|
||||||
|
<description>spark 2.* yarn history server address</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2EventLogDir</name>
|
||||||
|
<description>spark 2.* event log dir location</description>
|
||||||
|
</property>
|
||||||
|
</parameters>
|
||||||
|
|
||||||
|
<global>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.queuename</name>
|
||||||
|
<value>${queueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||||
|
<value>${oozieLauncherQueueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>${oozieActionShareLibForSpark2}</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
</configuration>
|
||||||
|
</global>
|
||||||
|
<start to="atomicactions"/>
|
||||||
|
<kill name="Kill">
|
||||||
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
</kill>
|
||||||
|
|
||||||
|
|
||||||
|
<action name="atomicactions">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Produces the atomic action with the usage stats count for results</name>
|
||||||
|
<class>eu.dnetlib.dhp.actionmanager.usagestats.SparkAtomicActionUsageJob</class>
|
||||||
|
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
|
<arg>--usagestatsdb</arg><arg>${usagestatsdb}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingDir}/usageDb</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<end name="End"/>
|
||||||
|
</workflow-app>
|
|
@ -0,0 +1,259 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.usagestats;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.Row;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob;
|
||||||
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
|
||||||
|
public class SparkAtomicActionCountJobTest {
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
private static Path workingDir;
|
||||||
|
private static final Logger log = LoggerFactory
|
||||||
|
.getLogger(SparkAtomicActionCountJobTest.class);
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
workingDir = Files
|
||||||
|
.createTempDirectory(SparkAtomicActionCountJobTest.class.getSimpleName());
|
||||||
|
log.info("using work dir {}", workingDir);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.setAppName(SparkAtomicActionCountJobTest.class.getSimpleName());
|
||||||
|
|
||||||
|
conf.setMaster("local[*]");
|
||||||
|
conf.set("spark.driver.host", "localhost");
|
||||||
|
conf.set("hive.metastore.local", "true");
|
||||||
|
conf.set("spark.ui.enabled", "false");
|
||||||
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||||
|
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||||
|
|
||||||
|
spark = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(SparkAtomicActionCountJobTest.class.getSimpleName())
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void afterAll() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
|
spark.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testMatch() {
|
||||||
|
String usageScoresPath = getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
|
||||||
|
|
||||||
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<Result> tmp = sc
|
||||||
|
.textFile(workingDir.toString() + "/actionSet")
|
||||||
|
.map(usm -> OBJECT_MAPPER.readValue(usm, Result.class));
|
||||||
|
|
||||||
|
Assertions.assertEquals(9, tmp.count());
|
||||||
|
|
||||||
|
tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
|
||||||
|
tmp
|
||||||
|
.foreach(
|
||||||
|
r -> r
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
m -> m
|
||||||
|
.getUnit()
|
||||||
|
.stream()
|
||||||
|
.forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
|
||||||
|
tmp
|
||||||
|
.foreach(
|
||||||
|
r -> r
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
|
||||||
|
tmp
|
||||||
|
.foreach(
|
||||||
|
r -> r
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
m -> m
|
||||||
|
.getUnit()
|
||||||
|
.stream()
|
||||||
|
.forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));
|
||||||
|
|
||||||
|
tmp
|
||||||
|
.foreach(
|
||||||
|
r -> r
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
m -> m
|
||||||
|
.getUnit()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
u -> Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"measure:usage_counts",
|
||||||
|
u.getDataInfo().getProvenanceaction().getClassid()))));
|
||||||
|
tmp
|
||||||
|
.foreach(
|
||||||
|
r -> r
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
m -> m
|
||||||
|
.getUnit()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
u -> Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"Inferred by OpenAIRE",
|
||||||
|
u.getDataInfo().getProvenanceaction().getClassname()))));
|
||||||
|
|
||||||
|
tmp
|
||||||
|
.foreach(
|
||||||
|
r -> r
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
m -> m
|
||||||
|
.getUnit()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
u -> Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"count",
|
||||||
|
u.getKey()))));
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"0",
|
||||||
|
tmp
|
||||||
|
.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.filter(m -> m.getId().equals("downloads"))
|
||||||
|
.collect(Collectors.toList())
|
||||||
|
.get(0)
|
||||||
|
.getUnit()
|
||||||
|
.get(0)
|
||||||
|
.getValue());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"5",
|
||||||
|
tmp
|
||||||
|
.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.filter(m -> m.getId().equals("views"))
|
||||||
|
.collect(Collectors.toList())
|
||||||
|
.get(0)
|
||||||
|
.getUnit()
|
||||||
|
.get(0)
|
||||||
|
.getValue());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"0",
|
||||||
|
tmp
|
||||||
|
.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.filter(m -> m.getId().equals("downloads"))
|
||||||
|
.collect(Collectors.toList())
|
||||||
|
.get(0)
|
||||||
|
.getUnit()
|
||||||
|
.get(0)
|
||||||
|
.getValue());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"1",
|
||||||
|
tmp
|
||||||
|
.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.filter(m -> m.getId().equals("views"))
|
||||||
|
.collect(Collectors.toList())
|
||||||
|
.get(0)
|
||||||
|
.getUnit()
|
||||||
|
.get(0)
|
||||||
|
.getValue());
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"2",
|
||||||
|
tmp
|
||||||
|
.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.filter(m -> m.getId().equals("downloads"))
|
||||||
|
.collect(Collectors.toList())
|
||||||
|
.get(0)
|
||||||
|
.getUnit()
|
||||||
|
.get(0)
|
||||||
|
.getValue());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"6",
|
||||||
|
tmp
|
||||||
|
.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.filter(m -> m.getId().equals("views"))
|
||||||
|
.collect(Collectors.toList())
|
||||||
|
.get(0)
|
||||||
|
.getUnit()
|
||||||
|
.get(0)
|
||||||
|
.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4}
|
||||||
|
{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1}
|
||||||
|
{"result_id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
|
||||||
|
{"result_id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
|
||||||
|
{"result_id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
|
||||||
|
{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3}
|
||||||
|
{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3}
|
||||||
|
{"result_id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
|
||||||
|
{"result_id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
|
||||||
|
{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8}
|
||||||
|
{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2}
|
||||||
|
{"result_id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}
|
|
@ -59,52 +59,6 @@ object SparkGenerateDoiBoost {
|
||||||
val workingDirPath = parser.get("workingPath")
|
val workingDirPath = parser.get("workingPath")
|
||||||
val openaireOrganizationPath = parser.get("openaireOrganizationPath")
|
val openaireOrganizationPath = parser.get("openaireOrganizationPath")
|
||||||
|
|
||||||
val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable {
|
|
||||||
override def zero: Publication = new Publication
|
|
||||||
|
|
||||||
override def reduce(b: Publication, a: (String, Publication)): Publication = {
|
|
||||||
|
|
||||||
if (b == null) {
|
|
||||||
if (a != null && a._2 != null) {
|
|
||||||
a._2.setId(a._1)
|
|
||||||
return a._2
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (a != null && a._2 != null) {
|
|
||||||
b.mergeFrom(a._2)
|
|
||||||
b.setId(a._1)
|
|
||||||
val authors = AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor)
|
|
||||||
b.setAuthor(authors)
|
|
||||||
return b
|
|
||||||
}
|
|
||||||
}
|
|
||||||
new Publication
|
|
||||||
}
|
|
||||||
|
|
||||||
override def merge(b1: Publication, b2: Publication): Publication = {
|
|
||||||
if (b1 == null) {
|
|
||||||
if (b2 != null)
|
|
||||||
return b2
|
|
||||||
} else {
|
|
||||||
if (b2 != null) {
|
|
||||||
b1.mergeFrom(b2)
|
|
||||||
val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
|
|
||||||
b1.setAuthor(authors)
|
|
||||||
if (b2.getId != null && b2.getId.nonEmpty)
|
|
||||||
b1.setId(b2.getId)
|
|
||||||
return b1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
new Publication
|
|
||||||
}
|
|
||||||
|
|
||||||
override def finish(reduction: Publication): Publication = reduction
|
|
||||||
|
|
||||||
override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication]
|
|
||||||
|
|
||||||
override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication]
|
|
||||||
}
|
|
||||||
|
|
||||||
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
||||||
implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
|
implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
|
||||||
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
|
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
|
||||||
|
@ -175,8 +129,33 @@ object SparkGenerateDoiBoost {
|
||||||
.map(DoiBoostMappingUtil.fixPublication)
|
.map(DoiBoostMappingUtil.fixPublication)
|
||||||
.map(p => (p.getId, p))
|
.map(p => (p.getId, p))
|
||||||
.groupByKey(_._1)
|
.groupByKey(_._1)
|
||||||
.agg(crossrefAggregator.toColumn)
|
.reduceGroups((left, right) => {
|
||||||
.map(p => p._2)
|
//Check left is not null
|
||||||
|
if (left != null && left._1 != null) {
|
||||||
|
//If right is null then return left
|
||||||
|
if (right == null || right._2 == null)
|
||||||
|
left
|
||||||
|
else {
|
||||||
|
// Here Left and Right are not null
|
||||||
|
// So we have to merge
|
||||||
|
val b1 = left._2
|
||||||
|
val b2 = right._2
|
||||||
|
b1.mergeFrom(b2)
|
||||||
|
b1.mergeOAFDataInfo(b2)
|
||||||
|
val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
|
||||||
|
b1.setAuthor(authors)
|
||||||
|
if (b2.getId != null && b2.getId.nonEmpty)
|
||||||
|
b1.setId(b2.getId)
|
||||||
|
//Return publication Merged
|
||||||
|
(b1.getId, b1)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Left is Null so we return right
|
||||||
|
right
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.filter(s => s != null && s._2 != null)
|
||||||
|
.map(s => s._2._2)
|
||||||
.write
|
.write
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.save(s"$workingDirPath/doiBoostPublicationFiltered")
|
.save(s"$workingDirPath/doiBoostPublicationFiltered")
|
||||||
|
|
|
@ -1,16 +1,13 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.clean;
|
package eu.dnetlib.dhp.oa.graph.clean;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import java.io.Serializable;
|
||||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
import java.util.List;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import java.util.Optional;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Context;
|
import java.util.stream.Collectors;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
@ -21,93 +18,113 @@ import org.apache.spark.sql.SparkSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Context;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
|
||||||
public class CleanContextSparkJob implements Serializable {
|
public class CleanContextSparkJob implements Serializable {
|
||||||
private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class);
|
private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
String jsonConfiguration = IOUtils
|
String jsonConfiguration = IOUtils
|
||||||
.toString(
|
.toString(
|
||||||
CleanContextSparkJob.class
|
CleanContextSparkJob.class
|
||||||
.getResourceAsStream(
|
.getResourceAsStream(
|
||||||
"/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json"));
|
"/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json"));
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
Boolean isSparkSessionManaged = Optional
|
Boolean isSparkSessionManaged = Optional
|
||||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
.map(Boolean::valueOf)
|
.map(Boolean::valueOf)
|
||||||
.orElse(Boolean.TRUE);
|
.orElse(Boolean.TRUE);
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
String inputPath = parser.get("inputPath");
|
String inputPath = parser.get("inputPath");
|
||||||
log.info("inputPath: {}", inputPath);
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
String workingPath = parser.get("workingPath");
|
String workingPath = parser.get("workingPath");
|
||||||
log.info("workingPath: {}", workingPath);
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
String contextId = parser.get("contextId");
|
String contextId = parser.get("contextId");
|
||||||
log.info("contextId: {}", contextId);
|
log.info("contextId: {}", contextId);
|
||||||
|
|
||||||
String verifyParam = parser.get("verifyParam");
|
String verifyParam = parser.get("verifyParam");
|
||||||
log.info("verifyParam: {}", verifyParam);
|
log.info("verifyParam: {}", verifyParam);
|
||||||
|
|
||||||
|
String graphTableClassName = parser.get("graphTableClassName");
|
||||||
|
log.info("graphTableClassName: {}", graphTableClassName);
|
||||||
|
|
||||||
String graphTableClassName = parser.get("graphTableClassName");
|
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
|
||||||
log.info("graphTableClassName: {}", graphTableClassName);
|
|
||||||
|
|
||||||
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
|
SparkConf conf = new SparkConf();
|
||||||
|
runWithSparkSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
|
||||||
|
cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
private static <T extends Result> void cleanContext(SparkSession spark, String contextId, String verifyParam,
|
||||||
runWithSparkSession(
|
String inputPath, Class<T> entityClazz, String workingPath) {
|
||||||
conf,
|
Dataset<T> res = spark
|
||||||
isSparkSessionManaged,
|
.read()
|
||||||
spark -> {
|
.textFile(inputPath)
|
||||||
|
.map(
|
||||||
|
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||||
|
Encoders.bean(entityClazz));
|
||||||
|
|
||||||
cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath);
|
res.map((MapFunction<T, T>) r -> {
|
||||||
});
|
if (!r
|
||||||
}
|
.getTitle()
|
||||||
|
.stream()
|
||||||
|
.filter(
|
||||||
|
t -> t
|
||||||
|
.getQualifier()
|
||||||
|
.getClassid()
|
||||||
|
.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid()))
|
||||||
|
.anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))) {
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
r
|
||||||
|
.setContext(
|
||||||
|
r
|
||||||
|
.getContext()
|
||||||
|
.stream()
|
||||||
|
.filter(
|
||||||
|
c -> !c.getId().split("::")[0]
|
||||||
|
.equalsIgnoreCase(contextId))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
return r;
|
||||||
|
}, Encoders.bean(entityClazz))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(workingPath);
|
||||||
|
|
||||||
private static <T extends Result> void cleanContext(SparkSession spark, String contextId, String verifyParam, String inputPath, Class<T> entityClazz, String workingPath) {
|
spark
|
||||||
Dataset<T> res = spark
|
.read()
|
||||||
.read()
|
.textFile(workingPath)
|
||||||
.textFile(inputPath)
|
.map(
|
||||||
.map(
|
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
Encoders.bean(entityClazz))
|
||||||
Encoders.bean(entityClazz));
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
res.map((MapFunction<T, T>) r -> {
|
.option("compression", "gzip")
|
||||||
if(!r.getTitle()
|
.json(inputPath);
|
||||||
.stream()
|
}
|
||||||
.filter(t -> t.getQualifier().getClassid()
|
|
||||||
.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid()))
|
|
||||||
.anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))){
|
|
||||||
return r;
|
|
||||||
}
|
|
||||||
r.setContext(r.getContext().stream().filter(c -> !c.getId().split("::")[0]
|
|
||||||
.equalsIgnoreCase(contextId)).collect(Collectors.toList()));
|
|
||||||
return r;
|
|
||||||
} ,Encoders.bean(entityClazz))
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.option("compression","gzip")
|
|
||||||
.json(workingPath);
|
|
||||||
|
|
||||||
spark.read().textFile(workingPath).map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
|
||||||
Encoders.bean(entityClazz))
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.option("compression","gzip")
|
|
||||||
.json(inputPath);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,18 +1,12 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.clean;
|
package eu.dnetlib.dhp.oa.graph.clean;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import java.io.IOException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import java.nio.file.Files;
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.Constants;
|
import java.nio.file.Path;
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest;
|
import java.util.List;
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
|
import java.util.Locale;
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
|
||||||
import eu.dnetlib.dhp.schema.dump.oaf.Instance;
|
|
||||||
import eu.dnetlib.dhp.schema.dump.oaf.OpenAccessRoute;
|
|
||||||
import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
@ -27,133 +21,280 @@ import org.junit.jupiter.api.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import java.nio.file.Files;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.util.List;
|
import eu.dnetlib.dhp.oa.graph.dump.Constants;
|
||||||
import java.util.Locale;
|
import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest;
|
||||||
|
import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
|
||||||
|
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.dump.oaf.Instance;
|
||||||
|
import eu.dnetlib.dhp.schema.dump.oaf.OpenAccessRoute;
|
||||||
|
import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
|
|
||||||
public class CleanContextTest {
|
public class CleanContextTest {
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
|
|
||||||
private static Path workingDir;
|
private static Path workingDir;
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
|
private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
|
||||||
|
|
||||||
@BeforeAll
|
@BeforeAll
|
||||||
public static void beforeAll() throws IOException {
|
public static void beforeAll() throws IOException {
|
||||||
workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName());
|
workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName());
|
||||||
log.info("using work dir {}", workingDir);
|
log.info("using work dir {}", workingDir);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
conf.setAppName(DumpJobTest.class.getSimpleName());
|
conf.setAppName(DumpJobTest.class.getSimpleName());
|
||||||
|
|
||||||
conf.setMaster("local[*]");
|
conf.setMaster("local[*]");
|
||||||
conf.set("spark.driver.host", "localhost");
|
conf.set("spark.driver.host", "localhost");
|
||||||
conf.set("hive.metastore.local", "true");
|
conf.set("hive.metastore.local", "true");
|
||||||
conf.set("spark.ui.enabled", "false");
|
conf.set("spark.ui.enabled", "false");
|
||||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||||
|
|
||||||
spark = SparkSession
|
spark = SparkSession
|
||||||
.builder()
|
.builder()
|
||||||
.appName(DumpJobTest.class.getSimpleName())
|
.appName(DumpJobTest.class.getSimpleName())
|
||||||
.config(conf)
|
.config(conf)
|
||||||
.getOrCreate();
|
.getOrCreate();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterAll
|
@AfterAll
|
||||||
public static void afterAll() throws IOException {
|
public static void afterAll() throws IOException {
|
||||||
FileUtils.deleteDirectory(workingDir.toFile());
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
spark.stop();
|
spark.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testResultClean() throws Exception {
|
public void testResultClean() throws Exception {
|
||||||
final String sourcePath = getClass()
|
final String sourcePath = getClass()
|
||||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json")
|
.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json")
|
||||||
.getPath();
|
.getPath();
|
||||||
final String prefix = "gcube ";
|
final String prefix = "gcube ";
|
||||||
|
|
||||||
|
spark
|
||||||
|
.read()
|
||||||
|
.textFile(sourcePath)
|
||||||
|
.map(
|
||||||
|
(MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class),
|
||||||
|
Encoders.bean(Publication.class))
|
||||||
|
.write()
|
||||||
|
.json(workingDir.toString() + "/publication");
|
||||||
|
|
||||||
spark.read().textFile(sourcePath).map((MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class), Encoders.bean(Publication.class))
|
CleanContextSparkJob.main(new String[] {
|
||||||
.write().json(workingDir.toString() + "/publication");
|
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||||
|
"--inputPath", workingDir.toString() + "/publication",
|
||||||
|
"-graphTableClassName", Publication.class.getCanonicalName(),
|
||||||
|
"-workingPath", workingDir.toString() + "/working",
|
||||||
|
"-contextId", "sobigdata",
|
||||||
|
"-verifyParam", "gCube "
|
||||||
|
});
|
||||||
|
|
||||||
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
JavaRDD<Publication> tmp = sc
|
||||||
|
.textFile(workingDir.toString() + "/publication")
|
||||||
|
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
||||||
|
|
||||||
CleanContextSparkJob.main(new String[] {
|
Assertions.assertEquals(7, tmp.count());
|
||||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
|
||||||
"--inputPath", workingDir.toString() + "/publication",
|
|
||||||
"-graphTableClassName", Publication.class.getCanonicalName(),
|
|
||||||
"-workingPath", workingDir.toString() + "/working",
|
|
||||||
"-contextId","sobigdata",
|
|
||||||
"-verifyParam","gCube "
|
|
||||||
});
|
|
||||||
|
|
||||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
// original result with sobigdata context and gcube as starting string in the main title for the publication
|
||||||
JavaRDD<Publication> tmp = sc
|
Assertions
|
||||||
.textFile(workingDir.toString() + "/publication")
|
.assertEquals(
|
||||||
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
0,
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
|
||||||
Assertions.assertEquals(7, tmp.count());
|
// original result with sobigdata context without gcube as starting string in the main title for the publication
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1,
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"sobigdata::projects::2",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
|
||||||
//original result with sobigdata context and gcube as starting string in the main title for the publication
|
// original result with sobigdata context with gcube as starting string in the subtitle
|
||||||
Assertions.assertEquals(0,
|
Assertions
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")).collect().get(0).getContext().size());
|
.assertEquals(
|
||||||
|
1,
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"sobigdata::projects::2",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
List<StructuredProperty> titles = tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getTitle();
|
||||||
|
Assertions.assertEquals(1, titles.size());
|
||||||
|
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix));
|
||||||
|
Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid());
|
||||||
|
|
||||||
//original result with sobigdata context without gcube as starting string in the main title for the publication
|
// original result with sobigdata context with gcube not as starting string in the main title
|
||||||
Assertions.assertEquals(1,
|
Assertions
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")).collect().get(0).getContext().size());
|
.assertEquals(
|
||||||
Assertions.assertEquals("sobigdata::projects::2",tmp.filter(p->p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")).collect().get(0).getContext().get(0).getId() );
|
1,
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"sobigdata::projects::1",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
titles = tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getTitle();
|
||||||
|
Assertions.assertEquals(1, titles.size());
|
||||||
|
Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix));
|
||||||
|
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim()));
|
||||||
|
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
|
||||||
|
|
||||||
//original result with sobigdata context with gcube as starting string in the subtitle
|
// original result with sobigdata in context and also other contexts with gcube as starting string for the main
|
||||||
Assertions.assertEquals(1,
|
// title
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getContext().size());
|
Assertions
|
||||||
Assertions.assertEquals("sobigdata::projects::2",tmp.filter(p->p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getContext().get(0).getId() );
|
.assertEquals(
|
||||||
List<StructuredProperty> titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getTitle();
|
1,
|
||||||
Assertions.assertEquals(1, titles.size());
|
tmp
|
||||||
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix) );
|
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
|
||||||
Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid());
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"dh-ch",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
titles = tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getTitle();
|
||||||
|
Assertions.assertEquals(1, titles.size());
|
||||||
|
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix));
|
||||||
|
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
|
||||||
|
|
||||||
//original result with sobigdata context with gcube not as starting string in the main title
|
// original result with multiple main title one of which whith gcube as starting string and with 2 contextes
|
||||||
Assertions.assertEquals(1,
|
Assertions
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getContext().size());
|
.assertEquals(
|
||||||
Assertions.assertEquals("sobigdata::projects::1",tmp.filter(p->p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getContext().get(0).getId() );
|
1,
|
||||||
titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getTitle();
|
tmp
|
||||||
Assertions.assertEquals(1, titles.size());
|
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
|
||||||
Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix) );
|
.collect()
|
||||||
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim()) );
|
.get(0)
|
||||||
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"dh-ch",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
titles = tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getTitle();
|
||||||
|
Assertions.assertEquals(2, titles.size());
|
||||||
|
Assertions
|
||||||
|
.assertTrue(
|
||||||
|
titles
|
||||||
|
.stream()
|
||||||
|
.anyMatch(
|
||||||
|
t -> t.getQualifier().getClassid().equals("main title")
|
||||||
|
&& t.getValue().toLowerCase().startsWith(prefix)));
|
||||||
|
|
||||||
//original result with sobigdata in context and also other contexts with gcube as starting string for the main title
|
// original result without sobigdata in context with gcube as starting string for the main title
|
||||||
Assertions.assertEquals(1,
|
Assertions
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getContext().size());
|
.assertEquals(
|
||||||
Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getContext().get(0).getId() );
|
1,
|
||||||
titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getTitle();
|
tmp
|
||||||
Assertions.assertEquals(1, titles.size());
|
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
||||||
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix) );
|
.collect()
|
||||||
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"dh-ch",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
titles = tmp
|
||||||
|
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getTitle();
|
||||||
|
Assertions.assertEquals(2, titles.size());
|
||||||
|
|
||||||
//original result with multiple main title one of which whith gcube as starting string and with 2 contextes
|
Assertions
|
||||||
Assertions.assertEquals(1,
|
.assertTrue(
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getContext().size());
|
titles
|
||||||
Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getContext().get(0).getId() );
|
.stream()
|
||||||
titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getTitle();
|
.anyMatch(
|
||||||
Assertions.assertEquals(2, titles.size());
|
t -> t.getQualifier().getClassid().equals("main title")
|
||||||
Assertions.assertTrue(titles.stream().anyMatch(t -> t.getQualifier().getClassid().equals("main title") && t.getValue().toLowerCase().startsWith(prefix)) );
|
&& t.getValue().toLowerCase().startsWith(prefix)));
|
||||||
|
|
||||||
|
}
|
||||||
//original result without sobigdata in context with gcube as starting string for the main title
|
|
||||||
Assertions.assertEquals(1,
|
|
||||||
tmp.filter(p->p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getContext().size());
|
|
||||||
Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getContext().get(0).getId() );
|
|
||||||
titles = tmp.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getTitle();
|
|
||||||
Assertions.assertEquals(2, titles.size());
|
|
||||||
|
|
||||||
Assertions.assertTrue(titles.stream().anyMatch(t -> t.getQualifier().getClassid().equals("main title") && t.getValue().toLowerCase().startsWith(prefix)));
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue