
Maintaining complexity using Laravel pipelines

Chris Rossi

Senior Developer

Pipelines are arguably one of Laravel's most overlooked features, but something every Laravel developer should keep in their tool belt.

The Laravel docs only include a brief section on this feature, but this is more than enough to get started.

Pipelines are actually used under the hood by Laravel Middleware, so if you have written your own middleware you will already have some concept of the way pipelines work.

In this article, we show how we use this powerful feature to wrangle complex business processes and make our codebases significantly easier to maintain.

Getting started with pipelines

The Laravel Pipeline facade allows you to pass a value through multiple "pipes", each of which can read and modify the value as needed. Each step passes its output into the next, allowing complex logic to be broken down into simple, maintainable classes.
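
To make the mechanics concrete, here is a minimal plain-PHP sketch of the idea, with no Laravel dependency. The runPipeline() helper and the two example pipes are hypothetical names used for illustration only. Laravel's own Pipeline class does more (it resolves class-based pipes from the container and lets you choose the method name), so treat this as a simplified model rather than the framework's implementation.

```php
<?php

/**
 * Pass $passable through each pipe in order, then hand it to $destination.
 * Hypothetical helper for illustration; not Laravel's actual implementation.
 */
function runPipeline(mixed $passable, array $pipes, callable $destination): mixed
{
    // Wrap the destination in each pipe, starting from the last one,
    // so that the first pipe ends up as the outermost layer.
    $pipeline = array_reduce(
        array_reverse($pipes),
        fn (callable $next, callable $pipe) => fn (mixed $value) => $pipe($value, $next),
        $destination,
    );

    return $pipeline($passable);
}

// Each pipe receives the value and a $next callback, much like middleware.
$trim = fn (string $value, callable $next) => $next(trim($value));
$upper = fn (string $value, callable $next) => $next(strtoupper($value));

$result = runPipeline('  hello  ', [$trim, $upper], fn (string $value) => $value . '!');
// $result is "HELLO!"
```

Note that each pipe returns the result of $next(...). That is how the value produced by the final callback makes its way back to the caller.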

As an example, imagine our codebase is for a timesheet management platform, which multiple employers can use to manage their time-and-attendance and payroll functions. We have a Timesheet model, and this represents an employee's shift at work, for which they expect to get paid.

Each of these timesheets may require a review to ensure the employee is paid the correct amount for their rostered hours, with adjustments if they finished earlier or later than rostered.

To automate the payroll process as much as possible for the employers, we will implement a pipeline that runs a series of checks on the timesheet when the employee finishes their shift.

If any of the checks fail, we need them to be recorded against the flags attribute. This is an array of strings with human-readable language that will be displayed to the payroll department, so they can manually review the flagged timesheets prior to running payroll.

We could have the following checks defined as individual classes:

  1. LongDurationsCheck: Adds a flag if the total duration exceeds a predefined value (for example, 12 hours).
  2. MandatoryReviewCheck: Adds a flag if the employer has enforced manual reviews of all timesheets.
  3. AutomaticallyFinishedCheck: Adds a flag if the employee forgot to sign out from their shift, so the system automatically finished the shift for them.

... and so on.

The pipeline

Our pipeline could look something like this:

use Illuminate\Support\Facades\Pipeline;

Pipeline::send($timesheet)
    ->through([
        LongDurationsCheck::class,
        MandatoryReviewCheck::class,
        AutomaticallyFinishedCheck::class,
    ])
    ->then(function (Timesheet $timesheet) {
        // Save the changes
        $timesheet->save();

        // TODO: We could dispatch a notification/email here to alert the employer
        // that there is a Timesheet requiring review
    });

When the employee finishes their shift, this code could be called from the controller, or more likely run in a queued job dispatched from the controller, as some of the checks may take a bit of processing time.

Perhaps better still would be to add a $timesheet->runChecks() method directly on the Timesheet model, which dispatches a job to run the checks on the queue.

Individual pipes

The pipe for checking for long durations could look like this:

use Closure;

class LongDurationsCheck
{
    const MAX_DURATION_IN_SECONDS = 12 * 60 * 60; // 12 hours (in seconds)

    public function handle(Timesheet $timesheet, Closure $next): mixed
    {
        if ($timesheet->duration >= self::MAX_DURATION_IN_SECONDS) {
            $timesheet->addFlag('Long shift');
        }

        // Return the result of the next pipe so the pipeline's final value reaches the caller.
        return $next($timesheet);
    }
}

The pipe for mandatory reviews should only add the flag if the employer has the setting enabled:

use Closure;

class MandatoryReviewCheck
{
    public function handle(Timesheet $timesheet, Closure $next): mixed
    {
        if ($timesheet->employer->requires_manual_review) {
            $timesheet->addFlag('Requires manual review');
        }

        // Return the result of the next pipe so the pipeline's final value reaches the caller.
        return $next($timesheet);
    }
}
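
The pipes above call $timesheet->addFlag(), which is not shown in this article. One way it could work is sketched below on a plain PHP class rather than an Eloquent model. The class name TimesheetFlags, the requiresReview() helper, and the de-duplication behaviour are all assumptions made for illustration.

```php
<?php

// Hypothetical stand-in for the flag handling on the Timesheet model.
final class TimesheetFlags
{
    /** @var string[] Human-readable reasons this timesheet needs review. */
    private array $flags = [];

    public function addFlag(string $flag): self
    {
        // Avoid duplicate flags if the checks are run more than once.
        if (! in_array($flag, $this->flags, true)) {
            $this->flags[] = $flag;
        }

        return $this;
    }

    /** @return string[] */
    public function all(): array
    {
        return $this->flags;
    }

    public function requiresReview(): bool
    {
        return $this->flags !== [];
    }
}

$flags = (new TimesheetFlags())
    ->addFlag('Long shift')
    ->addFlag('Requires manual review')
    ->addFlag('Long shift'); // Ignored: already recorded.
```

On a real model, the same idea would likely sit on top of an array-cast flags attribute.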

Using pipelines for advanced collection filtering

In one of our more complex scenarios, we need to take the details for a new job position just posted by an employer, and then find all of the potential employees, which we call candidates, that meet all of the criteria.

This requires more than 10 different checks to be performed in order to determine if each candidate is eligible for that specific job. To make this process more manageable, we create and use a custom FilterContext class to hold all the required data for the filtering.

use Illuminate\Support\Collection;

class FilterContext
{
    private Collection $candidates;

    public function __construct(
        private Job $job,
    ) {
        $this->candidates = collect();
    }

    public function getJob(): Job
    {
        return $this->job;
    }

    public function getCandidates(): Collection
    {
        return $this->candidates;
    }

    public function setCandidates(Collection $candidates): self
    {
        $this->candidates = $candidates;

        return $this;
    }
}

Any time we need to get the list of available candidates for a given job, we create an instance of the FilterContext and pass it through our pipeline:

// Create a job with a start/finish time, a location, and several mandatory skills.
// You can imagine that this information came in from an HTTP request, and the Job was created inside a controller.
$job = Job::create([
    'title' => 'Lead Forklift Driver',
    'start' => '2024-05-01 09:00:00',
    'finish' => '2024-05-01 17:00:00',
    'location_id' => 12345,
    'skills' => [
        // First aid training and forklift driving are mandatory for this job.
        'first-aid',
        'forklift',
    ],
]);

$context = new FilterContext($job);

$candidates = Pipeline::send($context)
    ->through([
        // At this point, there are no candidates in our context, so the StartingFilter will populate it with every
        // possible available candidate.
        StartingFilter::class,

        // At this point, the context now has every possible available candidate, so each of the following filters just
        // removes candidates based on more refined eligibility logic.
        MatchingSkillsFilter::class,
        ProximityToLocationFilter::class,
        ExceededAllowedHoursFilter::class,

        // Additional filters go here...
    ])
    ->via('filter')
    ->then(function (FilterContext $context) {
        return $context->getCandidates();
    });

In this case, our first filter StartingFilter receives a FilterContext instance which contains no candidates.

It performs multiple queries using SQL to build up a collection of candidates which are potentially eligible for the job:

use Closure;
use Illuminate\Support\Collection;

class StartingFilter
{
    public function filter(FilterContext $context, Closure $next): mixed
    {
        // Fetch a collection of matching candidates for this job.
        $candidates = $this->getCandidatesForJob($context->getJob());

        // Store them in the context.
        $context->setCandidates($candidates);

        // Pass the context to the next filter in the pipeline, and return its result
        // so that the value from then() makes it back to the caller.
        return $next($context);
    }

    private function getCandidatesForJob(Job $job): Collection
    {
        // Here we run a query to find any candidates which meet the basic requirements
        // for the job, such as being active within the system, and not already being rostered onto another job at the
        // same time.

        // For illustrative purposes, we are just fetching 25 candidates from the database, but you would write your
        // own logic here.
        return Candidate::query()
            ->limit(25)
            ->get();
    }
}

The next filter, MatchingSkillsFilter, now needs to further reduce the list of Candidates down to only those which have the skills required for the job:

use Closure;
use Illuminate\Support\Collection;

class MatchingSkillsFilter
{
    public function filter(FilterContext $context, Closure $next): mixed
    {
        // Run the filtering logic defined in this class to reduce our collection of candidates to just those that have
        // matching skills.
        $candidates = $this->filterCandidatesBySkills(
            $context->getCandidates(),
            $context->getJob()->skills,
        );

        // Store them in the context.
        $context->setCandidates($candidates);

        // Pass the context to the next filter in the pipeline, and return its result
        // so that the value from then() makes it back to the caller.
        return $next($context);
    }

    private function filterCandidatesBySkills(Collection $candidates, array $skills): Collection
    {
        if (empty($skills)) {
            // This job has no required skills.
            return $candidates;
        }

        return $candidates
            ->filter(function (Candidate $candidate) use ($skills) {
                // Here we filter the collection to only candidates which have ALL of the $skills.
                // (The matching logic itself is left out of this example.)
            })
            ->values();
    }
}
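
The "has ALL of the skills" check is left as a comment above. As a rough illustration, one way to express it in plain PHP is with array_diff(). The hasAllSkills() helper and the sample data are hypothetical, and candidates are represented as simple arrays of skill names rather than Candidate models.

```php
<?php

/**
 * Return true only when $candidateSkills contains every entry in $requiredSkills.
 *
 * @param string[] $candidateSkills
 * @param string[] $requiredSkills
 */
function hasAllSkills(array $candidateSkills, array $requiredSkills): bool
{
    // array_diff() returns the required skills that the candidate is missing.
    return array_diff($requiredSkills, $candidateSkills) === [];
}

// Hypothetical sample data: candidate name => skills.
$candidates = [
    'Alex' => ['first-aid', 'forklift', 'crane'],
    'Sam' => ['forklift'],
    'Jo' => ['first-aid'],
];

$eligible = array_keys(array_filter(
    $candidates,
    fn (array $skills) => hasAllSkills($skills, ['first-aid', 'forklift']),
));
// $eligible is ['Alex']
```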

Some of these filters apply their logic conditionally. For example, if the state where the job is located has different employment rules from the rest of the country, the filter can run or skip the logic related to those rules.
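
As a rough sketch of conditional logic inside a filter, the class below only applies its rule when the job is in one particular state. Everything here is hypothetical: the StateHoursFilter class, the 'STATE_A' code, the 38-hour limit, and the array-shaped job and candidates, which stand in for the real models and FilterContext.

```php
<?php

// Hypothetical filter: one state caps weekly hours, the rest of the country does not.
final class StateHoursFilter
{
    private const RESTRICTED_STATE = 'STATE_A'; // Assumed state code.
    private const MAX_WEEKLY_HOURS = 38;        // Assumed limit.

    public function filter(array $context, Closure $next): mixed
    {
        // Skip the rule entirely when the job is outside the restricted state.
        if ($context['job']['state'] === self::RESTRICTED_STATE) {
            $context['candidates'] = array_values(array_filter(
                $context['candidates'],
                fn (array $candidate) => $candidate['weekly_hours'] < self::MAX_WEEKLY_HOURS,
            ));
        }

        return $next($context);
    }
}

$candidates = [
    ['name' => 'Alex', 'weekly_hours' => 40],
    ['name' => 'Sam', 'weekly_hours' => 20],
];

$filter = new StateHoursFilter();
$returnCandidates = fn (array $context) => $context['candidates'];

// The rule applies in STATE_A, so Alex is removed.
$restricted = $filter->filter(
    ['job' => ['state' => 'STATE_A'], 'candidates' => $candidates],
    $returnCandidates,
);

// The rule is skipped elsewhere, so both candidates remain.
$unrestricted = $filter->filter(
    ['job' => ['state' => 'STATE_B'], 'candidates' => $candidates],
    $returnCandidates,
);
```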

The final return value of the pipeline is a collection containing only the candidates that have passed every eligibility check.

$context = new FilterContext($job);

$candidates = Pipeline::send($context)
    ->through([
        // At this point, there are no candidates in our context, so the StartingFilter will populate it with every
        // possible available candidate.
        StartingFilter::class,

        // At this point, the context now has every possible available candidate, so each of the following filters just
        // removes candidates based on more refined eligibility logic.
        MatchingSkillsFilter::class,
        ProximityToLocationFilter::class,
        ExceededAllowedHoursFilter::class,

        // Additional filters go here...
    ])
    ->via('filter')
    ->then(function (FilterContext $context) {
        return $context->getCandidates();
    });

Keeping these filters as isolated classes means we can easily add/remove/skip logic as the requirements change.

We can also implement robust test coverage by generating sample candidates, and confirming that only the eligible ones are included in the result of each filter.

Improvements and considerations

  • Consider reducing the size of your data

    Note that in the above example our FilterContext holds a collection of Candidate models, but in reality we use a separate DTO class called AvailableCandidate. We do this to ensure that we only keep in memory the minimum data required for filtering within the collection, such as the id, name, address, and status.

    This allows us to filter thousands of candidates in real time without hitting PHP memory limits.

  • Add an interface and optionally an abstract class

    This enforces a consistent method signature across all of the filter classes, whether they implement a shared interface, extend an AbstractFilter class, or both.

  • Do as much logic in the database as possible

    In our use case, the filters need to query data across third-party APIs and remote database servers, so it wasn't possible to run all of the filtering logic in one giant query.

    If all your filtering data is located within the same database schema, you could have your filters build a single database query (each would conditionally add joins/subqueries as needed) through the pipeline. This query can then be executed at the end of your pipeline to return your filtered results with a single query.

    For more on this, take a look at Eloquent Performance Patterns by Jonathan Reinink.
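
The first two suggestions above can be sketched together in plain PHP. The AvailableCandidate fields come from the description above, but their types are assumptions, and the CandidateFilter interface, its apply() method, and the ActiveStatusFilter class are hypothetical. In the pipeline from this article, the shared method would instead be filter(FilterContext $context, Closure $next).

```php
<?php

// Lean, read-only DTO holding only the fields the filters need (PHP 8.1+).
final class AvailableCandidate
{
    public function __construct(
        public readonly int $id,
        public readonly string $name,
        public readonly string $address,
        public readonly string $status,
    ) {
    }
}

// Hypothetical contract that every filter class would implement.
interface CandidateFilter
{
    /**
     * @param AvailableCandidate[] $candidates
     * @return AvailableCandidate[]
     */
    public function apply(array $candidates): array;
}

// Example implementation: keep only candidates whose status is "active".
final class ActiveStatusFilter implements CandidateFilter
{
    public function apply(array $candidates): array
    {
        return array_values(array_filter(
            $candidates,
            fn (AvailableCandidate $candidate) => $candidate->status === 'active',
        ));
    }
}

$candidates = [
    new AvailableCandidate(1, 'Alex', '1 Example St', 'active'),
    new AvailableCandidate(2, 'Sam', '2 Example Rd', 'inactive'),
];

$remaining = (new ActiveStatusFilter())->apply($candidates);
```

Because the DTO's properties are read-only, a filter cannot accidentally mutate a candidate while deciding whether to keep it.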

Making our pipeline even more useful

Sometimes a specific candidate cannot be found within the filtered results, and an employer may want to determine why they were filtered out.

To achieve this we can add a diagnose() method to all of our filter classes, in addition to the filter() method.

use Closure;
use Illuminate\Support\Collection;

class OnboardingFilter
{
    public function filter(FilterContext $context, Closure $next): mixed
    {
        $candidates = $this->filterCandidates($context);

        $context->setCandidates($candidates);

        return $next($context);
    }

    public function diagnose(FilterContext $context, Closure $next): mixed
    {
        // If the candidate did not survive the filter, record the reason.
        if ($this->filterCandidates($context)->isEmpty()) {
            $context->addDiagnosis('The candidate must have completed their onboarding.');
        }

        return $next($context);
    }

    private function filterCandidates(FilterContext $context): Collection
    {
        return $context
            ->getCandidates()
            ->filter(function (Candidate $candidate) {
                return $candidate->is_onboarding_completed;
            })
            ->values();
    }
}

In the above example, our FilterContext contains a single Candidate, which is the one we want to diagnose.

We will run them through the filterCandidates() method, which will either return the candidate or filter them out.

Our diagnose() method just adds a diagnosis to the context containing the reason why the candidate was filtered out.

This collection of reasons can then be displayed to the user.

$context = new FilterContext($job);

// Add the candidate to the context.
// (Assumes $candidate holds the Candidate being diagnosed.)
$context->setCandidates(collect([$candidate]));

$diagnosis = Pipeline::send($context)
    ->through([
        StartingFilter::class,
        MatchingSkillsFilter::class,
        ProximityToLocationFilter::class,
        ExceededAllowedHoursFilter::class,

        // Additional filters go here...
    ])
    ->via('diagnose') // Note that this time we call the diagnose method, not the filter method.
    ->then(function (FilterContext $context) {
        return $context->getDiagnosis();
    });

This only requires minor changes to our filter classes so that they can run through either pipeline.
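
The addDiagnosis() and getDiagnosis() methods used above are not shown in the FilterContext listing. The following self-contained sketch shows one way they might look, and how a single class can serve both pipelines. The DiagnosticContext and OnboardingCheck names are hypothetical, and candidates are plain arrays here rather than models.

```php
<?php

// Hypothetical, simplified context: candidates are plain arrays rather than models.
final class DiagnosticContext
{
    /** @var string[] Reasons the candidate was filtered out. */
    private array $diagnosis = [];

    public function __construct(public array $candidates)
    {
    }

    public function addDiagnosis(string $reason): void
    {
        $this->diagnosis[] = $reason;
    }

    /** @return string[] */
    public function getDiagnosis(): array
    {
        return $this->diagnosis;
    }
}

final class OnboardingCheck
{
    // Used by the filtering pipeline: narrows the candidate list.
    public function filter(DiagnosticContext $context, Closure $next): mixed
    {
        $context->candidates = $this->onboarded($context->candidates);

        return $next($context);
    }

    // Used by the diagnosis pipeline: records a reason, leaves the list alone.
    public function diagnose(DiagnosticContext $context, Closure $next): mixed
    {
        if ($this->onboarded($context->candidates) === []) {
            $context->addDiagnosis('The candidate must have completed their onboarding.');
        }

        return $next($context);
    }

    private function onboarded(array $candidates): array
    {
        return array_values(array_filter(
            $candidates,
            fn (array $candidate) => $candidate['is_onboarding_completed'],
        ));
    }
}

$context = new DiagnosticContext([
    ['name' => 'Sam', 'is_onboarding_completed' => false],
]);

$diagnosis = (new OnboardingCheck())->diagnose(
    $context,
    fn (DiagnosticContext $context) => $context->getDiagnosis(),
);
```

Keeping the shared logic in one private method means the filter and diagnose paths cannot drift apart.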

Taking it further

  • Rendering a checklist for the employer

    We could have every filter add a diagnosis with a boolean value indicating whether the candidate passed that check, so we receive a result covering every filter:

    {
        "The candidate must have completed their onboarding.": true,
        "The candidate must have a valid police check.": false,
        "The candidate must have a valid working visa.": true,
        "The candidate must have a forklift license.": true
    }

    This could easily be rendered as a checkbox list to make it clear to the employer which requirements the candidate did not pass.

  • Conditionally applying filters

    We could add an isApplicable() method to our filter classes to check whether the filter applies, based on employer settings or the job requirements, so that it isn't included in the diagnosis if it's not relevant.

  • Tracking totals for each filter

    For analytical purposes, we could also track the number of candidates still remaining at the end of each filter as it passes through the pipeline, and then use this to output statistics at the end.
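
To illustrate the first two ideas above, here is a small plain-PHP sketch that builds a pass/fail checklist and leaves out checks that do not apply to the job. The buildChecklist() function, the array-based checks, and the job and candidate fields are all hypothetical; in the real pipeline this logic would live in each filter's diagnose() and isApplicable() methods.

```php
<?php

// Hypothetical checks: each has a label, an applicability test, and a pass/fail test.
$checks = [
    [
        'label' => 'The candidate must have completed their onboarding.',
        'isApplicable' => fn (array $job) => true,
        'passes' => fn (array $candidate) => $candidate['onboarded'],
    ],
    [
        'label' => 'The candidate must have a valid police check.',
        'isApplicable' => fn (array $job) => $job['requires_police_check'],
        'passes' => fn (array $candidate) => $candidate['police_check'],
    ],
    [
        'label' => 'The candidate must have a forklift license.',
        'isApplicable' => fn (array $job) => in_array('forklift', $job['skills'], true),
        'passes' => fn (array $candidate) => in_array('forklift', $candidate['skills'], true),
    ],
];

/** @return array<string, bool> Requirement label => whether the candidate passed. */
function buildChecklist(array $checks, array $job, array $candidate): array
{
    $checklist = [];

    foreach ($checks as $check) {
        // Leave irrelevant checks out of the checklist altogether.
        if (! $check['isApplicable']($job)) {
            continue;
        }

        $checklist[$check['label']] = (bool) $check['passes']($candidate);
    }

    return $checklist;
}

$job = ['requires_police_check' => true, 'skills' => ['first-aid']];
$candidate = ['onboarded' => true, 'police_check' => false, 'skills' => ['forklift']];

$checklist = buildChecklist($checks, $job, $candidate);
```

Here the forklift check is skipped because the job does not require that skill, so the checklist only contains the onboarding and police check entries.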

The possibilities with pipelines are endless!
