Loading plugins/org.etsi.mts.tdl.execution.java.runtime/src/org/etsi/mts/tdl/execution/java/adapters/http/HttpSystemAdapter.java +357 −101 Original line number Diff line number Diff line Loading @@ -3,27 +3,38 @@ package org.etsi.mts.tdl.execution.java.adapters.http; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashSet; import java.util.Hashtable; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Pattern; import org.etsi.mts.tdl.execution.java.rt.core.PojoData; import org.etsi.mts.tdl.execution.java.tri.Argument; import org.etsi.mts.tdl.execution.java.tri.ComponentInstanceRole; import org.etsi.mts.tdl.execution.java.tri.Connection; import org.etsi.mts.tdl.execution.java.tri.Data; import org.etsi.mts.tdl.execution.java.tri.ElementAnnotation; import org.etsi.mts.tdl.execution.java.tri.GateReference; import org.etsi.mts.tdl.execution.java.tri.Mapping; import org.etsi.mts.tdl.execution.java.tri.NamedElement; import org.etsi.mts.tdl.execution.java.tri.Procedure; Loading @@ -39,30 +50,52 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; public class HttpSystemAdapter implements SystemAdapter { protected Validator validator; protected Reporter reporter; private HttpClient.Builder builder; private static final String defaultSut = "<SUT>"; private static final String defaultGate = "<http>"; private HttpClient.Builder clientBuilder; private HttpClient client; private ObjectMapper mapper; private String baseUri = "https://example.com"; private List<HttpHeader> defaultHeaders = new ArrayList<HttpHeader>(); private Connection[] connections; private Set<HttpEndpoint> endpoints = new HashSet<HttpSystemAdapter.HttpEndpoint>(); private String serverBaseUri = "/"; private Map<HttpEndpoint, HttpServer> servers = new Hashtable<HttpSystemAdapter.HttpEndpoint, HttpServer>(); private Queue<HttpResponse<String>> unhandledResponses = new ConcurrentLinkedQueue<>(); private Queue<HttpInput> unhandledInputs = new ConcurrentLinkedQueue<>(); private Map<Connection, Queue<HttpExchange>> unhandledRequests = new Hashtable<Connection, Queue<HttpExchange>>(); public HttpSystemAdapter(Validator validator, Reporter reporter) { this.validator = validator; this.reporter = reporter; this.endpoints.add(new HttpEndpoint(defaultSut, defaultGate, "https://example.com", 0)); } public void setBaseUri(String baseUri) { this.baseUri = baseUri; getEndpoint(defaultSut, defaultGate).address = baseUri; } public void setEndpoint(String componentName, String gateName, String baseUri, int serverPort) { this.endpoints.add(new HttpEndpoint(componentName, gateName, baseUri, serverPort)); } private HttpEndpoint getEndpoint(String componentName, String gateName) { for (HttpEndpoint endpoint : this.endpoints) { if (endpoint.componentName.equals(componentName) && endpoint.gateName == gateName) return endpoint; } return null; } /** Loading @@ -79,14 +112,29 @@ public class HttpSystemAdapter implements SystemAdapter { @Override public void configure(Connection[] connections) { if (connections.length > 1) System.err.println("TODO: multiple connections not supported"); // TODO multiple connections this.connections = connections; for (Connection connection : connections) { for (GateReference gate : connection.getEndPoints()) { if (gate.getComponentRole() != ComponentInstanceRole.Tester) continue; HttpEndpoint endpoint = this.getEndpoint(gate.getComponent().getName(), gate.getGate().getName()); if (endpoint == null || this.servers.containsKey(endpoint)) continue; int serverPort = endpoint.port; try { HttpServer server = HttpServer.create(new InetSocketAddress(serverPort), 0); server.createContext(serverBaseUri, request -> this.handleRequest(request, endpoint, connection)); server.setExecutor(null); server.start(); this.servers.put(endpoint, server); } catch (IOException e) { throw new RuntimeException("Failed to initialize HTTP server with port " + serverPort, e); } } } builder = HttpClient.newBuilder(); client = builder.build(); clientBuilder = HttpClient.newBuilder(); client = clientBuilder.build(); mapper = new ObjectMapper(); mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); Loading @@ -105,13 +153,26 @@ public class HttpSystemAdapter implements SystemAdapter { } private void handleResponse(HttpResponse<String> response, Throwable t) { public void stop() { for (HttpServer server: this.servers.values()) server.stop(5); } private void handleRequest(HttpExchange request, HttpEndpoint source, Connection connection) throws IOException { synchronized (unhandledInputs) { unhandledInputs.add(new HttpInput(null, request, source, connection)); unhandledInputs.notifyAll(); } } private void handleResponse(HttpResponse<String> response, Throwable t, HttpEndpoint source, Connection connection) { if (t != null) handleError(t); else { synchronized (unhandledResponses) { unhandledResponses.add(response); unhandledResponses.notifyAll(); synchronized (unhandledInputs) { unhandledInputs.add(new HttpInput(response, null, source, connection)); unhandledInputs.notifyAll(); } } } Loading @@ -130,9 +191,46 @@ public class HttpSystemAdapter implements SystemAdapter { } } private void applyDefaults(Headers responseHeaders) { responseHeaders.add("Content-Type", "application/json"); responseHeaders.add("Accept", "application/json"); for (HttpHeader h : defaultHeaders) { responseHeaders.add(h.name, h.value); } } @Override public void send(Data message, Connection connection, NamedElement source) { HttpRequestData httpData = getRequestData(message); Object data = null; if (message.getType() instanceof Type) data = getMappedObject(message); else data = message.getValue(); if (!(data instanceof HttpRequestData) && !(data instanceof HttpResponseData)) throw new RuntimeException("Request/response data in unsupported format"); GateReference target = null; for (GateReference gate : connection.getEndPoints()) if (!gate.getComponent().equals(source)) { target = gate; break; } HttpEndpoint targetEndpoint = this.getEndpoint(target.getComponent().getName(), target.getGate().getName()); if (targetEndpoint == null) // Assume default targetEndpoint = this.getEndpoint(defaultSut, defaultGate); boolean isRequest = data instanceof HttpRequestData; try { if (isRequest) { String remoteUri = targetEndpoint.address; if (targetEndpoint.port != 0) remoteUri += ":" + targetEndpoint.port; HttpRequestData httpData = (HttpRequestData) data; HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(); applyDefaults(requestBuilder); Loading Loading @@ -161,7 +259,7 @@ public class HttpSystemAdapter implements SystemAdapter { } } } URI uri = URI.create(baseUri + httpData.uri); URI uri = URI.create(remoteUri + httpData.uri); try { if (query != null) { if (uri.getQuery() != null) Loading @@ -172,8 +270,6 @@ public class HttpSystemAdapter implements SystemAdapter { } catch (URISyntaxException e) { handleError(e); } try { requestBuilder.uri(uri); // TODO logging Loading @@ -188,7 +284,58 @@ public class HttpSystemAdapter implements SystemAdapter { CompletableFuture<HttpResponse<String>> responseFuture = client.sendAsync(requestBuilder.build(), HttpResponse.BodyHandlers.ofString(UTF_8)); responseFuture.whenCompleteAsync(this::handleResponse); HttpEndpoint endpoint = targetEndpoint; responseFuture .whenCompleteAsync((response, t) -> this.handleResponse(response, t, endpoint, connection)); } else { // Request pending HttpExchange unhandledExchange = null; synchronized (this.unhandledRequests) { if (this.unhandledRequests.containsKey(connection)) { Queue<HttpExchange> requests = this.unhandledRequests.get(connection); unhandledExchange = requests.poll(); } } if (unhandledExchange == null) throw new RuntimeException("No pending request to respond to"); HttpResponseData httpData = (HttpResponseData) data; Headers headers = unhandledExchange.getResponseHeaders(); this.applyDefaults(headers); for (HttpHeader header : httpData.headers) headers.add(header.name, header.value); // TODO cookie parameters? /* for (HttpRequestParameter p : httpData.parameters) { if (p.value == null) continue; switch (p.location) { case cookie: { System.err.println("TODO: cookie parameters not handled"); break; } } } */ byte[] body = encodeBody(httpData.body); int length = body.length; // TODO logging reporter.comment("Outgoing (headers): " + httpData.status + " | " + length); unhandledExchange.sendResponseHeaders(httpData.status, length); try (OutputStream os = unhandledExchange.getResponseBody()) { os.write(body); os.close(); } } } catch (IOException e) { handleError(e); Loading @@ -196,18 +343,35 @@ public class HttpSystemAdapter implements SystemAdapter { } @Override public Data receive(Data expected, Connection connection, NamedElement target) throws InterruptedException, AssertionError { HttpResponseData expectedHttpData = expected != null ? getResponseData(expected) : null; synchronized (unhandledResponses) { HttpResponse<String> response = unhandledResponses.peek(); while (response == null) { unhandledResponses.wait(); response = unhandledResponses.peek(); public Data receive(Data expected, Connection connection, NamedElement target) throws InterruptedException, AssertionError { synchronized (unhandledInputs) { HttpInput input = null; do { for (HttpInput i : unhandledInputs) if (i.connection.equals(connection)) if (!i.source.componentName.equals(target.getName())) { input = i; break; } if (input == null) unhandledInputs.wait(); } while (input == null); if (input.response != null) { HttpResponseData expectedHttpData = null; if (expected != null) if (expected.getValue() instanceof HttpResponseData) expectedHttpData = (HttpResponseData) expected.getValue(); HttpResponse<String> response = input.response; String body = response.body(); // TODO logging reporter.comment("Incoming: " + response.statusCode() + " | " + response.body()); reporter.comment("Incoming: " + response.statusCode() + " | " + body); try { Loading @@ -222,16 +386,74 @@ public class HttpSystemAdapter implements SystemAdapter { Data received = new PojoData<>(receivedHttpData); if (expected != null) { if (expectedHttpData.body != null) { receivedHttpData.body = decodeBody(response.body(), expectedHttpData.body.getClass()); receivedHttpData.body = decodeBody(body, expectedHttpData.body.getClass()); } if (this.validator.matches(expected, received)) { unhandledResponses.remove(); unhandledInputs.remove(input); return received; } } else { receivedHttpData.body = response.body(); unhandledResponses.remove(); receivedHttpData.body = body; unhandledInputs.remove(input); return received; } } catch (IOException e) { handleError(e); } } else { HttpRequestData expectedHttpData = null; if (expected != null) if (expected.getValue() instanceof HttpRequestData) expectedHttpData = (HttpRequestData) expected.getValue(); HttpExchange exchange = input.exchange; try (InputStream is = exchange.getRequestBody()) { // TODO other encodings Charset cs = StandardCharsets.UTF_8; String body = new String(is.readAllBytes(), cs); // TODO logging reporter.comment("Incoming: " + exchange.getRequestMethod() + " | " + body); HttpRequestData receivedHttpData = new HttpRequestData(); receivedHttpData.uri = exchange.getRequestURI().toString(); receivedHttpData.method = getHttpMethod(exchange.getRequestMethod()); Headers headers = exchange.getRequestHeaders(); receivedHttpData.headers = new ArrayList<>(); for (String header : headers.keySet()) { receivedHttpData.headers.add(new HttpHeader(header, headers.get(header))); } Data received = new PojoData<>(receivedHttpData); boolean isReceived = false; if (expected != null) { if (expectedHttpData.body != null) { receivedHttpData.body = decodeBody(body, expectedHttpData.body.getClass()); } if (this.validator.matches(expected, received)) { isReceived = true; } } else { receivedHttpData.body = body; isReceived = true; } if (isReceived) { unhandledInputs.remove(input); synchronized (unhandledRequests) { Queue<HttpExchange> requests = unhandledRequests.get(connection); if (requests == null) unhandledRequests.put(connection, requests = new LinkedBlockingQueue<HttpExchange>()); requests.add(exchange); unhandledRequests.notifyAll(); } return received; } Loading @@ -239,6 +461,7 @@ public class HttpSystemAdapter implements SystemAdapter { handleError(e); } } } return null; } Loading @@ -261,12 +484,14 @@ public class HttpSystemAdapter implements SystemAdapter { } @Override public Data[] receiveCall(Procedure operation, Data[] expectedArguments, Connection connection, NamedElement target) { public Data[] receiveCall(Procedure operation, Data[] expectedArguments, Connection connection, NamedElement target) { throw new UnsupportedOperationException("Procedure-based communication is not supported by this adapter."); } @Override public void replyCall(Procedure operation, Data returnValue, Data exception, Connection connection, NamedElement source) { public void replyCall(Procedure operation, Data returnValue, Data exception, Connection connection, NamedElement source) { throw new UnsupportedOperationException("Procedure-based communication is not supported by this adapter."); } Loading Loading @@ -340,10 +565,41 @@ public class HttpSystemAdapter implements SystemAdapter { return obj; } protected HttpResponseData getResponseData(Data message) { if (message.getValue() instanceof HttpResponseData) return (HttpResponseData) message.getValue(); // XXX private HttpMethod getHttpMethod(String method) { for (HttpMethod m : HttpMethod.values()) if (m.toString().equalsIgnoreCase(method)) return m; return null; } class HttpInput { HttpResponse<String> response; HttpExchange exchange; HttpEndpoint source; Connection connection; public HttpInput(HttpResponse<String> response, HttpExchange exchange, HttpEndpoint source, Connection connection) { this.response = response; this.exchange = exchange; this.source = source; this.connection = connection; } } class HttpEndpoint { String componentName; String gateName; String address; int port; public HttpEndpoint(String componentName, String gateName, String address, int port) { super(); this.componentName = componentName; this.gateName = gateName; this.address = address; this.port = port; } } } No newline at end of file Loading
plugins/org.etsi.mts.tdl.execution.java.runtime/src/org/etsi/mts/tdl/execution/java/adapters/http/HttpSystemAdapter.java +357 −101 Original line number Diff line number Diff line Loading @@ -3,27 +3,38 @@ package org.etsi.mts.tdl.execution.java.adapters.http; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashSet; import java.util.Hashtable; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Pattern; import org.etsi.mts.tdl.execution.java.rt.core.PojoData; import org.etsi.mts.tdl.execution.java.tri.Argument; import org.etsi.mts.tdl.execution.java.tri.ComponentInstanceRole; import org.etsi.mts.tdl.execution.java.tri.Connection; import org.etsi.mts.tdl.execution.java.tri.Data; import org.etsi.mts.tdl.execution.java.tri.ElementAnnotation; import org.etsi.mts.tdl.execution.java.tri.GateReference; import org.etsi.mts.tdl.execution.java.tri.Mapping; import org.etsi.mts.tdl.execution.java.tri.NamedElement; import org.etsi.mts.tdl.execution.java.tri.Procedure; Loading @@ -39,30 +50,52 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; public class HttpSystemAdapter implements SystemAdapter { protected Validator validator; protected Reporter reporter; private HttpClient.Builder builder; private static final String defaultSut = "<SUT>"; private static final String defaultGate = "<http>"; private HttpClient.Builder clientBuilder; private HttpClient client; private ObjectMapper mapper; private String baseUri = "https://example.com"; private List<HttpHeader> defaultHeaders = new ArrayList<HttpHeader>(); private Connection[] connections; private Set<HttpEndpoint> endpoints = new HashSet<HttpSystemAdapter.HttpEndpoint>(); private String serverBaseUri = "/"; private Map<HttpEndpoint, HttpServer> servers = new Hashtable<HttpSystemAdapter.HttpEndpoint, HttpServer>(); private Queue<HttpResponse<String>> unhandledResponses = new ConcurrentLinkedQueue<>(); private Queue<HttpInput> unhandledInputs = new ConcurrentLinkedQueue<>(); private Map<Connection, Queue<HttpExchange>> unhandledRequests = new Hashtable<Connection, Queue<HttpExchange>>(); public HttpSystemAdapter(Validator validator, Reporter reporter) { this.validator = validator; this.reporter = reporter; this.endpoints.add(new HttpEndpoint(defaultSut, defaultGate, "https://example.com", 0)); } public void setBaseUri(String baseUri) { this.baseUri = baseUri; getEndpoint(defaultSut, defaultGate).address = baseUri; } public void setEndpoint(String componentName, String gateName, String baseUri, int serverPort) { this.endpoints.add(new HttpEndpoint(componentName, gateName, baseUri, serverPort)); } private HttpEndpoint getEndpoint(String componentName, String gateName) { for (HttpEndpoint endpoint : this.endpoints) { if (endpoint.componentName.equals(componentName) && endpoint.gateName == gateName) return endpoint; } return null; } /** Loading @@ -79,14 +112,29 @@ public class HttpSystemAdapter implements SystemAdapter { @Override public void configure(Connection[] connections) { if (connections.length > 1) System.err.println("TODO: multiple connections not supported"); // TODO multiple connections this.connections = connections; for (Connection connection : connections) { for (GateReference gate : connection.getEndPoints()) { if (gate.getComponentRole() != ComponentInstanceRole.Tester) continue; HttpEndpoint endpoint = this.getEndpoint(gate.getComponent().getName(), gate.getGate().getName()); if (endpoint == null || this.servers.containsKey(endpoint)) continue; int serverPort = endpoint.port; try { HttpServer server = HttpServer.create(new InetSocketAddress(serverPort), 0); server.createContext(serverBaseUri, request -> this.handleRequest(request, endpoint, connection)); server.setExecutor(null); server.start(); this.servers.put(endpoint, server); } catch (IOException e) { throw new RuntimeException("Failed to initialize HTTP server with port " + serverPort, e); } } } builder = HttpClient.newBuilder(); client = builder.build(); clientBuilder = HttpClient.newBuilder(); client = clientBuilder.build(); mapper = new ObjectMapper(); mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); Loading @@ -105,13 +153,26 @@ public class HttpSystemAdapter implements SystemAdapter { } private void handleResponse(HttpResponse<String> response, Throwable t) { public void stop() { for (HttpServer server: this.servers.values()) server.stop(5); } private void handleRequest(HttpExchange request, HttpEndpoint source, Connection connection) throws IOException { synchronized (unhandledInputs) { unhandledInputs.add(new HttpInput(null, request, source, connection)); unhandledInputs.notifyAll(); } } private void handleResponse(HttpResponse<String> response, Throwable t, HttpEndpoint source, Connection connection) { if (t != null) handleError(t); else { synchronized (unhandledResponses) { unhandledResponses.add(response); unhandledResponses.notifyAll(); synchronized (unhandledInputs) { unhandledInputs.add(new HttpInput(response, null, source, connection)); unhandledInputs.notifyAll(); } } } Loading @@ -130,9 +191,46 @@ public class HttpSystemAdapter implements SystemAdapter { } } private void applyDefaults(Headers responseHeaders) { responseHeaders.add("Content-Type", "application/json"); responseHeaders.add("Accept", "application/json"); for (HttpHeader h : defaultHeaders) { responseHeaders.add(h.name, h.value); } } @Override public void send(Data message, Connection connection, NamedElement source) { HttpRequestData httpData = getRequestData(message); Object data = null; if (message.getType() instanceof Type) data = getMappedObject(message); else data = message.getValue(); if (!(data instanceof HttpRequestData) && !(data instanceof HttpResponseData)) throw new RuntimeException("Request/response data in unsupported format"); GateReference target = null; for (GateReference gate : connection.getEndPoints()) if (!gate.getComponent().equals(source)) { target = gate; break; } HttpEndpoint targetEndpoint = this.getEndpoint(target.getComponent().getName(), target.getGate().getName()); if (targetEndpoint == null) // Assume default targetEndpoint = this.getEndpoint(defaultSut, defaultGate); boolean isRequest = data instanceof HttpRequestData; try { if (isRequest) { String remoteUri = targetEndpoint.address; if (targetEndpoint.port != 0) remoteUri += ":" + targetEndpoint.port; HttpRequestData httpData = (HttpRequestData) data; HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(); applyDefaults(requestBuilder); Loading Loading @@ -161,7 +259,7 @@ public class HttpSystemAdapter implements SystemAdapter { } } } URI uri = URI.create(baseUri + httpData.uri); URI uri = URI.create(remoteUri + httpData.uri); try { if (query != null) { if (uri.getQuery() != null) Loading @@ -172,8 +270,6 @@ public class HttpSystemAdapter implements SystemAdapter { } catch (URISyntaxException e) { handleError(e); } try { requestBuilder.uri(uri); // TODO logging Loading @@ -188,7 +284,58 @@ public class HttpSystemAdapter implements SystemAdapter { CompletableFuture<HttpResponse<String>> responseFuture = client.sendAsync(requestBuilder.build(), HttpResponse.BodyHandlers.ofString(UTF_8)); responseFuture.whenCompleteAsync(this::handleResponse); HttpEndpoint endpoint = targetEndpoint; responseFuture .whenCompleteAsync((response, t) -> this.handleResponse(response, t, endpoint, connection)); } else { // Request pending HttpExchange unhandledExchange = null; synchronized (this.unhandledRequests) { if (this.unhandledRequests.containsKey(connection)) { Queue<HttpExchange> requests = this.unhandledRequests.get(connection); unhandledExchange = requests.poll(); } } if (unhandledExchange == null) throw new RuntimeException("No pending request to respond to"); HttpResponseData httpData = (HttpResponseData) data; Headers headers = unhandledExchange.getResponseHeaders(); this.applyDefaults(headers); for (HttpHeader header : httpData.headers) headers.add(header.name, header.value); // TODO cookie parameters? /* for (HttpRequestParameter p : httpData.parameters) { if (p.value == null) continue; switch (p.location) { case cookie: { System.err.println("TODO: cookie parameters not handled"); break; } } } */ byte[] body = encodeBody(httpData.body); int length = body.length; // TODO logging reporter.comment("Outgoing (headers): " + httpData.status + " | " + length); unhandledExchange.sendResponseHeaders(httpData.status, length); try (OutputStream os = unhandledExchange.getResponseBody()) { os.write(body); os.close(); } } } catch (IOException e) { handleError(e); Loading @@ -196,18 +343,35 @@ public class HttpSystemAdapter implements SystemAdapter { } @Override public Data receive(Data expected, Connection connection, NamedElement target) throws InterruptedException, AssertionError { HttpResponseData expectedHttpData = expected != null ? getResponseData(expected) : null; synchronized (unhandledResponses) { HttpResponse<String> response = unhandledResponses.peek(); while (response == null) { unhandledResponses.wait(); response = unhandledResponses.peek(); public Data receive(Data expected, Connection connection, NamedElement target) throws InterruptedException, AssertionError { synchronized (unhandledInputs) { HttpInput input = null; do { for (HttpInput i : unhandledInputs) if (i.connection.equals(connection)) if (!i.source.componentName.equals(target.getName())) { input = i; break; } if (input == null) unhandledInputs.wait(); } while (input == null); if (input.response != null) { HttpResponseData expectedHttpData = null; if (expected != null) if (expected.getValue() instanceof HttpResponseData) expectedHttpData = (HttpResponseData) expected.getValue(); HttpResponse<String> response = input.response; String body = response.body(); // TODO logging reporter.comment("Incoming: " + response.statusCode() + " | " + response.body()); reporter.comment("Incoming: " + response.statusCode() + " | " + body); try { Loading @@ -222,16 +386,74 @@ public class HttpSystemAdapter implements SystemAdapter { Data received = new PojoData<>(receivedHttpData); if (expected != null) { if (expectedHttpData.body != null) { receivedHttpData.body = decodeBody(response.body(), expectedHttpData.body.getClass()); receivedHttpData.body = decodeBody(body, expectedHttpData.body.getClass()); } if (this.validator.matches(expected, received)) { unhandledResponses.remove(); unhandledInputs.remove(input); return received; } } else { receivedHttpData.body = response.body(); unhandledResponses.remove(); receivedHttpData.body = body; unhandledInputs.remove(input); return received; } } catch (IOException e) { handleError(e); } } else { HttpRequestData expectedHttpData = null; if (expected != null) if (expected.getValue() instanceof HttpRequestData) expectedHttpData = (HttpRequestData) expected.getValue(); HttpExchange exchange = input.exchange; try (InputStream is = exchange.getRequestBody()) { // TODO other encodings Charset cs = StandardCharsets.UTF_8; String body = new String(is.readAllBytes(), cs); // TODO logging reporter.comment("Incoming: " + exchange.getRequestMethod() + " | " + body); HttpRequestData receivedHttpData = new HttpRequestData(); receivedHttpData.uri = exchange.getRequestURI().toString(); receivedHttpData.method = getHttpMethod(exchange.getRequestMethod()); Headers headers = exchange.getRequestHeaders(); receivedHttpData.headers = new ArrayList<>(); for (String header : headers.keySet()) { receivedHttpData.headers.add(new HttpHeader(header, headers.get(header))); } Data received = new PojoData<>(receivedHttpData); boolean isReceived = false; if (expected != null) { if (expectedHttpData.body != null) { receivedHttpData.body = decodeBody(body, expectedHttpData.body.getClass()); } if (this.validator.matches(expected, received)) { isReceived = true; } } else { receivedHttpData.body = body; isReceived = true; } if (isReceived) { unhandledInputs.remove(input); synchronized (unhandledRequests) { Queue<HttpExchange> requests = unhandledRequests.get(connection); if (requests == null) unhandledRequests.put(connection, requests = new LinkedBlockingQueue<HttpExchange>()); requests.add(exchange); unhandledRequests.notifyAll(); } return received; } Loading @@ -239,6 +461,7 @@ public class HttpSystemAdapter implements SystemAdapter { handleError(e); } } } return null; } Loading @@ -261,12 +484,14 @@ public class HttpSystemAdapter implements SystemAdapter { } @Override public Data[] receiveCall(Procedure operation, Data[] expectedArguments, Connection connection, NamedElement target) { public Data[] receiveCall(Procedure operation, Data[] expectedArguments, Connection connection, NamedElement target) { throw new UnsupportedOperationException("Procedure-based communication is not supported by this adapter."); } @Override public void replyCall(Procedure operation, Data returnValue, Data exception, Connection connection, NamedElement source) { public void replyCall(Procedure operation, Data returnValue, Data exception, Connection connection, NamedElement source) { throw new UnsupportedOperationException("Procedure-based communication is not supported by this adapter."); } Loading Loading @@ -340,10 +565,41 @@ public class HttpSystemAdapter implements SystemAdapter { return obj; } protected HttpResponseData getResponseData(Data message) { if (message.getValue() instanceof HttpResponseData) return (HttpResponseData) message.getValue(); // XXX private HttpMethod getHttpMethod(String method) { for (HttpMethod m : HttpMethod.values()) if (m.toString().equalsIgnoreCase(method)) return m; return null; } class HttpInput { HttpResponse<String> response; HttpExchange exchange; HttpEndpoint source; Connection connection; public HttpInput(HttpResponse<String> response, HttpExchange exchange, HttpEndpoint source, Connection connection) { this.response = response; this.exchange = exchange; this.source = source; this.connection = connection; } } class HttpEndpoint { String componentName; String gateName; String address; int port; public HttpEndpoint(String componentName, String gateName, String address, int port) { super(); this.componentName = componentName; this.gateName = gateName; this.address = address; this.port = port; } } } No newline at end of file