Tutorial 5: Pipeline branching & shm

Jin Heo edited this page Dec 16, 2023 · 4 revisions

How to run

$ cd FlexPipe/examples
$ bash 5_local_shm_pipeline.sh

This tutorial builds a pipeline from the given SourceKernel and the two sink kernels, SinkKernelB and SinkKernelNB, whose inputs are blocking and non-blocking respectively. The pipeline topology is shown below.

|Source|[o1, B]--------->[i1, B]|Sink|
|      |[o2, B]---shm┐                  // o2 is dynamically branched from o1 by the kernel user.
                     └-->[i1, NB]|Sink| // This sink runs in another process.

where B: blocking, NB: non-blocking
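
To make the B/NB distinction concrete, here is a minimal in-process sketch of the two receive styles. DemoChannel and its method names are hypothetical and are not FlexPipe APIs; FlexPipe's actual port semantics may differ in detail.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical in-process channel, for illustration only (not a FlexPipe class).
template <typename T>
class DemoChannel {
public:
  void send(T msg) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(msg));
    }
    ready_.notify_one();
  }

  // Blocking (B) receive: waits until a message is available.
  T recvBlocking() {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return !queue_.empty(); });
    assert(!queue_.empty());  // wait() returns only once the predicate holds
    T msg = std::move(queue_.front());
    queue_.pop();
    return msg;
  }

  // Non-blocking (NB) receive: returns immediately.
  // Returns false and leaves `out` untouched when nothing is queued.
  bool recvNonBlocking(T &out) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return false;
    out = std::move(queue_.front());
    queue_.pop();
    return true;
  }

private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::queue<T> queue_;
};
```

In this sketch, a kernel with a blocking input cannot make progress until its upstream produces a message, while a kernel with a non-blocking input can poll the port and continue with other work when it is empty.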

Port Registration by Kernel Developer

This tutorial uses the same kernels as the previous tutorial, so port registration is unchanged.

Port Activation by Kernel User

The given source kernel is implemented with only a single output port. FlexPipe lets the kernel user branch a registered port dynamically: a single registered port can be branched into multiple ports with different connection types and semantics.

/* examples/5_local_shm_pipeline_source.cc */
int main()
{
  raft::map pipeline;

  /* 1. Create instances of the given kernels */
  SourceKernel *sourceKernel = new SourceKernel("sourceKernel");
  SinkKernelNB *sinkKernelNB = new SinkKernelNB("sinkKernel1");

  /* 2. Activate ports that are registered by the kernel developers */
  sourceKernel->portManager.activateOutPortAsLocal<MsgType>("o1", flexpipe::PortDependency::BLOCKING);
  sinkKernelNB->portManager.activateInPortAsLocal<MsgType>("i1");

  // A port registered by the kernel developer can be branched by the kernel user
  // when the kernel is placed in a pipeline, without changing the kernel code.
  sourceKernel->portManager.duplicateOutPortAsShm<MsgType>("o1", "o2", "5_local_shm", 1, MSG_SIZE*2, flexpipe::PortDependency::BLOCKING);

  /* 3. Link kernels that are connected as flexpipe */
  pipeline.link(sourceKernel, "o1", sinkKernelNB, "i1", 1);

  /* 4. Run the pipeline */
  pipeline.exe();

  return 0;
}

In this example, the registered output port "o1" is activated for a local pipeline connection and is also branched into a second output port, "o2", which is backed by shared memory. The key point is that the kernel user can branch a port to a different connection type and different blocking/non-blocking semantics without changing the kernel code.
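
The idea of branching can be sketched independently of FlexPipe. In the sketch below, the kernel only ever sends on the name it registered, while the user attaches any number of destinations to that name. All names here (DemoPortManager, registerOutPort, addBranch, send) are hypothetical and chosen for illustration; this is not how FlexPipe implements its port manager.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch; none of these names are FlexPipe APIs.
using DemoMsg = std::string;
using DemoDeliver = std::function<void(const DemoMsg &)>;

class DemoPortManager {
public:
  // Kernel developer: declare the port name the kernel writes to.
  void registerOutPort(const std::string &registered) { branches_[registered]; }

  // Kernel user: attach one more destination to an already registered port.
  void addBranch(const std::string &registered, const std::string &branch,
                 DemoDeliver deliver) {
    auto it = branches_.find(registered);
    assert(it != branches_.end() && "port must be registered first");
    it->second.emplace_back(branch, std::move(deliver));
  }

  // Kernel code: sends on the registered name only.
  // Returns the number of branches the message was delivered to.
  std::size_t send(const std::string &registered, const DemoMsg &msg) const {
    auto it = branches_.find(registered);
    if (it == branches_.end()) return 0;
    for (const auto &b : it->second) b.second(msg);
    return it->second.size();
  }

private:
  // registered port name -> list of (branch name, delivery function)
  std::map<std::string, std::vector<std::pair<std::string, DemoDeliver>>> branches_;
};
```

Because each branch carries its own delivery function, one branch could hand the message to a local queue and another could copy it into shared memory, and the kernel's `send` call would stay the same.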

/* examples/5_local_shm_pipeline_sink.cc */
int main()
{
  SinkKernelB *sinkKernelB = new SinkKernelB("sinkKernel2");

  // This kernel emulates a separate program that is not part of the pipeline.
  // Such a program can be integrated into the pipeline through shared-memory-based IPC.
  sinkKernelB->portManager.activateInPortAsShm<MsgType>("i1", "5_local_shm", 1, MSG_SIZE*2);

  std::vector<std::thread> singleKernelThreads;
  std::vector<flexpipe::Kernel*> singleKernels;
  singleKernels.push_back(sinkKernelB);

  // Run each standalone kernel on its own thread.
  for(size_t i = 0; i < singleKernels.size(); i++)
  {
    std::thread singleKernelThread(flexpipe::runSingleKernel, singleKernels[i]);
    singleKernelThreads.push_back(std::move(singleKernelThread));
  }

  // Wait for all kernel threads to finish.
  for(size_t i = 0; i < singleKernelThreads.size(); i++) singleKernelThreads[i].join();

  return 0;
}

To emulate another process that communicates with the FlexPipe pipeline via shared-memory-based IPC, we create a second process that contains a sink whose input port is activated as shared memory. Any other process that supports FlexPipe's shared-memory IPC can interoperate with a FlexPipe pipeline in the same way. Alternatively, the port interface can be extended with a new IPC port type to make FlexPipe compatible with other software.
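
For readers unfamiliar with shared-memory IPC, the sketch below shows the underlying mechanism with plain POSIX calls: a producer and a consumer agree on a segment name and size, and each maps the same named object. The helper names (demoMapShm, demoShmWrite, demoShmRead) are hypothetical, the single-message layout is an assumption made for brevity, and the sketch omits the synchronization a real queue needs. It does not show FlexPipe's shared-memory format.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <fcntl.h>     // O_CREAT, O_RDWR
#include <sys/mman.h>  // shm_open, shm_unlink, mmap, munmap
#include <unistd.h>    // ftruncate, close

// Hypothetical helpers for illustration; not FlexPipe functions.
// Maps the named POSIX shared memory object, creating it when `create` is true.
// Returns nullptr on failure.
void *demoMapShm(const std::string &name, std::size_t size, bool create) {
  assert(size > 0);
  int flags = create ? (O_CREAT | O_RDWR) : O_RDWR;
  int fd = shm_open(name.c_str(), flags, 0600);
  if (fd < 0) return nullptr;
  if (create && ftruncate(fd, static_cast<off_t>(size)) != 0) {
    close(fd);
    return nullptr;
  }
  void *addr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);  // the mapping stays valid after the descriptor is closed
  return addr == MAP_FAILED ? nullptr : addr;
}

// Producer side: copy one NUL-terminated message into the segment.
bool demoShmWrite(const std::string &name, std::size_t size, const std::string &msg) {
  if (msg.size() + 1 > size) return false;
  void *addr = demoMapShm(name, size, true);
  if (!addr) return false;
  std::memcpy(addr, msg.c_str(), msg.size() + 1);
  munmap(addr, size);
  return true;
}

// Consumer side: open the same name and read the message back.
// Returns an empty string when the segment does not exist.
std::string demoShmRead(const std::string &name, std::size_t size) {
  void *addr = demoMapShm(name, size, false);
  if (!addr) return std::string();
  const char *p = static_cast<const char *>(addr);
  std::string msg(p, ::strnlen(p, size));
  munmap(addr, size);
  return msg;
}
```

As in the tutorial, where both programs pass the same name and sizes to their shared-memory ports, the two sides in this sketch only work together if they use the same segment name and size. On older glibc versions, linking may require `-lrt`.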