forked from D-Net/dnet-hadoop
fixed wrong test
This commit is contained in:
parent
d74e128aa6
commit
ba9f07a6fe
|
@ -0,0 +1,108 @@
|
|||
package eu.dnetlib.dhp.provision.update;
|
||||
|
||||
import com.google.gson.JsonArray;
|
||||
import com.google.gson.JsonElement;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonParser;
|
||||
import eu.dnetlib.dhp.provision.scholix.ScholixCollectedFrom;
|
||||
import eu.dnetlib.dhp.provision.scholix.ScholixEntityId;
|
||||
import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier;
|
||||
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class CrossRefParserJSON {
|
||||
|
||||
private static List<ScholixCollectedFrom> collectedFrom =generateCrossrefCollectedFrom("complete");
|
||||
|
||||
public static ScholixResource parseRecord(final String record) {
|
||||
if (record == null) return null;
|
||||
JsonElement jElement = new JsonParser().parse(record);
|
||||
JsonElement source = null;
|
||||
if (jElement.getAsJsonObject().has("_source")) {
|
||||
source = jElement.getAsJsonObject().get("_source");
|
||||
if (source == null || !source.isJsonObject())
|
||||
return null;
|
||||
}
|
||||
else if(jElement.getAsJsonObject().has("DOI")){
|
||||
source = jElement;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
||||
final JsonObject message = source.getAsJsonObject();
|
||||
ScholixResource currentObject = new ScholixResource();
|
||||
|
||||
if (message.get("DOI") != null) {
|
||||
final String doi = message.get("DOI").getAsString();
|
||||
currentObject.setIdentifier(Collections.singletonList(new ScholixIdentifier(doi, "doi")));
|
||||
}
|
||||
|
||||
if ((!message.get("created").isJsonNull()) && (message.getAsJsonObject("created").get("date-time") != null)) {
|
||||
currentObject.setPublicationDate(message.getAsJsonObject("created").get("date-time").getAsString());
|
||||
}
|
||||
|
||||
if (message.get("title")!= null && !message.get("title").isJsonNull() && message.get("title").isJsonArray() ) {
|
||||
|
||||
JsonArray array = message.get("title").getAsJsonArray();
|
||||
currentObject.setTitle(array.get(0).getAsString());
|
||||
}
|
||||
if (message.get("author") != null && !message.get("author").isJsonNull()) {
|
||||
JsonArray author = message.getAsJsonArray("author");
|
||||
List<ScholixEntityId> authorList = new ArrayList<>();
|
||||
for (JsonElement anAuthor : author) {
|
||||
JsonObject currentAuth = anAuthor.getAsJsonObject();
|
||||
|
||||
String family = "";
|
||||
String given = "";
|
||||
if (currentAuth != null && currentAuth.get("family") != null && !currentAuth.get("family").isJsonNull()) {
|
||||
family = currentAuth.get("family").getAsString();
|
||||
}
|
||||
if (currentAuth != null && currentAuth.get("given") != null && !currentAuth.get("given").isJsonNull()) {
|
||||
given = currentAuth.get("given").getAsString();
|
||||
}
|
||||
authorList.add(new ScholixEntityId(String.format("%s %s", family, given), null));
|
||||
}
|
||||
currentObject.setCreator(authorList);
|
||||
}
|
||||
if (message.get("publisher") != null && !message.get("publisher").isJsonNull()) {
|
||||
currentObject.setPublisher(Collections.singletonList(new ScholixEntityId(message.get("publisher").getAsString(), null)));
|
||||
}
|
||||
currentObject.setCollectedFrom(collectedFrom);
|
||||
currentObject.setObjectType("publication");
|
||||
currentObject.setDnetIdentifier(generateId(message.get("DOI").getAsString(), "doi", "publication"));
|
||||
|
||||
return currentObject;
|
||||
}
|
||||
|
||||
private static List<ScholixCollectedFrom> generateCrossrefCollectedFrom(final String completionStatus) {
|
||||
final ScholixEntityId scholixEntityId = new ScholixEntityId("Crossref",
|
||||
Collections.singletonList(new ScholixIdentifier("dli_________::crossref", "dnet_identifier")));
|
||||
return Collections.singletonList(
|
||||
new ScholixCollectedFrom(
|
||||
scholixEntityId,"resolved", completionStatus));
|
||||
}
|
||||
|
||||
private static String generateId(final String pid, final String pidType, final String entityType) {
|
||||
String type;
|
||||
switch (entityType){
|
||||
case "publication":
|
||||
type = "50|";
|
||||
break;
|
||||
case "dataset":
|
||||
type = "60|";
|
||||
break;
|
||||
case "unknown":
|
||||
type = "70|";
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("unexpected value "+entityType);
|
||||
}
|
||||
return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
package eu.dnetlib.dhp.provision.update;
|
||||
|
||||
import com.google.gson.JsonElement;
|
||||
import com.google.gson.JsonParser;
|
||||
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.util.zip.Inflater;
|
||||
|
||||
public class CrossrefClient {
|
||||
|
||||
private String host;
|
||||
private String index ="crossref";
|
||||
private String indexType = "item";
|
||||
|
||||
|
||||
public CrossrefClient(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public void setHost(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public void setIndex(String index) {
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
public String getIndexType() {
|
||||
return indexType;
|
||||
}
|
||||
|
||||
public void setIndexType(String indexType) {
|
||||
this.indexType = indexType;
|
||||
}
|
||||
|
||||
private static String decompressBlob(final String blob) {
|
||||
try {
|
||||
byte[] byteArray = Base64.decodeBase64(blob.getBytes());
|
||||
final Inflater decompresser = new Inflater();
|
||||
decompresser.setInput(byteArray);
|
||||
final ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length);
|
||||
byte[] buffer = new byte[8192];
|
||||
while (!decompresser.finished()) {
|
||||
int size = decompresser.inflate(buffer);
|
||||
bos.write(buffer, 0, size);
|
||||
}
|
||||
byte[] unzippeddata = bos.toByteArray();
|
||||
decompresser.end();
|
||||
return new String(unzippeddata);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException("Wrong record:" + blob,e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public ScholixResource getResourceByDOI(final String doi) {
|
||||
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||
HttpGet httpGet = new HttpGet(String.format("http://%s:9200/%s/%s/%s", host, index,indexType, doi.replaceAll("/","%2F")));
|
||||
CloseableHttpResponse response = client.execute(httpGet);
|
||||
String json = IOUtils.toString(response.getEntity().getContent());
|
||||
if (json.contains("blob")) {
|
||||
JsonParser p = new JsonParser();
|
||||
final JsonElement root = p.parse(json);
|
||||
json =decompressBlob(root.getAsJsonObject().get("_source").getAsJsonObject().get("blob").getAsString());
|
||||
}
|
||||
return CrossRefParserJSON.parseRecord(json);
|
||||
} catch (Throwable e) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.provision;
|
||||
package eu.dnetlib.dhp.provision.update;
|
||||
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import eu.dnetlib.dhp.provision.scholix.*;
|
||||
|
@ -15,16 +15,14 @@ import java.util.stream.Collectors;
|
|||
|
||||
public class Datacite2Scholix {
|
||||
|
||||
|
||||
private String rootPath = "$.attributes";
|
||||
final RelationMapper relationMapper;
|
||||
|
||||
public Datacite2Scholix(RelationMapper relationMapper) {
|
||||
this.relationMapper = relationMapper;
|
||||
}
|
||||
|
||||
|
||||
public List<Scholix> generateScholixFromJson(final String dJson) {
|
||||
|
||||
List<Map<String, String>> relIds = getRelatedIendtifiers(dJson);
|
||||
relIds = relIds!= null ? relIds.stream().filter(m->
|
||||
m.containsKey("relatedIdentifierType") && m.containsKey("relationType" ) && m.containsKey( "relatedIdentifier")
|
||||
|
@ -32,22 +30,24 @@ public class Datacite2Scholix {
|
|||
if(relIds== null || relIds.size() ==0 )
|
||||
return null;
|
||||
|
||||
|
||||
|
||||
final String updated = JsonPath.read(dJson,"$.attributes.updated" );
|
||||
final String updated = JsonPath.read(dJson, rootPath + ".updated");
|
||||
ScholixResource resource = generateDataciteScholixResource(dJson);
|
||||
|
||||
return relIds.stream().flatMap(s-> {
|
||||
final List<Scholix> result = generateScholix(resource, s.get("relatedIdentifier"), s.get("relatedIdentifierType"), s.get("relationType"), updated);
|
||||
return result.stream();
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
}
|
||||
|
||||
public String getRootPath() {
|
||||
return rootPath;
|
||||
}
|
||||
|
||||
public void setRootPath(String rootPath) {
|
||||
this.rootPath = rootPath;
|
||||
}
|
||||
|
||||
private List<Scholix> generateScholix(ScholixResource source, final String pid, final String pidtype, final String relType, final String updated) {
|
||||
|
||||
|
||||
if ("doi".equalsIgnoreCase(pidtype)) {
|
||||
ScholixResource target = new ScholixResource();
|
||||
target.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, pidtype)));
|
||||
|
@ -92,20 +92,17 @@ public class Datacite2Scholix {
|
|||
result.add(s2);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
public ScholixResource generateDataciteScholixResource(String dJson) {
|
||||
ScholixResource resource = new ScholixResource();
|
||||
String DOI_PATH = "$.attributes.doi";
|
||||
String DOI_PATH = rootPath + ".doi";
|
||||
final String doi = JsonPath.read(dJson, DOI_PATH);
|
||||
resource.setIdentifier(Collections.singletonList(new ScholixIdentifier(doi, "doi")));
|
||||
resource.setObjectType(getType(dJson));
|
||||
resource.setDnetIdentifier(generateId(doi, "doi", resource.getObjectType()));
|
||||
resource.setCollectedFrom(generateDataciteCollectedFrom("complete"));
|
||||
final String publisher = JsonPath.read(dJson, "$.attributes.publisher");
|
||||
final String publisher = JsonPath.read(dJson, rootPath + ".publisher");
|
||||
if (StringUtils.isNotBlank(publisher))
|
||||
resource.setPublisher(Collections.singletonList(new ScholixEntityId(publisher, null)));
|
||||
final String date = getDate(dJson);
|
||||
|
@ -119,7 +116,7 @@ public class Datacite2Scholix {
|
|||
}
|
||||
|
||||
private List<ScholixEntityId> getCreators(final String json) {
|
||||
final List<String> creatorName = JsonPath.read(json, "$.attributes.creators[*].name");
|
||||
final List<String> creatorName = JsonPath.read(json, rootPath + ".creators[*].name");
|
||||
if (creatorName!= null && creatorName.size() >0) {
|
||||
return creatorName.stream().map(s-> new ScholixEntityId(s, null)).collect(Collectors.toList());
|
||||
}
|
||||
|
@ -127,12 +124,12 @@ public class Datacite2Scholix {
|
|||
}
|
||||
|
||||
private String getTitle(final String json){
|
||||
final List<String> titles = JsonPath.read(json, "$.attributes.titles[*].title");
|
||||
final List<String> titles = JsonPath.read(json, rootPath + ".titles[*].title");
|
||||
return titles!= null && titles.size()>0?titles.get(0): null;
|
||||
}
|
||||
|
||||
private String getDate(final String json) {
|
||||
final List<Map<String,String>> dates = JsonPath.read(json,"$.attributes.dates");
|
||||
final List<Map<String,String>> dates = JsonPath.read(json, rootPath + ".dates");
|
||||
if(dates!= null && dates.size()>0){
|
||||
|
||||
List<Map<String, String>> issued = dates.stream().filter(s -> "issued".equalsIgnoreCase(s.get("dateType"))).collect(Collectors.toList());
|
||||
|
@ -152,7 +149,7 @@ public class Datacite2Scholix {
|
|||
|
||||
private String getType(final String json) {
|
||||
try {
|
||||
final String bibtext = JsonPath.read(json, "$.attributes.types.bibtex");
|
||||
final String bibtext = JsonPath.read(json, rootPath + ".types.bibtex");
|
||||
if ("article".equalsIgnoreCase(bibtext)) {
|
||||
return "publication";
|
||||
}
|
||||
|
@ -162,14 +159,10 @@ public class Datacite2Scholix {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private List<Map<String, String>> getRelatedIendtifiers(final String json) {
|
||||
String REL_IDENTIFIER_PATH = "$.attributes.relatedIdentifiers[*]";
|
||||
String REL_IDENTIFIER_PATH = rootPath + ".relatedIdentifiers[*]";
|
||||
List<Map<String, String>> res = JsonPath.read(json, REL_IDENTIFIER_PATH);
|
||||
return res;
|
||||
|
||||
}
|
||||
|
||||
protected String generateId(final String pid, final String pidType, final String entityType) {
|
||||
|
@ -186,18 +179,7 @@ public class Datacite2Scholix {
|
|||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("unexpected value "+entityType);
|
||||
|
||||
}
|
||||
|
||||
return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
package eu.dnetlib.dhp.provision.update;
|
||||
|
||||
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class DataciteClient {
|
||||
|
||||
|
||||
private String host;
|
||||
private String index ="datacite";
|
||||
private String indexType = "dump";
|
||||
private Datacite2Scholix d2s;
|
||||
|
||||
public DataciteClient(String host) {
|
||||
this.host = host;
|
||||
|
||||
d2s = new Datacite2Scholix(null);
|
||||
d2s.setRootPath("$._source.attributes");
|
||||
}
|
||||
|
||||
public Iterable<String> getDatasetsFromTs(final Long timestamp) {
|
||||
return ()-> {
|
||||
try {
|
||||
return new DataciteClientIterator(host, index, timestamp);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public void setHost(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public void setIndex(String index) {
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
public String getIndexType() {
|
||||
return indexType;
|
||||
}
|
||||
|
||||
public void setIndexType(String indexType) {
|
||||
this.indexType = indexType;
|
||||
}
|
||||
|
||||
public ScholixResource getDatasetByDOI(final String doi) {
|
||||
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||
HttpGet httpGet = new HttpGet(String.format("http://%s:9200/%s/%s/%s", host, index,indexType, doi.replaceAll("/","%2F")));
|
||||
CloseableHttpResponse response = client.execute(httpGet);
|
||||
final String json =IOUtils.toString(response.getEntity().getContent());
|
||||
return d2s.generateDataciteScholixResource(json);
|
||||
} catch (Throwable e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.provision;
|
||||
package eu.dnetlib.dhp.provision.update;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import net.minidev.json.JSONArray;
|
||||
|
@ -14,7 +14,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DataciteClient implements Iterator<String> {
|
||||
public class DataciteClientIterator implements Iterator<String> {
|
||||
|
||||
final static String blobPath = "$.hits.hits[*]._source";
|
||||
final static String scrollIdPath = "$._scroll_id";
|
||||
|
@ -27,7 +27,7 @@ public class DataciteClient implements Iterator<String> {
|
|||
final String esIndex;
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
public DataciteClient(final String esHost, final String esIndex, final long timestamp) throws IOException {
|
||||
public DataciteClientIterator(final String esHost, final String esIndex, final long timestamp) throws IOException {
|
||||
|
||||
this.esHost = esHost;
|
||||
this.esIndex = esIndex;
|
|
@ -1,8 +1,10 @@
|
|||
package eu.dnetlib.dhp.provision;
|
||||
package eu.dnetlib.dhp.provision.update;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.provision.scholix.Scholix;
|
||||
import eu.dnetlib.scholexplorer.relation.RelationMapper;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -11,17 +13,19 @@ import org.apache.hadoop.io.SequenceFile;
|
|||
import org.apache.hadoop.io.Text;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class RetrieveUpdateFromDatacite {
|
||||
|
||||
public static void main(String[] args) throws Exception{
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(RetrieveUpdateFromDatacite.class.getResourceAsStream("/eu/dnetlib/dhp/provision/retrieve_update_parameters.json")));
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(RetrieveUpdateFromDatacite.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_retrieve_update_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final String hdfsuri = parser.get("namenode");
|
||||
Path hdfswritepath = new Path(parser.get("targetPath"));
|
||||
final String timestamp = parser.get("timestamp");
|
||||
|
||||
final long timestamp = Long.parseLong(parser.get("timestamp"));
|
||||
final String host = parser.get("indexHost");
|
||||
final String index = parser.get("indexName");
|
||||
|
||||
// ====== Init HDFS File System Object
|
||||
Configuration conf = new Configuration();
|
||||
|
@ -32,13 +36,28 @@ public class RetrieveUpdateFromDatacite {
|
|||
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
||||
|
||||
FileSystem.get(URI.create(hdfsuri), conf);
|
||||
|
||||
final AtomicInteger counter = new AtomicInteger(0);
|
||||
final Datacite2Scholix d2s = new Datacite2Scholix(RelationMapper.load());
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
|
||||
SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class),
|
||||
SequenceFile.Writer.valueClass(Text.class))) {
|
||||
|
||||
final Text value = new Text();
|
||||
final IntWritable key = new IntWritable();
|
||||
int i = 0;
|
||||
for(String dataset: new DataciteClient(host).getDatasetsFromTs(timestamp)) {
|
||||
i++;
|
||||
List<Scholix> scholix = d2s.generateScholixFromJson(dataset);
|
||||
if (scholix!= null)
|
||||
for(Scholix s: scholix) {
|
||||
key.set(i);
|
||||
value.set(mapper.writeValueAsString(s));
|
||||
writer.append(key, value);
|
||||
if (i % 10000 == 0) {
|
||||
System.out.println("wrote "+i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package eu.dnetlib.dhp.provision.update;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.provision.scholix.Scholix;
|
||||
import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier;
|
||||
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
public class SparkResolveScholixTarget {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkResolveScholixTarget.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_resolve_scholix_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
final String master = parser.get("master");
|
||||
final String sourcePath = parser.get("sourcePath");
|
||||
final String workingDirPath= parser.get("workingDirPath");
|
||||
final String indexHost= parser.get("indexHost");
|
||||
|
||||
|
||||
try (SparkSession spark = getSession(conf, master)){
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
|
||||
spark.createDataset(sc.sequenceFile(sourcePath, IntWritable.class,Text.class)
|
||||
.map(Tuple2::_2)
|
||||
.map(s-> new ObjectMapper().readValue(s.toString(), Scholix.class)).rdd(), Encoders.bean(Scholix.class))
|
||||
.write().save(workingDirPath+"/stepA");
|
||||
|
||||
|
||||
|
||||
Dataset<Scholix> s1 = spark.read().load(workingDirPath+"/stepA").as(Encoders.bean(Scholix.class));
|
||||
|
||||
s1.where(s1.col("target.dnetIdentifier").isNull()).select(s1.col("target.identifier")).distinct()
|
||||
.map((MapFunction<Row, ScholixResource>) f-> {
|
||||
final String pid = ((Row) f.getList(0).get(0)).getString(0);
|
||||
ScholixResource publication = new CrossrefClient(indexHost).getResourceByDOI(pid);
|
||||
if (publication != null) {
|
||||
return publication;
|
||||
}
|
||||
ScholixResource dataset = new DataciteClient(indexHost).getDatasetByDOI(pid);
|
||||
if (dataset!= null) {
|
||||
return dataset;
|
||||
}
|
||||
ScholixResource r = new ScholixResource();
|
||||
r.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, "doi")));
|
||||
r.setObjectType("unknown");
|
||||
r.setDnetIdentifier("70|"+DHPUtils.md5(String.format("%s::doi", pid.toLowerCase().trim())));
|
||||
|
||||
return r;
|
||||
}, Encoders.bean(ScholixResource.class)).write().mode(SaveMode.Overwrite).save(workingDirPath+"/stepB");
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private static SparkSession getSession(SparkConf conf, String master) {
|
||||
return SparkSession
|
||||
.builder()
|
||||
.config(conf)
|
||||
.appName(SparkResolveScholixTarget.class.getSimpleName())
|
||||
.master(master)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
[
|
||||
{
|
||||
"paramName": "m",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "the name node",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "s",
|
||||
"paramLongName": "sourcePath",
|
||||
"paramDescription": "the source path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingDirPath",
|
||||
"paramDescription": "the working Dir Path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "h",
|
||||
"paramLongName": "indexHost",
|
||||
"paramDescription": "the working Dir Path",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,33 @@
|
|||
[
|
||||
{
|
||||
"paramName": "n",
|
||||
"paramLongName": "namenode",
|
||||
"paramDescription": "the name node",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "t",
|
||||
"paramLongName": "targetPath",
|
||||
"paramDescription": "the working path where generated files",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "ts",
|
||||
"paramLongName": "timestamp",
|
||||
"paramDescription": "the timestamp for incremental harvesting",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "ih",
|
||||
"paramLongName": "indexHost",
|
||||
"paramDescription": "the ip name of the index",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "in",
|
||||
"paramLongName": "indexName",
|
||||
"paramDescription": "the name of the index",
|
||||
"paramRequired": true
|
||||
}
|
||||
|
||||
]
|
|
@ -16,21 +16,21 @@
|
|||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>index</name>
|
||||
<description>index name</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>idScholix</name>
|
||||
<description>the identifier name of the scholix </description>
|
||||
</property>
|
||||
<property>
|
||||
<name>idSummary</name>
|
||||
<description>the identifier name of the summary</description>
|
||||
</property>
|
||||
<!-- <property>-->
|
||||
<!-- <name>index</name>-->
|
||||
<!-- <description>index name</description>-->
|
||||
<!-- </property>-->
|
||||
<!-- <property>-->
|
||||
<!-- <name>idScholix</name>-->
|
||||
<!-- <description>the identifier name of the scholix </description>-->
|
||||
<!-- </property>-->
|
||||
<!-- <property>-->
|
||||
<!-- <name>idSummary</name>-->
|
||||
<!-- <description>the identifier name of the summary</description>-->
|
||||
<!-- </property>-->
|
||||
</parameters>
|
||||
|
||||
<start to="indexSummary"/>
|
||||
<start to="DeleteTargetPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
|
@ -98,45 +98,45 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="indexSummary">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>index Summary</name>
|
||||
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
|
||||
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingDirPath}/summary</arg>
|
||||
<arg>--index</arg><arg>${index}_object</arg>
|
||||
<arg>--idPath</arg><arg>id</arg>
|
||||
<arg>--type</arg><arg>summary</arg>
|
||||
</spark>
|
||||
<ok to="indexScholix"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<!-- <action name="indexSummary">-->
|
||||
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
|
||||
<!-- <job-tracker>${jobTracker}</job-tracker>-->
|
||||
<!-- <name-node>${nameNode}</name-node>-->
|
||||
<!-- <master>yarn-cluster</master>-->
|
||||
<!-- <mode>cluster</mode>-->
|
||||
<!-- <name>index Summary</name>-->
|
||||
<!-- <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>-->
|
||||
<!-- <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>-->
|
||||
<!-- <spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>-->
|
||||
<!-- <arg>-mt</arg> <arg>yarn-cluster</arg>-->
|
||||
<!-- <arg>--sourcePath</arg><arg>${workingDirPath}/summary</arg>-->
|
||||
<!-- <arg>--index</arg><arg>${index}_object</arg>-->
|
||||
<!-- <arg>--idPath</arg><arg>id</arg>-->
|
||||
<!-- <arg>--type</arg><arg>summary</arg>-->
|
||||
<!-- </spark>-->
|
||||
<!-- <ok to="indexScholix"/>-->
|
||||
<!-- <error to="Kill"/>-->
|
||||
<!-- </action>-->
|
||||
|
||||
<action name="indexScholix">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>index scholix</name>
|
||||
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
|
||||
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
|
||||
<arg>--index</arg><arg>${index}_scholix</arg>
|
||||
<arg>--idPath</arg><arg>identifier</arg>
|
||||
<arg>--type</arg><arg>scholix</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<!-- <action name="indexScholix">-->
|
||||
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
|
||||
<!-- <job-tracker>${jobTracker}</job-tracker>-->
|
||||
<!-- <name-node>${nameNode}</name-node>-->
|
||||
<!-- <master>yarn-cluster</master>-->
|
||||
<!-- <mode>cluster</mode>-->
|
||||
<!-- <name>index scholix</name>-->
|
||||
<!-- <class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>-->
|
||||
<!-- <jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>-->
|
||||
<!-- <spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>-->
|
||||
<!-- <arg>-mt</arg> <arg>yarn-cluster</arg>-->
|
||||
<!-- <arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>-->
|
||||
<!-- <arg>--index</arg><arg>${index}_scholix</arg>-->
|
||||
<!-- <arg>--idPath</arg><arg>identifier</arg>-->
|
||||
<!-- <arg>--type</arg><arg>scholix</arg>-->
|
||||
<!-- </spark>-->
|
||||
<!-- <ok to="End"/>-->
|
||||
<!-- <error to="Kill"/>-->
|
||||
<!-- </action>-->
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -2,6 +2,8 @@ package eu.dnetlib.dhp.provision;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.provision.scholix.Scholix;
|
||||
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
|
||||
import eu.dnetlib.dhp.provision.update.*;
|
||||
import eu.dnetlib.scholexplorer.relation.RelationMapper;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
|
@ -9,10 +11,9 @@ import org.apache.http.client.methods.HttpPost;
|
|||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -27,58 +28,45 @@ public class DataciteClientTest {
|
|||
|
||||
Datacite2Scholix ds = new Datacite2Scholix(mapper);
|
||||
final List<Scholix> s = ds.generateScholixFromJson(json);
|
||||
|
||||
|
||||
System.out.println(new ObjectMapper().writeValueAsString(s));
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
|
||||
public void testClient() throws Exception {
|
||||
DataciteClient client = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it","datacite",1585454082);
|
||||
int i = 0;
|
||||
final RelationMapper mapper = RelationMapper.load();
|
||||
|
||||
Datacite2Scholix ds = new Datacite2Scholix(mapper);
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter("/Users/sandro/new_s.txt"));
|
||||
|
||||
final ObjectMapper m = new ObjectMapper();
|
||||
RetrieveUpdateFromDatacite.main(new String[]{
|
||||
"-n", "file:///data/new_s2.txt",
|
||||
"-t", "/data/new_s2.txt",
|
||||
"-ts", "1585760736",
|
||||
"-ih", "ip-90-147-167-25.ct1.garrservices.it",
|
||||
"-in", "datacite",
|
||||
});
|
||||
|
||||
|
||||
SparkResolveScholixTarget.main(new String[]{
|
||||
"-s", "file:///data/new_s.txt",
|
||||
"-m", "local[*]",
|
||||
"-w", "/data/scholix/provision",
|
||||
"-h", "ip-90-147-167-25.ct1.garrservices.it",
|
||||
|
||||
|
||||
|
||||
while (client.hasNext()){
|
||||
i ++;
|
||||
|
||||
|
||||
final String next = client.next();
|
||||
try {
|
||||
final List<Scholix> res = ds.generateScholixFromJson(next);
|
||||
if (res!= null)
|
||||
res
|
||||
.forEach(
|
||||
s -> {
|
||||
try {
|
||||
|
||||
writer.write(m.writeValueAsString(s));
|
||||
writer.write("\n");
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
);
|
||||
}catch (Throwable t) {
|
||||
System.out.println(next);
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
if(i %1000 == 0) {
|
||||
System.out.println("added "+i);
|
||||
}
|
||||
}
|
||||
public void testResolveDataset() throws Exception {
|
||||
DataciteClient dc = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it");
|
||||
ScholixResource datasetByDOI = dc.getDatasetByDOI("10.17182/hepdata.15392.v1/t5");
|
||||
Assertions.assertNotNull(datasetByDOI);
|
||||
System.out.println(new ObjectMapper().writeValueAsString(datasetByDOI));
|
||||
|
||||
|
||||
CrossrefClient cr = new CrossrefClient("ip-90-147-167-25.ct1.garrservices.it");
|
||||
ScholixResource crossrefByDOI = cr.getResourceByDOI("10.26850/1678-4618eqj.v35.1.2010.p41-46");
|
||||
Assertions.assertNotNull(crossrefByDOI);
|
||||
System.out.println(new ObjectMapper().writeValueAsString(crossrefByDOI));
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
private String getResponse(final String url,final String json ) {
|
||||
|
|
Loading…
Reference in New Issue