Commit 0be2aab8 authored by Martti Käärik's avatar Martti Käärik
Browse files

Message receive handling between multiple receivers + exceptional behavior...

Message receive handling between multiple receivers + exceptional behavior handling corrected + interaction target assignment added + time variable type corrected + added support for DataElementUse and MemberAssignment + timer period calculation and timelabel handling corrected + TRI updates + exceptional behavior datause variables in correct scope +
parent 427d25b5
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
package org.etsi.mts.tdl.execution.java.rt.core;

import org.etsi.mts.tdl.execution.java.tri.StopException;

public interface Behaviour {
	public void execute();
	public void execute() throws StopException;
}
+1 −1
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ public class ExceptionalBehaviour {
	public ExecutionCallable callable;
	
	/**
	 * Whether the behaviour is na interrupt or default as specified by TDL.
	 * Whether the behaviour is an interrupt or default as specified by TDL.
	 */
	public boolean isInterrupt;
	
+79 −99
Original line number Diff line number Diff line
package org.etsi.mts.tdl.execution.java.rt.core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Stack;

import org.etsi.mts.tdl.execution.java.tri.Argument;
import org.etsi.mts.tdl.execution.java.tri.Connection;
import org.etsi.mts.tdl.execution.java.tri.Data;
import org.etsi.mts.tdl.execution.java.tri.StopException;
import org.etsi.mts.tdl.execution.java.tri.SystemAdapter;

public class ReceiverHub {
@@ -15,134 +16,113 @@ public class ReceiverHub {
	private boolean stopped = false;

	private SystemAdapter systemAdapter;
	private Connection connection;

	private Stack<Receivable> requests = new Stack<Receivable>();
	private Stack<Data> received = new Stack<Data>();
	private List<Expectable> expecting = Collections.synchronizedList(new ArrayList<>());
	private ExceptionalBehaviour anyReceiver;

	public ReceiverHub(SystemAdapter systemAdapter, Connection connection) {
		this.systemAdapter = systemAdapter;
		this.connection = connection;

		this.thread = new Thread(() -> run(), "Receive from SystemAdapter on " + connection.name);
		this.thread.setDaemon(true);
		this.thread.start();
	}

	private void run() {
		while (!stopped) {
			try {
				// XXX
				wait();
			} catch (InterruptedException e) {}
		}
	public ExceptionalBehaviour getAnyReceiver() {
		return anyReceiver;
	}

	public void stop() {
		this.stopped = true;
		this.thread.interrupt();
	public void setAnyReceiver(ExceptionalBehaviour anyReceiver) {
		this.anyReceiver = anyReceiver;
	}

	public Data receive(Data expected) {
		
		
		// XXX
		return null;
	}

	public Data ignoreUntil(Data expected) {
		// XXX
		return null;
	}
	
	public Data call(Object operation, Argument[] arguments, Data expectedReturn, Data expectedException) {
		// XXX
		return null;
	}

	/**
	 * Demultiplexer that receives data from system adapter in a single thread and
	 * passes it on to waiting threads. This way it is possible to cancel concurrent
	 * (alternative) waiting tasks before they get access to upcoming data (which
	 * they shouldn't).
	 */
	class ReceiverThread extends Thread {

		List<Receivable<?>> receivables = new ArrayList<Receivable<?>>();

		public boolean stopped = false;

		public ReceiverThread() {
			super("Receive from SystemAdapter");
			setDaemon(true);
			start();
		}

		@Override
		public void run() {
	private synchronized void run() {
		int currentlyExpectingIndex = 0;
		while (!stopped) {
				synchronized (receivables) {
			Expectable currentlyExpecting = null;
			synchronized (expecting) {
				while (expecting.size() <= currentlyExpectingIndex)
					try {
						receivables.wait();
						for (Receivable r : receivables) {
							r.data = r.receive.receive();
							if (r.data != null) {
								r.completed();
								break;
							}
						}
						expecting.wait();
					} catch (InterruptedException e) {
					} finally {
						receivables.forEach(r -> r.completed());
						receivables.clear();
						return;
					}
				currentlyExpecting = expecting.get(currentlyExpectingIndex);
				currentlyExpectingIndex++;
			}
			try {
				Data data = systemAdapter.receive(currentlyExpecting.expected, connection);
				if (data != null) {
					currentlyExpectingIndex = 0;
					if (currentlyExpecting.anyReceiver) {
						// Switch to the first one for reporting
						synchronized (expecting) {
							if (expecting.size() > 0)
								currentlyExpecting = expecting.get(0);
						}
					}

		public <T> Receivable<T> addReceivable(ReceiveMethod<T> receive) {
			Receivable<T> r = new Receivable<T>(receive);
			synchronized (receivables) {
				receivables.add(r);
				receivables.notifyAll();
					expecting.clear();
					synchronized (currentlyExpecting) {
						currentlyExpecting.received = data;
						currentlyExpecting.notifyAll();
					}
			return r;
				}
			} catch (InterruptedException e) {

//		public void removeReceivable(Receivable r) {
//			synchronized (receivables) {
//				receivables.remove(r);
//				interrupt();
//			}
//		}

		public void terminate() {
			this.stopped = true;
			this.interrupt();
			} catch (AssertionError e) {
				currentlyExpecting.error = e;
			}
		}

	public class Receivable<T> {
		private T data;
		ReceiveMethod<T> receive;

		public Receivable(ReceiveMethod<T> receive) {
			this.receive = receive;
	}

		public synchronized void completed() {
			notifyAll();
	public void stop() {
		this.stopped = true;
		this.thread.interrupt();
	}

		public synchronized T get() {
	public Data receive(Data expected) throws StopException {
		Expectable expectable = new Expectable(expected);
		synchronized (expecting) {
			expecting.add(expectable);
			expecting.notifyAll();
		}
		try {
			synchronized (expectable) {
				try {
				wait();
					expectable.wait();
				} catch (InterruptedException e) {
					return null;
				}
			return data;
				if (expectable.error != null)
					throw expectable.error;
				return expectable.received;
			}
		} finally {
			expecting.remove(expectable);
		}
	}

	public Data ignoreUntil(Data expected) {
		// XXX
		return null;
	}

	public interface ReceiveMethod<T> {
		T receive();
	public Data call(Object operation, Argument[] arguments, Data expectedReturn, Data expectedException) {
		// XXX
		return null;
	}

	class Expectable {
		Data expected;
		Data received;
		AssertionError error;
		boolean anyReceiver = false;

		public Expectable(Data expected) {
			this.expected = expected;
			this.anyReceiver = expected == null;
		}
	}
}
+39 −70
Original line number Diff line number Diff line
@@ -18,10 +18,9 @@ import org.etsi.mts.tdl.execution.java.tri.Connection;
import org.etsi.mts.tdl.execution.java.tri.Data;
import org.etsi.mts.tdl.execution.java.tri.GateReference;
import org.etsi.mts.tdl.execution.java.tri.PredefinedFunctions;
import org.etsi.mts.tdl.execution.java.tri.ProviderModule;
import org.etsi.mts.tdl.execution.java.tri.Receiver;
import org.etsi.mts.tdl.execution.java.tri.Reporter;
import org.etsi.mts.tdl.execution.java.tri.RuntimeHelper;
import org.etsi.mts.tdl.execution.java.tri.StopException;
import org.etsi.mts.tdl.execution.java.tri.SystemAdapter;
import org.etsi.mts.tdl.execution.java.tri.Validator;

@@ -29,6 +28,7 @@ import com.google.inject.Guice;
import com.google.inject.Injector;

public class TestControl {
	private Injector injector;

	public SystemAdapter systemAdapter;
	public Validator validator;
@@ -47,10 +47,10 @@ public class TestControl {
	private List<ExceptionalBehaviour> exceptionalBehaviours = Collections
			.synchronizedList(new ArrayList<ExceptionalBehaviour>());

	public TestControl(ProviderModule guiceModule) {
	public TestControl(com.google.inject.Module guiceModule) {
		super();

		Injector injector = Guice.createInjector(guiceModule);
		injector = Guice.createInjector(guiceModule);

		this.systemAdapter = injector.getInstance(SystemAdapter.class);
		this.validator = injector.getInstance(Validator.class);
@@ -58,13 +58,14 @@ public class TestControl {
		this.functions = injector.getInstance(PredefinedFunctions.class);
		this.runtimeHelper = injector.getInstance(RuntimeHelper.class);

//		this.interactionHub = new InteractionHub(this.systemAdapter);

		// XXX
		this.executor = Executors.newCachedThreadPool();
		this.completionService = new ExecutorCompletionService<ExecutionResult>(executor);
	}
	
	protected <A> A getInstance(Class<A> clazz) {
		return this.injector.getInstance(clazz);
	}
	
	private ReceiverHub getReceiver(Connection connection) {
		ReceiverHub hub = this.receiverHubs.get(connection);
		if (hub == null)
@@ -78,8 +79,24 @@ public class TestControl {
		
		this.receiverHubs.values().forEach(r -> r.stop());
		this.receiverHubs.clear();
		for (Connection c : connections)
			receiverHubs.put(c, new ReceiverHub(this.systemAdapter, c));
		for (Connection c : connections) {
			ReceiverHub hub = new ReceiverHub(this.systemAdapter, c);
			receiverHubs.put(c, hub);

			ExceptionalBehaviour anyReceiver = new ExceptionalBehaviour(false);
			anyReceiver.behaviour = () -> {
				throw new StopException("Unexpected message received");
			};
			anyReceiver.callable = new ExecutionCallable() {
				@Override
				public ExecutionResult call() throws Exception {
					Data data = hub.receive(null);
					return data != null ? new InteractionResult(data) : null;
				}
				
			};
			hub.setAnyReceiver(anyReceiver);
		}
	}

	public Connection getConnection(String testerComponentName, String testerGateName, String remoteComponentName,
@@ -140,15 +157,21 @@ public class TestControl {
		}
	}

	private abstract class ExecutionCallableWithDefaults extends ExecutionCallable {
	public abstract class ExecutionCallableWithDefaults extends ExecutionCallable {
		@Override
		public Future<ExecutionResult> execute() {
			Future<ExecutionResult> future = super.execute();
			exceptionalBehaviours.forEach(exc -> exc.execute());
			return future;
		}
	}
	
	public List<Future<ExecutionResult>> executeExceptionals() {
		List<Future<ExecutionResult>> futures = new ArrayList<>();
		exceptionalBehaviours.forEach(exc -> futures.add(exc.execute()));
		receiverHubs.values().forEach(hub -> futures.add(hub.getAnyReceiver().execute()));
		return futures;
	}

	public ExecutionCallable timeConstraint(Constraint constraint) {
		ExecutionCallable c = new ExecutionCallable() {
			@Override
@@ -171,7 +194,7 @@ public class TestControl {
			public ExecutionResult call() throws Exception {
				TimeLabel startLabel = timer.getStartLabel();
				try {
					long period = startLabel.currentTime() - (startLabel.previous() + timer.getPeriod());
					long period = startLabel.currentTime() - startLabel.previous() + timer.getPeriod();
					Thread.sleep(timer.getUnit().toMillis(period));
				} catch (InterruptedException e) {
					return null;
@@ -260,60 +283,6 @@ public class TestControl {
		return c;
	}

	class SyncReceiver implements Receiver {

		Connection connection;
		private String body;

		public SyncReceiver(Connection connection) {
			this.connection = connection;
		}

		@Override
		public void receive(String body, Connection connection) {
			if (this.connection == null || this.connection.equals(connection)) {
				synchronized (this) {
					this.body = body;
					notifyAll();
				}
			}
		}

		public synchronized String getData() throws InterruptedException {
			this.wait();
			return body;
		}

	}

	/**
	 * 
	 * @param connection A connection where data is expeced or null for receiving on
	 *                   all connections.
	 */
	public ExecutionCallable receiveAny(Connection connection) {
		ExecutionCallable c = new ExecutionCallable() {
			@Override
			public ExecutionResult call() throws Exception {
				SyncReceiver receiver = new SyncReceiver(connection);
				systemAdapter.addReceiver(receiver);
				try {
					String data = receiver.getData();
					if (data != null)
						return new InteractionResult(data);
					return null;

				} catch (InterruptedException e) {
					return null;

				} finally {
					systemAdapter.removeReceiver(receiver);
				}
			}
		};
		return c;
	}

	public void addExceptionalBehaviour(ExceptionalBehaviour b) {
		// TODO Inner exceptional behaviours have precedence
		synchronized (exceptionalBehaviours) {
@@ -339,10 +308,10 @@ public class TestControl {

	public Future<ExecutionResult> next() {
		try {
			Future<ExecutionResult> future = this.completionService.take();

			// Cancel exceptionals

			Future<ExecutionResult> future;
			do {
				future = this.completionService.take();
			} while (future.isCancelled());
			return future;
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
+4 −0
Original line number Diff line number Diff line
@@ -15,6 +15,10 @@ public enum TimeUnit {
		return time * getNsDenominator() / MilliSecond.getNsDenominator();
	}

	public long toSeconds(long time) {
		return time * getNsDenominator() / Second.getNsDenominator();
	}
	
	int getNsDenominator() {
		switch (this) {
		case Second:
Loading