Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Miriam Baglioni 2020-04-17 12:34:31 +02:00
commit adc11c97a7
92 changed files with 1695 additions and 765 deletions

View File

@ -0,0 +1,15 @@
package eu.dnetlib.dhp.schema.scholexplorer;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class DLIRelation extends Relation {
private String dateOfCollection;
public String getDateOfCollection() {
return dateOfCollection;
}
public void setDateOfCollection(String dateOfCollection) {
this.dateOfCollection = dateOfCollection;
}
}

View File

@ -53,5 +53,10 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-actionmanager-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,126 @@
package eu.dnetlib.dhp.actionmanager;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.actionmanager.rmi.ActionManagerException;
import eu.dnetlib.actionmanager.set.ActionManagerSet;
import eu.dnetlib.actionmanager.set.ActionManagerSet.ImpactTypes;
import eu.dnetlib.dhp.actionmanager.partition.PartitionActionSetsByPayloadTypeJob;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
public class ISClient implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PartitionActionSetsByPayloadTypeJob.class);
private static final String INPUT_ACTION_SET_ID_SEPARATOR = ",";
private ISLookUpService isLookup;
public ISClient(String isLookupUrl) {
isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
}
public List<String> getLatestRawsetPaths(String setIds) {
List<String> ids = Lists.newArrayList(Splitter.on(INPUT_ACTION_SET_ID_SEPARATOR)
.omitEmptyStrings()
.trimResults()
.split(setIds));
return ids.stream()
.map(id -> getSet(isLookup, id))
.map(as -> as.getPathToLatest())
.collect(Collectors.toCollection(ArrayList::new));
}
private ActionManagerSet getSet(ISLookUpService isLookup, final String setId) {
final String q = "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') "
+ "where $x//SET/@id = '" + setId + "' return $x";
try {
final String basePath = getBasePathHDFS(isLookup);
final String setProfile = isLookup.getResourceProfileByQuery(q);
return getActionManagerSet(basePath, setProfile);
} catch (ISLookUpException | ActionManagerException e) {
throw new RuntimeException("Error accessing Sets, using query: " + q);
}
}
private ActionManagerSet getActionManagerSet(final String basePath, final String profile) throws ActionManagerException {
final SAXReader reader = new SAXReader();
final ActionManagerSet set = new ActionManagerSet();
try {
final Document doc = reader.read(new StringReader(profile));
set.setId(doc.valueOf("//SET/@id").trim());
set.setName(doc.valueOf("//SET").trim());
set.setImpact(ImpactTypes.valueOf(doc.valueOf("//IMPACT").trim()));
set.setLatest(doc.valueOf("//RAW_SETS/LATEST/@id"), doc.valueOf("//RAW_SETS/LATEST/@creationDate"), doc.valueOf("//RAW_SETS/LATEST/@lastUpdate"));
set.setDirectory(doc.valueOf("//SET/@directory"));
final List expiredNodes = doc.selectNodes("//RAW_SETS/EXPIRED");
if (expiredNodes != null) {
for (int i = 0; i < expiredNodes.size(); i++) {
Element ex = (Element) expiredNodes.get(i);
set.addExpired(ex.attributeValue("id"), ex.attributeValue("creationDate"), ex.attributeValue("lastUpdate"));
}
}
final StringBuilder sb = new StringBuilder();
sb.append(basePath);
sb.append("/");
sb.append(doc.valueOf("//SET/@directory"));
sb.append("/");
sb.append(doc.valueOf("//RAW_SETS/LATEST/@id"));
set.setPathToLatest(sb.toString());
return set;
} catch (Exception e) {
throw new ActionManagerException("Error creating set from profile: " + profile, e);
}
}
private String getBasePathHDFS(ISLookUpService isLookup) throws ActionManagerException {
return queryServiceProperty(isLookup, "basePath");
}
private String queryServiceProperty(ISLookUpService isLookup, final String propertyName) throws ActionManagerException {
final String q = "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ActionManagerServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
+ propertyName + "']/@value/string()";
log.debug("quering for service property: " + q);
try {
final List<String> value = isLookup.quickSearchProfile(q);
return Iterables.getOnlyElement(value);
} catch (ISLookUpException e) {
String msg = "Error accessing service profile, using query: " + q;
log.error(msg, e);
throw new ActionManagerException(msg, e);
} catch (NoSuchElementException e) {
String msg = "missing service property: " + propertyName;
log.error(msg, e);
throw new ActionManagerException(msg, e);
} catch (IllegalArgumentException e) {
String msg = "found more than one service property: " + propertyName;
log.error(msg, e);
throw new ActionManagerException(msg, e);
}
}
}

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.actionmanager.partition;
import eu.dnetlib.dhp.actionmanager.ISClient;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -40,7 +41,14 @@ public class PartitionActionSetsByPayloadTypeJob {
StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty())
));
private static final String INPUT_ACTION_SET_PATHS_SEPARATOR = ",";
private ISClient isClient;
public PartitionActionSetsByPayloadTypeJob(String isLookupUrl) {
this.isClient = new ISClient(isLookupUrl);
}
public PartitionActionSetsByPayloadTypeJob() {
}
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils.toString(
@ -55,21 +63,30 @@ public class PartitionActionSetsByPayloadTypeJob {
.orElse(Boolean.TRUE);
logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputActionSetPaths = parser.get("inputActionSetPaths");
logger.info("inputActionSetPaths: {}", inputActionSetPaths);
String inputActionSetIds = parser.get("inputActionSetIds");
logger.info("inputActionSetIds: {}", inputActionSetIds);
String outputPath = parser.get("outputPath");
logger.info("outputPath: {}", outputPath);
String isLookupUrl = parser.get("isLookupUrl");
logger.info("isLookupUrl: {}", isLookupUrl);
new PartitionActionSetsByPayloadTypeJob(isLookupUrl).run(isSparkSessionManaged, inputActionSetIds, outputPath);
}
protected void run(Boolean isSparkSessionManaged, String inputActionSetIds, String outputPath) {
List<String> inputActionSetPaths = getIsClient().getLatestRawsetPaths(inputActionSetIds);
logger.info("inputActionSetPaths: {}", String.join(",", inputActionSetPaths));
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
runWithSparkSession(conf, isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
readAndWriteActionSetsFromPaths(spark,
Arrays.asList(inputActionSetPaths.split(INPUT_ACTION_SET_PATHS_SEPARATOR)),
outputPath);
readAndWriteActionSetsFromPaths(spark, inputActionSetPaths, outputPath);
});
}
@ -92,21 +109,15 @@ public class PartitionActionSetsByPayloadTypeJob {
String path) {
logger.info("Reading actions from path: {}", path);
List<String> files = HdfsSupport.listFiles(path, spark.sparkContext().hadoopConfiguration());
logger.info("Found files: {}", String.join(",", files));
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
return files
.stream()
.map(file -> {
JavaRDD<Row> rdd = sc
.sequenceFile(file, Text.class, Text.class)
.sequenceFile(path, Text.class, Text.class)
.map(x -> RowFactory.create(x._1().toString(), x._2().toString()));
return spark.createDataFrame(rdd, KV_SCHEMA)
.withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA))
.select(expr("atomic_action.*"));
})
.reduce(spark.createDataFrame(Collections.emptyList(), ATOMIC_ACTION_SCHEMA), Dataset::union);
}
private static void saveActions(Dataset<Row> actionDS,
@ -118,4 +129,12 @@ public class PartitionActionSetsByPayloadTypeJob {
.mode(SaveMode.Append)
.parquet(path);
}
public ISClient getIsClient() {
return isClient;
}
public void setIsClient(ISClient isClient) {
this.isClient = isClient;
}
}

View File

@ -8,6 +8,7 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@ -119,10 +120,17 @@ public class PromoteActionPayloadForGraphTableJob {
String path,
Class<G> rowClazz) {
logger.info("Reading graph table from path: {}", path);
return spark
.read()
return spark.read()
.textFile(path)
.map((MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz), Encoders.bean(rowClazz));
/*
return spark
.read()
.parquet(path)
.as(Encoders.bean(rowClazz));
*/
}
private static <A extends Oaf> Dataset<A> readActionPayload(SparkSession spark,

View File

@ -6,9 +6,9 @@
"paramRequired": false
},
{
"paramName": "iasp",
"paramLongName": "inputActionSetPaths",
"paramDescription": "comma separated list of action sets to partition by payload type",
"paramName": "iasi",
"paramLongName": "inputActionSetIds",
"paramDescription": "comma separated list of action set ids to partition by payload type",
"paramRequired": true
},
{
@ -16,5 +16,11 @@
"paramLongName": "outputPath",
"paramDescription": "root output location for partitioned action sets",
"paramRequired": true
},
{
"paramName": "is",
"paramLongName": "isLookupUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
}
]

View File

@ -118,6 +118,9 @@
<action name="SkipPromoteDatasetActionPayloadForDatasetTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${workingDir}/dataset"/>
</prepare>
<arg>-pb</arg>
<arg>${inputGraphRootPath}/dataset</arg>
<arg>${workingDir}/dataset</arg>
@ -166,6 +169,9 @@
<action name="SkipPromoteResultActionPayloadForDatasetTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${outputGraphRootPath}/dataset"/>
</prepare>
<arg>-pb</arg>
<arg>${workingDir}/dataset</arg>
<arg>${outputGraphRootPath}/dataset</arg>

View File

@ -113,6 +113,9 @@
<action name="SkipPromoteDatasourceActionPayloadForDatasourceTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${outputGraphRootPath}/datasource"/>
</prepare>
<arg>-pb</arg>
<arg>${inputGraphRootPath}/datasource</arg>
<arg>${outputGraphRootPath}/datasource</arg>

View File

@ -41,8 +41,12 @@
<description>root location of input materialized graph</description>
</property>
<property>
<name>inputActionSetPaths</name>
<description>comma separated list of action sets to promote</description>
<name>isLookupUrl</name>
<description>URL of the ISLookupService</description>
</property>
<property>
<name>inputActionSetIds</name>
<description>comma separated list of action set ids to promote</description>
</property>
<property>
<name>outputGraphRootPath</name>
@ -121,8 +125,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--inputActionSetPaths</arg><arg>${inputActionSetPaths}</arg>
<arg>--inputActionSetIds</arg><arg>${inputActionSetIds}</arg>
<arg>--outputPath</arg><arg>${workingDir}/action_payload_by_type</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="ForkPromote"/>
<error to="Kill"/>

View File

@ -113,6 +113,9 @@
<action name="SkipPromoteOrganizationActionPayloadForOrganizationTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${outputGraphRootPath}/organization"/>
</prepare>
<arg>-pb</arg>
<arg>${inputGraphRootPath}/organization</arg>
<arg>${outputGraphRootPath}/organization</arg>

View File

@ -117,6 +117,9 @@
<action name="SkipPromoteOtherResearchProductActionPayloadForOtherResearchProductTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${workingDir}/otherresearchproduct"/>
</prepare>
<arg>-pb</arg>
<arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>${workingDir}/otherresearchproduct</arg>
@ -165,6 +168,9 @@
<action name="SkipPromoteResultActionPayloadForOtherResearchProductTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${outputGraphRootPath}/otherresearchproduct"/>
</prepare>
<arg>-pb</arg>
<arg>${workingDir}/otherresearchproduct</arg>
<arg>${outputGraphRootPath}/otherresearchproduct</arg>

View File

@ -113,6 +113,9 @@
<action name="SkipPromoteProjectActionPayloadForProjectTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${outputGraphRootPath}/project"/>
</prepare>
<arg>-pb</arg>
<arg>${inputGraphRootPath}/project</arg>
<arg>${outputGraphRootPath}/project</arg>

View File

@ -118,6 +118,9 @@
<action name="SkipPromotePublicationActionPayloadForPublicationTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${workingDir}/publication"/>
</prepare>
<arg>-pb</arg>
<arg>${inputGraphRootPath}/publication</arg>
<arg>${workingDir}/publication</arg>
@ -166,6 +169,9 @@
<action name="SkipPromoteResultActionPayloadForPublicationTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${outputGraphRootPath}/publication"/>
</prepare>
<arg>-pb</arg>
<arg>${workingDir}/publication</arg>
<arg>${outputGraphRootPath}/publication</arg>

View File

@ -114,6 +114,9 @@
<action name="SkipPromoteRelationActionPayloadForRelationTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${outputGraphRootPath}/relation"/>
</prepare>
<arg>-pb</arg>
<arg>${inputGraphRootPath}/relation</arg>
<arg>${outputGraphRootPath}/relation</arg>

View File

@ -117,6 +117,9 @@
<action name="SkipPromoteSoftwareActionPayloadForSoftwareTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${workingDir}/software"/>
</prepare>
<arg>-pb</arg>
<arg>${inputGraphRootPath}/software</arg>
<arg>${workingDir}/software</arg>
@ -165,6 +168,9 @@
<action name="SkipPromoteResultActionPayloadForSoftwareTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${outputGraphRootPath}/software"/>
</prepare>
<arg>-pb</arg>
<arg>${workingDir}/software</arg>
<arg>${outputGraphRootPath}/software</arg>

View File

@ -1,6 +1,8 @@
package eu.dnetlib.dhp.actionmanager.partition;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.actionmanager.ISClient;
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.hadoop.conf.Configuration;
@ -15,7 +17,11 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import scala.Tuple2;
import scala.collection.mutable.Seq;
@ -31,6 +37,7 @@ import static org.apache.spark.sql.functions.*;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static scala.collection.JavaConversions.mutableSeqAsJavaList;
@ExtendWith(MockitoExtension.class)
public class PartitionActionSetsByPayloadTypeJobTest {
private static final ClassLoader cl = PartitionActionSetsByPayloadTypeJobTest.class.getClassLoader();
@ -64,20 +71,29 @@ public class PartitionActionSetsByPayloadTypeJobTest {
@Nested
class Main {
@Mock
private ISClient isClient;
@Test
public void shouldPartitionActionSetsByPayloadType(@TempDir Path workingDir) throws Exception {
// given
Path inputActionSetsDir = workingDir.resolve("input").resolve("action_sets");
Path inputActionSetsBaseDir = workingDir.resolve("input").resolve("action_sets");
Path outputDir = workingDir.resolve("output");
Map<String, List<String>> oafsByClassName = createActionSets(inputActionSetsDir);
Map<String, List<String>> oafsByClassName = createActionSets(inputActionSetsBaseDir);
List<String> inputActionSetsPaths = resolveInputActionSetPaths(inputActionSetsBaseDir);
// when
PartitionActionSetsByPayloadTypeJob.main(new String[]{
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-inputActionSetPaths", inputActionSetsDir.toString(),
"-outputPath", outputDir.toString()
});
Mockito.when(isClient.getLatestRawsetPaths(Mockito.anyString())).thenReturn(inputActionSetsPaths);
PartitionActionSetsByPayloadTypeJob job = new PartitionActionSetsByPayloadTypeJob();
job.setIsClient(isClient);
job.run(
Boolean.FALSE,
"", // it can be empty we're mocking the response from isClient to resolve the paths
outputDir.toString()
);
// then
Files.exists(outputDir);
@ -94,10 +110,19 @@ public class PartitionActionSetsByPayloadTypeJobTest {
}
}
private List<String> resolveInputActionSetPaths(Path inputActionSetsBaseDir) throws IOException {
Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
return Files
.list(inputActionSetJsonDumpsDir)
.map(path -> {
String inputActionSetId = path.getFileName().toString();
return inputActionSetsBaseDir.resolve(inputActionSetId).toString();
})
.collect(Collectors.toCollection(ArrayList::new));
}
private static Map<String, List<String>> createActionSets(Path inputActionSetsDir) throws IOException {
Path inputActionSetJsonDumpsDir = Paths
.get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/"))
.getFile());
Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
Map<String, List<String>> oafsByType = new HashMap<>();
Files
@ -138,6 +163,12 @@ public class PartitionActionSetsByPayloadTypeJobTest {
return oafsByType;
}
private static Path getInputActionSetJsonDumpsDir() {
return Paths
.get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/"))
.getFile());
}
private static Dataset<String> readActionsFromJsonDump(String path) {
return spark
.read()

View File

@ -100,23 +100,11 @@
<artifactId>jaxen</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.10</version>
</dependency>
</dependencies>
</project>

View File

@ -1,71 +0,0 @@
package eu.dnetlib.dhp.migration.step3;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
public class DispatchEntitiesApplication {
private static final Log log = LogFactory.getLog(DispatchEntitiesApplication.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(MigrateMongoMdstoresApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json")));
parser.parseArgument(args);
try (final SparkSession spark = newSparkSession(parser); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
final String sourcePath = parser.get("sourcePath");
final String targetPath = parser.get("graphRawPath");
processEntity(sc, Publication.class, sourcePath, targetPath);
processEntity(sc, Dataset.class, sourcePath, targetPath);
processEntity(sc, Software.class, sourcePath, targetPath);
processEntity(sc, OtherResearchProduct.class, sourcePath, targetPath);
processEntity(sc, Datasource.class, sourcePath, targetPath);
processEntity(sc, Organization.class, sourcePath, targetPath);
processEntity(sc, Project.class, sourcePath, targetPath);
processEntity(sc, Relation.class, sourcePath, targetPath);
}
}
private static SparkSession newSparkSession(final ArgumentApplicationParser parser) {
return SparkSession
.builder()
.appName(DispatchEntitiesApplication.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
}
private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final String sourcePath, final String targetPath) {
final String type = clazz.getSimpleName().toLowerCase();
log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath));
sc.textFile(sourcePath)
.filter(l -> isEntityType(l, type))
.map(l -> StringUtils.substringAfter(l, "|"))
.saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
}
private static boolean isEntityType(final String line, final String type) {
return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type);
}
}

View File

@ -1,197 +0,0 @@
<workflow-app name="import regular entities as Graph (all steps)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
<value>/tmp/dhp_migration</value>
<description>the base path to store temporary intermediate data</description>
</property>
<property>
<name>graphBasePath</name>
<description>the target path to store raw graph</description>
</property>
<property>
<name>reuseContent</name>
<value>false</value>
<description>should import content from the aggregator or reuse a previous version</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access to the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the user postgres</description>
</property>
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>mongoURL</name>
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
</property>
<property>
<name>mongoDb</name>
<description>mongo database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="ReuseContent"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="ReuseContent">
<switch>
<case to="ResetWorkingPath">${wf:conf('reuseContent') eq false}</case>
<case to="ResetAllEntitiesPath">${wf:conf('reuseContent') eq true}</case>
<default to="ResetWorkingPath"/>
</switch>
</decision>
<action name="ResetWorkingPath">
<fs>
<delete path="${workingPath}"/>
<mkdir path="${workingPath}"/>
</fs>
<ok to="ImportDB"/>
<error to="Kill"/>
</action>
<action name="ImportDB">
<java>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${workingPath}/db_records</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</java>
<ok to="ImportODF"/>
<error to="Kill"/>
</action>
<action name="ImportODF">
<java>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${workingPath}/odf_records</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
</java>
<ok to="ImportOAF"/>
<error to="Kill"/>
</action>
<action name="ImportOAF">
<java>
<main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${workingPath}/oaf_records</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
</java>
<ok to="ResetAllEntitiesPath"/>
<error to="Kill"/>
</action>
<action name="ResetAllEntitiesPath">
<fs>
<delete path="${workingPath}/all_entities"/>
</fs>
<ok to="GenerateEntities"/>
<error to="Kill"/>
</action>
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>-s</arg><arg>${workingPath}/db_records,${workingPath}/oaf_records,${workingPath}/odf_records</arg>
<arg>-t</arg><arg>${workingPath}/all_entities</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</spark>
<ok to="ResetGraphPath"/>
<error to="Kill"/>
</action>
<action name="ResetGraphPath">
<fs>
<delete path="${graphBasePath}/graph_raw"/>
<mkdir path="${graphBasePath}/graph_raw"/>
</fs>
<ok to="GenerateGraph"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>-s</arg><arg>${workingPath}/all_entities</arg>
<arg>-g</arg><arg>${graphBasePath}/graph_raw</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -40,6 +40,7 @@
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
@ -48,6 +49,18 @@
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
</dependencies>

View File

@ -1,45 +1,15 @@
package eu.dnetlib.dhp.migration.step2;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.journal;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.keyValue;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.oaiIProvenance;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
package eu.dnetlib.dhp.oa.graph.raw;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentFactory;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import java.util.*;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
public abstract class AbstractMdRecordToOafMapper {

View File

@ -0,0 +1,94 @@
package eu.dnetlib.dhp.oa.graph.raw;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class DispatchEntitiesApplication {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(MigrateMongoMdstoresApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/dispatch_entities_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String sourcePath = parser.get("sourcePath");
final String targetPath = parser.get("graphRawPath");
SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, targetPath);
processEntity(spark, Publication.class, sourcePath, targetPath);
processEntity(spark, Dataset.class, sourcePath, targetPath);
processEntity(spark, Software.class, sourcePath, targetPath);
processEntity(spark, OtherResearchProduct.class, sourcePath, targetPath);
processEntity(spark, Datasource.class, sourcePath, targetPath);
processEntity(spark, Organization.class, sourcePath, targetPath);
processEntity(spark, Project.class, sourcePath, targetPath);
processEntity(spark, Relation.class, sourcePath, targetPath);
});
}
private static <T extends Oaf> void processEntity(final SparkSession spark, final Class<T> clazz, final String sourcePath, final String targetPath) {
final String type = clazz.getSimpleName().toLowerCase();
log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath));
/*
spark.read()
.textFile(sourcePath)
.filter((FilterFunction<String>) value -> isEntityType(value, type))
.map((MapFunction<String, String>) value -> StringUtils.substringAfter(value, "|"), Encoders.STRING())
.map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.parquet(targetPath + "/" + type);
*/
JavaSparkContext.fromSparkContext(spark.sparkContext())
.textFile(sourcePath)
.filter(l -> isEntityType(l, type))
.map(l -> StringUtils.substringAfter(l, "|"))
.saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
}
private static boolean isEntityType(final String line, final String type) {
return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type);
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -1,13 +1,10 @@
package eu.dnetlib.dhp.migration.step2;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
package eu.dnetlib.dhp.oa.graph.raw;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.DbClient;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@ -16,37 +13,38 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication;
import eu.dnetlib.dhp.migration.utils.DbClient;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class GenerateEntitiesApplication {
private static final Log log = LogFactory.getLog(GenerateEntitiesApplication.class);
private static final Logger log = LoggerFactory.getLogger(GenerateEntitiesApplication.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(MigrateMongoMdstoresApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/migration/generate_entities_parameters.json")));
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String sourcePaths = parser.get("sourcePaths");
final String targetPath = parser.get("targetPath");
@ -56,31 +54,27 @@ public class GenerateEntitiesApplication {
final Map<String, String> code2name = loadClassNames(dbUrl, dbUser, dbPassword);
try (final SparkSession spark = newSparkSession(parser); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
final List<String> existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
generateEntities(sc, code2name, existingSourcePaths, targetPath);
}
SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, targetPath);
generateEntities(spark, code2name, sourcePaths, targetPath);
});
}
private static SparkSession newSparkSession(final ArgumentApplicationParser parser) {
return SparkSession
.builder()
.appName(GenerateEntitiesApplication.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
}
private static void generateEntities(final JavaSparkContext sc,
private static void generateEntities(final SparkSession spark,
final Map<String, String> code2name,
final List<String> sourcePaths,
final String sourcePaths,
final String targetPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final List<String> existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
log.info("Generate entities from files:");
sourcePaths.forEach(log::info);
existingSourcePaths.forEach(log::info);
JavaRDD<String> inputRdd = sc.emptyRDD();
for (final String sp : sourcePaths) {
for (final String sp : existingSourcePaths) {
inputRdd = inputRdd.union(sc.sequenceFile(sp, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.map(k -> convertToListOaf(k._1(), k._2(), code2name))
@ -88,7 +82,8 @@ public class GenerateEntitiesApplication {
.map(oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" + convertToJson(oaf)));
}
inputRdd.saveAsTextFile(targetPath, GzipCodec.class);
inputRdd
.saveAsTextFile(targetPath, GzipCodec.class);
}
@ -163,11 +158,15 @@ public class GenerateEntitiesApplication {
private static boolean exists(final JavaSparkContext context, final String pathToFile) {
try {
final FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
final FileSystem hdfs = FileSystem.get(context.hadoopConfiguration());
final Path path = new Path(pathToFile);
return hdfs.exists(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -0,0 +1,159 @@
package eu.dnetlib.dhp.oa.graph.raw;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
public class MergeClaimsApplication {
private static final Logger log = LoggerFactory.getLogger(MergeClaimsApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(MigrateMongoMdstoresApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String rawGraphPath = parser.get("rawGraphPath");
log.info("rawGraphPath: {}", rawGraphPath);
final String claimsGraphPath = parser.get("claimsGraphPath");
log.info("claimsGraphPath: {}", claimsGraphPath);
final String outputRawGaphPath = parser.get("outputRawGaphPath");
log.info("outputRawGaphPath: {}", outputRawGaphPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
String type = clazz.getSimpleName().toLowerCase();
String rawPath = rawGraphPath + "/" + type;
String claimPath = claimsGraphPath + "/" + type;
String outPath = outputRawGaphPath + "/" + type;
removeOutputDir(spark, outPath);
mergeByType(spark, rawPath, claimPath, outPath, clazz);
});
}
private static <T extends Oaf> void mergeByType(SparkSession spark, String rawPath, String claimPath, String outPath, Class<T> clazz) {
Dataset<Tuple2<String, T>> raw = readFromPath(spark, rawPath, clazz)
.map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Dataset<Tuple2<String, T>> claim = jsc.broadcast(readFromPath(spark, claimPath, clazz))
.getValue()
.map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
/*
Dataset<Tuple2<String, T>> claim = readFromPath(spark, claimPath, clazz)
.map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
*/
raw.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
.map((MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) value -> {
Optional<Tuple2<String, T>> opRaw = Optional.ofNullable(value._1());
Optional<Tuple2<String, T>> opClaim = Optional.ofNullable(value._2());
return opRaw.isPresent() ? opRaw.get()._2() : opClaim.isPresent() ? opClaim.get()._2() : null;
}, Encoders.bean(clazz))
.filter(Objects::nonNull)
.map((MapFunction<T, String>) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING())
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(outPath);
}
private static <T extends Oaf> Dataset<T> readFromPath(SparkSession spark, String path, Class<T> clazz) {
return spark.read()
.textFile(path)
.map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz))
.filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
/*
return spark.read()
.load(path)
.as(Encoders.bean(clazz))
.filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
*/
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static <T extends Oaf> Function<T, String> idFn() {
return x -> {
if (isSubClass(x, Relation.class)) {
return idFnForRelation(x);
}
return idFnForOafEntity(x);
};
}
private static <T extends Oaf> String idFnForRelation(T t) {
Relation r = (Relation) t;
return Optional.ofNullable(r.getSource())
.map(source -> Optional.ofNullable(r.getTarget())
.map(target -> Optional.ofNullable(r.getRelType())
.map(relType -> Optional.ofNullable(r.getSubRelType())
.map(subRelType -> Optional.ofNullable(r.getRelClass())
.map(relClass -> String.join(source, target, relType, subRelType, relClass))
.orElse(String.join(source, target, relType, subRelType))
)
.orElse(String.join(source, target, relType))
)
.orElse(String.join(source, target))
)
.orElse(source)
)
.orElse(null);
}
private static <T extends Oaf> String idFnForOafEntity(T t) {
return ((OafEntity) t).getId();
}
}

View File

@ -1,14 +1,13 @@
package eu.dnetlib.dhp.migration.step1;
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.asString;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.journal;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.DbClient;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.Closeable;
import java.io.IOException;
@ -22,31 +21,7 @@ import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.migration.utils.AbstractMigrationApplication;
import eu.dnetlib.dhp.migration.utils.DbClient;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
public class MigrateDbEntitiesApplication extends AbstractMigrationApplication implements Closeable {
@ -61,7 +36,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(MigrateDbEntitiesApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json")));
IOUtils.toString(MigrateDbEntitiesApplication.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json")));
parser.parseArgument(args);
@ -111,7 +86,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
}
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer) throws Exception {
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/migration/sql/" + sqlFile));
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf));

View File

@ -1,18 +1,17 @@
package eu.dnetlib.dhp.migration.step1;
package eu.dnetlib.dhp.oa.graph.raw;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.oa.graph.raw.common.MdstoreClient;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.migration.utils.AbstractMigrationApplication;
import eu.dnetlib.dhp.migration.utils.MdstoreClient;
public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable {
private static final Log log = LogFactory.getLog(MigrateMongoMdstoresApplication.class);
@ -21,7 +20,7 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json")));
IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json")));
parser.parseArgument(args);
final String mongoBaseUrl = parser.get("mongoBaseUrl");

View File

@ -1,27 +1,17 @@
package eu.dnetlib.dhp.migration.step2;
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
import eu.dnetlib.dhp.oa.graph.raw.common.PacePerson;
import eu.dnetlib.dhp.schema.oaf.*;
import org.dom4j.Document;
import org.dom4j.Node;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.dom4j.Document;
import org.dom4j.Node;
import eu.dnetlib.dhp.migration.utils.PacePerson;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
public class OafToOafMapper extends AbstractMdRecordToOafMapper {

View File

@ -1,28 +1,16 @@
package eu.dnetlib.dhp.migration.step2;
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.Node;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.Node;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
public class OdfToOafMapper extends AbstractMdRecordToOafMapper {

View File

@ -1,9 +1,6 @@
package eu.dnetlib.dhp.migration.utils;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
package eu.dnetlib.dhp.oa.graph.raw.common;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -12,7 +9,9 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.codehaus.jackson.map.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
public class AbstractMigrationApplication implements Closeable {

View File

@ -1,18 +1,14 @@
package eu.dnetlib.dhp.migration.utils;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.function.Consumer;
package eu.dnetlib.dhp.oa.graph.raw.common;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.Closeable;
import java.io.IOException;
import java.sql.*;
import java.util.function.Consumer;
public class DbClient implements Closeable {
private static final Log log = LogFactory.getLog(DbClient.class);

View File

@ -1,4 +1,14 @@
package eu.dnetlib.dhp.migration.utils;
package eu.dnetlib.dhp.oa.graph.raw.common;
import com.google.common.collect.Iterables;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document;
import java.io.Closeable;
import java.io.IOException;
@ -7,17 +17,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document;
import com.google.common.collect.Iterables;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
public class MdstoreClient implements Closeable {
private final MongoClient client;

View File

@ -1,4 +1,8 @@
package eu.dnetlib.dhp.migration.utils;
package eu.dnetlib.dhp.oa.graph.raw.common;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
@ -6,19 +10,6 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
import eu.dnetlib.dhp.schema.oaf.OriginDescription;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
public class OafMapperUtils {
public static KeyValue keyValue(final String k, final String v) {

View File

@ -1,19 +1,18 @@
package eu.dnetlib.dhp.migration.utils;
import java.nio.charset.Charset;
import java.text.Normalizer;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.text.WordUtils;
package eu.dnetlib.dhp.oa.graph.raw.common;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.text.WordUtils;
import java.nio.charset.Charset;
import java.text.Normalizer;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class PacePerson {
@ -105,7 +104,7 @@ public class PacePerson {
private List<String> splitTerms(final String s) {
if (particles == null) {
particles = loadFromClasspath("/eu/dnetlib/dhp/migration/pace/name_particles.txt");
particles = loadFromClasspath("/eu/dnetlib/dhp/oa/graph/pace/name_particles.txt");
}
final List<String> list = Lists.newArrayList();

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.sx.graph;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
@ -49,15 +50,15 @@ public class SparkSXGeneratePidSimlarity {
.equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::")))
.distinct();
JavaRDD<Relation> simRel = datasetSimRel.union(publicationSimRel).map(s -> {
final Relation r = new Relation();
JavaRDD<DLIRelation> simRel = datasetSimRel.union(publicationSimRel).map(s -> {
final DLIRelation r = new DLIRelation();
r.setSource(s._1());
r.setTarget(s._2());
r.setRelType("similar");
return r;
}
);
spark.createDataset(simRel.rdd(), Encoders.bean(Relation.class)).distinct().write()
spark.createDataset(simRel.rdd(), Encoders.bean(DLIRelation.class)).distinct().write()
.mode(SaveMode.Overwrite).save(targetPath+"/pid_simRel");
}
}

View File

@ -7,6 +7,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.utils.DHPUtils;
import net.minidev.json.JSONArray;
@ -135,19 +136,19 @@ public class SparkScholexplorerCreateRawGraphJob {
SparkSXGeneratePidSimlarity.generateDataFrame(spark, sc, inputPath.replace("/relation",""),targetPath.replace("/relation","") );
RDD<Relation> rdd = union.mapToPair((PairFunction<String, String, Relation>) f -> {
RDD<DLIRelation> rdd = union.mapToPair((PairFunction<String, String, DLIRelation>) f -> {
final String source = getJPathString(SOURCEJSONPATH, f);
final String target = getJPathString(TARGETJSONPATH, f);
final String reltype = getJPathString(RELJSONPATH, f);
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source.toLowerCase(), reltype.toLowerCase(), target.toLowerCase())), mapper.readValue(f, Relation.class));
return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source.toLowerCase(), reltype.toLowerCase(), target.toLowerCase())), mapper.readValue(f, DLIRelation.class));
}).reduceByKey((a, b) -> {
a.mergeFrom(b);
return a;
}).map(Tuple2::_2).rdd();
spark.createDataset(rdd, Encoders.bean(Relation.class)).write().mode(SaveMode.Overwrite).save(targetPath);
spark.createDataset(rdd, Encoders.bean(DLIRelation.class)).write().mode(SaveMode.Overwrite).save(targetPath);
Dataset<Relation> rel_ds =spark.read().load(targetPath).as(Encoders.bean(Relation.class));
System.out.println("LOADING PATH :"+targetPath.replace("/relation","")+"/pid_simRel");

View File

@ -2,10 +2,13 @@ package eu.dnetlib.dhp.sx.graph.parser;
import eu.dnetlib.dhp.parser.utility.VtdUtilityParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.scholexplorer.relation.RelInfo;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@ -15,6 +18,7 @@ import javax.xml.stream.XMLStreamReader;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public abstract class AbstractScholexplorerParser {
@ -104,6 +108,74 @@ public abstract class AbstractScholexplorerParser {
return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
}
protected DLIUnknown createUnknownObject(final String pid, final String pidType, final KeyValue cf, final DataInfo di, final String dateOfCollection) {
final DLIUnknown uk = new DLIUnknown();
uk.setId(generateId(pid, pidType, "unknown"));
ProvenaceInfo pi = new ProvenaceInfo();
pi.setId(cf.getKey());
pi.setName(cf.getValue());
pi.setCompletionStatus("incomplete");
uk.setDataInfo(di);
uk.setDlicollectedfrom(Collections.singletonList(pi));
final StructuredProperty sourcePid = new StructuredProperty();
sourcePid.setValue(pid);
final Qualifier pt = new Qualifier();
pt.setClassname(pidType);
pt.setClassid(pidType);
pt.setSchemename("dnet:pid_types");
pt.setSchemeid("dnet:pid_types");
sourcePid.setQualifier(pt);
uk.setPid(Collections.singletonList(sourcePid));
uk.setDateofcollection(dateOfCollection);
return uk;
}
protected void generateRelations(RelationMapper relationMapper, Result parsedObject, List<Oaf> result, DataInfo di, String dateOfCollection, List<VtdUtilityParser.Node> relatedIdentifiers) {
if(relatedIdentifiers!= null) {
result.addAll(relatedIdentifiers.stream()
.flatMap(n -> {
final List<DLIRelation> rels = new ArrayList<>();
DLIRelation r = new DLIRelation();
r.setSource(parsedObject.getId());
final String relatedPid = n.getTextValue();
final String relatedPidType = n.getAttributes().get("relatedIdentifierType");
final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown");
String relationSemantic = n.getAttributes().get("relationType");
String inverseRelation;
final String targetId = generateId(relatedPid, relatedPidType, relatedType);
r.setDateOfCollection(dateOfCollection);
if (relationMapper.containsKey(relationSemantic.toLowerCase()))
{
RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase());
relationSemantic = relInfo.getOriginal();
inverseRelation = relInfo.getInverse();
}
else {
relationSemantic = "Unknown";
inverseRelation = "Unknown";
}
r.setTarget(targetId);
r.setRelType(relationSemantic);
r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
r.setDataInfo(di);
rels.add(r);
r = new DLIRelation();
r.setDataInfo(di);
r.setSource(targetId);
r.setTarget(parsedObject.getId());
r.setRelType(inverseRelation);
r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
r.setDateOfCollection(dateOfCollection);
rels.add(r);
if("unknown".equalsIgnoreCase(relatedType))
result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di, dateOfCollection));
return rels.stream();
}).collect(Collectors.toList()));
}
}

View File

@ -42,7 +42,8 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']")));
parsedObject.setOriginalObjIdentifier(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']"));
parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']"));
String dateOfCollection = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']");
parsedObject.setDateofcollection(dateOfCollection);
final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']");
@ -123,7 +124,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
List<String> descs = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='description']");
if (descs != null && descs.size() > 0)
parsedObject.setDescription(descs.stream()
.map(it -> it.length() < 512 ? it : it.substring(0, 512))
.map(it -> it.length() < 10000 ? it : it.substring(0, 10000))
.map(it -> {
final Field<String> d = new Field<>();
d.setValue(it);
@ -137,48 +138,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
Arrays.asList("relatedIdentifierType", "relationType", "entityType", "inverseRelationType"));
if(relatedIdentifiers!= null) {
result.addAll(relatedIdentifiers.stream()
.flatMap(n -> {
final List<Relation> rels = new ArrayList<>();
Relation r = new Relation();
r.setSource(parsedObject.getId());
final String relatedPid = n.getTextValue();
final String relatedPidType = n.getAttributes().get("relatedIdentifierType");
final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown");
String relationSemantic = n.getAttributes().get("relationType");
String inverseRelation = n.getAttributes().get("inverseRelationType");
final String targetId = generateId(relatedPid, relatedPidType, relatedType);
if (relationMapper.containsKey(relationSemantic.toLowerCase()))
{
RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase());
relationSemantic = relInfo.getOriginal();
inverseRelation = relInfo.getInverse();
}
else {
relationSemantic = "Unknown";
inverseRelation = "Unknown";
}
r.setTarget(targetId);
r.setRelType(relationSemantic);
r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
r.setDataInfo(di);
rels.add(r);
r = new Relation();
r.setDataInfo(di);
r.setSource(targetId);
r.setTarget(parsedObject.getId());
r.setRelType(inverseRelation);
r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
rels.add(r);
if("unknown".equalsIgnoreCase(relatedType))
result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di));
return rels.stream();
}).collect(Collectors.toList()));
}
generateRelations(relationMapper, parsedObject, result, di, dateOfCollection, relatedIdentifiers);
final List<Node> hostedBy =
@ -199,7 +159,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
}
List<StructuredProperty> subjects = extractSubject(VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']//*[local-name()='subject']", Arrays.asList("subjectScheme")));
List<StructuredProperty> subjects = extractSubject(VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']//*[local-name()='subject']", Collections.singletonList("subjectScheme")));
parsedObject.setSubject(subjects);
@ -265,24 +225,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
}
private DLIUnknown createUnknownObject(final String pid, final String pidType, final KeyValue cf, final DataInfo di) {
final DLIUnknown uk = new DLIUnknown();
uk.setId(generateId(pid, pidType, "unknown"));
ProvenaceInfo pi = new ProvenaceInfo();
pi.setId(cf.getKey());
pi.setName(cf.getValue());
pi.setCompletionStatus("incomplete");
uk.setDataInfo(di);
uk.setDlicollectedfrom(Collections.singletonList(pi));
final StructuredProperty sourcePid = new StructuredProperty();
sourcePid.setValue(pid);
final Qualifier pt = new Qualifier();
pt.setClassname(pidType);
pt.setClassid(pidType);
pt.setSchemename("dnet:pid_types");
pt.setSchemeid("dnet:pid_types");
sourcePid.setQualifier(pt);
uk.setPid(Collections.singletonList(sourcePid));
return uk;
}
}

View File

@ -38,7 +38,8 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
di.setDeletedbyinference(false);
di.setInvisible(false);
parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']"));
String dateOfCollection = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']");
parsedObject.setDateofcollection(dateOfCollection);
final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']");
parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']")));
@ -118,48 +119,7 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
final List<Node> relatedIdentifiers =
VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='relatedIdentifier']",
Arrays.asList("relatedIdentifierType", "relationType", "entityType", "inverseRelationType"));
if (relatedIdentifiers != null) {
result.addAll(relatedIdentifiers.stream()
.flatMap(n -> {
final List<Relation> rels = new ArrayList<>();
Relation r = new Relation();
r.setSource(parsedObject.getId());
final String relatedPid = n.getTextValue();
final String relatedPidType = n.getAttributes().get("relatedIdentifierType");
final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown");
String relationSemantic = n.getAttributes().get("relationType");
String inverseRelation = "Unknown";
final String targetId = generateId(relatedPid, relatedPidType, relatedType);
if (relationMapper.containsKey(relationSemantic.toLowerCase()))
{
RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase());
relationSemantic = relInfo.getOriginal();
inverseRelation = relInfo.getInverse();
}
else {
relationSemantic = "Unknown";
}
r.setTarget(targetId);
r.setRelType(relationSemantic);
r.setCollectedFrom(parsedObject.getCollectedfrom());
r.setRelClass("datacite");
r.setDataInfo(di);
rels.add(r);
r = new Relation();
r.setDataInfo(di);
r.setSource(targetId);
r.setTarget(parsedObject.getId());
r.setRelType(inverseRelation);
r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
rels.add(r);
return rels.stream();
}).collect(Collectors.toList()));
}
generateRelations(relationMapper, parsedObject, result, di, dateOfCollection, relatedIdentifiers);
final List<Node> hostedBy =
VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='hostedBy']", Arrays.asList("id", "name"));
@ -206,8 +166,8 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
description.setValue(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='description']"));
if (StringUtils.isNotBlank(description.getValue()) && description.getValue().length() > 512) {
description.setValue(description.getValue().substring(0, 512));
if (StringUtils.isNotBlank(description.getValue()) && description.getValue().length() > 10000) {
description.setValue(description.getValue().substring(0, 10000));
}
parsedObject.setDescription(Collections.singletonList(description));

View File

@ -1,16 +1,16 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the source path",
"paramRequired": true
},
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "g",
"paramLongName": "graphRawPath",

View File

@ -1,16 +1,16 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "s",
"paramLongName": "sourcePaths",
"paramDescription": "the HDFS source paths which contains the sequential file (comma separated)",
"paramRequired": true
},
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "targetPath",

View File

@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "rgp",
"paramLongName": "rawGraphPath",
"paramDescription": "the raw graph path",
"paramRequired": true
},
{
"paramName": "cgp",
"paramLongName": "claimsGraphPath",
"paramDescription": "the path of the claims graph",
"paramRequired": true
},
{
"paramName": "ogp",
"paramLongName": "outputRawGaphPath",
"paramDescription": "the path of output graph, combining raw and claims",
"paramRequired": true
},
{
"paramName": "clazz",
"paramLongName": "graphTableClassName",
"paramDescription": "class name associated to the input entity path",
"paramRequired": true
}
]

View File

@ -0,0 +1,10 @@
[
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
{"paramName":"sn", "paramLongName":"sourceNameNode", "paramDescription": "nameNode of the source cluster", "paramRequired": true},
{"paramName":"tn", "paramLongName":"targetNameNode", "paramDescription": "namoNode of the target cluster", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingDirectory", "paramDescription": "working directory", "paramRequired": true},
{"paramName":"nm", "paramLongName":"distcp_num_maps", "paramDescription": "maximum number of map tasks used in the distcp process", "paramRequired": true},
{"paramName":"mm", "paramLongName":"distcp_memory_mb", "paramDescription": "memory for distcp action copying actionsets from remote cluster", "paramRequired": true},
{"paramName":"tt", "paramLongName":"distcp_task_timeout", "paramDescription": "timeout for distcp copying actions from remote cluster", "paramRequired": true},
{"paramName":"tr", "paramLongName":"transform_only", "paramDescription": "activate tranform-only mode. Only apply transformation step", "paramRequired": true}
]

View File

@ -0,0 +1,542 @@
<workflow-app name="create RAW Graph (all steps)" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphOutputPath</name>
<description>the target path to store raw graph</description>
</property>
<property>
<name>reuseContent</name>
<value>false</value>
<description>should import content from the aggregator or reuse a previous version</description>
</property>
<property>
<name>contentPath</name>
<description>path location to store (or reuse) content from the aggregator</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access to the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the user postgres</description>
</property>
<property>
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
<property>
<name>mongoURL</name>
<description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
</property>
<property>
<name>mongoDb</name>
<description>mongo database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reuse_aggregator_content"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="reuse_aggregator_content">
<switch>
<case to="start_import">${wf:conf('reuseContent') eq false}</case>
<case to="fork_generate_entities">${wf:conf('reuseContent') eq true}</case>
<default to="start_import"/>
</switch>
</decision>
<fork name="start_import">
<path start="ImportDB"/>
<path start="ImportDB_claims"/>
</fork>
<action name="ImportDB_claims">
<java>
<prepare>
<delete path="${contentPath}/db_claims"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${contentPath}/db_claims</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
<arg>-a</arg><arg>claims</arg>
</java>
<ok to="ImportODF_claims"/>
<error to="Kill"/>
</action>
<action name="ImportODF_claims">
<java>
<prepare>
<delete path="${contentPath}/odf_claims"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${contentPath}/odf_claims</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>claim</arg>
</java>
<ok to="ImportOAF_claims"/>
<error to="Kill"/>
</action>
<action name="ImportOAF_claims">
<java>
<prepare>
<delete path="${contentPath}/oaf_claims"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${contentPath}/oaf_claims</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>claim</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<action name="ImportDB">
<java>
<prepare>
<delete path="${contentPath}/db_records"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication</main-class>
<arg>-p</arg><arg>${contentPath}/db_records</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</java>
<ok to="ImportODF"/>
<error to="Kill"/>
</action>
<action name="ImportODF">
<java>
<prepare>
<delete path="${contentPath}/odf_records"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${contentPath}/odf_records</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>ODF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
</java>
<ok to="ImportOAF"/>
<error to="Kill"/>
</action>
<action name="ImportOAF">
<java>
<prepare>
<delete path="${contentPath}/oaf_records"/>
</prepare>
<main-class>eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication</main-class>
<arg>-p</arg><arg>${contentPath}/oaf_records</arg>
<arg>-mongourl</arg><arg>${mongoURL}</arg>
<arg>-mongodb</arg><arg>${mongoDb}</arg>
<arg>-f</arg><arg>OAF</arg>
<arg>-l</arg><arg>store</arg>
<arg>-i</arg><arg>cleaned</arg>
</java>
<ok to="wait_import"/>
<error to="Kill"/>
</action>
<join name="wait_import" to="fork_generate_entities"/>
<fork name="fork_generate_entities">
<path start="GenerateEntities_claim"/>
<path start="GenerateEntities"/>
</fork>
<action name="GenerateEntities_claim">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEntities_claim</name>
<class>eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-s</arg><arg>${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims</arg>
<arg>-t</arg><arg>${workingDir}/entities_claim</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</spark>
<ok to="GenerateGraph_claims"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph_claims">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateGraph_claims</name>
<class>eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-s</arg><arg>${workingDir}/entities_claim</arg>
<arg>-g</arg><arg>${workingDir}/graph_claims</arg>
</spark>
<ok to="wait_graphs"/>
<error to="Kill"/>
</action>
<action name="GenerateEntities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEntities</name>
<class>eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-s</arg><arg>${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records</arg>
<arg>-t</arg><arg>${workingDir}/entities</arg>
<arg>-pgurl</arg><arg>${postgresURL}</arg>
<arg>-pguser</arg><arg>${postgresUser}</arg>
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
</spark>
<ok to="GenerateGraph"/>
<error to="Kill"/>
</action>
<action name="GenerateGraph">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateGraph</name>
<class>eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>-s</arg><arg>${workingDir}/entities</arg>
<arg>-g</arg><arg>${workingDir}/graph_raw</arg>
</spark>
<ok to="wait_graphs"/>
<error to="Kill"/>
</action>
<join name="wait_graphs" to="fork_merge_claims"/>
<fork name="fork_merge_claims">
<path start="merge_claims_publication"/>
<path start="merge_claims_dataset"/>
<path start="merge_claims_software"/>
<path start="merge_claims_otherresearchproduct"/>
<path start="merge_claims_datasource"/>
<path start="merge_claims_organization"/>
<path start="merge_claims_project"/>
<path start="merge_claims_relation"/>
</fork>
<action name="merge_claims_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>MergeClaims_publication</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_claims_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>MergeClaims_dataset</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_claims_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>MergeClaims_relation</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_claims_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>MergeClaims_software</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=1920
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_claims_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>MergeClaims_otherresearchproduct</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=1920
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_claims_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>MergeClaims_datasource</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=200
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_claims_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>MergeClaims_organization</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=200
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<action name="merge_claims_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>MergeClaims_project</name>
<class>eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=200
</spark-opts>
<arg>--rawGraphPath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--claimsGraphPath</arg><arg>${workingDir}/graph_claims</arg>
<arg>--outputRawGaphPath</arg><arg>${graphOutputPath}</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
</spark>
<ok to="wait_merge"/>
<error to="Kill"/>
</action>
<join name="wait_merge" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -8,6 +8,9 @@ SELECT
WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire-cris_1.1'])
THEN
'openaire-cris_1.1@@@OpenAIRE CRIS v1.1@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire4.0'])
THEN
'openaire4.0@@@OpenAIRE 4.0@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['driver', 'openaire2.0'])
THEN
'driver-openaire2.0@@@OpenAIRE 2.0+ (DRIVER OA, EC funding)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'

View File

@ -0,0 +1,20 @@
[
{
"paramName": "mt",
"paramLongName": "master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{
"paramName": "is",
"paramLongName": "isLookupUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "inputPaths",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
}
]

View File

@ -1,34 +1,25 @@
package eu.dnetlib.dhp.oa.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
public class GraphHiveImporterJobTest {
private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJobTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = GraphHiveImporterJobTest.class.getClassLoader();
public static final String JDBC_DERBY_TEMPLATE = "jdbc:derby:;databaseName=%s/junit_metastore_db;create=true";
private static SparkSession spark;

View File

@ -1,27 +1,24 @@
package eu.dnetlib.dhp.migration.step2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.List;
import java.util.Map;
package eu.dnetlib.dhp.oa.graph.raw;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class MappersTest {
@ -30,7 +27,7 @@ public class MappersTest {
private Map<String, String> code2name;
@BeforeEach
void setUp() throws Exception {
public void setUp() throws Exception {
when(code2name.get(anyString())).thenAnswer(invocation -> invocation.getArgument(0));
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.migration.step1;
package eu.dnetlib.dhp.oa.graph.raw;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

View File

@ -69,6 +69,11 @@
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
</dependencies>

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.provision.scholix;
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@ -20,10 +19,6 @@ public class ScholixResource implements Serializable {
private List<ScholixEntityId> publisher;
private List<ScholixCollectedFrom> collectedFrom;
public static ScholixResource fromSummary(ScholixSummary summary) {
final ScholixResource resource = new ScholixResource();
@ -65,6 +60,7 @@ public class ScholixResource implements Serializable {
}
public List<ScholixIdentifier> getIdentifier() {
return identifier;
}

View File

@ -34,8 +34,13 @@ public class Datacite2Scholix {
ScholixResource resource = generateDataciteScholixResource(dJson);
return relIds.stream().flatMap(s-> {
final List<Scholix> result = generateScholix(resource, s.get("relatedIdentifier"), s.get("relatedIdentifierType"), s.get("relationType"), updated);
try {
final List<Scholix> result = generateScholix(resource, ""+s.get("relatedIdentifier"), s.get("relatedIdentifierType"), s.get("relationType"), updated);
return result.stream();
} catch (Throwable e)
{
return new ArrayList<Scholix>().stream();
}
}).collect(Collectors.toList());
}
@ -48,6 +53,7 @@ public class Datacite2Scholix {
}
private List<Scholix> generateScholix(ScholixResource source, final String pid, final String pidtype, final String relType, final String updated) {
if ("doi".equalsIgnoreCase(pidtype)) {
ScholixResource target = new ScholixResource();
target.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, pidtype)));
@ -165,7 +171,7 @@ public class Datacite2Scholix {
return res;
}
protected String generateId(final String pid, final String pidType, final String entityType) {
public static String generateId(final String pid, final String pidType, final String entityType) {
String type;
switch (entityType){
case "publication":

View File

@ -27,10 +27,12 @@ public class DataciteClientIterator implements Iterator<String> {
final String esIndex;
final ObjectMapper mapper = new ObjectMapper();
public DataciteClientIterator(final String esHost, final String esIndex, final long timestamp) throws IOException {
public DataciteClientIterator(final String esHost, final String esIndex, long timestamp) throws IOException {
this.esHost = esHost;
this.esIndex = esIndex;
// THIS FIX IS NECESSARY to avoid different timezone
timestamp -= (60 *60 *2);
final String body =getResponse(String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex), String.format("{\"size\":1000, \"query\":{\"range\":{\"timestamp\":{\"gte\":%d}}}}", timestamp));
scrollId= getJPathString(scrollIdPath, body);
buffer = getBlobs(body);

View File

@ -4,18 +4,25 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.Scholix;
import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier;
import eu.dnetlib.dhp.provision.scholix.ScholixRelationship;
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class SparkResolveScholixTarget {
@ -29,8 +36,6 @@ public class SparkResolveScholixTarget {
final String sourcePath = parser.get("sourcePath");
final String workingDirPath= parser.get("workingDirPath");
final String indexHost= parser.get("indexHost");
try (SparkSession spark = getSession(conf, master)){
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -65,7 +70,55 @@ public class SparkResolveScholixTarget {
}, Encoders.bean(ScholixResource.class)).write().mode(SaveMode.Overwrite).save(workingDirPath+"/stepB");
Dataset<ScholixResource> s2 = spark.read().load(workingDirPath+"/stepB").as(Encoders.bean(ScholixResource.class));
s1.joinWith(s2, s1.col("target.identifier.identifier").equalTo(s2.col("identifier.identifier")), "left")
.flatMap((FlatMapFunction<Tuple2<Scholix, ScholixResource>, Scholix>) f ->
{
final List<Scholix> res = new ArrayList<>();
final Scholix s = f._1();
final ScholixResource target = f._2();
if (StringUtils.isNotBlank(s.getIdentifier()))
res.add(s);
else if (target == null) {
ScholixResource currentTarget = s.getTarget();
currentTarget.setObjectType("unknown");
currentTarget.setDnetIdentifier(Datacite2Scholix.generateId(currentTarget.getIdentifier().get(0).getIdentifier(),currentTarget.getIdentifier().get(0).getSchema(), currentTarget.getObjectType()));
s.generateIdentifier();
res.add(s);
final Scholix inverse = new Scholix();
inverse.setTarget(s.getSource());
inverse.setSource(s.getTarget());
inverse.setLinkprovider(s.getLinkprovider());
inverse.setPublicationDate(s.getPublicationDate());
inverse.setPublisher(s.getPublisher());
inverse.setRelationship(new ScholixRelationship(s.getRelationship().getInverse(), s.getRelationship().getSchema(), s.getRelationship().getName()));
inverse.generateIdentifier();
res.add(inverse);
} else
{
target.setIdentifier(target.getIdentifier().stream().map(d -> new ScholixIdentifier(d.getIdentifier().toLowerCase(), d.getSchema().toLowerCase())).collect(Collectors.toList()));
s.setTarget(target);
s.generateIdentifier();
res.add(s);
final Scholix inverse = new Scholix();
inverse.setTarget(s.getSource());
inverse.setSource(s.getTarget());
inverse.setLinkprovider(s.getLinkprovider());
inverse.setPublicationDate(s.getPublicationDate());
inverse.setPublisher(s.getPublisher());
inverse.setRelationship(new ScholixRelationship(s.getRelationship().getInverse(), s.getRelationship().getSchema(), s.getRelationship().getName()));
inverse.generateIdentifier();
res.add(inverse);
}
return res.iterator();
}, Encoders.bean(Scholix.class)).javaRDD().map(s -> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(workingDirPath+"/resolved_json");
}
}

View File

@ -0,0 +1,10 @@
<configuration>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,68 @@
<workflow-app name="Index graph to ElasticSearch" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingDirPath</name>
<description>the source path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>index</name>
<description>index name</description>
</property>
</parameters>
<start to="indexSummary"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="indexSummary">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index Summary</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/summary</arg>
<arg>--index</arg><arg>${index}_object</arg>
<arg>--idPath</arg><arg>id</arg>
<arg>--type</arg><arg>summary</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index scholix</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--type</arg><arg>scholix</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,14 @@
<configuration>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,97 @@
<workflow-app name="Keep On Synch datacite" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingDirPath</name>
<description>the source path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>index</name>
<description>index name</description>
</property>
<property>
<name>timestamp</name>
<description>timestamp from incremental harvesting</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${workingDirPath}/synch'/>
<mkdir path='${workingDirPath}/synch'/>
</fs>
<ok to="ImportDataciteUpdate"/>
<error to="Kill"/>
</action>
<action name="ImportDataciteUpdate">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.provision.update.RetrieveUpdateFromDatacite</main-class>
<arg>-t</arg><arg>${workingDirPath}/synch/input_json</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-ts</arg><arg>${timestamp}</arg>
<arg>-ih</arg><arg>ip-90-147-167-25.ct1.garrservices.it</arg>
<arg>-in</arg><arg>datacite</arg>
</java>
<ok to="resolveScholix"/>
<error to="Kill"/>
</action>
<action name="resolveScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>resolve and generate Scholix</name>
<class>eu.dnetlib.dhp.provision.update.SparkResolveScholixTarget</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>
<arg>-m</arg> <arg>yarn-cluster</arg>
<arg>-s</arg><arg>${workingDirPath}/synch/input_json</arg>
<arg>-w</arg><arg>${workingDirPath}/synch</arg>
<arg>-h</arg><arg>ip-90-147-167-25.ct1.garrservices.it</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index scholix</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/synch/resolved_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--type</arg><arg>scholix</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -6,21 +6,13 @@ import eu.dnetlib.dhp.provision.scholix.ScholixResource;
import eu.dnetlib.dhp.provision.update.*;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
public class DataciteClientTest {
@Test
public void dataciteSCholixTest() throws Exception {
final String json = IOUtils.toString(getClass().getResourceAsStream("datacite.json"));
@ -32,25 +24,16 @@ public class DataciteClientTest {
}
public void testClient() throws Exception {
RetrieveUpdateFromDatacite.main(new String[]{
"-n", "file:///data/new_s2.txt",
"-t", "/data/new_s2.txt",
"-ts", "1585760736",
"-ih", "ip-90-147-167-25.ct1.garrservices.it",
"-in", "datacite",
});
SparkResolveScholixTarget.main(new String[]{
"-s", "file:///data/new_s.txt",
"-m", "local[*]",
"-w", "/data/scholix/provision",
"-h", "ip-90-147-167-25.ct1.garrservices.it",
});
}
// public void testS() throws Exception {
// RetrieveUpdateFromDatacite.main(new String[]{
// "-n", "file:///data/new_s2.txt",
// "-t", "/data/new_s2.txt",
// "-ts", "1586974078",
// "-ih", "ip-90-147-167-25.ct1.garrservices.it",
// "-in", "datacite",
// });
//
// }
public void testResolveDataset() throws Exception {
@ -64,34 +47,5 @@ public class DataciteClientTest {
ScholixResource crossrefByDOI = cr.getResourceByDOI("10.26850/1678-4618eqj.v35.1.2010.p41-46");
Assertions.assertNotNull(crossrefByDOI);
System.out.println(new ObjectMapper().writeValueAsString(crossrefByDOI));
}
private String getResponse(final String url,final String json ) {
CloseableHttpClient client = HttpClients.createDefault();
try {
HttpPost httpPost = new HttpPost(url);
if (json!= null) {
StringEntity entity = new StringEntity(json);
httpPost.setEntity(entity);
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
}
CloseableHttpResponse response = client.execute(httpPost);
return IOUtils.toString(response.getEntity().getContent());
} catch (Throwable e) {
throw new RuntimeException("Error on executing request ",e);
} finally {
try {
client.close();
} catch (IOException e) {
throw new RuntimeException("Unable to close client ",e);
}
}
}
}

12
pom.xml
View File

@ -293,6 +293,12 @@
<artifactId>dnet-actionmanager-common</artifactId>
<version>6.0.5</version>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-actionmanager-api</artifactId>
<version>[4.0.1,5.0.0)</version>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaire-data-protos</artifactId>
@ -346,6 +352,12 @@
<artifactId>mongo-java-driver</artifactId>
<version>${mongodb.driver.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.10</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>stringtemplate</artifactId>