Merge branch 'beta' into graph_cleaning_refactoring

This commit is contained in:
Claudio Atzori 2023-05-02 09:55:40 +02:00
commit 851f664bd9
16 changed files with 1980 additions and 257 deletions

View File

@ -3,6 +3,8 @@ package eu.dnetlib.dhp.common.api;
import java.io.*; import java.io.*;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHeaders; import org.apache.http.HttpHeaders;
@ -13,6 +15,7 @@ import com.google.gson.Gson;
import eu.dnetlib.dhp.common.api.zenodo.ZenodoModel; import eu.dnetlib.dhp.common.api.zenodo.ZenodoModel;
import eu.dnetlib.dhp.common.api.zenodo.ZenodoModelList; import eu.dnetlib.dhp.common.api.zenodo.ZenodoModelList;
import okhttp3.*; import okhttp3.*;
import org.jetbrains.annotations.NotNull;
public class ZenodoAPIClient implements Serializable { public class ZenodoAPIClient implements Serializable {
@ -60,33 +63,31 @@ public class ZenodoAPIClient implements Serializable {
*/ */
public int newDeposition() throws IOException { public int newDeposition() throws IOException {
String json = "{}"; String json = "{}";
OkHttpClient httpClient = new OkHttpClient.Builder().connectTimeout(600, TimeUnit.SECONDS).build();
RequestBody body = RequestBody.create(json, MEDIA_TYPE_JSON); URL url = new URL(urlString);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + access_token);
conn.setRequestMethod("POST");
conn.setDoOutput(true);
try (OutputStream os = conn.getOutputStream()) {
byte[] input = json.getBytes("utf-8");
os.write(input, 0, input.length);
}
Request request = new Request.Builder() String body = getBody(conn);
.url(urlString)
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.post(body)
.build();
try (Response response = httpClient.newCall(request).execute()) { int responseCode = conn.getResponseCode();
conn.disconnect();
if (!response.isSuccessful()) if(!checkOKStatus(responseCode))
throw new IOException("Unexpected code " + response + response.body().string()); throw new IOException("Unexpected code " + responseCode + body);
// Get response body ZenodoModel newSubmission = new Gson().fromJson(body, ZenodoModel.class);
json = response.body().string();
ZenodoModel newSubmission = new Gson().fromJson(json, ZenodoModel.class);
this.bucket = newSubmission.getLinks().getBucket(); this.bucket = newSubmission.getLinks().getBucket();
this.deposition_id = newSubmission.getId(); this.deposition_id = newSubmission.getId();
return response.code(); return responseCode;
}
} }
/** /**
@ -94,28 +95,48 @@ public class ZenodoAPIClient implements Serializable {
* *
* @param is the inputStream for the file to upload * @param is the inputStream for the file to upload
* @param file_name the name of the file as it will appear on Zenodo * @param file_name the name of the file as it will appear on Zenodo
* @param len the size of the file
* @return the response code * @return the response code
*/ */
public int uploadIS(InputStream is, String file_name, long len) throws IOException { public int uploadIS(InputStream is, String file_name) throws IOException {
OkHttpClient httpClient = new OkHttpClient.Builder()
.writeTimeout(600, TimeUnit.SECONDS)
.readTimeout(600, TimeUnit.SECONDS)
.connectTimeout(600, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder() URL url = new URL(bucket + "/" + file_name);
.url(bucket + "/" + file_name) HttpURLConnection conn = (HttpURLConnection) url.openConnection();
.addHeader(HttpHeaders.CONTENT_TYPE, "application/zip") // add request headers conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, "application/zip");
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token) conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + access_token);
.put(InputStreamRequestBody.create(MEDIA_TYPE_ZIP, is, len)) conn.setDoOutput(true);
.build(); conn.setRequestMethod("PUT");
try (Response response = httpClient.newCall(request).execute()) { byte[] buf = new byte[8192];
if (!response.isSuccessful()) int length;
throw new IOException("Unexpected code " + response + response.body().string()); try (OutputStream os = conn.getOutputStream()) {
return response.code(); while ((length = is.read(buf)) != -1) {
os.write(buf, 0, length);
} }
}
int responseCode = conn.getResponseCode();
if(! checkOKStatus(responseCode)){
throw new IOException("Unexpected code " + responseCode + getBody(conn));
}
return responseCode;
}
@NotNull
private String getBody(HttpURLConnection conn) throws IOException {
String body = "{}";
try (BufferedReader br = new BufferedReader(
new InputStreamReader(conn.getInputStream(), "utf-8"))) {
StringBuilder response = new StringBuilder();
String responseLine = null;
while ((responseLine = br.readLine()) != null) {
response.append(responseLine.trim());
}
body = response.toString();
}
return body;
} }
/** /**
@ -127,26 +148,36 @@ public class ZenodoAPIClient implements Serializable {
*/ */
public int sendMretadata(String metadata) throws IOException { public int sendMretadata(String metadata) throws IOException {
OkHttpClient httpClient = new OkHttpClient.Builder().connectTimeout(600, TimeUnit.SECONDS).build(); URL url = new URL(urlString + "/" + deposition_id);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + access_token);
conn.setDoOutput(true);
conn.setRequestMethod("PUT");
RequestBody body = RequestBody.create(metadata, MEDIA_TYPE_JSON);
Request request = new Request.Builder() try (OutputStream os = conn.getOutputStream()) {
.url(urlString + "/" + deposition_id) byte[] input = metadata.getBytes("utf-8");
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers os.write(input, 0, input.length);
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.put(body)
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful())
throw new IOException("Unexpected code " + response + response.body().string());
return response.code();
} }
final int responseCode = conn.getResponseCode();
conn.disconnect();
if(!checkOKStatus(responseCode))
throw new IOException("Unexpected code " + responseCode + getBody(conn));
return responseCode;
}
private boolean checkOKStatus(int responseCode) {
if(HttpURLConnection.HTTP_OK != responseCode ||
HttpURLConnection.HTTP_CREATED != responseCode)
return true ;
return false;
} }
/** /**
@ -155,6 +186,7 @@ public class ZenodoAPIClient implements Serializable {
* @return response code * @return response code
* @throws IOException * @throws IOException
*/ */
@Deprecated
public int publish() throws IOException { public int publish() throws IOException {
String json = "{}"; String json = "{}";
@ -194,28 +226,35 @@ public class ZenodoAPIClient implements Serializable {
setDepositionId(concept_rec_id, 1); setDepositionId(concept_rec_id, 1);
String json = "{}"; String json = "{}";
OkHttpClient httpClient = new OkHttpClient.Builder().connectTimeout(600, TimeUnit.SECONDS).build(); URL url = new URL(urlString + "/" + deposition_id + "/actions/newversion");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
RequestBody body = RequestBody.create(json, MEDIA_TYPE_JSON); conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + access_token);
conn.setDoOutput(true);
conn.setRequestMethod("POST");
Request request = new Request.Builder()
.url(urlString + "/" + deposition_id + "/actions/newversion")
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.post(body)
.build();
try (Response response = httpClient.newCall(request).execute()) { try (OutputStream os = conn.getOutputStream()) {
byte[] input = json.getBytes("utf-8");
os.write(input, 0, input.length);
if (!response.isSuccessful()) }
throw new IOException("Unexpected code " + response + response.body().string());
ZenodoModel zenodoModel = new Gson().fromJson(response.body().string(), ZenodoModel.class); String body = getBody(conn);
int responseCode = conn.getResponseCode();
conn.disconnect();
if(!checkOKStatus(responseCode))
throw new IOException("Unexpected code " + responseCode + body);
ZenodoModel zenodoModel = new Gson().fromJson(body, ZenodoModel.class);
String latest_draft = zenodoModel.getLinks().getLatest_draft(); String latest_draft = zenodoModel.getLinks().getLatest_draft();
deposition_id = latest_draft.substring(latest_draft.lastIndexOf("/") + 1); deposition_id = latest_draft.substring(latest_draft.lastIndexOf("/") + 1);
bucket = getBucket(latest_draft); bucket = getBucket(latest_draft);
return response.code();
} return responseCode;
} }
/** /**
@ -233,24 +272,33 @@ public class ZenodoAPIClient implements Serializable {
this.deposition_id = deposition_id; this.deposition_id = deposition_id;
OkHttpClient httpClient = new OkHttpClient.Builder().connectTimeout(600, TimeUnit.SECONDS).build(); String json = "{}";
Request request = new Request.Builder() URL url = new URL(urlString + "/" + deposition_id);
.url(urlString + "/" + deposition_id) HttpURLConnection conn = (HttpURLConnection) url.openConnection();
.addHeader("Authorization", "Bearer " + access_token)
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful())
throw new IOException("Unexpected code " + response + response.body().string());
ZenodoModel zenodoModel = new Gson().fromJson(response.body().string(), ZenodoModel.class);
bucket = zenodoModel.getLinks().getBucket();
return response.code();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + access_token);
conn.setRequestMethod("POST");
conn.setDoOutput(true);
try (OutputStream os = conn.getOutputStream()) {
byte[] input = json.getBytes("utf-8");
os.write(input, 0, input.length);
} }
String body = getBody(conn);
int responseCode = conn.getResponseCode();
conn.disconnect();
if(!checkOKStatus(responseCode))
throw new IOException("Unexpected code " + responseCode + body);
ZenodoModel zenodoModel = new Gson().fromJson(body, ZenodoModel.class);
bucket = zenodoModel.getLinks().getBucket();
return responseCode;
} }
private void setDepositionId(String concept_rec_id, Integer page) throws IOException, MissingConceptDoiException { private void setDepositionId(String concept_rec_id, Integer page) throws IOException, MissingConceptDoiException {
@ -273,53 +321,56 @@ public class ZenodoAPIClient implements Serializable {
private String getPrevDepositions(String page) throws IOException { private String getPrevDepositions(String page) throws IOException {
OkHttpClient httpClient = new OkHttpClient.Builder().connectTimeout(600, TimeUnit.SECONDS).build();
HttpUrl.Builder urlBuilder = HttpUrl.parse(urlString).newBuilder(); HttpUrl.Builder urlBuilder = HttpUrl.parse(urlString).newBuilder();
urlBuilder.addQueryParameter("page", page); urlBuilder.addQueryParameter("page", page);
String url = urlBuilder.build().toString();
Request request = new Request.Builder() URL url = new URL(urlBuilder.build().toString());
.url(url) HttpURLConnection conn = (HttpURLConnection) url.openConnection();
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token) conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + access_token);
.get() conn.setDoOutput(true);
.build(); conn.setRequestMethod("GET");
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful())
throw new IOException("Unexpected code " + response + response.body().string());
return response.body().string(); String body = getBody(conn);
int responseCode = conn.getResponseCode();
conn.disconnect();
if(!checkOKStatus(responseCode))
throw new IOException("Unexpected code " + responseCode + body);
return body;
} }
} private String getBucket(String inputUurl) throws IOException {
private String getBucket(String url) throws IOException { URL url = new URL(inputUurl);
OkHttpClient httpClient = new OkHttpClient.Builder() HttpURLConnection conn = (HttpURLConnection) url.openConnection();
.connectTimeout(600, TimeUnit.SECONDS) conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
.build(); conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + access_token);
conn.setDoOutput(true);
conn.setRequestMethod("GET");
Request request = new Request.Builder() String body = getBody(conn);
.url(url)
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.get()
.build();
try (Response response = httpClient.newCall(request).execute()) { int responseCode = conn.getResponseCode();
if (!response.isSuccessful()) conn.disconnect();
throw new IOException("Unexpected code " + response + response.body().string()); if(!checkOKStatus(responseCode))
throw new IOException("Unexpected code " + responseCode + body);
// Get response body ZenodoModel zenodoModel = new Gson().fromJson(body, ZenodoModel.class);
ZenodoModel zenodoModel = new Gson().fromJson(response.body().string(), ZenodoModel.class);
return zenodoModel.getLinks().getBucket(); return zenodoModel.getLinks().getBucket();
}
} }

View File

@ -33,7 +33,7 @@ class ZenodoAPIClientTest {
InputStream is = new FileInputStream(file); InputStream is = new FileInputStream(file);
Assertions.assertEquals(200, client.uploadIS(is, "COVID-19.json.gz", file.length())); Assertions.assertEquals(200, client.uploadIS(is, "COVID-19.json.gz"));
String metadata = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/common/api/metadata.json")); String metadata = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/common/api/metadata.json"));
@ -56,7 +56,7 @@ class ZenodoAPIClientTest {
InputStream is = new FileInputStream(file); InputStream is = new FileInputStream(file);
Assertions.assertEquals(200, client.uploadIS(is, "COVID-19.json.gz", file.length())); Assertions.assertEquals(200, client.uploadIS(is, "COVID-19.json.gz"));
String metadata = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/common/api/metadata.json")); String metadata = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/common/api/metadata.json"));
@ -80,7 +80,7 @@ class ZenodoAPIClientTest {
InputStream is = new FileInputStream(file); InputStream is = new FileInputStream(file);
Assertions.assertEquals(200, client.uploadIS(is, "newVersion_deposition", file.length())); Assertions.assertEquals(200, client.uploadIS(is, "newVersion_deposition"));
Assertions.assertEquals(202, client.publish()); Assertions.assertEquals(202, client.publish());
@ -100,7 +100,7 @@ class ZenodoAPIClientTest {
InputStream is = new FileInputStream(file); InputStream is = new FileInputStream(file);
Assertions.assertEquals(200, client.uploadIS(is, "newVersion_deposition", file.length())); Assertions.assertEquals(200, client.uploadIS(is, "newVersion_deposition"));
Assertions.assertEquals(202, client.publish()); Assertions.assertEquals(202, client.publish());

View File

@ -16,7 +16,7 @@ public class Community implements Serializable {
private List<String> subjects = new ArrayList<>(); private List<String> subjects = new ArrayList<>();
private List<Provider> providers = new ArrayList<>(); private List<Provider> providers = new ArrayList<>();
private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>(); private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>();
private SelectionConstraints constraints = new SelectionConstraints(); private SelectionConstraints constraints;
public String toJson() { public String toJson() {
final Gson g = new Gson(); final Gson g = new Gson();
@ -26,7 +26,8 @@ public class Community implements Serializable {
public boolean isValid() { public boolean isValid() {
return !getSubjects().isEmpty() return !getSubjects().isEmpty()
|| !getProviders().isEmpty() || !getProviders().isEmpty()
|| !getZenodoCommunities().isEmpty(); || !getZenodoCommunities().isEmpty()
|| !getConstraints().getCriteria().isEmpty();
} }
public String getId() { public String getId() {

View File

@ -37,7 +37,7 @@ public class CommunityConfigurationFactory {
public static CommunityConfiguration newInstance(final String xml) throws DocumentException, SAXException { public static CommunityConfiguration newInstance(final String xml) throws DocumentException, SAXException {
log.debug(String.format("parsing community configuration from:\n%s", xml)); log.info(String.format("parsing community configuration from:\n%s", xml));
final SAXReader reader = new SAXReader(); final SAXReader reader = new SAXReader();
reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
@ -92,12 +92,13 @@ public class CommunityConfigurationFactory {
private static SelectionConstraints parseConstrains(Node node) { private static SelectionConstraints parseConstrains(Node node) {
Node advConstsNode = node.selectSingleNode("./advancedConstraints"); Node advConstsNode = node.selectSingleNode("./advancedConstraints");
if (advConstsNode == null || StringUtils.isBlank(StringUtils.trim(advConstsNode.getText()))) { if (advConstsNode == null || StringUtils.isBlank(StringUtils.trim(advConstsNode.getText()))) {
return null; return new SelectionConstraints();
} }
SelectionConstraints selectionConstraints = new Gson() SelectionConstraints selectionConstraints = new Gson()
.fromJson(advConstsNode.getText(), SelectionConstraints.class); .fromJson(advConstsNode.getText(), SelectionConstraints.class);
selectionConstraints.setSelection(resolver); selectionConstraints.setSelection(resolver);
log.info("number of selection constraints set " + selectionConstraints.getCriteria().size());
return selectionConstraints; return selectionConstraints;
} }

View File

@ -10,11 +10,14 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.bulktag.SparkBulkTagJob;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
@ -22,6 +25,7 @@ import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
/** Created by miriam on 02/08/2018. */ /** Created by miriam on 02/08/2018. */
public class ResultTagger implements Serializable { public class ResultTagger implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ResultTagger.class);
private boolean clearContext(Result result) { private boolean clearContext(Result result) {
int tmp = result.getContext().size(); int tmp = result.getContext().size();
@ -149,6 +153,8 @@ public class ResultTagger implements Serializable {
}); });
communities.addAll(aconstraints); communities.addAll(aconstraints);
if (aconstraints.size() > 0)
log.info("Found {} for advancedConstraints ", aconstraints.size());
clearContext(result); clearContext(result);

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;
import java.util.Locale;
/**
 * Selection verb registered as {@code starts_with_caseinsensitive}: it accepts a value when the
 * value begins with the configured parameter, ignoring differences in letter case.
 */
@VerbClass("starts_with_caseinsensitive")
public class StartsWithVerbIgnoreCase implements Selection, Serializable {

	/** The prefix the tested values are compared against. */
	private String param;

	public StartsWithVerbIgnoreCase() {
	}

	public StartsWithVerbIgnoreCase(final String param) {
		this.param = param;
	}

	/**
	 * @param value the text to test
	 * @return {@code true} if {@code value} starts with the configured parameter, case-insensitively
	 */
	@Override
	public boolean apply(String value) {
		// Fold with Locale.ROOT: the no-arg toLowerCase() follows the JVM default locale, so the
		// same configuration could match differently depending on where the job runs
		// (e.g. "I" lower-cases to a dotless "ı" under a Turkish locale).
		return value.toLowerCase(Locale.ROOT).startsWith(param.toLowerCase(Locale.ROOT));
	}

	public String getParam() {
		return param;
	}

	public void setParam(String param) {
		this.param = param;
	}
}

View File

@ -16,6 +16,7 @@ import javax.print.attribute.DocAttributeSet;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -34,6 +35,7 @@ import eu.dnetlib.dhp.bulktag.community.*;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
/** /**
* @author miriam.baglioni * @author miriam.baglioni
@ -44,6 +46,11 @@ public class SparkEoscBulkTag implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkEoscBulkTag.class); private static final Logger log = LoggerFactory.getLogger(SparkEoscBulkTag.class);
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static String OPENAIRE_3 = "openaire3.0";
private static String OPENAIRE_4 = "openaire-pub_4.0";
private static String OPENAIRE_CRIS = "openaire-cris_1.1";
private static String OPENAIRE_DATA = "openaire2.0_data";
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
.toString( .toString(
@ -72,6 +79,9 @@ public class SparkEoscBulkTag implements Serializable {
final String resultClassName = parser.get("resultTableName"); final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName); log.info("resultTableName: {}", resultClassName);
final String resultType = parser.get("resultType");
log.info("resultType: {}", resultType);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName); Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
@ -82,41 +92,71 @@ public class SparkEoscBulkTag implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, workingPath); removeOutputDir(spark, workingPath);
execBulkTag(spark, inputPath, workingPath, datasourceMapPath, resultClazz); selectCompliantDatasources(spark, inputPath, workingPath, datasourceMapPath);
execBulkTag(spark, inputPath, workingPath, resultType, resultClazz);
}); });
} }
/**
 * Selects the datasource masters that correspond to OpenAIRE-compliant datasources and stores them
 * under {@code workingPath + "datasource"} as gzip-compressed JSON, overwriting previous content.
 * <p>
 * A datasource read from {@code inputPath + "datasource"} is kept when the class id of its
 * compatibility level equals, ignoring case, one of OPENAIRE_3, OPENAIRE_4, OPENAIRE_CRIS or
 * OPENAIRE_DATA. The kept datasources are left-joined with the masters read from
 * {@code datasourceMapPath} on {@code id = master}; only the master side of each pair is retained
 * and pairs without a master are dropped.
 *
 * @param spark the active Spark session
 * @param inputPath prefix of the path holding the datasource entities
 * @param workingPath prefix of the path where the selected masters are written
 * @param datasourceMapPath path of the datasource master mapping
 */
private static void selectCompliantDatasources(SparkSession spark, String inputPath, String workingPath,
	String datasourceMapPath) {
	// Predicate keeping only the datasources with one of the accepted compatibility levels.
	final FilterFunction<Datasource> compliantOnly = candidate -> {
		final String level = candidate.getOpenairecompatibility().getClassid();
		if (level.equalsIgnoreCase(OPENAIRE_3)) {
			return true;
		}
		if (level.equalsIgnoreCase(OPENAIRE_4)) {
			return true;
		}
		if (level.equalsIgnoreCase(OPENAIRE_CRIS)) {
			return true;
		}
		return level.equalsIgnoreCase(OPENAIRE_DATA);
	};
	// Projection keeping the right-hand (master) element of each joined pair.
	final MapFunction<Tuple2<Datasource, DatasourceMaster>, DatasourceMaster> masterSide = pair -> pair._2();

	final Dataset<Datasource> compliant = readPath(spark, inputPath + "datasource", Datasource.class)
		.filter(compliantOnly);
	final Dataset<DatasourceMaster> masters = readPath(spark, datasourceMapPath, DatasourceMaster.class);

	compliant
		.joinWith(masters, compliant.col("id").equalTo(masters.col("master")), "left")
		.map(masterSide, Encoders.bean(DatasourceMaster.class))
		.filter(Objects::nonNull)
		.write()
		.mode(SaveMode.Overwrite)
		.option("compression", "gzip")
		.json(workingPath + "datasource");
}
private static <R extends Result> void execBulkTag( private static <R extends Result> void execBulkTag(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,
String workingPath, String workingPath,
String datasourceMapPath, String resultType,
Class<R> resultClazz) { Class<R> resultClazz) {
List<String> hostedByList = readPath(spark, datasourceMapPath, DatasourceMaster.class) List<String> hostedByList = readPath(spark, workingPath + "datasource", DatasourceMaster.class)
.map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING()) .map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING())
.collectAsList(); .collectAsList();
readPath(spark, inputPath, resultClazz) readPath(spark, inputPath + resultType, resultClazz)
.map(patchResult(), Encoders.bean(resultClazz))
.filter(Objects::nonNull)
.map( .map(
(MapFunction<R, R>) value -> enrich(value, hostedByList), (MapFunction<R, R>) value -> enrich(value, hostedByList),
Encoders.bean(resultClazz)) Encoders.bean(resultClazz))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(workingPath); .json(workingPath + resultType);
readPath(spark, workingPath, resultClazz) readPath(spark, workingPath + resultType, resultClazz)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(inputPath); .json(inputPath + resultType);
} }
private static <R extends Result> R enrich(R value, List<String> hostedByList) { private static <R extends Result> R enrich(R value, List<String> hostedByList) {
if (value.getDataInfo().getDeletedbyinference() == null) {
value.getDataInfo().setDeletedbyinference(false);
}
if (value.getContext() == null) {
value.setContext(new ArrayList<>());
}
if (value if (value
.getInstance() .getInstance()
.stream() .stream()

View File

@ -29,6 +29,13 @@
"paramLongName": "isSparkSessionManaged", "paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise", "paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false "paramRequired": false
},
{
"paramName": "rt",
"paramLongName": "resultType",
"paramDescription": "the result type",
"paramRequired": true
} }
] ]

View File

@ -219,7 +219,7 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait" to="End"/> <join name="wait" to="eosc_tag"/>
<action name="eosc_tag"> <action name="eosc_tag">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
@ -282,8 +282,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/publication</arg> <arg>--sourcePath</arg><arg>${outputPath}/</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/publication</arg> <arg>--resultType</arg><arg>publication</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg> <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark> </spark>
@ -308,8 +309,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/dataset</arg> <arg>--sourcePath</arg><arg>${outputPath}/</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/dataset</arg> <arg>--resultType</arg><arg>dataset</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg> <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark> </spark>
@ -333,8 +335,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/software</arg> <arg>--sourcePath</arg><arg>${outputPath}/</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/software</arg> <arg>--resultType</arg><arg>software</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg> <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark> </spark>
@ -358,8 +361,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/otherresearchproduct</arg> <arg>--sourcePath</arg><arg>${outputPath}/</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/otherresearchproduct</arg> <arg>--resultType</arg><arg>otherresearchproduct</arg>
<arg>--workingPath</arg><arg>${workingDir}/eoscContextTag/</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg> <arg>--datasourceMapPath</arg><arg>${workingDir}/datasourcemaster</arg>
</spark> </spark>

View File

@ -47,8 +47,8 @@ public class BulkTagJobTest {
+ " \"contributor\" : \"$['contributor'][*]['value']\"," + " \"contributor\" : \"$['contributor'][*]['value']\","
+ " \"description\" : \"$['description'][*]['value']\", " + " \"description\" : \"$['description'][*]['value']\", "
+ " \"subject\" :\"$['subject'][*]['value']\" , " + + " \"subject\" :\"$['subject'][*]['value']\" , " +
"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"" +
"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='subject:fos')].value\"} "; "} ";
private static SparkSession spark; private static SparkSession spark;
@ -64,7 +64,7 @@ public class BulkTagJobTest {
.toString( .toString(
BulkTagJobTest.class BulkTagJobTest.class
.getResourceAsStream( .getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf.xml")); "/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_dth.xml"));
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -758,7 +758,7 @@ public class BulkTagJobTest {
.textFile(workingDir.toString() + "/dataset") .textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count()); Assertions.assertEquals(12, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); .createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
@ -772,14 +772,14 @@ public class BulkTagJobTest {
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query); org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
idExplodeCommunity.show(false); idExplodeCommunity.show(false);
Assertions.assertEquals(5, idExplodeCommunity.count()); // Assertions.assertEquals(5, idExplodeCommunity.count());
//
Assertions // Assertions
.assertEquals( // .assertEquals(
3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); // 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
Assertions // Assertions
.assertEquals( // .assertEquals(
2, idExplodeCommunity.filter("provenance = 'community:advconstraint'").count()); // 2, idExplodeCommunity.filter("provenance = 'community:advconstraint'").count());
} }
} }

View File

@ -843,88 +843,136 @@
</zenodocommunities> </zenodocommunities>
<organizations/> <organizations/>
</community> </community>
<community id="dariah"> <community id="dth">
<advancedConstraints> <advancedConstraints>
{ {"criteria":[
"criteria": [ {"constraint":[
{ {"verb":"equals_caseinsensitive","field":"subject","value":"digital twins"},
"constraint": [ {"verb":"contains_caseinsensitive","field":"subject","value":"health"},
{ {"verb":"not_contains_caseinsensitive","field":"subject","value":"structural"},
"verb": "equals_caseinsensitive", {"verb":"not_contains_caseinsensitive","field":"subject","value":"marine"},
"field": "subject", {"verb":"not_contains_caseinsensitive","field":"subject","value":"avionics"},
"value": "North America" {"verb":"not_contains_caseinsensitive","field":"subject","value":"battery"}
}, ]},
{ {"constraint":[
"verb": "contains", {"verb":"contains_caseinsensitive","field":"title","value":"Human Digital Twins"}
"field": "fos", ]},
"value": "05" {"constraint":[
} {"verb":"contains_caseinsensitive","field":"description","value":"Human Digital Twins"}
] ]},
}, {"constraint":[
{ {"verb":"equals_caseinsensitive","field":"subject","value":"Human Digital Twins"}
"constraint": [ ]},
{ {"constraint":[
"verb": "equals_caseinsensitive", {"verb":"contains_caseinsensitive","field":"title","value":"Virtual Human Twin"}
"field": "subject", ]},
"value": "North America" {"constraint":[
}, {"verb":"contains_caseinsensitive","field":"description","value":"Virtual Human Twin"}
{ ]},
"verb": "contains", {"constraint":[
"field": "fos", {"verb":"equals_caseinsensitive","field":"subject","value":"Virtual Human Twin"}
"value": "06" ]},
} {"constraint":[
] {"verb":"equals_caseinsensitive","field":"subject","value":"digital twin"},
}, {"verb":"contains_caseinsensitive","field":"subject","value":"health"},
{ {"verb":"not_contains_caseinsensitive","field":"subject","value":"structural"},
"constraint": [ {"verb":"not_contains_caseinsensitive","field":"subject","value":"marine"},
{ {"verb":"not_contains_caseinsensitive","field":"subject","value":"avionics"},
"verb": "equals_caseinsensitive", {"verb":"not_contains_caseinsensitive","field":"subject","value":"battery"}
"field": "subject", ]},
"value": "Mexico" {"constraint":[
}, {"verb":"contains_caseinsensitive","field":"title","value":"digital twin health"},
{ {"verb":"not_contains_caseinsensitive","field":"subject","value":"Acoustic"},
"verb": "equals_caseinsensitive", {"verb":"not_contains_caseinsensitive","field":"subject","value":"Health Monitoring"},
"field": "subject", {"verb":"not_contains_caseinsensitive","field":"title","value":"Health Monitoring"},
"value": "United States" {"verb":"not_contains_caseinsensitive","field":"title","value":"Health Management"},
}, {"verb":"not_contains_caseinsensitive","field":"subject","value":"Health Assessment"},
{ {"verb":"not_contains_caseinsensitive","field":"title","value":"Health Assessment"},
"verb": "equals_caseinsensitive", {"verb":"not_contains_caseinsensitive","field":"title","value":"Health status"},
"field": "subject", {"verb":"not_contains_caseinsensitive","field":"subject","value":"ELECTRICAL ENGINEERING"},
"value": "Canada" {"verb":"not_contains_caseinsensitive","field":"subject","value":"Control and Systems Engineering"}
}, ]}
{ ]}
"verb": "contains", <!-- {-->
"field": "fos", <!-- "criteria": [-->
"value": "05" <!-- {-->
} <!-- "constraint": [-->
] <!-- {-->
}, <!-- "verb": "equals_caseinsensitive",-->
{ <!-- "field": "subject",-->
"constraint": [ <!-- "value": "North America"-->
{ <!-- },-->
"verb": "equals_caseinsensitive", <!-- {-->
"field": "subject", <!-- "verb": "contains",-->
"value": "Mexico" <!-- "field": "fos",-->
}, <!-- "value": "05"-->
{ <!-- }-->
"verb": "equals_caseinsensitive", <!-- ]-->
"field": "subject", <!-- },-->
"value": "United States" <!-- {-->
}, <!-- "constraint": [-->
{ <!-- {-->
"verb": "equals_caseinsensitive", <!-- "verb": "equals_caseinsensitive",-->
"field": "subject", <!-- "field": "subject",-->
"value": "Canada" <!-- "value": "North America"-->
}, <!-- },-->
{ <!-- {-->
"verb": "contains", <!-- "verb": "contains",-->
"field": "fos", <!-- "field": "fos",-->
"value": "06" <!-- "value": "06"-->
} <!-- }-->
] <!-- ]-->
} <!-- },-->
] <!-- {-->
} <!-- "constraint": [-->
<!-- {-->
<!-- "verb": "equals_caseinsensitive",-->
<!-- "field": "subject",-->
<!-- "value": "Mexico"-->
<!-- },-->
<!-- {-->
<!-- "verb": "equals_caseinsensitive",-->
<!-- "field": "subject",-->
<!-- "value": "United States"-->
<!-- },-->
<!-- {-->
<!-- "verb": "equals_caseinsensitive",-->
<!-- "field": "subject",-->
<!-- "value": "Canada"-->
<!-- },-->
<!-- {-->
<!-- "verb": "contains",-->
<!-- "field": "fos",-->
<!-- "value": "05"-->
<!-- }-->
<!-- ]-->
<!-- },-->
<!-- {-->
<!-- "constraint": [-->
<!-- {-->
<!-- "verb": "equals_caseinsensitive",-->
<!-- "field": "subject",-->
<!-- "value": "Mexico"-->
<!-- },-->
<!-- {-->
<!-- "verb": "equals_caseinsensitive",-->
<!-- "field": "subject",-->
<!-- "value": "United States"-->
<!-- },-->
<!-- {-->
<!-- "verb": "equals_caseinsensitive",-->
<!-- "field": "subject",-->
<!-- "value": "Canada"-->
<!-- },-->
<!-- {-->
<!-- "verb": "contains",-->
<!-- "field": "fos",-->
<!-- "value": "06"-->
<!-- }-->
<!-- ]-->
<!-- }-->
<!-- ]-->
<!-- }-->
</advancedConstraints> </advancedConstraints>
<subjects/> <subjects/>

View File

@ -1,19 +1,10 @@
DROP VIEW IF EXISTS ${hiveDbName}.result; DROP VIEW IF EXISTS ${hiveDbName}.result;
CREATE VIEW IF NOT EXISTS ${hiveDbName}.result as CREATE VIEW IF NOT EXISTS ${hiveDbName}.result as
select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance, measures, processingchargeamount from ${hiveDbName}.publication p select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance, measures, processingchargeamount, eoscifguidelines from ${hiveDbName}.publication p
union all union all
select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance, measures, processingchargeamount from ${hiveDbName}.dataset d select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance, measures, processingchargeamount, eoscifguidelines from ${hiveDbName}.dataset d
union all union all
select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance, measures, processingchargeamount from ${hiveDbName}.software s select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance, measures, processingchargeamount, eoscifguidelines from ${hiveDbName}.software s
union all union all
select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance, measures, processingchargeamount from ${hiveDbName}.otherresearchproduct o; select id, originalid, dateofcollection, title, publisher, bestaccessright, datainfo, collectedfrom, pid, author, resulttype, language, country, subject, description, dateofacceptance, relevantdate, embargoenddate, resourcetype, context, externalreference, instance, measures, processingchargeamount, eoscifguidelines from ${hiveDbName}.otherresearchproduct o;
ANALYZE TABLE ${hiveDbName}.datasource COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.organization COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.project COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.publication COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.dataset COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.otherresearchproduct COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.software COMPUTE STATISTICS;
ANALYZE TABLE ${hiveDbName}.relation COMPUTE STATISTICS;

View File

@ -207,12 +207,22 @@ public class XmlRecordFactory implements Serializable {
.map(p -> XmlSerializationUtils.mapStructuredProperty("pid", p)) .map(p -> XmlSerializationUtils.mapStructuredProperty("pid", p))
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }
if (entity.getMeasures() != null) {
metadata.addAll(measuresAsXml(entity.getMeasures()));
}
if (ModelSupport.isResult(type)) { if (ModelSupport.isResult(type)) {
final Result r = (Result) entity; final Result r = (Result) entity;
if (r.getMeasures() != null) { if (r.getFulltext() != null) {
metadata.addAll(measuresAsXml(r.getMeasures())); metadata
.addAll(
r
.getFulltext()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("fulltext", c.getValue()))
.collect(Collectors.toList()));
} }
if (r.getEoscifguidelines() != null) { if (r.getEoscifguidelines() != null) {

View File

@ -39,6 +39,18 @@
<description>query used in the deleted by query operation</description> <description>query used in the deleted by query operation</description>
</property> </property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property> <property>
<name>sparkDriverMemoryForJoining</name> <name>sparkDriverMemoryForJoining</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -565,9 +577,9 @@
<class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class> <class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCoresForJoining} --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemoryForJoining} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemoryForJoining} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}