Skip to content

Commit

Permalink
Refactor to support arbitrary clusters (#3)
Browse files Browse the repository at this point in the history
* Refactor to support  arbitrary clusters

* Update README.md
  • Loading branch information
jxnu-liguobin authored Dec 13, 2023
1 parent a7bfb0b commit 110a916
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 170 deletions.
30 changes: 19 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Support Java 8+, Scala 3, Scala 2.13 and Scala 2.12

**sbt**:
```scala
"io.github.jxnu-liguobin" %% "testcontainers-nebula" % 'latest version' % Test
"io.github.jxnu-liguobin" %% "testcontainers-nebula" % "latest version" % Test
```

**maven**:
Expand All @@ -41,20 +41,28 @@ testImplementation group: 'io.github.jxnu-liguobin', name: 'testcontainers-nebul

## Usage Instructions

Create a cluster with three nodes; this means that 3 metad nodes, 3 storaged nodes, 3 graphd nodes and 1 console will be created:
```java
// Logs and data are mounted to the current directory.
NebulaClusterContainer cluster = new ArbitraryNebulaCluster(3, "v3.6.0", Optional.of("./"));
cluster.start();
```

In general, it is only necessary to create a cluster with one node; this means that 1 metad node, 1 storaged node, 1 graphd node and 1 console will be created:
```java
// Logs and data are mounted to the current directory.
NebulaClusterContainer cluster = new ArbitraryNebulaCluster(1, "v3.6.0", Optional.of("./"));
cluster.start();
```
![testcontainers_nebula](testcontainers_nebula.png)

Java example: [SimpleNebulaCluster](./examples/src/main/java/testcontainers/containers/SimpleNebulaCluster.java)

ZIO example: [NebulaSpec](./zio/src/test/scala/testcontainers/containers/znebula/NebulaSpec.scala)

The zio module provides default configurations for better integration with zio-nebula; just add the dependency:
```scala
"io.github.jxnu-liguobin" %% "testcontainers-nebula-zio" % 'latest version'
"io.github.jxnu-liguobin" %% "testcontainers-nebula-zio" % "latest version"
// testcontainers-nebula-zio depends on zio-nebula dependency
"io.github.jxnu-liguobin" %% "zio-nebula" % 'latest version'
```

Details:

1. `NebulaSimpleClusterContainer.scala` creates four container instances: graphd, metad, storaged and console.
2. `NebulaClusterContainer.scala` provides a generic definition; a cluster of any size can be created by implementing its abstract methods, and ports and volumes can be modified.

![testcontainers_nebula](testcontainers_nebula.png)
"io.github.jxnu-liguobin" %% "zio-nebula" % "latest version"
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package testcontainers.containers

import scala.jdk.OptionConverters._

/**
 * A Nebula cluster of arbitrary size: `cluster` metad nodes, `cluster` storaged nodes,
 * `cluster` graphd nodes and one console are created.
 *
 * @param cluster
 *   The number of graphd/metad/storaged nodes in the cluster
 * @param version
 *   The image version/tag
 * @param absoluteHostPathPrefix
 *   The prefix of your host path, eg: prefix/data/meta1:/data/meta, prefix/logs/meta1:/logs
 * @param subnetIp
 *   The subnet (CIDR) of the docker network the containers are attached to
 */
final case class ArbitraryNebulaCluster(
  cluster: Int = 1,
  version: String = Nebula.DefaultTag,
  absoluteHostPathPrefix: Option[String] = None,
  subnetIp: String = "172.28.0.0/16"
) extends NebulaClusterContainer(cluster, version, absoluteHostPathPrefix, subnetIp) {

  // Java-friendly auxiliary constructor: custom image tag, single-node cluster.
  def this(version: String) =
    this(1, version, None)

  // Java-friendly auxiliary constructor: custom cluster size, default image tag.
  def this(cluster: Int) =
    this(cluster, Nebula.DefaultTag, None)

  // Java-friendly auxiliary constructor taking java.util.Optional for the host path prefix.
  def this(cluster: Int, version: String, absoluteBindingPath: java.util.Optional[String]) =
    this(cluster, version, absoluteBindingPath.toScala)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,20 @@ import testcontainers.containers.Nebula.dockerClient
* @version 1.0,2023/9/18
*/
object NebulaClusterContainer {
private val logger = LoggerFactory.getLogger(classOf[NebulaClusterContainer])
}

abstract class NebulaClusterContainer(subnetIp: String) extends Startable {
protected def generateIpAddrs(ipPortMapping: List[(String, Int)]): String =
ipPortMapping.map(kv => s"${kv._1}:${kv._2}").mkString(",")

import NebulaClusterContainer._
protected def increaseLastIp(ip: String, num: Int): String = {
if (ip == null) {
throw new IllegalStateException("IPAddress cannot be null!")
}
val ipSplits = ip.split("\\.").toList
val last = ipSplits.last.toInt
ipSplits.updated(ipSplits.size - 1, last + num).mkString(".")
}

protected def gatewayIp: String = {
protected def gatewayIp(subnetIp: String): String = {
if (subnetIp == null) {
throw new IllegalStateException("subnetIp cannot be null")
}
Expand All @@ -44,7 +50,29 @@ abstract class NebulaClusterContainer(subnetIp: String) extends Startable {
increaseLastIp(sub, 1)
}

protected val nebulaNet: Network =
private def getClusterPortList(cluster: Int, port: Int): Seq[Int] =
0 until cluster map { i =>
port + i
}

private def getClusterIpList(cluster: Int, subnetIp: String, offset: Int): Seq[String] =
1 to cluster map { i =>
increaseLastIp(gatewayIp(subnetIp), i + offset)
}

private val logger = LoggerFactory.getLogger(classOf[NebulaClusterContainer])
}

abstract class NebulaClusterContainer(
cluster: Int,
version: String,
absoluteHostPathPrefix: Option[String],
subnetIp: String
) extends Startable {

import NebulaClusterContainer._

private val nebulaNet: Network =
Network
.builder()
.createNetworkCmdModifier { cmd =>
Expand All @@ -53,42 +81,81 @@ abstract class NebulaClusterContainer(subnetIp: String) extends Startable {
.withIpam(
new Ipam()
.withDriver(Nebula.NetworkType)
.withConfig(new Ipam.Config().withSubnet(subnetIp).withGateway(gatewayIp))
.withConfig(new Ipam.Config().withSubnet(subnetIp).withGateway(gatewayIp(subnetIp)))
)
}
.build()
protected val metaIpPortMapping: List[(String, Int)]
protected val storageIpMapping: List[(String, Int)]
protected val graphIpMapping: List[(String, Int)]

protected lazy val metaAddrs: String = generateIpAddrs(metaIpPortMapping)
private val metaIpPortMapping: List[(String, Int)] =
getClusterIpList(cluster, subnetIp, 0).zip(getClusterPortList(cluster, Nebula.MetadExposedPort)).toList

protected def generateIpAddrs(ipPortMapping: List[(String, Int)]): String =
ipPortMapping.map(kv => s"${kv._1}:${kv._2}").mkString(",")
// Does the IP have to be self-incrementing?
private val storageIpMapping: List[(String, Int)] =
getClusterIpList(cluster, subnetIp, metaIpPortMapping.size)
.zip(getClusterPortList(cluster, Nebula.StoragedExposedPort))
.toList

private val graphIpMapping: List[(String, Int)] =
getClusterIpList(cluster, subnetIp, storageIpMapping.size + metaIpPortMapping.size)
.zip(getClusterPortList(cluster, Nebula.GraphdExposedPort))
.toList

protected lazy val metaAddrs: String = generateIpAddrs(metaIpPortMapping)

private lazy val ryukContainerId: String = {
val containersResponse = Nebula.TestcontainersRyukContainer
containersResponse.map(_.getId).orNull
}

protected def increaseLastIp(ip: String, num: Int): String = {
if (ip == null) {
throw new IllegalStateException("IPAddress cannot be null!")
}
val ipSplits = ip.split("\\.").toList
val last = ipSplits.last.toInt
ipSplits.updated(ipSplits.size - 1, last + num).mkString(".")
}

protected val metads: List[GenericContainer[_]]

protected val storageds: List[GenericContainer[_]]

protected val graphds: List[GenericContainer[_]]
logger.info(s"Nebula meta nodes started at ip: ${generateIpAddrs(metaIpPortMapping)}")
logger.info(s"Nebula storage nodes started at ip: ${generateIpAddrs(storageIpMapping)}")
logger.info(s"Nebula graph nodes started at ip: ${generateIpAddrs(graphIpMapping)}")

private lazy val metads: List[GenericContainer[_]] =
metaIpPortMapping.zipWithIndex.map { case ((ip, port), i) =>
new NebulaMetadContainer(
version,
ip,
metaAddrs,
NebulaMetadContainer.defaultPortBindings(port),
absoluteHostPathPrefix.fold(List.empty[NebulaVolume])(p => NebulaMetadContainer.defaultVolumeBindings(p, i)),
i
)
}.map(_.withNetwork(nebulaNet))

private lazy val storageds: List[GenericContainer[_]] =
storageIpMapping.zipWithIndex.map { case ((ip, port), i) =>
new NebulaStoragedContainer(
version,
ip,
metaAddrs,
NebulaStoragedContainer.defaultPortBindings(port),
absoluteHostPathPrefix.fold(List.empty[NebulaVolume])(p => NebulaStoragedContainer.defaultVolumeBindings(p, i)),
i
)
}.map(_.dependsOn(metads: _*)).map(_.withNetwork(nebulaNet))

private lazy val graphds: List[GenericContainer[_]] =
graphIpMapping.zipWithIndex.map { case ((ip, port), i) =>
new NebulaGraphdContainer(
version,
ip,
metaAddrs,
NebulaGraphdContainer.defaultPortBindings(port),
absoluteHostPathPrefix.fold(List.empty[NebulaVolume])(p => NebulaGraphdContainer.defaultVolumeBindings(p, i)),
i
)
}.map(_.dependsOn(metads: _*)).map(_.withNetwork(nebulaNet))

protected val console: NebulaConsoleContainer
private lazy val console: NebulaConsoleContainer = new NebulaConsoleContainer(
version,
graphdIp = graphIpMapping.head._1,
graphdPort = graphIpMapping.head._2,
storagedAddrs = storageIpMapping
).withNetwork(nebulaNet)

def existsRunningContainer: Boolean
def existsRunningContainer: Boolean =
metads.exists(_.isRunning) || storageds.exists(_.isRunning) || graphds.exists(_.isRunning) || console.isRunning

private def awaitMappedPort[S <: GenericContainer[S]](container: GenericContainer[S], exposedPort: Int): Int = {
val containerId = await()
Expand All @@ -112,6 +179,18 @@ abstract class NebulaClusterContainer(subnetIp: String) extends Startable {
}

final override def start(): Unit = {
metads.foreach { md =>
md.start()
Unreliables.retryUntilTrue(
Nebula.StartTimeout.getSeconds.toInt,
TimeUnit.SECONDS,
() => {
val g = md.execInContainer("ps", "-ef").getStdout
g != null && g.contains(Nebula.MetadName)
}
)
}

storageds.foreach { sd =>
sd.start()
Unreliables.retryUntilTrue(
Expand Down Expand Up @@ -142,6 +221,7 @@ abstract class NebulaClusterContainer(subnetIp: String) extends Startable {
() => {
// we are waiting to try the storage service online
val g = console.execInContainer(console.showHostsCommand: _*).getStdout
logger.debug(g)
g != null && g.contains("ONLINE") && !g.contains("OFFLINE")
}
)
Expand All @@ -157,40 +237,40 @@ abstract class NebulaClusterContainer(subnetIp: String) extends Startable {
running = containerInfo.getState != null && true == containerInfo.getState.getRunning
} catch {
case e: NotFoundException =>
logger.trace(s"Was going to stop container but it apparently no longer exists: ${ryukContainerId}")
logger.debug(s"Was going to stop container but it apparently no longer exists: $ryukContainerId")
return
case e: Exception =>
logger.trace(
logger.debug(
s"Error encountered when checking container for shutdown (ID: $ryukContainerId) - it may not have been stopped, or may already be stopped. Root cause: ${Throwables.getRootCause(e).getMessage}"
)
return
}

if (running) try {
logger.trace(s"Stopping container: $ryukContainerId")
logger.debug(s"Stopping container: $ryukContainerId")
dockerClient.killContainerCmd(ryukContainerId).exec
logger.trace(s"Stopped container: ${Nebula.Ryuk.stripPrefix("/")}")
logger.debug(s"Stopped container: ${Nebula.Ryuk.stripPrefix("/")}")
} catch {
case e: Exception =>
logger.trace(
logger.debug(
s"Error encountered shutting down container (ID: $ryukContainerId) - it may not have been stopped, or may already be stopped. Root cause: ${Throwables.getRootCause(e).getMessage}"
)
}

try dockerClient.inspectContainerCmd(ryukContainerId).exec
catch {
case e: Exception =>
logger.trace(s"Was going to remove container but it apparently no longer exists: $ryukContainerId")
logger.debug(s"Was going to remove container but it apparently no longer exists: $ryukContainerId")
return
}

try {
logger.trace(s"Removing container: $ryukContainerId")
logger.debug(s"Removing container: $ryukContainerId")
dockerClient.removeContainerCmd(ryukContainerId).withRemoveVolumes(true).withForce(true).exec
logger.debug(s"Removed container and associated volume(s): ${Nebula.Ryuk.stripPrefix("/")}")
} catch {
case e: Exception =>
logger.trace(
logger.debug(
s"Error encountered shutting down container (ID: $ryukContainerId) - it may not have been stopped, or may already be stopped. Root cause: ${Throwables.getRootCause(e).getMessage}"
)
}
Expand All @@ -206,7 +286,7 @@ abstract class NebulaClusterContainer(subnetIp: String) extends Startable {
}
} catch {
case e: Throwable =>
logger.error("Stopped all containers failed", e)
logger.warn(s"Stopped all containers failed: ${e.getMessage}")
}

final def allContainers: List[GenericContainer[_]] = metads ++ storageds ++ graphds ++ List(console)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object NebulaGraphdContainer {
)
)

def defaultVolumeBindings(absoluteHostPathPrefix: String, instanceIndex: Int = 1): List[NebulaVolume] =
def defaultVolumeBindings(absoluteHostPathPrefix: String, instanceIndex: Int): List[NebulaVolume] =
List(
NebulaVolume(
absoluteHostPathPrefix + Nebula.GraphdLogPath + instanceIndex,
Expand All @@ -48,7 +48,7 @@ final class NebulaGraphdContainer(
metaAddrs: String,
portsBindings: List[PortBinding],
bindings: List[NebulaVolume],
instanceIndex: Int = 1
instanceIndex: Int
) =
this(Nebula.DefaultGraphdImageName.withTag(version), containerIp, metaAddrs, portsBindings, bindings, instanceIndex)

Expand All @@ -57,7 +57,7 @@ final class NebulaGraphdContainer(
override def commands(containerIp: String, metaAddrs: String): Seq[String] =
Seq(
s"--meta_server_addrs=$metaAddrs",
s"--port=$GraphdExposedPort",
s"--port=${portsBindings.headOption.flatMap(_.getBinding.getHostPortSpec.split(":").headOption).getOrElse(Nebula.GraphdExposedPort)}",
s"--local_ip=$containerIp",
s"--log_dir=$GraphdLogPath",
s"--v=$LOGLevel",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object NebulaMetadContainer {
)
)

def defaultVolumeBindings(absoluteHostPathPrefix: String, instanceIndex: Int = 1): List[NebulaVolume] =
def defaultVolumeBindings(absoluteHostPathPrefix: String, instanceIndex: Int): List[NebulaVolume] =
List(
NebulaVolume(
absoluteHostPathPrefix + Nebula.MetadLogPath + instanceIndex,
Expand Down Expand Up @@ -52,7 +52,7 @@ final class NebulaMetadContainer(
metaAddrs: String,
portsBindings: List[PortBinding],
bindings: List[NebulaVolume],
instanceIndex: Int = 1
instanceIndex: Int
) =
this(Nebula.DefaultMetadImageName.withTag(version), containerIp, metaAddrs, portsBindings, bindings, instanceIndex)

Expand All @@ -62,7 +62,7 @@ final class NebulaMetadContainer(
Seq(
s"--meta_server_addrs=$metaAddrs",
s"--local_ip=$containerIp",
s"--port=$MetadExposedPort",
s"--port=${portsBindings.headOption.flatMap(_.getBinding.getHostPortSpec.split(":").headOption).getOrElse(Nebula.MetadExposedPort)}",
s"--log_dir=$MetadLogPath",
s"--v=$LOGLevel",
s"--minloglevel=$MinLogLevel"
Expand Down
Loading

0 comments on commit 110a916

Please sign in to comment.