From ba9f07a6fe1c09214ad1ca673273fa1a54c2daf4 Mon Sep 17 00:00:00 2001
From: sandro
Date: Wed, 8 Apr 2020 13:18:20 +0200
Subject: [PATCH] fixed wrong test

---
 .../provision/update/CrossRefParserJSON.java  | 108 ++++++++++++++++++
 .../dhp/provision/update/CrossrefClient.java  |  88 ++++++++++++++
 .../{ => update}/Datacite2Scholix.java        |  52 +++------
 .../dhp/provision/update/DataciteClient.java  |  75 ++++++++++++
 .../DataciteClientIterator.java}              |   6 +-
 .../RetrieveUpdateFromDatacite.java           |  35 ++++--
 .../update/SparkResolveScholixTarget.java     |  81 +++++++++++++
 .../input_resolve_scholix_parameters.json     |  26 +++++
 .../input_retrieve_update_parameters.json     |  33 ++++++
 .../provision/retrieve_update_parameters.json |   0
 .../dhp/sx/provision/oozie_app/workflow.xml   | 102 ++++++++---------
 .../dhp/provision/DataciteClientTest.java     |  76 ++++++------
 12 files changed, 541 insertions(+), 141 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossRefParserJSON.java
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossrefClient.java
 rename dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/{ => update}/Datacite2Scholix.java (90%)
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClient.java
 rename dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/{DataciteClient.java => update/DataciteClientIterator.java} (93%)
 rename dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/{ => update}/RetrieveUpdateFromDatacite.java (52%)
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json
 create mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json
 delete mode 100644 dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/retrieve_update_parameters.json

diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossRefParserJSON.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossRefParserJSON.java
new file mode 100644
index 0000000000..828d8f9b52
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossRefParserJSON.java
@@ -0,0 +1,108 @@
+package eu.dnetlib.dhp.provision.update;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import eu.dnetlib.dhp.provision.scholix.ScholixCollectedFrom;
+import eu.dnetlib.dhp.provision.scholix.ScholixEntityId;
+import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier;
+import eu.dnetlib.dhp.provision.scholix.ScholixResource;
+import eu.dnetlib.dhp.utils.DHPUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class CrossRefParserJSON {
+
+    private static List<ScholixCollectedFrom> collectedFrom = generateCrossrefCollectedFrom("complete");
+
+    public static ScholixResource parseRecord(final String record) {
+        if (record == null) return null;
+        JsonElement jElement = new JsonParser().parse(record);
+        JsonElement source = null;
+        if (jElement.getAsJsonObject().has("_source")) {
+            source = jElement.getAsJsonObject().get("_source");
+            if (source == null || !source.isJsonObject())
+                return null;
+        }
+        else if (jElement.getAsJsonObject().has("DOI")) {
+            source = jElement;
+        } else {
+            return null;
+        }
+
+        final JsonObject message = source.getAsJsonObject();
+        ScholixResource currentObject = new ScholixResource();
+
+        if (message.get("DOI") != null) {
+            final String doi = message.get("DOI").getAsString();
+            currentObject.setIdentifier(Collections.singletonList(new ScholixIdentifier(doi, "doi")));
+        }
+
+        if ((!message.get("created").isJsonNull()) && (message.getAsJsonObject("created").get("date-time") != null)) {
+            currentObject.setPublicationDate(message.getAsJsonObject("created").get("date-time").getAsString());
+        }
+
+        if (message.get("title") != null && !message.get("title").isJsonNull() && message.get("title").isJsonArray()) {
+
+            JsonArray array = message.get("title").getAsJsonArray();
+            currentObject.setTitle(array.get(0).getAsString());
+        }
+        if (message.get("author") != null && !message.get("author").isJsonNull()) {
+            JsonArray author = message.getAsJsonArray("author");
+            List<ScholixEntityId> authorList = new ArrayList<>();
+            for (JsonElement anAuthor : author) {
+                JsonObject currentAuth = anAuthor.getAsJsonObject();
+
+                String family = "";
+                String given = "";
+                if (currentAuth != null && currentAuth.get("family") != null && !currentAuth.get("family").isJsonNull()) {
+                    family = currentAuth.get("family").getAsString();
+                }
+                if (currentAuth != null && currentAuth.get("given") != null && !currentAuth.get("given").isJsonNull()) {
+                    given = currentAuth.get("given").getAsString();
+                }
+                authorList.add(new ScholixEntityId(String.format("%s %s", family, given), null));
+            }
+            currentObject.setCreator(authorList);
+        }
+        if (message.get("publisher") != null && !message.get("publisher").isJsonNull()) {
+            currentObject.setPublisher(Collections.singletonList(new ScholixEntityId(message.get("publisher").getAsString(), null)));
+        }
+        currentObject.setCollectedFrom(collectedFrom);
+        currentObject.setObjectType("publication");
+        currentObject.setDnetIdentifier(generateId(message.get("DOI").getAsString(), "doi", "publication"));
+
+        return currentObject;
+    }
+
+    private static List<ScholixCollectedFrom> generateCrossrefCollectedFrom(final String completionStatus) {
+        final ScholixEntityId scholixEntityId = new ScholixEntityId("Crossref",
+                Collections.singletonList(new ScholixIdentifier("dli_________::crossref", "dnet_identifier")));
+        return Collections.singletonList(
+                new ScholixCollectedFrom(
+                        scholixEntityId, "resolved", completionStatus));
+    }
+
+    private static String generateId(final String pid, final String pidType, final String entityType) {
+        String type;
+        switch (entityType) {
+            case "publication":
+                type = "50|";
+                break;
+            case "dataset":
+                type = "60|";
+                break;
+            case "unknown":
+                type = "70|";
+                break;
+            default:
+                throw new IllegalArgumentException("unexpected value " + entityType);
+        }
+        return type + DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
+    }
+
+}
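
The parser above accepts either an Elasticsearch hit (with a "_source" wrapper) or a bare Crossref record (with a top-level "DOI"). A minimal sketch of how it can be exercised for review purposes; the inline record is hypothetical sample data, not project test input, and note that parseRecord dereferences "created", so that field must be present:

    import eu.dnetlib.dhp.provision.scholix.ScholixResource;
    import eu.dnetlib.dhp.provision.update.CrossRefParserJSON;

    public class CrossRefParserJSONSketch {
        public static void main(String[] args) {
            // Hypothetical Crossref-style record in the bare (non-"_source") layout.
            final String record = "{\"DOI\":\"10.1000/xyz123\","
                    + "\"created\":{\"date-time\":\"2020-04-08T13:18:20Z\"},"
                    + "\"title\":[\"A sample title\"],"
                    + "\"author\":[{\"family\":\"Doe\",\"given\":\"Jane\"}],"
                    + "\"publisher\":\"Example Press\"}";
            ScholixResource r = CrossRefParserJSON.parseRecord(record);
            // generateId prefixes "50|" (publication) to md5("10.1000/xyz123::doi")
            System.out.println(r.getDnetIdentifier());
        }
    }
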
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossrefClient.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossrefClient.java
new file mode 100644
index 0000000000..3190ee5168
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/CrossrefClient.java
@@ -0,0 +1,88 @@
+package eu.dnetlib.dhp.provision.update;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import eu.dnetlib.dhp.provision.scholix.ScholixResource;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.ByteArrayOutputStream;
+import java.util.zip.Inflater;
+
+public class CrossrefClient {
+
+    private String host;
+    private String index = "crossref";
+    private String indexType = "item";
+
+    public CrossrefClient(String host) {
+        this.host = host;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    public String getIndexType() {
+        return indexType;
+    }
+
+    public void setIndexType(String indexType) {
+        this.indexType = indexType;
+    }
+
+    private static String decompressBlob(final String blob) {
+        try {
+            byte[] byteArray = Base64.decodeBase64(blob.getBytes());
+            final Inflater decompresser = new Inflater();
+            decompresser.setInput(byteArray);
+            final ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length);
+            byte[] buffer = new byte[8192];
+            while (!decompresser.finished()) {
+                int size = decompresser.inflate(buffer);
+                bos.write(buffer, 0, size);
+            }
+            byte[] unzippeddata = bos.toByteArray();
+            decompresser.end();
+            return new String(unzippeddata);
+        } catch (Throwable e) {
+            throw new RuntimeException("Wrong record:" + blob, e);
+        }
+    }
+
+    public ScholixResource getResourceByDOI(final String doi) {
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            HttpGet httpGet = new HttpGet(String.format("http://%s:9200/%s/%s/%s", host, index, indexType, doi.replaceAll("/", "%2F")));
+            CloseableHttpResponse response = client.execute(httpGet);
+            String json = IOUtils.toString(response.getEntity().getContent());
+            if (json.contains("blob")) {
+                JsonParser p = new JsonParser();
+                final JsonElement root = p.parse(json);
+                json = decompressBlob(root.getAsJsonObject().get("_source").getAsJsonObject().get("blob").getAsString());
+            }
+            return CrossRefParserJSON.parseRecord(json);
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+}
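
getResourceByDOI expects the indexed "blob" field to hold a Base64-encoded, zlib-deflated JSON record, which decompressBlob reverses with an Inflater. A small round-trip sketch of that encoding, with a hypothetical payload; Deflater is the compressing counterpart of the Inflater used above:

    import java.io.ByteArrayOutputStream;
    import java.util.zip.Deflater;
    import org.apache.commons.codec.binary.Base64;

    public class BlobEncodingSketch {
        public static void main(String[] args) {
            byte[] input = "{\"DOI\":\"10.1000/xyz123\"}".getBytes();
            // Deflate, then Base64: the inverse of decompressBlob.
            Deflater deflater = new Deflater();
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            while (!deflater.finished()) {
                bos.write(buffer, 0, deflater.deflate(buffer));
            }
            deflater.end();
            System.out.println(Base64.encodeBase64String(bos.toByteArray()));
        }
    }
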
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/Datacite2Scholix.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java
similarity index 90%
rename from dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/Datacite2Scholix.java
rename to dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java
index 809186a502..c6617a8238 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/Datacite2Scholix.java
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.provision;
+package eu.dnetlib.dhp.provision.update;
 
 import com.jayway.jsonpath.JsonPath;
 import eu.dnetlib.dhp.provision.scholix.*;
@@ -15,16 +15,14 @@ import java.util.stream.Collectors;
 
 public class Datacite2Scholix {
 
-
+    private String rootPath = "$.attributes";
     final RelationMapper relationMapper;
 
     public Datacite2Scholix(RelationMapper relationMapper) {
         this.relationMapper = relationMapper;
     }
 
-
     public List<Scholix> generateScholixFromJson(final String dJson) {
-
         List<Map<String, String>> relIds = getRelatedIendtifiers(dJson);
         relIds = relIds != null ? relIds.stream().filter(m -> m.containsKey("relatedIdentifierType") && m.containsKey("relationType") && m.containsKey("relatedIdentifier")
@@ -32,22 +30,24 @@ public class Datacite2Scholix {
         if (relIds == null || relIds.size() == 0)
             return null;
 
-
-
-        final String updated = JsonPath.read(dJson, "$.attributes.updated");
+        final String updated = JsonPath.read(dJson, rootPath + ".updated");
         ScholixResource resource = generateDataciteScholixResource(dJson);
 
         return relIds.stream().flatMap(s -> {
             final List<Scholix> result = generateScholix(resource, s.get("relatedIdentifier"), s.get("relatedIdentifierType"), s.get("relationType"), updated);
             return result.stream();
         }).collect(Collectors.toList());
-
     }
 
+    public String getRootPath() {
+        return rootPath;
+    }
+
+    public void setRootPath(String rootPath) {
+        this.rootPath = rootPath;
+    }
 
     private List<Scholix> generateScholix(ScholixResource source, final String pid, final String pidtype, final String relType, final String updated) {
-
-
         if ("doi".equalsIgnoreCase(pidtype)) {
             ScholixResource target = new ScholixResource();
             target.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, pidtype)));
@@ -92,20 +92,17 @@ public class Datacite2Scholix {
             result.add(s2);
             return result;
         }
-
-
-
     }
 
     public ScholixResource generateDataciteScholixResource(String dJson) {
         ScholixResource resource = new ScholixResource();
-        String DOI_PATH = "$.attributes.doi";
+        String DOI_PATH = rootPath + ".doi";
         final String doi = JsonPath.read(dJson, DOI_PATH);
         resource.setIdentifier(Collections.singletonList(new ScholixIdentifier(doi, "doi")));
         resource.setObjectType(getType(dJson));
         resource.setDnetIdentifier(generateId(doi, "doi", resource.getObjectType()));
         resource.setCollectedFrom(generateDataciteCollectedFrom("complete"));
-        final String publisher = JsonPath.read(dJson, "$.attributes.publisher");
+        final String publisher = JsonPath.read(dJson, rootPath + ".publisher");
         if (StringUtils.isNotBlank(publisher))
             resource.setPublisher(Collections.singletonList(new ScholixEntityId(publisher, null)));
         final String date = getDate(dJson);
@@ -119,7 +116,7 @@ public class Datacite2Scholix {
     }
 
     private List<ScholixEntityId> getCreators(final String json) {
-        final List<String> creatorName = JsonPath.read(json, "$.attributes.creators[*].name");
+        final List<String> creatorName = JsonPath.read(json, rootPath + ".creators[*].name");
         if (creatorName != null && creatorName.size() > 0) {
             return creatorName.stream().map(s -> new ScholixEntityId(s, null)).collect(Collectors.toList());
         }
@@ -127,12 +124,12 @@ public class Datacite2Scholix {
     }
 
     private String getTitle(final String json) {
-        final List<String> titles = JsonPath.read(json, "$.attributes.titles[*].title");
+        final List<String> titles = JsonPath.read(json, rootPath + ".titles[*].title");
         return titles != null && titles.size() > 0 ? titles.get(0) : null;
     }
 
     private String getDate(final String json) {
-        final List<Map<String, String>> dates = JsonPath.read(json, "$.attributes.dates");
+        final List<Map<String, String>> dates = JsonPath.read(json, rootPath + ".dates");
         if (dates != null && dates.size() > 0) {
             List<Map<String, String>> issued = dates.stream().filter(s -> "issued".equalsIgnoreCase(s.get("dateType"))).collect(Collectors.toList());
@@ -152,7 +149,7 @@ public class Datacite2Scholix {
 
     private String getType(final String json) {
         try {
-            final String bibtext = JsonPath.read(json, "$.attributes.types.bibtex");
+            final String bibtext = JsonPath.read(json, rootPath + ".types.bibtex");
             if ("article".equalsIgnoreCase(bibtext)) {
                 return "publication";
             }
@@ -162,14 +159,10 @@ public class Datacite2Scholix {
         }
     }
 
-
-
-
     private List<Map<String, String>> getRelatedIendtifiers(final String json) {
-        String REL_IDENTIFIER_PATH = "$.attributes.relatedIdentifiers[*]";
+        String REL_IDENTIFIER_PATH = rootPath + ".relatedIdentifiers[*]";
         List<Map<String, String>> res = JsonPath.read(json, REL_IDENTIFIER_PATH);
         return res;
-
     }
 
     protected String generateId(final String pid, final String pidType, final String entityType) {
@@ -186,18 +179,7 @@ public class Datacite2Scholix {
                 break;
             default:
                 throw new IllegalArgumentException("unexpected value " + entityType);
-
         }
-
         return type + DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
     }
-
-
-
-
-
-
-
-
-
 }
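
The new rootPath field is what lets the same mapping run against both the Datacite REST layout ("$.attributes", the default) and the Elasticsearch dump layout ("$._source.attributes", set by DataciteClient below). A hedged sketch against a hypothetical wrapped record; it assumes, as the constructor usage in DataciteClient suggests, that the relation mapper is not needed by generateDataciteScholixResource, and that the non-article branch of getType (not visible in this hunk) resolves to a type generateId accepts:

    import eu.dnetlib.dhp.provision.scholix.ScholixResource;
    import eu.dnetlib.dhp.provision.update.Datacite2Scholix;

    public class RootPathSketch {
        public static void main(String[] args) throws Exception {
            Datacite2Scholix d2s = new Datacite2Scholix(null); // mapper unused here
            d2s.setRootPath("$._source.attributes");
            // Hypothetical Elasticsearch-dump record.
            String json = "{\"_source\":{\"attributes\":{"
                    + "\"doi\":\"10.5072/example\","
                    + "\"publisher\":\"Example Repository\","
                    + "\"types\":{\"bibtex\":\"misc\"},"
                    + "\"titles\":[{\"title\":\"A sample dataset\"}],"
                    + "\"creators\":[{\"name\":\"Doe, Jane\"}],"
                    + "\"dates\":[{\"date\":\"2020-01-01\",\"dateType\":\"issued\"}]}}}";
            ScholixResource r = d2s.generateDataciteScholixResource(json);
            System.out.println(r.getDnetIdentifier());
        }
    }
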
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClient.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClient.java
new file mode 100644
index 0000000000..e1d25bf2e2
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClient.java
@@ -0,0 +1,75 @@
+package eu.dnetlib.dhp.provision.update;
+
+import eu.dnetlib.dhp.provision.scholix.ScholixResource;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.io.IOException;
+
+public class DataciteClient {
+
+    private String host;
+    private String index = "datacite";
+    private String indexType = "dump";
+    private Datacite2Scholix d2s;
+
+    public DataciteClient(String host) {
+        this.host = host;
+
+        d2s = new Datacite2Scholix(null);
+        d2s.setRootPath("$._source.attributes");
+    }
+
+    public Iterable<String> getDatasetsFromTs(final Long timestamp) {
+        return () -> {
+            try {
+                return new DataciteClientIterator(host, index, timestamp);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    public void setIndex(String index) {
+        this.index = index;
+    }
+
+    public String getIndexType() {
+        return indexType;
+    }
+
+    public void setIndexType(String indexType) {
+        this.indexType = indexType;
+    }
+
+    public ScholixResource getDatasetByDOI(final String doi) {
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            HttpGet httpGet = new HttpGet(String.format("http://%s:9200/%s/%s/%s", host, index, indexType, doi.replaceAll("/", "%2F")));
+            CloseableHttpResponse response = client.execute(httpGet);
+            final String json = IOUtils.toString(response.getEntity().getContent());
+            return d2s.generateDataciteScholixResource(json);
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+
+}
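
getDatasetsFromTs wraps the scroll iterator in an Iterable, so each for-each loop gets a fresh scan starting from the given timestamp. A sketch of intended use; the host is hypothetical, and it presumes an Elasticsearch instance reachable on port 9200, as hard-wired in the client:

    import eu.dnetlib.dhp.provision.update.DataciteClient;

    public class HarvestSketch {
        public static void main(String[] args) {
            DataciteClient client = new DataciteClient("localhost"); // hypothetical host
            int n = 0;
            for (String record : client.getDatasetsFromTs(1585760736L)) {
                if (++n >= 10) break; // sample the first few records only
            }
            System.out.println("fetched " + n + " records");
        }
    }
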
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DataciteClient.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClientIterator.java
similarity index 93%
rename from dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DataciteClient.java
rename to dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClientIterator.java
index 37a0bb23b0..e823945aec 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/DataciteClient.java
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/DataciteClientIterator.java
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.provision;
+package eu.dnetlib.dhp.provision.update;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.jayway.jsonpath.JsonPath;
 import net.minidev.json.JSONArray;
@@ -14,7 +14,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class DataciteClient implements Iterator<String> {
+public class DataciteClientIterator implements Iterator<String> {
 
     final static String blobPath = "$.hits.hits[*]._source";
     final static String scrollIdPath = "$._scroll_id";
@@ -27,7 +27,7 @@ public class DataciteClient {
     final String esIndex;
     final ObjectMapper mapper = new ObjectMapper();
 
-    public DataciteClient(final String esHost, final String esIndex, final long timestamp) throws IOException {
+    public DataciteClientIterator(final String esHost, final String esIndex, final long timestamp) throws IOException {
         this.esHost = esHost;
         this.esIndex = esIndex;
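
The two JsonPath constants above describe Elasticsearch scroll responses: "$._scroll_id" carries the cursor for the next page and "$.hits.hits[*]._source" the buffered records. A self-contained sketch of that extraction over a hypothetical response body, using the same JsonPath/json-smart types the iterator imports:

    import com.jayway.jsonpath.JsonPath;
    import net.minidev.json.JSONArray;
    import java.util.List;
    import java.util.stream.Collectors;

    public class ScrollParsingSketch {
        public static void main(String[] args) {
            // Hypothetical scroll response.
            String response = "{\"_scroll_id\":\"abc123\",\"hits\":{\"hits\":["
                    + "{\"_source\":{\"doi\":\"10.5072/a\"}},"
                    + "{\"_source\":{\"doi\":\"10.5072/b\"}}]}}";
            String scrollId = JsonPath.read(response, "$._scroll_id");
            JSONArray sources = JsonPath.read(response, "$.hits.hits[*]._source");
            List<String> buffer = sources.stream().map(Object::toString).collect(Collectors.toList());
            System.out.println(scrollId + " -> " + buffer.size() + " buffered records");
        }
    }
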
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/RetrieveUpdateFromDatacite.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/RetrieveUpdateFromDatacite.java
similarity index 52%
rename from dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/RetrieveUpdateFromDatacite.java
rename to dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/RetrieveUpdateFromDatacite.java
index ad00347336..ea659dbb1b 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/RetrieveUpdateFromDatacite.java
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/RetrieveUpdateFromDatacite.java
@@ -1,8 +1,10 @@
-package eu.dnetlib.dhp.provision;
+package eu.dnetlib.dhp.provision.update;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.provision.scholix.Scholix;
+import eu.dnetlib.scholexplorer.relation.RelationMapper;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -11,17 +13,19 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 
 import java.net.URI;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class RetrieveUpdateFromDatacite {
 
     public static void main(String[] args) throws Exception {
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(RetrieveUpdateFromDatacite.class.getResourceAsStream("/eu/dnetlib/dhp/provision/retrieve_update_parameters.json")));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(RetrieveUpdateFromDatacite.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json")));
         parser.parseArgument(args);
         final String hdfsuri = parser.get("namenode");
         Path hdfswritepath = new Path(parser.get("targetPath"));
-        final String timestamp = parser.get("timestamp");
-
+        final long timestamp = Long.parseLong(parser.get("timestamp"));
+        final String host = parser.get("indexHost");
+        final String index = parser.get("indexName");
 
         // ====== Init HDFS File System Object
         Configuration conf = new Configuration();
@@ -32,13 +36,28 @@ public class RetrieveUpdateFromDatacite {
         conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
         FileSystem.get(URI.create(hdfsuri), conf);
-
-        final AtomicInteger counter = new AtomicInteger(0);
+        final Datacite2Scholix d2s = new Datacite2Scholix(RelationMapper.load());
+        final ObjectMapper mapper = new ObjectMapper();
         try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                 SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class),
                 SequenceFile.Writer.valueClass(Text.class))) {
+            final Text value = new Text();
+            final IntWritable key = new IntWritable();
+            int i = 0;
+            for (String dataset : new DataciteClient(host).getDatasetsFromTs(timestamp)) {
+                i++;
+                List<Scholix> scholix = d2s.generateScholixFromJson(dataset);
+                if (scholix != null)
+                    for (Scholix s : scholix) {
+                        key.set(i);
+                        value.set(mapper.writeValueAsString(s));
+                        writer.append(key, value);
+                        if (i % 10000 == 0) {
+                            System.out.println("wrote " + i);
+                        }
+                    }
+            }
         }
     }
-
 }
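
The job writes one JSON-serialized Scholix per SequenceFile record, keyed by a running counter. A sketch for inspecting the output locally; the path is hypothetical, and it assumes the same IntWritable/Text pairing used by the writer:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class ReadUpdatesSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path path = new Path("file:///tmp/datacite_updates.seq"); // hypothetical
            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
                IntWritable key = new IntWritable();
                Text value = new Text();
                while (reader.next(key, value)) {
                    System.out.println(key.get() + "\t" + value);
                }
            }
        }
    }
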
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java
new file mode 100644
index 0000000000..4628c46845
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/SparkResolveScholixTarget.java
@@ -0,0 +1,81 @@
+package eu.dnetlib.dhp.provision.update;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.provision.scholix.Scholix;
+import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier;
+import eu.dnetlib.dhp.provision.scholix.ScholixResource;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import scala.Tuple2;
+
+import java.util.Collections;
+
+public class SparkResolveScholixTarget {
+
+    public static void main(String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResolveScholixTarget.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json")));
+        parser.parseArgument(args);
+
+        final SparkConf conf = new SparkConf();
+
+        final String master = parser.get("master");
+        final String sourcePath = parser.get("sourcePath");
+        final String workingDirPath = parser.get("workingDirPath");
+        final String indexHost = parser.get("indexHost");
+
+        try (SparkSession spark = getSession(conf, master)) {
+
+            final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+            spark.createDataset(sc.sequenceFile(sourcePath, IntWritable.class, Text.class)
+                    .map(Tuple2::_2)
+                    .map(s -> new ObjectMapper().readValue(s.toString(), Scholix.class)).rdd(), Encoders.bean(Scholix.class))
+                    .write().save(workingDirPath + "/stepA");
+
+            Dataset<Scholix> s1 = spark.read().load(workingDirPath + "/stepA").as(Encoders.bean(Scholix.class));
+
+            s1.where(s1.col("target.dnetIdentifier").isNull()).select(s1.col("target.identifier")).distinct()
+                    .map((MapFunction<Row, ScholixResource>) f -> {
+                        final String pid = ((Row) f.getList(0).get(0)).getString(0);
+                        ScholixResource publication = new CrossrefClient(indexHost).getResourceByDOI(pid);
+                        if (publication != null) {
+                            return publication;
+                        }
+                        ScholixResource dataset = new DataciteClient(indexHost).getDatasetByDOI(pid);
+                        if (dataset != null) {
+                            return dataset;
+                        }
+                        ScholixResource r = new ScholixResource();
+                        r.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, "doi")));
+                        r.setObjectType("unknown");
+                        r.setDnetIdentifier("70|" + DHPUtils.md5(String.format("%s::doi", pid.toLowerCase().trim())));
+
+                        return r;
+                    }, Encoders.bean(ScholixResource.class)).write().mode(SaveMode.Overwrite).save(workingDirPath + "/stepB");
+
+        }
+    }
+
+    private static SparkSession getSession(SparkConf conf, String master) {
+        return SparkSession
+                .builder()
+                .config(conf)
+                .appName(SparkResolveScholixTarget.class.getSimpleName())
+                .master(master)
+                .getOrCreate();
+    }
+
+}
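
stepA persists the raw Scholix dataset and stepB the resolved targets; both are written with Spark's default format (parquet, unless configured otherwise). A sketch for eyeballing stepB after a local run, with a hypothetical working directory:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class InspectStepBSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[*]").appName("inspect-stepB").getOrCreate();
            Dataset<Row> stepB = spark.read().load("/data/scholix/provision/stepB"); // hypothetical
            stepB.select("dnetIdentifier", "objectType").show(10, false);
            spark.stop();
        }
    }
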
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json
new file mode 100644
index 0000000000..e4b6b9dfda
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "m",
+    "paramLongName": "master",
+    "paramDescription": "the spark master",
+    "paramRequired": true
+  },
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the source path",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingDirPath",
+    "paramDescription": "the working Dir Path",
+    "paramRequired": true
+  },
+  {
+    "paramName": "h",
+    "paramLongName": "indexHost",
+    "paramDescription": "the index host name",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json
new file mode 100644
index 0000000000..5c11aca8d1
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json
@@ -0,0 +1,33 @@
+[
+  {
+    "paramName": "n",
+    "paramLongName": "namenode",
+    "paramDescription": "the name node",
+    "paramRequired": true
+  },
+  {
+    "paramName": "t",
+    "paramLongName": "targetPath",
+    "paramDescription": "the working path where generated files are stored",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ts",
+    "paramLongName": "timestamp",
+    "paramDescription": "the timestamp for incremental harvesting",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ih",
+    "paramLongName": "indexHost",
+    "paramDescription": "the host name of the index",
+    "paramRequired": true
+  },
+  {
+    "paramName": "in",
+    "paramLongName": "indexName",
+    "paramDescription": "the name of the index",
+    "paramRequired": true
+  }
+
+]
\ No newline at end of file
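
Each descriptor maps a short flag (paramName) to a long option (paramLongName) that the code retrieves via parser.get(...). A sketch of that wiring, reusing the resource added above; the argument values are hypothetical:

    import eu.dnetlib.dhp.application.ArgumentApplicationParser;
    import org.apache.commons.io.IOUtils;

    public class ParamSketch {
        public static void main(String[] args) throws Exception {
            final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                    IOUtils.toString(ParamSketch.class.getResourceAsStream(
                            "/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json")));
            parser.parseArgument(new String[]{
                    "-n", "hdfs://nn:8020", "-t", "/tmp/updates", "-ts", "1585760736",
                    "-ih", "localhost", "-in", "datacite"});
            System.out.println(parser.get("indexHost")); // prints: localhost
        }
    }
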
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/retrieve_update_parameters.json b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/provision/retrieve_update_parameters.json
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
index abdc690977..68543e3f23 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml
@@ -16,21 +16,21 @@
             <name>sparkExecutorMemory</name>
             <description>memory for individual executor</description>
         </property>
-        <property>
-            <name>index</name>
-            <description>index name</description>
-        </property>
-        <property>
-            <name>idScholix</name>
-            <description>the identifier name of the scholix</description>
-        </property>
-        <property>
-            <name>idSummary</name>
-            <description>the identifier name of the summary</description>
-        </property>
+
+
+
+
+
+
+
+
+
+
+
+
-
+
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@@ -98,45 +98,45 @@
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>index Summary</name>
-            <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
-            <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
-            <spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32"</spark-opts>
-            <arg>-mt</arg> <arg>yarn-cluster</arg>
-            <arg>--sourcePath</arg><arg>${workingDirPath}/summary</arg>
-            <arg>--index</arg><arg>${index}_object</arg>
-            <arg>--idPath</arg><arg>id</arg>
-            <arg>--type</arg><arg>summary</arg>
-        </spark>
-        <ok to="indexScholix"/>
-        <error to="Kill"/>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>index scholix</name>
-            <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
-            <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
-            <spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8"</spark-opts>
-            <arg>-mt</arg> <arg>yarn-cluster</arg>
-            <arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
-            <arg>--index</arg><arg>${index}_scholix</arg>
-            <arg>--idPath</arg><arg>identifier</arg>
-            <arg>--type</arg><arg>scholix</arg>
-        </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java
index 780d4a2d6b..e008d72be5 100644
--- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java
+++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java
@@ -2,6 +2,8 @@ package eu.dnetlib.dhp.provision;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.provision.scholix.Scholix;
+import eu.dnetlib.dhp.provision.scholix.ScholixResource;
+import eu.dnetlib.dhp.provision.update.*;
 import eu.dnetlib.scholexplorer.relation.RelationMapper;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -9,10 +11,9 @@ import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.util.List;
 
@@ -27,58 +28,45 @@ public class DataciteClientTest {
         Datacite2Scholix ds = new Datacite2Scholix(mapper);
         final List<Scholix> s = ds.generateScholixFromJson(json);
-
-
         System.out.println(new ObjectMapper().writeValueAsString(s));
-
     }
 
-    @Test
+
     public void testClient() throws Exception {
-        DataciteClient client = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it", "datacite", 1585454082);
-        int i = 0;
-        final RelationMapper mapper = RelationMapper.load();
+        RetrieveUpdateFromDatacite.main(new String[]{
+                "-n", "file:///data/new_s2.txt",
+                "-t", "/data/new_s2.txt",
+                "-ts", "1585760736",
+                "-ih", "ip-90-147-167-25.ct1.garrservices.it",
+                "-in", "datacite",
+        });
 
-        Datacite2Scholix ds = new Datacite2Scholix(mapper);
-        BufferedWriter writer = new BufferedWriter(new FileWriter("/Users/sandro/new_s.txt"));
-        final ObjectMapper m = new ObjectMapper();
+        SparkResolveScholixTarget.main(new String[]{
+                "-s", "file:///data/new_s.txt",
+                "-m", "local[*]",
+                "-w", "/data/scholix/provision",
+                "-h", "ip-90-147-167-25.ct1.garrservices.it",
+
+        });
+    }
+
+    public void testResolveDataset() throws Exception {
+        DataciteClient dc = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it");
+        ScholixResource datasetByDOI = dc.getDatasetByDOI("10.17182/hepdata.15392.v1/t5");
+        Assertions.assertNotNull(datasetByDOI);
+        System.out.println(new ObjectMapper().writeValueAsString(datasetByDOI));
+
+        CrossrefClient cr = new CrossrefClient("ip-90-147-167-25.ct1.garrservices.it");
+        ScholixResource crossrefByDOI = cr.getResourceByDOI("10.26850/1678-4618eqj.v35.1.2010.p41-46");
+        Assertions.assertNotNull(crossrefByDOI);
+        System.out.println(new ObjectMapper().writeValueAsString(crossrefByDOI));
 
-
-
-        while (client.hasNext()) {
-            i++;
-
-            final String next = client.next();
-            try {
-                final List<Scholix> res = ds.generateScholixFromJson(next);
-                if (res != null)
-                    res
-                            .forEach(
-                                    s -> {
-                                        try {
-                                            writer.write(m.writeValueAsString(s));
-                                            writer.write("\n");
-                                        } catch (Throwable e) {
-                                            throw new RuntimeException(e);
-                                        }
-                                    }
-                            );
-            } catch (Throwable t) {
-                System.out.println(next);
-                throw new RuntimeException(t);
-            }
-            if (i % 1000 == 0) {
-                System.out.println("added " + i);
-            }
-        }
     }
 
     private String getResponse(final String url,final String json ) {