forked from D-Net/dnet-hadoop
cleanup unused classes, adjustments in the oozie wf definition
commit 18aa323ee9 (parent b4e3389432)
RelationComparator.java (file deleted)
@@ -1,44 +0,0 @@
-
-package eu.dnetlib.dhp.oa.provision;
-
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Optional;
-
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Maps;
-
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
-public class RelationComparator implements Comparator<Relation> {
-
-	private static final Map<String, Integer> weights = Maps.newHashMap();
-
-	static {
-		weights.put(ModelConstants.OUTCOME, 0);
-		weights.put(ModelConstants.SUPPLEMENT, 1);
-		weights.put(ModelConstants.REVIEW, 2);
-		weights.put(ModelConstants.CITATION, 3);
-		weights.put(ModelConstants.AFFILIATION, 4);
-		weights.put(ModelConstants.RELATIONSHIP, 5);
-		weights.put(ModelConstants.PUBLICATION_DATASET, 6);
-		weights.put(ModelConstants.SIMILARITY, 7);
-
-		weights.put(ModelConstants.PROVISION, 8);
-		weights.put(ModelConstants.PARTICIPATION, 9);
-		weights.put(ModelConstants.DEDUP, 10);
-	}
-
-	private Integer getWeight(Relation o) {
-		return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE);
-	}
-
-	@Override
-	public int compare(Relation o1, Relation o2) {
-		return ComparisonChain
-			.start()
-			.compare(getWeight(o1), getWeight(o2))
-			.result();
-	}
-}
RelationList.java (file deleted)
@@ -1,25 +0,0 @@
-
-package eu.dnetlib.dhp.oa.provision;
-
-import java.io.Serializable;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
-public class RelationList implements Serializable {
-
-	private Queue<Relation> relations;
-
-	public RelationList() {
-		this.relations = new PriorityQueue<>(new RelationComparator());
-	}
-
-	public Queue<Relation> getRelations() {
-		return relations;
-	}
-
-	public void setRelations(Queue<Relation> relations) {
-		this.relations = relations;
-	}
-}
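The two deleted classes above worked as a pair: RelationList wrapped a PriorityQueue ordered by RelationComparator, so relations with a lower subRelType weight were served first. A minimal sketch of that behaviour, assuming the pre-commit classes are still on the classpath and that ModelConstants.OUTCOME and ModelConstants.DEDUP resolve to the strings "outcome" and "dedup":

package eu.dnetlib.dhp.oa.provision;

import eu.dnetlib.dhp.schema.oaf.Relation;

// Illustrative only: demonstrates the ordering imposed by the removed RelationComparator.
public class RelationOrderingSketch {

	public static void main(String[] args) {
		RelationList list = new RelationList(); // backed by PriorityQueue<Relation>(new RelationComparator())

		Relation dedup = new Relation();
		dedup.setSubRelType("dedup"); // weight 10 in the removed comparator (assumed constant value)

		Relation outcome = new Relation();
		outcome.setSubRelType("outcome"); // weight 0 (assumed constant value)

		list.getRelations().add(dedup);
		list.getRelations().add(outcome);

		// lower weight is polled first: prints "outcome", then "dedup"
		while (!list.getRelations().isEmpty()) {
			System.out.println(list.getRelations().poll().getSubRelType());
		}
	}
}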
SortableRelation.java (file deleted)
@@ -1,81 +0,0 @@
-
-package eu.dnetlib.dhp.oa.provision;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Optional;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Maps;
-
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
-public class SortableRelation extends Relation implements Comparable<SortableRelation>, Serializable {
-
-	private static final Map<String, Integer> weights = Maps.newHashMap();
-
-	static {
-		weights.put(ModelConstants.OUTCOME, 0);
-		weights.put(ModelConstants.SUPPLEMENT, 1);
-		weights.put(ModelConstants.REVIEW, 2);
-		weights.put(ModelConstants.CITATION, 3);
-		weights.put(ModelConstants.AFFILIATION, 4);
-		weights.put(ModelConstants.RELATIONSHIP, 5);
-		weights.put(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID, 6);
-		weights.put(ModelConstants.SIMILARITY, 7);
-
-		weights.put(ModelConstants.PROVISION, 8);
-		weights.put(ModelConstants.PARTICIPATION, 9);
-		weights.put(ModelConstants.DEDUP, 10);
-	}
-
-	private static final long serialVersionUID = 34753984579L;
-
-	private String groupingKey;
-
-	public static SortableRelation create(Relation r, String groupingKey) {
-		SortableRelation sr = new SortableRelation();
-		sr.setGroupingKey(groupingKey);
-		sr.setSource(r.getSource());
-		sr.setTarget(r.getTarget());
-		sr.setRelType(r.getRelType());
-		sr.setSubRelType(r.getSubRelType());
-		sr.setRelClass(r.getRelClass());
-		sr.setDataInfo(r.getDataInfo());
-		sr.setCollectedfrom(r.getCollectedfrom());
-		sr.setLastupdatetimestamp(r.getLastupdatetimestamp());
-		sr.setProperties(r.getProperties());
-		sr.setValidated(r.getValidated());
-		sr.setValidationDate(r.getValidationDate());
-
-		return sr;
-	}
-
-	@JsonIgnore
-	public Relation asRelation() {
-		return this;
-	}
-
-	@Override
-	public int compareTo(SortableRelation o) {
-		return ComparisonChain
-			.start()
-			.compare(getGroupingKey(), o.getGroupingKey())
-			.compare(getWeight(this), getWeight(o))
-			.result();
-	}
-
-	private Integer getWeight(SortableRelation o) {
-		return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE);
-	}
-
-	public String getGroupingKey() {
-		return groupingKey;
-	}
-
-	public void setGroupingKey(String groupingKey) {
-		this.groupingKey = groupingKey;
-	}
-}
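SortableRelation, also deleted here, compared first on the grouping key and only then on the subRelType weight. A short sketch of that ordering under the same assumptions (pre-commit classes available; grouping keys below are purely illustrative):

package eu.dnetlib.dhp.oa.provision;

import eu.dnetlib.dhp.schema.oaf.Relation;

// Illustrative only: demonstrates the compareTo contract of the removed SortableRelation.
public class SortableRelationSketch {

	public static void main(String[] args) {
		Relation outcome = new Relation();
		outcome.setSubRelType("outcome"); // weight 0 (assumed constant value)

		Relation dedup = new Relation();
		dedup.setSubRelType("dedup"); // weight 10 (assumed constant value)

		// same (illustrative) grouping key: the subRelType weight decides the order
		SortableRelation a = SortableRelation.create(outcome, "50|sameEntityId");
		SortableRelation b = SortableRelation.create(dedup, "50|sameEntityId");
		System.out.println(a.compareTo(b) < 0); // true: "outcome" sorts before "dedup"

		// different grouping keys: the key comparison dominates, regardless of weight
		SortableRelation c = SortableRelation.create(dedup, "40|anotherEntityId");
		System.out.println(c.compareTo(a) < 0); // true: "40|..." precedes "50|..." lexicographically
	}
}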
ProvisionModelSupport.java
@@ -1,8 +1,6 @@
 
 package eu.dnetlib.dhp.oa.provision.model;
 
-import static org.apache.commons.lang3.StringUtils.substringBefore;
-
 import java.io.StringReader;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -16,12 +14,9 @@ import org.jetbrains.annotations.Nullable;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyTerm;
-import eu.dnetlib.dhp.oa.provision.RelationList;
-import eu.dnetlib.dhp.oa.provision.SortableRelation;
 import eu.dnetlib.dhp.oa.provision.utils.ContextDef;
 import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
@@ -55,10 +50,7 @@ public class ProvisionModelSupport {
 			.newArrayList(
 				RelatedEntityWrapper.class,
 				JoinedEntity.class,
-				RelatedEntity.class,
-				SortableRelationKey.class,
-				SortableRelation.class,
-				RelationList.class));
+				RelatedEntity.class));
 		return modelClasses.toArray(new Class[] {});
 	}
 
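The class list trimmed above is the set of provision model classes handed to the Spark Kryo serializer; after this change it no longer includes SortableRelationKey, SortableRelation or RelationList. A hypothetical wiring sketch (the accessor name getModelClasses() is assumed, since the method signature is not visible in the diff):

import org.apache.spark.SparkConf;

import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;

// Illustrative only: shows how a model-class array is typically registered with Kryo.
public class KryoConfSketch {

	public static SparkConf buildConf() {
		SparkConf conf = new SparkConf();
		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
		// the returned array no longer contains the three entries removed from the list above
		conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
		return conf;
	}
}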
Oozie workflow definition
@@ -125,7 +125,7 @@
 			<case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case>
 			<case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case>
 			<case to="fork_join_all_entities">${wf:conf('resumeFrom') eq 'fork_join_all_entities'}</case>
-			<case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case>
+			<case to="create_payloads">${wf:conf('resumeFrom') eq 'create_payloads'}</case>
 			<case to="drop_solr_collection">${wf:conf('resumeFrom') eq 'drop_solr_collection'}</case>
 			<case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case>
 			<default to="prepare_relations"/>
@@ -587,19 +587,20 @@
 			<error to="Kill"/>
 		</action>
 
-	<join name="wait_join_phase2" to="convert_to_xml"/>
+	<join name="wait_join_phase2" to="create_payloads"/>
 
-	<action name="convert_to_xml">
+	<action name="create_payloads">
 		<spark xmlns="uri:oozie:spark-action:0.2">
 			<master>yarn</master>
 			<mode>cluster</mode>
-			<name>convert_to_xml</name>
+			<name>create_payloads</name>
 			<class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class>
 			<jar>dhp-graph-provision-${projectVersion}.jar</jar>
 			<spark-opts>
 				--executor-cores=${sparkExecutorCores}
 				--executor-memory=${sparkExecutorMemory}
 				--driver-memory=${sparkDriverMemory}
+				--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
 				--conf spark.extraListeners=${spark2ExtraListeners}
 				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -607,7 +608,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 				--conf spark.network.timeout=${sparkNetworkTimeout}
 			</spark-opts>
-			<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
+			<arg>--inputPath</arg><arg>/user/claudio.atzori/data/beta_provision/join_entities</arg>
 			<arg>--outputPath</arg><arg>${workingDir}/xml_json</arg>
 			<arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
 			<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
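Because both the decision case and the action were renamed, a run resumed from the old convert_to_xml point now has to target create_payloads. A hypothetical submission sketch using the Oozie Java client (server URL and application path are placeholder assumptions; only the resumeFrom property comes from the workflow definition above):

import java.util.Properties;

import org.apache.oozie.client.OozieClient;

// Illustrative only: resumes the provision workflow from the renamed node.
public class ResumeProvisionWorkflow {

	public static void main(String[] args) throws Exception {
		OozieClient client = new OozieClient("http://oozie-host:11000/oozie"); // assumed server URL
		Properties props = client.createConfiguration();
		props.setProperty(OozieClient.APP_PATH, "hdfs:///apps/provision/oozie_app"); // assumed app path
		// the decision node now recognises 'create_payloads' instead of 'convert_to_xml'
		props.setProperty("resumeFrom", "create_payloads");
		String jobId = client.run(props);
		System.out.println("Submitted workflow " + jobId);
	}
}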