Compare commits

..

1 Commits

Author SHA1 Message Date
dimitrispie aedd279f78 Updates Promotion DBs
- Add a step for promoting the splitted monitor DBs
2023-07-13 15:35:46 +03:00
750 changed files with 6907 additions and 42235 deletions

View File

@ -1,2 +1,2 @@
# dnet-hadoop
Dnet-hadoop is the project that defined all the OOZIE workflows for the OpenAIRE Graph construction, processing, provisioning.
Dnet-hadoop is a tool for

View File

@ -8,6 +8,8 @@ import java.util.List;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.maven.plugin.AbstractMojo;
import org.apache.maven.plugin.MojoExecutionException;
import org.apache.maven.plugin.MojoFailureException;
/**
* Generates oozie properties which were not provided from commandline.
@ -25,7 +27,7 @@ public class GenerateOoziePropertiesMojo extends AbstractMojo {
};
@Override
public void execute() {
public void execute() throws MojoExecutionException, MojoFailureException {
if (System.getProperties().containsKey(PROPERTY_NAME_WF_SOURCE_DIR)
&& !System.getProperties().containsKey(PROPERTY_NAME_SANDBOX_NAME)) {
String generatedSandboxName = generateSandboxName(
@ -44,24 +46,24 @@ public class GenerateOoziePropertiesMojo extends AbstractMojo {
/**
* Generates sandbox name from workflow source directory.
*
* @param wfSourceDir workflow source directory
* @param wfSourceDir
* @return generated sandbox name
*/
private String generateSandboxName(String wfSourceDir) {
// utilize all dir names until finding one of the limiters
List<String> sandboxNameParts = new ArrayList<>();
List<String> sandboxNameParts = new ArrayList<String>();
String[] tokens = StringUtils.split(wfSourceDir, File.separatorChar);
ArrayUtils.reverse(tokens);
if (tokens.length > 0) {
for (String token : tokens) {
for (String limiter : limiters) {
if (limiter.equals(token)) {
return !sandboxNameParts.isEmpty()
return sandboxNameParts.size() > 0
? StringUtils.join(sandboxNameParts.toArray())
: null;
}
}
if (!sandboxNameParts.isEmpty()) {
if (sandboxNameParts.size() > 0) {
sandboxNameParts.add(0, File.separator);
}
sandboxNameParts.add(0, token);

View File

@ -16,7 +16,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -290,7 +289,7 @@ public class WritePredefinedProjectProperties extends AbstractMojo {
*/
protected List<String> getEscapeChars(String escapeChars) {
List<String> tokens = getListFromCSV(escapeChars);
List<String> realTokens = new ArrayList<>();
List<String> realTokens = new ArrayList<String>();
for (String token : tokens) {
String realToken = getRealToken(token);
realTokens.add(realToken);
@ -325,7 +324,7 @@ public class WritePredefinedProjectProperties extends AbstractMojo {
* @return content
*/
protected String getContent(String comment, Properties properties, List<String> escapeTokens) {
List<String> names = new ArrayList<>(properties.stringPropertyNames());
List<String> names = new ArrayList<String>(properties.stringPropertyNames());
Collections.sort(names);
StringBuilder sb = new StringBuilder();
if (!StringUtils.isBlank(comment)) {
@ -353,7 +352,7 @@ public class WritePredefinedProjectProperties extends AbstractMojo {
throws MojoExecutionException {
try {
String content = getContent(comment, properties, escapeTokens);
FileUtils.writeStringToFile(file, content, StandardCharsets.UTF_8);
FileUtils.writeStringToFile(file, content, ENCODING_UTF8);
} catch (IOException e) {
throw new MojoExecutionException("Error creating properties file", e);
}
@ -400,9 +399,9 @@ public class WritePredefinedProjectProperties extends AbstractMojo {
*/
protected static final List<String> getListFromCSV(String csv) {
if (StringUtils.isBlank(csv)) {
return new ArrayList<>();
return new ArrayList<String>();
}
List<String> list = new ArrayList<>();
List<String> list = new ArrayList<String>();
String[] tokens = StringUtils.split(csv, ",");
for (String token : tokens) {
list.add(token.trim());

View File

@ -9,18 +9,18 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/** @author mhorst, claudio.atzori */
class GenerateOoziePropertiesMojoTest {
public class GenerateOoziePropertiesMojoTest {
private final GenerateOoziePropertiesMojo mojo = new GenerateOoziePropertiesMojo();
@BeforeEach
void clearSystemProperties() {
public void clearSystemProperties() {
System.clearProperty(PROPERTY_NAME_SANDBOX_NAME);
System.clearProperty(PROPERTY_NAME_WF_SOURCE_DIR);
}
@Test
void testExecuteEmpty() throws Exception {
public void testExecuteEmpty() throws Exception {
// execute
mojo.execute();
@ -29,7 +29,7 @@ class GenerateOoziePropertiesMojoTest {
}
@Test
void testExecuteSandboxNameAlreadySet() throws Exception {
public void testExecuteSandboxNameAlreadySet() throws Exception {
// given
String workflowSourceDir = "eu/dnetlib/dhp/wf/transformers";
String sandboxName = "originalSandboxName";
@ -44,7 +44,7 @@ class GenerateOoziePropertiesMojoTest {
}
@Test
void testExecuteEmptyWorkflowSourceDir() throws Exception {
public void testExecuteEmptyWorkflowSourceDir() throws Exception {
// given
String workflowSourceDir = "";
System.setProperty(PROPERTY_NAME_WF_SOURCE_DIR, workflowSourceDir);
@ -57,7 +57,7 @@ class GenerateOoziePropertiesMojoTest {
}
@Test
void testExecuteNullSandboxNameGenerated() throws Exception {
public void testExecuteNullSandboxNameGenerated() throws Exception {
// given
String workflowSourceDir = "eu/dnetlib/dhp/";
System.setProperty(PROPERTY_NAME_WF_SOURCE_DIR, workflowSourceDir);
@ -70,7 +70,7 @@ class GenerateOoziePropertiesMojoTest {
}
@Test
void testExecute() throws Exception {
public void testExecute() throws Exception {
// given
String workflowSourceDir = "eu/dnetlib/dhp/wf/transformers";
System.setProperty(PROPERTY_NAME_WF_SOURCE_DIR, workflowSourceDir);
@ -83,7 +83,7 @@ class GenerateOoziePropertiesMojoTest {
}
@Test
void testExecuteWithoutRoot() throws Exception {
public void testExecuteWithoutRoot() throws Exception {
// given
String workflowSourceDir = "wf/transformers";
System.setProperty(PROPERTY_NAME_WF_SOURCE_DIR, workflowSourceDir);

View File

@ -20,7 +20,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
/** @author mhorst, claudio.atzori */
@ExtendWith(MockitoExtension.class)
class WritePredefinedProjectPropertiesTest {
public class WritePredefinedProjectPropertiesTest {
@Mock
private MavenProject mavenProject;
@ -39,7 +39,7 @@ class WritePredefinedProjectPropertiesTest {
// ----------------------------------- TESTS ---------------------------------------------
@Test
void testExecuteEmpty() throws Exception {
public void testExecuteEmpty() throws Exception {
// execute
mojo.execute();
@ -50,7 +50,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteWithProjectProperties() throws Exception {
public void testExecuteWithProjectProperties() throws Exception {
// given
String key = "projectPropertyKey";
String value = "projectPropertyValue";
@ -70,7 +70,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test()
void testExecuteWithProjectPropertiesAndInvalidOutputFile(@TempDir File testFolder) {
public void testExecuteWithProjectPropertiesAndInvalidOutputFile(@TempDir File testFolder) {
// given
String key = "projectPropertyKey";
String value = "projectPropertyValue";
@ -84,7 +84,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteWithProjectPropertiesExclusion(@TempDir File testFolder) throws Exception {
public void testExecuteWithProjectPropertiesExclusion(@TempDir File testFolder) throws Exception {
// given
String key = "projectPropertyKey";
String value = "projectPropertyValue";
@ -108,7 +108,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteWithProjectPropertiesInclusion(@TempDir File testFolder) throws Exception {
public void testExecuteWithProjectPropertiesInclusion(@TempDir File testFolder) throws Exception {
// given
String key = "projectPropertyKey";
String value = "projectPropertyValue";
@ -132,7 +132,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteIncludingPropertyKeysFromFile(@TempDir File testFolder) throws Exception {
public void testExecuteIncludingPropertyKeysFromFile(@TempDir File testFolder) throws Exception {
// given
String key = "projectPropertyKey";
String value = "projectPropertyValue";
@ -164,7 +164,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteIncludingPropertyKeysFromClasspathResource(@TempDir File testFolder)
public void testExecuteIncludingPropertyKeysFromClasspathResource(@TempDir File testFolder)
throws Exception {
// given
String key = "projectPropertyKey";
@ -194,7 +194,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteIncludingPropertyKeysFromBlankLocation() {
public void testExecuteIncludingPropertyKeysFromBlankLocation() {
// given
String key = "projectPropertyKey";
String value = "projectPropertyValue";
@ -214,7 +214,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteIncludingPropertyKeysFromXmlFile(@TempDir File testFolder)
public void testExecuteIncludingPropertyKeysFromXmlFile(@TempDir File testFolder)
throws Exception {
// given
String key = "projectPropertyKey";
@ -247,7 +247,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteIncludingPropertyKeysFromInvalidXmlFile(@TempDir File testFolder)
public void testExecuteIncludingPropertyKeysFromInvalidXmlFile(@TempDir File testFolder)
throws Exception {
// given
String key = "projectPropertyKey";
@ -273,7 +273,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteWithQuietModeOn(@TempDir File testFolder) throws Exception {
public void testExecuteWithQuietModeOn(@TempDir File testFolder) throws Exception {
// given
mojo.setQuiet(true);
mojo.setIncludePropertyKeysFromFiles(new String[] {
@ -290,7 +290,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteIncludingPropertyKeysFromInvalidFile() {
public void testExecuteIncludingPropertyKeysFromInvalidFile() {
// given
mojo.setIncludePropertyKeysFromFiles(new String[] {
"invalid location"
@ -301,7 +301,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteWithEnvironmentProperties(@TempDir File testFolder) throws Exception {
public void testExecuteWithEnvironmentProperties(@TempDir File testFolder) throws Exception {
// given
mojo.setIncludeEnvironmentVariables(true);
@ -318,7 +318,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteWithSystemProperties(@TempDir File testFolder) throws Exception {
public void testExecuteWithSystemProperties(@TempDir File testFolder) throws Exception {
// given
String key = "systemPropertyKey";
String value = "systemPropertyValue";
@ -337,7 +337,7 @@ class WritePredefinedProjectPropertiesTest {
}
@Test
void testExecuteWithSystemPropertiesAndEscapeChars(@TempDir File testFolder)
public void testExecuteWithSystemPropertiesAndEscapeChars(@TempDir File testFolder)
throws Exception {
// given
String key = "systemPropertyKey ";

View File

@ -25,11 +25,6 @@
<groupId>com.github.sisyphsu</groupId>
<artifactId>dateparser</artifactId>
</dependency>
<dependency>
<groupId>me.xuender</groupId>
<artifactId>unidecode</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
@ -117,11 +112,6 @@
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,14 @@
package eu.dnetlib.dhp.application;
import java.io.*;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.Maps;
public class ApplicationUtils {
}

View File

@ -56,13 +56,13 @@ public class ArgumentApplicationParser implements Serializable {
final StringWriter stringWriter = new StringWriter();
IOUtils.copy(gis, stringWriter);
return stringWriter.toString();
} catch (IOException e) {
log.error("Wrong value to decompress: {}", abstractCompressed);
throw new IllegalArgumentException(e);
} catch (Throwable e) {
log.error("Wrong value to decompress:" + abstractCompressed);
throw new RuntimeException(e);
}
}
public static String compressArgument(final String value) throws IOException {
public static String compressArgument(final String value) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(value.getBytes());

View File

@ -9,6 +9,9 @@ public class OptionsParameter {
private boolean paramRequired;
private boolean compressed;
public OptionsParameter() {
}
public String getParamName() {
return paramName;
}

View File

@ -34,7 +34,7 @@ public class ApiDescriptor {
return params;
}
public void setParams(final Map<String, String> params) {
public void setParams(final HashMap<String, String> params) {
this.params = params;
}

View File

@ -12,9 +12,6 @@ public class Constants {
public static String COAR_ACCESS_RIGHT_SCHEMA = "http://vocabularies.coar-repositories.org/documentation/access_rights/";
private Constants() {
}
static {
accessRightsCoarMap.put("OPEN", "c_abf2");
accessRightsCoarMap.put("RESTRICTED", "c_16ec");
@ -52,10 +49,4 @@ public class Constants {
public static final String CONTENT_INVALIDRECORDS = "InvalidRecords";
public static final String CONTENT_TRANSFORMEDRECORDS = "transformedItems";
// IETF Draft and used by Repositories like ZENODO , not included in APACHE HTTP java packages
// see https://ietf-wg-httpapi.github.io/ratelimit-headers/draft-ietf-httpapi-ratelimit-headers.html
public static final String HTTPHEADER_IETF_DRAFT_RATELIMIT_LIMIT = "X-RateLimit-Limit";
public static final String HTTPHEADER_IETF_DRAFT_RATELIMIT_REMAINING = "X-RateLimit-Remaining";
public static final String HTTPHEADER_IETF_DRAFT_RATELIMIT_RESET = "X-RateLimit-Reset";
}

View File

@ -84,7 +84,7 @@ public class GraphResultMapper implements Serializable {
.setDocumentationUrl(
value
.stream()
.map(Field::getValue)
.map(v -> v.getValue())
.collect(Collectors.toList())));
Optional
@ -100,20 +100,20 @@ public class GraphResultMapper implements Serializable {
.setContactgroup(
Optional
.ofNullable(ir.getContactgroup())
.map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
.map(value -> value.stream().map(cg -> cg.getValue()).collect(Collectors.toList()))
.orElse(null));
out
.setContactperson(
Optional
.ofNullable(ir.getContactperson())
.map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
.map(value -> value.stream().map(cp -> cp.getValue()).collect(Collectors.toList()))
.orElse(null));
out
.setTool(
Optional
.ofNullable(ir.getTool())
.map(value -> value.stream().map(Field::getValue).collect(Collectors.toList()))
.map(value -> value.stream().map(t -> t.getValue()).collect(Collectors.toList()))
.orElse(null));
out.setType(ModelConstants.ORP_DEFAULT_RESULTTYPE.getClassname());
@ -123,8 +123,7 @@ public class GraphResultMapper implements Serializable {
Optional
.ofNullable(input.getAuthor())
.ifPresent(
ats -> out.setAuthor(ats.stream().map(GraphResultMapper::getAuthor).collect(Collectors.toList())));
.ifPresent(ats -> out.setAuthor(ats.stream().map(at -> getAuthor(at)).collect(Collectors.toList())));
// I do not map Access Right UNKNOWN or OTHER
@ -211,7 +210,7 @@ public class GraphResultMapper implements Serializable {
if (oInst.isPresent()) {
out
.setInstance(
oInst.get().stream().map(GraphResultMapper::getInstance).collect(Collectors.toList()));
oInst.get().stream().map(i -> getInstance(i)).collect(Collectors.toList()));
}
@ -231,7 +230,7 @@ public class GraphResultMapper implements Serializable {
.stream()
.filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("main title"))
.collect(Collectors.toList());
if (!iTitle.isEmpty()) {
if (iTitle.size() > 0) {
out.setMaintitle(iTitle.get(0).getValue());
}
@ -240,7 +239,7 @@ public class GraphResultMapper implements Serializable {
.stream()
.filter(t -> t.getQualifier().getClassid().equalsIgnoreCase("subtitle"))
.collect(Collectors.toList());
if (!iTitle.isEmpty()) {
if (iTitle.size() > 0) {
out.setSubtitle(iTitle.get(0).getValue());
}

View File

@ -28,7 +28,7 @@ public class HdfsSupport {
* @param configuration Configuration of hadoop env
*/
public static boolean exists(String path, Configuration configuration) {
logger.info("Checking existence for path: {}", path);
logger.info("Removing path: {}", path);
return rethrowAsRuntimeException(
() -> {
Path f = new Path(path);

View File

@ -14,33 +14,38 @@ public class MakeTarArchive implements Serializable {
private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException {
Path hdfsWritePath = new Path(outputPath);
FSDataOutputStream fsDataOutputStream = null;
if (fileSystem.exists(hdfsWritePath)) {
fileSystem.delete(hdfsWritePath, true);
}
return new TarArchiveOutputStream(fileSystem.create(hdfsWritePath).getWrappedStream());
fsDataOutputStream = fileSystem.create(hdfsWritePath);
return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
}
private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name)
throws IOException {
Path hdfsWritePath = new Path(outputPath);
FSDataOutputStream fsDataOutputStream = null;
if (fileSystem.exists(hdfsWritePath)) {
fileSystem.delete(hdfsWritePath, true);
}
try (TarArchiveOutputStream ar = new TarArchiveOutputStream(
fileSystem.create(hdfsWritePath).getWrappedStream())) {
fsDataOutputStream = fileSystem.create(hdfsWritePath);
RemoteIterator<LocatedFileStatus> iterator = fileSystem
.listFiles(
new Path(inputPath), true);
TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
while (iterator.hasNext()) {
writeCurrentFile(fileSystem, dir_name, iterator, ar, 0);
}
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
.listFiles(
new Path(inputPath), true);
while (fileStatusListIterator.hasNext()) {
writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, 0);
}
ar.close();
}
public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name,
@ -85,13 +90,6 @@ public class MakeTarArchive implements Serializable {
String p_string = p.toString();
if (!p_string.endsWith("_SUCCESS")) {
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
if (name.startsWith("part-") & name.length() > 10) {
String tmp = name.substring(0, 10);
if (name.contains(".")) {
tmp += name.substring(name.indexOf("."));
}
name = tmp;
}
TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
entry.setSize(fileStatus.getLen());
current_size += fileStatus.getLen();

View File

@ -10,6 +10,8 @@ import java.util.Optional;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -19,7 +21,6 @@ import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.QueryBuilder;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
@ -45,7 +46,7 @@ public class MdstoreClient implements Closeable {
final String currentId = Optional
.ofNullable(getColl(db, COLL_METADATA_MANAGER, true).find(query))
.map(FindIterable::first)
.map(r -> r.first())
.map(d -> d.getString("currentId"))
.orElseThrow(() -> new IllegalArgumentException("cannot find current mdstore id for: " + mdId));
@ -83,7 +84,7 @@ public class MdstoreClient implements Closeable {
if (!Iterables.contains(client.listDatabaseNames(), dbName)) {
final String err = String.format("Database '%s' not found in %s", dbName, client.getAddress());
log.warn(err);
throw new IllegalArgumentException(err);
throw new RuntimeException(err);
}
return client.getDatabase(dbName);
}
@ -96,7 +97,7 @@ public class MdstoreClient implements Closeable {
String.format("Missing collection '%s' in database '%s'", collName, db.getName()));
log.warn(err);
if (abortIfMissing) {
throw new IllegalArgumentException(err);
throw new RuntimeException(err);
} else {
return null;
}

View File

@ -24,6 +24,7 @@ import com.google.common.hash.Hashing;
*/
public class PacePerson {
private static final String UTF8 = "UTF-8";
private List<String> name = Lists.newArrayList();
private List<String> surname = Lists.newArrayList();
private List<String> fullname = Lists.newArrayList();

View File

@ -5,9 +5,6 @@ import java.io.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHeaders;
import org.apache.http.entity.ContentType;
import com.google.gson.Gson;
import eu.dnetlib.dhp.common.api.zenodo.ZenodoModel;
@ -46,7 +43,7 @@ public class ZenodoAPIClient implements Serializable {
this.deposition_id = deposition_id;
}
public ZenodoAPIClient(String urlString, String access_token) {
public ZenodoAPIClient(String urlString, String access_token) throws IOException {
this.urlString = urlString;
this.access_token = access_token;
@ -66,8 +63,8 @@ public class ZenodoAPIClient implements Serializable {
Request request = new Request.Builder()
.url(urlString)
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.addHeader("Content-Type", "application/json") // add request headers
.addHeader("Authorization", "Bearer " + access_token)
.post(body)
.build();
@ -106,8 +103,8 @@ public class ZenodoAPIClient implements Serializable {
Request request = new Request.Builder()
.url(bucket + "/" + file_name)
.addHeader(HttpHeaders.CONTENT_TYPE, "application/zip") // add request headers
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.addHeader("Content-Type", "application/zip") // add request headers
.addHeader("Authorization", "Bearer " + access_token)
.put(InputStreamRequestBody.create(MEDIA_TYPE_ZIP, is, len))
.build();
@ -133,8 +130,8 @@ public class ZenodoAPIClient implements Serializable {
Request request = new Request.Builder()
.url(urlString + "/" + deposition_id)
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.addHeader("Content-Type", "application/json") // add request headers
.addHeader("Authorization", "Bearer " + access_token)
.put(body)
.build();
@ -200,7 +197,7 @@ public class ZenodoAPIClient implements Serializable {
Request request = new Request.Builder()
.url(urlString + "/" + deposition_id + "/actions/newversion")
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.addHeader("Authorization", "Bearer " + access_token)
.post(body)
.build();
@ -273,8 +270,8 @@ public class ZenodoAPIClient implements Serializable {
Request request = new Request.Builder()
.url(urlString)
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.addHeader("Content-Type", "application/json") // add request headers
.addHeader("Authorization", "Bearer " + access_token)
.get()
.build();
@ -296,8 +293,8 @@ public class ZenodoAPIClient implements Serializable {
Request request = new Request.Builder()
.url(url)
.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()) // add request headers
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + access_token)
.addHeader("Content-Type", "application/json") // add request headers
.addHeader("Authorization", "Bearer " + access_token)
.get()
.build();

View File

@ -32,13 +32,13 @@ public class Creator {
public static Creator newInstance(String name, String affiliation, String orcid) {
Creator c = new Creator();
if (name != null) {
if (!(name == null)) {
c.name = name;
}
if (affiliation != null) {
if (!(affiliation == null)) {
c.affiliation = affiliation;
}
if (orcid != null) {
if (!(orcid == null)) {
c.orcid = orcid;
}

View File

@ -3,12 +3,17 @@ package eu.dnetlib.dhp.common.api.zenodo;
import java.io.Serializable;
import net.minidev.json.annotate.JsonIgnore;
public class File implements Serializable {
private String checksum;
private String filename;
private long filesize;
private String id;
@JsonIgnore
// private Links links;
public String getChecksum() {
return checksum;
}
@ -41,4 +46,13 @@ public class File implements Serializable {
this.id = id;
}
// @JsonIgnore
// public Links getLinks() {
// return links;
// }
//
// @JsonIgnore
// public void setLinks(Links links) {
// this.links = links;
// }
}

View File

@ -1,56 +0,0 @@
package eu.dnetlib.dhp.common.collection;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.bean.CsvToBeanBuilder;
public class GetCSV {
public static final char DEFAULT_DELIMITER = ',';
private GetCSV() {
}
public static void getCsv(FileSystem fileSystem, BufferedReader reader, String hdfsPath,
String modelClass) throws IOException, ClassNotFoundException {
getCsv(fileSystem, reader, hdfsPath, modelClass, DEFAULT_DELIMITER);
}
public static void getCsv(FileSystem fileSystem, Reader reader, String hdfsPath,
String modelClass, char delimiter) throws IOException, ClassNotFoundException {
Path hdfsWritePath = new Path(hdfsPath);
FSDataOutputStream fsDataOutputStream = null;
if (fileSystem.exists(hdfsWritePath)) {
fileSystem.delete(hdfsWritePath, false);
}
fsDataOutputStream = fileSystem.create(hdfsWritePath);
try (BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8))) {
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked")
final List lines = new CsvToBeanBuilder(reader)
.withType(Class.forName(modelClass))
.withSeparator(delimiter)
.build()
.parse();
for (Object line : lines) {
writer.write(mapper.writeValueAsString(line));
writer.newLine();
}
}
}
}

View File

@ -1,11 +1,11 @@
package eu.dnetlib.dhp.common.rest;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
@ -23,20 +23,17 @@ public class DNetRestClient {
private static final ObjectMapper mapper = new ObjectMapper();
private DNetRestClient() {
}
public static <T> T doGET(final String url, Class<T> clazz) throws Exception {
final HttpGet httpGet = new HttpGet(url);
return doHTTPRequest(httpGet, clazz);
}
public static String doGET(final String url) throws IOException {
public static String doGET(final String url) throws Exception {
final HttpGet httpGet = new HttpGet(url);
return doHTTPRequest(httpGet);
}
public static <V> String doPOST(final String url, V objParam) throws IOException {
public static <V> String doPOST(final String url, V objParam) throws Exception {
final HttpPost httpPost = new HttpPost(url);
if (objParam != null) {
@ -48,25 +45,25 @@ public class DNetRestClient {
return doHTTPRequest(httpPost);
}
public static <T, V> T doPOST(final String url, V objParam, Class<T> clazz) throws IOException {
public static <T, V> T doPOST(final String url, V objParam, Class<T> clazz) throws Exception {
return mapper.readValue(doPOST(url, objParam), clazz);
}
private static String doHTTPRequest(final HttpUriRequest r) throws IOException {
try (CloseableHttpClient client = HttpClients.createDefault()) {
private static String doHTTPRequest(final HttpUriRequest r) throws Exception {
CloseableHttpClient client = HttpClients.createDefault();
log.info("performing HTTP request, method {} on URI {}", r.getMethod(), r.getURI().toString());
log
.info(
"request headers: {}",
Arrays
.asList(r.getAllHeaders())
.stream()
.map(h -> h.getName() + ":" + h.getValue())
.collect(Collectors.joining(",")));
log.info("performing HTTP request, method {} on URI {}", r.getMethod(), r.getURI().toString());
log
.info(
"request headers: {}",
Arrays
.asList(r.getAllHeaders())
.stream()
.map(h -> h.getName() + ":" + h.getValue())
.collect(Collectors.joining(",")));
return IOUtils.toString(client.execute(r).getEntity().getContent());
}
CloseableHttpResponse response = client.execute(r);
return IOUtils.toString(response.getEntity().getContent());
}
private static <T> T doHTTPRequest(final HttpUriRequest r, Class<T> clazz) throws Exception {

View File

@ -46,7 +46,7 @@ public class Vocabulary implements Serializable {
}
public VocabularyTerm getTerm(final String id) {
return Optional.ofNullable(id).map(String::toLowerCase).map(terms::get).orElse(null);
return Optional.ofNullable(id).map(s -> s.toLowerCase()).map(s -> terms.get(s)).orElse(null);
}
protected void addTerm(final String id, final String name) {
@ -81,6 +81,7 @@ public class Vocabulary implements Serializable {
.ofNullable(getTermBySynonym(syn))
.map(term -> getTermAsQualifier(term.getId()))
.orElse(null);
// .orElse(OafMapperUtils.unknown(getId(), getName()));
}
}

View File

@ -46,6 +46,7 @@ public class VocabularyGroup implements Serializable {
}
vocs.addTerm(vocId, termId, termName);
// vocs.addSynonyms(vocId, termId, termId);
}
}
@ -57,6 +58,7 @@ public class VocabularyGroup implements Serializable {
final String syn = arr[2].trim();
vocs.addSynonyms(vocId, termId, syn);
// vocs.addSynonyms(vocId, termId, termId);
}
}
@ -96,7 +98,7 @@ public class VocabularyGroup implements Serializable {
.getTerms()
.values()
.stream()
.map(VocabularyTerm::getId)
.map(t -> t.getId())
.collect(Collectors.toCollection(HashSet::new));
}
@ -152,19 +154,16 @@ public class VocabularyGroup implements Serializable {
return Optional
.ofNullable(vocId)
.map(String::toLowerCase)
.map(vocs::containsKey)
.map(id -> vocs.containsKey(id))
.orElse(false);
}
private void addSynonyms(final String vocId, final String termId, final String syn) {
String id = Optional
.ofNullable(vocId)
.map(String::toLowerCase)
.map(s -> s.toLowerCase())
.orElseThrow(
() -> new IllegalArgumentException(
String
.format(
"empty vocabulary id for [term:%s, synonym:%s]", termId, syn)));
() -> new IllegalArgumentException(String.format("empty vocabulary id for [term:%s, synonym:%s]")));
Optional
.ofNullable(vocs.get(id))
.orElseThrow(() -> new IllegalArgumentException("missing vocabulary id: " + vocId))

View File

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.message;
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@ -9,8 +10,8 @@ public class Message implements Serializable {
private static final long serialVersionUID = 401753881204524893L;
public static final String CURRENT_PARAM = "current";
public static final String TOTAL_PARAM = "total";
public static String CURRENT_PARAM = "current";
public static String TOTAL_PARAM = "total";
private MessageType messageType;

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.merge;
import java.text.Normalizer;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
@ -18,9 +19,6 @@ public class AuthorMerger {
private static final Double THRESHOLD = 0.95;
private AuthorMerger() {
}
public static List<Author> merge(List<List<Author>> authors) {
authors.sort((o1, o2) -> -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2)));
@ -38,8 +36,7 @@ public class AuthorMerger {
public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b, Double threshold) {
int pa = countAuthorsPids(a);
int pb = countAuthorsPids(b);
List<Author> base;
List<Author> enrich;
List<Author> base, enrich;
int sa = authorsSize(a);
int sb = authorsSize(b);
@ -65,24 +62,22 @@ public class AuthorMerger {
// <pidComparableString, Author> (if an Author has more than 1 pid, it appears 2 times in the list)
final Map<String, Author> basePidAuthorMap = base
.stream()
.filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
.flatMap(
a -> a
.getPid()
.stream()
.filter(Objects::nonNull)
.map(p -> new Tuple2<>(pidToComparableString(p), a)))
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
// <pid, Author> (list of pid that are missing in the other list)
final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
.stream()
.filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
.flatMap(
a -> a
.getPid()
.stream()
.filter(Objects::nonNull)
.filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p)))
.map(p -> new Tuple2<>(p, a)))
.collect(Collectors.toList());
@ -120,9 +115,9 @@ public class AuthorMerger {
}
public static String pidToComparableString(StructuredProperty pid) {
final String classid = pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase()
: "";
return (pid.getQualifier() != null ? classid : "")
return (pid.getQualifier() != null
? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : ""
: "")
+ (pid.getValue() != null ? pid.getValue().toLowerCase() : "");
}
@ -155,7 +150,7 @@ public class AuthorMerger {
}
private static boolean hasPid(Author a) {
if (a == null || a.getPid() == null || a.getPid().isEmpty())
if (a == null || a.getPid() == null || a.getPid().size() == 0)
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
@ -164,10 +159,7 @@ public class AuthorMerger {
if (StringUtils.isNotBlank(author.getSurname())) {
return new Person(author.getSurname() + ", " + author.getName(), false);
} else {
if (StringUtils.isNotBlank(author.getFullname()))
return new Person(author.getFullname(), false);
else
return new Person("", false);
return new Person(author.getFullname(), false);
}
}

View File

@ -12,9 +12,6 @@ import com.ximpleware.VTDNav;
/** Created by sandro on 9/29/16. */
public class VtdUtilityParser {
private VtdUtilityParser() {
}
public static List<Node> getTextValuesWithAttributes(
final AutoPilot ap, final VTDNav vn, final String xpath, final List<String> attributes)
throws VtdException {

View File

@ -7,19 +7,22 @@ import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import com.github.sisyphsu.dateparser.DateParserUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import me.xuender.unidecode.Unidecode;
public class GraphCleaningFunctions extends CleaningFunctions {
@ -27,11 +30,8 @@ public class GraphCleaningFunctions extends CleaningFunctions {
public static final int ORCID_LEN = 19;
public static final String CLEANING_REGEX = "(?:\\n|\\r|\\t)";
public static final String INVALID_AUTHOR_REGEX = ".*deactivated.*";
public static final String TITLE_TEST = "test";
public static final String TITLE_FILTER_REGEX = String.format("(%s)|\\W|\\d", TITLE_TEST);
public static final int TITLE_FILTER_RESIDUAL_LENGTH = 5;
public static final String TITLE_FILTER_REGEX = "[.*test.*\\W\\d]";
public static final int TITLE_FILTER_RESIDUAL_LENGTH = 10;
public static <T extends Oaf> T fixVocabularyNames(T value) {
if (value instanceof Datasource) {
@ -194,21 +194,11 @@ public class GraphCleaningFunctions extends CleaningFunctions {
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
.filter(
sp -> {
final String title = sp
.getValue()
.toLowerCase();
final String decoded = Unidecode.decode(title);
if (StringUtils.contains(decoded, TITLE_TEST)) {
return decoded
.replaceAll(TITLE_FILTER_REGEX, "")
.length() > TITLE_FILTER_RESIDUAL_LENGTH;
}
return !decoded
.replaceAll("\\W|\\d", "")
.isEmpty();
})
sp -> sp
.getValue()
.toLowerCase()
.replaceAll(TITLE_FILTER_REGEX, "")
.length() > TITLE_FILTER_RESIDUAL_LENGTH)
.map(GraphCleaningFunctions::cleanValue)
.collect(Collectors.toList()));
}
@ -293,7 +283,7 @@ public class GraphCleaningFunctions extends CleaningFunctions {
r
.getAuthor()
.stream()
.filter(Objects::nonNull)
.filter(a -> Objects.nonNull(a))
.filter(a -> StringUtils.isNotBlank(a.getFullname()))
.filter(a -> StringUtils.isNotBlank(a.getFullname().replaceAll("[\\W]", "")))
.collect(Collectors.toList()));

View File

@ -17,16 +17,13 @@ import eu.dnetlib.dhp.schema.oaf.*;
public class OafMapperUtils {
private OafMapperUtils() {
}
public static Oaf merge(final Oaf left, final Oaf right) {
if (ModelSupport.isSubClass(left, OafEntity.class)) {
return mergeEntities((OafEntity) left, (OafEntity) right);
} else if (ModelSupport.isSubClass(left, Relation.class)) {
((Relation) left).mergeFrom((Relation) right);
} else {
throw new IllegalArgumentException("invalid Oaf type:" + left.getClass().getCanonicalName());
throw new RuntimeException("invalid Oaf type:" + left.getClass().getCanonicalName());
}
return left;
}
@ -41,7 +38,7 @@ public class OafMapperUtils {
} else if (ModelSupport.isSubClass(left, Project.class)) {
left.mergeFrom(right);
} else {
throw new IllegalArgumentException("invalid OafEntity subtype:" + left.getClass().getCanonicalName());
throw new RuntimeException("invalid OafEntity subtype:" + left.getClass().getCanonicalName());
}
return left;
}
@ -65,7 +62,7 @@ public class OafMapperUtils {
public static List<KeyValue> listKeyValues(final String... s) {
if (s.length % 2 > 0) {
throw new IllegalArgumentException("Invalid number of parameters (k,v,k,v,....)");
throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)");
}
final List<KeyValue> list = new ArrayList<>();
@ -91,7 +88,7 @@ public class OafMapperUtils {
.stream(values)
.map(v -> field(v, info))
.filter(Objects::nonNull)
.filter(distinctByKey(Field::getValue))
.filter(distinctByKey(f -> f.getValue()))
.collect(Collectors.toList());
}
@ -100,7 +97,7 @@ public class OafMapperUtils {
.stream()
.map(v -> field(v, info))
.filter(Objects::nonNull)
.filter(distinctByKey(Field::getValue))
.filter(distinctByKey(f -> f.getValue()))
.collect(Collectors.toList());
}
@ -345,10 +342,10 @@ public class OafMapperUtils {
if (instanceList != null) {
final Optional<AccessRight> min = instanceList
.stream()
.map(Instance::getAccessright)
.map(i -> i.getAccessright())
.min(new AccessRightComparator<>());
final Qualifier rights = min.map(OafMapperUtils::qualifier).orElseGet(Qualifier::new);
final Qualifier rights = min.isPresent() ? qualifier(min.get()) : new Qualifier();
if (StringUtils.isBlank(rights.getClassid())) {
rights.setClassid(UNKNOWN);

View File

@ -4,19 +4,19 @@ package eu.dnetlib.dhp.utils;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;
import java.util.stream.Collectors;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
@ -26,8 +26,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import net.minidev.json.JSONArray;
import scala.collection.JavaConverters;
import scala.collection.Seq;
@ -36,9 +34,6 @@ public class DHPUtils {
private static final Logger log = LoggerFactory.getLogger(DHPUtils.class);
private DHPUtils() {
}
public static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
@ -49,59 +44,40 @@ public class DHPUtils {
md.update(s.getBytes(StandardCharsets.UTF_8));
return new String(Hex.encodeHex(md.digest()));
} catch (final Exception e) {
log.error("Error creating id from {}", s);
System.err.println("Error creating id");
return null;
}
}
/**
* Retrieves from the metadata store manager application the list of paths associated with mdstores characterized
* by he given format, layout, interpretation
* @param mdstoreManagerUrl the URL of the mdstore manager service
* @param format the mdstore format
* @param layout the mdstore layout
* @param interpretation the mdstore interpretation
* @param includeEmpty include Empty mdstores
* @return the set of hdfs paths
* @throws IOException in case of HTTP communication issues
*/
public static Set<String> mdstorePaths(final String mdstoreManagerUrl,
final String format,
final String layout,
final String interpretation,
boolean includeEmpty) throws IOException {
final String url = mdstoreManagerUrl + "/mdstores/";
final ObjectMapper objectMapper = new ObjectMapper();
final HttpGet req = new HttpGet(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
final String json = IOUtils.toString(response.getEntity().getContent());
final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class);
return Arrays
.stream(mdstores)
.filter(md -> md.getFormat().equalsIgnoreCase(format))
.filter(md -> md.getLayout().equalsIgnoreCase(layout))
.filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation))
.filter(md -> StringUtils.isNotBlank(md.getHdfsPath()))
.filter(md -> StringUtils.isNotBlank(md.getCurrentVersion()))
.filter(md -> includeEmpty || md.getSize() > 0)
.map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store")
.collect(Collectors.toSet());
}
}
}
public static String generateIdentifier(final String originalId, final String nsPrefix) {
return String.format("%s::%s", nsPrefix, DHPUtils.md5(originalId));
}
public static String generateUnresolvedIdentifier(final String pid, final String pidType) {
public static String compressString(final String input) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
Base64OutputStream b64os = new Base64OutputStream(out)) {
GZIPOutputStream gzip = new GZIPOutputStream(b64os);
gzip.write(input.getBytes(StandardCharsets.UTF_8));
gzip.close();
return out.toString();
} catch (Throwable e) {
return null;
}
}
final String cleanedPid = CleaningFunctions.normalizePidValue(pidType, pid);
return String.format("unresolved::%s::%s", cleanedPid, pidType.toLowerCase().trim());
public static String decompressString(final String input) {
byte[] byteArray = Base64.decodeBase64(input.getBytes());
int len;
try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream((byteArray)));
ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length)) {
byte[] buffer = new byte[1024];
while ((len = gis.read(buffer)) != -1) {
bos.write(buffer, 0, len);
}
return bos.toString();
} catch (Exception e) {
return null;
}
}
public static String getJPathString(final String jsonPath, final String json) {

View File

@ -18,16 +18,13 @@ public class ISLookupClientFactory {
private static final int requestTimeout = 60000 * 10;
private static final int connectTimeout = 60000 * 10;
private ISLookupClientFactory() {
}
public static ISLookUpService getLookUpService(final String isLookupUrl) {
return getServiceStub(ISLookUpService.class, isLookupUrl);
}
@SuppressWarnings("unchecked")
private static <T> T getServiceStub(final Class<T> clazz, final String endpoint) {
log.info("creating {} stub from {}", clazz.getName(), endpoint);
log.info(String.format("creating %s stub from %s", clazz.getName(), endpoint));
final JaxWsProxyFactoryBean jaxWsProxyFactory = new JaxWsProxyFactoryBean();
jaxWsProxyFactory.setServiceClass(clazz);
jaxWsProxyFactory.setAddress(endpoint);
@ -41,10 +38,12 @@ public class ISLookupClientFactory {
log
.info(
"setting connectTimeout to {}, requestTimeout to {} for service {}",
connectTimeout,
requestTimeout,
clazz.getCanonicalName());
String
.format(
"setting connectTimeout to %s, requestTimeout to %s for service %s",
connectTimeout,
requestTimeout,
clazz.getCanonicalName()));
policy.setConnectionTimeout(connectTimeout);
policy.setReceiveTimeout(requestTimeout);

View File

@ -10,7 +10,7 @@ import net.sf.saxon.trans.XPathException;
public abstract class AbstractExtensionFunction extends ExtensionFunctionDefinition {
public static final String DEFAULT_SAXON_EXT_NS_URI = "http://www.d-net.research-infrastructures.eu/saxon-extension";
public static String DEFAULT_SAXON_EXT_NS_URI = "http://www.d-net.research-infrastructures.eu/saxon-extension";
public abstract String getName();

View File

@ -26,7 +26,7 @@ public class ExtractYear extends AbstractExtensionFunction {
@Override
public Sequence doCall(XPathContext context, Sequence[] arguments) throws XPathException {
if (arguments == null || arguments.length == 0) {
if (arguments == null | arguments.length == 0) {
return new StringValue("");
}
final Item item = arguments[0].head();
@ -63,7 +63,8 @@ public class ExtractYear extends AbstractExtensionFunction {
for (String format : dateFormats) {
try {
c.setTime(new SimpleDateFormat(format).parse(s));
return String.valueOf(c.get(Calendar.YEAR));
String year = String.valueOf(c.get(Calendar.YEAR));
return year;
} catch (ParseException e) {
}
}

View File

@ -30,7 +30,7 @@ public class NormalizeDate extends AbstractExtensionFunction {
@Override
public Sequence doCall(XPathContext context, Sequence[] arguments) throws XPathException {
if (arguments == null || arguments.length == 0) {
if (arguments == null | arguments.length == 0) {
return new StringValue(BLANK);
}
String s = arguments[0].head().getStringValue();

View File

@ -1,8 +1,6 @@
package eu.dnetlib.dhp.utils.saxon;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.commons.lang3.StringUtils;
import net.sf.saxon.expr.XPathContext;
@ -28,8 +26,7 @@ public class PickFirst extends AbstractExtensionFunction {
final String s1 = getValue(arguments[0]);
final String s2 = getValue(arguments[1]);
final String value = isNotBlank(s1) ? s1 : isNotBlank(s2) ? s2 : "";
return new StringValue(value);
return new StringValue(StringUtils.isNotBlank(s1) ? s1 : StringUtils.isNotBlank(s2) ? s2 : "");
}
private String getValue(final Sequence arg) throws XPathException {

View File

@ -12,9 +12,6 @@ import net.sf.saxon.TransformerFactoryImpl;
public class SaxonTransformerFactory {
private SaxonTransformerFactory() {
}
/**
* Creates the index record transformer from the given XSLT
*

View File

@ -7,10 +7,10 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
class ArgumentApplicationParserTest {
public class ArgumentApplicationParserTest {
@Test
void testParseParameter() throws Exception {
public void testParseParameter() throws Exception {
final String jsonConfiguration = IOUtils
.toString(
this.getClass().getResourceAsStream("/eu/dnetlib/application/parameters.json"));

View File

@ -21,13 +21,13 @@ public class HdfsSupportTest {
class Remove {
@Test
void shouldThrowARuntimeExceptionOnError() {
public void shouldThrowARuntimeExceptionOnError() {
// when
assertThrows(RuntimeException.class, () -> HdfsSupport.remove(null, new Configuration()));
}
@Test
void shouldRemoveADirFromHDFS(@TempDir Path tempDir) {
public void shouldRemoveADirFromHDFS(@TempDir Path tempDir) {
// when
HdfsSupport.remove(tempDir.toString(), new Configuration());
@ -36,7 +36,7 @@ public class HdfsSupportTest {
}
@Test
void shouldRemoveAFileFromHDFS(@TempDir Path tempDir) throws IOException {
public void shouldRemoveAFileFromHDFS(@TempDir Path tempDir) throws IOException {
// given
Path file = Files.createTempFile(tempDir, "p", "s");
@ -52,13 +52,13 @@ public class HdfsSupportTest {
class ListFiles {
@Test
void shouldThrowARuntimeExceptionOnError() {
public void shouldThrowARuntimeExceptionOnError() {
// when
assertThrows(RuntimeException.class, () -> HdfsSupport.listFiles(null, new Configuration()));
}
@Test
void shouldListFilesLocatedInPath(@TempDir Path tempDir) throws IOException {
public void shouldListFilesLocatedInPath(@TempDir Path tempDir) throws IOException {
Path subDir1 = Files.createTempDirectory(tempDir, "list_me");
Path subDir2 = Files.createTempDirectory(tempDir, "list_me");

View File

@ -5,10 +5,10 @@ import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;
class PacePersonTest {
public class PacePersonTest {
@Test
void pacePersonTest1() {
public void pacePersonTest1() {
PacePerson p = new PacePerson("Artini, Michele", false);
assertEquals("Artini", p.getSurnameString());
@ -17,7 +17,7 @@ class PacePersonTest {
}
@Test
void pacePersonTest2() {
public void pacePersonTest2() {
PacePerson p = new PacePerson("Michele G. Artini", false);
assertEquals("Artini, Michele G.", p.getNormalisedFullname());
assertEquals("Michele G", p.getNameString());

View File

@ -18,8 +18,7 @@ public class SparkSessionSupportTest {
class RunWithSparkSession {
@Test
@SuppressWarnings("unchecked")
void shouldExecuteFunctionAndNotStopSparkSessionWhenSparkSessionIsNotManaged()
public void shouldExecuteFunctionAndNotStopSparkSessionWhenSparkSessionIsNotManaged()
throws Exception {
// given
SparkSession spark = mock(SparkSession.class);
@ -38,8 +37,7 @@ public class SparkSessionSupportTest {
}
@Test
@SuppressWarnings("unchecked")
void shouldExecuteFunctionAndStopSparkSessionWhenSparkSessionIsManaged()
public void shouldExecuteFunctionAndStopSparkSessionWhenSparkSessionIsManaged()
throws Exception {
// given
SparkSession spark = mock(SparkSession.class);

View File

@ -12,7 +12,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@Disabled
class ZenodoAPIClientTest {
public class ZenodoAPIClientTest {
private final String URL_STRING = "https://sandbox.zenodo.org/api/deposit/depositions";
private final String ACCESS_TOKEN = "";
@ -22,7 +22,7 @@ class ZenodoAPIClientTest {
private final String depositionId = "674915";
@Test
void testUploadOldDeposition() throws IOException, MissingConceptDoiException {
public void testUploadOldDeposition() throws IOException, MissingConceptDoiException {
ZenodoAPIClient client = new ZenodoAPIClient(URL_STRING,
ACCESS_TOKEN);
Assertions.assertEquals(200, client.uploadOpenDeposition(depositionId));
@ -44,7 +44,7 @@ class ZenodoAPIClientTest {
}
@Test
void testNewDeposition() throws IOException {
public void testNewDeposition() throws IOException {
ZenodoAPIClient client = new ZenodoAPIClient(URL_STRING,
ACCESS_TOKEN);
@ -67,7 +67,7 @@ class ZenodoAPIClientTest {
}
@Test
void testNewVersionNewName() throws IOException, MissingConceptDoiException {
public void testNewVersionNewName() throws IOException, MissingConceptDoiException {
ZenodoAPIClient client = new ZenodoAPIClient(URL_STRING,
ACCESS_TOKEN);
@ -87,7 +87,7 @@ class ZenodoAPIClientTest {
}
@Test
void testNewVersionOldName() throws IOException, MissingConceptDoiException {
public void testNewVersionOldName() throws IOException, MissingConceptDoiException {
ZenodoAPIClient client = new ZenodoAPIClient(URL_STRING,
ACCESS_TOKEN);

View File

@ -21,7 +21,7 @@ import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
class AuthorMergerTest {
public class AuthorMergerTest {
private String publicationsBasePath;
@ -43,7 +43,7 @@ class AuthorMergerTest {
}
@Test
void mergeTest() { // used in the dedup: threshold set to 0.95
public void mergeTest() { // used in the dedup: threshold set to 0.95
for (List<Author> authors1 : authors) {
System.out.println("List " + (authors.indexOf(authors1) + 1));

View File

@ -4,8 +4,12 @@ package eu.dnetlib.dhp.schema.oaf.utils;
import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
@ -15,34 +19,15 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
import me.xuender.unidecode.Unidecode;
import eu.dnetlib.dhp.schema.oaf.*;
class OafMapperUtilsTest {
public class OafMapperUtilsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@Test
public void testUnidecode() {
assertEquals("Liu Ben Mu hiruzuSen tawa", Unidecode.decode("六本木ヒルズ森タワ"));
assertEquals("Nan Wu A Mi Tuo Fo", Unidecode.decode("南无阿弥陀佛"));
assertEquals("Yi Tiao Hui Zou Lu De Yu", Unidecode.decode("一条会走路的鱼"));
assertEquals("amidaniyorai", Unidecode.decode("あみだにょらい"));
assertEquals("T`owrk`iayi", Unidecode.decode("Թուրքիայի"));
assertEquals("Obzor tematiki", Unidecode.decode("Обзор тематики"));
assertEquals("GERMANSKIE IaZYKI", Unidecode.decode("ГЕРМАНСКИЕ ЯЗЫКИ"));
assertEquals("Diereunese tes ikanopoieses", Unidecode.decode("Διερεύνηση της ικανοποίησης"));
assertEquals("lqDy l'wly@", Unidecode.decode("القضايا الأولية"));
assertEquals("abc def ghi", Unidecode.decode("abc def ghi"));
}
@Test
void testDateValidation() {
public void testDateValidation() {
assertTrue(GraphCleaningFunctions.doCleanDate("2016-05-07T12:41:19.202Z ").isPresent());
assertTrue(GraphCleaningFunctions.doCleanDate("2020-09-10 11:08:52 ").isPresent());
@ -147,46 +132,44 @@ class OafMapperUtilsTest {
}
@Test
void testDate() {
final String date = GraphCleaningFunctions.cleanDate("23-FEB-1998");
assertNotNull(date);
System.out.println(date);
public void testDate() {
System.out.println(GraphCleaningFunctions.cleanDate("23-FEB-1998"));
}
@Test
void testMergePubs() throws IOException {
public void testMergePubs() throws IOException {
Publication p1 = read("publication_1.json", Publication.class);
Publication p2 = read("publication_2.json", Publication.class);
Dataset d1 = read("dataset_1.json", Dataset.class);
Dataset d2 = read("dataset_2.json", Dataset.class);
assertEquals(1, p1.getCollectedfrom().size());
assertEquals(ModelConstants.CROSSREF_ID, p1.getCollectedfrom().get(0).getKey());
assertEquals(1, d2.getCollectedfrom().size());
assertEquals(p1.getCollectedfrom().size(), 1);
assertEquals(p1.getCollectedfrom().get(0).getKey(), ModelConstants.CROSSREF_ID);
assertEquals(d2.getCollectedfrom().size(), 1);
assertFalse(cfId(d2.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
assertEquals(
ModelConstants.PUBLICATION_RESULTTYPE_CLASSID,
assertTrue(
OafMapperUtils
.mergeResults(p1, d2)
.getResulttype()
.getClassid());
.getClassid()
.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID));
assertEquals(1, p2.getCollectedfrom().size());
assertEquals(p2.getCollectedfrom().size(), 1);
assertFalse(cfId(p2.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
assertEquals(1, d1.getCollectedfrom().size());
assertEquals(d1.getCollectedfrom().size(), 1);
assertTrue(cfId(d1.getCollectedfrom()).contains(ModelConstants.CROSSREF_ID));
assertEquals(
ModelConstants.DATASET_RESULTTYPE_CLASSID,
assertTrue(
OafMapperUtils
.mergeResults(p2, d1)
.getResulttype()
.getClassid());
.getClassid()
.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID));
}
protected HashSet<String> cfId(List<KeyValue> collectedfrom) {
return collectedfrom.stream().map(KeyValue::getKey).collect(Collectors.toCollection(HashSet::new));
return collectedfrom.stream().map(c -> c.getKey()).collect(Collectors.toCollection(HashSet::new));
}
protected <T extends Result> T read(String filename, Class<T> clazz) throws IOException {

View File

@ -3,10 +3,10 @@ package eu.dnetlib.scholexplorer.relation;
import org.junit.jupiter.api.Test;
class RelationMapperTest {
public class RelationMapperTest {
@Test
void testLoadRels() throws Exception {
public void testLoadRels() throws Exception {
RelationMapper relationMapper = RelationMapper.load();
relationMapper.keySet().forEach(System.out::println);

View File

@ -3,37 +3,40 @@ package eu.dnetlib.dhp.actionmanager;
import java.io.Serializable;
import java.io.StringReader;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Triple;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.actionmanager.rmi.ActionManagerException;
import eu.dnetlib.actionmanager.set.ActionManagerSet;
import eu.dnetlib.actionmanager.set.ActionManagerSet.ImpactTypes;
import eu.dnetlib.dhp.actionmanager.partition.PartitionActionSetsByPayloadTypeJob;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
public class ISClient implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ISClient.class);
private static final Logger log = LoggerFactory.getLogger(PartitionActionSetsByPayloadTypeJob.class);
private static final String INPUT_ACTION_SET_ID_SEPARATOR = ",";
private final transient ISLookUpService isLookup;
private final ISLookUpService isLookup;
public ISClient(String isLookupUrl) {
isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
@ -60,7 +63,7 @@ public class ISClient implements Serializable {
.map(
sets -> sets
.stream()
.map(ISClient::parseSetInfo)
.map(set -> parseSetInfo(set))
.filter(t -> ids.contains(t.getLeft()))
.map(t -> buildDirectory(basePath, t))
.collect(Collectors.toList()))
@ -70,17 +73,15 @@ public class ISClient implements Serializable {
}
}
private static Triple<String, String, String> parseSetInfo(String set) {
private Triple<String, String, String> parseSetInfo(String set) {
try {
final SAXReader reader = new SAXReader();
reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
Document doc = reader.read(new StringReader(set));
Document doc = new SAXReader().read(new StringReader(set));
return Triple
.of(
doc.valueOf("//SET/@id"),
doc.valueOf("//SET/@directory"),
doc.valueOf("//SET/@latest"));
} catch (DocumentException | SAXException e) {
} catch (DocumentException e) {
throw new IllegalStateException(e);
}
}
@ -98,7 +99,7 @@ public class ISClient implements Serializable {
final String q = "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ActionManagerServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
+ propertyName
+ "']/@value/string()";
log.debug("quering for service property: {}", q);
log.debug("quering for service property: " + q);
try {
final List<String> value = isLookup.quickSearchProfile(q);
return Iterables.getOnlyElement(value);

View File

@ -62,7 +62,6 @@ public class MergeAndGet {
x.getClass().getCanonicalName(), y.getClass().getCanonicalName()));
}
@SuppressWarnings("unchecked")
private static <G extends Oaf, A extends Oaf> G selectNewerAndGet(G x, A y) {
if (x.getClass().equals(y.getClass())
&& x.getLastupdatetimestamp() > y.getLastupdatetimestamp()) {

View File

@ -74,9 +74,7 @@ public class PromoteActionPayloadForGraphTableJob {
.orElse(true);
logger.info("shouldGroupById: {}", shouldGroupById);
@SuppressWarnings("unchecked")
Class<? extends Oaf> rowClazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
@SuppressWarnings("unchecked")
Class<? extends Oaf> actionPayloadClazz = (Class<? extends Oaf>) Class.forName(actionPayloadClassName);
throwIfGraphTableClassIsNotSubClassOfActionPayloadClass(rowClazz, actionPayloadClazz);
@ -154,7 +152,7 @@ public class PromoteActionPayloadForGraphTableJob {
return spark
.read()
.parquet(path)
.map((MapFunction<Row, String>) PromoteActionPayloadForGraphTableJob::extractPayload, Encoders.STRING())
.map((MapFunction<Row, String>) value -> extractPayload(value), Encoders.STRING())
.map(
(MapFunction<String, A>) value -> decodePayload(actionPayloadClazz, value),
Encoders.bean(actionPayloadClazz));

View File

@ -107,7 +107,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=5000
--conf spark.sql.shuffle.partitions=2560
</spark-opts>
<arg>--inputGraphTablePath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
@ -159,7 +159,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=5000
--conf spark.sql.shuffle.partitions=2560
</spark-opts>
<arg>--inputGraphTablePath</arg><arg>${workingDir}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>

View File

@ -99,7 +99,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=5000
--conf spark.sql.shuffle.partitions=2560
</spark-opts>
<arg>--inputGraphTablePath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>

View File

@ -80,7 +80,7 @@ public class PartitionActionSetsByPayloadTypeJobTest {
private ISClient isClient;
@Test
void shouldPartitionActionSetsByPayloadType(@TempDir Path workingDir) throws Exception {
public void shouldPartitionActionSetsByPayloadType(@TempDir Path workingDir) throws Exception {
// given
Path inputActionSetsBaseDir = workingDir.resolve("input").resolve("action_sets");
Path outputDir = workingDir.resolve("output");

View File

@ -20,7 +20,7 @@ public class MergeAndGetTest {
class MergeFromAndGetStrategy {
@Test
void shouldThrowForOafAndOaf() {
public void shouldThrowForOafAndOaf() {
// given
Oaf a = mock(Oaf.class);
Oaf b = mock(Oaf.class);
@ -33,7 +33,7 @@ public class MergeAndGetTest {
}
@Test
void shouldThrowForOafAndRelation() {
public void shouldThrowForOafAndRelation() {
// given
Oaf a = mock(Oaf.class);
Relation b = mock(Relation.class);
@ -46,7 +46,7 @@ public class MergeAndGetTest {
}
@Test
void shouldThrowForOafAndOafEntity() {
public void shouldThrowForOafAndOafEntity() {
// given
Oaf a = mock(Oaf.class);
OafEntity b = mock(OafEntity.class);
@ -59,7 +59,7 @@ public class MergeAndGetTest {
}
@Test
void shouldThrowForRelationAndOaf() {
public void shouldThrowForRelationAndOaf() {
// given
Relation a = mock(Relation.class);
Oaf b = mock(Oaf.class);
@ -72,7 +72,7 @@ public class MergeAndGetTest {
}
@Test
void shouldThrowForRelationAndOafEntity() {
public void shouldThrowForRelationAndOafEntity() {
// given
Relation a = mock(Relation.class);
OafEntity b = mock(OafEntity.class);
@ -85,7 +85,7 @@ public class MergeAndGetTest {
}
@Test
void shouldBehaveProperlyForRelationAndRelation() {
public void shouldBehaveProperlyForRelationAndRelation() {
// given
Relation a = mock(Relation.class);
Relation b = mock(Relation.class);
@ -101,7 +101,7 @@ public class MergeAndGetTest {
}
@Test
void shouldThrowForOafEntityAndOaf() {
public void shouldThrowForOafEntityAndOaf() {
// given
OafEntity a = mock(OafEntity.class);
Oaf b = mock(Oaf.class);
@ -114,7 +114,7 @@ public class MergeAndGetTest {
}
@Test
void shouldThrowForOafEntityAndRelation() {
public void shouldThrowForOafEntityAndRelation() {
// given
OafEntity a = mock(OafEntity.class);
Relation b = mock(Relation.class);
@ -127,7 +127,7 @@ public class MergeAndGetTest {
}
@Test
void shouldThrowForOafEntityAndOafEntityButNotSubclasses() {
public void shouldThrowForOafEntityAndOafEntityButNotSubclasses() {
// given
class OafEntitySub1 extends OafEntity {
}
@ -145,7 +145,7 @@ public class MergeAndGetTest {
}
@Test
void shouldBehaveProperlyForOafEntityAndOafEntity() {
public void shouldBehaveProperlyForOafEntityAndOafEntity() {
// given
OafEntity a = mock(OafEntity.class);
OafEntity b = mock(OafEntity.class);
@ -165,7 +165,7 @@ public class MergeAndGetTest {
class SelectNewerAndGetStrategy {
@Test
void shouldThrowForOafEntityAndRelation() {
public void shouldThrowForOafEntityAndRelation() {
// given
OafEntity a = mock(OafEntity.class);
Relation b = mock(Relation.class);
@ -178,7 +178,7 @@ public class MergeAndGetTest {
}
@Test
void shouldThrowForRelationAndOafEntity() {
public void shouldThrowForRelationAndOafEntity() {
// given
Relation a = mock(Relation.class);
OafEntity b = mock(OafEntity.class);
@ -191,7 +191,7 @@ public class MergeAndGetTest {
}
@Test
void shouldThrowForOafEntityAndResult() {
public void shouldThrowForOafEntityAndResult() {
// given
OafEntity a = mock(OafEntity.class);
Result b = mock(Result.class);
@ -204,7 +204,7 @@ public class MergeAndGetTest {
}
@Test
void shouldThrowWhenSuperTypeIsNewerForResultAndOafEntity() {
public void shouldThrowWhenSuperTypeIsNewerForResultAndOafEntity() {
// given
// real types must be used because subclass-superclass resolution does not work for
// mocks
@ -221,7 +221,7 @@ public class MergeAndGetTest {
}
@Test
void shouldShouldReturnLeftForOafEntityAndOafEntity() {
public void shouldShouldReturnLeftForOafEntityAndOafEntity() {
// given
OafEntity a = mock(OafEntity.class);
when(a.getLastupdatetimestamp()).thenReturn(1L);
@ -238,7 +238,7 @@ public class MergeAndGetTest {
}
@Test
void shouldShouldReturnRightForOafEntityAndOafEntity() {
public void shouldShouldReturnRightForOafEntityAndOafEntity() {
// given
OafEntity a = mock(OafEntity.class);
when(a.getLastupdatetimestamp()).thenReturn(2L);

View File

@ -77,7 +77,7 @@ public class PromoteActionPayloadForGraphTableJobTest {
class Main {
@Test
void shouldThrowWhenGraphTableClassIsNotASubClassOfActionPayloadClass() {
public void shouldThrowWhenGraphTableClassIsNotASubClassOfActionPayloadClass() {
// given
Class<Relation> rowClazz = Relation.class;
Class<OafEntity> actionPayloadClazz = OafEntity.class;
@ -116,7 +116,7 @@ public class PromoteActionPayloadForGraphTableJobTest {
@ParameterizedTest(name = "strategy: {0}, graph table: {1}, action payload: {2}")
@MethodSource("eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest#promoteJobTestParams")
void shouldPromoteActionPayloadForGraphTable(
public void shouldPromoteActionPayloadForGraphTable(
MergeAndGet.Strategy strategy,
Class<? extends Oaf> rowClazz,
Class<? extends Oaf> actionPayloadClazz)

View File

@ -44,7 +44,7 @@ public class PromoteActionPayloadFunctionsTest {
class JoinTableWithActionPayloadAndMerge {
@Test
void shouldThrowWhenTableTypeIsNotSubtypeOfActionPayloadType() {
public void shouldThrowWhenTableTypeIsNotSubtypeOfActionPayloadType() {
// given
class OafImpl extends Oaf {
}
@ -58,7 +58,7 @@ public class PromoteActionPayloadFunctionsTest {
}
@Test
void shouldRunProperlyWhenActionPayloadTypeAndTableTypeAreTheSame() {
public void shouldRunProperlyWhenActionPayloadTypeAndTableTypeAreTheSame() {
// given
String id0 = "id0";
String id1 = "id1";
@ -138,7 +138,7 @@ public class PromoteActionPayloadFunctionsTest {
}
@Test
void shouldRunProperlyWhenActionPayloadTypeIsSuperTypeOfTableType() {
public void shouldRunProperlyWhenActionPayloadTypeIsSuperTypeOfTableType() {
// given
String id0 = "id0";
String id1 = "id1";
@ -218,7 +218,7 @@ public class PromoteActionPayloadFunctionsTest {
class GroupTableByIdAndMerge {
@Test
void shouldRunProperly() {
public void shouldRunProperly() {
// given
String id1 = "id1";
String id2 = "id2";

View File

@ -29,13 +29,6 @@
<goal>testCompile</goal>
</goals>
</execution>
<execution>
<id>scala-doc</id>
<phase>process-resources</phase> <!-- or wherever -->
<goals>
<goal>doc</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
@ -91,6 +84,14 @@
<artifactId>json</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml -->
<dependency>
<groupId>org.apache.poi</groupId>

View File

@ -4,7 +4,6 @@ package eu.dnetlib.dhp.actionmanager.bipfinder;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
@ -29,16 +28,15 @@ import eu.dnetlib.dhp.schema.oaf.Result;
public class CollectAndSave implements Serializable {
private static final Logger log = LoggerFactory.getLogger(CollectAndSave.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
Objects
.requireNonNull(
CollectAndSave.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json")));
CollectAndSave.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);

View File

@ -87,7 +87,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath,
String bipScorePath, Class<I> inputClazz) {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc
.textFile(bipScorePath)
@ -101,6 +101,8 @@ public class SparkAtomicActionScoreJob implements Serializable {
return bs;
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class));
System.out.println(bipScores.count());
Dataset<I> results = readPath(spark, inputPath, inputClazz);
results.createOrReplaceTempView("result");
@ -122,7 +124,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
ret.setId(value._2().getId());
return ret;
}, Encoders.bean(BipScore.class))
.groupByKey((MapFunction<BipScore, String>) BipScore::getId, Encoders.STRING())
.groupByKey((MapFunction<BipScore, String>) value -> value.getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, BipScore, Result>) (k, it) -> {
Result ret = new Result();
ret.setDataInfo(getDataInfo());

View File

@ -1,49 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import java.util.Optional;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class Constants {
public static final String DOI = "doi";
public static final String UPDATE_DATA_INFO_TYPE = "update";
public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos";
public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
public static final String FOS_CLASS_ID = "FOS";
public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";
public static final String NULL = "NULL";
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private Constants() {
}
public static Boolean isSparkSessionManaged(ArgumentApplicationParser parser) {
return Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
}
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
}

View File

@ -1,77 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.GetCSV;
public class GetFOSData implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetFOSData.class);
public static final char DEFAULT_DELIMITER = '\t';
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
GetFOSData.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_fos_parameters.json"))));
parser.parseArgument(args);
// the path where the original fos csv file is stored
final String sourcePath = parser.get("sourcePath");
log.info("sourcePath {}", sourcePath);
// the path where to put the file as json
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode {}", hdfsNameNode);
final String classForName = parser.get("classForName");
log.info("classForName {}", classForName);
final char delimiter = Optional
.ofNullable(parser.get("delimiter"))
.map(s -> s.charAt(0))
.orElse(DEFAULT_DELIMITER);
log.info("delimiter {}", delimiter);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
new GetFOSData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem);
}
public void doRewrite(String inputPath, String outputFile, String classForName, char delimiter, FileSystem fs)
throws IOException, ClassNotFoundException {
// reads the csv and writes it as its json equivalent
try (InputStreamReader reader = new InputStreamReader(fs.open(new Path(inputPath)))) {
GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter);
}
}
}

View File

@ -1,145 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.*;
import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.UPDATE_CLASS_NAME;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdfs.client.HdfsUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.BipDeserialize;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.BipScore;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Measure;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.DHPUtils;
public class PrepareBipFinder implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareBipFinder.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareBipFinder.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String sourcePath = parser.get("sourcePath");
log.info("sourcePath {}: ", sourcePath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
prepareResults(spark, sourcePath, outputPath);
});
}
private static <I extends Result> void prepareResults(SparkSession spark, String inputPath, String outputPath) {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc
.textFile(inputPath)
.map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class));
spark
.createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> {
BipScore bs = new BipScore();
bs.setId(key);
bs.setScoreList(entry.get(key));
return bs;
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class))
.map((MapFunction<BipScore, Result>) v -> {
Result r = new Result();
r.setId(DHPUtils.generateUnresolvedIdentifier(v.getId(), DOI));
r.setMeasures(getMeasure(v));
return r;
}, Encoders.bean(Result.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/bip");
}
private static List<Measure> getMeasure(BipScore value) {
return value
.getScoreList()
.stream()
.map(score -> {
Measure m = new Measure();
m.setId(score.getId());
m
.setUnit(
score
.getUnit()
.stream()
.map(unit -> {
KeyValue kv = new KeyValue();
kv.setValue(unit.getValue());
kv.setKey(unit.getKey());
kv
.setDataInfo(
OafMapperUtils
.dataInfo(
false,
UPDATE_DATA_INFO_TYPE,
true,
false,
OafMapperUtils
.qualifier(
UPDATE_MEASURE_BIP_CLASS_ID,
UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
""));
return kv;
})
.collect(Collectors.toList()));
return m;
})
.collect(Collectors.toList());
}
}

View File

@ -1,133 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.DHPUtils;
public class PrepareFOSSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareFOSSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String sourcePath = parser.get("sourcePath");
log.info("sourcePath: {}", sourcePath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
distributeFOSdois(
spark,
sourcePath,
outputPath);
});
}
private static void distributeFOSdois(SparkSession spark, String sourcePath, String outputPath) {
Dataset<FOSDataModel> fosDataset = readPath(spark, sourcePath, FOSDataModel.class);
fosDataset.flatMap((FlatMapFunction<FOSDataModel, FOSDataModel>) v -> {
List<FOSDataModel> fosList = new ArrayList<>();
final String level1 = v.getLevel1();
final String level2 = v.getLevel2();
final String level3 = v.getLevel3();
Arrays
.stream(v.getDoi().split("\u0002"))
.forEach(d -> fosList.add(FOSDataModel.newInstance(d, level1, level2, level3)));
return fosList.iterator();
}, Encoders.bean(FOSDataModel.class))
.map((MapFunction<FOSDataModel, Result>) value -> {
Result r = new Result();
r.setId(DHPUtils.generateUnresolvedIdentifier(value.getDoi(), DOI));
r.setSubject(getSubjects(value));
return r;
}, Encoders.bean(Result.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/fos");
}
private static List<StructuredProperty> getSubjects(FOSDataModel fos) {
return Arrays
.asList(getSubject(fos.getLevel1()), getSubject(fos.getLevel2()), getSubject(fos.getLevel3()))
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static StructuredProperty getSubject(String sbj) {
if (sbj.equals(NULL))
return null;
StructuredProperty sp = new StructuredProperty();
sp.setValue(sbj);
sp
.setQualifier(
OafMapperUtils
.qualifier(
FOS_CLASS_ID,
FOS_CLASS_NAME,
ModelConstants.DNET_SUBJECT_TYPOLOGIES,
ModelConstants.DNET_SUBJECT_TYPOLOGIES));
sp
.setDataInfo(
OafMapperUtils
.dataInfo(
false,
UPDATE_DATA_INFO_TYPE,
true,
false,
OafMapperUtils
.qualifier(
UPDATE_SUBJECT_FOS_CLASS_ID,
UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
""));
return sp;
}
}

View File

@ -1,79 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import static eu.dnetlib.dhp.actionmanager.createunresolvedentities.Constants.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Result;
public class SparkSaveUnresolved implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareFOSSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/produce_unresolved_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String sourcePath = parser.get("sourcePath");
log.info("sourcePath: {}", sourcePath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
saveUnresolved(
spark,
sourcePath,
outputPath);
});
}
private static void saveUnresolved(SparkSession spark, String sourcePath, String outputPath) {
spark
.read()
.textFile(sourcePath + "/*")
.map(
(MapFunction<String, Result>) l -> OBJECT_MAPPER.readValue(l, Result.class),
Encoders.bean(Result.class))
.groupByKey((MapFunction<Result, String>) r -> r.getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Result, Result>) (k, it) -> {
Result ret = it.next();
it.forEachRemaining(r -> ret.mergeFrom(r));
return ret;
}, Encoders.bean(Result.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
}

View File

@ -1,28 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* Class that maps the model of the bipFinder! input data.
* Only needed for deserialization purposes
*/
public class BipDeserialize extends HashMap<String, List<Score>> implements Serializable {
public BipDeserialize() {
super();
}
public List<Score> get(String key) {
if (super.get(key) == null) {
return new ArrayList<>();
}
return super.get(key);
}
}

View File

@ -1,30 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
import java.io.Serializable;
import java.util.List;
/**
* Rewriting of the bipFinder input data by extracting the identifier of the result (doi)
*/
public class BipScore implements Serializable {
private String id; // doi
private List<Score> scoreList; // unit as given in the inputfile
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public List<Score> getScoreList() {
return scoreList;
}
public void setScoreList(List<Score> scoreList) {
this.scoreList = scoreList;
}
}

View File

@ -1,71 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
import java.io.Serializable;
import com.opencsv.bean.CsvBindByPosition;
public class FOSDataModel implements Serializable {
@CsvBindByPosition(position = 1)
// @CsvBindByName(column = "doi")
private String doi;
@CsvBindByPosition(position = 2)
// @CsvBindByName(column = "level1")
private String level1;
@CsvBindByPosition(position = 3)
// @CsvBindByName(column = "level2")
private String level2;
@CsvBindByPosition(position = 4)
// @CsvBindByName(column = "level3")
private String level3;
public FOSDataModel() {
}
public FOSDataModel(String doi, String level1, String level2, String level3) {
this.doi = doi;
this.level1 = level1;
this.level2 = level2;
this.level3 = level3;
}
public static FOSDataModel newInstance(String d, String level1, String level2, String level3) {
return new FOSDataModel(d, level1, level2, level3);
}
public String getDoi() {
return doi;
}
public void setDoi(String doi) {
this.doi = doi;
}
public String getLevel1() {
return level1;
}
public void setLevel1(String level1) {
this.level1 = level1;
}
public String getLevel2() {
return level2;
}
public void setLevel2(String level2) {
this.level2 = level2;
}
public String getLevel3() {
return level3;
}
public void setLevel3(String level3) {
this.level3 = level3;
}
}

View File

@ -1,26 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
import java.io.Serializable;
public class KeyValue implements Serializable {
private String key;
private String value;
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@ -1,30 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
import java.io.Serializable;
import java.util.List;
/**
* represents the score in the input file
*/
public class Score implements Serializable {
private String id;
private List<KeyValue> unit;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public List<KeyValue> getUnit() {
return unit;
}
public void setUnit(List<KeyValue> unit) {
this.unit = unit;
}
}

View File

@ -0,0 +1,86 @@
package eu.dnetlib.dhp.actionmanager.datacite
import org.apache.commons.io.IOUtils
import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase, HttpUriRequest}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients
import java.io.IOException
abstract class AbstractRestClient extends Iterator[String]{
var buffer: List[String] = List()
var current_index:Int = 0
var scroll_value: Option[String] = None
var complete:Boolean = false
def extractInfo(input: String): Unit
protected def getBufferData(): Unit
def doHTTPGETRequest(url:String): String = {
val httpGet = new HttpGet(url)
doHTTPRequest(httpGet)
}
def doHTTPPOSTRequest(url:String, json:String): String = {
val httpPost = new HttpPost(url)
if (json != null) {
val entity = new StringEntity(json)
httpPost.setEntity(entity)
httpPost.setHeader("Accept", "application/json")
httpPost.setHeader("Content-type", "application/json")
}
doHTTPRequest(httpPost)
}
def hasNext: Boolean = {
buffer.nonEmpty && current_index < buffer.size
}
override def next(): String = {
val next_item:String = buffer(current_index)
current_index = current_index + 1
if (current_index == buffer.size)
getBufferData()
next_item
}
private def doHTTPRequest[A <: HttpUriRequest](r: A) :String ={
val client = HttpClients.createDefault
var tries = 4
try {
while (tries > 0) {
println(s"requesting ${r.getURI}")
val response = client.execute(r)
println(s"get response with status${response.getStatusLine.getStatusCode}")
if (response.getStatusLine.getStatusCode > 400) {
tries -= 1
}
else
return IOUtils.toString(response.getEntity.getContent)
}
""
} catch {
case e: Throwable =>
throw new RuntimeException("Error on executing request ", e)
} finally try client.close()
catch {
case e: IOException =>
throw new RuntimeException("Unable to close client ", e)
}
}
getBufferData()
}

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.datacite
package eu.dnetlib.dhp.actionmanager.datacite
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.json4s.{DefaultFormats, JValue}
import org.json4s.jackson.JsonMethods.{compact, parse, render}
class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10, until:Long = -1) extends AbstractRestClient {

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.datacite
package eu.dnetlib.dhp.actionmanager.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
@ -325,9 +325,8 @@ object DataciteToOAFTransformation {
val grantId = m.matcher(awardUri).replaceAll("$2")
val targetId = s"$p${DHPUtils.md5(grantId)}"
List(
generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo)
// REMOVED INVERSE RELATION since there is a specific method that should generate later
// generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo)
generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo),
generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo)
)
}
else
@ -368,7 +367,7 @@ object DataciteToOAFTransformation {
result.setDateofcollection(ISO8601FORMAT.format(d))
result.setDateoftransformation(ISO8601FORMAT.format(d))
result.setDateoftransformation(ISO8601FORMAT.format(ts))
result.setDataInfo(dataInfo)
val creators = (json \\ "creators").extractOrElse[List[CreatorType]](List())
@ -581,11 +580,11 @@ object DataciteToOAFTransformation {
rel.setProperties(List(dateProps).asJava)
rel.setSource(id)
rel.setTarget(DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier,r.relatedIdentifierType))
rel.setTarget(s"unresolved::${r.relatedIdentifier}::${r.relatedIdentifierType}")
rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
rel.getCollectedfrom.asScala.map(c => c.getValue).toList
rel.getCollectedfrom.asScala.map(c => c.getValue)(collection.breakOut)
rel
}).toList
})(collection breakOut)
}
def generateDataInfo(trust: String): DataInfo = {

View File

@ -0,0 +1,41 @@
package eu.dnetlib.dhp.actionmanager.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object ExportActionSetJobNode {
val log: Logger = LoggerFactory.getLogger(ExportActionSetJobNode.getClass)
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
val targetPath = parser.get("targetPath")
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(ExportActionSetJobNode.getClass.getSimpleName)
.master(master)
.getOrCreate()
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING)
spark.read.load(sourcePath).as[Oaf]
.map(o =>DataciteToOAFTransformation.toActionSet(o))
.filter(o => o!= null)
.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
}
}

View File

@ -0,0 +1,46 @@
package eu.dnetlib.dhp.actionmanager.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object FilterCrossrefEntitiesSpark {
val log: Logger = LoggerFactory.getLogger(getClass.getClass)
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/filter_crossref_param.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
log.info("sourcePath: {}", sourcePath)
val targetPath = parser.get("targetPath")
log.info("targetPath: {}", targetPath)
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(getClass.getSimpleName)
.master(master)
.getOrCreate()
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val resEncoder: Encoder[Result] = Encoders.kryo[Result]
val d:Dataset[Oaf]= spark.read.load(sourcePath).as[Oaf]
d.filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]).write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -1,14 +1,9 @@
package eu.dnetlib.dhp.datacite
package eu.dnetlib.dhp.actionmanager.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations
import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
import eu.dnetlib.dhp.common.Constants.MDSTORE_SIZE_PATH
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord}
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@ -22,10 +17,11 @@ object GenerateDataciteDatasetSpark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString)
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
val targetPath = parser.get("targetPath")
val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks"))
val isLookupUrl: String = parser.get("isLookupUrl")
log.info("isLookupUrl: {}", isLookupUrl)
@ -37,28 +33,16 @@ object GenerateDataciteDatasetSpark {
.master(master)
.getOrCreate()
import spark.implicits._
implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord]
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
val mapper = new ObjectMapper()
val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
val outputBasePath = cleanedMdStoreVersion.getHdfsPath
log.info("outputBasePath: {}", outputBasePath)
val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH"
import spark.implicits._
spark.read.load(sourcePath).as[DataciteType]
.filter(d => d.isActive)
.flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks))
.filter(d => d != null)
.flatMap(i => fixRelations(i)).filter(i => i != null)
.write.mode(SaveMode.Overwrite).save(targetPath)
val total_items = spark.read.load(targetPath).as[Oaf].count()
writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH)
}
}
}

View File

@ -1,5 +1,6 @@
package eu.dnetlib.dhp.datacite
package eu.dnetlib.dhp.actionmanager.datacite
import eu.dnetlib.dhp.actionmanager.datacite.DataciteToOAFTransformation.df_it
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
@ -8,14 +9,14 @@ import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.sql.functions.max
import org.slf4j.{Logger, LoggerFactory}
import java.time.format.DateTimeFormatter.ISO_DATE_TIME
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter._
import java.time.{LocalDate, LocalDateTime, ZoneOffset}
import scala.io.Source
object ImportDatacite {
@ -137,11 +138,11 @@ object ImportDatacite {
}
}
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs: Int): Long = {
var from: Long = timestamp * 1000
val delta: Long = 100000000L
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = {
var from:Long = timestamp * 1000
val delta:Long = 50000000L
var client: DataciteAPIImporter = null
val now: Long = System.currentTimeMillis()
val now :Long =System.currentTimeMillis()
var i = 0
try {
val writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfsTargetPath), SequenceFile.Writer.keyClass(classOf[IntWritable]), SequenceFile.Writer.valueClass(classOf[Text]))
@ -167,7 +168,7 @@ object ImportDatacite {
start = System.currentTimeMillis
}
}
println(s"updating from value: $from -> ${from + delta}")
println(s"updating from value: $from -> ${from+delta}")
from = from + delta
}
} catch {
@ -182,4 +183,4 @@ object ImportDatacite {
i
}
}
}

View File

@ -1,181 +0,0 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import scala.Tuple2;
public class CreateActionSetSparkJob implements Serializable {
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
private static final String ID_PREFIX = "50|doi_________::";
private static final String TRUST = "0.91";
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws IOException, ParseException {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
CreateActionSetSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json"))));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}", inputPath.toString());
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
final boolean shouldDuplicateRels = Optional
.ofNullable(parser.get("shouldDuplicateRels"))
.map(Boolean::valueOf)
.orElse(Boolean.FALSE);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
extractContent(spark, inputPath, outputPath, shouldDuplicateRels);
});
}
private static void extractContent(SparkSession spark, String inputPath, String outputPath,
boolean shouldDuplicateRels) {
spark
.sqlContext()
.createDataset(spark.sparkContext().textFile(inputPath + "/*", 6000), Encoders.STRING())
.flatMap(
(FlatMapFunction<String, Relation>) value -> createRelation(value, shouldDuplicateRels).iterator(),
Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) value -> value != null)
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
}
private static List<Relation> createRelation(String value, boolean duplicate) {
String[] line = value.split(",");
if (!line[1].startsWith("10.")) {
return new ArrayList<>();
}
List<Relation> relationList = new ArrayList<>();
String citing = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[1]));
final String cited = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", line[2]));
relationList
.addAll(
getRelations(
citing,
cited));
if (duplicate && line[1].endsWith(".refs")) {
citing = ID_PREFIX + IdentifierFactory
.md5(CleaningFunctions.normalizePidValue("doi", line[1].substring(0, line[1].indexOf(".refs"))));
relationList.addAll(getRelations(citing, cited));
}
return relationList;
}
private static Collection<Relation> getRelations(String citing, String cited) {
return Arrays
.asList(
getRelation(citing, cited, ModelConstants.CITES),
getRelation(cited, citing, ModelConstants.IS_CITED_BY));
}
public static Relation getRelation(
String source,
String target,
String relclass) {
Relation r = new Relation();
r.setCollectedfrom(getCollectedFrom());
r.setSource(source);
r.setTarget(target);
r.setRelClass(relclass);
r.setRelType(ModelConstants.RESULT_RESULT);
r.setSubRelType(ModelConstants.CITATION);
r
.setDataInfo(
getDataInfo());
return r;
}
public static List<KeyValue> getCollectedFrom() {
KeyValue kv = new KeyValue();
kv.setKey(ModelConstants.OPENOCITATIONS_ID);
kv.setValue(ModelConstants.OPENOCITATIONS_NAME);
return Arrays.asList(kv);
}
public static DataInfo getDataInfo() {
DataInfo di = new DataInfo();
di.setInferred(false);
di.setDeletedbyinference(false);
di.setTrust(TRUST);
di
.setProvenanceaction(
getQualifier(OPENCITATIONS_CLASSID, OPENCITATIONS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS));
return di;
}
public static Qualifier getQualifier(String class_id, String class_name,
String qualifierSchema) {
Qualifier pa = new Qualifier();
pa.setClassid(class_id);
pa.setClassname(class_name);
pa.setSchemeid(qualifierSchema);
pa.setSchemename(qualifierSchema);
return pa;
}
}

View File

@ -1,93 +0,0 @@
package eu.dnetlib.dhp.actionmanager.opencitations;
import java.io.*;
import java.io.Serializable;
import java.util.Objects;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class GetOpenCitationsRefs implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetOpenCitationsRefs.class);
public static void main(final String[] args) throws IOException, ParseException {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
GetOpenCitationsRefs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/opencitations/input_parameters.json"))));
parser.parseArgument(args);
final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", inputFile.toString());
final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath);
final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode {}", hdfsNameNode);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
GetOpenCitationsRefs ocr = new GetOpenCitationsRefs();
for (String file : inputFile) {
ocr.doExtract(workingPath + "/Original/" + file, workingPath, fileSystem);
}
}
private void doExtract(String inputFile, String workingPath, FileSystem fileSystem)
throws IOException {
final Path path = new Path(inputFile);
FSDataInputStream oc_zip = fileSystem.open(path);
int count = 1;
try (ZipInputStream zis = new ZipInputStream(oc_zip)) {
ZipEntry entry = null;
while ((entry = zis.getNextEntry()) != null) {
if (!entry.isDirectory()) {
String fileName = entry.getName();
fileName = fileName.substring(0, fileName.indexOf("T")) + "_" + count;
count++;
try (
FSDataOutputStream out = fileSystem
.create(new Path(workingPath + "/COCI/" + fileName + ".gz"));
GZIPOutputStream gzipOs = new GZIPOutputStream(new BufferedOutputStream(out))) {
IOUtils.copy(zis, gzipOs);
}
}
}
}
}
}

View File

@ -20,7 +20,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProgramme;
import eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import scala.Tuple2;
@ -171,23 +171,26 @@ public class PrepareProgramme {
}
private static CSVProgramme groupProgrammeByCode(CSVProgramme a, CSVProgramme b) {
if (!a.getLanguage().equals("en") && b.getLanguage().equalsIgnoreCase("en")) {
a.setTitle(b.getTitle());
a.setLanguage(b.getLanguage());
if (!a.getLanguage().equals("en")) {
if (b.getLanguage().equalsIgnoreCase("en")) {
a.setTitle(b.getTitle());
a.setLanguage(b.getLanguage());
}
}
if (StringUtils.isEmpty(a.getShortTitle()) && !StringUtils.isEmpty(b.getShortTitle())) {
a.setShortTitle(b.getShortTitle());
if (StringUtils.isEmpty(a.getShortTitle())) {
if (!StringUtils.isEmpty(b.getShortTitle())) {
a.setShortTitle(b.getShortTitle());
}
}
return a;
}
@SuppressWarnings("unchecked")
private static List<CSVProgramme> prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) {
Object[] codedescription = h2020Programmes
.map(
value -> new Tuple2<>(value.getCode(),
new Tuple2<>(value.getTitle(), value.getShortTitle())))
new Tuple2<String, String>(value.getTitle(), value.getShortTitle())))
.collect()
.toArray();
@ -213,7 +216,7 @@ public class PrepareProgramme {
String[] tmp = ent.split("\\.");
if (tmp.length <= 2) {
if (StringUtils.isEmpty(entry._2()._2())) {
map.put(entry._1(), new Tuple2<>(entry._2()._1(), entry._2()._1()));
map.put(entry._1(), new Tuple2<String, String>(entry._2()._1(), entry._2()._1()));
} else {
map.put(entry._1(), entry._2());
}

View File

@ -18,7 +18,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProject;
import eu.dnetlib.dhp.actionmanager.project.utils.CSVProject;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import scala.Tuple2;
@ -29,7 +29,7 @@ import scala.Tuple2;
*/
public class PrepareProjects {
private static final Logger log = LoggerFactory.getLogger(PrepareProjects.class);
private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {

View File

@ -31,16 +31,15 @@ import eu.dnetlib.dhp.common.DbClient;
*/
public class ReadProjectsFromDB implements Closeable {
private static final Log log = LogFactory.getLog(ReadProjectsFromDB.class);
private static final String query = "SELECT code " +
"from projects where id like 'corda__h2020%' ";
private final DbClient dbClient;
private static final Log log = LogFactory.getLog(ReadProjectsFromDB.class);
private final Configuration conf;
private final BufferedWriter writer;
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final static String query = "SELECT code " +
"from projects where id like 'corda__h2020%' ";
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
@ -66,9 +65,9 @@ public class ReadProjectsFromDB implements Closeable {
}
}
public void execute(final String sql, final Function<ResultSet, List<ProjectSubset>> producer) {
public void execute(final String sql, final Function<ResultSet, List<ProjectSubset>> producer) throws Exception {
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(this::writeProject);
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(r -> writeProject(r));
dbClient.processResults(sql, consumer);
}
@ -95,20 +94,20 @@ public class ReadProjectsFromDB implements Closeable {
public ReadProjectsFromDB(
final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword)
throws IOException {
throws Exception {
this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
this.conf = new Configuration();
this.conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(this.conf);
Path hdfsWritePath = new Path(hdfsPath);
FSDataOutputStream fsDataOutputStream = null;
if (fileSystem.exists(hdfsWritePath)) {
fileSystem.delete(hdfsWritePath, false);
}
FSDataOutputStream fos = fileSystem.create(hdfsWritePath);
fsDataOutputStream = fileSystem.create(hdfsWritePath);
this.writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8));
this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
}
@Override

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.project;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
@ -21,16 +22,15 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProgramme;
import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProject;
import eu.dnetlib.dhp.actionmanager.project.utils.model.EXCELTopic;
import eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme;
import eu.dnetlib.dhp.actionmanager.project.utils.CSVProject;
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.H2020Classification;
import eu.dnetlib.dhp.schema.oaf.H2020Programme;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
@ -47,10 +47,13 @@ import scala.Tuple2;
*
* To produce one single entry for each project code a step of groupoing is needed: each project can be associated to more
* than one programme.
*
*
*/
public class SparkAtomicActionJob {
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final HashMap<String, String> programmeMap = new HashMap<>();
public static void main(String[] args) throws Exception {
@ -134,6 +137,7 @@ public class SparkAtomicActionJob {
h2020classification.setClassification(csvProgramme.getClassification());
h2020classification.setH2020Programme(pm);
setLevelsandProgramme(h2020classification, csvProgramme.getClassification_short());
// setProgramme(h2020classification, ocsvProgramme.get().getClassification());
pp.setH2020classification(Arrays.asList(h2020classification));
return pp;
@ -148,16 +152,20 @@ public class SparkAtomicActionJob {
.map((MapFunction<Tuple2<Project, EXCELTopic>, Project>) p -> {
Optional<EXCELTopic> op = Optional.ofNullable(p._2());
Project rp = p._1();
op.ifPresent(excelTopic -> rp.setH2020topicdescription(excelTopic.getTitle()));
if (op.isPresent()) {
rp.setH2020topicdescription(op.get().getTitle());
}
return rp;
}, Encoders.bean(Project.class))
.filter(Objects::nonNull)
.groupByKey(
(MapFunction<Project, String>) OafEntity::getId,
(MapFunction<Project, String>) p -> p.getId(),
Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Project, Project>) (s, it) -> {
Project first = it.next();
it.forEachRemaining(first::mergeFrom);
it.forEachRemaining(p -> {
first.mergeFrom(p);
});
return first;
}, Encoders.bean(Project.class))
.toJavaRDD()
@ -181,6 +189,12 @@ public class SparkAtomicActionJob {
h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
}
// private static void setProgramme(H2020Classification h2020Classification, String classification) {
// String[] tmp = classification.split(" \\| ");
//
// h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
// }
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark

View File

@ -0,0 +1,40 @@
package eu.dnetlib.dhp.actionmanager.project.utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang.reflect.FieldUtils;
/**
* Reads a generic csv and maps it into classes that mirror its schema
*/
public class CSVParser {
public <R> List<R> parse(String csvFile, String classForName)
throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException {
final CSVFormat format = CSVFormat.EXCEL
.withHeader()
.withDelimiter(';')
.withQuote('"')
.withTrim();
List<R> ret = new ArrayList<>();
final org.apache.commons.csv.CSVParser parser = org.apache.commons.csv.CSVParser.parse(csvFile, format);
final Set<String> headers = parser.getHeaderMap().keySet();
Class<?> clazz = Class.forName(classForName);
for (CSVRecord csvRecord : parser.getRecords()) {
final Object cc = clazz.newInstance();
for (String header : headers) {
FieldUtils.writeField(cc, header, csvRecord.get(header), true);
}
ret.add((R) cc);
}
return ret;
}
}

View File

@ -1,32 +1,20 @@
package eu.dnetlib.dhp.actionmanager.project.utils.model;
package eu.dnetlib.dhp.actionmanager.project.utils;
import java.io.Serializable;
import com.opencsv.bean.CsvBindByName;
import com.opencsv.bean.CsvIgnore;
/**
* The model for the programme csv file
*/
public class CSVProgramme implements Serializable {
@CsvBindByName(column = "code")
private String rcn;
private String code;
@CsvBindByName(column = "title")
private String title;
@CsvBindByName(column = "shortTitle")
private String shortTitle;
@CsvBindByName(column = "language")
private String language;
@CsvIgnore
private String classification;
@CsvIgnore
private String classification_short;
public String getClassification_short() {
@ -45,6 +33,14 @@ public class CSVProgramme implements Serializable {
this.classification = classification;
}
public String getRcn() {
return rcn;
}
public void setRcn(String rcn) {
this.rcn = rcn;
}
public String getCode() {
return code;
}
@ -77,4 +73,5 @@ public class CSVProgramme implements Serializable {
this.language = language;
}
//
}

View File

@ -0,0 +1,200 @@
package eu.dnetlib.dhp.actionmanager.project.utils;
import java.io.Serializable;
/**
* the mmodel for the projects csv file
*/
public class CSVProject implements Serializable {
private String rcn;
private String id;
private String acronym;
private String status;
private String programme;
private String topics;
private String frameworkProgramme;
private String title;
private String startDate;
private String endDate;
private String projectUrl;
private String objective;
private String totalCost;
private String ecMaxContribution;
private String call;
private String fundingScheme;
private String coordinator;
private String coordinatorCountry;
private String participants;
private String participantCountries;
private String subjects;
public String getRcn() {
return rcn;
}
public void setRcn(String rcn) {
this.rcn = rcn;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAcronym() {
return acronym;
}
public void setAcronym(String acronym) {
this.acronym = acronym;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getProgramme() {
return programme;
}
public void setProgramme(String programme) {
this.programme = programme;
}
public String getTopics() {
return topics;
}
public void setTopics(String topics) {
this.topics = topics;
}
public String getFrameworkProgramme() {
return frameworkProgramme;
}
public void setFrameworkProgramme(String frameworkProgramme) {
this.frameworkProgramme = frameworkProgramme;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getStartDate() {
return startDate;
}
public void setStartDate(String startDate) {
this.startDate = startDate;
}
public String getEndDate() {
return endDate;
}
public void setEndDate(String endDate) {
this.endDate = endDate;
}
public String getProjectUrl() {
return projectUrl;
}
public void setProjectUrl(String projectUrl) {
this.projectUrl = projectUrl;
}
public String getObjective() {
return objective;
}
public void setObjective(String objective) {
this.objective = objective;
}
public String getTotalCost() {
return totalCost;
}
public void setTotalCost(String totalCost) {
this.totalCost = totalCost;
}
public String getEcMaxContribution() {
return ecMaxContribution;
}
public void setEcMaxContribution(String ecMaxContribution) {
this.ecMaxContribution = ecMaxContribution;
}
public String getCall() {
return call;
}
public void setCall(String call) {
this.call = call;
}
public String getFundingScheme() {
return fundingScheme;
}
public void setFundingScheme(String fundingScheme) {
this.fundingScheme = fundingScheme;
}
public String getCoordinator() {
return coordinator;
}
public void setCoordinator(String coordinator) {
this.coordinator = coordinator;
}
public String getCoordinatorCountry() {
return coordinatorCountry;
}
public void setCoordinatorCountry(String coordinatorCountry) {
this.coordinatorCountry = coordinatorCountry;
}
public String getParticipants() {
return participants;
}
public void setParticipants(String participants) {
this.participants = participants;
}
public String getParticipantCountries() {
return participantCountries;
}
public void setParticipantCountries(String participantCountries) {
this.participantCountries = participantCountries;
}
public String getSubjects() {
return subjects;
}
public void setSubjects(String subjects) {
this.subjects = subjects;
}
}

View File

@ -17,8 +17,6 @@ import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import eu.dnetlib.dhp.actionmanager.project.utils.model.EXCELTopic;
/**
* Reads a generic excel file and maps it into classes that mirror its schema
*/
@ -28,52 +26,52 @@ public class EXCELParser {
throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException,
InvalidFormatException {
try (OPCPackage pkg = OPCPackage.open(file); XSSFWorkbook wb = new XSSFWorkbook(pkg)) {
OPCPackage pkg = OPCPackage.open(file);
XSSFWorkbook wb = new XSSFWorkbook(pkg);
XSSFSheet sheet = wb.getSheet(sheetName);
XSSFSheet sheet = wb.getSheet(sheetName);
if (sheet == null) {
throw new IllegalArgumentException("Sheet name " + sheetName + " not present in current file");
}
if (sheetName == null) {
throw new RuntimeException("Sheet name " + sheetName + " not present in current file");
}
List<R> ret = new ArrayList<>();
List<R> ret = new ArrayList<>();
DataFormatter dataFormatter = new DataFormatter();
Iterator<Row> rowIterator = sheet.rowIterator();
List<String> headers = new ArrayList<>();
int count = 0;
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
DataFormatter dataFormatter = new DataFormatter();
Iterator<Row> rowIterator = sheet.rowIterator();
List<String> headers = new ArrayList<>();
int count = 0;
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
if (count == 0) {
Iterator<Cell> cellIterator = row.cellIterator();
if (count == 0) {
Iterator<Cell> cellIterator = row.cellIterator();
while (cellIterator.hasNext()) {
Cell cell = cellIterator.next();
headers.add(dataFormatter.formatCellValue(cell));
}
} else {
Class<?> clazz = Class.forName(classForName);
final Object cc = clazz.newInstance();
while (cellIterator.hasNext()) {
Cell cell = cellIterator.next();
headers.add(dataFormatter.formatCellValue(cell));
}
} else {
Class<?> clazz = Class.forName(classForName);
final Object cc = clazz.newInstance();
for (int i = 0; i < headers.size(); i++) {
Cell cell = row.getCell(i);
FieldUtils.writeField(cc, headers.get(i), dataFormatter.formatCellValue(cell), true);
}
EXCELTopic et = (EXCELTopic) cc;
if (StringUtils.isNotBlank(et.getRcn())) {
ret.add((R) cc);
}
for (int i = 0; i < headers.size(); i++) {
Cell cell = row.getCell(i);
FieldUtils.writeField(cc, headers.get(i), dataFormatter.formatCellValue(cell), true);
}
count += 1;
EXCELTopic et = (EXCELTopic) cc;
if (StringUtils.isNotBlank(et.getRcn())) {
ret.add((R) cc);
}
}
return ret;
count += 1;
}
return ret;
}
}

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionmanager.project.utils.model;
package eu.dnetlib.dhp.actionmanager.project.utils;
import java.io.Serializable;

View File

@ -1,21 +1,34 @@
package eu.dnetlib.dhp.actionmanager.project.utils;
import java.io.*;
import java.util.Optional;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.GetCSV;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
import eu.dnetlib.dhp.collection.HttpConnector2;
/**
* Applies the parsing of a csv file and writes the Serialization of it in hdfs
*/
public class ReadCSV {
public class ReadCSV implements Closeable {
private static final Log log = LogFactory.getLog(ReadCSV.class);
private final Configuration conf;
private final BufferedWriter writer;
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final String csvFile;
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -31,22 +44,56 @@ public class ReadCSV {
final String hdfsPath = parser.get("hdfsPath");
final String hdfsNameNode = parser.get("hdfsNameNode");
final String classForName = parser.get("classForName");
Optional<String> delimiter = Optional.ofNullable(parser.get("delimiter"));
char del = ';';
if (delimiter.isPresent())
del = delimiter.get().charAt(0);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
try (final ReadCSV readCSV = new ReadCSV(hdfsPath, hdfsNameNode, fileURL)) {
FileSystem fileSystem = FileSystem.get(conf);
BufferedReader reader = new BufferedReader(
new InputStreamReader(new HttpConnector2().getInputSourceAsStream(fileURL)));
log.info("Getting CSV file...");
readCSV.execute(classForName);
GetCSV.getCsv(fileSystem, reader, hdfsPath, classForName, del);
}
}
reader.close();
public void execute(final String classForName) throws Exception {
CSVParser csvParser = new CSVParser();
csvParser
.parse(csvFile, classForName)
.stream()
.forEach(p -> write(p));
}
@Override
public void close() throws IOException {
writer.close();
}
public ReadCSV(
final String hdfsPath,
final String hdfsNameNode,
final String fileURL)
throws Exception {
this.conf = new Configuration();
this.conf.set("fs.defaultFS", hdfsNameNode);
HttpConnector2 httpConnector = new HttpConnector2();
FileSystem fileSystem = FileSystem.get(this.conf);
Path hdfsWritePath = new Path(hdfsPath);
FSDataOutputStream fsDataOutputStream = null;
if (fileSystem.exists(hdfsWritePath)) {
fileSystem.delete(hdfsWritePath, false);
}
fsDataOutputStream = fileSystem.create(hdfsWritePath);
this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
this.csvFile = httpConnector.getInputSource(fileURL);
}
protected void write(final Object p) {
try {
writer.write(OBJECT_MAPPER.writeValueAsString(p));
writer.newLine();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -11,20 +11,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
import eu.dnetlib.dhp.collection.HttpConnector2;
/**
* Applies the parsing of an excel file and writes the Serialization of it in hdfs
*/
public class ReadExcel implements Closeable {
private static final Log log = LogFactory.getLog(ReadExcel.class);
private static final Log log = LogFactory.getLog(ReadCSV.class);
private final Configuration conf;
private final BufferedWriter writer;
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final InputStream excelFile;
@ -33,7 +31,7 @@ public class ReadExcel implements Closeable {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
ReadExcel.class
ReadCSV.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
@ -53,15 +51,13 @@ public class ReadExcel implements Closeable {
}
}
public void execute(final String classForName, final String sheetName)
throws IOException, ClassNotFoundException, InvalidFormatException, IllegalAccessException,
InstantiationException {
public void execute(final String classForName, final String sheetName) throws Exception {
EXCELParser excelParser = new EXCELParser();
excelParser
.parse(excelFile, classForName, sheetName)
.stream()
.forEach(this::write);
.forEach(p -> write(p));
}
@Override
@ -72,20 +68,20 @@ public class ReadExcel implements Closeable {
public ReadExcel(
final String hdfsPath,
final String hdfsNameNode,
final String fileURL) throws CollectorException, IOException {
final Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
final String fileURL)
throws Exception {
this.conf = new Configuration();
this.conf.set("fs.defaultFS", hdfsNameNode);
HttpConnector2 httpConnector = new HttpConnector2();
FileSystem fileSystem = FileSystem.get(conf);
FileSystem fileSystem = FileSystem.get(this.conf);
Path hdfsWritePath = new Path(hdfsPath);
FSDataOutputStream fsDataOutputStream = null;
if (fileSystem.exists(hdfsWritePath)) {
fileSystem.delete(hdfsWritePath, false);
}
FSDataOutputStream fos = fileSystem.create(hdfsWritePath);
fsDataOutputStream = fileSystem.create(hdfsWritePath);
this.writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8));
this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
this.excelFile = httpConnector.getInputSourceAsStream(fileURL);
}

View File

@ -1,46 +0,0 @@
package eu.dnetlib.dhp.actionmanager.project.utils.model;
import java.io.Serializable;
import com.opencsv.bean.CsvBindByName;
/**
* the mmodel for the projects csv file
*/
public class CSVProject implements Serializable {
@CsvBindByName(column = "id")
private String id;
@CsvBindByName(column = "programme")
private String programme;
@CsvBindByName(column = "topics")
private String topics;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getProgramme() {
return programme;
}
public void setProgramme(String programme) {
this.programme = programme;
}
public String getTopics() {
return topics;
}
public void setTopics(String topics) {
this.topics = topics;
}
}

View File

@ -9,7 +9,6 @@ import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
@ -75,7 +74,7 @@ public class GenerateRorActionSetJob {
final String jsonConfiguration = IOUtils
.toString(
GenerateRorActionSetJob.class
SparkAtomicActionJob.class
.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/ror/action_set_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
@ -109,7 +108,7 @@ public class GenerateRorActionSetJob {
private static void processRorOrganizations(final SparkSession spark,
final String inputPath,
final String outputPath) throws IOException {
final String outputPath) throws Exception {
readInputPath(spark, inputPath)
.map(
@ -204,7 +203,7 @@ public class GenerateRorActionSetJob {
private static Dataset<RorOrganization> readInputPath(
final SparkSession spark,
final String path) throws IOException {
final String path) throws Exception {
try (final FileSystem fileSystem = FileSystem.get(new Configuration());
final InputStream is = fileSystem.open(new Path(path))) {

View File

@ -7,8 +7,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class Address implements Serializable {
private static final long serialVersionUID = 2444635485253443195L;
@JsonProperty("lat")
private Float lat;
@ -39,6 +37,8 @@ public class Address implements Serializable {
@JsonProperty("line")
private String line;
private final static long serialVersionUID = 2444635485253443195L;
public Float getLat() {
return lat;
}

View File

@ -7,14 +7,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class Country implements Serializable {
private static final long serialVersionUID = 4357848706229493627L;
@JsonProperty("country_code")
private String countryCode;
@JsonProperty("country_name")
private String countryName;
private final static long serialVersionUID = 4357848706229493627L;
public String getCountryCode() {
return countryCode;
}

View File

@ -13,7 +13,7 @@ public class ExternalIdType implements Serializable {
private String preferred;
private static final long serialVersionUID = 2616688352998387611L;
private final static long serialVersionUID = 2616688352998387611L;
public ExternalIdType() {
}

View File

@ -15,7 +15,8 @@ import com.fasterxml.jackson.databind.JsonNode;
public class ExternalIdTypeDeserializer extends JsonDeserializer<ExternalIdType> {
@Override
public ExternalIdType deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException {
public ExternalIdType deserialize(final JsonParser p, final DeserializationContext ctxt)
throws IOException, JsonProcessingException {
final ObjectCodec oc = p.getCodec();
final JsonNode node = oc.readTree(p);

View File

@ -19,7 +19,7 @@ public class GeonamesAdmin implements Serializable {
@JsonProperty("code")
private String code;
private static final long serialVersionUID = 7294958526269195673L;
private final static long serialVersionUID = 7294958526269195673L;
public String getAsciiName() {
return asciiName;

View File

@ -31,7 +31,7 @@ public class GeonamesCity implements Serializable {
@JsonProperty("license")
private License license;
private static final long serialVersionUID = -8389480201526252955L;
private final static long serialVersionUID = -8389480201526252955L;
public NameAndCode getNutsLevel2() {
return nutsLevel2;

View File

@ -13,7 +13,7 @@ public class Label implements Serializable {
@JsonProperty("label")
private String label;
private static final long serialVersionUID = -6576156103297850809L;
private final static long serialVersionUID = -6576156103297850809L;
public String getIso639() {
return iso639;

View File

@ -13,7 +13,7 @@ public class License implements Serializable {
@JsonProperty("license")
private String license;
private static final long serialVersionUID = -194308261058176439L;
private final static long serialVersionUID = -194308261058176439L;
public String getAttribution() {
return attribution;

View File

@ -7,14 +7,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class NameAndCode implements Serializable {
private static final long serialVersionUID = 5459836979206140843L;
@JsonProperty("name")
private String name;
@JsonProperty("code")
private String code;
private final static long serialVersionUID = 5459836979206140843L;
public String getName() {
return name;
}

View File

@ -7,8 +7,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class Relationship implements Serializable {
private static final long serialVersionUID = 7847399503395576960L;
@JsonProperty("type")
private String type;
@ -18,6 +16,8 @@ public class Relationship implements Serializable {
@JsonProperty("label")
private String label;
private final static long serialVersionUID = 7847399503395576960L;
public String getType() {
return type;
}

View File

@ -11,8 +11,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class RorOrganization implements Serializable {
private static final long serialVersionUID = -2658312087616043225L;
@JsonProperty("ip_addresses")
private List<String> ipAddresses = new ArrayList<>();
@ -61,6 +59,8 @@ public class RorOrganization implements Serializable {
@JsonProperty("status")
private String status;
private final static long serialVersionUID = -2658312087616043225L;
public List<String> getIpAddresses() {
return ipAddresses;
}

View File

@ -1,69 +0,0 @@
package eu.dnetlib.dhp.actionmanager.scholix
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object SparkCreateActionset {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/generate_actionset.json")).mkString)
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
val workingDirFolder = parser.get("workingDirFolder")
log.info(s"workingDirFolder -> $workingDirFolder")
implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val resultEncoders: Encoder[Result] = Encoders.kryo[Result]
implicit val relationEncoders: Encoder[Relation] = Encoders.kryo[Relation]
import spark.implicits._
val relation = spark.read.load(s"$sourcePath/relation").as[Relation]
relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.flatMap(r => List(r.getSource, r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation")
val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]
log.info("extract source and target Identifier involved in relations")
log.info("save relation filtered")
relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf")
log.info("saving entities")
val entities: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))
entities
.joinWith(idRelation, entities("_1").equalTo(idRelation("value")))
.map(p => p._1._2)
.write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
}
}

View File

@ -1,86 +0,0 @@
package eu.dnetlib.dhp.actionmanager.scholix
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{Oaf, Dataset => OafDataset,Publication, Software, OtherResearchProduct, Relation}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object SparkSaveActionSet {
def toActionSet(item: Oaf): (String, String) = {
val mapper = new ObjectMapper()
item match {
case dataset: OafDataset =>
val a: AtomicAction[OafDataset] = new AtomicAction[OafDataset]
a.setClazz(classOf[OafDataset])
a.setPayload(dataset)
(dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
case publication: Publication =>
val a: AtomicAction[Publication] = new AtomicAction[Publication]
a.setClazz(classOf[Publication])
a.setPayload(publication)
(publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
case software: Software =>
val a: AtomicAction[Software] = new AtomicAction[Software]
a.setClazz(classOf[Software])
a.setPayload(software)
(software.getClass.getCanonicalName, mapper.writeValueAsString(a))
case orp: OtherResearchProduct =>
val a: AtomicAction[OtherResearchProduct] = new AtomicAction[OtherResearchProduct]
a.setClazz(classOf[OtherResearchProduct])
a.setPayload(orp)
(orp.getClass.getCanonicalName, mapper.writeValueAsString(a))
case relation: Relation =>
val a: AtomicAction[Relation] = new AtomicAction[Relation]
a.setClazz(classOf[Relation])
a.setPayload(relation)
(relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
case _ =>
null
}
}
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/save_actionset.json")).mkString)
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val tEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
spark.read.load(sourcePath).as[Oaf]
.map(o => toActionSet(o))
.filter(o => o != null)
.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec])
}
}

Some files were not shown because too many files have changed in this diff Show More