
One way to tackle these challenges is through systems that self-organise.
There are properties that we cannot renounce:
We have a tool that is natively modular and natively composable:
What if we find a set of abstractions compatible with functional programming that allow us to build self-organising systems?
rep, nbr, if
The precursor of aggregate programming, based on the idea of amorphous computing.
(def gradient (src)
  (letfed ((n infinity (mux src 0 (min-hood (+ (nbr n) (nbr-range))))))
    n))
The first higher-order aggregate programming language.
def distanceTo(source) {
    share (distance <- POSITIVE_INFINITY) {
        mux (source) {
            0
        } else {
            foldMin(POSITIVE_INFINITY, distance + self.nbrRange())
        }
    }
}
The first internal DSL implementing aggregate programming.
def distanceTo(source: Boolean): Double =
  rep(Double.PositiveInfinity) (d => {
    mux (source) { 0.0 } {
      foldHoodPlus(Double.PositiveInfinity)(Math.min) {
        nbr(d) + nbrRange
      }
    }
  })
The first native (C++14) implementation of aggregate programming.
DEF() double abf(ARGS, bool source) { CODE
    return nbr(CALL, INF, [&] (field<double> d) {
        double v = source ? 0.0 : INF;
        return min_hood(CALL, d + node.nbr_dist(), v);
    });
}
| Properties | Protelis | ScaFi | FCPP |
|---|---|---|---|
| JVM compatibility | | | |
| Android compatibility | ~ | ~ | ~ |
| JS compatibility | | | |
| Native compatibility | ~ | | |
| iOS compatibility | | | |
| Strictly typed | | | |
| Transparent alignment | | | |
| Complete alignment | | | |
| Exchange support | ~ | | |
| Reified fields | | | |
Internal DSLs have several desirable features:
but they also have some critical issues:
structurally-equal programs can communicate
branching must break alignment
Different frameworks make different choices:
foldHood)
but there is no way to have a Field-typed object
aggregate function), exposing a low-level mechanism and compromising performance
Collektive answers the question:
what if we modify the host language compiler to align code strongly and transparently?
Collektive uses Kotlin to do so because:
| Properties | Protelis | ScaFi | FCPP | Collektive |
|---|---|---|---|---|
| JVM compatibility | | | | |
| Android compatibility | ~ | ~ | | |
| JS compatibility | | | | |
| Native compatibility | | | | |
| iOS compatibility | | | | |
| Strictly typed | | | | |
| Transparent alignment | | | | |
| Complete alignment | | | | |
| Exchange support | ~ | | | |
| Reified fields | | | | |
fun <ID: Any> Aggregate<ID>.distanceTo(source: Boolean) = share(Double.POSITIVE_INFINITY) { distances ->
    val throughNeighbor = distances.minValue(Double.POSITIVE_INFINITY) + 1
    when {
        source -> 0.0
        else -> throughNeighbor
    }
}
throughNeighbor inside the when?
Ints or Longs instead of Doubles?
fun <ID: Any> Aggregate<ID>.distanceTo(source: Boolean, metric: Field<ID, Double>) =
    share(Double.POSITIVE_INFINITY) { distances ->
        val throughNeighbor = (distances + metric).minValue(Double.POSITIVE_INFINITY)
        when {
            source -> 0.0
            else -> throughNeighbor
        }
    }
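To see what such a gradient converges to without running the Collektive runtime, the share-based computation can be mimicked in plain Kotlin. This is only a sketch under simplifying assumptions: rounds are synchronous, the network is a fixed graph, and `simulateGradient` and `links` are hypothetical names introduced here for illustration (they are not part of Collektive).

```kotlin
// Toy, library-free model of a gradient: NOT the Collektive runtime.
// `links` maps each device to its neighbors and the length of the connecting link.
fun simulateGradient(
    links: Map<Int, Map<Int, Double>>,
    sources: Set<Int>,
    rounds: Int,
): Map<Int, Double> {
    // Every device starts from the initial value passed to `share`.
    var shared = links.keys.associateWith { Double.POSITIVE_INFINITY }
    repeat(rounds) {
        // Synchronous round: everyone reads the values shared in the previous round.
        shared = links.mapValues { (id, neighbors) ->
            val throughNeighbor = neighbors
                .map { (neighbor, length) -> shared.getValue(neighbor) + length }
                .minOrNull() ?: Double.POSITIVE_INFINITY
            if (id in sources) 0.0 else throughNeighbor
        }
    }
    return shared
}

fun main() {
    // A line of four devices: 0 - 1 - 2 - 3, with unit-length links.
    val line = mapOf(
        0 to mapOf(1 to 1.0),
        1 to mapOf(0 to 1.0, 2 to 1.0),
        2 to mapOf(1 to 1.0, 3 to 1.0),
        3 to mapOf(2 to 1.0),
    )
    println(simulateGradient(line, sources = setOf(0), rounds = 4)) // {0=0.0, 1=1.0, 2=2.0, 3=3.0}
}
```

Each round moves the information one hop further from the source, so a line of four devices needs four rounds before the farthest device leaves the initial infinite value.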
// Utility class, we can use Kotlin data types freely
data class DistanceValue<T>(val distance: Double, val value: T) : Comparable<DistanceValue<T>> {
    operator fun plus(distance: Double): DistanceValue<T> = DistanceValue(this.distance + distance, value)
    override fun compareTo(other: DistanceValue<T>): Int = distance.compareTo(other.distance)
    override fun toString(): String = "$value@$distance"
}
// Simple broadcast implementation
inline fun <ID: Any, reified T> Aggregate<ID>.broadcast(source: Boolean, value: T): T {
    // The "farthest possible" element: infinitely distant, carrying the local value
    val top = DistanceValue(Double.POSITIVE_INFINITY, value)
    val myDistanceValue = share(top) { distancesToValues ->
        val closest = distancesToValues.minValue() ?: top
        if (source) DistanceValue(0.0, value) else closest + 1.0
    }
    return myDistanceValue.value
}
fun <ID: Any> Aggregate<ID>.distance(source: Boolean, destination: Boolean, metric: Field<ID, Double>) = broadcast(source, distanceTo(destination, metric))
fun <ID: Any> Aggregate<ID>.channel(source: Boolean, destination: Boolean, width: Double, metric: Field<ID, Double>): Boolean =
distanceTo(source, metric) + distanceTo(destination, metric) < distance(source, destination, metric) + width
// Short-circuiting boolean operations work as branches!
fun <ID: Any> Aggregate<ID>.channelAroundObstacles(isObstacle: Boolean, source: Boolean, destination: Boolean, width: Double, metric: Field<ID, Double>): Boolean =
!isObstacle && channel(source, destination, width, metric)
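The remark about short-circuiting relies on ordinary Kotlin semantics: the right operand of `&&` is not evaluated when the left one is false, so devices on obstacles never run the channel computation. A minimal sketch in plain Kotlin, where `channelStub` and `channelAroundObstaclesStub` are hypothetical stand-ins that only count evaluations:

```kotlin
var channelEvaluations = 0

// Hypothetical stand-in for an aggregate computation: it only counts how often it runs.
fun channelStub(): Boolean {
    channelEvaluations++
    return true
}

fun channelAroundObstaclesStub(isObstacle: Boolean): Boolean =
    !isObstacle && channelStub() // the right operand runs only when !isObstacle is true

fun main() {
    println(channelAroundObstaclesStub(isObstacle = true))  // false
    println(channelEvaluations)                             // 0
    println(channelAroundObstaclesStub(isObstacle = false)) // true
    println(channelEvaluations)                             // 1
}
```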
(Diagram labels: Field, Aggregate, PurelyLocal, project(Field))
Collektive introduces the following important abstractions:
Field: a view of a value, enclosing its local value and the neighboring values
map and combined with alignedMap
Maps, using toMap or excludeSelf
fold and reduce operations
Aggregate: the context of aggregate operations. Provides (internally or through extension functions)
alignedOn(pivot: Any?, () -> Result): Result
exchanging(initial: Shared, body: YieldingScope<Field<ID, Shared>, Returned>): Field<ID, Shared>
exchange that shares a value and can return arbitrary values
alignedOn could be rewritten in terms of exchanging, but it would be inefficient
exchange(initial: Shared, body: (Field<ID, Shared>) -> Field<ID, Shared>): Field<ID, Shared>
exchange shares a value, computes over the neighborhood view of that value, and returns a Field whose contents are sent back to every neighbor
neighboring(local: Shared): Field<ID, Shared>
mapNeighborhood(local: (ID) -> T): Field<ID, T>
share(initial: Shared, body: (Field<ID, Shared>) -> Shared): Shared
exchange that sends the same value to all neighbors
sharing(initial: Shared, body: YieldingScope<Field<ID, Shared>, Returned>) -> YieldingResult<Shared, Returned>): Returned
share that can return arbitrary values
evolve(initial: Stored, transform: (Stored) -> Stored): Stored
initial and computing transform at each round
evolving(initial: Stored, transform: YieldingScope<Stored, Returned>): Returned
evolve, returning a Result
Collektive is designed for the aggregate code to meld into Kotlin natively. Names have been selected favoring a Kotlin-friendly syntax instead of the literature terms.
| Literature | Collektive |
|---|---|
| rep | evolve |
| nbr | neighboring |
| share | share |
| exchange | exchange |
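As an illustration of the rep/evolve row, the per-device behavior of a state that starts from an initial value and is transformed once per round can be modeled in a few lines of plain Kotlin. `ToyEvolve` is a hypothetical class written for this sketch; it is not how Collektive implements `evolve`.

```kotlin
// Toy model of one device's `evolve`: NOT the Collektive implementation.
class ToyEvolve<Stored>(initial: Stored) {
    private var stored: Stored = initial

    // One round: apply `transform` to the value kept from the previous round and keep the result.
    fun round(transform: (Stored) -> Stored): Stored {
        stored = transform(stored)
        return stored
    }
}

fun main() {
    val counter = ToyEvolve(0)
    // Three consecutive rounds of a round counter.
    val results = List(3) { _ -> counter.round { previous -> previous + 1 } }
    println(results) // [1, 2, 3]
}
```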
All computations use Kotlin’s native types.
@Serializable
kotlinx.serialization to serialize the data
There is a missing item in the previous table:
| Literature | Collektive |
|---|---|
| rep | evolve |
| nbr | neighboring |
| share | share |
| exchange | exchange |
| if | ???? |
Branching in aggregate programming is domain segmentation: operations inside a branch are aligned only with the devices that are in the same branch.
// Device with ID 0
fun Aggregate<Int>.myAlignmentTest(): Unit {
    val myField = mapNeighborhood { 1 }
    println(myField) // φ(localId = 0, localValue = 1), neighbors = { 1 -> 1, 2 -> 1, 3 -> 1 }
    when (localId % 2) {
        1 -> println(myField) // Branch not taken
        else -> {
            println(mapNeighborhood { 2 }) // φ(localId = 0, localValue = 2), neighbors = { 2 -> 2 }
            println(myField) // φ(localId = 0, localValue = 1), neighbors = { 2 -> 1 }
        }
    }
}
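The restriction of the neighborhood seen inside a branch can be reproduced with a plain-Kotlin toy model, assuming that every device knows which branch each neighbor took. `alignedNeighbors` and `branchOf` are hypothetical names used only for this sketch.

```kotlin
// Toy model: a device is aligned only with the neighbors that took the same branch.
fun alignedNeighbors(localId: Int, neighborIds: Set<Int>, branchOf: (Int) -> Any?): Set<Int> =
    neighborIds.filter { branchOf(it) == branchOf(localId) }.toSet()

fun main() {
    val branch: (Int) -> Any? = { id -> id % 2 == 1 } // mirrors `when (localId % 2)`
    println(alignedNeighbors(0, setOf(1, 2, 3), branch)) // [2]
    println(alignedNeighbors(1, setOf(0, 2, 3), branch)) // [3]
}
```

Device 0 sees only device 2 inside the `else` branch, which matches the fields printed in the example above.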
If we were okay with dealing with alignment manually, we could have used a “plain” DSL.
This is what a Bellman-Ford gradient would have looked like:
fun <ID: Any> Aggregate<ID>.distanceTo(source: Boolean, metric: Field<ID, Double>) =
    alignedOn("Aggregate.distanceTo(Boolean)") { // We need to manually align to avoid clashing with other functions with a similar structure
        share(Double.POSITIVE_INFINITY) { distances ->
            alignedOn("share(Boolean)") { // We need to manually align again
                val actualMetrics = project(metric) // The field comes from another context, hence needs projection
                // Reduce the summed field to a single Double, as share must return a local value
                val throughNeighbor = distances.alignedMapValues(actualMetrics, Double::plus).minValue(Double.POSITIVE_INFINITY)
                when {
                    source -> alignedOn(true) { 0.0 }
                    else -> alignedOn(false) { throughNeighbor } // We cannot run the computation here or the source will never send data!
                }
            }
        }
    }
Manually handling alignment and projection is akin to manual memory management in pure C: it exposes a low-level mechanism and is very error-prone.
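One way to picture what manual alignment involves is a stack of pivots: each `alignedOn` call pushes its pivot, and values are exchanged under the resulting path, so only devices that reached the same path can read each other's data. The following is a toy model under that assumption; `ToyAlignment`, `send` and `program` are hypothetical and do not reflect Collektive's internals.

```kotlin
// Toy model of manual alignment: NOT the Collektive implementation.
class ToyAlignment {
    private val stack = ArrayDeque<Any?>()
    val outbound = mutableMapOf<List<Any?>, Any?>() // alignment path -> value to send

    // Runs `body` with `pivot` pushed on the stack, popping it afterwards.
    fun <R> alignedOn(pivot: Any?, body: () -> R): R {
        stack.addLast(pivot)
        try {
            return body()
        } finally {
            stack.removeLast()
        }
    }

    // Registers a value under the current path; a receiver can only read it at the same path.
    fun send(value: Any?) {
        outbound[stack.toList()] = value
    }
}

fun program(context: ToyAlignment, source: Boolean) = context.alignedOn("distanceTo") {
    context.alignedOn(source) { context.send(if (source) 0.0 else 1.0) }
}

fun main() {
    val source = ToyAlignment().also { program(it, source = true) }
    val other = ToyAlignment().also { program(it, source = false) }
    println(source.outbound) // {[distanceTo, true]=0.0}
    println(other.outbound)  // {[distanceTo, false]=1.0}
    println(source.outbound.keys.intersect(other.outbound.keys)) // []
}
```

In this model, sending from inside the branch puts the two devices on different paths, so neither can read the other's value. Forgetting or misplacing a single pivot silently changes who communicates with whom, which is the kind of mistake that makes the manual approach error-prone.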
The Kotlin compiler is designed to be extended via compiler plugins.
@Serializable generates a serializer for that class under the hood.
serialize and deserialize are generated at compile time, and appear in the IDE
The compiler plugin automatically injects calls to the alignment and projection functions where needed, so that the designer can write in “normal Kotlin”, letting the magic happen in the background:
fun <ID: Any> Aggregate<ID>.distanceTo(source: Boolean, metric: Field<ID, Double>) =
    share(Double.POSITIVE_INFINITY) { distances ->
        val throughNeighbor = distances.alignedMapValues(metric, Double::plus).minValue(Double.POSITIVE_INFINITY)
        if (source) 0.0 else throughNeighbor
    }
We can write our compiler plugin to statically analyze the code and provide hints. For instance, consider:
fun <ID : Any> Aggregate<ID>.distanceTo(source: Boolean, metric: Field<ID, Double>) =
    evolve(Double.POSITIVE_INFINITY) {
        val throughNeighbor = (neighboring(it) + metric).minValue(base = Double.POSITIVE_INFINITY)
        if (source) 0.0 else throughNeighbor
    }


Kotlin is typically compiled with Gradle, a build system that supports Kotlin natively.
The Collektive Kotlin compiler plugin needs to be applied to the Kotlin compilation process for aggregate code to be generated.
The standard way is to build a Gradle plugin that under the hood applies the Kotlin compiler plugin.
Declaring the plugin in the plugins block applies Collektive to the project:
plugins {
    kotlin("jvm") // Or kotlin("multiplatform")
    id("it.unibo.collektive.collektive-plugin") version "<collektive version>"
}
Once the plugin is applied, we need to import the Collektive DSL to write aggregate code.
dependencies {
    implementation("it.unibo.collektive:collektive-dsl:<collektive version>")
}
In multiplatform projects:
kotlin {
    sourceSets {
        val commonMain by getting {
            dependencies {
                implementation("it.unibo.collektive:collektive-dsl:<collektive version>")
            }
        }
    }
}
You are ready! Code using Aggregate contexts will get aligned and projected automatically by the compiler plugin.
The DSL module contains the bare minimum to write aggregate code.
When creating richer applications, a standard library is needed to provide common operations.
The standard library in Collektive is built on top of the DSL module and provides:
The standard library can be imported in the same way as the DSL module:
dependencies {
    implementation("it.unibo.collektive:collektive-dsl:<collektive version>")
    implementation("it.unibo.collektive:collektive-stdlib:<collektive version>")
}
In multiplatform projects:
kotlin {
    sourceSets {
        val commonMain by getting {
            dependencies {
                implementation("it.unibo.collektive:collektive-dsl:<collektive version>")
                implementation("it.unibo.collektive:collektive-stdlib:<collektive version>")
            }
        }
    }
}
Collektivize is a Gradle plugin that generates “fielded” methods automatically.
When using aggregate programming, we would like to manipulate fields and other data structures as if they were “scalars”.
val x: Field<*, Double> = TODO()
val y: Field<*, Double> = TODO()
x.alignedMapValues(y) { a, b -> a + b } // Verbose!
x + y // Shorter and more readable!
x.mapValues { it * 3 } // Verbose!
x * 3 // Shorter and more readable!
neighboring(File(TODO()).readText()).map { it.lines().first() } // Verbose!
neighboring(File(TODO()).readText()).first()
Collektivize runs through existing Kotlin code and generates the “fielded” methods automatically. The project is still experimental, but we currently use it to generate fielded methods for primitives.
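To make the idea of a “fielded” method concrete, here is a hand-written sketch of the kind of operators a generator could emit, built on a toy field type. `ToyField` and its methods are hypothetical and deliberately simplified; they are neither Collektive's `Field` nor the actual output of Collektivize.

```kotlin
// Toy field: a local value plus one value per neighbor. NOT Collektive's Field.
data class ToyField<T>(val local: T, val neighbors: Map<Int, T>) {
    fun <R> mapValues(transform: (T) -> R): ToyField<R> =
        ToyField(transform(local), neighbors.mapValues { (_, value) -> transform(value) })

    fun <O, R> alignedMapValues(other: ToyField<O>, combine: (T, O) -> R): ToyField<R> {
        require(neighbors.keys == other.neighbors.keys) { "Fields are not aligned" }
        return ToyField(
            combine(local, other.local),
            neighbors.mapValues { (id, value) -> combine(value, other.neighbors.getValue(id)) },
        )
    }
}

// Hand-written "fielded" operators: the kind of boilerplate a generator could produce.
operator fun ToyField<Double>.plus(other: ToyField<Double>): ToyField<Double> =
    alignedMapValues(other) { a, b -> a + b }

operator fun ToyField<Double>.times(factor: Int): ToyField<Double> = mapValues { it * factor }

fun main() {
    val x = ToyField(1.0, mapOf(1 to 2.0, 2 to 3.0))
    val y = ToyField(10.0, mapOf(1 to 20.0, 2 to 30.0))
    println(x + y) // ToyField(local=11.0, neighbors={1=22.0, 2=33.0})
    println(x * 3) // ToyField(local=3.0, neighbors={1=6.0, 2=9.0})
}
```

Writing one such operator per method and per type by hand does not scale, which is the motivation for generating them.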
When developing classic software systems, we use tests and debuggers to guide us through the development process and to verify the correctness of our code.
Typically:
Simulation is a fundamental part of the aggregate software development process, and it is used in several ways:
There are two macro-categories of simulators for aggregate programming:
Often, languages have both, with the internal simulator often used for testing and, at most, prototyping.
https://alchemistsimulator.github.io/
Alchemist is a general-purpose simulator for networked systems.
Your code will be portable across simulators and real-world devices with no changes!
A playground has been prepared at https://github.com/DanySK/collektive-exercises. It includes:
Two files in the src/main/kotlin folder in the collektive.exercises package:
Playground.kt
Entrypoint.kt
We can play with it!
MAX_SEEDS=1 ./gradlew runAllGraphic
./gradlew runAllGraphic
Message (pre-implemented) carries a payload. Can be
serialized or
in-memory (for simulations).
OutboundEnvelope (pre-implemented), can prepare messages for delivery to a specific neighbor
Mailbox: platform-specific implementation!
NeighborsData with messages that are currently valid
Message for a specific neighbor
NeighborsData: typically implemented on the fly as an anonymous object inside the Mailbox implementation
Mailbox
Mailbox, a local identifier, (optionally) a SerializationFactory, and the program
val myAggregateDevice = Collektive(myId, myMailbox) { // Here the aggregate context is available!
    val myMetric = neighboring(gpsPosition()).mapValues { it.distanceTo(gpsPosition()) }
    myAggregateFunction(myMetric)
}
myAggregateDevice.cycle() // Runs the cycle
val myAggregateDevice = Collektive(myId, myMailbox) { 1 }
val roundResult: Int = myAggregateDevice.cycle()
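The responsibility of a mailbox, namely keeping the most recent message per neighbor and exposing only those that are still valid, can be sketched with a toy in-memory class. All names below (`ToyMailbox`, `deliver`, `currentNeighbors`, `retentionRounds`) are hypothetical, and the round-based expiry policy is an assumption made for this example; Collektive's real `Mailbox` interface has a different shape.

```kotlin
// Toy in-memory mailbox: hypothetical names, NOT Collektive's Mailbox interface.
class ToyMailbox<ID, Payload>(private val retentionRounds: Int) {
    // Latest payload per neighbor, paired with the round in which it arrived.
    private val inbox = mutableMapOf<ID, Pair<Payload, Int>>()

    // Called by the (simulated) network when a neighbor's message arrives.
    fun deliver(from: ID, payload: Payload, round: Int) {
        inbox[from] = payload to round
    }

    // Drops messages older than `retentionRounds` and returns the ones still valid.
    fun currentNeighbors(round: Int): Map<ID, Payload> {
        inbox.entries.removeAll { round - it.value.second > retentionRounds }
        return inbox.mapValues { it.value.first }
    }
}

fun main() {
    val mailbox = ToyMailbox<Int, String>(retentionRounds = 1)
    mailbox.deliver(from = 1, payload = "a", round = 0)
    mailbox.deliver(from = 2, payload = "b", round = 1)
    println(mailbox.currentNeighbors(round = 1)) // {1=a, 2=b}
    println(mailbox.currentNeighbors(round = 2)) // {2=b}
}
```

A real implementation would replace `deliver` with the platform's transport (for instance a network socket or a message broker) and would likely expire messages by time rather than by round number.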
We prepared a template project for Collektive on Android: https://github.com/Collektive/collektive-example-android-bt
@RequiresPermission(allOf = [Manifest.permission.BLUETOOTH_ADVERTISE, Manifest.permission.BLUETOOTH_SCAN])
private suspend fun collektiveProgram(): Collektive<Uuid, Set<Uuid>> {
    val mailbox =
        MqttMailbox(deviceId, "broker.hivemq.com", dispatcher = dispatcher, context = getApplication())
    return Collektive(deviceId, mailbox) {
        neighboring(localId).neighbors.toSet()
    }
}
Use this example to create your Android Aggregate applications!