Compare commits

...

2 Commits

5 changed files with 601 additions and 1 deletions

View File

@ -0,0 +1,123 @@
package eu.dnetlib.dhp.oa.graph.clean;
/**
* @author miriam.baglioni
* @Date 28/06/22
*/
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Result;
public class CleanContextSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CleanContextSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
String contextId = parser.get("contextId");
log.info("contextId: {}", contextId);
String verifyParam = parser.get("verifyParam");
log.info("verifyParam: {}", verifyParam);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath);
});
}
private static <T extends Result> void cleanContext(SparkSession spark, String contextId, String verifyParam,
String inputPath, Class<T> entityClazz, String workingPath) {
Dataset<T> res = spark
.read()
.textFile(inputPath)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
Encoders.bean(entityClazz));
res.map((MapFunction<T, T>) r -> {
if (!r
.getTitle()
.stream()
.filter(
t -> t
.getQualifier()
.getClassid()
.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid()))
.anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))) {
return r;
}
r
.setContext(
r
.getContext()
.stream()
.filter(
c -> !c.getId().split("::")[0]
.equalsIgnoreCase(contextId))
.collect(Collectors.toList()));
return r;
}, Encoders.bean(entityClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath);
spark
.read()
.textFile(workingPath)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
Encoders.bean(entityClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath);
}
}

View File

@ -13,6 +13,23 @@
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>shouldCleanContext</name>
<description>true if the context have to be cleaned</description>
</property>
<property>
<name>contextId</name>
<value>sobigdata</value>
<description>It is the context id that should be removed from the result if the condition is matched.
Now it is just sobigdata. In a futere implementation I plan to have the contextId as value in a json
where to specify also the constraints that should be verified to remove the context from the result</description>
</property>
<property>
<name>verifyParam</name>
<value>gcube </value>
<description>It is the constrint to be verified. This time is hardcoded as gcube and it is searched for in
the title. If title starts with gcube than the context sobigdata will be removed by the result if present</description>
</property>
<property>
<name>sparkDriverMemory</name>
@ -275,7 +292,131 @@
<error to="Kill"/>
</action>
<join name="wait_clean" to="End"/>
<join name="wait_clean" to="clean_context"/>
<decision name="clean_context">
<switch>
<case to="fork_clean_context">${wf:conf('shouldCleanContext') eq true}</case>
<default to="End"/>
</switch>
</decision>
<fork name="fork_clean_context">
<path start="clean_publication_context"/>
<path start="clean_dataset_context"/>
<path start="clean_otherresearchproduct_context"/>
<path start="clean_software_context"/>
</fork>
<action name="clean_publication_context">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean publications context</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--workingPath</arg><arg>${workingDir}/working/publication</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<action name="clean_dataset_context">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean datasets Context</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--workingPath</arg><arg>${workingDir}/working/dataset</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<action name="clean_otherresearchproduct_context">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean otherresearchproducts context</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--workingPath</arg><arg>${workingDir}/working/otherresearchproduct</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<action name="clean_software_context">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean softwares context</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--workingPath</arg><arg>${workingDir}/working/software</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
</spark>
<ok to="wait_clean_context"/>
<error to="Kill"/>
</action>
<join name="wait_clean_context" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,37 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "in",
"paramLongName": "inputPath",
"paramDescription": "the path to the graph data dump to read",
"paramRequired": true
},
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the path to store the output graph",
"paramRequired": true
},
{
"paramName": "ci",
"paramLongName": "contextId",
"paramDescription": "the id of the context to be removed",
"paramRequired": true
},
{
"paramName": "class",
"paramLongName": "graphTableClassName",
"paramDescription": "class name moelling the graph table",
"paramRequired": true
},{
"paramName": "vf",
"paramLongName": "verifyParam",
"paramDescription": "the parameter to be verified to remove the context",
"paramRequired": true
}
]

View File

@ -0,0 +1,292 @@
package eu.dnetlib.dhp.oa.graph.clean;
/**
* @author miriam.baglioni
* @Date 28/06/22
*/
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class CleanContextTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(DumpJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(DumpJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void testResultClean() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json")
.getPath();
final String prefix = "gcube ";
spark
.read()
.textFile(sourcePath)
.map(
(MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class),
Encoders.bean(Publication.class))
.write()
.json(workingDir.toString() + "/publication");
CleanContextSparkJob.main(new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--inputPath", workingDir.toString() + "/publication",
"-graphTableClassName", Publication.class.getCanonicalName(),
"-workingPath", workingDir.toString() + "/working",
"-contextId", "sobigdata",
"-verifyParam", "gCube "
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.textFile(workingDir.toString() + "/publication")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
Assertions.assertEquals(7, tmp.count());
// original result with sobigdata context and gcube as starting string in the main title for the publication
Assertions
.assertEquals(
0,
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
.collect()
.get(0)
.getContext()
.size());
// original result with sobigdata context without gcube as starting string in the main title for the publication
Assertions
.assertEquals(
1,
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
.collect()
.get(0)
.getContext()
.size());
Assertions
.assertEquals(
"sobigdata::projects::2",
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
.collect()
.get(0)
.getContext()
.get(0)
.getId());
// original result with sobigdata context with gcube as starting string in the subtitle
Assertions
.assertEquals(
1,
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
.collect()
.get(0)
.getContext()
.size());
Assertions
.assertEquals(
"sobigdata::projects::2",
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
.collect()
.get(0)
.getContext()
.get(0)
.getId());
List<StructuredProperty> titles = tmp
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
.collect()
.get(0)
.getTitle();
Assertions.assertEquals(1, titles.size());
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix));
Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid());
// original result with sobigdata context with gcube not as starting string in the main title
Assertions
.assertEquals(
1,
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
.collect()
.get(0)
.getContext()
.size());
Assertions
.assertEquals(
"sobigdata::projects::1",
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
.collect()
.get(0)
.getContext()
.get(0)
.getId());
titles = tmp
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
.collect()
.get(0)
.getTitle();
Assertions.assertEquals(1, titles.size());
Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix));
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim()));
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
// original result with sobigdata in context and also other contexts with gcube as starting string for the main
// title
Assertions
.assertEquals(
1,
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
.collect()
.get(0)
.getContext()
.size());
Assertions
.assertEquals(
"dh-ch",
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
.collect()
.get(0)
.getContext()
.get(0)
.getId());
titles = tmp
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
.collect()
.get(0)
.getTitle();
Assertions.assertEquals(1, titles.size());
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix));
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
// original result with multiple main title one of which whith gcube as starting string and with 2 contextes
Assertions
.assertEquals(
1,
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
.collect()
.get(0)
.getContext()
.size());
Assertions
.assertEquals(
"dh-ch",
tmp
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
.collect()
.get(0)
.getContext()
.get(0)
.getId());
titles = tmp
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
.collect()
.get(0)
.getTitle();
Assertions.assertEquals(2, titles.size());
Assertions
.assertTrue(
titles
.stream()
.anyMatch(
t -> t.getQualifier().getClassid().equals("main title")
&& t.getValue().toLowerCase().startsWith(prefix)));
// original result without sobigdata in context with gcube as starting string for the main title
Assertions
.assertEquals(
1,
tmp
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
.collect()
.get(0)
.getContext()
.size());
Assertions
.assertEquals(
"dh-ch",
tmp
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
.collect()
.get(0)
.getContext()
.get(0)
.getId());
titles = tmp
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
.collect()
.get(0)
.getTitle();
Assertions.assertEquals(2, titles.size());
Assertions
.assertTrue(
titles
.stream()
.anyMatch(
t -> t.getQualifier().getClassid().equals("main title")
&& t.getValue().toLowerCase().startsWith(prefix)));
}
}