One way to tackle these challenges is through systems that self-organise.
There are properties that we cannot renounce:
We have a tool that is natively modular and natively composable:
What if we find a set of abstractions compatible with functional programming that allow us to build self-organising systems?
rep
nbr
if
The precursor of aggregate programming, based on the idea of amorphous computing.
(def gradient (src)
(letfed ((n infinity (mux src 0 (min-hood (+ (nbr n) (nbr-range))))))
n))
The first higher-order aggregate programming language.
def distanceTo(source) {
share (distance <- POSITIVE_INFINITY) {
mux (source) {
0
} else {
foldMin(POSITIVE_INFINITY, distance + self.nbrRange())
}
}
}
The first internal DSL implementing aggregate programming.
def distanceTo(source: Boolean): Double =
rep(Double.PositiveInfinity) (d => {
mux (source) { 0.0 } {
foldHoodPlus(Double.PositiveInfinity)(Math.min) {
nbr(d) + nbrRange
}
}
})
The first native (C++14) implementation of aggregate programming.
DEF() double abf(ARGS, bool source) { CODE
return nbr(CALL, INF, [&] (field<double> d) {
double v = source ? 0.0 : INF;
return min_hood(CALL, d + node.nbr_dist(), v);
});
}
Properties | Protelis | ScaFi | FCPP |
---|---|---|---|
JVM compatibility | |||
Android compatibility | ~ | ~ | |
JS compatibility | |||
Native compatibility | ~ | ||
iOS compatibility | |||
Strictly typed | |||
Transparent alignment | |||
Complete alignment | |||
Exchange support | ~ | ||
Reified fields |
Internal DSLs have several desirable features:
but they also have some critical issues:
structurally-equal programs can communicate
branching must break alignment
Different frameworks make different choices:
foldHood
)
but there is no way to have a Field
-typed objectaggregate
function), exposing a low-level mechanism and compromising performanceCollektive answers the question:
what if we modify the host language compiler to align code strongly and transparently?
Collektive used Kotlin to do so as:
Properties | Protelis | ScaFi | FCPP | Collektive |
---|---|---|---|---|
JVM compatibility | ||||
Android compatibility | ~ | ~ | ||
JS compatibility | ||||
Native compatibility | ||||
iOS compatibility | ||||
Strictly typed | ||||
Transparent alignment | ||||
Complete alignment | ||||
Exchange support | ~ | |||
Reified fields |
fun <ID: Any> Aggregate<ID>.distanceTo(source: Boolean) = share(Double.POSITIVE_INFINITY) { distances ->
val throughNeighbor = distances.minValue(Double.POSITIVE_INFINITY) + 1
when {
source -> 0.0
else -> throughNeighbor
}
}
throughNeighbor
inside the when
?Int
s or Long
s instead of Double
s?fun <ID: Any> Aggregate<ID>.distanceTo(source: Boolean, metric: Field<ID, Double>) =
share(Double.POSITIVE_INFINITY) { distances ->
val throughNeighbor = (distances + metric).minValue(Double.POSITIVE_INFINITY)
when {
source -> 0.0
else -> throughNeighbor
}
}
// Utility class, we can use Kotlin data types freely
data class DistanceValue<T>(val distance: Double, val value: T) : Comparable<DistanceValue<T>> {
operator fun plus(distance: Double): DistanceValue<T> = DistanceValue(this.distance + distance, value)
override fun compareTo(other: DistanceValue<T>): Int = distance.compareTo(other.distance)
override fun toString(): String = "$value@$distance"
}
// Simple broadcast implementation
inline fun <ID: Any, reified T> Aggregate<ID>.broadcast(source: Boolean, value: T): T {
val top = DistanceValue(infinity, value)
val myDistanceValue = share(top) { distancesToValues ->
val closest = distancesToValues.minValue() ?: top
if (source) DistanceValue(0.0, value) else closest + 1.0
}
return myDistanceValue.value
}
fun <ID: Any> Aggregate<ID>.distance(source: Boolean, destination: Boolean, metric: Field<ID, Double>) = broadcast(source, distanceTo(destination, metric))
fun <ID: Any> Aggregate<ID>.channel(source: Boolean, destination: Boolean, width: Double, metric: Field<ID, Double>): Boolean =
distanceTo(source, metric) + distanceTo(destination, metric) < distance(source, destination, metric) + width
// Short-circuiting boolean operations work as branches!
fun <ID: Any> Aggregate<ID>.channelAroundObstacles(isObstacle: Boolean, source: Boolean, destination: Boolean, width: Double, metric: Field<ID, Double>): Boolean =
!isObstacle && channel(source, destination, width, metric)
Field
Aggregate
PurelyLocal
project(Field)
Collektive introduces the following important abstractions:
Field
: a view of a value, enclosing is local value and the neighboring values
map
and combined with alignedMap
Map
s, using toMap
or excludeSelf
fold
and reduce
operationsAggregate
: the context of aggregate operations. Provides (internally or through extension functions)
alignedOn(pivot: Any?, () -> Result): Result
exchanging(initial: Shared, body: YieldingScope<Field<ID, Shared>, Returned>): Field<ID, Shared>
exchange
that shares a value and can return arbitrary valuesalignedOn
could be rewritten in terms of exchanging
, but it would be inefficientexchange(initial: Shared, body: (Field<ID, Shared>) -> Field<ID, Shared>): Field<ID, Shared>
exchange
shares a value, computes over the neighborhood view of such value,
and returns a Field
whose contents are sent back to every neighborneighboring(local: Shared): Field<ID, Shared>
mapNeighborhood(local: (ID) -> T): Field<ID, T>
share(initial: Shared, body: (Field<ID, Shared>) -> Shared): Shared
exchange
that sends the same value to all neighborssharing(initial: Shared, body: YieldingScope<Field<ID, Shared>, Returned>) -> YieldingResult<Shared, Returned>): Returned
share
that can return arbitrary valuesevolve(initial: Stored, transform: (Stored) -> Stored): Stored
initial
and computing transform
at each roundevolving(initial: Stored, transform: YieldingScope<Stored, Returned>): Returned
evolve
, returning a Result
Collektive is designed for the aggregate code to meld into Kotlin natively. Names have been selected favoring a Kotlin-friendly syntax instead of the literature terms.
Literature | Collektive |
---|---|
rep |
evolve |
nbr |
neighboring |
share |
share |
exchange |
exchange |
All computations use Kotlin’s native types.
@Serializable
kotlinx.serialization
to serialize the dataThere is a missing item in the previous table:
Literature | Collektive |
---|---|
rep |
evolve |
nbr |
neighboring |
share |
share |
exchange |
exchange |
if |
???? |
Branching in aggregate programming is domain segmentation: operations inside a branch are aligned only with the devices that are in the same branch.
// Device with ID 0
fun Aggregate<Int>.myAlignmentTest(): Unit {
val myField = mapNeighborhood { 1 }
println(myField) // φ(localId = 0, localValue = 1), neighbors = { 1 -> 1, 2 -> 1, 3 -> 1 }
when (localId % 2) {
1 -> println(myField) // Branch not taken
else -> {
println(mapNeighborhood { 2 }) // φ(localId = 0, localValue = 2), neighbors = { 2 -> 2 }
println(myField) // φ(localId = 0, localValue = 1), neighbors = { 2 -> 1 }
}
}
}
If we were okay with dealing with alignment manually, we could have used a “plain” DSL.
This is what a Bellman-Ford gradient would have looked like:
fun <ID: Any> Aggregate<ID>.distanceTo(source: Boolean, metric: Field<ID, Double>) =
alignedOn("Aggregate.distanceTo(Boolean)") { // We need to manually align to avoid clashing with other functions with a similar structure
share(Double.POSITIVE_INFINITY) { distances ->
alignedOn("share(Boolean)") { // We need to manually align again
val actualMetrics = project(metric) // The field comes from another context, hence needs projection
val throughNeighbor = distances.alignedMapValues(actualMetrics, Double::plus)
when {
source -> alignedOn(true) { 0.0 }
else -> alignedOn(false) { throughNeighbor } // We cannot run the computation here or the source will never send data!
}
}
}
}
Manually handling alignment and projection is akin to manual memory management in pure C: exposes a low-level mechanism and is very error-prone.
The Kotlin compiler is designed to be extended via compiler plugins.
@Serializable
generates a serializer for that class under the hood.serialize
and deserialize
are generated at compile time, and appear in the IDEThe compiler plugin automatically injects calls to the alignment and projection functions where needed, so that the designer can write in “normal Kotlin”, letting the magic happen in the background:
fun <ID: Any> Aggregate<ID>.distanceTo(source: Boolean, metric: Field<ID, Double>) =
share(Double.POSITIVE_INFINITY) { distances ->
val throughNeighbor = distances.alignedMapValues(metric, Double::plus)
if (source) 0.0 else throughNeighbor
}
}
We can write our compiler plugin to statically analyze the code and provide hints. For instance, consider:
fun <ID : Any> Aggregate<ID>.distanceTo(source: Boolean, metric: Field<ID, Double>) =
evolve(Double.POSITIVE_INFINITY) {
val throughNeighbor = (neighboring(it) + metric).minValue(base = Double.POSITIVE_INFINITY)
if (source) 0.0 else throughNeighbor
}
Kotlin is typically compiled with Gradle, a build system that supports Kotlin natively.
The Collektive Kotlin compiler plugin needs to be applied to the Kotlin compilation process for aggregate code to be generated.
The standard way is to build a Gradle plugin that under the hood applies the Kotlin compiler plugin.
Declaring the plugin in the plugins
block applies Collektive to the project:
plugins {
kotlin("jvm") // Or kotlin("multiplatform")
id("it.unibo.collektive.collektive-plugin") version "<collektive version>"
}
Once the plugin is applied, we need to import the Collektive DSL to write aggregate code.
dependencies {
implementation("it.unibo.collektive:collektive-dsl:<collektive version>")
}
In multiplatform projects:
kotlin {
sourceSets {
val commonMain by getting {
dependencies {
implementation("it.unibo.collektive:collektive-dsl:<collektive version>")
}
}
}
}
You are ready! Code using Aggregate contexts will get aligned and projected automatically by the compiler plugin.
The DSL module contains the bare minimum to write aggregate code.
When creating richer applications, a standard library is needed to provide common operations.
The standard library in collektive is built on top of the DSL module and provides:
The standard library can be imported in the same way as the DSL module:
dependencies {
implementation("it.unibo.collektive:collektive-dsl:<collektive version>")
implementation("it.unibo.collektive:collektive-stdlib:<collektive version>")
}
In multiplatform projects:
kotlin {
sourceSets {
val commonMain by getting {
dependencies {
implementation("it.unibo.collektive:collektive-dsl:<collektive version>")
implementation("it.unibo.collektive:collektive-stdlib:<collektive version>")
}
}
}
}
Collektivize (logo is temporary) is a Gradle plugin that generates “fielded” methods automatically.
When using aggregate programming, we would like to manipulate fields and other data structures as if they were “scalars”.
val x: Field<*, Double> = TODO()
val y: Field<*, Double> = TODO()
x.alignedMapValues(y) { a, b -> a + b } // Verbose!
x + y // Shorter and more readable!
x.mapValues { it * 3 } // Verbose!
x * 3 // Shorter and more readable!
neighboring(File(TODO()).readText()).map { it.lines().first() } // Verbose!
neighboring(File(TODO()).readText()).first()
Collektivize runs through existing Kotlin code and generates the “fielded” methods automatically. The project is still experimental, but we currently use it to generate fielded methods for primitives.
When developing classic software systems, we use tests and debuggers to guide us through the development process and to verify the correctness of our code.
Typically:
Simulation is a fundamental part of the aggregate software development process, and it is used in several ways:
There are two macro-categories of simulators for aggregate programming:
Often, languages have both, with the internal simulator often used for testing and, at most, prototypation.
https://alchemistsimulator.github.io/
Alchemist is a general-purpose simulator for networked systems.
Your code will be portable across simulators and real-world devices with no changes!
A playground has been prepared at https://github.com/DanySK/collektive-exercises. It includes:
Two files in the src/main/kotlin
folder in the collektive.exercises
package:
Playground.kt
Entrypoint.kt
We can play with it!
MAX_SEEDS=1 ./gradlew runAllGraphic
./gradlew runAllGraphic
Message
(pre-implemented) carries a payload. Can be
serialized or
in-memory (for simulations).OutboundEnvelope
(pre-implemented),
can prepare messages for delivery to a specific neighborMailbox
– platform-specific implementation!
NeighborsData
with messages that are currently validMessage
for a specific neighborNeighborsData
– typically implemented on the fly as an anonymous object
inside the Mailbox
implementation
Mailbox
Mailbox
, a local identifier, (optionally) a SerializationFactory
,
and the programval myAggregateDevice = Collektive(myId, myMailbox) { // Here the aggregate context is available!
val myMetric = neighboring(gpsPosition()).mapValues { it.distanceTo(gpsPosition()) }
myAggregateFunction(myMetric)
}
myAggregateDevice.cycle() // Runs the cycle
val myAggregateDevice = Collektive(myId, myMailbox) { 1 }
val roundResult: Int = myAggregateDevice.cycle()
We prepared a template project for Collektive on Android: https://github.com/Collektive/collektive-example-android-bt
@RequiresPermission(allOf = [Manifest.permission.BLUETOOTH_ADVERTISE, Manifest.permission.BLUETOOTH_SCAN])
private suspend fun collektiveProgram(): Collektive<Uuid, Set<Uuid>> {
val mailbox =
MqttMailbox(deviceId, "broker.hivemq.com", dispatcher = dispatcher, context = getApplication())
return Collektive(deviceId, mailbox) {
neighboring(localId).neighbors.toSet()
}
}
Use this example to create your Android Aggregate applications!