Skip to content
/ pipe Public

Android library for building pipelines for executing background tasks

License

Notifications You must be signed in to change notification settings

udeyrishi/pipe

Repository files navigation


Build Status License Download

Pipe is an Android library for building pipelines for executing background tasks.

  • Declarative: Uses a Kotlin DSL or builder to declare pipeline schematics.
  • Powerful: Supports both simple steps and complex synchronization structures like barriers and aggregators.
  • Resilient: Pipe has resiliency baked-in. Has support for step retries in case of failures.
  • Android arch-friendly: The API is designed with the Android architecture components in mind, making integration into your apps easy.

Example

Consider a use case where we want to construct the following pipeline:

  1. Wait for a UI signal to start the pipeline (say, button click)
  2. Ensure that there is internet connectivity
  3. Download an image from the given URL
  4. Rotate the downloaded image by 90 degrees
  5. Scale the image to a 400px x 400px size
  6. Overdraw the scaled image on top if its sibling

Pipe uses a Kotlin DSL to declare pipeline schematics. We can express the above pipeline as:

data class ImagePipelineMember(val url: String? = null, val image: Bitmap? = null)
val JOBS_REPO = InMemoryRepository<Job<ImagePipelineMember>>()
val LOGGER = AndroidLogger("Pipe")

fun makePipeline() = buildPipeline<ImagePipelineMember> {
    setRepository(JOBS_REPO)
    setLogger(LOGGER)

    addManualBarrier("start_barrier")
    
    addManualBarrier("internet_barrier")

    addStep("download", attempts = 4) {
        ImagePipelineMember(image = downloadImage(it.url!!))
    }

    addStep("rotate") {
        ImagePipelineMember(image = rotateBitmap(it.image!!, 90.0f))
    }

    addStep("scale") {
        ImagePipelineMember(image = scale(it.image!!, 400, 400))
    }

    addCountedBarrier("overdraw", capacity = Int.MAX_VALUE) { allMembers ->
        allMembers.mapIndexed { index, member ->
            val siblingIndex = if (index == 0) allMembers.lastIndex else index - 1
            val resultingImage = overdraw(member.image!!, allMembers[siblingIndex].image!!)
            ImagePipelineMember(image = resultingImage)
        }
    }
}

We can then use this factory function to create an instance of the pipeline:

val pipeline = makePipeline()

And then schedule jobs into it:

const val JOB_TAG = "IMAGE_TRANSFORM_JOBS"

fun Pipeline<ImagePipelineMember>.createImageJobs(imageUrls: List<String>): List<Job<ImagePipelineMember>> {
    countedBarriers.forEach {
        it.setCapacity(imageUrls.size)
    }

    return imageUrls.map { url ->
        push(ImagePipelineMember(url = url), tag = JOB_TAG)
    }
}

We can subscribe to these jobs' state changes in our activities/fragments via LiveData:

val jobs = pipeline.createImageJobs(urls)
jobs.forEach { job ->
    job.state.observe(this, Observer {
        // Update UI in response to state changes
    })
}

See the state machine for the details.

We can also fetch any ongoing jobs from the repository (e.g., if the fragment is re-created):

JOBS_REPO[JOB_TAG].forEach { (job, _, _) ->
    // Perform actions on the jobs, such as interrupting them, unsubscribing from them, etc.
}

To lift the internet_barrier automatically when we have internet connectivity, we can use one of the BarrierExtensions:

pipeline.manualBarriers[1].liftWhenHasInternet(App.INSTANCE)

And finally, we can start the pipeline when a UI button is clicked:

someButton.setOnClickListener {
    pipeline.manualBarriers[0].lift()
}

State machine

The progress of a job is encoded via a state machine:

Learn more

To see a full Android app example using the above pipeline, see the sample app. Or see the javadocs here.