diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/process/Command.kt b/experimental/behave/src/main/kotlin/net/corda/behave/process/Command.kt index ec7c112062..05d08f0bf4 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/process/Command.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/process/Command.kt @@ -4,7 +4,7 @@ import net.corda.behave.await import net.corda.behave.file.currentDirectory import net.corda.behave.process.output.OutputListener import net.corda.behave.waitFor -import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.contextLogger import net.corda.core.utilities.minutes import net.corda.core.utilities.seconds import rx.Observable @@ -20,8 +20,10 @@ open class Command( private val directory: Path = currentDirectory, private val timeout: Duration = 2.minutes ): Closeable { - - protected val log = loggerFor() + companion object { + private val WAIT_BEFORE_KILL: Duration = 5.seconds + private val log = contextLogger() + } private val terminationLatch = CountDownLatch(1) @@ -36,21 +38,16 @@ open class Command( var exitCode = -1 private set - val output: Observable = Observable.create({ emitter -> + val output: Observable = Observable.create { emitter -> outputListener = object : OutputListener { - override fun onNewLine(line: String) { - emitter.onNext(line) - } - - override fun onEndOfStream() { - emitter.onCompleted() - } + override fun onNewLine(line: String) = emitter.onNext(line) + override fun onEndOfStream() = emitter.onCompleted() } - }).share() + }.share() private val thread = Thread(Runnable { try { - log.info("Command: $command") + log.info("Executing command: $command from directory: $directory") val processBuilder = ProcessBuilder(command) .directory(directory.toFile()) .redirectErrorStream(true) @@ -132,12 +129,13 @@ open class Command( } override fun close() { + if (process?.isAlive == true) { + kill() + } waitFor() } - fun run() = use { _ -> } - - fun use(action: (Command) -> Unit): Int { + fun run(action: (Command) -> Unit = { }): Int { use { start() action(this) @@ -145,8 +143,8 @@ open class Command( return exitCode } - fun use(subscriber: Subscriber, action: (Command, Observable) -> Unit = { _, _ -> }): Int { - use { + fun run(subscriber: Subscriber, action: (Command, Observable) -> Unit = { _, _ -> }): Int { + run { output.subscribe(subscriber) start() action(this, output) @@ -155,11 +153,4 @@ open class Command( } override fun toString() = "Command(${command.joinToString(" ")})" - - companion object { - - private val WAIT_BEFORE_KILL: Duration = 5.seconds - - } - } diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/service/ContainerService.kt b/experimental/behave/src/main/kotlin/net/corda/behave/service/ContainerService.kt index 71f52a6c5a..70763d06f1 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/service/ContainerService.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/service/ContainerService.kt @@ -54,7 +54,6 @@ abstract class ContainerService( log.info("Container $id info: $info") client.startContainer(id) - true } catch (e: Exception) { id = null @@ -79,9 +78,7 @@ abstract class ContainerService( override fun checkPrerequisites() { if (!client.listImages().any { true == it.repoTags()?.contains(imageReference) }) { log.info("Pulling image $imageReference ...") - client.pull(imageReference, { _ -> - run { } - }) + client.pull(imageReference) { } log.info("Image $imageReference downloaded") } } diff --git a/experimental/behave/src/test/kotlin/net/corda/behave/process/CommandTests.kt b/experimental/behave/src/test/kotlin/net/corda/behave/process/CommandTests.kt index 747c82cdae..88152420dd 100644 --- a/experimental/behave/src/test/kotlin/net/corda/behave/process/CommandTests.kt +++ b/experimental/behave/src/test/kotlin/net/corda/behave/process/CommandTests.kt @@ -20,7 +20,7 @@ class CommandTests { @Test fun `output stream for command can be observed`() { val subscriber = TestSubscriber() - val exitCode = Command(listOf("ls", "/")).use(subscriber) { _, _ -> + val exitCode = Command(listOf("ls", "/")).run(subscriber) { _, _ -> subscriber.awaitTerminalEvent() subscriber.assertCompleted() subscriber.assertNoErrors()