Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Miriam Baglioni 2020-04-08 18:07:26 +02:00
commit df2fc4a6d7
23 changed files with 745 additions and 292 deletions

View File

@ -29,6 +29,20 @@ public class SparkSessionSupport {
runWithSparkSession(c -> SparkSession.builder().config(c).getOrCreate(), conf, isSparkSessionManaged, fn);
}
/**
* Runs a given function using SparkSession created with hive support and using default builder and supplied SparkConf.
* Stops SparkSession when SparkSession is managed. Allows to reuse SparkSession created externally.
*
* @param conf SparkConf instance
* @param isSparkSessionManaged When true will stop SparkSession
* @param fn Consumer to be applied to constructed SparkSession
*/
public static void runWithSparkHiveSession(SparkConf conf,
Boolean isSparkSessionManaged,
ThrowingConsumer<SparkSession, Exception> fn) {
runWithSparkSession(c -> SparkSession.builder().config(c).enableHiveSupport().getOrCreate(), conf, isSparkSessionManaged, fn);
}
/**
* Runs a given function using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession
* when SparkSession is managed. Allows to reuse SparkSession created externally.

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.actionmanager.promote;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
@ -45,34 +46,7 @@ public class PromoteActionPayloadForGraphTableJobTest {
conf.setAppName(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName());
conf.setMaster("local");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[]{
Author.class,
Context.class,
Country.class,
DataInfo.class,
eu.dnetlib.dhp.schema.oaf.Dataset.class,
Datasource.class,
ExternalReference.class,
ExtraInfo.class,
Field.class,
GeoLocation.class,
Instance.class,
Journal.class,
KeyValue.class,
Oaf.class,
OafEntity.class,
OAIProvenance.class,
Organization.class,
OriginDescription.class,
OtherResearchProduct.class,
Project.class,
Publication.class,
Qualifier.class,
Relation.class,
Result.class,
Software.class,
StructuredProperty.class
});
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
spark = SparkSession.builder().config(conf).getOrCreate();
}

View File

@ -0,0 +1,69 @@
package eu.dnetlib.dhp.oa.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class GraphHiveImporterJob {
private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJob.class);
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(GraphHiveImporterJob.class.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/input_graph_hive_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
String hiveDbName = parser.get("hiveDbName");
log.info("hiveDbName: {}", hiveDbName);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
runWithSparkHiveSession(conf, isSparkSessionManaged,
spark -> loadGraphAsHiveDB(spark, inputPath, hiveDbName));
}
// protected for testing
private static void loadGraphAsHiveDB(SparkSession spark, String inputPath, String hiveDbName) {
spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// Read the input file and convert it into RDD of serializable object
ModelSupport.oafTypes.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name)
.map(s -> new ObjectMapper().readValue(s, clazz))
.rdd(), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.saveAsTable(hiveDbName + "." + name));
}
}

View File

@ -1,62 +0,0 @@
package eu.dnetlib.dhp.oa.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class SparkGraphImporterJob {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/input_graph_parameters.json")));
parser.parseArgument(args);
new SparkGraphImporterJob().run(parser);
}
private void run(ArgumentApplicationParser parser) {
try(SparkSession spark = getSparkSession(parser)) {
final String inputPath = parser.get("sourcePath");
final String hiveDbName = parser.get("hive_db_name");
runWith(spark, inputPath, hiveDbName);
}
}
// protected for testing
protected void runWith(SparkSession spark, String inputPath, String hiveDbName) {
spark.sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", hiveDbName));
spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// Read the input file and convert it into RDD of serializable object
ModelSupport.oafTypes.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name)
.map(s -> new ObjectMapper().readValue(s, clazz))
.rdd(), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.saveAsTable(hiveDbName + "." + name));
}
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
return SparkSession
.builder()
.appName(SparkGraphImporterJob.class.getSimpleName())
.master(parser.get("master"))
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
}

View File

@ -49,7 +49,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>MapGraphAsHiveDB</name>
<class>eu.dnetlib.dhp.oa.graph.SparkGraphImporterJob</class>
<class>eu.dnetlib.dhp.oa.graph.GraphHiveImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}

View File

@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "in",
"paramLongName": "inputPath",
"paramDescription": "the path to the graph data dump to read",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveDbName",
"paramDescription": "the target hive database name",
"paramRequired": true
}
]

View File

@ -1,6 +0,0 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true},
{"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true}
]

View File

@ -0,0 +1,92 @@
package eu.dnetlib.dhp.oa.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
public class GraphHiveImporterJobTest {
private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJobTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = GraphHiveImporterJobTest.class.getClassLoader();
public static final String JDBC_DERBY_TEMPLATE = "jdbc:derby:;databaseName=%s/junit_metastore_db;create=true";
private static SparkSession spark;
private static Path workingDir;
private static String dbName;
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(GraphHiveImporterJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
dbName = RandomStringUtils.randomAlphabetic(5);
log.info("using DB name {}", "test_" + dbName);
SparkConf conf = new SparkConf();
conf.setAppName(GraphHiveImporterJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
conf.set("javax.jdo.option.ConnectionURL", String.format(JDBC_DERBY_TEMPLATE, workingDir.resolve("warehouse").toString()));
spark = SparkSession
.builder()
.appName(GraphHiveImporterJobTest.class.getSimpleName())
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void testImportGraphAsHiveDB() throws Exception {
GraphHiveImporterJob.main(new String[]{
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-inputPath", getClass().getResource("/eu/dnetlib/dhp/oa/graph/sample").getPath(),
"-hiveMetastoreUris", "",
"-hiveDbName", dbName
});
ModelSupport.oafTypes.forEach((name, clazz) -> {
long count = spark.read().table(dbName + "." + name).count();
int expected = name.equals("relation") ? 100 : 10;
Assertions.assertEquals(expected, count, String.format("%s should be %s", name, expected));
});
}
}

View File

@ -1,54 +0,0 @@
package eu.dnetlib.dhp.oa.graph;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;
public class SparkGraphImporterJobTest {
private final static String TEST_DB_NAME = "test";
@Test
public void testImport(@TempDir Path outPath) {
try(SparkSession spark = testSparkSession(outPath.toString())) {
new SparkGraphImporterJob().runWith(
spark,
getClass().getResource("/eu/dnetlib/dhp/oa/graph/sample").getPath(),
TEST_DB_NAME);
GraphMappingUtils.types.forEach((name, clazz) -> {
final long count = spark.read().table(TEST_DB_NAME + "." + name).count();
if (name.equals("relation")) {
Assertions.assertEquals(100, count, String.format("%s should be 100", name));
} else {
Assertions.assertEquals(10, count, String.format("%s should be 10", name));
}
});
}
}
private SparkSession testSparkSession(final String inputPath) {
SparkConf conf = new SparkConf();
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("hive.metastore.warehouse.dir", inputPath + "/warehouse");
conf.set("spark.sql.warehouse.dir", inputPath);
conf.set("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s/junit_metastore_db;create=true", inputPath));
conf.set("spark.ui.enabled", "false");
return SparkSession
.builder()
.appName(SparkGraphImporterJobTest.class.getSimpleName())
.master("local[*]")
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
}

View File

@ -0,0 +1,108 @@
package eu.dnetlib.dhp.provision.update;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import eu.dnetlib.dhp.provision.scholix.ScholixCollectedFrom;
import eu.dnetlib.dhp.provision.scholix.ScholixEntityId;
import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier;
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class CrossRefParserJSON {
private static List<ScholixCollectedFrom> collectedFrom =generateCrossrefCollectedFrom("complete");
public static ScholixResource parseRecord(final String record) {
if (record == null) return null;
JsonElement jElement = new JsonParser().parse(record);
JsonElement source = null;
if (jElement.getAsJsonObject().has("_source")) {
source = jElement.getAsJsonObject().get("_source");
if (source == null || !source.isJsonObject())
return null;
}
else if(jElement.getAsJsonObject().has("DOI")){
source = jElement;
} else {
return null;
}
final JsonObject message = source.getAsJsonObject();
ScholixResource currentObject = new ScholixResource();
if (message.get("DOI") != null) {
final String doi = message.get("DOI").getAsString();
currentObject.setIdentifier(Collections.singletonList(new ScholixIdentifier(doi, "doi")));
}
if ((!message.get("created").isJsonNull()) && (message.getAsJsonObject("created").get("date-time") != null)) {
currentObject.setPublicationDate(message.getAsJsonObject("created").get("date-time").getAsString());
}
if (message.get("title")!= null && !message.get("title").isJsonNull() && message.get("title").isJsonArray() ) {
JsonArray array = message.get("title").getAsJsonArray();
currentObject.setTitle(array.get(0).getAsString());
}
if (message.get("author") != null && !message.get("author").isJsonNull()) {
JsonArray author = message.getAsJsonArray("author");
List<ScholixEntityId> authorList = new ArrayList<>();
for (JsonElement anAuthor : author) {
JsonObject currentAuth = anAuthor.getAsJsonObject();
String family = "";
String given = "";
if (currentAuth != null && currentAuth.get("family") != null && !currentAuth.get("family").isJsonNull()) {
family = currentAuth.get("family").getAsString();
}
if (currentAuth != null && currentAuth.get("given") != null && !currentAuth.get("given").isJsonNull()) {
given = currentAuth.get("given").getAsString();
}
authorList.add(new ScholixEntityId(String.format("%s %s", family, given), null));
}
currentObject.setCreator(authorList);
}
if (message.get("publisher") != null && !message.get("publisher").isJsonNull()) {
currentObject.setPublisher(Collections.singletonList(new ScholixEntityId(message.get("publisher").getAsString(), null)));
}
currentObject.setCollectedFrom(collectedFrom);
currentObject.setObjectType("publication");
currentObject.setDnetIdentifier(generateId(message.get("DOI").getAsString(), "doi", "publication"));
return currentObject;
}
private static List<ScholixCollectedFrom> generateCrossrefCollectedFrom(final String completionStatus) {
final ScholixEntityId scholixEntityId = new ScholixEntityId("Crossref",
Collections.singletonList(new ScholixIdentifier("dli_________::crossref", "dnet_identifier")));
return Collections.singletonList(
new ScholixCollectedFrom(
scholixEntityId,"resolved", completionStatus));
}
private static String generateId(final String pid, final String pidType, final String entityType) {
String type;
switch (entityType){
case "publication":
type = "50|";
break;
case "dataset":
type = "60|";
break;
case "unknown":
type = "70|";
break;
default:
throw new IllegalArgumentException("unexpected value "+entityType);
}
return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
}
}

View File

@ -0,0 +1,88 @@
package eu.dnetlib.dhp.provision.update;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.ByteArrayOutputStream;
import java.util.zip.Inflater;
public class CrossrefClient {
private String host;
private String index ="crossref";
private String indexType = "item";
public CrossrefClient(String host) {
this.host = host;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getIndex() {
return index;
}
public void setIndex(String index) {
this.index = index;
}
public String getIndexType() {
return indexType;
}
public void setIndexType(String indexType) {
this.indexType = indexType;
}
private static String decompressBlob(final String blob) {
try {
byte[] byteArray = Base64.decodeBase64(blob.getBytes());
final Inflater decompresser = new Inflater();
decompresser.setInput(byteArray);
final ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length);
byte[] buffer = new byte[8192];
while (!decompresser.finished()) {
int size = decompresser.inflate(buffer);
bos.write(buffer, 0, size);
}
byte[] unzippeddata = bos.toByteArray();
decompresser.end();
return new String(unzippeddata);
} catch (Throwable e) {
throw new RuntimeException("Wrong record:" + blob,e);
}
}
public ScholixResource getResourceByDOI(final String doi) {
try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(String.format("http://%s:9200/%s/%s/%s", host, index,indexType, doi.replaceAll("/","%2F")));
CloseableHttpResponse response = client.execute(httpGet);
String json = IOUtils.toString(response.getEntity().getContent());
if (json.contains("blob")) {
JsonParser p = new JsonParser();
final JsonElement root = p.parse(json);
json =decompressBlob(root.getAsJsonObject().get("_source").getAsJsonObject().get("blob").getAsString());
}
return CrossRefParserJSON.parseRecord(json);
} catch (Throwable e) {
return null;
}
}
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.provision;
package eu.dnetlib.dhp.provision.update;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.provision.scholix.*;
@ -15,16 +15,14 @@ import java.util.stream.Collectors;
public class Datacite2Scholix {
private String rootPath = "$.attributes";
final RelationMapper relationMapper;
public Datacite2Scholix(RelationMapper relationMapper) {
this.relationMapper = relationMapper;
}
public List<Scholix> generateScholixFromJson(final String dJson) {
List<Map<String, String>> relIds = getRelatedIendtifiers(dJson);
relIds = relIds!= null ? relIds.stream().filter(m->
m.containsKey("relatedIdentifierType") && m.containsKey("relationType" ) && m.containsKey( "relatedIdentifier")
@ -32,22 +30,24 @@ public class Datacite2Scholix {
if(relIds== null || relIds.size() ==0 )
return null;
final String updated = JsonPath.read(dJson,"$.attributes.updated" );
final String updated = JsonPath.read(dJson, rootPath + ".updated");
ScholixResource resource = generateDataciteScholixResource(dJson);
return relIds.stream().flatMap(s-> {
final List<Scholix> result = generateScholix(resource, s.get("relatedIdentifier"), s.get("relatedIdentifierType"), s.get("relationType"), updated);
return result.stream();
}).collect(Collectors.toList());
}
public String getRootPath() {
return rootPath;
}
public void setRootPath(String rootPath) {
this.rootPath = rootPath;
}
private List<Scholix> generateScholix(ScholixResource source, final String pid, final String pidtype, final String relType, final String updated) {
if ("doi".equalsIgnoreCase(pidtype)) {
ScholixResource target = new ScholixResource();
target.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, pidtype)));
@ -92,20 +92,17 @@ public class Datacite2Scholix {
result.add(s2);
return result;
}
}
public ScholixResource generateDataciteScholixResource(String dJson) {
ScholixResource resource = new ScholixResource();
String DOI_PATH = "$.attributes.doi";
String DOI_PATH = rootPath + ".doi";
final String doi = JsonPath.read(dJson, DOI_PATH);
resource.setIdentifier(Collections.singletonList(new ScholixIdentifier(doi, "doi")));
resource.setObjectType(getType(dJson));
resource.setDnetIdentifier(generateId(doi, "doi", resource.getObjectType()));
resource.setCollectedFrom(generateDataciteCollectedFrom("complete"));
final String publisher = JsonPath.read(dJson, "$.attributes.publisher");
final String publisher = JsonPath.read(dJson, rootPath + ".publisher");
if (StringUtils.isNotBlank(publisher))
resource.setPublisher(Collections.singletonList(new ScholixEntityId(publisher, null)));
final String date = getDate(dJson);
@ -119,7 +116,7 @@ public class Datacite2Scholix {
}
private List<ScholixEntityId> getCreators(final String json) {
final List<String> creatorName = JsonPath.read(json, "$.attributes.creators[*].name");
final List<String> creatorName = JsonPath.read(json, rootPath + ".creators[*].name");
if (creatorName!= null && creatorName.size() >0) {
return creatorName.stream().map(s-> new ScholixEntityId(s, null)).collect(Collectors.toList());
}
@ -127,12 +124,12 @@ public class Datacite2Scholix {
}
private String getTitle(final String json){
final List<String> titles = JsonPath.read(json, "$.attributes.titles[*].title");
final List<String> titles = JsonPath.read(json, rootPath + ".titles[*].title");
return titles!= null && titles.size()>0?titles.get(0): null;
}
private String getDate(final String json) {
final List<Map<String,String>> dates = JsonPath.read(json,"$.attributes.dates");
final List<Map<String,String>> dates = JsonPath.read(json, rootPath + ".dates");
if(dates!= null && dates.size()>0){
List<Map<String, String>> issued = dates.stream().filter(s -> "issued".equalsIgnoreCase(s.get("dateType"))).collect(Collectors.toList());
@ -152,7 +149,7 @@ public class Datacite2Scholix {
private String getType(final String json) {
try {
final String bibtext = JsonPath.read(json, "$.attributes.types.bibtex");
final String bibtext = JsonPath.read(json, rootPath + ".types.bibtex");
if ("article".equalsIgnoreCase(bibtext)) {
return "publication";
}
@ -162,14 +159,10 @@ public class Datacite2Scholix {
}
}
private List<Map<String, String>> getRelatedIendtifiers(final String json) {
String REL_IDENTIFIER_PATH = "$.attributes.relatedIdentifiers[*]";
String REL_IDENTIFIER_PATH = rootPath + ".relatedIdentifiers[*]";
List<Map<String, String>> res = JsonPath.read(json, REL_IDENTIFIER_PATH);
return res;
}
protected String generateId(final String pid, final String pidType, final String entityType) {
@ -186,18 +179,7 @@ public class Datacite2Scholix {
break;
default:
throw new IllegalArgumentException("unexpected value "+entityType);
}
return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
}
}

View File

@ -0,0 +1,75 @@
package eu.dnetlib.dhp.provision.update;
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.IOException;
public class DataciteClient {
private String host;
private String index ="datacite";
private String indexType = "dump";
private Datacite2Scholix d2s;
public DataciteClient(String host) {
this.host = host;
d2s = new Datacite2Scholix(null);
d2s.setRootPath("$._source.attributes");
}
public Iterable<String> getDatasetsFromTs(final Long timestamp) {
return ()-> {
try {
return new DataciteClientIterator(host, index, timestamp);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getIndex() {
return index;
}
public void setIndex(String index) {
this.index = index;
}
public String getIndexType() {
return indexType;
}
public void setIndexType(String indexType) {
this.indexType = indexType;
}
public ScholixResource getDatasetByDOI(final String doi) {
try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(String.format("http://%s:9200/%s/%s/%s", host, index,indexType, doi.replaceAll("/","%2F")));
CloseableHttpResponse response = client.execute(httpGet);
final String json =IOUtils.toString(response.getEntity().getContent());
return d2s.generateDataciteScholixResource(json);
} catch (Throwable e) {
return null;
}
}
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.provision;
package eu.dnetlib.dhp.provision.update;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray;
@ -14,7 +14,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
public class DataciteClient implements Iterator<String> {
public class DataciteClientIterator implements Iterator<String> {
final static String blobPath = "$.hits.hits[*]._source";
final static String scrollIdPath = "$._scroll_id";
@ -27,7 +27,7 @@ public class DataciteClient implements Iterator<String> {
final String esIndex;
final ObjectMapper mapper = new ObjectMapper();
public DataciteClient(final String esHost, final String esIndex, final long timestamp) throws IOException {
public DataciteClientIterator(final String esHost, final String esIndex, final long timestamp) throws IOException {
this.esHost = esHost;
this.esIndex = esIndex;

View File

@ -1,8 +1,10 @@
package eu.dnetlib.dhp.provision;
package eu.dnetlib.dhp.provision.update;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.Scholix;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -11,17 +13,19 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class RetrieveUpdateFromDatacite {
public static void main(String[] args) throws Exception{
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(RetrieveUpdateFromDatacite.class.getResourceAsStream("/eu/dnetlib/dhp/provision/retrieve_update_parameters.json")));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(RetrieveUpdateFromDatacite.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json")));
parser.parseArgument(args);
final String hdfsuri = parser.get("namenode");
Path hdfswritepath = new Path(parser.get("targetPath"));
final String timestamp = parser.get("timestamp");
final long timestamp = Long.parseLong(parser.get("timestamp"));
final String host = parser.get("indexHost");
final String index = parser.get("indexName");
// ====== Init HDFS File System Object
Configuration conf = new Configuration();
@ -32,13 +36,28 @@ public class RetrieveUpdateFromDatacite {
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem.get(URI.create(hdfsuri), conf);
final AtomicInteger counter = new AtomicInteger(0);
final Datacite2Scholix d2s = new Datacite2Scholix(RelationMapper.load());
final ObjectMapper mapper = new ObjectMapper();
try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class))) {
final Text value = new Text();
final IntWritable key = new IntWritable();
int i = 0;
for(String dataset: new DataciteClient(host).getDatasetsFromTs(timestamp)) {
i++;
List<Scholix> scholix = d2s.generateScholixFromJson(dataset);
if (scholix!= null)
for(Scholix s: scholix) {
key.set(i);
value.set(mapper.writeValueAsString(s));
writer.append(key, value);
if (i % 10000 == 0) {
System.out.println("wrote "+i);
}
}
}
}
}
}

View File

@ -0,0 +1,81 @@
package eu.dnetlib.dhp.provision.update;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.Scholix;
import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier;
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import scala.Tuple2;
import java.util.Collections;
public class SparkResolveScholixTarget {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResolveScholixTarget.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json")));
parser.parseArgument(args);
final SparkConf conf = new SparkConf();
final String master = parser.get("master");
final String sourcePath = parser.get("sourcePath");
final String workingDirPath= parser.get("workingDirPath");
final String indexHost= parser.get("indexHost");
try (SparkSession spark = getSession(conf, master)){
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
spark.createDataset(sc.sequenceFile(sourcePath, IntWritable.class,Text.class)
.map(Tuple2::_2)
.map(s-> new ObjectMapper().readValue(s.toString(), Scholix.class)).rdd(), Encoders.bean(Scholix.class))
.write().save(workingDirPath+"/stepA");
Dataset<Scholix> s1 = spark.read().load(workingDirPath+"/stepA").as(Encoders.bean(Scholix.class));
s1.where(s1.col("target.dnetIdentifier").isNull()).select(s1.col("target.identifier")).distinct()
.map((MapFunction<Row, ScholixResource>) f-> {
final String pid = ((Row) f.getList(0).get(0)).getString(0);
ScholixResource publication = new CrossrefClient(indexHost).getResourceByDOI(pid);
if (publication != null) {
return publication;
}
ScholixResource dataset = new DataciteClient(indexHost).getDatasetByDOI(pid);
if (dataset!= null) {
return dataset;
}
ScholixResource r = new ScholixResource();
r.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, "doi")));
r.setObjectType("unknown");
r.setDnetIdentifier("70|"+DHPUtils.md5(String.format("%s::doi", pid.toLowerCase().trim())));
return r;
}, Encoders.bean(ScholixResource.class)).write().mode(SaveMode.Overwrite).save(workingDirPath+"/stepB");
}
}
private static SparkSession getSession(SparkConf conf, String master) {
return SparkSession
.builder()
.config(conf)
.appName(SparkResolveScholixTarget.class.getSimpleName())
.master(master)
.getOrCreate();
}
}

View File

@ -0,0 +1,26 @@
[
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "the name node",
"paramRequired": true
},
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the source path",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingDirPath",
"paramDescription": "the working Dir Path",
"paramRequired": true
},
{
"paramName": "h",
"paramLongName": "indexHost",
"paramDescription": "the working Dir Path",
"paramRequired": true
}
]

View File

@ -0,0 +1,33 @@
[
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the name node",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "targetPath",
"paramDescription": "the working path where generated files",
"paramRequired": true
},
{
"paramName": "ts",
"paramLongName": "timestamp",
"paramDescription": "the timestamp for incremental harvesting",
"paramRequired": true
},
{
"paramName": "ih",
"paramLongName": "indexHost",
"paramDescription": "the ip name of the index",
"paramRequired": true
},
{
"paramName": "in",
"paramLongName": "indexName",
"paramDescription": "the name of the index",
"paramRequired": true
}
]

View File

@ -16,21 +16,21 @@
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>index</name>
<description>index name</description>
</property>
<property>
<name>idScholix</name>
<description>the identifier name of the scholix </description>
</property>
<property>
<name>idSummary</name>
<description>the identifier name of the summary</description>
</property>
<!-- <property>-->
<!-- <name>index</name>-->
<!-- <description>index name</description>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>idScholix</name>-->
<!-- <description>the identifier name of the scholix </description>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>idSummary</name>-->
<!-- <description>the identifier name of the summary</description>-->
<!-- </property>-->
</parameters>
<start to="indexSummary"/>
<start to="DeleteTargetPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -98,45 +98,45 @@
<error to="Kill"/>
</action>
<action name="indexSummary">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index Summary</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/summary</arg>
<arg>--index</arg><arg>${index}_object</arg>
<arg>--idPath</arg><arg>id</arg>
<arg>--type</arg><arg>summary</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<!-- <action name="indexSummary">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <job-tracker>${jobTracker}</job-tracker>-->
<!-- <name-node>${nameNode}</name-node>-->
<!-- <master>yarn-cluster</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>index Summary</name>-->
<!-- <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>-->
<!-- <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>-->
<!-- <spark-opts>&#45;&#45;executor-memory ${sparkExecutorMemory} &#45;&#45;driver-memory=${sparkDriverMemory} ${sparkExtraOPT} &#45;&#45;conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>-->
<!-- <arg>-mt</arg> <arg>yarn-cluster</arg>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDirPath}/summary</arg>-->
<!-- <arg>&#45;&#45;index</arg><arg>${index}_object</arg>-->
<!-- <arg>&#45;&#45;idPath</arg><arg>id</arg>-->
<!-- <arg>&#45;&#45;type</arg><arg>summary</arg>-->
<!-- </spark>-->
<!-- <ok to="indexScholix"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index scholix</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--type</arg><arg>scholix</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<!-- <action name="indexScholix">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <job-tracker>${jobTracker}</job-tracker>-->
<!-- <name-node>${nameNode}</name-node>-->
<!-- <master>yarn-cluster</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>index scholix</name>-->
<!-- <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>-->
<!-- <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>-->
<!-- <spark-opts>&#45;&#45;executor-memory ${sparkExecutorMemory} &#45;&#45;driver-memory=${sparkDriverMemory} ${sparkExtraOPT} &#45;&#45;conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>-->
<!-- <arg>-mt</arg> <arg>yarn-cluster</arg>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>-->
<!-- <arg>&#45;&#45;index</arg><arg>${index}_scholix</arg>-->
<!-- <arg>&#45;&#45;idPath</arg><arg>identifier</arg>-->
<!-- <arg>&#45;&#45;type</arg><arg>scholix</arg>-->
<!-- </spark>-->
<!-- <ok to="End"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<end name="End"/>
</workflow-app>

View File

@ -2,6 +2,8 @@ package eu.dnetlib.dhp.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.provision.scholix.Scholix;
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
import eu.dnetlib.dhp.provision.update.*;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
@ -9,10 +11,9 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
@ -27,58 +28,45 @@ public class DataciteClientTest {
Datacite2Scholix ds = new Datacite2Scholix(mapper);
final List<Scholix> s = ds.generateScholixFromJson(json);
System.out.println(new ObjectMapper().writeValueAsString(s));
}
@Test
public void testClient() throws Exception {
DataciteClient client = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it","datacite",1585454082);
int i = 0;
final RelationMapper mapper = RelationMapper.load();
Datacite2Scholix ds = new Datacite2Scholix(mapper);
BufferedWriter writer = new BufferedWriter(new FileWriter("/Users/sandro/new_s.txt"));
final ObjectMapper m = new ObjectMapper();
RetrieveUpdateFromDatacite.main(new String[]{
"-n", "file:///data/new_s2.txt",
"-t", "/data/new_s2.txt",
"-ts", "1585760736",
"-ih", "ip-90-147-167-25.ct1.garrservices.it",
"-in", "datacite",
});
SparkResolveScholixTarget.main(new String[]{
"-s", "file:///data/new_s.txt",
"-m", "local[*]",
"-w", "/data/scholix/provision",
"-h", "ip-90-147-167-25.ct1.garrservices.it",
while (client.hasNext()){
i ++;
final String next = client.next();
try {
final List<Scholix> res = ds.generateScholixFromJson(next);
if (res!= null)
res
.forEach(
s -> {
try {
writer.write(m.writeValueAsString(s));
writer.write("\n");
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
}
);
}catch (Throwable t) {
System.out.println(next);
throw new RuntimeException(t);
}
if(i %1000 == 0) {
System.out.println("added "+i);
}
}
public void testResolveDataset() throws Exception {
DataciteClient dc = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it");
ScholixResource datasetByDOI = dc.getDatasetByDOI("10.17182/hepdata.15392.v1/t5");
Assertions.assertNotNull(datasetByDOI);
System.out.println(new ObjectMapper().writeValueAsString(datasetByDOI));
CrossrefClient cr = new CrossrefClient("ip-90-147-167-25.ct1.garrservices.it");
ScholixResource crossrefByDOI = cr.getResourceByDOI("10.26850/1678-4618eqj.v35.1.2010.p41-46");
Assertions.assertNotNull(crossrefByDOI);
System.out.println(new ObjectMapper().writeValueAsString(crossrefByDOI));
}
private String getResponse(final String url,final String json ) {