Oozie workflow for cleancontext #216
|
@ -1,16 +1,13 @@
|
||||||
|
|
||||||
claudio.atzori marked this conversation as resolved
|
|||||||
package eu.dnetlib.dhp.oa.graph.clean;
|
package eu.dnetlib.dhp.oa.graph.clean;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import java.io.Serializable;
|
||||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
import java.util.List;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import java.util.Optional;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Context;
|
import java.util.stream.Collectors;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
@ -21,93 +18,113 @@ import org.apache.spark.sql.SparkSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Context;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
|
||||||
public class CleanContextSparkJob implements Serializable {
|
public class CleanContextSparkJob implements Serializable {
|
||||||
private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class);
|
private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
String jsonConfiguration = IOUtils
|
String jsonConfiguration = IOUtils
|
||||||
.toString(
|
.toString(
|
||||||
CleanContextSparkJob.class
|
CleanContextSparkJob.class
|
||||||
.getResourceAsStream(
|
.getResourceAsStream(
|
||||||
"/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json"));
|
"/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json"));
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
Boolean isSparkSessionManaged = Optional
|
Boolean isSparkSessionManaged = Optional
|
||||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
.map(Boolean::valueOf)
|
.map(Boolean::valueOf)
|
||||||
.orElse(Boolean.TRUE);
|
.orElse(Boolean.TRUE);
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
String inputPath = parser.get("inputPath");
|
String inputPath = parser.get("inputPath");
|
||||||
log.info("inputPath: {}", inputPath);
|
log.info("inputPath: {}", inputPath);
|
||||||
|
|
||||||
String workingPath = parser.get("workingPath");
|
String workingPath = parser.get("workingPath");
|
||||||
log.info("workingPath: {}", workingPath);
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
String contextId = parser.get("contextId");
|
String contextId = parser.get("contextId");
|
||||||
log.info("contextId: {}", contextId);
|
log.info("contextId: {}", contextId);
|
||||||
|
|
||||||
String verifyParam = parser.get("verifyParam");
|
String verifyParam = parser.get("verifyParam");
|
||||||
log.info("verifyParam: {}", verifyParam);
|
log.info("verifyParam: {}", verifyParam);
|
||||||
|
|
||||||
|
String graphTableClassName = parser.get("graphTableClassName");
|
||||||
|
log.info("graphTableClassName: {}", graphTableClassName);
|
||||||
|
|
||||||
String graphTableClassName = parser.get("graphTableClassName");
|
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
|
||||||
log.info("graphTableClassName: {}", graphTableClassName);
|
|
||||||
|
|
||||||
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
|
SparkConf conf = new SparkConf();
|
||||||
|
runWithSparkSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
|
||||||
|
cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
private static <T extends Result> void cleanContext(SparkSession spark, String contextId, String verifyParam,
|
||||||
runWithSparkSession(
|
String inputPath, Class<T> entityClazz, String workingPath) {
|
||||||
conf,
|
Dataset<T> res = spark
|
||||||
isSparkSessionManaged,
|
.read()
|
||||||
spark -> {
|
.textFile(inputPath)
|
||||||
|
.map(
|
||||||
|
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||||
|
Encoders.bean(entityClazz));
|
||||||
|
|
||||||
cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath);
|
res.map((MapFunction<T, T>) r -> {
|
||||||
});
|
if (!r
|
||||||
}
|
.getTitle()
|
||||||
|
.stream()
|
||||||
|
.filter(
|
||||||
|
t -> t
|
||||||
|
.getQualifier()
|
||||||
|
.getClassid()
|
||||||
|
.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid()))
|
||||||
|
.anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))) {
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
r
|
||||||
|
.setContext(
|
||||||
|
r
|
||||||
|
.getContext()
|
||||||
|
.stream()
|
||||||
|
.filter(
|
||||||
|
c -> !c.getId().split("::")[0]
|
||||||
|
.equalsIgnoreCase(contextId))
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
return r;
|
||||||
|
}, Encoders.bean(entityClazz))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(workingPath);
|
||||||
|
|
||||||
private static <T extends Result> void cleanContext(SparkSession spark, String contextId, String verifyParam, String inputPath, Class<T> entityClazz, String workingPath) {
|
spark
|
||||||
Dataset<T> res = spark
|
.read()
|
||||||
.read()
|
.textFile(workingPath)
|
||||||
.textFile(inputPath)
|
.map(
|
||||||
.map(
|
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
||||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
Encoders.bean(entityClazz))
|
||||||
Encoders.bean(entityClazz));
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
res.map((MapFunction<T, T>) r -> {
|
.option("compression", "gzip")
|
||||||
if(!r.getTitle()
|
.json(inputPath);
|
||||||
.stream()
|
}
|
||||||
.filter(t -> t.getQualifier().getClassid()
|
|
||||||
.equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid()))
|
|
||||||
.anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))){
|
|
||||||
return r;
|
|
||||||
}
|
|
||||||
r.setContext(r.getContext().stream().filter(c -> !c.getId().split("::")[0]
|
|
||||||
.equalsIgnoreCase(contextId)).collect(Collectors.toList()));
|
|
||||||
return r;
|
|
||||||
} ,Encoders.bean(entityClazz))
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.option("compression","gzip")
|
|
||||||
.json(workingPath);
|
|
||||||
|
|
||||||
spark.read().textFile(workingPath).map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
|
|
||||||
Encoders.bean(entityClazz))
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.option("compression","gzip")
|
|
||||||
.json(inputPath);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>shouldCleanContext</name>
|
<name>shouldCleanContext</name>
|
||||||
<value>false</value>
|
<description>true if the context have to be cleaned</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
claudio.atzori
commented
It should be better to include a description for this parameter, to explain its purpose and if it possible to include multiple contentIds, how they should be formatted. It should be better to include a description for this parameter, to explain its purpose and if it possible to include multiple contentIds, how they should be formatted.
miriam.baglioni
commented
This is just the first naive implementation of the context cleaning. I have no idea how it will be once done properly This is just the first naive implementation of the context cleaning. I have no idea how it will be once done properly
claudio.atzori
commented
It might be the 1st naive implementation, but looking at the oozie workflow, it is not obvious what a parameter plays when it is not accompanied by any description. It might be the 1st naive implementation, but looking at the oozie workflow, it is not obvious what a parameter plays when it is not accompanied by any description.
miriam.baglioni
commented
extended extended
|
|||||||
|
@ -312,9 +312,9 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
--conf spark.sql.shuffle.partitions=7680
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working</arg>
|
<arg>--workingPath</arg><arg>${workingDir}/working/publication</arg>
|
||||||
<arg>--contextId</arg><arg>${contextId}</arg>
|
<arg>--contextId</arg><arg>${contextId}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
@ -339,9 +339,9 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
--conf spark.sql.shuffle.partitions=7680
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working</arg>
|
<arg>--workingPath</arg><arg>${workingDir}/working/dataset</arg>
|
||||||
<arg>--contextId</arg><arg>${contextId}</arg>
|
<arg>--contextId</arg><arg>${contextId}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
@ -366,9 +366,9 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
--conf spark.sql.shuffle.partitions=7680
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working</arg>
|
<arg>--workingPath</arg><arg>${workingDir}/working/otherresearchproduct</arg>
|
||||||
<arg>--contextId</arg><arg>${contextId}</arg>
|
<arg>--contextId</arg><arg>${contextId}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
@ -393,9 +393,9 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
--conf spark.sql.shuffle.partitions=7680
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
|
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingDir}/working</arg>
|
<arg>--workingPath</arg><arg>${workingDir}/working/software</arg>
|
||||||
<arg>--contextId</arg><arg>${contextId}</arg>
|
<arg>--contextId</arg><arg>${contextId}</arg>
|
||||||
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
@ -404,6 +404,6 @@
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<join name="wait_clean_context" to="End"/>
|
<join name="wait_clean_context" to="End"/>
|
||||||
|
|
||||||
<end name="End"/>
|
<end name="End"/>
|
||||||
</workflow-app>
|
</workflow-app>
|
|
@ -1,18 +1,12 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.clean;
|
package eu.dnetlib.dhp.oa.graph.clean;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import java.io.IOException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import java.nio.file.Files;
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.Constants;
|
import java.nio.file.Path;
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest;
|
import java.util.List;
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
|
import java.util.Locale;
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
|
||||||
import eu.dnetlib.dhp.schema.dump.oaf.Instance;
|
|
||||||
import eu.dnetlib.dhp.schema.dump.oaf.OpenAccessRoute;
|
|
||||||
import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
@ -27,133 +21,280 @@ import org.junit.jupiter.api.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import java.nio.file.Files;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.util.List;
|
import eu.dnetlib.dhp.oa.graph.dump.Constants;
|
||||||
import java.util.Locale;
|
import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest;
|
||||||
|
import eu.dnetlib.dhp.oa.graph.dump.DumpProducts;
|
||||||
|
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.dump.oaf.Instance;
|
||||||
|
import eu.dnetlib.dhp.schema.dump.oaf.OpenAccessRoute;
|
||||||
|
import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
|
|
||||||
public class CleanContextTest {
|
public class CleanContextTest {
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
|
|
||||||
private static Path workingDir;
|
private static Path workingDir;
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
|
private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class);
|
||||||
|
|
||||||
@BeforeAll
|
@BeforeAll
|
||||||
public static void beforeAll() throws IOException {
|
public static void beforeAll() throws IOException {
|
||||||
workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName());
|
workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName());
|
||||||
log.info("using work dir {}", workingDir);
|
log.info("using work dir {}", workingDir);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
conf.setAppName(DumpJobTest.class.getSimpleName());
|
conf.setAppName(DumpJobTest.class.getSimpleName());
|
||||||
|
|
||||||
conf.setMaster("local[*]");
|
conf.setMaster("local[*]");
|
||||||
conf.set("spark.driver.host", "localhost");
|
conf.set("spark.driver.host", "localhost");
|
||||||
conf.set("hive.metastore.local", "true");
|
conf.set("hive.metastore.local", "true");
|
||||||
conf.set("spark.ui.enabled", "false");
|
conf.set("spark.ui.enabled", "false");
|
||||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||||
|
|
||||||
spark = SparkSession
|
spark = SparkSession
|
||||||
.builder()
|
.builder()
|
||||||
.appName(DumpJobTest.class.getSimpleName())
|
.appName(DumpJobTest.class.getSimpleName())
|
||||||
.config(conf)
|
.config(conf)
|
||||||
.getOrCreate();
|
.getOrCreate();
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterAll
|
@AfterAll
|
||||||
public static void afterAll() throws IOException {
|
public static void afterAll() throws IOException {
|
||||||
FileUtils.deleteDirectory(workingDir.toFile());
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
spark.stop();
|
spark.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testResultClean() throws Exception {
|
public void testResultClean() throws Exception {
|
||||||
final String sourcePath = getClass()
|
final String sourcePath = getClass()
|
||||||
.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json")
|
.getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json")
|
||||||
.getPath();
|
.getPath();
|
||||||
final String prefix = "gcube ";
|
final String prefix = "gcube ";
|
||||||
|
|
||||||
|
spark
|
||||||
|
.read()
|
||||||
|
.textFile(sourcePath)
|
||||||
|
.map(
|
||||||
|
(MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class),
|
||||||
|
Encoders.bean(Publication.class))
|
||||||
|
.write()
|
||||||
|
.json(workingDir.toString() + "/publication");
|
||||||
|
|
||||||
spark.read().textFile(sourcePath).map((MapFunction<String, Publication>) r -> OBJECT_MAPPER.readValue(r, Publication.class), Encoders.bean(Publication.class))
|
CleanContextSparkJob.main(new String[] {
|
||||||
.write().json(workingDir.toString() + "/publication");
|
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||||
|
"--inputPath", workingDir.toString() + "/publication",
|
||||||
|
"-graphTableClassName", Publication.class.getCanonicalName(),
|
||||||
|
"-workingPath", workingDir.toString() + "/working",
|
||||||
|
"-contextId", "sobigdata",
|
||||||
|
"-verifyParam", "gCube "
|
||||||
|
});
|
||||||
|
|
||||||
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
JavaRDD<Publication> tmp = sc
|
||||||
|
.textFile(workingDir.toString() + "/publication")
|
||||||
|
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
||||||
|
|
||||||
CleanContextSparkJob.main(new String[] {
|
Assertions.assertEquals(7, tmp.count());
|
||||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
|
||||||
"--inputPath", workingDir.toString() + "/publication",
|
|
||||||
"-graphTableClassName", Publication.class.getCanonicalName(),
|
|
||||||
"-workingPath", workingDir.toString() + "/working",
|
|
||||||
"-contextId","sobigdata",
|
|
||||||
"-verifyParam","gCube "
|
|
||||||
});
|
|
||||||
|
|
||||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
// original result with sobigdata context and gcube as starting string in the main title for the publication
|
||||||
JavaRDD<Publication> tmp = sc
|
Assertions
|
||||||
.textFile(workingDir.toString() + "/publication")
|
.assertEquals(
|
||||||
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
|
0,
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
|
||||||
Assertions.assertEquals(7, tmp.count());
|
// original result with sobigdata context without gcube as starting string in the main title for the publication
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1,
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"sobigdata::projects::2",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
|
||||||
//original result with sobigdata context and gcube as starting string in the main title for the publication
|
// original result with sobigdata context with gcube as starting string in the subtitle
|
||||||
Assertions.assertEquals(0,
|
Assertions
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")).collect().get(0).getContext().size());
|
.assertEquals(
|
||||||
|
1,
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"sobigdata::projects::2",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
List<StructuredProperty> titles = tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getTitle();
|
||||||
|
Assertions.assertEquals(1, titles.size());
|
||||||
|
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix));
|
||||||
|
Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid());
|
||||||
|
|
||||||
//original result with sobigdata context without gcube as starting string in the main title for the publication
|
// original result with sobigdata context with gcube not as starting string in the main title
|
||||||
Assertions.assertEquals(1,
|
Assertions
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")).collect().get(0).getContext().size());
|
.assertEquals(
|
||||||
Assertions.assertEquals("sobigdata::projects::2",tmp.filter(p->p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")).collect().get(0).getContext().get(0).getId() );
|
1,
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"sobigdata::projects::1",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
titles = tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getTitle();
|
||||||
|
Assertions.assertEquals(1, titles.size());
|
||||||
|
Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix));
|
||||||
|
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim()));
|
||||||
|
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
|
||||||
|
|
||||||
//original result with sobigdata context with gcube as starting string in the subtitle
|
// original result with sobigdata in context and also other contexts with gcube as starting string for the main
|
||||||
Assertions.assertEquals(1,
|
// title
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getContext().size());
|
Assertions
|
||||||
Assertions.assertEquals("sobigdata::projects::2",tmp.filter(p->p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getContext().get(0).getId() );
|
.assertEquals(
|
||||||
List<StructuredProperty> titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getTitle();
|
1,
|
||||||
Assertions.assertEquals(1, titles.size());
|
tmp
|
||||||
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix) );
|
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
|
||||||
Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid());
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"dh-ch",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
titles = tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getTitle();
|
||||||
|
Assertions.assertEquals(1, titles.size());
|
||||||
|
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix));
|
||||||
|
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
|
||||||
|
|
||||||
//original result with sobigdata context with gcube not as starting string in the main title
|
// original result with multiple main title one of which whith gcube as starting string and with 2 contextes
|
||||||
Assertions.assertEquals(1,
|
Assertions
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getContext().size());
|
.assertEquals(
|
||||||
Assertions.assertEquals("sobigdata::projects::1",tmp.filter(p->p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getContext().get(0).getId() );
|
1,
|
||||||
titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getTitle();
|
tmp
|
||||||
Assertions.assertEquals(1, titles.size());
|
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
|
||||||
Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix) );
|
.collect()
|
||||||
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim()) );
|
.get(0)
|
||||||
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"dh-ch",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
titles = tmp
|
||||||
|
.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getTitle();
|
||||||
|
Assertions.assertEquals(2, titles.size());
|
||||||
|
Assertions
|
||||||
|
.assertTrue(
|
||||||
|
titles
|
||||||
|
.stream()
|
||||||
|
.anyMatch(
|
||||||
|
t -> t.getQualifier().getClassid().equals("main title")
|
||||||
|
&& t.getValue().toLowerCase().startsWith(prefix)));
|
||||||
|
|
||||||
//original result with sobigdata in context and also other contexts with gcube as starting string for the main title
|
// original result without sobigdata in context with gcube as starting string for the main title
|
||||||
Assertions.assertEquals(1,
|
Assertions
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getContext().size());
|
.assertEquals(
|
||||||
Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getContext().get(0).getId() );
|
1,
|
||||||
titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getTitle();
|
tmp
|
||||||
Assertions.assertEquals(1, titles.size());
|
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
||||||
Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix) );
|
.collect()
|
||||||
Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid());
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.size());
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
"dh-ch",
|
||||||
|
tmp
|
||||||
|
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getContext()
|
||||||
|
.get(0)
|
||||||
|
.getId());
|
||||||
|
titles = tmp
|
||||||
|
.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
|
||||||
|
.collect()
|
||||||
|
.get(0)
|
||||||
|
.getTitle();
|
||||||
|
Assertions.assertEquals(2, titles.size());
|
||||||
|
|
||||||
//original result with multiple main title one of which whith gcube as starting string and with 2 contextes
|
Assertions
|
||||||
Assertions.assertEquals(1,
|
.assertTrue(
|
||||||
tmp.filter(p->p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getContext().size());
|
titles
|
||||||
Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getContext().get(0).getId() );
|
.stream()
|
||||||
titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getTitle();
|
.anyMatch(
|
||||||
Assertions.assertEquals(2, titles.size());
|
t -> t.getQualifier().getClassid().equals("main title")
|
||||||
Assertions.assertTrue(titles.stream().anyMatch(t -> t.getQualifier().getClassid().equals("main title") && t.getValue().toLowerCase().startsWith(prefix)) );
|
&& t.getValue().toLowerCase().startsWith(prefix)));
|
||||||
|
|
||||||
|
}
|
||||||
//original result without sobigdata in context with gcube as starting string for the main title
|
|
||||||
Assertions.assertEquals(1,
|
|
||||||
tmp.filter(p->p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getContext().size());
|
|
||||||
Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getContext().get(0).getId() );
|
|
||||||
titles = tmp.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getTitle();
|
|
||||||
Assertions.assertEquals(2, titles.size());
|
|
||||||
|
|
||||||
Assertions.assertTrue(titles.stream().anyMatch(t -> t.getQualifier().getClassid().equals("main title") && t.getValue().toLowerCase().startsWith(prefix)));
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
At first glance, this class doesn't seem to include any significant change. If it was not changed, please revert to its original formatting. Otherwise the diff just creates noise.
This comment is outdated. It seems you did not issue
git pull
before introducing these further changes thus you did not get the reformatted file CleanContextSparkJob.java.