mirror of
https://github.com/corda/corda.git
synced 2025-01-18 10:46:38 +00:00
Merging changes in groovy to new java file
This commit is contained in:
parent
c32e74b462
commit
33dd494e30
1
.idea/codeStyles/codeStyleConfig.xml
generated
1
.idea/codeStyles/codeStyleConfig.xml
generated
@ -1,5 +1,6 @@
|
||||
<component name="ProjectCodeStyleConfiguration">
|
||||
<state>
|
||||
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
|
||||
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
|
||||
</state>
|
||||
</component>
|
@ -3,8 +3,9 @@ package net.corda.testing;
|
||||
import io.fabric8.kubernetes.api.model.*;
|
||||
import io.fabric8.kubernetes.client.*;
|
||||
import io.fabric8.kubernetes.client.dsl.ExecListener;
|
||||
import io.fabric8.kubernetes.client.dsl.ExecWatch;
|
||||
import io.fabric8.kubernetes.client.dsl.PodResource;
|
||||
import io.fabric8.kubernetes.client.utils.Serialization;
|
||||
import net.corda.testing.retry.Retry;
|
||||
import okhttp3.Response;
|
||||
import org.gradle.api.DefaultTask;
|
||||
import org.gradle.api.tasks.TaskAction;
|
||||
@ -22,7 +23,10 @@ import java.util.stream.IntStream;
|
||||
public class KubesTest extends DefaultTask {
|
||||
|
||||
static final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
static final ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor();
|
||||
/**
|
||||
* Name of the k8s Secret object that holds the credentials to access the docker image registry
|
||||
*/
|
||||
static final String REGISTRY_CREDENTIALS_SECRET_NAME = "regcred";
|
||||
|
||||
String dockerTag;
|
||||
String fullTaskToExecutePath;
|
||||
@ -46,12 +50,11 @@ public class KubesTest extends DefaultTask {
|
||||
String buildId = System.getProperty("buildId", "0");
|
||||
String currentUser = System.getProperty("user.name", "UNKNOWN_USER");
|
||||
|
||||
String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode())).toString(36).toLowerCase();
|
||||
String suffix = new BigInteger(64, new Random()).toString(36).toLowerCase();
|
||||
String stableRunId = rnd64Base36(new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode()));
|
||||
String suffix = rnd64Base36(new Random());
|
||||
|
||||
final KubernetesClient client = getKubernetesClient();
|
||||
|
||||
|
||||
try {
|
||||
client.pods().inNamespace(namespace).list().getItems().forEach(podToDelete -> {
|
||||
if (podToDelete.getMetadata().getName().contains(stableRunId)) {
|
||||
@ -63,10 +66,9 @@ public class KubesTest extends DefaultTask {
|
||||
//it's possible that a pod is being deleted by the original build, this can lead to racey conditions
|
||||
}
|
||||
|
||||
List<CompletableFuture<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj(i -> {
|
||||
String potentialPodName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase();
|
||||
String podName = potentialPodName.substring(0, Math.min(potentialPodName.length(), 62));
|
||||
return runBuild(client, namespace, numberOfPods, i, podName, printOutput, 3);
|
||||
List<Future<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj(i -> {
|
||||
String podName = taskToExecuteName.toLowerCase()+ "-" + stableRunId + "-" + suffix + "-" + i;
|
||||
return submitBuild(client, namespace, numberOfPods, i, podName, printOutput, 3);
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
this.testOutput = Collections.synchronizedList(futures.stream().map(it -> {
|
||||
@ -98,85 +100,137 @@ public class KubesTest extends DefaultTask {
|
||||
return new DefaultKubernetesClient(config);
|
||||
}
|
||||
|
||||
CompletableFuture<KubePodResult> runBuild(KubernetesClient client,
|
||||
String namespace,
|
||||
int numberOfPods,
|
||||
int podIdx,
|
||||
String podName,
|
||||
boolean printOutput,
|
||||
int numberOfRetries) {
|
||||
|
||||
CompletableFuture<KubePodResult> toReturn = new CompletableFuture<>();
|
||||
executorService.submit(() -> {
|
||||
int tryCount = 0;
|
||||
Pod createdPod = null;
|
||||
while (tryCount < numberOfRetries) {
|
||||
try {
|
||||
Pod podRequest = buildPod(podName);
|
||||
getProject().getLogger().lifecycle("requesting pod: " + podName);
|
||||
createdPod = client.pods().inNamespace(namespace).create(podRequest);
|
||||
getProject().getLogger().lifecycle("scheduled pod: " + podName);
|
||||
File outputFile = Files.createTempFile("container", ".log").toFile();
|
||||
attachStatusListenerToPod(client, namespace, podName);
|
||||
schedulePodForDeleteOnShutdown(podName, client, createdPod);
|
||||
waitForPodToStart(podName, client, namespace);
|
||||
PipedOutputStream stdOutOs = new PipedOutputStream();
|
||||
PipedInputStream stdOutIs = new PipedInputStream(4096);
|
||||
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
|
||||
KubePodResult result = new KubePodResult(createdPod, outputFile);
|
||||
CompletableFuture<KubePodResult> waiter = new CompletableFuture<>();
|
||||
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, result);
|
||||
stdOutIs.connect(stdOutOs);
|
||||
String[] buildCommand = getBuildCommand(numberOfPods, podIdx);
|
||||
ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName)
|
||||
.writingOutput(stdOutOs)
|
||||
.writingErrorChannel(errChannelStream)
|
||||
.usingListener(execListener).exec(buildCommand);
|
||||
|
||||
startLogPumping(outputFile, stdOutIs, podIdx, printOutput);
|
||||
KubePodResult execResult = waiter.join();
|
||||
getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + ")");
|
||||
getLogger().lifecycle("Gathering test results from " + execResult.getCreatedPod().getMetadata().getName());
|
||||
Collection<File> binaryResults = downloadTestXmlFromPod(client, namespace, execResult.getCreatedPod());
|
||||
getLogger().lifecycle("deleting: " + execResult.getCreatedPod().getMetadata().getName());
|
||||
client.resource(execResult.getCreatedPod()).delete();
|
||||
result.setBinaryResults(binaryResults);
|
||||
toReturn.complete(result);
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Encountered error during testing cycle on pod " + podName + " (" + podIdx / numberOfPods + ")", e);
|
||||
try {
|
||||
if (createdPod != null) {
|
||||
client.pods().delete(createdPod);
|
||||
while (client.pods().inNamespace(namespace).list().getItems().stream().anyMatch(p -> Objects.equals(p.getMetadata().getName(), podName))) {
|
||||
getLogger().warn("pod " + podName + " has not been deleted, waiting 1s");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
tryCount++;
|
||||
getLogger().lifecycle("will retry ${podName} another ${numberOfRetries - tryCount} times");
|
||||
}
|
||||
}
|
||||
if (tryCount >= numberOfRetries) {
|
||||
toReturn.completeExceptionally(new RuntimeException("Failed to build in pod ${podName} (${podIdx}/${numberOfPods}) within retry limit"));
|
||||
}
|
||||
});
|
||||
return toReturn;
|
||||
static String rnd64Base36(Random rnd) {
|
||||
return new BigInteger(64, rnd)
|
||||
.toString(36)
|
||||
.toLowerCase();
|
||||
}
|
||||
|
||||
Future<KubePodResult> submitBuild(
|
||||
KubernetesClient client,
|
||||
String namespace,
|
||||
int numberOfPods,
|
||||
int podIdx,
|
||||
String podName,
|
||||
boolean printOutput,
|
||||
int numberOfRetries
|
||||
) {
|
||||
return executorService.submit(new Callable<KubePodResult>() {
|
||||
@Override
|
||||
public KubePodResult call() throws Exception {
|
||||
PersistentVolumeClaim pvc = createPvc(client, podName);
|
||||
return buildRunPodWithRetriesOrThrow(client, namespace, pvc, numberOfPods, podIdx, podName, printOutput, numberOfRetries);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static void addShutdownHook(Runnable hook) {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(hook));
|
||||
}
|
||||
|
||||
PersistentVolumeClaim createPvc(KubernetesClient client, String name) {
|
||||
PersistentVolumeClaim pvc = client.persistentVolumeClaims()
|
||||
.inNamespace(namespace)
|
||||
.createNew()
|
||||
|
||||
.editOrNewMetadata().withName(name).endMetadata()
|
||||
|
||||
.editOrNewSpec()
|
||||
.withAccessModes("ReadWriteOnce")
|
||||
.editOrNewResources().addToRequests("storage", new Quantity("100Mi")).endResources()
|
||||
.endSpec()
|
||||
|
||||
.done();
|
||||
|
||||
addShutdownHook(() -> {
|
||||
System.out.println("Deleing PVC: " + pvc.getMetadata().getName());
|
||||
client.persistentVolumeClaims().delete(pvc);
|
||||
});
|
||||
return pvc;
|
||||
}
|
||||
|
||||
KubePodResult buildRunPodWithRetriesOrThrow(
|
||||
KubernetesClient client,
|
||||
String namespace,
|
||||
PersistentVolumeClaim pvc,
|
||||
int numberOfPods,
|
||||
int podIdx,
|
||||
String podName,
|
||||
boolean printOutput,
|
||||
int numberOfRetries
|
||||
) {
|
||||
addShutdownHook(() -> {
|
||||
System.out.println("deleting pod: " + podName);
|
||||
client.pods().inNamespace(namespace).withName(podName).delete();
|
||||
});
|
||||
|
||||
try {
|
||||
// pods might die, so we retry
|
||||
return Retry.fixed(numberOfRetries).run(()-> {
|
||||
// remove pod if exists
|
||||
PodResource<Pod, DoneablePod> oldPod = client.pods().inNamespace(namespace).withName(podName);
|
||||
if (oldPod.get() != null) {
|
||||
getLogger().lifecycle("deleting pod: $podName");
|
||||
oldPod.delete();
|
||||
while (oldPod.get() != null) {
|
||||
getLogger().info("waiting for pod $podName to be removed");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
// recreate and run
|
||||
getProject().getLogger().lifecycle("creating pod: " + podName);
|
||||
Pod createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName, pvc));
|
||||
getProject().getLogger().lifecycle("scheduled pod: " + podName);
|
||||
|
||||
attachStatusListenerToPod(client, createdPod);
|
||||
waitForPodToStart(client, createdPod);
|
||||
|
||||
PipedOutputStream stdOutOs = new PipedOutputStream();
|
||||
PipedInputStream stdOutIs = new PipedInputStream(4096);
|
||||
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
|
||||
|
||||
CompletableFuture<Integer> waiter = new CompletableFuture<>();
|
||||
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter);
|
||||
stdOutIs.connect(stdOutOs);
|
||||
client.pods().inNamespace(namespace).withName(podName)
|
||||
.writingOutput(stdOutOs)
|
||||
.writingErrorChannel(errChannelStream)
|
||||
.usingListener(execListener)
|
||||
.exec(getBuildCommand(numberOfPods, podIdx));
|
||||
|
||||
File podOutput = startLogPumping(stdOutIs, podIdx, printOutput);
|
||||
int resCode = waiter.join();
|
||||
getProject().getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + "), gathering results");
|
||||
Collection<File> binaryResults = downloadTestXmlFromPod(client, namespace, createdPod);
|
||||
return new KubePodResult(resCode, podOutput, binaryResults);
|
||||
});
|
||||
} catch (Retry.RetryException e) {
|
||||
throw new RuntimeException("Failed to build in pod " + podName + " ("+podIdx+"/"+numberOfPods+") in " + numberOfRetries + " attempts", e);
|
||||
}
|
||||
}
|
||||
|
||||
Pod buildPodRequest(String podName, PersistentVolumeClaim pvc) {
|
||||
return new PodBuilder()
|
||||
.withNewMetadata().withName(podName).endMetadata()
|
||||
|
||||
Pod buildPod(String podName) {
|
||||
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
|
||||
.withNewSpec()
|
||||
|
||||
.addNewVolume()
|
||||
.withName("gradlecache")
|
||||
.withNewHostPath()
|
||||
.withPath("/tmp/gradle")
|
||||
.withType("DirectoryOrCreate")
|
||||
.withPath("/tmp/gradle")
|
||||
.endHostPath()
|
||||
.endVolume()
|
||||
|
||||
.addNewVolume()
|
||||
.withName("testruns")
|
||||
.withNewPersistentVolumeClaim()
|
||||
.withClaimName(pvc.getMetadata().getName())
|
||||
.endPersistentVolumeClaim()
|
||||
.endVolume()
|
||||
|
||||
.addNewContainer()
|
||||
.withImage(dockerTag)
|
||||
.withCommand("bash")
|
||||
@ -192,18 +246,19 @@ public class KubesTest extends DefaultTask {
|
||||
.addToRequests("cpu", new Quantity(numberOfCoresPerFork.toString()))
|
||||
.addToRequests("memory", new Quantity(memoryGbPerFork.toString() + "Gi"))
|
||||
.endResources()
|
||||
.addNewVolumeMount()
|
||||
.withName("gradlecache")
|
||||
.withMountPath("/tmp/gradle")
|
||||
.endVolumeMount()
|
||||
.addNewVolumeMount().withName("gradlecache").withMountPath("/tmp/gradle").endVolumeMount()
|
||||
.addNewVolumeMount().withName("testruns").withMountPath("/test-runs").endVolumeMount()
|
||||
.endContainer()
|
||||
.withImagePullSecrets(new LocalObjectReference("regcred"))
|
||||
|
||||
.addNewImagePullSecret(REGISTRY_CREDENTIALS_SECRET_NAME)
|
||||
.withRestartPolicy("Never")
|
||||
|
||||
.endSpec()
|
||||
.build();
|
||||
}
|
||||
|
||||
void startLogPumping(File outputFile, InputStream stdOutIs, Integer podIdx, boolean printOutput) {
|
||||
File startLogPumping(InputStream stdOutIs, int podIdx, boolean printOutput) throws IOException {
|
||||
File outputFile = Files.createTempFile("container", ".log").toFile();
|
||||
Thread loggingThread = new Thread(() -> {
|
||||
try (BufferedWriter out = new BufferedWriter(new FileWriter(outputFile));
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(stdOutIs))) {
|
||||
@ -222,10 +277,11 @@ public class KubesTest extends DefaultTask {
|
||||
|
||||
loggingThread.setDaemon(true);
|
||||
loggingThread.start();
|
||||
return outputFile;
|
||||
}
|
||||
|
||||
Watch attachStatusListenerToPod(KubernetesClient client, String namespace, String podName) {
|
||||
return client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() {
|
||||
Watch attachStatusListenerToPod(KubernetesClient client, Pod pod) {
|
||||
return client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).watch(new Watcher<Pod>() {
|
||||
@Override
|
||||
public void eventReceived(Watcher.Action action, Pod resource) {
|
||||
getProject().getLogger().lifecycle("[StatusChange] pod " + resource.getMetadata().getName() + " " + action.name() + " (" + resource.getStatus().getPhase() + ")");
|
||||
@ -237,14 +293,14 @@ public class KubesTest extends DefaultTask {
|
||||
});
|
||||
}
|
||||
|
||||
void waitForPodToStart(String podName, KubernetesClient client, String namespace) {
|
||||
getProject().getLogger().lifecycle("Waiting for pod " + podName + " to start before executing build");
|
||||
void waitForPodToStart(KubernetesClient client, Pod pod) {
|
||||
getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build");
|
||||
try {
|
||||
client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
|
||||
client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
getProject().getLogger().lifecycle("pod " + podName + " has started, executing build");
|
||||
getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build");
|
||||
}
|
||||
|
||||
Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
|
||||
@ -291,15 +347,7 @@ public class KubesTest extends DefaultTask {
|
||||
return folders;
|
||||
}
|
||||
|
||||
void schedulePodForDeleteOnShutdown(String podName, KubernetesClient client, Pod createdPod) {
|
||||
getProject().getLogger().info("attaching shutdown hook for pod " + podName);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
System.out.println("Deleting pod: " + podName);
|
||||
client.pods().delete(createdPod);
|
||||
}));
|
||||
}
|
||||
|
||||
ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<KubePodResult> waitingFuture, KubePodResult result) {
|
||||
ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<Integer> waitingFuture) {
|
||||
|
||||
return new ExecListener() {
|
||||
final Long start = System.currentTimeMillis();
|
||||
@ -326,8 +374,7 @@ public class KubesTest extends DefaultTask {
|
||||
.flatMap(c -> c.stream().findFirst())
|
||||
.map(StatusCause::getMessage)
|
||||
.map(Integer::parseInt).orElse(0);
|
||||
result.setResultCode(resultCode);
|
||||
waitingFuture.complete(result);
|
||||
waitingFuture.complete(resultCode);
|
||||
} catch (Exception e) {
|
||||
waitingFuture.completeExceptionally(e);
|
||||
}
|
||||
@ -335,5 +382,4 @@ public class KubesTest extends DefaultTask {
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user