Composer helps you organize and execute multiple interdependent asynchronous input/output tasks, such as web service calls, database reads and writes, and file I/O, with concurrency support built on the java.util.concurrent APIs. It is compatible with Java 8 and above on all JVM-based platforms, including Android.

Here is an example of how you can use Composer to create a chain of tasks. Imagine a scenario where you want to get the associated Twitter account details for your app user, fetch different kinds of Twitter data for that user, show them in the app UI, and then track the event in your analytics database. All of these tasks are asynchronous (except for refreshing the UI) and interdependent.

Composer.startWith(currentUser.getUserId(), err -> logger.error("Error executing tasks", err))
        .thenExecute(userId -> { return accountService.getTwitterAccountDetails(userId); })
        .thenContinueIf(response -> response.status.isOk())
        .thenExecuteTogether(
            Results::new,
            response -> twitterService.getTweets(response.username),
            response -> twitterService.getMedia(response.username),
            response -> twitterService.getFollowers(response.username)
        )
        .thenWaitFor(results -> { refreshUI(results); })
        .thenExecute(() -> { analyticsDb.trackEvent("get_twitter_details"); })
        .thenFinish();

Please note that Composer does not aim to provide an extensible API for managing asynchronous tasks. Instead, it aims to provide a minimal, easy-to-use API for scenarios where the interdependence between such tasks forces you to write boilerplate code to manage state, validate conditions, or handle errors. Most client-side mobile and web applications, and the backend services they communicate with, require an extensible framework in which interdependent asynchronous tasks can be glued together. Composer serves specific use cases only and may not be a good fit for every application, especially when an extensible asynchronous framework is critical to it.

For details on how to use it, see the Getting Started section.

Installation

Gradle:

repositories {
    jcenter()
}
dependencies {
    implementation 'com.krupalshah:composer:2.0.1'
}

Maven:

<dependency>
    <groupId>com.krupalshah</groupId>
    <artifactId>composer</artifactId>
    <version>2.0.1</version>
    <type>pom</type>
</dependency>

Ivy:

<dependency org='com.krupalshah' name='composer' rev='2.0.1'>
    <artifact name='composer' ext='pom' />
</dependency>

Development snapshots are available in JFrog Artifactory.

Getting Started

Overview

The API consists of an interface, Composable, and its implementation, Composer. The implementation serves as an entry point and returns a Composable at each step of execution until chaining is discontinued.

Use startWith() to create your first Composable as below:

Composer.startWith(someInputOrTask, err -> logger.error("Error executing tasks", err))

The first parameter is only required if you want to pass a pre-known value as input, or a task that produces such a value.
The second parameter, ErrorStream, receives all errors that occur during execution.

If you don't have a pre-known input or task, you can create your first Composable by providing only an ErrorStream, as below:

Composer.startWith(err -> logger.error("Error executing tasks", err))

Use thenFinish() to discontinue further chaining and complete the pending task execution. Between startWith() and thenFinish(), chain your tasks according to their dependencies.

Chaining tasks

A Task can be one of the following types:

  • SimpleTask when it takes no input and returns no output.
  • ConsumingTask<Input> when it takes input but returns no output.
  • ProducingTask<Output> when it takes no input but returns output.
  • TransformingTask<Input,Output> when it takes an input and converts it to an output.
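The four task shapes correspond closely to the standard functional interfaces in java.util.function. The sketch below is only an analogy built on JDK types; it does not use Composer's own interfaces, and the class name TaskShapes is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration: JDK interfaces with the same input/output shapes
// as the four task types. These are NOT Composer's own interfaces.
final class TaskShapes {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        Runnable simple = () -> log.add("simple");                    // no input, no output
        Consumer<String> consuming = in -> log.add("consumed " + in); // input, no output
        Supplier<String> producing = () -> "data";                    // no input, output
        Function<String, Integer> transforming = String::length;      // input to output

        simple.run();
        String produced = producing.get();
        consuming.accept(produced);
        log.add("length " + transforming.apply(produced));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [simple, consumed data, length 4]
    }
}
```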

Imagine a very simple scenario where some independent data is to be obtained through a web service from a remote data source, converted to CSV format, written to a file, and then an email is triggered.

Given this information, a chain can be written like this:

Composer.startWith(() -> service.fetchData(), err -> logger.error("Error executing tasks", err))
        .thenExecute(response -> { return converter.convertToCsv(response.data); })
        .thenExecute(csv -> { writer.writeCsvFile(csv); })
        .thenExecute(() -> { mailer.sendEmail("All Tasks Completed"); })
        .thenFinish();

Each step returns a Composable, which can be detached and glued back in wherever required:

Composable<String> myComposable = Composer.startWith(() -> service.fetchData(), err -> logger.error("Error executing tasks", err))
        .thenExecute(response -> { return converter.convertToCsv(response.data); });

doSomething();
doSomethingMore();

// the lambda parameter is named differently from the local variable to avoid a name clash
String csv = myComposable.thenExecute(content -> { writer.writeCsvFile(content); })
        .thenExecute(() -> { mailer.sendEmail("All Tasks Completed"); })
        .thenFinish();

Please note that chained tasks are executed asynchronously by default. In the example above, there is therefore no guarantee that doSomething() will run after the data has been converted to CSV. If something in between needs to be executed synchronously, chain it as described in the section Executing tasks synchronously.

Executing tasks concurrently

Different method variants have been provided to perform several tasks at the same time. All you need to do is specify a collection of tasks to run in parallel. The order of execution is never guaranteed.
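To make the ordering caveat concrete, here is a minimal sketch that uses plain java.util.concurrent rather than Composer itself. It runs two hypothetical tasks on a thread pool and waits for both; either may finish first, so the code relies only on both having completed. The class and method names are illustrative assumptions.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch with plain JDK classes; not Composer's implementation.
final class ConcurrentTasks {

    static Set<String> runBoth() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Set<String> done = ConcurrentHashMap.newKeySet(); // thread-safe set
        try {
            CompletableFuture<Void> writeFile =
                    CompletableFuture.runAsync(() -> done.add("file"), pool);
            CompletableFuture<Void> storeInDb =
                    CompletableFuture.runAsync(() -> done.add("db"), pool);
            // Wait for both tasks; which one finished first is unspecified.
            CompletableFuture.allOf(writeFile, storeInDb).join();
            return done;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runBoth().size()); // prints 2
    }
}
```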

Consider a minor change to the previous scenario, where the converted CSV is stored in the database as well as written to a file.

In this case, both tasks can be executed concurrently using the thenExecuteTogether() variants as below:

Composer.startWith(() -> service.fetchData(), err -> logger.error("Error executing tasks", err))
        .thenExecute(response -> { return converter.convertToCsv(response.data); })
        .thenExecuteTogether(
            csv -> writer.writeCsvFile(csv),
            csv -> db.storeCsv(csv) // both tasks will be executed concurrently
        )
        .thenExecute(() -> { mailer.sendEmail("All Tasks Completed"); })
        .thenFinish();

  • Collecting output from multiple tasks

In cases where a task produces output, concurrent variants can run any number of tasks with the same output type, or a maximum of three tasks with different output types.

Such tasks require a Collector as an additional parameter. A Collector collects results from multiple producer tasks and returns something that may contain those results.
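As a rough illustration of what a collector does, the sketch below combines the outputs of three concurrently running producers into one holder object using CompletableFuture. It is an analogy built on JDK classes: the ConvertedData holder is named after the one in the example that follows, and the conversions themselves are hypothetical stand-ins rather than Composer's API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: collecting three concurrent results into one object.
final class CollectorSketch {

    // Simple holder for the outputs of the three producers.
    static final class ConvertedData {
        final String csv;
        final String xml;
        final String yaml;

        ConvertedData(String csv, String xml, String yaml) {
            this.csv = csv;
            this.xml = xml;
            this.yaml = yaml;
        }
    }

    static ConvertedData convertAll(String input) {
        CompletableFuture<String> csv =
                CompletableFuture.supplyAsync(() -> input.replace(' ', ','));
        CompletableFuture<String> xml =
                CompletableFuture.supplyAsync(() -> "<v>" + input + "</v>");
        CompletableFuture<String> yaml =
                CompletableFuture.supplyAsync(() -> "v: " + input);

        // The "collector" step: runs once all three producers have completed.
        return CompletableFuture.allOf(csv, xml, yaml)
                .thenApply(ignored -> new ConvertedData(csv.join(), xml.join(), yaml.join()))
                .join();
    }

    public static void main(String[] args) {
        ConvertedData data = convertAll("a b");
        System.out.println(data.csv + " | " + data.xml + " | " + data.yaml);
    }
}
```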

Imagine a change to the first scenario, where you want to convert the data to multiple formats such as CSV, XML, and YAML. In this case, we can use the concurrent method variants and collect the results as follows:

Composer.startWith(() -> service.fetchData(), err -> logger.error("Error executing tasks", err))
        .thenExecuteTogether(
            // ConvertedData is a POJO returned from the collector to hold the outputs of the concurrently executing tasks
            (response, csv, xml, yaml) -> new ConvertedData(csv, xml, yaml),
            response -> converter.convertToCsv(response.data),
            response -> converter.convertToXml(response.data),
            response -> converter.convertToYaml(response.data)
        )
        .thenExecuteTogether(
            convertedData -> writer.writeCsvFile(convertedData.csv),
            convertedData -> writer.writeXmlFile(convertedData.xml),
            convertedData -> writer.writeYamlFile(convertedData.yaml)
        )
        .thenExecute(() -> mailer.sendEmail("All Tasks Completed"))
        .thenFinish();

  • Iterating over upstream results

In cases where an upstream output contains a collection and you want to execute a task concurrently for each value in that collection, use the thenExecuteForEach() variants.

Imagine a scenario where you need to fetch some posts from a service and then fetch the comments for each post in the response. In this case, you expand the upstream response into a collection of posts, provide the task to run concurrently for each post, and finally collect the comments grouped by post, as follows:

Composer.startWith(() -> service.fetchPosts(), err -> logger.error("Error executing tasks", err))
        .thenExecuteForEach(
            response -> response.getPosts(), // provide a collection to iterate over
            post -> service.fetchComments(post), // this task will be applied to each post in the list
            // the collector receives results as pairs of <Post, List<Comment>>, assuming the service returns the list of comments for a specific post
            (response, postAndComments) -> new GroupedData(postAndComments)
        )
        .thenExecute(data -> { db.insertPostsAndComments(data); })
        .thenExecute(() -> { mailer.sendEmail("All Tasks Completed"); })
        .thenFinish();
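
The fan-out-and-collect pattern can be approximated with plain CompletableFuture, as sketched below. fetchComments is a hypothetical stand-in for a remote call, and the results are grouped into a map keyed by post. This is an analogy only and says nothing about how thenExecuteForEach() is implemented.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch with plain JDK classes; not Composer's implementation.
final class ForEachSketch {

    // Hypothetical stand-in for a remote call returning the comments of a post.
    static List<String> fetchComments(String post) {
        return Arrays.asList(post + "-c1", post + "-c2");
    }

    static Map<String, List<String>> commentsByPost(List<String> posts) {
        // Start one asynchronous task per post.
        Map<String, CompletableFuture<List<String>>> pending = new LinkedHashMap<>();
        for (String post : posts) {
            pending.put(post, CompletableFuture.supplyAsync(() -> fetchComments(post)));
        }
        // Collect: wait for each task and pair its result with its post.
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        pending.forEach((post, future) -> grouped.put(post, future.join()));
        return grouped;
    }

    public static void main(String[] args) {
        System.out.println(commentsByPost(Arrays.asList("p1", "p2")));
    }
}
```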

Validating task output

A task's output must be non-null. If any task in a chain receives null as input, further execution is discontinued.

Use thenContinueIf() to validate a task's output before it is used as input for dependent tasks. If the specified condition returns false, you will receive a ComposerException on the provided ErrorStream. Further execution is discontinued and downstream consuming tasks receive null as a result.

In the first scenario, suppose you want to check the status and size of the data in the response before converting it to CSV:

Composer.startWith(() -> service.fetchData(), err -> logger.error("Error executing tasks", err))
        .thenContinueIf(response -> response.status.isOk() && !response.data.isEmpty()) // this will discontinue further execution if the specified condition returns false
        .thenExecute(response -> { return converter.convertToCsv(response.data); })
        .thenExecute(csv -> { writer.writeCsvFile(csv); })
        .thenExecute(() -> { mailer.sendEmail("All Tasks Completed"); })
        .thenFinish();
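
The validate-then-continue behavior can be pictured with a small helper like the one below. It is a hypothetical, synchronous sketch: the name continueIf and the error message are assumptions, and it reports an IllegalStateException where Composer reports a ComposerException. When the condition fails, the error handler is notified and null is passed downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of a validation step; not Composer's implementation.
final class ValidationSketch {

    // Returns the value if it passes the check; otherwise reports an error and returns null.
    static <T> T continueIf(T value, Predicate<T> condition, Consumer<Throwable> errorStream) {
        if (value != null && condition.test(value)) {
            return value;
        }
        errorStream.accept(new IllegalStateException("Condition not satisfied for: " + value));
        return null;
    }

    public static void main(String[] args) {
        List<Throwable> errors = new ArrayList<>();
        String accepted = continueIf("data", s -> !s.isEmpty(), errors::add);
        String rejected = continueIf("", s -> !s.isEmpty(), errors::add);
        System.out.println(accepted + ", " + rejected + ", errors=" + errors.size());
        // prints data, null, errors=1
    }
}
```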

Executing tasks synchronously

By default, all tasks are executed asynchronously. If you want to execute something synchronously on the thread the method was called from (in most cases, the application's main thread), the thenWaitFor variants can be used as follows:

Composer.startWith(() -> produceSomething(), err -> logger.error("Error executing tasks", err))
        .thenWaitFor(data -> { showOnUI(data); })
        .thenFinish();
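
The difference between asynchronous execution and waiting on the calling thread can be seen in a small JDK-only sketch. It is an analogy, not Composer's implementation: the value is produced on a pool thread, and the caller blocks on join() before consuming it on its own thread. The thread name "worker" and the class name are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch with plain JDK classes; not Composer's implementation.
final class WaitSketch {

    // Returns {producerThreadName, consumerThreadName}.
    static String[] produceThenConsumeOnCaller() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            // Produced asynchronously on the pool thread.
            CompletableFuture<String> produced =
                    CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);
            // join() blocks the caller; the code after it runs on the calling thread.
            String producerThread = produced.join();
            String consumerThread = Thread.currentThread().getName();
            return new String[] {producerThread, consumerThread};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = produceThenConsumeOnCaller();
        System.out.println("produced on " + names[0] + ", consumed on " + names[1]);
    }
}
```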

Providing a custom executor service

Finally, Composer uses an ExecutorService that creates a cached thread pool internally. If you want to provide your own executor service, pass it as the third parameter of startWith() as below (not recommended unless required):

Composer.startWith(() -> produceSomething(), err -> logger.error("Error executing tasks", err), customExecutorService)
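
If you do need a custom executor, a bounded pool with named daemon threads is one common choice. The sketch below only builds such an ExecutorService with JDK classes; the pool size, the thread name prefix, and the class name are assumptions made for illustration. The resulting instance is what you would pass as the third argument of startWith().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: building an executor to hand over to a task chain.
final class CustomExecutorSketch {

    static ExecutorService newBoundedExecutor(int threads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, "composer-worker-" + counter.incrementAndGet());
            t.setDaemon(true); // idle workers do not keep the JVM alive
            return t;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }

    public static void main(String[] args) {
        ExecutorService executor = newBoundedExecutor(4);
        try {
            String name = CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
            System.out.println(name); // prints composer-worker-1
        } finally {
            executor.shutdown();
        }
    }
}
```

Whoever creates the executor is also responsible for shutting it down once the chain has finished.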

Changelog

  • 2.0.1

    • Minor changes to avoid compiler warnings.
  • 2.0.0

    • This release contains breaking changes. The major API refactoring renames all methods to reduce verbosity.
    • Collection parameters in the then..Together variants have been replaced by varargs.
  • 1.0.1

    • Fixed a bug where an ErrorStream did not transmit errors synchronously.

License

Copyright 2020 Krupal Shah
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
