Run Multiple Streams in One Graph
| Field | Value |
|---|---|
| Difficulty | Advanced |
| Estimated Read Time | 20-25 minutes |
| Labels | graph, multistream, scheduler, join |
Concept
Run multiple logical streams through one public Graph and combine two named inputs into one deterministic bundle output. This is the pattern behind multi-camera or multi-source systems where related inputs must be aligned before downstream processing.
The graph here is created with:
graph = pyneat.graphs.combine(["left", "right"],
                              "combined",
                              pyneat.CombinePolicy.ByFrame)
Each pushed sample carries a stream_id and a frame_id. CombinePolicy.ByFrame waits until both named inputs have produced a sample with the same frame_id, then emits one bundle, which run.pull("combined") returns.
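The waiting behaviour described above can be pictured with a small pure-Python model. This is only a sketch of the idea, not the pyneat implementation: the `FrameJoiner` class, its `push` method, and the dictionary-based samples are hypothetical names invented for illustration, and the model assumes each input delivers a given frame_id at most once.

```python
from collections import defaultdict


class FrameJoiner:
    """Toy model of a by-frame join (hypothetical, not part of pyneat)."""

    def __init__(self, inputs):
        self.inputs = list(inputs)
        # frame_id -> {input name -> sample} for frames still waiting on an input
        self.pending = defaultdict(dict)

    def push(self, name, sample):
        """Store a sample; return a bundle once every input has this frame_id."""
        if name not in self.inputs:
            raise KeyError(f"unknown input: {name}")
        frame_id = sample["frame_id"]
        slot = self.pending[frame_id]
        slot[name] = sample
        if len(slot) < len(self.inputs):
            return None  # still waiting for the other input(s)
        del self.pending[frame_id]
        # Fields follow the declared input order, not the arrival order.
        return [slot[n] for n in self.inputs]


joiner = FrameJoiner(["left", "right"])
first = joiner.push("left", {"frame_id": 0, "stream_id": "0"})
second = joiner.push("right", {"frame_id": 0, "stream_id": "0"})
print(first)        # None: only "left" has arrived for frame 0
print(len(second))  # 2: one field per named input
```

Ordering the fields by declared input rather than by arrival is one way to make the bundle deterministic; whether the runtime does exactly this is an assumption of the sketch.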
APIs introduced
- pyneat.graphs.combine(inputs, output, policy): build a reusable public Graph fragment.
- pyneat.CombinePolicy.ByFrame: combine samples whose Sample.frame_id values match.
- pyneat.CombinePolicy.ByPts: combine samples whose Sample.pts_ns presentation timestamps match.
- run.push("left", [sample]) / run.push("right", [sample]): named multi-input push.
- run.pull("combined"): named output pull.
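The two policies differ in which sample field serves as the join key. The following pure-Python sketch illustrates that difference. The `pair_by` helper and the dictionary samples are hypothetical, and the sketch assumes keys must be exactly equal; whether the library tolerates small timestamp differences under ByPts is not stated here.

```python
def pair_by(left, right, key):
    """Pair samples from two lists whose `key` field is equal (illustrative only)."""
    right_by_key = {s[key]: s for s in right}
    return [(a, right_by_key[a[key]]) for a in left if a[key] in right_by_key]


left = [{"frame_id": 0, "pts_ns": 0}, {"frame_id": 1, "pts_ns": 33_000_000}]
right = [{"frame_id": 0, "pts_ns": 0}, {"frame_id": 1, "pts_ns": 40_000_000}]

by_frame = pair_by(left, right, "frame_id")  # analogous to CombinePolicy.ByFrame
by_pts = pair_by(left, right, "pts_ns")      # analogous to CombinePolicy.ByPts
print(len(by_frame), len(by_pts))  # 2 1
```

With these inputs, frame 1 pairs up under the frame-based key but not under the timestamp key, because the two sources stamped it differently.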
When to use this
- Multi-camera ingestion where each stream must make progress independently.
- Parallel branch processing (e.g. two models running side-by-side) that must rejoin outputs correctly.
- Diagnosing dropped or misaligned stream outputs under load.
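For the diagnosis case, a useful first check is to compare which frame ids each input actually delivered. The sketch below is a standalone illustration in plain Python; `unmatched_frames` is a hypothetical helper, not a pyneat call, and it assumes you have recorded the frame ids pushed to each input.

```python
def unmatched_frames(left_ids, right_ids):
    """Return frame ids seen on only one input: (only_left, only_right)."""
    left, right = set(left_ids), set(right_ids)
    return sorted(left - right), sorted(right - left)


# Frame 2 never arrived on "right"; frame 4 never arrived on "left".
only_left, only_right = unmatched_frames([0, 1, 2, 3], [0, 1, 3, 4])
print(only_left, only_right)  # [2] [4]
```

Any id that appears in either list can never complete a by-frame join, so it points at a drop or a misnumbered frame upstream.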
Prerequisites
Chapter 012 (Graph basics). Chapter 009 (bundle samples) helps for join semantics.
Learning Process
- Generate deterministic per-stream/per-frame samples with explicit tags.
- Build a public combine Graph with named inputs and one named output.
- Push all expected inputs and pull joined outputs.
- Validate output count and bundle cardinality.
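The first and last steps can be sketched in plain Python. The frame id formula `frame * streams + sid` mirrors the one used in this chapter's C++ listing; `frame_schedule` itself is a hypothetical helper written for illustration.

```python
def frame_schedule(streams, frames):
    """Yield (stream_id, frame_id) pairs using frame_id = frame * streams + sid."""
    for frame in range(frames):
        for sid in range(streams):
            yield str(sid), frame * streams + sid


schedule = list(frame_schedule(streams=8, frames=4))
frame_ids = [fid for _, fid in schedule]

# Every (stream, frame) pair gets its own id, so each join key is unambiguous.
assert len(set(frame_ids)) == len(frame_ids)

expected_outputs = 8 * 4
print(len(schedule) == expected_outputs, frame_ids[:3], frame_ids[-1])  # True [0, 1, 2] 31
```

Because each id is unique across streams, the expected number of joined bundles is simply streams times frames, which is the count the tutorial validates.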
Run
Python:
python3 share/sima-neat/tutorials/014_run_multiple_streams/run_multiple_streams.py \
--streams 8 --frames 4
C++ (prebuilt):
./lib/sima-neat/tutorials/tutorial_014_run_multiple_streams \
--streams 8 --frames 4
C++ (build from source):
./build.sh --target tutorial_014_run_multiple_streams
./build/tutorials-standalone/tutorial_014_run_multiple_streams \
--streams 8 --frames 4
To integrate this chapter's C++ source into your own project with a custom CMakeLists.txt (no extras folder required), see How to Run Tutorials on the landing page.
Code
// Multistream public Graph: named inputs -> Combine(ByFrame) -> named output bundle.
//
// Usage:
// tutorial_014_run_multiple_streams [--streams 8] [--frames 4]
#include "neat.h"
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
namespace {
// Look up `key` in argv and copy the token that follows it into `out`.
bool get_arg(int argc, char** argv, const std::string& key, std::string& out) {
  for (int i = 1; i + 1 < argc; ++i) {
    if (key == argv[i]) {
      out = argv[i + 1];
      return true;
    }
  }
  return false;
}
// Parse an integer option, falling back to `def` when the option is absent.
int parse_int_arg(int argc, char** argv, const std::string& key, int def) {
  std::string value;
  if (!get_arg(argc, argv, key, value))
    return def;
  return std::stoi(value);
}
// Byte strides for a densely packed tensor: the last dimension is contiguous.
std::vector<int64_t> contiguous_strides_bytes(const std::vector<int64_t>& shape,
                                              int64_t elem_bytes) {
  std::vector<int64_t> strides(shape.size(), 0);
  int64_t stride = elem_bytes;
  for (int i = static_cast<int>(shape.size()) - 1; i >= 0; --i) {
    strides[static_cast<size_t>(i)] = stride;
    stride *= shape[static_cast<size_t>(i)];
  }
  return strides;
}
// Build a small 8x6 RGB tensor sample tagged with the given stream and frame ids.
simaai::neat::Sample make_rgb_sample(const std::string& stream_id, int frame_id) {
  const int w = 8;
  const int h = 6;
  const int c = 3;
  const std::size_t bytes = static_cast<std::size_t>(w) * h * c;

  simaai::neat::Tensor t;
  t.device = {simaai::neat::DeviceType::CPU, 0};
  t.dtype = simaai::neat::TensorDType::UInt8;
  t.layout = simaai::neat::TensorLayout::HWC;
  t.shape = {h, w, c};
  t.semantic.image = simaai::neat::ImageSpec{simaai::neat::ImageSpec::PixelFormat::RGB, ""};
  t.storage = simaai::neat::make_cpu_owned_storage(bytes);
  t.strides_bytes = contiguous_strides_bytes(t.shape, 1);
  t.read_only = false;
  {
    // Fill the buffer with a deterministic byte pattern.
    auto map = t.map(simaai::neat::MapMode::Write);
    auto* p = static_cast<std::uint8_t*>(map.data);
    for (std::size_t i = 0; i < bytes; ++i)
      p[i] = static_cast<std::uint8_t>(i % 255);
  }
  t.read_only = true;

  simaai::neat::Sample sample;
  sample.kind = simaai::neat::SampleKind::Tensor;
  sample.tensor = std::move(t);
  sample.frame_id = frame_id;
  sample.stream_id = stream_id;
  return sample;
}
} // namespace
int main(int argc, char** argv) {
  try {
    const int streams = parse_int_arg(argc, argv, "--streams", 8);
    const int frames = parse_int_arg(argc, argv, "--frames", 4);

    // CORE LOGIC
    // `graphs::Combine` is a normal public Graph fragment. It declares two
    // named inputs ("left", "right") and one named output ("combined"). ByFrame
    // means the runtime emits one bundle only after both inputs have delivered
    // samples with the same Sample::frame_id.
    simaai::neat::Graph graph = simaai::neat::graphs::Combine({"left", "right"}, "combined",
                                                              simaai::neat::CombinePolicy::ByFrame);
    std::cout << graph.describe() << "\n";
    simaai::neat::Run run = graph.build();

    // Give every (frame, stream) pair a unique frame id so each join key is unambiguous.
    for (int frame = 0; frame < frames; ++frame) {
      for (int sid = 0; sid < streams; ++sid) {
        const int logical_frame = frame * streams + sid;
        if (!run.push("left", make_rgb_sample(std::to_string(sid), logical_frame))) {
          throw std::runtime_error("left push failed: " + run.last_error());
        }
        if (!run.push("right", make_rgb_sample(std::to_string(sid), logical_frame))) {
          throw std::runtime_error("right push failed: " + run.last_error());
        }
      }
    }
    // One joined bundle is expected per (stream, frame) pair.
    const int expected = streams * frames;
    int received = 0;
    int first_fields = -1;
    for (int i = 0; i < expected; ++i) {
      auto maybe_bundle = run.pull("combined", /*timeout_ms=*/2000);
      if (!maybe_bundle.has_value()) {
        throw std::runtime_error("timed out waiting for combined output");
      }
      const auto& bundle = *maybe_bundle;
      if (first_fields < 0)
        first_fields = static_cast<int>(bundle.fields.size());
      ++received;
      if (i < 4) {
        std::cout << "bundle stream=" << bundle.stream_id << " fields=" << bundle.fields.size()
                  << "\n";
      }
    }
    run.close();

    if (received != expected)
      throw std::runtime_error("expected=" + std::to_string(expected) +
                               " received=" + std::to_string(received));
    // Each bundle should carry one field per named input ("left" and "right").
    if (first_fields != 2)
      throw std::runtime_error("join should emit a two-field bundle (left + right)");

    std::cout << "received=" << received << " fields=" << first_fields << "\n";
    std::cout << "[OK] 014_run_multiple_streams\n";
    return 0;
  } catch (const std::exception& e) {
    std::cerr << "[FAIL] " << e.what() << "\n";
    return 1;
  }
}
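The stride helper in the listing above is easy to check by hand. The following Python transcription of `contiguous_strides_bytes` is a sketch for verification only, not part of the tutorial sources. It applies the same right-to-left accumulation to the tutorial's 6x8x3 HWC, one-byte-per-element tensor.

```python
def contiguous_strides_bytes(shape, elem_bytes):
    """Byte strides for a densely packed tensor, innermost dimension last."""
    strides = [0] * len(shape)
    stride = elem_bytes
    # Walk from the innermost dimension outward, accumulating the step size.
    for i in range(len(shape) - 1, -1, -1):
        strides[i] = stride
        stride *= shape[i]
    return strides


# shape = {h, w, c} = {6, 8, 3} with UInt8 elements, as in make_rgb_sample.
print(contiguous_strides_bytes([6, 8, 3], 1))  # [24, 3, 1]
```

Stepping one row advances 24 bytes (8 pixels of 3 channels), one pixel advances 3 bytes, and one channel advances 1 byte, which matches the 144-byte buffer the sample allocates.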