forked from antonis.lempesis/dnet-hadoop
cleaned up workflow for actionset migration, adjusted dnet|cnr* dependency versions
This commit is contained in:
parent
5afa7d3e0c
commit
91e7220f20
|
@ -107,7 +107,6 @@
|
|||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-schemas</artifactId>
|
||||
<classifier>stable_ids</classifier>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
|
|
@ -51,11 +51,6 @@
|
|||
<artifactId>hadoop-distcp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-openaire-data-protos</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-actionmanager-api</artifactId>
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.migration;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
|
||||
|
||||
public class LicenseComparator implements Comparator<Qualifier> {
|
||||
|
||||
@Override
|
||||
public int compare(Qualifier left, Qualifier right) {
|
||||
|
||||
if (left == null && right == null)
|
||||
return 0;
|
||||
if (left == null)
|
||||
return 1;
|
||||
if (right == null)
|
||||
return -1;
|
||||
|
||||
String lClass = left.getClassid();
|
||||
String rClass = right.getClassid();
|
||||
|
||||
if (lClass.equals(rClass))
|
||||
return 0;
|
||||
|
||||
if (lClass.equals("OPEN SOURCE"))
|
||||
return -1;
|
||||
if (rClass.equals("OPEN SOURCE"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("OPEN"))
|
||||
return -1;
|
||||
if (rClass.equals("OPEN"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("6MONTHS"))
|
||||
return -1;
|
||||
if (rClass.equals("6MONTHS"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("12MONTHS"))
|
||||
return -1;
|
||||
if (rClass.equals("12MONTHS"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("EMBARGO"))
|
||||
return -1;
|
||||
if (rClass.equals("EMBARGO"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("RESTRICTED"))
|
||||
return -1;
|
||||
if (rClass.equals("RESTRICTED"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("CLOSED"))
|
||||
return -1;
|
||||
if (rClass.equals("CLOSED"))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals("UNKNOWN"))
|
||||
return -1;
|
||||
if (rClass.equals("UNKNOWN"))
|
||||
return 1;
|
||||
|
||||
// Else (but unlikely), lexicographical ordering will do.
|
||||
return lClass.compareTo(rClass);
|
||||
}
|
||||
}
|
|
@ -1,196 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.migration;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.tools.DistCp;
|
||||
import org.apache.hadoop.tools.DistCpOptions;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
||||
public class MigrateActionSet {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MigrateActionSet.class);
|
||||
|
||||
private static final String SEPARATOR = "/";
|
||||
private static final String TARGET_PATHS = "target_paths";
|
||||
private static final String RAWSET_PREFIX = "rawset_";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
MigrateActionSet.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/migration/migrate_actionsets_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
new MigrateActionSet().run(parser);
|
||||
}
|
||||
|
||||
private void run(ArgumentApplicationParser parser) throws Exception {
|
||||
|
||||
final String isLookupUrl = parser.get("isLookupUrl");
|
||||
final String sourceNN = parser.get("sourceNameNode");
|
||||
final String targetNN = parser.get("targetNameNode");
|
||||
final String workDir = parser.get("workingDirectory");
|
||||
final Integer distcp_num_maps = Integer.parseInt(parser.get("distcp_num_maps"));
|
||||
|
||||
final String distcp_memory_mb = parser.get("distcp_memory_mb");
|
||||
final String distcp_task_timeout = parser.get("distcp_task_timeout");
|
||||
|
||||
final String transform_only_s = parser.get("transform_only");
|
||||
|
||||
log.info("transform only param: {}", transform_only_s);
|
||||
|
||||
final Boolean transformOnly = Boolean.valueOf(parser.get("transform_only"));
|
||||
|
||||
log.info("transform only: {}", transformOnly);
|
||||
|
||||
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
|
||||
Configuration conf = getConfiguration(distcp_task_timeout, distcp_memory_mb, distcp_num_maps);
|
||||
FileSystem targetFS = FileSystem.get(conf);
|
||||
|
||||
Configuration sourceConf = getConfiguration(distcp_task_timeout, distcp_memory_mb, distcp_num_maps);
|
||||
sourceConf.set(FileSystem.FS_DEFAULT_NAME_KEY, sourceNN);
|
||||
FileSystem sourceFS = FileSystem.get(sourceConf);
|
||||
|
||||
Properties props = new Properties();
|
||||
|
||||
List<Path> targetPaths = new ArrayList<>();
|
||||
|
||||
final List<Path> sourcePaths = getSourcePaths(sourceNN, isLookUp);
|
||||
log
|
||||
.info(
|
||||
"paths to process:\n{}", sourcePaths
|
||||
.stream()
|
||||
.map(p -> p.toString())
|
||||
.collect(Collectors.joining("\n")));
|
||||
|
||||
for (Path source : sourcePaths) {
|
||||
|
||||
if (!sourceFS.exists(source)) {
|
||||
log.warn("skipping unexisting path: {}", source);
|
||||
} else {
|
||||
|
||||
LinkedList<String> pathQ = Lists.newLinkedList(Splitter.on(SEPARATOR).split(source.toUri().getPath()));
|
||||
|
||||
final String rawSet = pathQ.pollLast();
|
||||
log.info("got RAWSET: {}", rawSet);
|
||||
|
||||
if (StringUtils.isNotBlank(rawSet) && rawSet.startsWith(RAWSET_PREFIX)) {
|
||||
|
||||
final String actionSetDirectory = pathQ.pollLast();
|
||||
|
||||
final Path targetPath = new Path(
|
||||
targetNN + workDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawSet);
|
||||
|
||||
log.info("using TARGET PATH: {}", targetPath);
|
||||
|
||||
if (!transformOnly) {
|
||||
if (targetFS.exists(targetPath)) {
|
||||
targetFS.delete(targetPath, true);
|
||||
}
|
||||
runDistcp(
|
||||
distcp_num_maps, distcp_memory_mb, distcp_task_timeout, conf, source, targetPath);
|
||||
}
|
||||
|
||||
targetPaths.add(targetPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final String targetPathsCsv = targetPaths.stream().map(p -> p.toString()).collect(Collectors.joining(","));
|
||||
props.setProperty(TARGET_PATHS, targetPathsCsv);
|
||||
File file = new File(System.getProperty("oozie.action.output.properties"));
|
||||
|
||||
try (OutputStream os = new FileOutputStream(file)) {
|
||||
props.store(os, "");
|
||||
}
|
||||
System.out.println(file.getAbsolutePath());
|
||||
}
|
||||
|
||||
private void runDistcp(
|
||||
Integer distcp_num_maps,
|
||||
String distcp_memory_mb,
|
||||
String distcp_task_timeout,
|
||||
Configuration conf,
|
||||
Path source,
|
||||
Path targetPath)
|
||||
throws Exception {
|
||||
|
||||
final DistCpOptions op = new DistCpOptions(source, targetPath);
|
||||
op.setMaxMaps(distcp_num_maps);
|
||||
op.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
|
||||
op.preserve(DistCpOptions.FileAttribute.REPLICATION);
|
||||
op.preserve(DistCpOptions.FileAttribute.CHECKSUMTYPE);
|
||||
|
||||
int res = ToolRunner
|
||||
.run(
|
||||
new DistCp(conf, op),
|
||||
new String[] {
|
||||
"-Dmapred.task.timeout=" + distcp_task_timeout,
|
||||
"-Dmapreduce.map.memory.mb=" + distcp_memory_mb,
|
||||
"-pb",
|
||||
"-m " + distcp_num_maps,
|
||||
source.toString(),
|
||||
targetPath.toString()
|
||||
});
|
||||
|
||||
if (res != 0) {
|
||||
throw new RuntimeException(String.format("distcp exited with code %s", res));
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration getConfiguration(
|
||||
String distcp_task_timeout, String distcp_memory_mb, Integer distcp_num_maps) {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.set("dfs.webhdfs.socket.connect-timeout", distcp_task_timeout);
|
||||
conf.set("dfs.webhdfs.socket.read-timeout", distcp_task_timeout);
|
||||
conf.set("dfs.http.client.retry.policy.enabled", "true");
|
||||
conf.set("mapred.task.timeout", distcp_task_timeout);
|
||||
conf.set("mapreduce.map.memory.mb", distcp_memory_mb);
|
||||
conf.set("mapred.map.tasks", String.valueOf(distcp_num_maps));
|
||||
return conf;
|
||||
}
|
||||
|
||||
private List<Path> getSourcePaths(String sourceNN, ISLookUpService isLookUp)
|
||||
throws ISLookUpException {
|
||||
String XQUERY = "distinct-values(\n"
|
||||
+ "let $basePath := collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()\n"
|
||||
+ "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') \n"
|
||||
+ "let $setDir := $x//SET/@directory/string()\n"
|
||||
+ "let $rawSet := $x//RAW_SETS/LATEST/@id/string()\n"
|
||||
+ "return concat($basePath, '/', $setDir, '/', $rawSet))";
|
||||
|
||||
log.info(String.format("running xquery:\n%s", XQUERY));
|
||||
return isLookUp
|
||||
.quickSearchProfile(XQUERY)
|
||||
.stream()
|
||||
.map(p -> sourceNN + p)
|
||||
.map(Path::new)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
|
@ -1,719 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.migration;
|
||||
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.googlecode.protobuf.format.JsonFormat;
|
||||
|
||||
import eu.dnetlib.data.proto.*;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
|
||||
public class ProtoConverter implements Serializable {
|
||||
|
||||
public static Oaf convert(OafProtos.Oaf oaf) {
|
||||
try {
|
||||
switch (oaf.getKind()) {
|
||||
case entity:
|
||||
return convertEntity(oaf);
|
||||
case relation:
|
||||
return convertRelation(oaf);
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid kind " + oaf.getKind());
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException("error on getting " + JsonFormat.printToString(oaf), e);
|
||||
}
|
||||
}
|
||||
|
||||
private static Relation convertRelation(OafProtos.Oaf oaf) {
|
||||
final OafProtos.OafRel r = oaf.getRel();
|
||||
final Relation rel = new Relation();
|
||||
rel.setDataInfo(mapDataInfo(oaf.getDataInfo()));
|
||||
rel.setLastupdatetimestamp(oaf.getLastupdatetimestamp());
|
||||
rel.setSource(r.getSource());
|
||||
rel.setTarget(r.getTarget());
|
||||
rel.setRelType(r.getRelType().toString());
|
||||
rel.setSubRelType(r.getSubRelType().toString());
|
||||
rel.setRelClass(r.getRelClass());
|
||||
rel
|
||||
.setCollectedfrom(
|
||||
r.getCollectedfromCount() > 0
|
||||
? r.getCollectedfromList().stream().map(kv -> mapKV(kv)).collect(Collectors.toList())
|
||||
: null);
|
||||
return rel;
|
||||
}
|
||||
|
||||
private static OafEntity convertEntity(OafProtos.Oaf oaf) {
|
||||
|
||||
switch (oaf.getEntity().getType()) {
|
||||
case result:
|
||||
final Result r = convertResult(oaf);
|
||||
r.setInstance(convertInstances(oaf));
|
||||
r.setExternalReference(convertExternalRefs(oaf));
|
||||
return r;
|
||||
case project:
|
||||
return convertProject(oaf);
|
||||
case datasource:
|
||||
return convertDataSource(oaf);
|
||||
case organization:
|
||||
return convertOrganization(oaf);
|
||||
default:
|
||||
throw new RuntimeException("received unknown type");
|
||||
}
|
||||
}
|
||||
|
||||
private static List<Instance> convertInstances(OafProtos.Oaf oaf) {
|
||||
|
||||
final ResultProtos.Result r = oaf.getEntity().getResult();
|
||||
if (r.getInstanceCount() > 0) {
|
||||
return r.getInstanceList().stream().map(i -> convertInstance(i)).collect(Collectors.toList());
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
private static Instance convertInstance(ResultProtos.Result.Instance ri) {
|
||||
final Instance i = new Instance();
|
||||
i.setAccessright(mapAccessRight(ri.getAccessright()));
|
||||
i.setCollectedfrom(mapKV(ri.getCollectedfrom()));
|
||||
i.setDateofacceptance(mapStringField(ri.getDateofacceptance()));
|
||||
i.setDistributionlocation(ri.getDistributionlocation());
|
||||
i.setHostedby(mapKV(ri.getHostedby()));
|
||||
i.setInstancetype(mapQualifier(ri.getInstancetype()));
|
||||
i.setLicense(mapStringField(ri.getLicense()));
|
||||
i
|
||||
.setUrl(
|
||||
ri.getUrlList() != null ? ri
|
||||
.getUrlList()
|
||||
.stream()
|
||||
.distinct()
|
||||
.collect(Collectors.toCollection(ArrayList::new)) : null);
|
||||
i.setRefereed(mapRefereed(ri.getRefereed()));
|
||||
i.setProcessingchargeamount(mapStringField(ri.getProcessingchargeamount()));
|
||||
i.setProcessingchargecurrency(mapStringField(ri.getProcessingchargecurrency()));
|
||||
return i;
|
||||
}
|
||||
|
||||
private static Qualifier mapRefereed(FieldTypeProtos.StringField refereed) {
|
||||
Qualifier q = new Qualifier();
|
||||
q.setClassid(refereed.getValue());
|
||||
q.setSchemename(refereed.getValue());
|
||||
q.setSchemeid(DNET_REVIEW_LEVELS);
|
||||
q.setSchemename(DNET_REVIEW_LEVELS);
|
||||
return q;
|
||||
}
|
||||
|
||||
private static List<ExternalReference> convertExternalRefs(OafProtos.Oaf oaf) {
|
||||
ResultProtos.Result r = oaf.getEntity().getResult();
|
||||
if (r.getExternalReferenceCount() > 0) {
|
||||
return r
|
||||
.getExternalReferenceList()
|
||||
.stream()
|
||||
.map(e -> convertExtRef(e))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
private static ExternalReference convertExtRef(ResultProtos.Result.ExternalReference e) {
|
||||
ExternalReference ex = new ExternalReference();
|
||||
ex.setUrl(e.getUrl());
|
||||
ex.setSitename(e.getSitename());
|
||||
ex.setRefidentifier(e.getRefidentifier());
|
||||
ex.setQuery(e.getQuery());
|
||||
ex.setQualifier(mapQualifier(e.getQualifier()));
|
||||
ex.setLabel(e.getLabel());
|
||||
ex.setDataInfo(ex.getDataInfo());
|
||||
return ex;
|
||||
}
|
||||
|
||||
private static Organization convertOrganization(OafProtos.Oaf oaf) {
|
||||
final OrganizationProtos.Organization.Metadata m = oaf.getEntity().getOrganization().getMetadata();
|
||||
final Organization org = setOaf(new Organization(), oaf);
|
||||
setEntity(org, oaf);
|
||||
org.setLegalshortname(mapStringField(m.getLegalshortname()));
|
||||
org.setLegalname(mapStringField(m.getLegalname()));
|
||||
org
|
||||
.setAlternativeNames(
|
||||
m
|
||||
.getAlternativeNamesList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
org.setWebsiteurl(mapStringField(m.getWebsiteurl()));
|
||||
org.setLogourl(mapStringField(m.getLogourl()));
|
||||
org.setEclegalbody(mapStringField(m.getEclegalbody()));
|
||||
org.setEclegalperson(mapStringField(m.getEclegalperson()));
|
||||
org.setEcnonprofit(mapStringField(m.getEcnonprofit()));
|
||||
org.setEcresearchorganization(mapStringField(m.getEcresearchorganization()));
|
||||
org.setEchighereducation(mapStringField(m.getEchighereducation()));
|
||||
org
|
||||
.setEcinternationalorganizationeurinterests(
|
||||
mapStringField(m.getEcinternationalorganizationeurinterests()));
|
||||
org.setEcinternationalorganization(mapStringField(m.getEcinternationalorganization()));
|
||||
org.setEcenterprise(mapStringField(m.getEcenterprise()));
|
||||
org.setEcsmevalidated(mapStringField(m.getEcsmevalidated()));
|
||||
org.setEcnutscode(mapStringField(m.getEcnutscode()));
|
||||
org.setCountry(mapQualifier(m.getCountry()));
|
||||
|
||||
return org;
|
||||
}
|
||||
|
||||
private static Datasource convertDataSource(OafProtos.Oaf oaf) {
|
||||
final DatasourceProtos.Datasource.Metadata m = oaf.getEntity().getDatasource().getMetadata();
|
||||
final Datasource datasource = setOaf(new Datasource(), oaf);
|
||||
setEntity(datasource, oaf);
|
||||
datasource
|
||||
.setAccessinfopackage(
|
||||
m
|
||||
.getAccessinfopackageList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
datasource.setCertificates(mapStringField(m.getCertificates()));
|
||||
datasource.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl()));
|
||||
datasource.setContactemail(mapStringField(m.getContactemail()));
|
||||
datasource.setDatabaseaccessrestriction(mapStringField(m.getDatabaseaccessrestriction()));
|
||||
datasource.setDatabaseaccesstype(mapStringField(m.getDatabaseaccesstype()));
|
||||
datasource.setDataprovider(mapBoolField(m.getDataprovider()));
|
||||
datasource.setDatasourcetype(mapQualifier(m.getDatasourcetype()));
|
||||
datasource.setDatauploadrestriction(mapStringField(m.getDatauploadrestriction()));
|
||||
datasource.setCitationguidelineurl(mapStringField(m.getCitationguidelineurl()));
|
||||
datasource.setDatauploadtype(mapStringField(m.getDatauploadtype()));
|
||||
datasource.setDateofvalidation(mapStringField(m.getDateofvalidation()));
|
||||
datasource.setDescription(mapStringField(m.getDescription()));
|
||||
datasource.setEnglishname(mapStringField(m.getEnglishname()));
|
||||
datasource.setLatitude(mapStringField(m.getLatitude()));
|
||||
datasource.setLongitude(mapStringField(m.getLongitude()));
|
||||
datasource.setLogourl(mapStringField(m.getLogourl()));
|
||||
datasource.setMissionstatementurl(mapStringField(m.getMissionstatementurl()));
|
||||
datasource.setNamespaceprefix(mapStringField(m.getNamespaceprefix()));
|
||||
datasource
|
||||
.setOdcontenttypes(
|
||||
m
|
||||
.getOdcontenttypesList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
datasource
|
||||
.setOdlanguages(
|
||||
m
|
||||
.getOdlanguagesList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
datasource.setOdnumberofitems(mapStringField(m.getOdnumberofitems()));
|
||||
datasource.setOdnumberofitemsdate(mapStringField(m.getOdnumberofitemsdate()));
|
||||
datasource.setOdpolicies(mapStringField(m.getOdpolicies()));
|
||||
datasource.setOfficialname(mapStringField(m.getOfficialname()));
|
||||
datasource.setOpenairecompatibility(mapQualifier(m.getOpenairecompatibility()));
|
||||
datasource.setPidsystems(mapStringField(m.getPidsystems()));
|
||||
datasource
|
||||
.setPolicies(
|
||||
m.getPoliciesList().stream().map(ProtoConverter::mapKV).collect(Collectors.toList()));
|
||||
datasource.setQualitymanagementkind(mapStringField(m.getQualitymanagementkind()));
|
||||
datasource.setReleaseenddate(mapStringField(m.getReleaseenddate()));
|
||||
datasource.setServiceprovider(mapBoolField(m.getServiceprovider()));
|
||||
datasource.setReleasestartdate(mapStringField(m.getReleasestartdate()));
|
||||
datasource
|
||||
.setSubjects(
|
||||
m
|
||||
.getSubjectsList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStructuredProperty)
|
||||
.collect(Collectors.toList()));
|
||||
datasource.setVersioning(mapBoolField(m.getVersioning()));
|
||||
datasource.setWebsiteurl(mapStringField(m.getWebsiteurl()));
|
||||
datasource.setJournal(mapJournal(m.getJournal()));
|
||||
|
||||
return datasource;
|
||||
}
|
||||
|
||||
private static Project convertProject(OafProtos.Oaf oaf) {
|
||||
final ProjectProtos.Project.Metadata m = oaf.getEntity().getProject().getMetadata();
|
||||
final Project project = setOaf(new Project(), oaf);
|
||||
setEntity(project, oaf);
|
||||
project.setAcronym(mapStringField(m.getAcronym()));
|
||||
project.setCallidentifier(mapStringField(m.getCallidentifier()));
|
||||
project.setCode(mapStringField(m.getCode()));
|
||||
project.setContactemail(mapStringField(m.getContactemail()));
|
||||
project.setContactfax(mapStringField(m.getContactfax()));
|
||||
project.setContactfullname(mapStringField(m.getContactfullname()));
|
||||
project.setContactphone(mapStringField(m.getContactphone()));
|
||||
project.setContracttype(mapQualifier(m.getContracttype()));
|
||||
project.setCurrency(mapStringField(m.getCurrency()));
|
||||
project.setDuration(mapStringField(m.getDuration()));
|
||||
project.setEcarticle29_3(mapStringField(m.getEcarticle293()));
|
||||
project.setEcsc39(mapStringField(m.getEcsc39()));
|
||||
project.setOamandatepublications(mapStringField(m.getOamandatepublications()));
|
||||
project.setStartdate(mapStringField(m.getStartdate()));
|
||||
project.setEnddate(mapStringField(m.getEnddate()));
|
||||
project.setFundedamount(m.getFundedamount());
|
||||
project.setTotalcost(m.getTotalcost());
|
||||
project.setKeywords(mapStringField(m.getKeywords()));
|
||||
project
|
||||
.setSubjects(
|
||||
m
|
||||
.getSubjectsList()
|
||||
.stream()
|
||||
.map(sp -> mapStructuredProperty(sp))
|
||||
.collect(Collectors.toList()));
|
||||
project.setTitle(mapStringField(m.getTitle()));
|
||||
project.setWebsiteurl(mapStringField(m.getWebsiteurl()));
|
||||
project
|
||||
.setFundingtree(
|
||||
m.getFundingtreeList().stream().map(f -> mapStringField(f)).collect(Collectors.toList()));
|
||||
project.setJsonextrainfo(mapStringField(m.getJsonextrainfo()));
|
||||
project.setSummary(mapStringField(m.getSummary()));
|
||||
project.setOptional1(mapStringField(m.getOptional1()));
|
||||
project.setOptional2(mapStringField(m.getOptional2()));
|
||||
return project;
|
||||
}
|
||||
|
||||
private static Result convertResult(OafProtos.Oaf oaf) {
|
||||
switch (oaf.getEntity().getResult().getMetadata().getResulttype().getClassid()) {
|
||||
case "dataset":
|
||||
return createDataset(oaf);
|
||||
case "publication":
|
||||
return createPublication(oaf);
|
||||
case "software":
|
||||
return createSoftware(oaf);
|
||||
case "other":
|
||||
return createORP(oaf);
|
||||
default:
|
||||
Result result = setOaf(new Result(), oaf);
|
||||
setEntity(result, oaf);
|
||||
return setResult(result, oaf);
|
||||
}
|
||||
}
|
||||
|
||||
private static Software createSoftware(OafProtos.Oaf oaf) {
|
||||
ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
|
||||
Software software = setOaf(new Software(), oaf);
|
||||
setEntity(software, oaf);
|
||||
setResult(software, oaf);
|
||||
|
||||
software
|
||||
.setDocumentationUrl(
|
||||
m
|
||||
.getDocumentationUrlList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
software
|
||||
.setLicense(
|
||||
m
|
||||
.getLicenseList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStructuredProperty)
|
||||
.collect(Collectors.toList()));
|
||||
software.setCodeRepositoryUrl(mapStringField(m.getCodeRepositoryUrl()));
|
||||
software.setProgrammingLanguage(mapQualifier(m.getProgrammingLanguage()));
|
||||
return software;
|
||||
}
|
||||
|
||||
private static OtherResearchProduct createORP(OafProtos.Oaf oaf) {
|
||||
ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
|
||||
OtherResearchProduct otherResearchProducts = setOaf(new OtherResearchProduct(), oaf);
|
||||
setEntity(otherResearchProducts, oaf);
|
||||
setResult(otherResearchProducts, oaf);
|
||||
otherResearchProducts
|
||||
.setContactperson(
|
||||
m
|
||||
.getContactpersonList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
otherResearchProducts
|
||||
.setContactgroup(
|
||||
m
|
||||
.getContactgroupList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
otherResearchProducts
|
||||
.setTool(
|
||||
m.getToolList().stream().map(ProtoConverter::mapStringField).collect(Collectors.toList()));
|
||||
|
||||
return otherResearchProducts;
|
||||
}
|
||||
|
||||
private static Publication createPublication(OafProtos.Oaf oaf) {
|
||||
|
||||
ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
|
||||
Publication publication = setOaf(new Publication(), oaf);
|
||||
setEntity(publication, oaf);
|
||||
setResult(publication, oaf);
|
||||
publication.setJournal(mapJournal(m.getJournal()));
|
||||
return publication;
|
||||
}
|
||||
|
||||
private static Dataset createDataset(OafProtos.Oaf oaf) {
|
||||
|
||||
ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
|
||||
Dataset dataset = setOaf(new Dataset(), oaf);
|
||||
setEntity(dataset, oaf);
|
||||
setResult(dataset, oaf);
|
||||
dataset.setStoragedate(mapStringField(m.getStoragedate()));
|
||||
dataset.setDevice(mapStringField(m.getDevice()));
|
||||
dataset.setSize(mapStringField(m.getSize()));
|
||||
dataset.setVersion(mapStringField(m.getVersion()));
|
||||
dataset.setLastmetadataupdate(mapStringField(m.getLastmetadataupdate()));
|
||||
dataset.setMetadataversionnumber(mapStringField(m.getMetadataversionnumber()));
|
||||
dataset
|
||||
.setGeolocation(
|
||||
m
|
||||
.getGeolocationList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapGeolocation)
|
||||
.collect(Collectors.toList()));
|
||||
return dataset;
|
||||
}
|
||||
|
||||
public static <T extends Oaf> T setOaf(T oaf, OafProtos.Oaf o) {
|
||||
oaf.setDataInfo(mapDataInfo(o.getDataInfo()));
|
||||
oaf.setLastupdatetimestamp(o.getLastupdatetimestamp());
|
||||
return oaf;
|
||||
}
|
||||
|
||||
public static <T extends OafEntity> T setEntity(T entity, OafProtos.Oaf oaf) {
|
||||
// setting Entity fields
|
||||
final OafProtos.OafEntity e = oaf.getEntity();
|
||||
entity.setId(e.getId());
|
||||
entity.setOriginalId(e.getOriginalIdList());
|
||||
entity
|
||||
.setCollectedfrom(
|
||||
e.getCollectedfromList().stream().map(ProtoConverter::mapKV).collect(Collectors.toList()));
|
||||
entity
|
||||
.setPid(
|
||||
e
|
||||
.getPidList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStructuredProperty)
|
||||
.collect(Collectors.toList()));
|
||||
entity.setDateofcollection(e.getDateofcollection());
|
||||
entity.setDateoftransformation(e.getDateoftransformation());
|
||||
entity
|
||||
.setExtraInfo(
|
||||
e
|
||||
.getExtraInfoList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapExtraInfo)
|
||||
.collect(Collectors.toList()));
|
||||
return entity;
|
||||
}
|
||||
|
||||
public static <T extends Result> T setResult(T entity, OafProtos.Oaf oaf) {
|
||||
// setting Entity fields
|
||||
final ResultProtos.Result.Metadata m = oaf.getEntity().getResult().getMetadata();
|
||||
entity
|
||||
.setAuthor(
|
||||
m.getAuthorList().stream().map(ProtoConverter::mapAuthor).collect(Collectors.toList()));
|
||||
entity.setResulttype(mapQualifier(m.getResulttype()));
|
||||
entity.setLanguage(mapQualifier(m.getLanguage()));
|
||||
entity
|
||||
.setCountry(
|
||||
m
|
||||
.getCountryList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapQualifierAsCountry)
|
||||
.collect(Collectors.toList()));
|
||||
entity
|
||||
.setSubject(
|
||||
m
|
||||
.getSubjectList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStructuredProperty)
|
||||
.collect(Collectors.toList()));
|
||||
entity
|
||||
.setTitle(
|
||||
m
|
||||
.getTitleList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStructuredProperty)
|
||||
.collect(Collectors.toList()));
|
||||
entity
|
||||
.setRelevantdate(
|
||||
m
|
||||
.getRelevantdateList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStructuredProperty)
|
||||
.collect(Collectors.toList()));
|
||||
entity
|
||||
.setDescription(
|
||||
m
|
||||
.getDescriptionList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
entity.setDateofacceptance(mapStringField(m.getDateofacceptance()));
|
||||
entity.setPublisher(mapStringField(m.getPublisher()));
|
||||
entity.setEmbargoenddate(mapStringField(m.getEmbargoenddate()));
|
||||
entity
|
||||
.setSource(
|
||||
m
|
||||
.getSourceList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
entity
|
||||
.setFulltext(
|
||||
m
|
||||
.getFulltextList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
entity
|
||||
.setFormat(
|
||||
m
|
||||
.getFormatList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
entity
|
||||
.setContributor(
|
||||
m
|
||||
.getContributorList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
entity.setResourcetype(mapQualifier(m.getResourcetype()));
|
||||
entity
|
||||
.setCoverage(
|
||||
m
|
||||
.getCoverageList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
entity
|
||||
.setContext(
|
||||
m.getContextList().stream().map(ProtoConverter::mapContext).collect(Collectors.toList()));
|
||||
|
||||
entity.setBestaccessright(getBestAccessRights(oaf.getEntity().getResult().getInstanceList()));
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
private static Qualifier getBestAccessRights(List<ResultProtos.Result.Instance> instanceList) {
|
||||
if (instanceList != null) {
|
||||
final Optional<FieldTypeProtos.Qualifier> min = instanceList
|
||||
.stream()
|
||||
.map(i -> i.getAccessright())
|
||||
.min(new LicenseComparator());
|
||||
|
||||
final Qualifier rights = min.isPresent() ? mapAccessRight(min.get()) : new Qualifier();
|
||||
|
||||
if (StringUtils.isBlank(rights.getClassid())) {
|
||||
rights.setClassid(UNKNOWN);
|
||||
}
|
||||
if (StringUtils.isBlank(rights.getClassname())
|
||||
|| UNKNOWN.equalsIgnoreCase(rights.getClassname())) {
|
||||
rights.setClassname(NOT_AVAILABLE);
|
||||
}
|
||||
if (StringUtils.isBlank(rights.getSchemeid())) {
|
||||
rights.setSchemeid(DNET_ACCESS_MODES);
|
||||
}
|
||||
if (StringUtils.isBlank(rights.getSchemename())) {
|
||||
rights.setSchemename(DNET_ACCESS_MODES);
|
||||
}
|
||||
|
||||
return rights;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static Context mapContext(ResultProtos.Result.Context context) {
|
||||
if (context == null || StringUtils.isBlank(context.getId())) {
|
||||
return null;
|
||||
}
|
||||
final Context entity = new Context();
|
||||
entity.setId(context.getId());
|
||||
entity
|
||||
.setDataInfo(
|
||||
context
|
||||
.getDataInfoList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapDataInfo)
|
||||
.collect(Collectors.toList()));
|
||||
return entity;
|
||||
}
|
||||
|
||||
public static KeyValue mapKV(FieldTypeProtos.KeyValue kv) {
|
||||
if (kv == null || StringUtils.isBlank(kv.getKey()) & StringUtils.isBlank(kv.getValue())) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final KeyValue keyValue = new KeyValue();
|
||||
keyValue.setKey(kv.getKey());
|
||||
keyValue.setValue(kv.getValue());
|
||||
keyValue.setDataInfo(mapDataInfo(kv.getDataInfo()));
|
||||
return keyValue;
|
||||
}
|
||||
|
||||
public static DataInfo mapDataInfo(FieldTypeProtos.DataInfo d) {
|
||||
final DataInfo dataInfo = new DataInfo();
|
||||
dataInfo.setDeletedbyinference(d.getDeletedbyinference());
|
||||
dataInfo.setInferenceprovenance(d.getInferenceprovenance());
|
||||
dataInfo.setInferred(d.getInferred());
|
||||
dataInfo.setInvisible(d.getInvisible());
|
||||
dataInfo.setProvenanceaction(mapQualifier(d.getProvenanceaction()));
|
||||
dataInfo.setTrust(d.getTrust());
|
||||
return dataInfo;
|
||||
}
|
||||
|
||||
public static Qualifier mapQualifier(FieldTypeProtos.Qualifier q) {
|
||||
final Qualifier qualifier = new Qualifier();
|
||||
qualifier.setClassid(q.getClassid());
|
||||
qualifier.setClassname(q.getClassname());
|
||||
qualifier.setSchemeid(q.getSchemeid());
|
||||
qualifier.setSchemename(q.getSchemename());
|
||||
return qualifier;
|
||||
}
|
||||
|
||||
public static AccessRight mapAccessRight(FieldTypeProtos.Qualifier q) {
|
||||
final AccessRight accessRight = new AccessRight();
|
||||
accessRight.setClassid(q.getClassid());
|
||||
accessRight.setClassname(q.getClassname());
|
||||
accessRight.setSchemeid(q.getSchemeid());
|
||||
accessRight.setSchemename(q.getSchemename());
|
||||
return accessRight;
|
||||
}
|
||||
|
||||
public static Country mapQualifierAsCountry(FieldTypeProtos.Qualifier q) {
|
||||
final Country c = new Country();
|
||||
c.setClassid(q.getClassid());
|
||||
c.setClassname(q.getClassname());
|
||||
c.setSchemeid(q.getSchemeid());
|
||||
c.setSchemename(q.getSchemename());
|
||||
c.setDataInfo(mapDataInfo(q.getDataInfo()));
|
||||
return c;
|
||||
}
|
||||
|
||||
public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) {
|
||||
if (sp == null | StringUtils.isBlank(sp.getValue())) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final StructuredProperty structuredProperty = new StructuredProperty();
|
||||
structuredProperty.setValue(sp.getValue());
|
||||
structuredProperty.setQualifier(mapQualifier(sp.getQualifier()));
|
||||
structuredProperty.setDataInfo(mapDataInfo(sp.getDataInfo()));
|
||||
return structuredProperty;
|
||||
}
|
||||
|
||||
public static ExtraInfo mapExtraInfo(FieldTypeProtos.ExtraInfo extraInfo) {
|
||||
final ExtraInfo entity = new ExtraInfo();
|
||||
entity.setName(extraInfo.getName());
|
||||
entity.setTypology(extraInfo.getTypology());
|
||||
entity.setProvenance(extraInfo.getProvenance());
|
||||
entity.setTrust(extraInfo.getTrust());
|
||||
entity.setValue(extraInfo.getValue());
|
||||
return entity;
|
||||
}
|
||||
|
||||
public static OAIProvenance mapOAIProvenance(FieldTypeProtos.OAIProvenance oaiProvenance) {
|
||||
final OAIProvenance entity = new OAIProvenance();
|
||||
entity.setOriginDescription(mapOriginalDescription(oaiProvenance.getOriginDescription()));
|
||||
return entity;
|
||||
}
|
||||
|
||||
public static OriginDescription mapOriginalDescription(
|
||||
FieldTypeProtos.OAIProvenance.OriginDescription originDescription) {
|
||||
final OriginDescription originDescriptionResult = new OriginDescription();
|
||||
originDescriptionResult.setHarvestDate(originDescription.getHarvestDate());
|
||||
originDescriptionResult.setAltered(originDescription.getAltered());
|
||||
originDescriptionResult.setBaseURL(originDescription.getBaseURL());
|
||||
originDescriptionResult.setIdentifier(originDescription.getIdentifier());
|
||||
originDescriptionResult.setDatestamp(originDescription.getDatestamp());
|
||||
originDescriptionResult.setMetadataNamespace(originDescription.getMetadataNamespace());
|
||||
return originDescriptionResult;
|
||||
}
|
||||
|
||||
public static Field<String> mapStringField(FieldTypeProtos.StringField s) {
|
||||
if (s == null || StringUtils.isBlank(s.getValue())) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Field<String> stringField = new Field<>();
|
||||
stringField.setValue(s.getValue());
|
||||
stringField.setDataInfo(mapDataInfo(s.getDataInfo()));
|
||||
return stringField;
|
||||
}
|
||||
|
||||
public static Field<Boolean> mapBoolField(FieldTypeProtos.BoolField b) {
|
||||
if (b == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Field<Boolean> booleanField = new Field<>();
|
||||
booleanField.setValue(b.getValue());
|
||||
booleanField.setDataInfo(mapDataInfo(b.getDataInfo()));
|
||||
return booleanField;
|
||||
}
|
||||
|
||||
public static Journal mapJournal(FieldTypeProtos.Journal j) {
|
||||
final Journal journal = new Journal();
|
||||
journal.setConferencedate(j.getConferencedate());
|
||||
journal.setConferenceplace(j.getConferenceplace());
|
||||
journal.setEdition(j.getEdition());
|
||||
journal.setEp(j.getEp());
|
||||
journal.setIss(j.getIss());
|
||||
journal.setIssnLinking(j.getIssnLinking());
|
||||
journal.setIssnOnline(j.getIssnOnline());
|
||||
journal.setIssnPrinted(j.getIssnPrinted());
|
||||
journal.setName(j.getName());
|
||||
journal.setSp(j.getSp());
|
||||
journal.setVol(j.getVol());
|
||||
journal.setDataInfo(mapDataInfo(j.getDataInfo()));
|
||||
return journal;
|
||||
}
|
||||
|
||||
public static Author mapAuthor(FieldTypeProtos.Author author) {
|
||||
final Author entity = new Author();
|
||||
entity.setFullname(author.getFullname());
|
||||
entity.setName(author.getName());
|
||||
entity.setSurname(author.getSurname());
|
||||
entity.setRank(author.getRank());
|
||||
entity
|
||||
.setPid(
|
||||
author
|
||||
.getPidList()
|
||||
.stream()
|
||||
.map(
|
||||
kv -> {
|
||||
final StructuredProperty sp = new StructuredProperty();
|
||||
sp.setValue(kv.getValue());
|
||||
final Qualifier q = new Qualifier();
|
||||
q.setClassid(kv.getKey());
|
||||
q.setClassname(kv.getKey());
|
||||
sp.setQualifier(q);
|
||||
return sp;
|
||||
})
|
||||
.collect(Collectors.toList()));
|
||||
entity
|
||||
.setAffiliation(
|
||||
author
|
||||
.getAffiliationList()
|
||||
.stream()
|
||||
.map(ProtoConverter::mapStringField)
|
||||
.collect(Collectors.toList()));
|
||||
return entity;
|
||||
}
|
||||
|
||||
public static GeoLocation mapGeolocation(ResultProtos.Result.GeoLocation geoLocation) {
|
||||
final GeoLocation entity = new GeoLocation();
|
||||
entity.setPoint(geoLocation.getPoint());
|
||||
entity.setBox(geoLocation.getBox());
|
||||
entity.setPlace(geoLocation.getPlace());
|
||||
return entity;
|
||||
}
|
||||
}
|
|
@ -1,172 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.migration;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
import eu.dnetlib.data.proto.OafProtos;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class TransformActions implements Serializable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(TransformActions.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static final String SEPARATOR = "/";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
MigrateActionSet.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String isLookupUrl = parser.get("isLookupUrl");
|
||||
log.info("isLookupUrl: {}", isLookupUrl);
|
||||
|
||||
final String inputPaths = parser.get("inputPaths");
|
||||
|
||||
if (StringUtils.isBlank(inputPaths)) {
|
||||
throw new RuntimeException("empty inputPaths");
|
||||
}
|
||||
log.info("inputPaths: {}", inputPaths);
|
||||
|
||||
final String targetBaseDir = getTargetBaseDir(isLookupUrl);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf, isSparkSessionManaged, spark -> transformActions(inputPaths, targetBaseDir, spark));
|
||||
}
|
||||
|
||||
private static void transformActions(String inputPaths, String targetBaseDir, SparkSession spark)
|
||||
throws IOException {
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
|
||||
|
||||
for (String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) {
|
||||
|
||||
LinkedList<String> pathQ = Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath));
|
||||
|
||||
final String rawset = pathQ.pollLast();
|
||||
final String actionSetDirectory = pathQ.pollLast();
|
||||
|
||||
final Path targetDirectory = new Path(targetBaseDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawset);
|
||||
|
||||
if (fs.exists(targetDirectory)) {
|
||||
log.info("found target directory '{}", targetDirectory);
|
||||
fs.delete(targetDirectory, true);
|
||||
log.info("deleted target directory '{}", targetDirectory);
|
||||
}
|
||||
|
||||
log.info("transforming actions from '{}' to '{}'", sourcePath, targetDirectory);
|
||||
|
||||
sc
|
||||
.sequenceFile(sourcePath, Text.class, Text.class)
|
||||
.map(a -> eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(a._2().toString()))
|
||||
.map(TransformActions::doTransform)
|
||||
.filter(Objects::nonNull)
|
||||
.mapToPair(
|
||||
a -> new Tuple2<>(a.getClazz().toString(), OBJECT_MAPPER.writeValueAsString(a)))
|
||||
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
|
||||
.saveAsNewAPIHadoopFile(
|
||||
targetDirectory.toString(),
|
||||
Text.class,
|
||||
Text.class,
|
||||
SequenceFileOutputFormat.class,
|
||||
sc.hadoopConfiguration());
|
||||
}
|
||||
}
|
||||
|
||||
private static AtomicAction doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa)
|
||||
throws InvalidProtocolBufferException {
|
||||
|
||||
// dedup similarity relations had empty target value, don't migrate them
|
||||
if (aa.getTargetValue().length == 0) {
|
||||
return null;
|
||||
}
|
||||
final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue());
|
||||
final Oaf oaf = ProtoConverter.convert(proto_oaf);
|
||||
switch (proto_oaf.getKind()) {
|
||||
case entity:
|
||||
switch (proto_oaf.getEntity().getType()) {
|
||||
case datasource:
|
||||
return new AtomicAction<>(Datasource.class, (Datasource) oaf);
|
||||
case organization:
|
||||
return new AtomicAction<>(Organization.class, (Organization) oaf);
|
||||
case project:
|
||||
return new AtomicAction<>(Project.class, (Project) oaf);
|
||||
case result:
|
||||
final String resulttypeid = proto_oaf
|
||||
.getEntity()
|
||||
.getResult()
|
||||
.getMetadata()
|
||||
.getResulttype()
|
||||
.getClassid();
|
||||
switch (resulttypeid) {
|
||||
case "publication":
|
||||
return new AtomicAction<>(Publication.class, (Publication) oaf);
|
||||
case "software":
|
||||
return new AtomicAction<>(Software.class, (Software) oaf);
|
||||
case "other":
|
||||
return new AtomicAction<>(OtherResearchProduct.class, (OtherResearchProduct) oaf);
|
||||
case "dataset":
|
||||
return new AtomicAction<>(Dataset.class, (Dataset) oaf);
|
||||
default:
|
||||
// can be an update, where the resulttype is not specified
|
||||
return new AtomicAction<>(Result.class, (Result) oaf);
|
||||
}
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"invalid entity type: " + proto_oaf.getEntity().getType());
|
||||
}
|
||||
case relation:
|
||||
return new AtomicAction<>(Relation.class, (Relation) oaf);
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind());
|
||||
}
|
||||
}
|
||||
|
||||
private static String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {
|
||||
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
String XQUERY = "collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()";
|
||||
return isLookUp.getResourceProfileByQuery(XQUERY);
|
||||
}
|
||||
}
|
|
@ -1,56 +0,0 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "is",
|
||||
"paramLongName": "isLookupUrl",
|
||||
"paramDescription": "URL of the isLookUp Service",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "sn",
|
||||
"paramLongName": "sourceNameNode",
|
||||
"paramDescription": "nameNode of the source cluster",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "tn",
|
||||
"paramLongName": "targetNameNode",
|
||||
"paramDescription": "namoNode of the target cluster",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingDirectory",
|
||||
"paramDescription": "working directory",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "nm",
|
||||
"paramLongName": "distcp_num_maps",
|
||||
"paramDescription": "maximum number of map tasks used in the distcp process",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "mm",
|
||||
"paramLongName": "distcp_memory_mb",
|
||||
"paramDescription": "memory for distcp action copying actionsets from remote cluster",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "tt",
|
||||
"paramLongName": "distcp_task_timeout",
|
||||
"paramDescription": "timeout for distcp copying actions from remote cluster",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "tr",
|
||||
"paramLongName": "transform_only",
|
||||
"paramDescription": "activate tranform-only mode. Only apply transformation step",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -1,20 +0,0 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "is",
|
||||
"paramLongName": "isLookupUrl",
|
||||
"paramDescription": "URL of the isLookUp Service",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "i",
|
||||
"paramLongName": "inputPaths",
|
||||
"paramDescription": "URL of the isLookUp Service",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -1,30 +0,0 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sourceNN</name>
|
||||
<value>webhdfs://namenode2.hadoop.dm.openaire.eu:50071</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<value>/user/spark/applicationHistory</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -1,138 +0,0 @@
|
|||
<workflow-app xmlns='uri:oozie:workflow:0.5' name='migrate_actions'>
|
||||
<parameters>
|
||||
<property>
|
||||
<name>sourceNN</name>
|
||||
<description>the source name node</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
<description>the isLookup service endpoint</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>workingDirectory</name>
|
||||
<description>working directory</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>distcp_memory_mb</name>
|
||||
<value>6144</value>
|
||||
<description>memory for distcp copying actionsets from remote cluster</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>distcp_task_timeout</name>
|
||||
<value>60000000</value>
|
||||
<description>timeout for distcp copying actions from remote cluster</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>distcp_num_maps</name>
|
||||
<value>1</value>
|
||||
<description>mmaximum number of map tasks used in the distcp process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>transform_only</name>
|
||||
<description>activate tranform-only mode. Only apply transformation step</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>mapreduce.job.queuename</name>
|
||||
<value>${queueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="migrate_actionsets"/>
|
||||
|
||||
<action name="migrate_actionsets">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.actionmanager.migration.MigrateActionSet</main-class>
|
||||
<java-opt>-Dmapred.task.timeout=${distcp_task_timeout}</java-opt>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--sourceNameNode</arg><arg>${sourceNN}</arg>
|
||||
<arg>--targetNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--workingDirectory</arg><arg>${workingDirectory}</arg>
|
||||
<arg>--distcp_num_maps</arg><arg>${distcp_num_maps}</arg>
|
||||
<arg>--distcp_memory_mb</arg><arg>${distcp_memory_mb}</arg>
|
||||
<arg>--distcp_task_timeout</arg><arg>${distcp_task_timeout}</arg>
|
||||
<arg>--transform_only</arg><arg>${transform_only}</arg>
|
||||
<capture-output/>
|
||||
</java>
|
||||
<ok to="transform_actions" />
|
||||
<error to="fail" />
|
||||
</action>
|
||||
|
||||
<action name="transform_actions">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>transform_actions</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.migration.TransformActions</class>
|
||||
<jar>dhp-actionmanager-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--inputPaths</arg><arg>${wf:actionData('migrate_actionsets')['target_paths']}</arg>
|
||||
</spark>
|
||||
<ok to="end"/>
|
||||
<error to="fail"/>
|
||||
</action>
|
||||
|
||||
<kill name="fail">
|
||||
<message>migrate_actions failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<end name="end" />
|
||||
|
||||
</workflow-app>
|
31
pom.xml
31
pom.xml
|
@ -136,8 +136,7 @@
|
|||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-schemas</artifactId>
|
||||
<version>2.2.5-SNAPSHOT</version>
|
||||
<classifier>stable_ids</classifier>
|
||||
<version>${dhp-schemas.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
|
@ -344,7 +343,7 @@
|
|||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-actionmanager-common</artifactId>
|
||||
<version>6.0.5</version>
|
||||
<version>${dnet-actionmanager-common.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
|
@ -355,29 +354,30 @@
|
|||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-actionmanager-api</artifactId>
|
||||
<version>[4.0.1,5.0.0)</version>
|
||||
<version>${dnet-actionmanager-api.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>cnr-misc-utils</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-openaire-data-protos</artifactId>
|
||||
<version>3.9.8-proto250</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-pace-core</artifactId>
|
||||
<version>4.0.5</version>
|
||||
<version>${dnet-pace-core.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>cnr-rmi-api</artifactId>
|
||||
<version>[2.0.0,3.0.0)</version>
|
||||
<version>${cnr-rmi-api.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dnet-openaire-broker-common</artifactId>
|
||||
<version>${dnet.openaire.broker.common}</version>
|
||||
<version>${dnet-openaire-broker-common.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
@ -725,7 +725,12 @@
|
|||
<mockito-core.version>3.3.3</mockito-core.version>
|
||||
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
||||
<vtd.version>[2.12,3.0)</vtd.version>
|
||||
<dnet.openaire.broker.common>3.1.6</dnet.openaire.broker.common>
|
||||
<dhp-schemas.version>[2.2.5]</dhp-schemas.version>
|
||||
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
||||
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
||||
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
||||
<dnet-pace-core.version>[4.0.5]</dnet-pace-core.version>
|
||||
<cnr-rmi-api.version>[2.6.1]</cnr-rmi-api.version>
|
||||
<solr.version>7.5.0</solr.version>
|
||||
<okhttp.version>4.7.2</okhttp.version>
|
||||
<common.compress.version>1.20</common.compress.version>
|
||||
|
|
Loading…
Reference in New Issue