jworker example, update readme and example workflow

This commit is contained in:
dcore94 2022-10-21 15:28:59 +02:00
parent 3bf3b9e1cb
commit 05b6898dc4
7 changed files with 319 additions and 0 deletions

View File

@ -69,3 +69,20 @@ pyrest output is as follows:
}
```
## Run java worker example
Run
```
./upload-workflow.sh test_myworker_workflow.json
```
Increase the number stored by the init task in the variable cycles in order cause an exception (values > 7).
To start the my_worker implementing my_task enter directory jworker and run:
```
mvn clean compile exec:java
```
Access the UI and from the workbench run a test_myworker workflow. You should be able to see log information during execution of the task updating at every second.

85
jworker/pom.xml Normal file
View File

@ -0,0 +1,85 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.nubisware</groupId>
<artifactId>jworker</artifactId>
<version>1.0.0</version>
<name>jworker</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.0</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

View File

@ -0,0 +1,156 @@
package com.nubisware;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.UUID;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
public class MyWorker {
protected static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
protected static final String taskdefUrl = "/api/metadata/taskdefs";
protected static final String taskPollUrl = "/api/tasks/poll";
protected static final String taskUpdateUrl = "/api/tasks";
final protected String url;
final protected String taskName;
final protected UUID workerId;
protected OkHttpClient client;
public MyWorker(String url, String taskName){
this.url = url;
this.taskName = taskName;
this.workerId = UUID.randomUUID();
this.client = new OkHttpClient();
this.uploadTaskDefinition();
}
protected void uploadTaskDefinition(){
JsonObject tdef = Json.createObjectBuilder()
.add("name", this.taskName)
.add("ownerEmail", "m.lettere@gmail.com")
.add("description", "Example task doing nothing actually")
.build();
JsonArray tdefs = Json.createArrayBuilder()
.add(tdef)
.build();
HttpUrl u = HttpUrl.parse(this.url + taskdefUrl);
RequestBody body = RequestBody.create(tdefs.toString(), JSON);
Request req = new Request.Builder()
.url(u)
.post(body)
.build();
try {
Response resp = this.client.newCall(req).execute();
System.out.println(resp.code());
System.out.println(resp.body().string());
} catch (Exception e) {
System.err.println(e);
}
}
protected JsonObject pollTask(){
JsonObject task = null;
HttpUrl u = HttpUrl.parse(this.url + taskPollUrl + "/" + this.taskName + "?workerid=" + this.workerId);
Request req = new Request.Builder().url(u).get().build();
System.out.println("Polling ... ");
while(task == null){
try {
//System.out.println("Polling ... ");
Response resp = this.client.newCall(req).execute();
//System.out.println(resp.code());
if(resp.code() == 200){
task = Json.createReader(new StringReader(resp.body().string())).readObject();
System.out.println("Task is " + task.getJsonObject("taskDefinition").getString("name"));
}else if(resp.code() != 204){
System.err.println("Error while polling ... " + resp.body().string());
}
Thread.sleep(1000);
} catch (Exception e) {
System.err.println(e);
}
}
return task;
}
protected void updateTask(JsonObject task, String status, ArrayList<String> logs){
JsonArrayBuilder logsBuilder = Json.createArrayBuilder();
for(String l: logs){
logsBuilder.add(l);
}
JsonObject tstat = Json.createObjectBuilder()
.add("status", status)
.add("logs", logsBuilder.build())
.add("description", "Example task doing nothing actually")
.add("taskId", task.getString("taskId"))
.add("workflowInstanceId", task.getString("workflowInstanceId"))
.build();
HttpUrl u = HttpUrl.parse(this.url + taskUpdateUrl);
RequestBody body = RequestBody.create(tstat.toString(), JSON);
Request req = new Request.Builder()
.url(u)
.post(body)
.build();
try {
Response resp = this.client.newCall(req).execute();
System.out.println(resp.code());
System.out.println(resp.body().string());
} catch (Exception e) {
System.err.println(e);
}
}
protected void handleTask(JsonObject task){
String name = task.getJsonObject("taskDefinition").getString("name");
String id = task.getString("taskId");
System.out.println("Handling task " + id + " " + name);
int cycles = task.getJsonObject("inputData").containsKey("cycles") ?
task.getJsonObject("inputData").getInt("cycles") : 1;
ArrayList<String> logs = new ArrayList<String>();
for(int i=0; i < cycles; i++){
try {
logs.add("Entering cycle " + i);
Thread.sleep(1000);
logs.add("After cycle " + i + " still ok");
this.updateTask(task, "IN_PROGRESS", logs);
logs.clear();
if(i % 7 == 6) throw new Exception("Force fail");
} catch (Exception e) {
e.printStackTrace();
logs.add("At cycle " + i + " I've been interrupted");
this.updateTask(task, "FAILED", logs);
}
}
this.updateTask(task, "COMPLETED", logs);
}
protected void run(){
while(true){
JsonObject t = this.pollTask();
this.handleTask(t);
}
}
public static void main( String[] args ){
System.out.println( "Starting MyWorker!" );
MyWorker w = new MyWorker("http://localhost:8080", "mytask");
w.run();
}
}

View File

@ -0,0 +1,20 @@
package com.nubisware;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
/**
* Unit test for simple App.
*/
public class AppTest
{
/**
* Rigorous Test :-)
*/
@Test
public void shouldAnswerWithTrue()
{
assertTrue( true );
}
}

Binary file not shown.

View File

@ -0,0 +1,41 @@
{
"ownerApp": null,
"createTime": 1666338703554,
"updateTime": 1666345110218,
"createdBy": null,
"updatedBy": null,
"name": "test_myworker",
"description": "Test myworker",
"version": 1,
"tasks": [
{
"name": "init",
"taskReferenceName": "init",
"description": null,
"inputParameters": {
"cycles": 5
},
"type": "SET_VARIABLE"
},
{
"name": "mytask",
"taskReferenceName": "test_mytask",
"description": "Task to test mytask",
"inputParameters": {
"cycles": "${workflow.variables.cycles}"
},
"type": "SIMPLE"
}
],
"inputParameters": [],
"outputParameters": {},
"failureWorkflow": null,
"schemaVersion": 2,
"restartable": true,
"workflowStatusListenerEnabled": false,
"ownerEmail": "example@email.com",
"timeoutPolicy": "ALERT_ONLY",
"timeoutSeconds": 0,
"variables": {},
"inputTemplate": {}
}