A DoD-Compatible Real-Time Multi-Sensor Fusion Platform for Missile Defense Simulation
Document Version: 1.1
Classification: UNCLASSIFIED//FOR OFFICIAL USE ONLY
Last Updated: May 2026
FORGE (Federated Operations Research & Grid Environment) is an integrated simulation platform for missile defense research, training, and operational planning. Built with DoD interoperability requirements, FORGE provides real-time multi-sensor fusion, track correlation, threat assessment, and engagement coordination through a modular, cloud-native architecture.
The system addresses a critical need: a realistic, standards-compliant environment integrating diverse sensor simulations, processing track data at scale, and coordinating defensive responses—while adhering to military messaging standards including JREAP (MIL-STD-3011) and Link 16 (MIL-STD-6016).
FORGE implements a distributed microservices architecture designed for high-throughput, low-latency track processing. The architecture separates concerns into distinct components that communicate via Apache Kafka message streams, enabling horizontal scaling and fault tolerance.
┌─────────────────────────────────────────────────────────────────────────────┐
│ FORGE SYSTEM ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ OPIR Sensor │ │ Radar System │ │ External Feed│ │
│ │ Simulation │ │ Simulation │ │ (AIS/ADS-B) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └────────────────────┼────────────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ FORGE-Sensors │ │
│ │ (Normalization) │ │
│ └────────┬─────────┘ │
│ │ │
│ ┌────────▼─────────┐ ┌──────────────┐ │
│ │ Apache Kafka │────▶│ TimescaleDB │ │
│ │ Message Broker │ │ (Time-Series)│ │
│ └────────┬─────────┘ └──────────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ FORGE-C2 │ │ FORGE- │ │ FORGE- │ │
│ │ (Command & │ │ Consumer │ │ Analytics │ │
│ │ Control) │ │ (Pipeline) │ │ (Reporting) │ │
│ └──────┬──────┘ └─────────────┘ └─────────────┘ │
│ │ │
│ ├─────────────┬─────────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ JREAP │ │ Link 16 │ │ WebSocket │ │
│ │ Formatter │ │ Formatter │ │ Streaming │ │
│ │(MIL-STD- │ │(MIL-STD- │ │ to Clients │ │
│ │ 3011) │ │ 6016) │ │ │ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ VIMI Integration │ │
│ │ (Track Correlator)│ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
INFRASTRUCTURE LAYER
┌─────────────────────────────────────────────────────────────────────────────┐
│ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Kubernetes │ │ Prometheus │ │ Grafana │ │ Velero │ │
│ │ Orchestration│ │ Federation │ │ Dashboards │ │ Backups │ │
│ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
FORGE-C2 serves as the central coordination hub for real-time track management, threat assessment, and engagement coordination. It implements JREAP for beyond-line-of-sight communications and Link 16 tactical data link formatting for DoD interoperability.
Key capabilities include JREAP-A/B/C message handling for satellite, line-of-sight, and IP network transports; Link 16 message formatting (J2, J3, J5, J7, J9 series); WebSocket streaming for real-time client updates; and alert generation for threat notifications.
FORGE-Sensors provides a unified interface for diverse sensor simulations including OPIR satellite simulations for boost-phase detection, radar system simulations for mid-course and terminal phase tracking, and external data feeds (AIS/ADS-B). The component normalizes incoming data into a common track format before publishing to Kafka.
FORGE-Consumer manages the data processing pipeline, consuming normalized sensor data from Kafka for track correlation, state estimation, position prediction, and historical archival to TimescaleDB.
Kafka provides the distributed event streaming backbone, enabling decoupled component design with independent scaling, high-throughput track processing with sub-second latency, and message persistence for replay and recovery.
TimescaleDB provides PostgreSQL-compatible time-series storage optimized for high-volume, timestamped track data, enabling historical analysis, simulation replay, and trend analysis.
FORGE implements critical military messaging standards to ensure interoperability with existing DoD infrastructure:
| Type | Transport | Application |
|---|---|---|
| JREAP-A | Satellite (SATCOM) | Beyond-line-of-sight tracking for distributed operations |
| JREAP-B | Line-of-Sight (LOS) | Direct range extension for tactical units |
| JREAP-C | IP Network (TCP/UDP) | Ground network integration and simulation environments |
Key message series include J2 (track management), J3 (air tracks), J5 (electronic combat), J7 (weapon coordination), and J9 (engagement status).
FORGE operates on Kubernetes with Prometheus federation for metrics, Grafana for dashboards, and Velero for backups—ensuring high availability, scalability, and disaster recovery capabilities.
FORGE fuses track data from multiple sensor sources into a unified operational picture, associating observations from OPIR satellites, radar systems, and external feeds to maintain coherent track identities.
Real-time threat assessment algorithms evaluate incoming tracks against threat profiles, prioritizing engagement decisions based on trajectory analysis, launch point attribution, and impact prediction.
FORGE-C2 provides coordination for defensive response planning: weapon-target pairing optimization, engagement zone management, intercept probability calculations, and kill assessment integration.
FORGE is operational on the miner infrastructure node (207.244.226.151) with integration to the VIMI track correlator. Core components—FORGE-C2, FORGE-Sensors, Kafka, and TimescaleDB—are deployed and functional. JREAP handler implementation and authentication/authorization layers are in active development.
FORGE addresses the critical need for realistic, standards-compliant missile defense simulation. By combining cloud-native architecture with military messaging standards, FORGE enables realistic training environments, algorithm development and testing, interoperability validation against DoD standards, scalable research platforms, and comprehensive after-action analysis.
The platform serves as a foundation for continued research in missile defense technology, sensor fusion algorithms, and engagement coordination strategies.
Document Version: 1.0
Classification: UNCLASSIFIED//FOR OFFICIAL USE ONLY
Last Updated: April 2026
FORGE implements a layered, event-driven architecture designed for high-throughput sensor data processing with real-time track correlation and command-and-control capabilities. The system decomposes naturally into five distinct layers: sensor simulation, messaging, processing, storage, and infrastructure. This separation enables independent scaling, clear ownership boundaries, and the flexibility to replace or upgrade components without disrupting the overall system.
The architecture follows a pipes-and-filters pattern: sensor simulators produce detection events, Kafka provides the reliable transport backbone, processing services consume and transform these streams, and persistent storage captures both time-series telemetry and relational state. Kubernetes orchestrates the deployment, while Prometheus and Grafana provide observability across all layers.
The sensor layer comprises two categories of simulators: OPIR (Overhead Persistent Infrared) and Radar. Each simulator produces realistic detection data conforming to actual sensor characteristics, enabling training and testing without access to classified systems.
SBIRS-GEO (Space-Based Infrared System - Geostationary)
The SBIRS-GEO simulator models high-Earth orbit infrared sensors providing continuous hemispheric coverage. Key characteristics include:
DSP (Defense Support Program)
The legacy DSP simulator provides backward compatibility and comparison scenarios:
NEXTGEN OPIR
The next-generation OPIR simulator models advanced capabilities:
UEWR (Upgraded Early Warning Radar)
The UEWR simulator models large phased-array radars providing long-range detection:
TPY-2 (AN/TPY-2)
The TPY-2 simulator models the transportable X-band radar:
SBX (Sea-Based X-Band Radar)
The SBX simulator models the floating X-band platform:
All simulators produce standardized JSON messages conforming to a common schema:
{
"sensor_id": "SBIRS-GEO-01",
"sensor_type": "OPIR",
"timestamp": "2026-04-10T01:05:32.123Z",
"detection": {
"track_id": "TGT-001-A",
"azimuth": 45.67,
"elevation": 12.34,
"intensity": 0.847,
"confidence": 0.92
},
"metadata": {
"mode": "TRACK",
"scan_pattern": "STEP_SCAN"
}
}
FORGE uses Apache Kafka as its central message bus, operating in KRaft mode (Kafka Raft metadata), which eliminates the need for a separate ZooKeeper ensemble. This simplifies deployment and reduces operational complexity while maintaining full Kafka capabilities.
Cluster Configuration:
opir-detections
Raw detection events from all OPIR simulators. High-volume topic receiving continuous streams from SBIRS-GEO, DSP, and NEXTGEN simulators.
radar-tracks
Track data from radar simulators. Lower volume but higher per-message complexity due to multi-target tracking.
correlated-tracks
Fused track data combining OPIR and radar sources. This is the primary output of the correlation process.
c2-messages
Command and control messages for system direction and scenario control.
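The common detection schema and the topic list above suggest a simple routing step on the producer side. The following is a minimal sketch, not the FORGE-Sensors implementation: the `Detection` struct mirrors the sample JSON message, while `TopicFor`, `DecodeAndRoute`, and the `"RADAR"` sensor type value are assumptions made for illustration (only `"OPIR"` appears in the sample).

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Detection mirrors the common JSON schema produced by the simulators.
type Detection struct {
	SensorID   string    `json:"sensor_id"`
	SensorType string    `json:"sensor_type"`
	Timestamp  time.Time `json:"timestamp"`
	Detection  struct {
		TrackID    string  `json:"track_id"`
		Azimuth    float64 `json:"azimuth"`
		Elevation  float64 `json:"elevation"`
		Intensity  float64 `json:"intensity"`
		Confidence float64 `json:"confidence"`
	} `json:"detection"`
	Metadata map[string]string `json:"metadata"`
}

// TopicFor maps a sensor type to its Kafka topic.
// "RADAR" is an assumed value; only "OPIR" appears in the sample message.
func TopicFor(sensorType string) (string, error) {
	switch sensorType {
	case "OPIR":
		return "opir-detections", nil
	case "RADAR":
		return "radar-tracks", nil
	default:
		return "", fmt.Errorf("unknown sensor type %q", sensorType)
	}
}

// DecodeAndRoute parses a raw simulator message and selects its topic.
func DecodeAndRoute(raw []byte) (Detection, string, error) {
	var d Detection
	if err := json.Unmarshal(raw, &d); err != nil {
		return d, "", fmt.Errorf("decode detection: %w", err)
	}
	topic, err := TopicFor(d.SensorType)
	return d, topic, err
}
```

Rejecting unknown sensor types at this point keeps malformed messages out of the shared topics instead of leaving the check to every consumer.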
┌─────────────────────────────────────────────────────┐
│ KAFKA CLUSTER │
│ (KRaft Mode) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Broker │ │ Broker │ │ Broker │ │
│ │ 1 │ │ 2 │ │ 3 │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ┌──────┴───────────────┴───────────────┴──────┐ │
│ │ TOPICS │ │
│ │ • opir-detections │ │
│ │ • radar-tracks │ │
│ │ • correlated-tracks │ │
│ │ • c2-messages │ │
│ └─────────────────────────────────────────────┘ │
└───────────────────────┬─────────────────────────────┘
│
┌───────────────────────────────────┼───────────────────────────────┐
│ │ │
▼ ▼ ▼
FORGE-Consumer FORGE-C2 Downstream
(Data Persist) (Correlation) Systems
FORGE-Consumer is a Kafka consumer group responsible for persisting all sensor data to TimescaleDB. It implements:
Key Operations:
opir-detections and radar-tracks
FORGE-C2 handles track management, correlation logic, and message formatting for downstream systems. It maintains the authoritative state of all tracks in the scenario.
Track Correlation Engine:
The correlation engine applies multi-source fusion algorithms to combine OPIR and radar tracks into unified track objects:
┌─────────────────┐ ┌─────────────────┐
│ OPIR Detections │ │ Radar Tracks │
│ (az/el/intensity)│ │ (range/az/el) │
└────────┬────────┘ └────────┬────────┘
│ │
▼ ▼
┌────────────────────────────────┐
│ CORRELATION ENGINE │
│ • Temporal Alignment │
│ • Spatial Association │
│ • Confidence Weighting │
│ • Track ID Assignment │
└───────────────┬────────────────┘
│
▼
┌───────────────┐
│ Correlated │
│ Track Object │
│ (fused state) │
└───────┬───────┘
│
┌──────────┴──────────┐
▼ ▼
TimescaleDB c2-messages
(historical) (real-time)
Message Formatting:
FORGE-C2 produces standardized messages for downstream consumers:
TimescaleDB extends PostgreSQL with native time-series capabilities, providing:
Schema Design:
-- Detections hypertable
CREATE TABLE opir_detections (
time TIMESTAMPTZ NOT NULL,
sensor_id TEXT NOT NULL,
track_id TEXT,
azimuth DOUBLE PRECISION,
elevation DOUBLE PRECISION,
intensity DOUBLE PRECISION,
confidence DOUBLE PRECISION,
metadata JSONB
);
SELECT create_hypertable('opir_detections', 'time',
chunk_time_interval => INTERVAL '1 day');
-- Continuous aggregate for hourly stats
CREATE MATERIALIZED VIEW opir_hourly
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', time) AS bucket,
sensor_id,
COUNT(*) AS detection_count,
AVG(confidence) AS avg_confidence
FROM opir_detections
GROUP BY bucket, sensor_id;
Standard PostgreSQL tables store configuration, scenario definitions, and reference data:
Connection Pooling:
Both TimescaleDB and PostgreSQL use PgBouncer for connection pooling, reducing connection overhead and improving throughput under load.
FORGE runs on a bare-metal Kubernetes cluster with two node classes:
gms-control-plane Node:
miner Nodes:
Deployment Topology:
┌─────────────────────────────────────────────────────────────────────┐
│ KUBERNETES CLUSTER │
│ │
│ ┌───────────────────────┐ ┌───────────────────────────────┐ │
│ │ gms-control-plane │ │ miner nodes │ │
│ │ │ │ │ │
│ │ ┌─────────────────┐ │ │ ┌─────────────────────────┐ │ │
│ │ │ Kafka Broker(s) │ │ │ │ FORGE-Consumer pods │ │ │
│ │ ├─────────────────┤ │ │ │ (scalable replicas) │ │ │
│ │ │ TimescaleDB │ │ │ ├─────────────────────────┤ │ │
│ │ ├─────────────────┤ │ │ │ FORGE-C2 pods │ │ │
│ │ │ PostgreSQL │ │ │ │ (active/passive) │ │ │
│ │ ├─────────────────┤ │ │ ├─────────────────────────┤ │ │
│ │ │ Prometheus │◄─┼───────┼──│ Simulators │ │ │
│ │ ├─────────────────┤ │ │ │ (scenario-specific) │ │ │
│ │ │ Grafana │ │ │ └─────────────────────────┘ │ │
│ │ └─────────────────┘ │ │ │ │
│ └───────────────────────┘ └───────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
A federated Prometheus architecture provides comprehensive observability:
gms-control-plane, scrapes all services
Key Metrics Collected:
Pre-built dashboards provide real-time visibility:
The network architecture follows a flat model within the Kubernetes cluster, with network policies providing isolation where required:
┌─────────────────────────────────────┐
│ EXTERNAL ACCESS │
│ (Ingress / LoadBalancer) │
└──────────────────┬──────────────────┘
│
┌──────────────────┴──────────────────┐
│ K8S NETWORK │
│ (CNI: Calico/Flannel) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐│
│ │ kafka │ │ forge │ │ db ││
│ │ ns │ │ ns │ │ ns ││
│ │ │ │ │ │ ││
│ │ brokers │ │consumer │ │timescale││
│ │ │ │c2 │ │postgres ││
│ └─────────┘ └─────────┘ └─────────┘│
│ │
│ Network Policies: │
│ • forge ns → kafka ns: 9092 │
│ • forge ns → db ns: 5432 │
│ • external → kafka: restricted │
└───────────────────────────────────────┘
Kubernetes NetworkPolicy resources enforce:
The architecture supports horizontal scaling at multiple points:
| Component | Scaling Method | Trigger |
|---|---|---|
| Kafka Brokers | Add brokers | Storage capacity, throughput |
| Kafka Partitions | Increase partitions | Consumer parallelism |
| FORGE-Consumer | Add replicas | Consumer lag threshold |
| FORGE-C2 | Active/passive | High availability |
| TimescaleDB | Read replicas | Query load |
| Simulators | Per-scenario deployment | Scenario requirements |
The Kafka-based messaging layer provides natural load buffering: during traffic spikes, messages accumulate in topics, allowing consumers to process at sustainable rates without data loss. This decoupling of producers and consumers is fundamental to the architecture’s resilience.
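The scaling table ties FORGE-Consumer replicas to a consumer lag threshold and Kafka partitions to consumer parallelism. One way those two rules could combine is sketched below. The function name `DesiredReplicas`, the per-replica lag target, and the clamping bounds are illustrative assumptions, not values taken from the FORGE deployment.

```go
package main

// DesiredReplicas suggests a FORGE-Consumer replica count from observed lag.
// targetLagPerReplica and minReplicas are hypothetical tuning parameters.
func DesiredReplicas(totalLag, targetLagPerReplica int64, partitions, minReplicas int) int {
	if targetLagPerReplica <= 0 || partitions <= 0 {
		return minReplicas
	}
	// Ceiling division: replicas needed to hold per-replica lag at the target.
	n := int((totalLag + targetLagPerReplica - 1) / targetLagPerReplica)
	if n < minReplicas {
		n = minReplicas
	}
	// Within one consumer group, members beyond the partition count sit idle,
	// so the partition count is the effective upper bound.
	if n > partitions {
		n = partitions
	}
	return n
}
```

With a 12-partition topic and a target of 10,000 messages of lag per replica, a backlog of 25,000 messages yields three replicas, while a backlog of one million is capped at twelve; going further would require adding partitions first.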
FORGE-C2 serves as the tactical nervous system of the FORGE simulation platform, responsible for translating raw sensor data into actionable track information and distributing command directives across the simulation network. The subsystem implements NATO-standard tactical data link protocols (JREAP and Link 16) while providing modern real-time streaming interfaces for web-based visualization and operator consoles.
The C2 subsystem is implemented as a Go service (forge-c2) that handles:
- Ingestion and parsing of JREAP-formatted messages from simulated tactical data links
- Generation of Link 16 J-Series messages for track reporting and coordination
- WebSocket-based real-time track distribution to operator consoles
- RESTful APIs for track management, query, and alert configuration
- Correlation of tracks from multiple sensor sources with threat assessment
The Joint Range Extension Applications Protocol (JREAP) provides a standardized method for encapsulating tactical data link messages over extended-range communication paths. FORGE-C2 implements all three JREAP variants defined in MIL-STD-3011.
JREAP-A is designed for satellite communications where bandwidth is constrained and latency is high. The protocol uses aggressive compression and implements a sliding window acknowledgment mechanism to handle the inherent delays of SATCOM links.
// JREAP-A Message Header (MIL-STD-3011)
type JREAPAHeader struct {
MessageLength uint16 // Total message length in bytes
MessageType uint8 // Message type identifier
MessageID uint16 // Unique message identifier
TimeStamp uint32 // Mission time in milliseconds
SourceTrackNum uint16 // Originating track number
SecurityClass uint8 // Security classification
CompressionFlag uint8 // Compression indicator
}
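// ParseJREAPA extracts the header and payload from a JREAP-A message.
// The header occupies 12 bytes. Byte 11 is assumed here to pack the
// compression flag (bit 7) with the security classification (bits 0-6).
func ParseJREAPA(data []byte) (*JREAPAHeader, []byte, error) {
	const headerLen = 12
	if len(data) < headerLen {
		return nil, nil, errors.New("JREAP-A header too short")
	}
	header := &JREAPAHeader{
		MessageLength:   binary.BigEndian.Uint16(data[0:2]),
		MessageType:     data[2],
		MessageID:       binary.BigEndian.Uint16(data[3:5]),
		TimeStamp:       binary.BigEndian.Uint32(data[5:9]),
		SourceTrackNum:  binary.BigEndian.Uint16(data[9:11]),
		SecurityClass:   data[11] & 0x7F,
		CompressionFlag: data[11] >> 7,
	}
	// Reject declared lengths that would slice outside the buffer and panic.
	msgLen := int(header.MessageLength)
	if msgLen < headerLen || msgLen > len(data) {
		return nil, nil, errors.New("JREAP-A message length out of range")
	}
	payload := data[headerLen:msgLen]
	return header, payload, nil
}
The JREAP-A handler maintains a transmit queue with priority-based scheduling so that critical track updates are transmitted before routine status messages, which matters in the bandwidth-limited satellite environment.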
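The priority-based transmit queue described above could be built on the standard library's `container/heap`. This is a sketch under stated assumptions rather than the handler's actual code: the names `TxMessage` and `TxQueue`, the convention that lower numbers mean higher priority, and the sequence-number tie-break are all hypothetical.

```go
package main

import "container/heap"

// TxMessage is a queued outbound message. Lower Priority values are sent first.
type TxMessage struct {
	Priority int    // 0 = critical track update, larger = more routine (assumed scale)
	Seq      uint64 // Insertion order, keeps FIFO ordering within one priority
	Payload  []byte
}

// txHeap implements heap.Interface ordered by (Priority, Seq).
type txHeap []*TxMessage

func (h txHeap) Len() int { return len(h) }
func (h txHeap) Less(i, j int) bool {
	if h[i].Priority != h[j].Priority {
		return h[i].Priority < h[j].Priority
	}
	return h[i].Seq < h[j].Seq
}
func (h txHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *txHeap) Push(x any)   { *h = append(*h, x.(*TxMessage)) }
func (h *txHeap) Pop() any {
	old := *h
	n := len(old)
	msg := old[n-1]
	old[n-1] = nil // Drop the reference so the message can be collected
	*h = old[:n-1]
	return msg
}

// TxQueue is a priority-ordered transmit queue. It is not safe for
// concurrent use; callers would need to add their own locking.
type TxQueue struct {
	items txHeap
	seq   uint64
}

// Enqueue adds a payload at the given priority.
func (q *TxQueue) Enqueue(priority int, payload []byte) {
	q.seq++
	heap.Push(&q.items, &TxMessage{Priority: priority, Seq: q.seq, Payload: payload})
}

// Dequeue removes the most urgent message, or reports false if the queue is empty.
func (q *TxQueue) Dequeue() (*TxMessage, bool) {
	if q.items.Len() == 0 {
		return nil, false
	}
	return heap.Pop(&q.items).(*TxMessage), true
}
```

The sequence number matters on a constrained link: without it, two track updates of equal priority could be sent out of order, and a receiver might apply a stale position after a fresh one.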
JREAP-B operates over line-of-sight (LOS) radio links, typically UHF/VHF tactical radios. Unlike JREAP-A’s streaming approach, JREAP-B uses discrete message encapsulation suitable for the bursty nature of LOS communications.
type JREAPBMessage struct {
SyncPattern uint16 // 0x55AA synchronization marker
MessageLength uint16
MessageType uint8
MessageSubtype uint8
OriginatorID uint16 // Originating unit identifier
RecipientID uint16 // Destination (0xFFFF for broadcast)
Payload []byte
Checksum uint16 // CRC-16-CCITT
}
func (m *JREAPBMessage) Serialize() []byte {
buf := make([]byte, 12+len(m.Payload))
binary.BigEndian.PutUint16(buf[0:2], m.SyncPattern)
binary.BigEndian.PutUint16(buf[2:4], m.MessageLength)
buf[4] = m.MessageType
buf[5] = m.MessageSubtype
binary.BigEndian.PutUint16(buf[6:8], m.OriginatorID)
binary.BigEndian.PutUint16(buf[8:10], m.RecipientID)
copy(buf[10:10+len(m.Payload)], m.Payload)
// Calculate CRC-16-CCITT checksum
m.Checksum = crc16.ChecksumCCITT(buf[0:10+len(m.Payload)])
binary.BigEndian.PutUint16(buf[10+len(m.Payload):12+len(m.Payload)], m.Checksum)
return buf
}
JREAP-C is the modern IP-based variant, designed for transmission over tactical IP networks including satellite IP (SATCOM IP) and terrestrial data networks. It uses TCP for reliable delivery and supports larger message sizes than the radio-based variants.
// JREAP-C operates over TCP/IP with JSON or binary encoding
type JREAPCEnvelope struct {
Version string `json:"version"` // Protocol version
MessageID string `json:"messageId"` // UUID for tracking
Timestamp time.Time `json:"timestamp"` // ISO 8601 timestamp
Source JREAPSource `json:"source"`
Classification string `json:"classification"` // UNCLASSIFIED // SECRET
Payload json.RawMessage `json:"payload"`
}
type JREAPSource struct {
UnitID string `json:"unitId"`
UnitName string `json:"unitName"`
PlatformType string `json:"platformType"`
}
// JREAP-C handler manages persistent TCP connections
type JREAPCHandler struct {
connections sync.Map // Active connections by unit ID
inbound chan []byte // Inbound message channel
outbound chan []byte // Outbound message channel
}
func (h *JREAPCHandler) HandleConnection(conn net.Conn) {
decoder := json.NewDecoder(conn)
for {
var envelope JREAPCEnvelope
if err := decoder.Decode(&envelope); err != nil {
log.Printf("JREAP-C decode error: %v", err)
return
}
h.inbound <- envelope.Payload
}
}
Link 16 is the primary tactical data link for NATO forces. FORGE-C2 generates J-Series messages for track reporting, coordination, and weapons management. The formatter converts internal track representations into properly formatted Link 16 binary messages.
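The formatters in this section write byte-aligned fields as a simplification, whereas a bit-packed format needs fields that start and end at arbitrary bit positions. The helper below sketches one way to do that in Go. `BitWriter` is a hypothetical name, and the field widths in the usage note are invented for illustration; they are not taken from MIL-STD-6016.

```go
package main

// BitWriter packs unsigned fields of arbitrary width, most significant bit first.
type BitWriter struct {
	buf  []byte
	nbit int // Total number of bits written so far
}

// Write appends the low `width` bits of v. width must be between 1 and 32.
func (w *BitWriter) Write(v uint32, width int) {
	for i := width - 1; i >= 0; i-- {
		// Start a new zeroed byte whenever the previous one is full.
		if w.nbit%8 == 0 {
			w.buf = append(w.buf, 0)
		}
		if (v>>uint(i))&1 == 1 {
			w.buf[w.nbit/8] |= 1 << uint(7-w.nbit%8)
		}
		w.nbit++
	}
}

// Bytes returns the packed buffer; unused trailing bits are zero.
func (w *BitWriter) Bytes() []byte { return w.buf }
```

Writing a 3-bit value of 5, a 1-bit value of 1, a 4-bit value of 0xA, and a 9-bit value of 0x1FF produces the three bytes `BA FF 80`: the first three fields share one byte, and the last field spills one bit into a third byte.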
J2.x messages handle track lifecycle operations including initiation, update, and dropping of tracks from the tactical picture.
// J2.2 - Track Update Message
type J2Message struct {
TrackNumber uint16 // Track number (0-9999)
TrackQuality uint8 // Track quality (TQ) 0-15
TrackLatitude int32 // Latitude in 0.0001 degree units
TrackLongitude int32 // Longitude in 0.0001 degree units
TrackAltitude int32 // Altitude in feet
TrackSpeed uint16 // Speed in knots
TrackHeading uint16 // Heading in degrees * 0.0055
TrackCategory uint8 // Air, Surface, Subsurface, Land
Force uint8 // Friendly, Hostile, Neutral, Unknown
IFFMode1 uint8 // IFF/SIF Mode 1 code
IFFMode2 uint8 // IFF/SIF Mode 2 code
IFFMode3A uint8 // IFF/SIF Mode 3/A code
}
func FormatJ2(msg *J2Message) []byte {
// Link 16 uses a complex bit-packing format
// This is a simplified representation
buf := make([]byte, 24)
// Word 1: Track number and quality
binary.BigEndian.PutUint16(buf[0:2], msg.TrackNumber)
buf[2] = msg.TrackQuality
// Word 2-3: Position (compressed latitude/longitude)
binary.BigEndian.PutUint32(buf[3:7], uint32(msg.TrackLatitude))
binary.BigEndian.PutUint32(buf[7:11], uint32(msg.TrackLongitude))
// Word 4: Altitude
binary.BigEndian.PutUint32(buf[11:15], uint32(msg.TrackAltitude))
// Word 5: Kinematics
binary.BigEndian.PutUint16(buf[15:17], msg.TrackSpeed)
binary.BigEndian.PutUint16(buf[17:19], msg.TrackHeading)
// Word 6: Track classification
buf[19] = msg.TrackCategory
buf[20] = msg.Force
buf[21] = msg.IFFMode1
buf[22] = msg.IFFMode2
buf[23] = msg.IFFMode3A
return buf
}
J3.x messages provide detailed air track information including flight levels, airspeed, and aircraft-specific parameters.
// J3.2 - Air Track Position Message
type J3AirTrack struct {
TrackNumber uint16
Latitude float64 // Decimal degrees
Longitude float64 // Decimal degrees
Altitude float64 // Feet MSL
AirSpeed float64 // Knots
Heading float64 // Degrees true
VerticalSpeed int16 // Feet per minute (positive = climbing)
SpecialPurpose string // Mission code
}
func FormatJ3Air(track *J3AirTrack) ([]byte, error) {
// Convert to Link 16 coordinate system
lat := int32(track.Latitude * 65536.0 / 90.0) // Convert to bit units
lon := int32(track.Longitude * 65536.0 / 180.0) // Convert to bit units
alt := int32(track.Altitude / 6.25) // 6.25 foot resolution
buf := make([]byte, 28)
binary.BigEndian.PutUint16(buf[0:2], track.TrackNumber)
binary.BigEndian.PutUint32(buf[2:6], uint32(lat))
binary.BigEndian.PutUint32(buf[6:10], uint32(lon))
binary.BigEndian.PutUint32(buf[10:14], uint32(alt))
return buf, nil
}
J7.x messages assign tracks to tactical missions and coordinate engagement responsibilities.
// J7.1 - Mission Assignment
type J7MissionAssignment struct {
MissionID uint16
MissionType uint8 // CAP, Strike, SEAD, CAS, etc.
AssignedUnit uint16 // Unit assigned to mission
TargetTrackNum uint16 // Target track number (if applicable)
Priority uint8 // Mission priority 1-15
TimeOnStation uint32 // Time expected on station
RulesOfEngagement uint8 // ROE level
}
func FormatJ7(mission *J7MissionAssignment) []byte {
buf := make([]byte, 16)
binary.BigEndian.PutUint16(buf[0:2], mission.MissionID)
buf[2] = mission.MissionType
binary.BigEndian.PutUint16(buf[3:5], mission.AssignedUnit)
binary.BigEndian.PutUint16(buf[5:7], mission.TargetTrackNum)
buf[7] = mission.Priority
binary.BigEndian.PutUint32(buf[8:12], mission.TimeOnStation)
buf[12] = mission.RulesOfEngagement
return buf
}
These message series handle the coordination of weapons engagements and engagement status reporting.
// J9.0 - Engagement Assignment
type J9Engagement struct {
EngageTrackNum uint16 // Target track number
EngagingUnit uint16 // Unit performing engagement
WeaponType uint8 // Weapon type code
EngagementTime uint32 // Time of engagement initiation
}
// J10.x - Weapons Coordination
type J10WeaponsCoord struct {
RequestingUnit uint16
WeaponsAvailable uint16 // Number of weapons available
WeaponsCommitted uint16 // Number committed to engagements
ReloadTime uint32 // Time to reload in seconds
}
// J12.x - Control Orders
type J12ControlOrder struct {
OrderType uint8 // HANDOVER, ASSIGN, RELEASE, ABORT
TargetTrackNum uint16
ControllingUnit uint16 // Unit issuing order
ReceivingUnit uint16 // Unit receiving order
Priority uint8
}
The WebSocket interface provides real-time track updates to operator consoles and web-based visualization clients. This interface implements a publish-subscribe pattern where clients subscribe to track updates and receive pushed messages.
type TrackUpdate struct {
Type string `json:"type"` // "track_update"
TrackNumber string `json:"trackNumber"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
Altitude float64 `json:"altitude"`
Speed float64 `json:"speed"`
Heading float64 `json:"heading"`
Force string `json:"force"` // "friendly", "hostile", "neutral", "unknown"
Category string `json:"category"` // "air", "surface", "subsurface", "land"
Source string `json:"source"` // Sensor source identifier
Timestamp time.Time `json:"timestamp"`
}
type WSHub struct {
clients map[*WSClient]bool
register chan *WSClient
unregister chan *WSClient
broadcast chan []byte
mu sync.RWMutex
}
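func (h *WSHub) Run() {
	for {
		select {
		case client := <-h.register:
			h.mu.Lock()
			h.clients[client] = true
			h.mu.Unlock()
		case client := <-h.unregister:
			h.mu.Lock()
			// Close the channel only if the client is still registered; the
			// broadcast path may already have removed and closed it.
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
			}
			h.mu.Unlock()
		case message := <-h.broadcast:
			// A write lock is needed because slow clients are removed
			// from the map inside the loop.
			h.mu.Lock()
			for client := range h.clients {
				select {
				case client.send <- message:
				default:
					// The send buffer is full: drop the slow client.
					close(client.send)
					delete(h.clients, client)
				}
			}
			h.mu.Unlock()
		}
	}
}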
func (h *WSHub) BroadcastTrack(update TrackUpdate) {
data, err := json.Marshal(update)
if err != nil {
log.Printf("Failed to marshal track update: %v", err)
return
}
h.broadcast <- data
}
Alerts are high-priority notifications that require operator attention, such as hostile track detections or system failures.
type Alert struct {
ID string `json:"id"`
Severity string `json:"severity"` // "critical", "warning", "info"
Category string `json:"category"` // "track", "system", "engagement"
Title string `json:"title"`
Description string `json:"description"`
TrackNumber string `json:"trackNumber,omitempty"`
Timestamp time.Time `json:"timestamp"`
Acknowledged bool `json:"acknowledged"`
}
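func (h *WSHub) BroadcastAlert(alert Alert) {
	// Alerts are wrapped in a typed envelope so clients can tell them apart
	// from track updates. They share the hub's broadcast channel.
	data, err := json.Marshal(map[string]interface{}{
		"type":  "alert",
		"alert": alert,
	})
	if err != nil {
		log.Printf("Failed to marshal alert: %v", err)
		return
	}
	h.broadcast <- data
}
type WSClient struct {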
hub *WSHub
conn *websocket.Conn
send chan []byte
subscriptions map[string]bool // Track number subscriptions
}
func (c *WSClient) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
break
}
var cmd struct {
Action string `json:"action"`
Tracks []string `json:"tracks,omitempty"`
}
if err := json.Unmarshal(message, &cmd); err != nil {
continue
}
switch cmd.Action {
case "subscribe":
for _, track := range cmd.Tracks {
c.subscriptions[track] = true
}
case "unsubscribe":
for _, track := range cmd.Tracks {
delete(c.subscriptions, track)
}
}
}
}
The track manager correlates observations from multiple sensor sources to maintain a single coherent tactical picture. Correlation uses a multi-hypothesis approach that considers position, kinematics, and identification data.
type TrackCorrelator struct {
tracks map[string]*Track // Track number -> Track
sourceMap map[string][]string // Source ID -> Track numbers
correlationThreshold float64 // Position correlation threshold (NM)
}
type Track struct {
	TrackNumber string
	Position    Position
	Kinematics  Kinematics
	Identity    Identity
	Sources     []TrackSource
	Quality     uint8
	State       TrackState // Lifecycle state, used by UpdateTrackLifecycle
	LastUpdate  time.Time
	Age         time.Duration
}
func (tc *TrackCorrelator) CorrelateObservation(obs *Observation) *Track {
// Find best matching track within correlation threshold
bestTrack := ""
bestScore := tc.correlationThreshold
for trackNum, track := range tc.tracks {
score := tc.computeCorrelationScore(obs, track)
if score < bestScore {
bestScore = score
bestTrack = trackNum
}
}
if bestTrack != "" {
// Update existing track
tc.updateTrack(bestTrack, obs)
return tc.tracks[bestTrack]
}
// Create new track
return tc.initiateTrack(obs)
}
func (tc *TrackCorrelator) computeCorrelationScore(obs *Observation, track *Track) float64 {
	// Distance in nautical miles
	dist := haversineDistance(
		obs.Latitude, obs.Longitude,
		track.Position.Latitude, track.Position.Longitude,
	)
	// Kinematic comparison (speed/heading difference)
	kinScore := math.Abs(obs.Speed-track.Kinematics.Speed) / 100.0
	// Wrap the heading difference into [0, 180] so that headings of
	// 359 and 1 degrees count as 2 degrees apart, not 358.
	headingDiff := math.Mod(math.Abs(obs.Heading-track.Kinematics.Heading), 360.0)
	if headingDiff > 180.0 {
		headingDiff = 360.0 - headingDiff
	}
	kinScore += headingDiff / 180.0
	return dist*0.7 + kinScore*0.3
}
Threat assessment evaluates the potential danger posed by each track based on kinematics, identification, and proximity to protected assets.
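Both the correlation score and the time-to-target calculation call a `haversineDistance` helper that this section does not define. A minimal sketch of such a helper follows, assuming inputs in decimal degrees and a result in nautical miles on a spherical Earth. The constant name `earthRadiusNM` and the 6371 km mean radius are assumptions of this sketch.

```go
package main

import "math"

// earthRadiusNM is the mean Earth radius (6371 km) expressed in nautical
// miles (1 NM = 1.852 km). A spherical Earth is assumed.
const earthRadiusNM = 6371.0 / 1.852

// haversineDistance returns the great-circle distance in nautical miles
// between two points given in decimal degrees.
func haversineDistance(lat1, lon1, lat2, lon2 float64) float64 {
	toRad := func(deg float64) float64 { return deg * math.Pi / 180.0 }
	dLat := toRad(lat2 - lat1)
	dLon := toRad(lon2 - lon1)
	a := math.Sin(dLat/2)*math.Sin(dLat/2) +
		math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*
			math.Sin(dLon/2)*math.Sin(dLon/2)
	// Clamp to 1 to guard against rounding error near antipodal points.
	return 2 * earthRadiusNM * math.Asin(math.Min(1, math.Sqrt(a)))
}
```

Because the result is in nautical miles and track speed is in knots, dividing distance by speed gives hours, which is why the time-to-target code multiplies by 3600 to obtain seconds.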
type ThreatAssessment struct {
ThreatLevel uint8 // 0 (none) to 15 (maximum)
ThreatType string // "air", "surface", "subsurface", "missile"
TimeToTarget float64 // Estimated time to reach nearest asset (seconds)
WeaponRange float64 // Estimated weapon range (NM)
HostileIntent bool // Hostile behavior indicators
EngagementZone string // "W", "W-D", "W-D-A" engagement zones
}
func (tm *TrackManager) AssessThreat(track *Track) *ThreatAssessment {
assessment := &ThreatAssessment{}
// Determine threat level based on force and behavior
switch track.Identity.Force {
case "hostile":
assessment.ThreatLevel = 10
assessment.HostileIntent = true
case "unknown":
assessment.ThreatLevel = 5
case "neutral":
assessment.ThreatLevel = 2
case "friendly":
assessment.ThreatLevel = 0
}
// Calculate time to nearest protected asset
minTime := math.MaxFloat64
for _, asset := range tm.protectedAssets {
dist := haversineDistance(
track.Position.Latitude, track.Position.Longitude,
asset.Latitude, asset.Longitude,
)
if track.Kinematics.Speed > 0 {
timeToTarget := (dist / track.Kinematics.Speed) * 3600 // Convert to seconds
if timeToTarget < minTime {
minTime = timeToTarget
}
}
}
assessment.TimeToTarget = minTime
// Adjust threat level based on proximity
if minTime < 300 { // Within 5 minutes
assessment.ThreatLevel = min(15, assessment.ThreatLevel + 3)
} else if minTime < 900 { // Within 15 minutes
assessment.ThreatLevel = min(15, assessment.ThreatLevel + 1)
}
// Determine engagement zone
if assessment.TimeToTarget < 60 {
assessment.EngagementZone = "W-D-A" // Weapons engagement
} else if assessment.TimeToTarget < 300 {
assessment.EngagementZone = "W-D" // Weapons designation
} else {
assessment.EngagementZone = "W" // Warning
}
return assessment
}
Tracks progress through a defined lifecycle: initiation, active tracking, and eventual drop when no longer observed.
type TrackState int
const (
TrackStateInitiating TrackState = iota
TrackStateActive
TrackStateCoasting
TrackStateDropped
)
func (tm *TrackManager) UpdateTrackLifecycle(track *Track) {
age := time.Since(track.LastUpdate)
switch track.State {
case TrackStateInitiating:
if len(track.Sources) >= tm.confirmationThreshold {
track.State = TrackStateActive
} else if age > tm.initiationTimeout {
track.State = TrackStateDropped
}
case TrackStateActive:
if age > tm.coastThreshold {
track.State = TrackStateCoasting
}
case TrackStateCoasting:
if age > tm.dropThreshold {
track.State = TrackStateDropped
}
}
}
func (tm *TrackManager) ProcessTrackDrops() {
for trackNum, track := range tm.tracks {
if track.State == TrackStateDropped {
delete(tm.tracks, trackNum)
tm.notifyTrackDrop(trackNum)
}
}
}
// GET /api/v1/tracks - List all active tracks
func (s *Server) handleListTracks(w http.ResponseWriter, r *http.Request) {
tracks := s.trackManager.GetAllTracks()
response := struct {
Tracks []TrackResponse `json:"tracks"`
Count int `json:"count"`
}{
Tracks: make([]TrackResponse, len(tracks)),
Count: len(tracks),
}
for i, track := range tracks {
response.Tracks[i] = TrackResponse{
TrackNumber: track.TrackNumber,
Latitude: track.Position.Latitude,
Longitude: track.Position.Longitude,
Altitude: track.Position.Altitude,
Speed: track.Kinematics.Speed,
Heading: track.Kinematics.Heading,
Force: track.Identity.Force,
Category: track.Identity.Category,
Quality: track.Quality,
LastUpdate: track.LastUpdate,
}
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// GET /api/v1/tracks/{id} - Get specific track
// POST /api/v1/alerts - Create new alert
// GET /api/v1/alerts - List active alerts
// POST /api/v1/alerts/{id}/acknowledge - Acknowledge alert
// POST /api/v1/tracks/{id}/engage - Initiate engagement
func (s *Server) handleEngageTrack(w http.ResponseWriter, r *http.Request) {
trackNum := chi.URLParam(r, "id")
var req struct {
EngagingUnit string `json:"engagingUnit"`
WeaponType string `json:"weaponType"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Generate J9 engagement message
engagement := &J9Engagement{
EngageTrackNum: parseTrackNumber(trackNum),
EngagingUnit: parseUnitID(req.EngagingUnit),
WeaponType: parseWeaponType(req.WeaponType),
EngagementTime: uint32(time.Now().Unix()),
}
// Send to Link 16 formatter
msg := FormatJ9(engagement)
s.link16Sender.Send(msg)
// Record engagement
s.trackManager.RecordEngagement(trackNum, req.EngagingUnit)
json.NewEncoder(w).Encode(map[string]string{
"status": "engaged",
"trackNumber": trackNum,
"engagingUnit": req.EngagingUnit,
})
}
// GET /ws - WebSocket upgrade endpoint
func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket upgrade error: %v", err)
return
}
client := &WSClient{
hub: s.hub,
conn: conn,
send: make(chan []byte, 256),
subscriptions: make(map[string]bool),
}
s.hub.register <- client
// Start read/write pumps
go client.writePump()
go client.readPump()
}
The FORGE-C2 subsystem provides a complete command and control implementation for defense simulation, integrating legacy tactical data link protocols (JREAP, Link 16) with modern web-based real-time interfaces. The modular architecture allows for protocol extensibility while maintaining strict compliance with NATO messaging standards. The track management subsystem ensures accurate correlation and threat assessment across multiple sensor inputs, providing operators with a coherent tactical picture for decision-making.
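The threat assessment code in this section calls haversineDistance without showing it. The following is a minimal sketch of such a helper, not the FORGE implementation. Returning kilometers and using a mean Earth radius of 6371 km are assumptions; the original does not state the unit of the distance or of track.Kinematics.Speed.

```go
package main

import (
    "fmt"
    "math"
)

// earthRadiusKm is the mean Earth radius. Kilometers are an assumption:
// the document does not state which unit haversineDistance returns.
const earthRadiusKm = 6371.0

// haversineDistance returns the great-circle distance in kilometers
// between two points given in decimal degrees.
func haversineDistance(lat1, lon1, lat2, lon2 float64) float64 {
    toRad := func(deg float64) float64 { return deg * math.Pi / 180 }
    dLat := toRad(lat2 - lat1)
    dLon := toRad(lon2 - lon1)
    a := math.Sin(dLat/2)*math.Sin(dLat/2) +
        math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
    // Clamp to 1 so rounding error cannot push Asin out of its domain.
    return 2 * earthRadiusKm * math.Asin(math.Min(1, math.Sqrt(a)))
}

func main() {
    // Illustrative coordinates only.
    d := haversineDistance(39.0, 127.0, 42.5, 135.0)
    fmt.Printf("distance: %.0f km\n", d)
}
```

With a distance in kilometers, the `* 3600` conversion in the time-to-target calculation yields seconds only if speed is expressed in km/h.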
FORGE’s data pipeline processes sensor data through a series of stages: ingestion, normalization, correlation, storage, and distribution. All data flows through Apache Kafka, which provides durability, replayability, and horizontal scaling. Military messaging standards (JREAP and Link 16) ensure DoD interoperability.
┌─────────────────────────────────────────────────────────────────────────────┐
│ FORGE DATA FLOW PIPELINE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ OPIR Sensor │ │ Radar System │ │ External │ │ DoD Seismic │ │
│ │ Simulator │ │ Simulator │ │ Feeds │ │ Simulator │ │
│ │ │ │ │ │ (AIS/ADS-B) │ │ │ │
│ └──────┬──────┘ └──────┬───────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │ │
│ └─────────────────┼──────────────────┼─────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ SENSOR NORMALIZATION LAYER │ │
│ │ (JSON Schema Validation) │ │
│ └────────────────┬────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ APACHE KAFKA │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │opir-detections│ │ radar-tracks │ │external-feeds│ │ │
│ │ │ (6 parts) │ │ (6 parts) │ │ (3 parts) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │track-updates │ │correlated- │ │ alerts │ │ │
│ │ │ (6 parts) │ │tracks (3) │ │ (3 parts) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │c2-messages │ │link16-reports│ │ │
│ │ │ (6 parts) │ │ (6 parts) │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────┼─────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ FORGE │ │ VIMI Track │ │ FORGE-C2 │ │
│ │ Consumer │ │ Correlator │ │ (C2 Layer) │ │
│ │ │ │ │ │ │ │
│ │ TimescaleDB │ │ Multi-Source│ │ JREAP/Link16│ │
│ │ Writer │ │ Fusion │ │ Formatting │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TimescaleDB │ │ PostgreSQL │ │ WebSocket │ │
│ │ (Time-Series)│ │ (Relational)│ │ Clients │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
| Topic | Partitions | Replication | Min ISR | Purpose |
|---|---|---|---|---|
| opir-detections | 6 | 1 | 1 | Space-based IR detections |
| radar-tracks | 6 | 1 | 1 | Ground-based radar tracks |
| track-updates | 6 | 1 | 1 | Track position updates |
| correlated-tracks | 3 | 1 | 1 | Fused multi-source tracks |
| c2-messages | 6 | 1 | 1 | Command & control messages |
| link16-reports | 6 | 1 | 1 | Link 16 formatted messages |
| jreap-messages | 6 | 1 | 1 | JREAP formatted messages |
| alerts | 3 | 1 | 1 | Threat alerts |
#!/bin/bash
# Create Kafka topics for FORGE
TOPICS=(
"opir-detections:6:1:1"
"radar-tracks:6:1:1"
"track-updates:6:1:1"
"correlated-tracks:3:1:1"
"c2-messages:6:1:1"
"link16-reports:6:1:1"
"jreap-messages:6:1:1"
"alerts:3:1:1"
)
for TOPIC_SPEC in "${TOPICS[@]}"; do
    # Split "name:partitions:replication:min_isr" into its four fields
    IFS=: read -r TOPIC PARTITIONS REPLICATION MIN_ISR <<< "$TOPIC_SPEC"
    kafka-topics.sh --create \
        --topic "$TOPIC" \
        --partitions "$PARTITIONS" \
        --replication-factor "$REPLICATION" \
        --config min.insync.replicas="$MIN_ISR" \
        --bootstrap-server kafka:9092
done
| Environment | Address |
|---|---|
| Internal (K8s) | kafka.forge.svc.cluster.local:9092 |
| External (NodePort) | 207.244.226.151:30721 |
All sensor data is normalized to a common JSON format before publishing to Kafka:
{
"detection_id": "OPIR-SBIRS-GEO-1-1775013473732455512",
"timestamp": "2026-04-01T03:17:53.732455512Z",
"sensor_id": "SBIRS-GEO-1",
"sensor_type": "SBIRS-GEO",
"orbital_slot": "GEO-95W",
"target_type": "ICBM",
"launch_point": {
"latitude": 39.0,
"longitude": 127.0,
"altitude": 0
},
"impact_point": {
"latitude": 42.5,
"longitude": 135.0,
"altitude": 0
},
"band": "SWIR",
"radiance": 8.04,
"temperature": 2500.5,
"intensity": 0.85,
"confidence": 0.98,
"track_id": "TRK-1234",
"track_phase": "BOOST",
"velocity": {"x": 3.5, "y": 2.1, "z": 0.5},
"position": {"x": 100.0, "y": -200.0, "z": 300.0},
"signal_to_noise": 25.5,
"clutter_level": 0.1
}
{
"track_id": "RADAR-UEWR-BEALE-1775041685345701470",
"timestamp": "2026-04-01T11:08:05.345701470Z",
"radar_id": "UEWR-BEALE",
"radar_type": "UEWR",
"location": {
"latitude": 39.1,
"longitude": -121.4,
"altitude": 0
},
"target_type": "IRBM",
"range": 3138.83,
"azimuth": 218.69,
"elevation": 60.99,
"velocity": 6.78,
"heading": 74.65,
"altitude": 316.49,
"rcs": -11.15,
"snr": 28.76,
"range_rate": 2.89,
"doppler": 1792.78,
"track_quality": 9,
"confidence": 0.79,
"track_phase": "TRACKING",
"discriminated_type": "DECOY",
"discrimination_confidence": 0.80
}
{
"track_id": "TRK-1775013953",
"timestamp": "2026-04-01T03:25:53.199583Z",
"source_tracks": [
"OPIR-SBIRS-GEO-1-123",
"RADAR-UEWR-BEALE-456"
],
"target_type": "ICBM",
"threat_level": 5,
"position": {"x": 100.0, "y": -200.0, "z": 300.0},
"velocity": {"x": 3.5, "y": 2.1, "z": 0.5},
"launch_point": {"latitude": 39.0, "longitude": 127.0, "altitude": 0},
"impact_point": {"latitude": 42.5, "longitude": 135.0, "altitude": 0},
"track_phase": "MIDCOURSE",
"track_quality": 9,
"sensor_count": 2,
"confidence": 0.92,
"discriminated_type": "RV",
"discrimination_confidence": 0.88
}
JREAP (Joint Range Extension Applications Protocol) provides beyond-line-of-sight communications for tactical data links.
| Type | Transport | Use Case | Latency Budget |
|---|---|---|---|
| JREAP-A | Satellite (SATCOM) | Distributed operations | <500ms |
| JREAP-B | Line-of-Sight (LOS) | Direct range extension | <100ms |
| JREAP-C | IP Network (TCP/UDP) | Ground network/simulation | <50ms |
┌────────────────────────────────────────────────────────────┐
│ JREAP MESSAGE FORMAT │
├────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┬─────────┬─────────┬─────────────────────────┐ │
│ │ Header │ Source │ Track │ Track Data │ │
│ │ (8 B) │ ID (4B) │ ID (4B) │ (Variable) │ │
│ └─────────┴─────────┴─────────┴─────────────────────────┘ │
│ │
│ HEADER FORMAT: │
│ ┌──────────────┬──────────────┬──────────────┬──────────┐ │
│ │ Version (1B) │ Type (1B) │ Length (2B) │ Reserved │ │
│ │ 0x01 │ A=1/B=2/C=3 │ Payload len │ (4B) │ │
│ └──────────────┴──────────────┴──────────────┴──────────┘ │
│ │
│ TRACK DATA (JREAP-C Example): │
│ ┌──────────────┬──────────────┬──────────────┬──────────┐ │
│ │ Latitude │ Longitude │ Altitude │ Velocity │ │
│ │ (4B, deg*1e7)│ (4B, deg*1e7)│ (4B, m*100) │ (4B, m/s)│ │
│ └──────────────┴──────────────┴──────────────┴──────────┘ │
│ │
└────────────────────────────────────────────────────────────┘
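The track data fields in the diagram are fixed-point integers, for example latitude and longitude stored as degrees × 1e7 in 4 bytes. The sketch below shows one way to scale a coordinate and write it in big-endian byte order. The helper names (scaleDegrees, putInt32, and so on) are hypothetical and are not part of FORGE.

```go
package main

import (
    "encoding/binary"
    "fmt"
    "math"
)

// scaleDegrees converts decimal degrees to the deg*1e7 fixed-point form.
// ±180° maps to ±1.8e9, which fits in a signed 32-bit integer.
func scaleDegrees(deg float64) int32 {
    return int32(math.Round(deg * 1e7))
}

// unscaleDegrees reverses scaleDegrees.
func unscaleDegrees(v int32) float64 {
    return float64(v) / 1e7
}

// putInt32 writes v into the first 4 bytes of b in big-endian order.
func putInt32(b []byte, v int32) {
    binary.BigEndian.PutUint32(b, uint32(v))
}

// getInt32 reads a big-endian signed 32-bit value from the first 4 bytes of b.
func getInt32(b []byte) int32 {
    return int32(binary.BigEndian.Uint32(b))
}

func main() {
    buf := make([]byte, 8)
    putInt32(buf[0:4], scaleDegrees(39.1))   // latitude
    putInt32(buf[4:8], scaleDegrees(-121.4)) // longitude
    fmt.Printf("encoded: % x\n", buf)
    fmt.Println("decoded:", unscaleDegrees(getInt32(buf[0:4])), unscaleDegrees(getInt32(buf[4:8])))
}
```

Rounding before the integer conversion avoids an off-by-one step when the scaled value is not exactly representable as a float.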
// JREAP message types
type JREAPType int
const (
JREAPA JREAPType = 1 // Satellite
JREAPB JREAPType = 2 // Line-of-Sight
JREAPC JREAPType = 3 // IP Network
)
type JREAPHeader struct {
Version byte
Type JREAPType
Length uint16
Timestamp uint32
SourceID uint32
}
type JREAPMessage struct {
Header JREAPHeader
TrackID uint32
Latitude int32 // degrees * 1e7
Longitude int32 // degrees * 1e7
Altitude int32 // meters * 100
Velocity int32 // m/s * 100
Heading uint16 // degrees * 100
TrackType byte
Threat byte
}
// Encode JREAP-C message
func (m *JREAPMessage) Encode() ([]byte, error) {
    buf := new(bytes.Buffer)
    // Fields are written in wire order using network (big-endian) byte order
    fields := []any{
        // Header
        m.Header.Version, byte(m.Header.Type), m.Header.Length,
        m.Header.Timestamp, m.Header.SourceID,
        // Track data
        m.TrackID, m.Latitude, m.Longitude, m.Altitude,
        m.Velocity, m.Heading, m.TrackType, m.Threat,
    }
    for _, f := range fields {
        // Propagate write errors instead of silently discarding them
        if err := binary.Write(buf, binary.BigEndian, f); err != nil {
            return nil, err
        }
    }
    return buf.Bytes(), nil
}
Link 16 is the primary tactical data link for U.S. and allied forces, providing secure, jam-resistant communication.
| Series | Purpose | Priority | FORGE Support |
|---|---|---|---|
| J2 | Track Management | P0 | ✅ Implemented |
| J3 | Air Track | P0 | ✅ Implemented |
| J5 | Electronic Combat | P1 | ⏳ Planned |
| J7 | Weapon Coordination | P1 | ⏳ Planned |
| J9 | Engagement Status | P1 | ⏳ Planned |
┌────────────────────────────────────────────────────────────┐
│ LINK 16 J-SERIES FORMAT │
├────────────────────────────────────────────────────────────┤
│ │
│ WORD 0 (Message Header): │
│ ┌────────┬────────┬────────┬────────┬────────┬──────────┐ │
│ │TSD │RI │SR │SRS │BIC │Word │ │
│ │(1b) │(1b) │(1b) │(1b) │(1b) │Count │ │
│ └────────┴────────┴────────┴────────┴────────┴──────────┘ │
│ │
│ WORD 1 (Track ID): │
│ ┌────────────────────┬──────────────────────────────────┐ │
│ │ Track Number (15b) │ Track Quality (5b) + Status (12b)│ │
│ └────────────────────┴──────────────────────────────────┘ │
│ │
│ WORD 2 (Position): │
│ ┌────────────────────┬──────────────────────────────────┐ │
│ │ Latitude (15b) │ Longitude (15b) │ │
│ │ (0.0054 deg resol) │ (0.0054 deg resol) │ │
│ └────────────────────┴──────────────────────────────────┘ │
│ │
│ WORD 3 (Altitude/Velocity): │
│ ┌────────────────────┬──────────────────────────────────┐ │
│ │ Altitude (15b) │ Speed (15b) │ │
│ │ (6.1m resol) │ (1 kt resol) │ │
│ └────────────────────┴──────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────┘
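The word layout above pairs two 15-bit fields at a fixed resolution. As a rough illustration of that idea, the sketch below quantizes a value to a step count and packs two signed 15-bit fields into one word. The two's-complement encoding, the clamping, and the field order are assumptions made for this example; they are not taken from MIL-STD-6016 or from the FORGE formatter. Note that 15 bits at 0.0054° span only about 177°, so out-of-range values are clamped here.

```go
package main

import (
    "fmt"
    "math"
)

const (
    fieldBits     = 15
    fieldMask     = 1<<fieldBits - 1       // 0x7FFF
    fieldMax      = 1<<(fieldBits-1) - 1   // 16383
    fieldMin      = -(1 << (fieldBits - 1)) // -16384
    resolutionDeg = 0.0054
)

// quantize converts value to a signed step count at the given resolution,
// clamped to the range of a 15-bit two's-complement field.
func quantize(value, resolution float64) int32 {
    steps := math.Round(value / resolution)
    if steps > fieldMax {
        steps = fieldMax
    }
    if steps < fieldMin {
        steps = fieldMin
    }
    return int32(steps)
}

// packWord places a in the upper 15 bits and b in the lower 15 bits.
func packWord(a, b int32) uint32 {
    return (uint32(a)&fieldMask)<<fieldBits | uint32(b)&fieldMask
}

// signExtend interprets the low 15 bits of v as a signed value.
func signExtend(v uint32) int32 {
    v &= fieldMask
    if v&(1<<(fieldBits-1)) != 0 {
        return int32(v) - (1 << fieldBits)
    }
    return int32(v)
}

// unpackWord reverses packWord.
func unpackWord(w uint32) (a, b int32) {
    return signExtend(w >> fieldBits), signExtend(w)
}

func main() {
    lat := quantize(38.294, resolutionDeg)
    lon := quantize(-10.0, resolutionDeg)
    w := packWord(lat, lon)
    a, b := unpackWord(w)
    fmt.Printf("word=0x%08x lat=%d lon=%d\n", w, a, b)
}
```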
// Link 16 message types
type Link16MessageType int
const (
J2 Link16MessageType = 2 // Track Management
J3 Link16MessageType = 3 // Air Track
J5 Link16MessageType = 5 // Electronic Combat
J7 Link16MessageType = 7 // Weapon Coordination
J9 Link16MessageType = 9 // Engagement Status
)
type Link16Message struct {
MessageType Link16MessageType
TrackNumber uint16
TrackQuality uint8
Latitude int32 // 0.0054 degree resolution
Longitude int32 // 0.0054 degree resolution
Altitude int32 // 6.1m resolution
Speed uint16 // 1 kt resolution
Heading uint16 // 0.088 degree resolution
TrackType uint8
IFFMode uint8
}
// Convert track to Link 16 J3 message
func TrackToJ3(track CorrelatedTrack) (*Link16Message, error) {
return &Link16Message{
MessageType: J3,
TrackNumber: extractTrackNumber(track.TrackID),
TrackQuality: uint8(track.TrackQuality),
Latitude: int32(track.Position.Lat / 0.0054),
Longitude: int32(track.Position.Lon / 0.0054),
Altitude: int32(track.Position.Alt / 6.1),
Speed: uint16(track.Velocity.Speed),
Heading: uint16(track.Velocity.Heading / 0.088),
TrackType: mapTrackType(track.TargetType),
IFFMode: 0,
}, nil
}
func mapTrackType(targetType string) uint8 {
switch targetType {
case "ICBM":
return 1
case "IRBM":
return 2
case "SRBM":
return 3
case "GLCM":
return 4
default:
return 0 // Unknown
}
}
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Sensor │ │ Kafka │ │ VIMI │ │ FORGE-C2│ │ Client │
│ (OPIR) │ │ (Topic) │ │Correlate│ │ │ │ (WS) │
└────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
│ │ │ │ │
│ Detection │ │ │ │
│──────────────▶│ │ │ │
│ (JSON) │ │ │ │
│ │ Consume │ │ │
│ │──────────────▶│ │ │
│ │ │ │ │
│ │ │ Correlate │ │
│ │ │───┐ │ │
│ │ │◀──┘ │ │
│ │ │ │ │
│ │ Correlated │ │ │
│ │◀──────────────│ │ │
│ │ Track │ │ │
│ │ │ │ Consume │
│ │──────────────────────────────▶│ │
│ │ │ │ │
│ │ │ │ JREAP/Link16 │
│ │ │ │───┐ │
│ │ │ │◀──┘ │
│ │ │ │ │
│ │ │ │ WebSocket │
│ │ │ │──────────────▶│
│ │ │ │ (Track/Alert)│
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
| Stage | Budget | Typical | Notes |
|---|---|---|---|
| Sensor → Kafka | <10ms | 5ms | Network + serialization |
| Kafka → Consumer | <20ms | 10ms | Poll interval + deserialization |
| Consumer → Database | <30ms | 15ms | Batch insert (100 records) |
| Track Correlation | <50ms | 25ms | Fusion algorithm |
| Alert Generation | <20ms | 10ms | Threat assessment |
| JREAP/Link16 Format | <10ms | 5ms | Message encoding |
| WebSocket Push | <10ms | 5ms | Client notification |
| Total P95 | <100ms | 75ms | End-to-end |
| Storage | Retention | Compression | Notes |
|---|---|---|---|
| Kafka | 7 days | Snappy | Raw sensor data |
| TimescaleDB | 90 days | Zstd | Compressed chunks |
| PostgreSQL | 1 year | None | Track metadata |
| Backup | 90 days | Velero | Full cluster backup |
FORGE’s data flow architecture provides:
Document Version: 1.0 Last Updated: April 2026
The FORGE system’s sensor fusion architecture integrates multiple heterogeneous sensor types into a unified operational picture. This section describes the simulation models for OPIR and radar sensors, the data ingestion pipeline, and the track correlation algorithms that produce fused tracks for downstream command and control systems.
Overhead Persistent Infrared (OPIR) sensors provide the primary space-based detection capability in FORGE simulations. The system models three generations of infrared sensor constellations, each with distinct spectral characteristics and operational parameters.
The Space-Based Infrared System Geosynchronous (SBIRS-GEO) constellation represents the current operational standard. Each satellite carries two primary infrared sensors:
The simulation models a 6-satellite constellation in geosynchronous orbit, providing continuous coverage from 60°S to 60°N latitude. Each satellite maintains a staring sensor array with configurable revisit rates. The default scan pattern produces full-earth coverage every 8-12 seconds, with detection latency modeled as a function of target altitude and background clutter.
Defense Support Program (DSP) satellites provide legacy LWIR coverage:
The DSP model simulates 3-4 operational satellites with degraded resolution compared to SBIRS. Detection probabilities are reduced by approximately 15-20% for modern threat signatures but remain effective for large boost-phase events.
The Next Generation Overhead Persistent Infrared (NEXTGEN OPIR) represents future capability:
OPIR detections are published to Kafka as JSON messages with the following schema:
{
"detection_id": "opir-sbirs-g1-20260407-001428",
"timestamp": "2026-04-07T01:14:28.123Z",
"sensor_id": "sbirs-g1",
"sensor_type": "opir",
"constellation": "sbirs-geo",
"band": "MWIR",
"target_estimate": {
"latitude": 42.847,
"longitude": 12.394,
"altitude_km": 145.2,
"velocity_ms": [324.5, -12.3, 891.2],
"covariance": [
[0.45, 0.02, -0.01],
[0.02, 0.38, 0.03],
[-0.01, 0.03, 0.52]
]
},
"confidence": 0.87,
"signature_strength_db": -42.3,
"clutter_score": 0.12,
"event_type": "boost_phase"
}
| Sensor | Revisit Time | Detection P (Boost) | Detection P (Midcourse) | False Alarm Rate |
|---|---|---|---|---|
| SBIRS-GEO | 8-12s | 0.95 | 0.72 | 0.001/min |
| DSP | 10s | 0.78 | 0.55 | 0.003/min |
| NEXTGEN | 6-8s | 0.98 | 0.85 | 0.0005/min |
Detection probability is modeled as a function of target altitude, velocity, thermal signature intensity, and atmospheric conditions. The simulation applies Monte Carlo sampling to determine individual detection events.
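A minimal sketch of the Monte Carlo step follows. It treats each revisit as an independent Bernoulli trial with a given detection probability. How FORGE derives that probability from altitude, velocity, signature, and atmosphere is not shown in this document, so the sketch takes it as an input. The function names are hypothetical.

```go
package main

import (
    "fmt"
    "math/rand"
)

// newRNG returns a seeded generator so simulation runs are repeatable.
func newRNG(seed int64) *rand.Rand {
    return rand.New(rand.NewSource(seed))
}

// detects draws one Bernoulli trial: true with probability pd.
func detects(rng *rand.Rand, pd float64) bool {
    return rng.Float64() < pd
}

// detectionRate runs n independent trials and returns the observed fraction
// of detections.
func detectionRate(rng *rand.Rand, pd float64, n int) float64 {
    hits := 0
    for i := 0; i < n; i++ {
        if detects(rng, pd) {
            hits++
        }
    }
    return float64(hits) / float64(n)
}

func main() {
    rng := newRNG(42)
    // 0.95 is the SBIRS-GEO boost-phase value from the table above.
    fmt.Printf("observed rate: %.3f\n", detectionRate(rng, 0.95, 100000))
}
```

Over many trials the observed rate converges on the configured probability, which is a quick sanity check for any sampling code of this kind.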
Ground-based radar systems provide precision tracking capability and are essential for midcourse and terminal phase engagement support.
The UEWR systems, including the Clear Air Force Station and Beale AFB installations, operate in the UHF band (420-450 MHz):
UEWR provides broad area surveillance and early warning with robust performance against countermeasures due to lower frequency operation.
The AN/TPY-2 is a transportable X-band (8-12 GHz) radar optimized for ballistic missile defense:
TPY-2 provides high-precision tracking for fire control quality data, enabling engagement solution computation.
The Sea-Based X-band radar combines X-band precision with mobile deployment:
Legacy phased-array systems remain in the simulation:
Radar track data includes higher precision measurements and kinematic state vectors:
{
"track_id": "tpy2-track-20260407-0847",
"timestamp": "2026-04-07T01:14:35.456Z",
"sensor_id": "tpy2-japan",
"sensor_type": "radar",
"track_number": "T-0847",
"track_quality": "firm",
"state_vector": {
"position_ecf_m": [4589234.5, 1234567.8, 4123456.2],
"velocity_ecf_ms": [234.5, 891.2, -156.3],
"covariance_ecf": [
[12.4, 0.8, -0.3],
[0.8, 8.7, 0.5],
[-0.3, 0.5, 15.2]
]
},
"geodetic": {
"latitude": 38.294,
"longitude": 141.856,
"altitude_km": 234.5
},
"features": {
"rcs_m2": 0.45,
"range_resolution_m": 0.25,
"doppler_width_ms": 12.3,
"amplitude_fluctuation": 0.15
},
"track_history": [
{"time": "2026-04-07T01:14:30.000Z", "range_km": 892.3, "azimuth_deg": 34.5},
{"time": "2026-04-07T01:14:32.000Z", "range_km": 878.1, "azimuth_deg": 35.2},
{"time": "2026-04-07T01:14:34.000Z", "range_km": 864.2, "azimuth_deg": 35.9}
],
"confidence": 0.94,
"maneuver_indicator": false
}
The FORGE-Consumer service bridges the simulation output and persistence layer, consuming sensor data from Kafka and persisting to TimescaleDB with optimized batch processing.
The consumer operates in three distinct consumer groups to isolate processing responsibilities:
- sensor.detections topic
- track.correlated topic
- simulation.events topic
Each consumer group scales horizontally, with partition assignment handled by Kafka’s built-in consumer group protocol. The default configuration uses 12 partitions per topic, enabling parallel consumption across multiple consumer instances.
The consumer implements adaptive batch insertion; per the pipeline diagram in this section, batches range from 100 to 10,000 records with a 500ms maximum flush timeout.
Batch aggregation reduces database round-trips by 95% compared to single-row inserts. The consumer monitors backpressure and dynamically adjusts batch sizing based on TimescaleDB insert latency metrics.
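The batching behavior can be sketched as follows. This is an illustration under stated assumptions, not the FORGE-Consumer code: the Batcher type, its methods, and the double-or-halve adjustment policy are hypothetical, while the 100 to 10,000 record range and the 500ms flush timeout come from the pipeline diagram in this section.

```go
package main

import (
    "fmt"
    "time"
)

const (
    minBatch     = 100
    maxBatch     = 10000
    flushTimeout = 500 * time.Millisecond
)

// Batcher buffers records until a size or age limit is reached.
type Batcher struct {
    target    int       // current batch size target
    pending   []string  // buffered records (strings stand in for rows)
    firstSeen time.Time // arrival time of the oldest buffered record
}

// NewBatcher starts at the smallest batch size.
func NewBatcher() *Batcher { return &Batcher{target: minBatch} }

// Add buffers one record. It returns a batch to write when the size target
// is reached or the oldest record has waited flushTimeout; otherwise nil.
// The age check only runs on Add, so a real service would also need a timer.
func (b *Batcher) Add(rec string, now time.Time) []string {
    if len(b.pending) == 0 {
        b.firstSeen = now
    }
    b.pending = append(b.pending, rec)
    if len(b.pending) >= b.target || now.Sub(b.firstSeen) >= flushTimeout {
        out := b.pending
        b.pending = nil
        return out
    }
    return nil
}

// Adjust applies a hypothetical policy: double the target while inserts are
// fast, halve it when insert latency exceeds the slow threshold.
func (b *Batcher) Adjust(insertLatency, slow time.Duration) {
    if insertLatency > slow {
        b.target /= 2
    } else {
        b.target *= 2
    }
    if b.target < minBatch {
        b.target = minBatch
    }
    if b.target > maxBatch {
        b.target = maxBatch
    }
}

func main() {
    b := NewBatcher()
    now := time.Now()
    for i := 0; i < 250; i++ {
        if batch := b.Add(fmt.Sprintf("row-%d", i), now); batch != nil {
            fmt.Printf("flush %d records\n", len(batch))
            b.Adjust(20*time.Millisecond, 100*time.Millisecond)
        }
    }
}
```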
Detection and track data are stored in TimescaleDB hypertables optimized for time-series queries:
-- Detections hypertable
CREATE TABLE detections (
time TIMESTAMPTZ NOT NULL,
detection_id TEXT NOT NULL,
simulation_id TEXT NOT NULL,
sensor_id TEXT NOT NULL,
sensor_type TEXT NOT NULL,
latitude DOUBLE PRECISION,
longitude DOUBLE PRECISION,
altitude_km DOUBLE PRECISION,
confidence DOUBLE PRECISION,
event_type TEXT,
raw_json JSONB,
-- Unique constraints on a hypertable must include the partitioning column
PRIMARY KEY (detection_id, time)
);
SELECT create_hypertable('detections', 'time', chunk_time_interval => INTERVAL '1 hour');
-- Tracks hypertable
CREATE TABLE tracks (
time TIMESTAMPTZ NOT NULL,
track_id TEXT NOT NULL,
simulation_id TEXT NOT NULL,
fused_track_id TEXT,
source_track_id TEXT[],
position_ecf VECTOR(3),
velocity_ecf VECTOR(3),
covariance VECTOR(9),
track_quality TEXT,
confidence DOUBLE PRECISION
);
SELECT create_hypertable('tracks', 'time', chunk_time_interval => INTERVAL '1 hour');
Continuous aggregates provide pre-computed rollups for dashboard queries:
CREATE MATERIALIZED VIEW detections_hourly
WITH (timescaledb.continuous) AS
SELECT sensor_id,
time_bucket('1 hour', time) AS bucket,
COUNT(*) AS detection_count,
AVG(confidence) AS avg_confidence
FROM detections
GROUP BY sensor_id, bucket;
The pipeline maintains the following SLO targets:
| Metric | Target | p99 Target |
|---|---|---|
| Kafka-to-Consumer Latency | < 50ms | < 200ms |
| Consumer-to-DB Latency | < 100ms | < 500ms |
| End-to-End Latency | < 200ms | < 750ms |
| Throughput | 50,000 msg/s | — |
Real-time latency monitoring integrates with Prometheus, exposing histograms for each processing stage.
Track correlation transforms raw sensor detections into unified tracks, resolving multiple sensor perspectives into a single coherent picture.
The correlation engine implements a three-phase algorithm: kinematic gating, Mahalanobis distance scoring, and global nearest-neighbor assignment.
The system maintains separate correlation pools for:
- OPIR-to-OPIR (infrared cross-correlation)
- Radar-to-Radar (kinematic correlation)
- OPIR-to-Radar (heterogeneous fusion)
The correlation score uses the Mahalanobis distance accounting for combined sensor covariance:
D² = (x₁ - x₂)ᵀ (P₁ + P₂)⁻¹ (x₁ - x₂)
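Where x₁, x₂ are the state estimates being compared and P₁, P₂ are their covariance matrices. The gating threshold is taken from a chi-squared distribution with 6 degrees of freedom (3 position + 3 velocity), so x₁ and x₂ must carry both position and velocity components for the gate to apply. The pipeline diagram in this section lists a threshold of 16.81, which corresponds to the 99% quantile for 6 degrees of freedom.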
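The distance and gate can be sketched as below. This is an illustrative implementation, not the FORGE correlator: it solves (P₁ + P₂)·y = (x₁ − x₂) by Gaussian elimination rather than forming the inverse, and the helper names (solve, diag, mahalanobisSq) and the sample values are hypothetical.

```go
package main

import (
    "errors"
    "fmt"
    "math"
)

// gate6DOF is the 99% chi-squared quantile for 6 degrees of freedom.
const gate6DOF = 16.81

// solve returns y with a*y = b using Gaussian elimination with partial
// pivoting. The inputs are not modified.
func solve(a [][]float64, b []float64) ([]float64, error) {
    n := len(b)
    m := make([][]float64, n) // augmented copy [a | b]
    for i := range m {
        m[i] = append(append([]float64{}, a[i]...), b[i])
    }
    for col := 0; col < n; col++ {
        p := col
        for r := col + 1; r < n; r++ {
            if math.Abs(m[r][col]) > math.Abs(m[p][col]) {
                p = r
            }
        }
        if math.Abs(m[p][col]) < 1e-12 {
            return nil, errors.New("singular covariance matrix")
        }
        m[col], m[p] = m[p], m[col]
        for r := col + 1; r < n; r++ {
            f := m[r][col] / m[col][col]
            for c := col; c <= n; c++ {
                m[r][c] -= f * m[col][c]
            }
        }
    }
    y := make([]float64, n)
    for i := n - 1; i >= 0; i-- {
        s := m[i][n]
        for j := i + 1; j < n; j++ {
            s -= m[i][j] * y[j]
        }
        y[i] = s / m[i][i]
    }
    return y, nil
}

// mahalanobisSq computes D² = (x1-x2)ᵀ (P1+P2)⁻¹ (x1-x2).
func mahalanobisSq(x1, x2 []float64, p1, p2 [][]float64) (float64, error) {
    n := len(x1)
    d := make([]float64, n)
    s := make([][]float64, n)
    for i := 0; i < n; i++ {
        d[i] = x1[i] - x2[i]
        s[i] = make([]float64, n)
        for j := 0; j < n; j++ {
            s[i][j] = p1[i][j] + p2[i][j]
        }
    }
    y, err := solve(s, d)
    if err != nil {
        return 0, err
    }
    sum := 0.0
    for i := range d {
        sum += d[i] * y[i]
    }
    return sum, nil
}

// diag builds a diagonal matrix from the given variances.
func diag(v ...float64) [][]float64 {
    m := make([][]float64, len(v))
    for i := range m {
        m[i] = make([]float64, len(v))
        m[i][i] = v[i]
    }
    return m
}

func main() {
    // Illustrative 6-state vectors: 3 position + 3 velocity components.
    x1 := []float64{100, 200, 300, 3.5, 2.1, 0.5}
    x2 := []float64{101, 199, 302, 3.4, 2.2, 0.5}
    p := diag(0.5, 0.5, 0.5, 0.02, 0.02, 0.02)
    d2, err := mahalanobisSq(x1, x2, p, p)
    if err != nil {
        fmt.Println("gating failed:", err)
        return
    }
    fmt.Printf("D²=%.2f inside gate: %v\n", d2, d2 <= gate6DOF)
}
```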
Fused track confidence combines source confidences using probabilistic fusion:
P_fused = 1 - Πᵢ(1 - Pᵢ)
For n sensors with individual confidence Pᵢ, the fused confidence reflects the information gain from multiple independent observations.
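As a worked example of the formula, the sketch below computes the fused confidence for a list of source confidences. The function name fusedConfidence is hypothetical, and the formula assumes the observations are independent, as stated above.

```go
package main

import "fmt"

// fusedConfidence returns 1 - Π(1 - p) over the given source confidences.
// An empty list yields 0, since no sensor contributes any evidence.
func fusedConfidence(ps []float64) float64 {
    miss := 1.0 // probability that every source is wrong
    for _, p := range ps {
        miss *= 1 - p
    }
    return 1 - miss
}

func main() {
    // Two sources at 0.5 each: 1 - (0.5 * 0.5) = 0.75.
    fmt.Printf("%.2f\n", fusedConfidence([]float64{0.5, 0.5}))
}
```

Each additional independent source can only raise the fused value, and a single source at 1.0 drives the result to 1.0.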
The Track ID Manager assigns unique identifiers using a namespace scheme:
<fusion_type>-<region>-<sequence>-<timestamp>
Example: FUSED-INDO-PAC-001428-20260407
Track IDs persist across correlation cycles, with parent-child relationships maintained for track branching events (e.g., target fragmentation).
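A small sketch of the naming scheme follows. The zero-padded six-digit sequence and the YYYYMMDD date are inferred from the example ID above rather than from a stated specification, and the function names are hypothetical.

```go
package main

import (
    "fmt"
    "time"
)

// day builds a UTC date; it exists only to keep the example short.
func day(year, month, dayOfMonth int) time.Time {
    return time.Date(year, time.Month(month), dayOfMonth, 0, 0, 0, 0, time.UTC)
}

// fusedTrackID formats <fusion_type>-<region>-<sequence>-<timestamp>.
// The padding width and date layout are assumptions based on the sample ID.
func fusedTrackID(fusionType, region string, sequence int, t time.Time) string {
    return fmt.Sprintf("%s-%s-%06d-%s", fusionType, region, sequence, t.UTC().Format("20060102"))
}

func main() {
    fmt.Println(fusedTrackID("FUSED", "INDO-PAC", 1428, day(2026, 4, 7)))
}
```

Because a region such as INDO-PAC contains a hyphen itself, splitting an ID on "-" is not a reliable way to recover its parts.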
{
"fused_track_id": "FUSED-INDO-PAC-001428-20260407",
"timestamp": "2026-04-07T01:14:42.789Z",
"source_tracks": [
{"sensor_id": "sbirs-g1", "track_ref": "opir-sbirs-g1-20260407-001428"},
{"sensor_id": "tpy2-japan", "track_ref": "tpy2-track-20260407-0847"}
],
"fused_state": {
"position_ecf_m": [4589000.2, 1234500.5, 4123400.8],
"velocity_ecf_ms": [234.2, 891.5, -156.1],
"position_covariance": [
[8.2, 0.4, -0.2],
[0.4, 5.9, 0.3],
[-0.2, 0.3, 9.8]
]
},
"confidence": 0.97,
"correlation_score": 4.23,
"track_age_seconds": 12.4
}
┌─────────────────────────────────────────────────────────────────────────────┐
│ SENSOR SIMULATION │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ SBIRS-GEO │ │ DSP Heritage│ │ NEXTGEN OPIR│ │ UEWR/TPY-2/SBX/etc │ │
│ │ (SWIR/MWIR) │ │ (LWIR) │ │ (Enhanced) │ │ (Radar Trackers) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────────┬──────────┘ │
└─────────┼────────────────┼────────────────┼────────────────────┼────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ KAFKA CLUSTER │
│ ┌───────────────────────┐ ┌───────────────────────┐ ┌──────────────────┐ │
│ │ sensor.detections │ │ sensor.radar_tracks │ │ simulation.events │ │
│ │ (12 partitions) │ │ (12 partitions) │ │ (6 partitions) │ │
│ └───────────┬───────────┘ └───────────┬───────────┘ └────────┬─────────┘ │
└──────────────┼──────────────────────────┼───────────────────────┼──────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ FORGE-CONSUMER SERVICE │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ Consumer Groups: detections, tracks, events │ │
│ │ • Adaptive batch insertion (100-10,000 records) │ │
│ │ • 500ms max flush timeout │ │
│ │ • Backpressure monitoring │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ Track Correlation Engine │ │
│ │ • Kinematic gating (χ² threshold: 16.81) │ │
│ │ • Mahalanobis distance scoring │ │
│ │ • Global nearest-neighbor assignment │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────┬──────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ TIMESCALEDB CLUSTER │
│ ┌─────────────────────┐ ┌─────────────────────┐ ┌──────────────────────┐ │
│ │ detections │ │ tracks │ │ fused_tracks │ │
│ │ (1hr chunks) │ │ (1hr chunks) │ │ (continuous aggs) │ │
│ └─────────────────────┘ └─────────────────────┘ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
| Stage | Mean Latency | p99 Latency | Notes |
|---|---|---|---|
| Sensor → Kafka | 5-15ms | 50ms | Simulation publish time |
| Kafka → Consumer | 10-30ms | 100ms | Consumer poll interval |
| Correlation Processing | 20-50ms | 150ms | Gating + scoring + assignment |
| Consumer → DB | 15-40ms | 200ms | Batch insert with commit |
| End-to-End | 50-135ms | 500ms | Detection to persisted track |
| Metric | Sustained | Burst | Notes |
|---|---|---|---|
| Detections/sec | 15,000 | 50,000 | Peak threat scenario |
| Tracks/sec | 5,000 | 20,000 | Correlated output |
| Kafka partitions | 30 | — | 12 × 2 topics + events |
| Consumer instances | 4-12 | — | Auto-scaled by topic lag |
| DB inserts/sec | 20,000 | 80,000 | Batch-optimized inserts |
The architecture supports horizontal scaling at each layer: additional simulation instances increase detection volume proportionally; additional Kafka partitions and consumer instances maintain processing latency under load; TimescaleDB chunk partitioning ensures query performance remains constant as data volume grows.
FORGE exposes multiple integration interfaces to support diverse operational requirements: REST APIs for synchronous queries, WebSockets for real-time streaming, and Kafka for high-throughput event ingestion. This section documents the API specifications, integration patterns, and operational standards for connecting external systems to FORGE.
The FORGE REST API provides synchronous access to track data, alerts, and system status. All endpoints follow RESTful conventions with JSON request/response bodies.
| Parameter | Value |
|---|---|
| Base URL | http://forge-api.forge.svc:8080 (internal) |
| External URL | http://207.244.226.151:30080 (NodePort) |
| API Version | v1 |
| Content-Type | application/json |
| Method | Path | Description | Auth Required |
|---|---|---|---|
| Track Queries | | | |
| GET | /api/v1/tracks | List all active tracks | Yes |
| GET | /api/v1/tracks/{track_id} | Get specific track details | Yes |
| GET | /api/v1/tracks/{track_id}/history | Track position history | Yes |
| GET | /api/v1/tracks/search | Search tracks by criteria | Yes |
| POST | /api/v1/tracks/correlate | Manual track correlation | Yes |
| Alert Management | | | |
| GET | /api/v1/alerts | List active alerts | Yes |
| GET | /api/v1/alerts/{alert_id} | Get alert details | Yes |
| POST | /api/v1/alerts/{alert_id}/acknowledge | Acknowledge alert | Yes |
| POST | /api/v1/alerts/{alert_id}/resolve | Resolve alert | Yes |
| GET | /api/v1/alerts/history | Alert history with filters | Yes |
| System Health | | | |
| GET | /api/v1/health | Basic health check | No |
| GET | /api/v1/health/ready | Readiness probe | No |
| GET | /api/v1/health/live | Liveness probe | No |
| GET | /api/v1/metrics | Prometheus metrics | Yes |
| GET | /api/v1/status | System status summary | Yes |
| Sensor Management | | | |
| GET | /api/v1/sensors | List registered sensors | Yes |
| GET | /api/v1/sensors/{sensor_id} | Sensor details | Yes |
| POST | /api/v1/sensors/{sensor_id}/enable | Enable sensor feed | Yes |
| POST | /api/v1/sensors/{sensor_id}/disable | Disable sensor feed | Yes |
GET /api/v1/tracks?status=active&threat_level=5&limit=100 HTTP/1.1
Host: forge-api.forge.svc:8080
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...
Accept: application/json
{
"tracks": [
{
"track_id": "TRK-1775013953",
"timestamp": "2026-04-09T12:30:00.000Z",
"target_type": "ICBM",
"threat_level": 5,
"position": {"lat": 39.5, "lon": 128.2, "alt": 450.0},
"velocity": {"speed": 6.8, "heading": 74.6},
"track_phase": "MIDCOURSE",
"sensor_count": 3,
"confidence": 0.95
}
],
"pagination": {
"total": 15,
"offset": 0,
"limit": 100
}
}
POST /api/v1/alerts/ALT-12345/acknowledge HTTP/1.1
Host: forge-api.forge.svc:8080
Authorization: Bearer eyJhbGciOiJSUzI1NiIs...
Content-Type: application/json
{
"acknowledged_by": "operator-001",
"notes": "Tracking, awaiting further data"
}
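For Go clients, the track query shown above could be assembled as in the following sketch. It only builds the request and does not send it. The function name newTrackQuery and the placeholder token are hypothetical; the path, query parameters, and headers are the ones from the example request.

```go
package main

import (
    "fmt"
    "net/http"
    "net/url"
    "strconv"
)

// newTrackQuery builds a GET /api/v1/tracks request for active tracks at the
// given threat level, with the bearer token attached.
func newTrackQuery(baseURL, token string, threatLevel, limit int) (*http.Request, error) {
    q := url.Values{}
    q.Set("status", "active")
    q.Set("threat_level", strconv.Itoa(threatLevel))
    q.Set("limit", strconv.Itoa(limit))

    req, err := http.NewRequest(http.MethodGet, baseURL+"/api/v1/tracks?"+q.Encode(), nil)
    if err != nil {
        return nil, err
    }
    req.Header.Set("Authorization", "Bearer "+token)
    req.Header.Set("Accept", "application/json")
    return req, nil
}

func main() {
    req, err := newTrackQuery("http://forge-api.forge.svc:8080", "example-token", 5, 100)
    if err != nil {
        fmt.Println("build request:", err)
        return
    }
    fmt.Println(req.Method, req.URL.String())
}
```

The request can then be passed to an http.Client with a timeout; url.Values takes care of escaping and emits the parameters in sorted key order.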
WebSocket connections provide real-time streaming of track updates, alerts, and formatted military messages (JREAP/Link 16) to display systems and C2 clients.
| Endpoint | Purpose | Message Format |
|---|---|---|
| ws://forge-api:8080/ws/tracks | Real-time track updates | JSON |
| ws://forge-api:8080/ws/alerts | Alert notifications | JSON |
| ws://forge-api:8080/ws/jreap | JREAP message stream | Binary |
| ws://forge-api:8080/ws/link16 | Link 16 J-series messages | Binary |
| ws://forge-api:8080/ws/all | Combined stream | JSON/Binary |
// WebSocket client connection
const ws = new WebSocket('ws://207.244.226.151:30080/ws/tracks');
// Connection authentication
ws.onopen = () => {
ws.send(JSON.stringify({
type: 'auth',
token: 'eyJhbGciOiJSUzI1NiIs...'
}));
};
// Message handling
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'track_update':
updateTrackDisplay(data.track);
break;
case 'track_drop':
removeTrack(data.track_id);
break;
case 'alert':
showAlertNotification(data.alert);
break;
}
};
// Heartbeat to maintain connection
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);
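// Subscription management: registered as a second 'open' listener so it runs
// after the auth message above; calling ws.send() before the socket opens throws
ws.addEventListener('open', () => {
    ws.send(JSON.stringify({
        type: 'subscribe',
        channels: ['tracks', 'alerts'],
        filters: {
            threat_level_min: 3,
            region: 'PACIFIC'
        }
    }));
});
{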
"type": "track_update",
"timestamp": "2026-04-09T12:30:00.500Z",
"track": {
"track_id": "TRK-1775013953",
"target_type": "ICBM",
"threat_level": 5,
"position": {"lat": 39.5, "lon": 128.2, "alt": 450.0},
"velocity": {"speed": 6.8, "heading": 74.6, "climb_rate": 2.1},
"track_phase": "MIDCOURSE",
"confidence": 0.95,
"source_tracks": ["OPIR-SBIRS-GEO-1-123", "RADAR-UEWR-BEALE-456"]
}
}
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │ │ FORGE-C2 │ │ Kafka │
│ (Browser) │ │ (WS Server)│ │ (Topics) │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
│ 1. WebSocket Connect │ │
│─────────────────────────────────▶│ │
│ │ │
│ 2. Auth Token │ │
│─────────────────────────────────▶│ │
│ │ │
│ 3. Auth Success │ │
│◀─────────────────────────────────│ │
│ │ │
│ 4. Subscribe (channels/filters) │ │
│─────────────────────────────────▶│ │
│ │ │
│ │ 5. Consume track-updates │
│ │◀─────────────────────────────────│
│ │ │
│ 6. Track Update (JSON) │ │
│◀─────────────────────────────────│ │
│ │ │
│ 7. Ping (heartbeat) │ │
│─────────────────────────────────▶│ │
│ │ │
│ 8. Pong │ │
│◀─────────────────────────────────│ │
│ │ │
▼ ▼ ▼
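The client-side handling of step 6 can be sketched in Go as follows. This is a minimal illustration, assuming the message shape shown in the `track_update` example above; the `Envelope` and `Track` types and the `acceptTrack` helper are hypothetical names, and the filter only mirrors the `threat_level_min` subscription option on the receiving side.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope mirrors the outer fields of a streamed message (hypothetical type).
type Envelope struct {
	Type      string `json:"type"`
	Timestamp string `json:"timestamp"`
	Track     *Track `json:"track,omitempty"`
}

// Track holds only the track fields this sketch needs.
type Track struct {
	TrackID     string  `json:"track_id"`
	TargetType  string  `json:"target_type"`
	ThreatLevel int     `json:"threat_level"`
	Confidence  float64 `json:"confidence"`
}

// acceptTrack decodes one message and reports whether it is a track_update
// whose threat level is at or above minThreat.
func acceptTrack(raw []byte, minThreat int) (Track, bool, error) {
	var env Envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return Track{}, false, fmt.Errorf("decode message: %w", err)
	}
	if env.Type != "track_update" || env.Track == nil {
		return Track{}, false, nil
	}
	if env.Track.ThreatLevel < minThreat {
		return Track{}, false, nil
	}
	return *env.Track, true, nil
}

func main() {
	msg := []byte(`{"type":"track_update","timestamp":"2026-04-09T12:30:00.500Z",
		"track":{"track_id":"TRK-1775013953","target_type":"ICBM","threat_level":5,"confidence":0.95}}`)
	track, ok, err := acceptTrack(msg, 3)
	if err != nil {
		panic(err)
	}
	fmt.Println(ok, track.TrackID, track.ThreatLevel)
}
```

Filtering again on the client is optional when the server already applies the subscription filters; it is shown here only to make the message structure concrete.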
External systems can integrate directly with Kafka for high-throughput data exchange. This section documents producer/consumer configurations and best practices.
# Kafka Producer Configuration (Go sarama)
producer:
client_id: "forge-external-producer"
brokers:
- "kafka.forge.svc:9092"
# Reliability settings
required_acks: "all" # Wait for all replicas
max_retries: 3
retry_backoff: 100ms
# Batching for throughput
batch_size: 100
batch_timeout: 10ms
# Compression
compression: "snappy"
# Idempotency (prevent duplicates)
idempotent: true
// Go producer implementation
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 3
config.Producer.Retry.Backoff = 100 * time.Millisecond
config.Producer.Return.Successes = true
config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Bytes = 1024 * 1024 // 1MB batch
config.Producer.Flush.Frequency = 10 * time.Millisecond
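config.Producer.Idempotent = true
config.Net.MaxOpenRequests = 1 // sarama rejects an idempotent producer config unless this is 1
producer, err := sarama.NewSyncProducer([]string{"kafka.forge.svc:9092"}, config)
if err != nil {
log.Fatalf("create producer: %v", err)
}
defer producer.Close()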
message := &sarama.ProducerMessage{
Topic: "track-updates",
Key: sarama.StringEncoder(track.TrackID),
Value: sarama.ByteEncoder(trackJSON),
Headers: []sarama.RecordHeader{
{Key: []byte("source"), Value: []byte("external-sensor")},
{Key: []byte("timestamp"), Value: []byte(time.Now().Format(time.RFC3339))},
},
}
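partition, offset, err := producer.SendMessage(message)
if err != nil {
log.Printf("send failed: %v", err)
} else {
log.Printf("stored in partition %d at offset %d", partition, offset)
}
# Kafka Consumer Configuration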
consumer:
group_id: "forge-external-consumer"
brokers:
- "kafka.forge.svc:9092"
# Consumer group settings
initial_offset: "newest"
session_timeout: 10s
heartbeat_interval: 3s
# Processing settings
commit_interval: 1s
auto_commit: false # Manual commits for reliability
# Fetch settings
fetch_min: 1
fetch_max: 1048576 # 1MB max per fetch
fetch_wait_max: 100ms
# Concurrency
concurrency: 10 # Goroutines per partition
// Go consumer implementation
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Consumer.Group.Session.Timeout = 10 * time.Second
config.Consumer.Group.Heartbeat.Interval = 3 * time.Second
config.Consumer.MaxProcessingTime = 100 * time.Millisecond
group, _ := sarama.NewConsumerGroup([]string{"kafka:9092"}, "forge-consumer", config)
handler := &ConsumerHandler{
ready: make(chan bool),
process: func(msg *sarama.ConsumerMessage) error {
// Process message
track := parseTrack(msg.Value)
storeTrack(track)
return nil
},
}
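for {
// Consume returns whenever the group rebalances, so it is called in a loop
if err := group.Consume(ctx, []string{"track-updates", "alerts"}, handler); err != nil {
log.Printf("consume error: %v", err)
}
if ctx.Err() != nil {
break // Context cancelled: stop consuming
}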
}
| Category | Pattern | Example | Description |
|---|---|---|---|
| Sensor Data | {sensor-type}-{data-kind} | opir-detections, radar-tracks | Raw sensor input |
| Processed Data | {data-kind} | track-updates, correlated-tracks | Processed output |
| Military Messages | {standard}-messages | jreap-messages, link16-reports | Formatted messages |
| System Events | system-{event-type} | system-alerts, system-health | Operational events |
| Topic | Partitions | Key | Partitioning Strategy |
|---|---|---|---|
| opir-detections | 6 | sensor_id | Sensor locality |
| radar-tracks | 6 | radar_id | Radar locality |
| track-updates | 6 | track_id | Track affinity |
| correlated-tracks | 3 | track_id | Track affinity |
| c2-messages | 6 | message_id | Round-robin |
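Track affinity follows from key-based partitioning: every message with the same key hashes to the same partition, so updates for one track stay ordered. The sketch below illustrates the idea with a 32-bit FNV-1a hash from the Go standard library. The `partitionFor` helper is a hypothetical name, and the partition a real client picks depends on that client's own partitioner, so the numbers printed here should not be read as what the FORGE producers assign.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition index in [0, numPartitions)
// using a 32-bit FNV-1a hash of the key.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The first and third keys are identical and therefore land on the same partition.
	for _, id := range []string{"TRK-1775013953", "TRK-1775013954", "TRK-1775013953"} {
		fmt.Printf("%s -> partition %d\n", id, partitionFor(id, 6))
	}
}
```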
FORGE supports integration with external simulators, C2 systems, and tactical displays through standardized interfaces.
┌─────────────────────────────────────────────────────────────────────────────┐
│ EXTERNAL INTEGRATION INTERFACES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Simulators │ │ C2 Systems │ │ Displays │ │
│ │ │ │ │ │ │ │
│ │ • OPIR Sim │ │ • IBCS │ │ • C2PC │ │
│ │ • Radar Sim │ │ • AEGIS │ │ • FalconView │ │
│ │ • Threat Sim │ │ • THAAD C2 │ │ • CPOF │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ └───────────────────────┼───────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ INTEGRATION GATEWAY │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Kafka │ │ WebSocket │ │ REST API │ │ │
│ │ │ Ingestion │ │ Streaming │ │ Gateway │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ FORGE CORE │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
External simulators push data to FORGE via Kafka or REST API:
# Simulator registration
simulator:
id: "opir-simulator-001"
type: "OPIR"
sensors:
- id: "SBIRS-GEO-1"
type: "SBIRS-GEO"
orbital_slot: "GEO-95W"
- id: "SBIRS-GEO-2"
type: "SBIRS-GEO"
orbital_slot: "GEO-105E"
# Connection method
connection:
type: "kafka"
brokers: ["kafka.forge.svc:9092"]
topic: "opir-detections"
# Data format
format:
type: "json"
schema: "opir-detection-v2"
C2 systems consume FORGE output via WebSocket or Kafka:
# C2 system integration
c2_system:
id: "ibcs-node-001"
type: "IBCS"
# JREAP configuration
jreap:
type: "JREAP-C" # IP-based
endpoint: "tcp://0.0.0.0:47001"
message_format: "binary"
# WebSocket backup
websocket:
url: "ws://forge-api:8080/ws/jreap"
reconnect_interval: 5s
# Filtering
filters:
threat_level_min: 3
region: ["PACIFIC", "WESTERN-CONUS"]
# Display client configuration
display:
id: "tactical-display-001"
type: "C2PC"
# WebSocket connection
websocket:
url: "ws://forge-api:8080/ws/tracks"
heartbeat: 30s
# Display preferences
preferences:
track_icons: true
threat_colors:
level_5: "#FF0000" # Red - Critical
level_4: "#FF6600" # Orange - High
level_3: "#FFCC00" # Yellow - Medium
level_2: "#00CCFF" # Cyan - Low
level_1: "#FFFFFF" # White - Minimal
projection: "mercator"
update_interval: 100ms
FORGE implements a layered authentication model supporting JWT tokens for users, API keys for services, and mTLS for internal service-to-service communication.
| Method | Use Case | Implementation |
|---|---|---|
| JWT Bearer | User sessions, web clients | RS256 signed tokens |
| API Key | Service accounts, automation | Static keys with rotation |
| mTLS | Service-to-service, Kafka | X.509 certificates |
# JWT Configuration
jwt:
issuer: "forge-auth"
audience: "forge-api"
# RS256 key pair (stored in Vault)
private_key: "vault:secret/forge/jwt-private-key"
public_key: "vault:secret/forge/jwt-public-key"
# Token lifetime
access_token_ttl: 1h
refresh_token_ttl: 24h
# Claims
required_claims:
- sub # Subject (user ID)
- iat # Issued at
- exp # Expiration
- roles # User roles
// JWT validation middleware
func JWTMiddleware(publicKey *rsa.PublicKey) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
respondError(w, http.StatusUnauthorized, "missing authorization header")
return
}
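if !strings.HasPrefix(authHeader, "Bearer ") {
respondError(w, http.StatusUnauthorized, "malformed authorization header")
return
}
tokenString := strings.TrimPrefix(authHeader, "Bearer ")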
token, err := jwt.Parse(tokenString, func(t *jwt.Token) (interface{}, error) {
if _, ok := t.Method.(*jwt.SigningMethodRSA); !ok {
return nil, fmt.Errorf("unexpected signing method")
}
return publicKey, nil
})
if err != nil || !token.Valid {
respondError(w, http.StatusUnauthorized, "invalid token")
return
}
// Add claims to context
ctx := context.WithValue(r.Context(), "claims", token.Claims)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}
# API Key configuration
api_keys:
storage: "vault"
prefix: "forge_" # forge_xxxxxxxxxxxx
length: 24
# Rotation policy
rotation:
enabled: true
interval: 90d
notification: 7d # Notify before expiration
# Scopes
scopes:
read_only: ["tracks:read", "alerts:read"]
read_write: ["tracks:read", "tracks:write", "alerts:read", "alerts:write"]
admin: ["*"]
┌───────────────────────────────────────────────────────────────────────┐
│ mTLS Service Architecture │
├───────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ FORGE-C2 │ │ TimescaleDB│ │
│ │ (Client) │ │ (Server) │ │
│ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ │ ┌─────────────────────────────────────┐ │ │
│ │ │ TLS HANDSHAKE │ │ │
│ │ │ │ │ │
│ │ │ 1. Client ────────▶ Server │ │ │
│ │ │ (ClientHello) │ │ │
│ │ │ │ │ │
│ │ │ 2. Server ────────▶ Client │ │ │
│ │ │ (ServerHello + ServerCert + │ │ │
│ │ │ CertificateRequest) │ │ │
│ │ │ │ │ │
│ │ │ 3. Client ────────▶ Server │ │ │
│ │ │ (ClientCert + CertificateVerify) │ │ │
│ │ │ │ │ │
│ │ │ 4. Mutual certificate verification│ │ │
│ │ │ │ │ │
│ │ │ 5. Encrypted channel established │ │ │
│ │ │ │ │ │
│ └──┴─────────────────────────────────────┴───┘ │
│ │
│ CERTIFICATES (managed by Vault PKI): │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ CA: forge-cluster-ca │ │
│ │ Cert TTL: 720h (30 days) │ │
│ │ Key Type: ECDSA P-256 │ │
│ │ Rotation: Automatic via cert-manager │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
└───────────────────────────────────────────────────────────────────────┘
FORGE implements multi-layer rate limiting to protect services from overload and ensure fair resource allocation.
| Client Type | Endpoint | Limit | Window |
|---|---|---|---|
| Anonymous | /api/v1/health | 60 | 1 minute |
| Authenticated User | /api/v1/tracks | 1000 | 1 minute |
| Authenticated User | /api/v1/alerts | 500 | 1 minute |
| Service Account | All endpoints | 10000 | 1 minute |
| WebSocket | Connection | 10 | 1 minute |
| WebSocket | Messages | 100 | 1 second |
// Token bucket rate limiter
type RateLimiter struct {
mu sync.Mutex
clients map[string]*ClientBucket
rate int // Tokens per second
capacity int // Bucket capacity
}
type ClientBucket struct {
tokens float64
lastUpdate time.Time
}
func (rl *RateLimiter) Allow(clientID string) bool {
rl.mu.Lock()
defer rl.mu.Unlock()
bucket, exists := rl.clients[clientID]
if !exists {
bucket = &ClientBucket{
tokens: float64(rl.capacity),
lastUpdate: time.Now(),
}
rl.clients[clientID] = bucket
}
// Refill tokens
elapsed := time.Since(bucket.lastUpdate)
bucket.tokens += float64(rl.rate) * elapsed.Seconds()
if bucket.tokens > float64(rl.capacity) {
bucket.tokens = float64(rl.capacity)
}
bucket.lastUpdate = time.Now()
if bucket.tokens >= 1 {
bucket.tokens--
return true
}
return false
}
// Middleware
func RateLimitMiddleware(limiter *RateLimiter) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
clientID := getClientID(r) // From JWT or API key
if !limiter.Allow(clientID) {
w.Header().Set("X-RateLimit-Remaining", "0")
w.Header().Set("Retry-After", "60")
respondError(w, http.StatusTooManyRequests, "rate limit exceeded")
return
}
next.ServeHTTP(w, r)
})
}
}
┌───────────────────────────────────────────────────────────────────────┐
│ BACKPRESSURE FLOW CONTROL │
├───────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Client │ │ Gateway │ │ Service │ │
│ │ │ │ (Buffer) │ │ │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ │ Request │ │ │
│ │─────────────────▶│ │ │
│ │ │ │ │
│ │ │ Check buffer │ │
│ │ │───┐ │ │
│ │ │◀──┘ │ │
│ │ │ │ │
│ │ │ [Buffer < 80%] │ │
│ │ │─────────────────▶│ │
│ │ │ │ │
│ │ │ [Buffer > 80%] │ │
│ │ │───┐ │ │
│ │ │◀──┘ HTTP 503 │ │
│ │◀─────────────────│ Retry-After │ │
│ │ 503 + Backoff │ │ │
│ │ │ │ │
│ │ │ [Buffer > 95%] │ │
│ │ │───┐ │ │
│ │ │◀──┘ Circuit Open │ │
│ │◀─────────────────│ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ │
│ THRESHOLDS: │
│ • 80%: Start returning 503 with Retry-After │
│ • 95%: Open circuit breaker │
│ • 100%: Drop connections │
│ │
└───────────────────────────────────────────────────────────────────────┘
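The threshold logic in the diagram can be expressed as a small decision function. This is a sketch under stated assumptions: the `Action` type and `decide` function are hypothetical names, and a buffer at exactly 80% or 95% is treated as still below the threshold, which the diagram leaves open.

```go
package main

import "fmt"

// Action is the gateway's response to a request at a given buffer occupancy.
type Action int

const (
	Forward Action = iota
	Reject503
	OpenCircuit
	DropConnection
)

func (a Action) String() string {
	return [...]string{"forward", "503 + Retry-After", "circuit open", "drop connection"}[a]
}

// decide maps buffer occupancy to the action listed under THRESHOLDS.
// Integer arithmetic avoids floating-point edge cases at the boundaries.
func decide(used, capacity int) Action {
	switch {
	case capacity <= 0 || used >= capacity:
		return DropConnection
	case used*100 > capacity*95:
		return OpenCircuit
	case used*100 > capacity*80:
		return Reject503
	default:
		return Forward
	}
}

func main() {
	for _, used := range []int{50, 85, 97, 100} {
		fmt.Printf("%d%% full: %v\n", used, decide(used, 100))
	}
}
```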
{
"error": {
"code": "TRACK_NOT_FOUND",
"message": "Track TRK-999999 not found in system",
"details": {
"track_id": "TRK-999999",
"suggestion": "Verify track ID or check track history"
},
"request_id": "req-abc123",
"timestamp": "2026-04-09T12:30:00.000Z"
}
}
| Code | HTTP Status | Description |
|---|---|---|
| UNAUTHORIZED | 401 | Missing or invalid authentication |
| FORBIDDEN | 403 | Insufficient permissions |
| NOT_FOUND | 404 | Resource does not exist |
| TRACK_NOT_FOUND | 404 | Track ID not found |
| ALERT_NOT_FOUND | 404 | Alert ID not found |
| BAD_REQUEST | 400 | Invalid request format |
| VALIDATION_ERROR | 400 | Request validation failed |
| RATE_LIMITED | 429 | Rate limit exceeded |
| SERVICE_UNAVAILABLE | 503 | Service temporarily unavailable |
| CIRCUIT_OPEN | 503 | Circuit breaker triggered |
| INTERNAL_ERROR | 500 | Unexpected server error |
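The code-to-status mapping and the error envelope can be sketched in Go as follows. The `APIError` type, the `statusByCode` map, and the `statusFor` helper are hypothetical names chosen for illustration, and falling back to 500 for an unknown code is an assumption rather than documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// statusByCode reproduces the error code table above.
var statusByCode = map[string]int{
	"UNAUTHORIZED":        http.StatusUnauthorized,
	"FORBIDDEN":           http.StatusForbidden,
	"NOT_FOUND":           http.StatusNotFound,
	"TRACK_NOT_FOUND":     http.StatusNotFound,
	"ALERT_NOT_FOUND":     http.StatusNotFound,
	"BAD_REQUEST":         http.StatusBadRequest,
	"VALIDATION_ERROR":    http.StatusBadRequest,
	"RATE_LIMITED":        http.StatusTooManyRequests,
	"SERVICE_UNAVAILABLE": http.StatusServiceUnavailable,
	"CIRCUIT_OPEN":        http.StatusServiceUnavailable,
	"INTERNAL_ERROR":      http.StatusInternalServerError,
}

// APIError matches the fields of the JSON error body shown earlier.
type APIError struct {
	Code      string            `json:"code"`
	Message   string            `json:"message"`
	Details   map[string]string `json:"details,omitempty"`
	RequestID string            `json:"request_id"`
	Timestamp string            `json:"timestamp"`
}

type errorEnvelope struct {
	Error APIError `json:"error"`
}

// statusFor returns the HTTP status for an error code, defaulting to 500.
func statusFor(code string) int {
	if status, ok := statusByCode[code]; ok {
		return status
	}
	return http.StatusInternalServerError
}

func main() {
	e := APIError{
		Code:      "TRACK_NOT_FOUND",
		Message:   "Track TRK-999999 not found in system",
		Details:   map[string]string{"track_id": "TRK-999999"},
		RequestID: "req-abc123",
		Timestamp: "2026-04-09T12:30:00.000Z",
	}
	body, err := json.Marshal(errorEnvelope{Error: e})
	if err != nil {
		panic(err)
	}
	fmt.Println(statusFor(e.Code), string(body))
}
```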
# Retry configuration
retry:
max_attempts: 3
initial_backoff: 100ms
max_backoff: 5s
backoff_multiplier: 2
# Retryable errors
retryable_errors:
- "SERVICE_UNAVAILABLE"
- "RATE_LIMITED"
- "INTERNAL_ERROR"
- "TIMEOUT"
# Non-retryable errors (fail immediately)
non_retryable:
- "UNAUTHORIZED"
- "FORBIDDEN"
- "NOT_FOUND"
- "BAD_REQUEST"
- "VALIDATION_ERROR"
// Circuit breaker implementation
type CircuitBreaker struct {
mu sync.Mutex
state State // Closed, Open, HalfOpen
failures int
successCount int
threshold int
timeout time.Duration
lastFailure time.Time
}
type State int
const (
Closed State = iota
Open
HalfOpen
)
func (cb *CircuitBreaker) Execute(fn func() error) error {
cb.mu.Lock()
switch cb.state {
case Open:
if time.Since(cb.lastFailure) > cb.timeout {
cb.state = HalfOpen
cb.successCount = 0
} else {
cb.mu.Unlock()
return ErrCircuitOpen
}
}
cb.mu.Unlock()
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
cb.lastFailure = time.Now()
if cb.state == HalfOpen || cb.failures >= cb.threshold { // Any failure while half-open reopens the circuit
cb.state = Open
}
return err
}
cb.failures = 0
if cb.state == HalfOpen {
cb.successCount++
if cb.successCount >= cb.threshold/2 {
cb.state = Closed
}
}
return nil
}
┌───────────────────────────────────────────────────────────────────────┐
│ CIRCUIT BREAKER STATE MACHINE │
├───────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ CLOSED │ │
│ │ (Normal) │ │
│ └──────┬──────┘ │
│ │ │
│ │ Failures ≥ Threshold │
│ ▼ │
│ ┌─────────────┐ │
│ │ OPEN │ │
│ │ (Failing) │ │
│ └──────┬──────┘ │
│ │ │
│ │ Timeout elapsed │
│ ▼ │
│ ┌─────────────┐ │
│ │ HALF-OPEN │ │
│ │ (Testing) │ │
│ └──────┬──────┘ │
│ │ │
│ ┌────────────────┼────────────────┐ │
│ │ │ │ │
│ ▼ │ ▼ │
│ ┌─────────────┐ │ ┌─────────────┐ │
│ │ CLOSED │ │ │ OPEN │ │
│ │ (Recovered) │ │ │ (Still bad) │ │
│ └─────────────┘ │ └─────────────┘ │
│ ▲ │ ▲ │
│ │ │ │ │
│ └───────────────┴────────────────┘ │
│ Success Failure │
│ │
└───────────────────────────────────────────────────────────────────────┘
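FORGE’s integration architecture provides REST and WebSocket APIs for clients and displays, direct Kafka producer and consumer access for high-throughput exchange, gateway interfaces for external simulators and C2 systems, layered authentication (JWT, API keys, mTLS), rate limiting with backpressure, and standardized error handling with retries and circuit breakers. These integration patterns let FORGE interoperate with the diverse ecosystem of defense systems while maintaining the reliability and security standards required for operational deployments.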
Document Version: 1.0
Last Updated: April 2026
FORGE is designed for high-throughput, low-latency sensor data processing. This section documents performance benchmarks, scaling strategies, and capacity planning for production deployments.
| Component | Throughput | Unit | Notes |
|---|---|---|---|
| Kafka (single node) | 100,000 | msg/s | 1KB messages |
| FORGE-Consumer | 50,000 | msg/s | Batch insert |
| TimescaleDB Insert | 100,000 | rows/s | Batch mode |
| WebSocket Broadcast | 10,000 | clients/s | Single server |
| JREAP Formatter | 20,000 | msg/s | Per core |
| Operation | P50 | P95 | P99 | Max |
|---|---|---|---|---|
| Sensor → Kafka | 5ms | 10ms | 15ms | 50ms |
| Kafka → Consumer | 10ms | 20ms | 30ms | 100ms |
| Consumer → TimescaleDB | 15ms | 30ms | 50ms | 200ms |
| Track Correlation | 20ms | 40ms | 60ms | 150ms |
| JREAP Formatting | 2ms | 5ms | 10ms | 20ms |
| WebSocket Push | 3ms | 8ms | 15ms | 50ms |
| End-to-End | 55ms | 113ms | 180ms | 570ms |
| Component | CPU Usage | Memory | Disk I/O | Network |
|---|---|---|---|---|
| Kafka (idle) | 5% | 2GB | 10 MB/s | 1 Mbps |
| Kafka (peak) | 70% | 4GB | 500 MB/s | 100 Mbps |
| TimescaleDB (idle) | 10% | 4GB | 5 MB/s | 0.5 Mbps |
| TimescaleDB (peak) | 80% | 8GB | 200 MB/s | 50 Mbps |
| FORGE-Consumer | 30% | 1GB | 50 MB/s | 20 Mbps |
| FORGE-C2 | 20% | 512MB | 10 MB/s | 10 Mbps |
Partition Count Formula:
partitions = max(target_throughput / producer_throughput,
target_throughput / consumer_throughput)
Example:
Target: 100,000 msg/s
Producer per partition: 20,000 msg/s
Consumer per partition: 15,000 msg/s
partitions = max(100000/20000, 100000/15000)
= max(5, 6.7)
= 7 partitions (6.7 rounded up)
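The partition count formula translates directly into code. In this small sketch the function name `partitionCount` is a hypothetical choice; the inputs are the figures from the example, and rounding 6.7 up to the next whole partition gives 7.

```go
package main

import (
	"fmt"
	"math"
)

// partitionCount applies the formula above: the larger of the producer-bound
// and consumer-bound ratios, rounded up to a whole partition.
func partitionCount(target, producerPerPartition, consumerPerPartition float64) int {
	byProducer := target / producerPerPartition
	byConsumer := target / consumerPerPartition
	return int(math.Ceil(math.Max(byProducer, byConsumer)))
}

func main() {
	fmt.Println(partitionCount(100000, 20000, 15000)) // prints 7
}
```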
# Optimal consumer configuration
max.poll.records: 500 # Records per poll
fetch.min.bytes: 1048576 # 1MB minimum fetch
fetch.max.wait.ms: 100 # Max wait time
max.partition.fetch.bytes: 1048576
connections.max.idle.ms: 540000
metadata.max.age.ms: 300000
| Topic | Retention | Compaction | Size Limit |
|---|---|---|---|
| opir-detections | 7 days | No | 50 GB |
| radar-tracks | 7 days | No | 50 GB |
| correlated-tracks | 30 days | Yes | 20 GB |
| c2-messages | 90 days | Yes | 10 GB |
| alerts | 90 days | Yes | 5 GB |
-- Create hypertable with optimized chunk size
SELECT create_hypertable(
'opir_detections',
'timestamp',
chunk_time_interval => INTERVAL '1 hour',
if_not_exists => TRUE
);
-- Enable compression
ALTER TABLE opir_detections SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id',
timescaledb.compress_orderby = 'timestamp DESC'
);
-- Compression policy
SELECT add_compression_policy(
'opir_detections',
INTERVAL '7 days'
);
| Table | Chunk Interval | Estimated Size | Compressed Size |
|---|---|---|---|
| opir_detections | 1 hour | 500 MB | 50 MB |
| radar_tracks | 1 hour | 300 MB | 30 MB |
| correlated_tracks | 4 hours | 100 MB | 10 MB |
-- Optimized track query
EXPLAIN ANALYZE
SELECT * FROM correlated_tracks
WHERE timestamp >= NOW() - INTERVAL '1 hour'
AND threat_level >= 4
ORDER BY timestamp DESC
LIMIT 100;
-- Result: 15ms (P95), Index scan on timestamp
-- Primary indexes
CREATE INDEX idx_opir_timestamp ON opir_detections (timestamp DESC);
CREATE INDEX idx_opir_sensor ON opir_detections (sensor_id, timestamp DESC);
CREATE INDEX idx_opir_track ON opir_detections (track_id) WHERE track_id IS NOT NULL;
-- Composite indexes for common queries
CREATE INDEX idx_correlated_threat ON correlated_tracks
(threat_level DESC, timestamp DESC)
WHERE threat_level >= 3;
┌─────────────────────────────────────────────────────────────┐
│ KAFKA CLUSTER SCALING │
├─────────────────────────────────────────────────────────────┤
│ │
│ Development (1 node): │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ broker-0 (KRaft controller + broker) │ │
│ │ Partitions: 6 | Replication: 1 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Production (3 nodes): │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ broker-0 │ │ broker-1 │ │ broker-2 │ │
│ │ (broker) │ │ (broker) │ │ (broker) │ │
│ │ KRaft: 1 │ │ KRaft: 2 │ │ KRaft: 3 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ Partitions: 12 | Replication: 3 | Min ISR: 2 │
│ │
│ Scale trigger: >70% CPU or >100K msg/s sustained │
│ │
└─────────────────────────────────────────────────────────────┘
# Horizontal Pod Autoscaler for FORGE-Consumer
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: forge-consumer-hpa
namespace: forge
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: forge-consumer
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: External
external:
metric:
name: kafka_consumer_lag
selector:
matchLabels:
consumer_group: forge-consumer
target:
type: AverageValue
averageValue: "1000"
# HPA for API servers
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: forge-api-hpa
namespace: forge
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: forge-api
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 60
- type: Pods
pods:
metric:
name: http_requests_per_second
target:
type: AverageValue
averageValue: "1000"
| Node Type | CPU Cores | Memory | Disk | Use Case |
|---|---|---|---|---|
| Small | 4 | 16 GB | 100 GB | Development |
| Medium | 8 | 32 GB | 500 GB | Staging |
| Large | 16 | 64 GB | 1 TB | Production |
| XLarge | 32 | 128 GB | 2 TB | High-throughput |
| Indicator | Threshold | Action |
|---|---|---|
| CPU > 80% sustained | 5 min | Add cores or scale horizontally |
| Memory > 85% | 5 min | Increase memory limit |
| Disk I/O wait > 20% | 5 min | Add faster storage or scale |
| GC pauses > 100ms | Per event | Increase heap or optimize |
┌─────────────────────────────────────────────────────────────┐
│ SCALING DECISION TREE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Start │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ CPU > 70% for 5m? │ │
│ └──────────┬──────────┘ │
│ Yes │ No │
│ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Can add pods? │ │ Memory > 85%? │ │
│ │ (Kafka partitions │ └──────────┬──────────┘ │
│ │ > consumer count) │ │ │
│ └──────────┬──────────┘ Yes │ No │
│ Yes │ No ▼ │
│ ▼ ┌─────────────────────┐ │
│ ┌─────────────────────┐ │ Scale vertically │ │
│ │ Scale horizontally │ │ (increase limits) │ │
│ │ (add pods) │ └─────────────────────┘ │
│ └─────────────────────┘ │
│ │
│ If both fail: Scale cluster (add nodes) │
│ │
└─────────────────────────────────────────────────────────────┘
| Metric | Threshold | Action | Cooldown |
|---|---|---|---|
| CPU utilization | > 70% | Scale up | 60s |
| Memory utilization | > 85% | Scale up | 60s |
| Kafka consumer lag | > 5000 | Scale consumers | 30s |
| HTTP request latency | > 200ms P95 | Scale API | 30s |
| WebSocket connections | > 80% limit | Scale WS | 30s |
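For the autoscaled workloads, the Kubernetes Horizontal Pod Autoscaler derives the replica count from the ratio of the observed metric to its target: `desired = ceil(current * currentMetric / targetMetric)`, bounded by `minReplicas` and `maxReplicas`. The sketch below applies that rule; `desiredReplicas` is a hypothetical helper, and the controller's tolerance band and stabilization windows are deliberately left out.

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas applies the HPA scaling rule and clamps the result to the
// configured replica bounds.
func desiredReplicas(current int, currentMetric, targetMetric float64, minReplicas, maxReplicas int) int {
	desired := int(math.Ceil(float64(current) * currentMetric / targetMetric))
	if desired < minReplicas {
		return minReplicas
	}
	if desired > maxReplicas {
		return maxReplicas
	}
	return desired
}

func main() {
	// forge-api: 3 replicas at 90% CPU against a 60% target, bounds 3..20.
	fmt.Println(desiredReplicas(3, 90, 60, 3, 20)) // prints 5
	// forge-consumer: 2 replicas with lag 5000 against a 1000 target, bounds 2..10.
	fmt.Println(desiredReplicas(2, 5000, 1000, 2, 10)) // prints 10
}
```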
# k6 load test configuration
stages:
- duration: 30s
target: 100 # Ramp up to 100 RPS
- duration: 60s
target: 500 # Ramp up to 500 RPS
- duration: 120s
target: 1000 # Sustained 1000 RPS
- duration: 60s
target: 2000 # Peak load
- duration: 30s
target: 0 # Ramp down
thresholds:
http_req_duration: ["p95<200", "p99<500"]
http_req_failed: ["rate<0.01"]
| Load (RPS) | P50 Latency | P95 Latency | P99 Latency | Error Rate |
|---|---|---|---|---|
| 100 | 25ms | 50ms | 80ms | 0% |
| 500 | 30ms | 65ms | 110ms | 0% |
| 1000 | 40ms | 95ms | 180ms | 0% |
| 1500 | 55ms | 150ms | 350ms | 0.1% |
| 2000 | 85ms | 280ms | 600ms | 2.5% |
Latency limit: ~1800 RPS is the highest load at which P95 latency stays under 200ms.
| Test | Duration | Peak Load | Result |
|---|---|---|---|
| Sustained load | 1 hour | 1000 RPS | Passed, stable |
| Spike test | 5 min | 500 → 2000 RPS | Passed, scaled in 45s |
| Soak test | 24 hours | 500 RPS | Passed, no memory leak |
| Breaking point | 10 min | Ramp to failure | Failed at 2500 RPS |
Sensor Throughput:
sensors = number of active sensors
rate = average detections per sensor per second
msg_size = average message size in bytes
total_throughput = sensors * rate * msg_size
Example:
10 OPIR sensors @ 50 msg/s = 500 msg/s
5 Radar sensors @ 20 msg/s = 100 msg/s
Total: 600 msg/s
With 1KB messages: 600 KB/s = 2.16 GB/hour
Kafka Partitions:
partitions = ceil(throughput / per_partition_limit)
With 600 msg/s and 15,000 msg/s per partition:
partitions = ceil(600 / 15000) = 1 (minimum 3 for HA)
Consumer Instances:
consumers = ceil(throughput / consumer_capacity)
With 600 msg/s and 500 msg/s per consumer:
consumers = ceil(600 / 500) = 2
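The three capacity formulas can be combined into one small calculator. This is a sketch using the figures from the worked examples; `SensorGroup`, `totalRate`, and `instances` are hypothetical names, and 1KB is taken as 1000 bytes to match the 2.16 GB/hour arithmetic above.

```go
package main

import (
	"fmt"
	"math"
)

// SensorGroup describes a set of identical sensors.
type SensorGroup struct {
	Name          string
	Count         int
	RatePerSensor float64 // messages per second per sensor
}

// totalRate sums the message rate across all sensor groups.
func totalRate(groups []SensorGroup) float64 {
	var rate float64
	for _, g := range groups {
		rate += float64(g.Count) * g.RatePerSensor
	}
	return rate
}

// instances returns ceil(rate / perInstance), but never less than floor.
func instances(rate, perInstance float64, floor int) int {
	n := int(math.Ceil(rate / perInstance))
	if n < floor {
		return floor
	}
	return n
}

func main() {
	groups := []SensorGroup{
		{Name: "OPIR", Count: 10, RatePerSensor: 50},
		{Name: "Radar", Count: 5, RatePerSensor: 20},
	}
	rate := totalRate(groups)
	gbPerHour := rate * 1000 * 3600 / 1e9 // 1000-byte messages
	partitions := instances(rate, 15000, 3) // minimum 3 for HA
	consumers := instances(rate, 500, 1)
	fmt.Printf("%.0f msg/s, %.2f GB/hour, %d partitions, %d consumers\n",
		rate, gbPerHour, partitions, consumers)
	// prints: 600 msg/s, 2.16 GB/hour, 3 partitions, 2 consumers
}
```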
| Parameter | Development | Production | Enterprise |
|---|---|---|---|
| Sensors | 2 | 10 | 50 |
| Detection rate | 20/s | 100/s | 500/s |
| Kafka partitions | 3 | 6 | 24 |
| Kafka brokers | 1 | 3 | 6 |
| Consumer replicas | 1 | 3 | 10 |
| TimescaleDB storage | 100 GB | 1 TB | 10 TB |
| API replicas | 1 | 3 | 10 |
| WebSocket replicas | 1 | 2 | 5 |
Current (2026): 10 sensors, 1000 msg/s peak
Year 1: 20 sensors, 2000 msg/s (2x capacity)
Year 2: 50 sensors, 5000 msg/s (5x capacity)
Year 3: 100 sensors, 10000 msg/s (10x capacity)
Scaling milestones:
- 2000 msg/s: Add 3 Kafka partitions, 2 consumers
- 5000 msg/s: Expand to 3-node Kafka cluster
- 10000 msg/s: Add dedicated TimescaleDB node
Document Version: 1.0 Last Updated: April 2026
FORGE implements a defense-in-depth security architecture aligned with DoD Risk Management Framework (RMF) requirements and NIST 800-53 security controls. This section documents the authentication, authorization, network security, secrets management, data protection, and compliance controls that protect FORGE systems and data.
┌─────────────────────────────────────────────────────────────────────────────┐
│ FORGE DEFENSE-IN-DEPTH SECURITY │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Layer 7: Compliance & Audit │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ NIST 800-53 Controls │ DoD RMF │ Audit Logs │ Vulnerability Mgmt │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Layer 6: Application Security │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ JWT/OAuth2 Auth │ RBAC Policies │ Input Validation │ Rate Limiting │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Layer 5: Pod Security │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Restricted PSS │ Security Contexts │ Seccomp │ Capabilities Drop │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Layer 4: Network Security │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Network Policies │ Service Mesh │ mTLS │ Ingress Controllers │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Layer 3: Secrets Management │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ HashiCorp Vault │ Secret Rotation │ Dynamic Secrets │ Encryption │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Layer 2: Data Security │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Encryption at Rest │ Encryption in Transit │ Key Management │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Layer 1: Infrastructure Security │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Kubernetes Hardening │ Node Security │ Container Registry Trust │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
FORGE services use JSON Web Tokens (JWT) for stateless authentication. Tokens are issued by the FORGE identity provider and validated by each service.
JWT Structure:
Header:
{
"alg": "RS256",
"typ": "JWT",
"kid": "forge-key-2026-01"
}
Payload:
{
"sub": "user-uuid",
"iss": "https://forge.auth",
"aud": ["forge-c2", "forge-sensors"],
"exp": 1712345678,
"iat": 1712342078,
"roles": ["forge-operator", "track-viewer"],
"clearance": "SECRET",
"unit": "FORGE-TEAM"
}
Token Validation Flow:
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Client │────▶│ API │────▶│ JWT │────▶│ RBAC │
│ Request │ │ Gateway │ │ Verify │ │ Check │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
│ │ │
│ ▼ ▼
│ ┌──────────┐ ┌──────────┐
│ │ Public │ │ Allow/ │
│ │ Key │ │ Deny │
│ └──────────┘ └──────────┘
│
▼
┌──────────┐
│ Service │
│ Response│
└──────────┘
FORGE integrates with DoD identity providers via OAuth2 for enterprise SSO:
| Grant Type | Use Case | Flow |
|---|---|---|
| Authorization Code | Web applications | Standard OAuth2 flow with PKCE |
| Client Credentials | Service-to-service | Token exchange for machine identity |
| Resource Owner Password | Legacy systems | Deprecated - migration in progress |
OAuth2 Configuration:
oauth2:
issuer: https://forge.auth/oauth2
authorization_endpoint: /authorize
token_endpoint: /token
userinfo_endpoint: /userinfo
jwks_uri: /.well-known/jwks.json
scopes:
- forge:read
- forge:write
- forge:admin
- tracks:read
- tracks:write
- c2:execute
FORGE implements Kubernetes-native RBAC for authorization, with custom roles aligned to operational responsibilities.
RBAC Role Definitions:
# FORGE Namespace Admin - Full control within forge namespace
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: forge-namespace-admin
labels:
nist-control: AC-3
clearance: SECRET
rules:
# Core resources
- apiGroups: [""]
resources: ["pods", "services", "configmaps", "secrets", "endpoints", "events"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# Applications
- apiGroups: ["apps"]
resources: ["deployments", "statefulsets", "daemonsets", "replicasets"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# Networking
- apiGroups: ["networking.k8s.io"]
resources: ["networkpolicies", "ingresses"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# Batch jobs
- apiGroups: ["batch"]
resources: ["jobs", "cronjobs"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
# FORGE Sensor Operator - Read sensor data, manage sensor deployments
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: forge-sensor-operator
namespace: forge
labels:
nist-control: AC-3
function: sensor-operations
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "services", "configmaps"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "list", "watch", "update"]
---
# FORGE C2 Operator - Command and control operations
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: forge-c2-operator
namespace: forge
labels:
nist-control: AC-3
function: c2-operations
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "services", "endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["configmaps", "secrets"]
verbs: ["get", "list"]
resourceNames: ["c2-config", "c2-endpoints"]
- apiGroups: ["apps"]
resources: ["deployments", "statefulsets"]
verbs: ["get", "list", "watch"]
---
# FORGE Data Pipeline - Access to Kafka and database secrets
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: forge-data-pipeline
namespace: forge
labels:
nist-control: AC-3
function: data-processing
rules:
- apiGroups: [""]
resources: ["pods", "pods/exec", "pods/log"]
verbs: ["get", "list", "watch", "create"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get"]
resourceNames: ["forge-kafka-credentials", "forge-db-credentials"]
Service Account Bindings:
| Service Account | Role | Namespace | Purpose |
|---|---|---|---|
| forge-sensor | forge-sensor-operator | forge | Sensor ingestion services |
| forge-consumer | forge-data-pipeline | forge | Kafka/TimescaleDB consumers |
| forge-c2 | forge-c2-operator | forge | Command & control service |
| forge-monitoring | forge-namespace-admin | monitoring | Metrics collection |
FORGE implements default-deny network policies with explicit allow-lists for required traffic flows.
Default Deny Policy:
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: forge-default-deny
namespace: forge
labels:
nist-control: SC-7
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
Kafka Access Policy:
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: forge-kafka-policy
namespace: forge
spec:
podSelector:
matchLabels:
app: kafka
ingress:
- from:
- namespaceSelector:
matchLabels:
name: forge
ports:
- protocol: TCP
port: 9092 # Internal broker
- protocol: TCP
port: 9093 # Controller
egress:
- to:
- podSelector:
matchLabels:
app: kafka
ports:
- protocol: TCP
port: 9092
C2 API Access Policy:
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: forge-c2-api-policy
namespace: forge
spec:
podSelector:
matchLabels:
app: forge-c2
ingress:
- from:
- ipBlock:
cidr: 10.0.0.0/8 # Internal network
- namespaceSelector:
matchLabels:
name: forge
ports:
- protocol: TCP
port: 8080
egress:
- to:
- namespaceSelector:
matchLabels:
name: forge
ports:
- protocol: TCP
port: 9092 # Kafka
- protocol: TCP
port: 5432 # TimescaleDB
FORGE supports optional Istio service mesh deployment for enhanced traffic management and mTLS:
┌──────────────────────────────────────────────────────────────────┐
│ Istio Service Mesh │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Ingress │ │ Envoy │ │ Envoy │ │
│ │ Gateway │────▶│ Sidecar │────▶│ Sidecar │ │
│ │ │ │ (FORGE-C2) │ │ (Kafka) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ │ ┌────────────┴────────────┐ │ │
│ │ │ mTLS Tunnel │ │ │
│ │ │ (Automatic Encryption)│ │ │
│ │ └─────────────────────────┘ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Citadel │ │ Galley │ │
│ │ (CA/PKI) │ │ (Config) │ │
│ └─────────────┘ └─────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
| Component | TLS Mode | Certificate Source |
|---|---|---|
| Ingress Gateway | TLS 1.3 | Let’s Encrypt / DoD PKI |
| Inter-service | mTLS | Istio Citadel / Vault PKI |
| Kafka | TLS | Vault PKI |
| PostgreSQL | TLS | Vault PKI |
| Vault | TLS | Self-signed (internal) |
FORGE uses HashiCorp Vault for centralized secrets management, deployed in the forge-security namespace.
Vault Configuration:
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: vault
namespace: forge-security
spec:
serviceName: vault
replicas: 1
template:
spec:
containers:
- name: vault
image: hashicorp/vault:1.15
ports:
- containerPort: 8200
name: http
env:
- name: VAULT_API_ADDR
value: "http://vault:8200"
- name: VAULT_LOCAL_CONFIG
value: |
storage "file" {
path = "/vault/data"
}
listener "tcp" {
address = "0.0.0.0:8200"
tls_disable = 1
}
disable_mlock = true
ui = true
securityContext:
readOnlyRootFilesystem: true
allowPrivilegeEscalation: false
capabilities:
drop: ["ALL"]
Access:
- Internal: vault.forge-security.svc.cluster.local:8200
- External NodePort: 207.244.226.151:30820
| Engine | Purpose | Rotation Policy |
|---|---|---|
| KV v2 | Static secrets (API keys, passwords) | Manual / 90-day trigger |
| Database | Dynamic PostgreSQL credentials | 24-hour TTL, auto-renew |
| PKI | X.509 certificate issuance | 30-day TTL |
| Transit | Encryption as a service | N/A |
Secret Paths:
forge/
├── database/
│ ├── config/postgresql
│ ├── roles/forge-app
│ └── creds/forge-app
├── kv/
│ ├── forge-kafka-credentials
│ ├── forge-jwt-signing-key
│ ├── forge-api-keys
│ └── forge-encryption-keys
├── pki/
│ ├── root/generate/internal
│ ├── roles/forge-server
│ └── issue/forge-server
└── transit/
└── keys/track-encryption
┌──────────────────────────────────────────────────────────────────┐
│ Secret Rotation Workflow │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Vault │ │ Kubernetes│ │ FORGE │ │
│ │ Rotation │────▶│ Secret │────▶│ Pods │ │
│ │ Job │ │ Update │ │ Restart │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ │ 1. Generate │ │ │
│ │ new secret │ │ │
│ │ │ 2. Update K8s │ │
│ │ │ secret │ │
│ │ │ │ 3. Rolling │
│ │ │ │ restart │
│ │
└──────────────────────────────────────────────────────────────────┘
Rotation Schedule:
| Secret Type | Rotation Period | Automation |
|---|---|---|
| Database credentials | 24 hours | Vault dynamic secrets |
| API keys | 90 days | CronJob + Vault API |
| JWT signing keys | 365 days | Manual with coordination |
| TLS certificates | 30 days | cert-manager / Vault PKI |
| Encryption keys | Annual | Manual, with re-encryption |
| Data Store | Encryption Method | Key Management |
|---|---|---|
| TimescaleDB | AES-256 | LUKS (disk) + Vault (column) |
| PostgreSQL | AES-256 | LUKS (disk) |
| Kafka | AES-256 | LUKS (disk) |
| MinIO | AES-256-GCM | Built-in + Vault KMS |
| Persistent Volumes | LUKS | Node-level encryption |
Sensitive Data Classification:
| Classification | Data Types | Protection |
|---|---|---|
| TOP SECRET | Track trajectories, engagement plans | Vault Transit encryption |
| SECRET | System configs, credentials | Vault KV encryption |
| UNCLASSIFIED | Metrics, logs | Standard encryption |
All network communication is encrypted:
┌──────────────────────────────────────────────────────────────────┐
│ Encryption in Transit Matrix │
├──────────────────────────────────────────────────────────────────┤
│ │
│ Source │ Destination │ Protocol │ Encryption │
│ ────────────────┼──────────────────┼────────────┼────────────── │
│ Client │ Ingress Gateway │ HTTPS │ TLS 1.3 │
│ Service │ Service │ HTTP │ mTLS (Istio) │
│ Service │ Kafka │ TCP │ TLS 1.3 │
│ Service │ PostgreSQL │ TCP │ TLS 1.3 │
│ Kafka │ Kafka (repl) │ TCP │ TLS 1.3 │
│ Prometheus │ Exporters │ HTTP │ mTLS │
│ │
└──────────────────────────────────────────────────────────────────┘
FORGE enforces the Kubernetes Restricted Pod Security Standard across all workloads.
Namespace Labels:
apiVersion: v1
kind: Namespace
metadata:
name: forge
labels:
pod-security.kubernetes.io/enforce: restricted
pod-security.kubernetes.io/enforce-version: latest
pod-security.kubernetes.io/audit: restricted
pod-security.kubernetes.io/warn: restricted
Security Context (Required):
securityContext:
runAsNonRoot: true
runAsUser: 1000
runAsGroup: 1000
fsGroup: 1000
readOnlyRootFilesystem: true
allowPrivilegeEscalation: false
seccompProfile:
type: RuntimeDefault
capabilities:
drop:
- ALL
Container Security Requirements:
| Requirement | Enforcement | Exception Process |
|---|---|---|
| Non-root user | Enforced | Security review required |
| Read-only root filesystem | Enforced | tmpfs volumes for write paths |
| No privilege escalation | Enforced | None |
| Seccomp profile | RuntimeDefault | Custom profiles documented |
| Capability dropping | ALL dropped | Security team approval |
| Resource limits | Required | Quota enforced |
OPA Gatekeeper Constraints:
apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
name: forgepodsecurity
spec:
targets:
- target: admission.k8s.gatekeeper.sh
rego: |
package forge.podsecurity
violation[{"msg": msg}] {
input.review.kind.kind == "Pod"
container := input.review.object.spec.containers[_]
container.securityContext.privileged == true
msg := "Privileged containers are not allowed in FORGE"
}
violation[{"msg": msg}] {
input.review.kind.kind == "Pod"
not input.review.object.spec.securityContext.runAsNonRoot
msg := "Pods must run as non-root in FORGE"
}
| Category | Events Logged | Retention |
|---|---|---|
| Authentication | Login, logout, token refresh, failed attempts | 7 years |
| Authorization | RBAC decisions, access denials, privilege escalation | 7 years |
| Data Access | Track queries, configuration reads, secret access | 90 days |
| Data Modification | Track updates, config changes, deployments | 7 years |
| System Events | Pod lifecycle, scaling events, errors | 90 days |
{
"timestamp": "2026-04-09T12:30:00Z",
"event_id": "audit-20260409-001234",
"category": "AUTHORIZATION",
"severity": "INFO",
"actor": {
"user": "operator@example.mil",
"user_id": "user-uuid-12345",
"roles": ["forge-operator"],
"clearance": "SECRET"
},
"action": "READ_TRACK",
"resource": {
"type": "track",
"id": "TRACK-2026-001",
"classification": "SECRET"
},
"result": "ALLOW",
"source": {
"ip": "10.0.1.100",
"service": "forge-c2",
"namespace": "forge"
},
"metadata": {
"query_params": {"track_id": "TRACK-2026-001"},
"response_size_bytes": 1024
}
}
Infrastructure:
- Primary: TimescaleDB audit_logs hypertable
- Replica: Kafka audit-events topic (7-day retention)
- Archive: MinIO cold storage (7-year retention)
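A consumer of the audit-events topic needs to decode and sanity-check these records before writing them to the hypertable. The following Go sketch models a subset of the fields from the sample record; the `AuditEvent` type and `ParseAuditEvent` function are hypothetical, and the rule that `result` must be ALLOW or DENY is an assumption based on the values that appear in this section.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// AuditEvent models a subset of the audit record fields shown above.
type AuditEvent struct {
	Timestamp time.Time `json:"timestamp"`
	EventID   string    `json:"event_id"`
	Category  string    `json:"category"`
	Actor     struct {
		User  string   `json:"user"`
		Roles []string `json:"roles"`
	} `json:"actor"`
	Action   string `json:"action"`
	Resource struct {
		Type string `json:"type"`
		ID   string `json:"id"`
	} `json:"resource"`
	Result string `json:"result"`
}

// ParseAuditEvent decodes one JSON audit record and rejects records that
// lack an event ID or carry a result other than ALLOW or DENY (assumed set).
func ParseAuditEvent(data []byte) (AuditEvent, error) {
	var ev AuditEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return ev, fmt.Errorf("decode audit event: %w", err)
	}
	if ev.EventID == "" {
		return ev, fmt.Errorf("audit event has no event_id")
	}
	if ev.Result != "ALLOW" && ev.Result != "DENY" {
		return ev, fmt.Errorf("unexpected result %q", ev.Result)
	}
	return ev, nil
}

func main() {
	raw := []byte(`{
		"timestamp": "2026-04-09T12:30:00Z",
		"event_id": "audit-20260409-001234",
		"category": "AUTHORIZATION",
		"actor": {"user": "operator@example.mil", "roles": ["forge-operator"]},
		"action": "READ_TRACK",
		"resource": {"type": "track", "id": "TRACK-2026-001"},
		"result": "ALLOW"
	}`)
	ev, err := ParseAuditEvent(raw)
	if err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Printf("%s %s %s by %s -> %s\n",
		ev.Timestamp.Format(time.RFC3339), ev.Category, ev.Action, ev.Actor.User, ev.Result)
}
```

Fields not listed in the struct, such as `source` and `metadata`, are ignored by `encoding/json` rather than rejected.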
Query Interface:
-- Recent authentication failures
SELECT timestamp, actor, action, result
FROM audit_logs
WHERE category = 'AUTHENTICATION'
AND result = 'DENY'
AND timestamp > now() - interval '24 hours'
ORDER BY timestamp DESC;
-- Track access by user
SELECT actor->>'user' as user,
COUNT(*) as access_count,
array_agg(DISTINCT resource->>'type') as resources
FROM audit_logs
WHERE timestamp > now() - interval '7 days'
GROUP BY actor->>'user';
FORGE aligns with DoD RMF lifecycle phases:
| Phase | Activities | FORGE Implementation |
|---|---|---|
| Categorize | System categorization | MODERATE baseline |
| Select | Control selection | NIST 800-53 rev 5 |
| Implement | Control implementation | Documented in this section |
| Assess | Control assessment | Continuous ATO |
| Authorize | ATO decision | Pending |
| Monitor | Continuous monitoring | Prometheus + Audit logs |
| Control ID | Control Name | FORGE Implementation | Status |
|---|---|---|---|
| AC-2 | Account Management | RBAC, ServiceAccounts | ✅ Implemented |
| AC-3 | Access Enforcement | RBAC policies, JWT validation | ✅ Implemented |
| AC-4 | Information Flow Enforcement | Network policies, mTLS | ✅ Implemented |
| AC-6 | Least Privilege | Role-scoped permissions | ✅ Implemented |
| AU-2 | Audit Events | Audit logging framework | ✅ Implemented |
| AU-6 | Audit Review | TimescaleDB query interface | ✅ Implemented |
| AU-11 | Audit Retention | 7-year retention policy | ✅ Implemented |
| SC-7 | Boundary Protection | Network policies, firewalls | ✅ Implemented |
| SC-8 | Transmission Confidentiality | TLS/mTLS everywhere | ✅ Implemented |
| SC-12 | Cryptographic Key Management | Vault key management | ✅ Implemented |
| SC-28 | Protection at Rest | Encryption at rest | ✅ Implemented |
| SI-2 | Flaw Remediation | Vulnerability management | 🔄 In Progress |
| SI-4 | System Monitoring | Prometheus federation | ✅ Implemented |
| SI-7 | Software Integrity | Container signing | 🔄 Planned |
┌────────────────────────────────────────────────────────────────────────────┐
│ NIST 800-53 Control Implementation Matrix │
├────────────────────────────────────────────────────────────────────────────┤
│ │
│ Control │ Category │ Implementation │ Evidence Location │
│ ────────┼─────────────────┼────────────────────────┼─────────────────── │
│ AC-2 │ Access Control │ K8s RBAC + ServiceAcct │ rbac.yaml │
│ AC-3 │ Access Control │ RoleBindings + JWT │ rbac.yaml │
│ AC-4 │ Access Control │ NetworkPolicy │ network-policy.yaml│
│ SC-7 │ System Protection│ NetworkPolicy + Firewall│ network-policy │
│ SC-8 │ System Protection│ TLS 1.3 + mTLS │ vault.yaml │
│ SC-12 │ System Protection│ Vault secrets engine │ vault.yaml │
│ SC-28 │ System Protection│ LUKS + Vault Transit │ infrastructure │
│ AU-2 │ Audit │ Audit logging service │ audit-config │
│ SI-2 │ System Integrity│ Trivy scanning │ vulnerability-mgmt│
│ SI-4 │ System Integrity│ Prometheus federation │ monitoring │
│ │
└────────────────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ Vulnerability Management Pipeline │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Container │ │ Trivy │ │ CVE │ │
│ │ Build │────▶│ Scan │────▶│ Report │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │
│ │ ▼ │
│ │ ┌─────────────┐ │
│ │ │ Severity │ │
│ │ │ Triage │ │
│ │ └─────────────┘ │
│ │ │ │
│ │ ┌─────────────┴──────────┐ │
│ │ ▼ ▼ │
│ │ ┌─────────────┐ ┌─────────────┐
│ │ │ Critical/ │ │ Low/Med │
│ │ │ High │ │ (Queue) │
│ │ │ (Block) │ │ │
│ │ └─────────────┘ └─────────────┘
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐
│ │ Patch & │ │ Scheduled │
│ │ Rebuild │ │ Patching │
│ └─────────────┘ └─────────────┘
│ │
└──────────────────────────────────────────────────────────────────┘
| Severity | Response Time | Action Required |
|---|---|---|
| Critical | 24 hours | Block deployment, immediate patch |
| High | 72 hours | Block deployment, prioritize patch |
| Medium | 30 days | Document, schedule patch |
| Low | 90 days | Document, batch patching |
Scanning Schedule:
| Scan Type | Frequency | Tool |
|---|---|---|
| Container image | Every build | Trivy |
| Container image | Daily | Trivy (cron) |
| Kubernetes manifests | Weekly | Checkov |
| Infrastructure as Code | Weekly | tfsec |
| Runtime vulnerabilities | Continuous | Falco (planned) |
## FORGE Security Checklist
### Authentication & Authorization
- [ ] JWT validation enabled on all services
- [ ] OAuth2 integration configured
- [ ] RBAC roles follow least privilege
- [ ] Service accounts use minimal permissions
- [ ] Token expiration policies enforced
### Network Security
- [ ] Default-deny network policies applied
- [ ] Required traffic flows explicitly allowed
- [ ] TLS 1.3 configured on all ingress points
- [ ] mTLS enabled for inter-service communication
- [ ] Network policy tests passing
### Secrets Management
- [ ] Vault deployed and unsealed
- [ ] Secret paths follow naming convention
- [ ] Dynamic secrets configured where applicable
- [ ] Rotation policies defined
- [ ] No secrets in ConfigMaps or code
### Data Security
- [ ] Encryption at rest enabled
- [ ] Encryption in transit verified
- [ ] Data classification documented
- [ ] Key rotation procedures tested
### Pod Security
- [ ] Restricted PSS enforced on namespace
- [ ] Security contexts defined for all pods
- [ ] Containers run as non-root
- [ ] Read-only root filesystem configured
- [ ] Seccomp profile applied
### Audit & Monitoring
- [ ] Audit logging enabled
- [ ] Log retention policy configured
- [ ] Prometheus scraping all services
- [ ] Alerting rules defined
- [ ] Audit log queries tested
### Vulnerability Management
- [ ] Container scanning in CI/CD
- [ ] CVE triage process documented
- [ ] Patching SLAs defined
- [ ] Base images updated monthly
### Compliance
- [ ] NIST 800-53 controls mapped
- [ ] Evidence collected for assessment
- [ ] RMF package prepared
- [ ] ATO documentation ready
FORGE’s security architecture provides defense-in-depth protection aligned with DoD requirements:
| Layer | Key Controls | Status |
|---|---|---|
| Authentication | JWT, OAuth2, RBAC | ✅ Implemented |
| Network | Policies, mTLS, Service Mesh | ✅ Implemented |
| Secrets | Vault, rotation, dynamic creds | ✅ Implemented |
| Data | Encryption at rest/transit | ✅ Implemented |
| Pod | Restricted PSS, security contexts | ✅ Implemented |
| Audit | Comprehensive logging, retention | ✅ Implemented |
| Compliance | NIST 800-53, DoD RMF | 🔄 In Progress |
| Vulnerability | Scanning, CVE tracking | 🔄 In Progress |
The security controls documented here position FORGE for a DoD Authority to Operate (ATO) under the Risk Management Framework.
Document Version: 1.0
Classification: UNCLASSIFIED//FOR OFFICIAL USE ONLY
NIST 800-53 Baseline: MODERATE
Last Updated: April 2026
FORGE employs a comprehensive testing strategy aligned with DoD DevSecOps requirements, ensuring system reliability, performance, and compliance. The testing framework spans from unit tests to full end-to-end validation scenarios, leveraging the existing test-runner infrastructure on trooper2 and the vimi-test-suite framework.
The FORGE testing strategy follows the test pyramid model, emphasizing a strong foundation of fast unit tests with progressively fewer integration and end-to-end tests.
╱╲
╱ ╲
╱ E2E ╲ ← 10% - Full pipeline validation
╱ Tests ╲ Sensor → Display flow
╱──────────╲ Manual + Automated scenarios
╱ ╲
╱ Integration ╲ ← 20% - Component integration
╱ Tests ╲ Kafka, database, API tests
╱────────────────────╲
╱ ╲
╱ Unit Tests ╲ ← 70% - Fast, isolated tests
╱ (Go test package) ╲ Functions, methods, types
╱────────────────────────────╲
╱ ╲
╱ Mocks & Stubs ╲
╱ (testify, gomock, gock) ╲
╱──────────────────────────────────────╲
═══════════════════════════════════════════════════════════════════════════════
Execution Time: Fast ←────────────────────────────────────────────→ Slow
Cost: Low ←────────────────────────────────────────────→ High
Confidence: Unit ←────────────────────────────────────────────→ E2E
Debug Difficulty: Easy ←────────────────────────────────────────────→ Hard
FORGE components are written in Go and use the built-in testing package for unit tests. The test framework emphasizes fast execution, comprehensive coverage, and clear test isolation.
Test Structure:
// internal/track/correlator_test.go
package track
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCorrelator_MergeTracks(t *testing.T) {
tests := []struct {
name string
track1 *Track
track2 *Track
expectedConf float64
expectError bool
}{
{
name: "merge compatible tracks",
track1: &Track{
ID: "T001",
Position: Position{Lat: 39.0, Lon: -121.4, Alt: 15000},
Confidence: 0.85,
},
track2: &Track{
ID: "T002",
Position: Position{Lat: 39.01, Lon: -121.41, Alt: 15050},
Confidence: 0.90,
},
expectedConf: 0.88,
expectError: false,
},
{
name: "reject divergent tracks",
track1: &Track{
ID: "T001",
Position: Position{Lat: 39.0, Lon: -121.4, Alt: 15000},
},
track2: &Track{
ID: "T002",
Position: Position{Lat: 45.0, Lon: -100.0, Alt: 20000},
},
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
correlator := NewCorrelator(DefaultConfig())
result, err := correlator.MergeTracks(tt.track1, tt.track2)
if tt.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.InDelta(t, tt.expectedConf, result.Confidence, 0.05)
}
})
}
}
| Component | Minimum Coverage | Critical Paths | Notes |
|---|---|---|---|
| Track Correlator | 80% | 95% | Multi-sensor fusion logic |
| JREAP Formatter | 85% | 100% | DoD messaging standard |
| Link 16 Formatter | 85% | 100% | MIL-STD-6016 compliance |
| Sensor Normalizer | 75% | 90% | Data transformation |
| Kafka Consumer | 70% | 85% | Message processing |
| TimescaleDB Writer | 70% | 85% | Data persistence |
| WebSocket Server | 65% | 80% | Client communication |
| Alert Generator | 80% | 95% | Threat notification |
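These minimums can be enforced automatically rather than by inspection. The Go sketch below parses the `total:` line printed by `go tool cover -func` and compares it with a minimum from the table. `ParseTotalCoverage`, `MeetsMinimum`, and the sample profile path are hypothetical, and the sketch assumes the coverage run was restricted to the package that implements the named component.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// minimumCoverage mirrors the "Minimum Coverage" column of the table above.
var minimumCoverage = map[string]float64{
	"Track Correlator":   80,
	"JREAP Formatter":    85,
	"Link 16 Formatter":  85,
	"Sensor Normalizer":  75,
	"Kafka Consumer":     70,
	"TimescaleDB Writer": 70,
	"WebSocket Server":   65,
	"Alert Generator":    80,
}

// ParseTotalCoverage extracts the percentage from the "total:" line of
// `go tool cover -func` output.
func ParseTotalCoverage(output string) (float64, error) {
	for _, line := range strings.Split(output, "\n") {
		fields := strings.Fields(line)
		if len(fields) == 3 && fields[0] == "total:" {
			return strconv.ParseFloat(strings.TrimSuffix(fields[2], "%"), 64)
		}
	}
	return 0, errors.New("no total line found in coverage output")
}

// MeetsMinimum reports whether pct satisfies the component's minimum.
func MeetsMinimum(component string, pct float64) (bool, error) {
	minPct, ok := minimumCoverage[component]
	if !ok {
		return false, fmt.Errorf("unknown component %q", component)
	}
	return pct >= minPct, nil
}

func main() {
	// Sample output; the file path is illustrative only.
	output := "example/internal/track/correlator.go:42:\tMergeTracks\t87.5%\n" +
		"total:\t\t\t\t\t\t(statements)\t82.5%\n"
	pct, err := ParseTotalCoverage(output)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	ok, _ := MeetsMinimum("Track Correlator", pct)
	fmt.Printf("coverage %.1f%%, meets Track Correlator minimum: %v\n", pct, ok)
}
```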
Running Tests with Coverage:
# Run all tests with coverage report
go test -v -race -coverprofile=coverage.out ./...
# Generate HTML coverage report
go tool cover -html=coverage.out -o coverage.html
# View coverage summary
go tool cover -func=coverage.out | tail -1
# Output: total: (statements) 82.5%
FORGE uses three primary mocking approaches depending on the integration point:
1. Interface Mocking (testify/mock):
// internal/kafka/mock_consumer.go
package kafka
import (
"github.com/stretchr/testify/mock"
)
type MockConsumer struct {
mock.Mock
}
func (m *MockConsumer) Consume(topic string) (*Message, error) {
args := m.Called(topic)
if msg := args.Get(0); msg != nil {
return msg.(*Message), args.Error(1)
}
return nil, args.Error(1)
}
func (m *MockConsumer) Commit() error {
return m.Called().Error(0)
}
2. HTTP Mocking (gock):
// internal/api/client_test.go
package api
import (
"testing"
"github.com/h2non/gock"
"github.com/stretchr/testify/assert"
)
func TestExternalAPIClient(t *testing.T) {
defer gock.Off()
gock.New("https://external-feed.mil").
Get("/tracks").
Reply(200).
JSON(map[string]interface{}{
"tracks": []map[string]interface{}{
{"id": "EXT001", "lat": 39.0, "lon": -121.4},
},
})
client := NewClient("https://external-feed.mil")
tracks, err := client.FetchTracks()
assert.NoError(t, err)
assert.Len(t, tracks, 1)
}
3. Database Mocking (testcontainers or sqlmock):
// internal/storage/timescaledb_test.go
package storage
import (
    "context" // required for context.Background() below
    "testing"

    "github.com/DATA-DOG/go-sqlmock"
    "github.com/stretchr/testify/require"
)
func TestTimescaleDB_InsertTrack(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectBegin()
mock.ExpectExec(`INSERT INTO tracks`).
WithArgs("T001", 39.0, -121.4, 15000.0, 0.85).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
tsdb := &TimescaleDB{db: db}
err = tsdb.InsertTrack(context.Background(), &Track{
ID: "T001", Lat: 39.0, Lon: -121.4, Alt: 15000, Confidence: 0.85,
})
require.NoError(t, err)
require.NoError(t, mock.ExpectationsWereMet())
}
Integration tests validate the interactions between FORGE components, focusing on message flow through Kafka and data persistence to TimescaleDB.
Integration Test Configuration:
# configs/integration-test.yaml
integration:
kafka:
broker: "kafka.vimi-test.svc.cluster.local:9092"
topics:
- forge.sensors.raw
- forge.tracks
- forge.link16
consumer_group: "test-integration"
database:
host: "timescaledb.vimi-test.svc.cluster.local"
port: 5432
name: "forge_test"
user: "test_runner"
password: "${DB_PASSWORD}"
timeout:
connection: 30s
message_wait: 60s
cleanup: 10s
FORGE uses testcontainers for isolated Kafka testing:
// internal/integration/kafka_test.go
//go:build integration
package integration
import (
    "context"
    "fmt"
    "testing"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
)

func TestKafkaIntegration(t *testing.T) {
    ctx := context.Background()

    // Start Kafka container. The exposed port, the wait strategy, and the
    // mapped-port lookup all refer to the same listener port (9092).
    // Note: cp-kafka also needs ZooKeeper or KRaft settings, and the advertised
    // listener must be reachable from the test process; both are omitted here.
    kafkaContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
        ContainerRequest: testcontainers.ContainerRequest{
            Image:        "confluentinc/cp-kafka:7.5.0",
            ExposedPorts: []string{"9092/tcp"},
            Env: map[string]string{
                "KAFKA_BROKER_ID":                        "1",
                "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP":   "PLAINTEXT:PLAINTEXT",
                "KAFKA_ADVERTISED_LISTENERS":             "PLAINTEXT://localhost:9092",
            },
            WaitingFor: wait.ForListeningPort("9092/tcp"),
        },
        Started: true,
    })
    require.NoError(t, err)
    defer kafkaContainer.Terminate(ctx)

    // Get container host/port
    host, err := kafkaContainer.Host(ctx)
    require.NoError(t, err)
    port, err := kafkaContainer.MappedPort(ctx, "9092")
    require.NoError(t, err)

    // Create topic and test message flow
    broker := fmt.Sprintf("%s:%s", host, port.Port())
    conn, err := kafka.DialLeader(ctx, "tcp", broker, "forge.sensors.raw", 0)
    require.NoError(t, err)
    defer conn.Close()

    // Publish test message (WriteMessages returns the byte count and an error)
    _, err = conn.WriteMessages(kafka.Message{
        Key:   []byte("test-track"),
        Value: []byte(`{"id":"T001","lat":39.0,"lon":-121.4,"alt":15000}`),
    })
    require.NoError(t, err)

    // Consume and validate. The deadline makes the test fail instead of
    // hanging if no message arrives. ReadBatch returns only the batch.
    require.NoError(t, conn.SetReadDeadline(time.Now().Add(10*time.Second)))
    batch := conn.ReadBatch(1, 1e6)
    defer batch.Close()

    received := 0
    for {
        msg, err := batch.ReadMessage()
        if err != nil {
            break // end of batch or deadline reached
        }
        received++
        assert.Contains(t, string(msg.Value), "T001")
    }
    require.NotZero(t, received, "expected at least one message")
}
TimescaleDB integration tests use a separate test schema:
// internal/integration/database_test.go
//go:build integration
package integration
import (
    "fmt" // required for fmt.Sprintf when building track IDs
    "testing"
    "time"

    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
    "github.com/stretchr/testify/require"
)
func TestTimescaleDB_TrackStorage(t *testing.T) {
dsn := "postgres://test_runner:test@localhost:5432/forge_test?sslmode=disable"
db, err := sqlx.Connect("postgres", dsn)
require.NoError(t, err)
defer db.Close()
// Create hypertable
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS test_tracks (
time TIMESTAMPTZ NOT NULL,
track_id TEXT NOT NULL,
latitude DOUBLE PRECISION,
longitude DOUBLE PRECISION,
altitude DOUBLE PRECISION,
confidence DOUBLE PRECISION,
sensor_id TEXT
);
SELECT create_hypertable('test_tracks', 'time', if_not_exists => true);
`)
require.NoError(t, err)
// Insert test data
now := time.Now()
for i := 0; i < 100; i++ {
_, err = db.Exec(`
INSERT INTO test_tracks (time, track_id, latitude, longitude, altitude, confidence, sensor_id)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, now.Add(time.Duration(i)*time.Second), fmt.Sprintf("T%03d", i), 39.0+float64(i)*0.01, -121.4, 15000.0, 0.85, "SBIRS-GEO-1")
require.NoError(t, err)
}
// Query and validate
var count int
err = db.Get(&count, "SELECT COUNT(*) FROM test_tracks WHERE track_id LIKE 'T%'")
require.NoError(t, err)
require.Equal(t, 100, count)
// Cleanup
_, err = db.Exec("DROP TABLE test_tracks")
require.NoError(t, err)
}
End-to-end tests validate the complete sensor-to-display data flow using the vimi-test-suite:
# test-scenarios/e2e_boost_phase.yaml
name: e2e_boost_phase_detection
description: Full pipeline validation from OPIR sensor to Link 16 output
duration: 120s
infrastructure:
namespace: vimi-test
components:
- opir-simulator
- radar-simulator
- track-correlator
- data-consumer
- link16-formatter
- kafka
- timescaledb
steps:
- name: deploy_infrastructure
action: helm_upgrade
chart: charts/vimi-test-suite
values:
opir.enabled: true
radar.enabled: true
correlator.enabled: true
consumer.enabled: true
link16.enabled: true
timeout: 300s
- name: wait_for_ready
action: kubectl_wait
condition: ready
selector: app.kubernetes.io/part-of=vimi-test-suite
timeout: 120s
- name: inject_scenario
action: kubectl_apply
manifest: scenarios/icbm_single_launch.yaml
timeout: 10s
- name: validate_detection
action: kafka_consume
topic: forge.sensors.raw
timeout: 30s
assertions:
- message_count: ">0"
- detection_latency_ms: "<500"
- sensor_id: "SBIRS-GEO-1"
- name: validate_correlation
action: kafka_consume
topic: forge.tracks
timeout: 60s
assertions:
- message_count: ">0"
- correlation_confidence: ">0.85"
- position_error_km: "<10"
- name: validate_link16_output
action: kafka_consume
topic: forge.link16
timeout: 30s
assertions:
- j12_format: valid
- j70_format: valid
- track_number_assigned: true
- name: validate_storage
action: database_query
query: "SELECT COUNT(*) FROM tracks WHERE time > NOW() - INTERVAL '2 minutes'"
expected: ">0"
- name: cleanup
action: helm_uninstall
release: vimi-test
timeout: 60s
┌─────────────────────────────────────────────────────────────────────────────┐
│ E2E TEST EXECUTION FLOW │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. SETUP PHASE (60s timeout) │
│ ┌─────────────────┐ │
│ │ Deploy vimi-test │──▶ Create namespace vimi-test │
│ │ suite via Helm │──▶ Deploy Kafka + TimescaleDB │
│ └─────────────────┘──▶ Deploy all simulators + processors │
│ │
│ 2. SCENARIO INJECTION (5s) │
│ ┌─────────────────┐ │
│ │ Apply scenario │──▶ ICBM launch from [39.0°, 127.0°] │
│ │ YAML manifest │──▶ Target trajectory pre-defined │
│ └─────────────────┘──▶ Sensors receive detection request │
│ │
│ 3. VALIDATION PHASE (parallel) │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Kafka Topic │ │ Kafka Topic │ │ TimescaleDB │ │
│ │ forge.sensors │ │ forge.tracks │ │ Query │ │
│ │ .raw │ │ │ │ │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ ASSERTION ENGINE │ │
│ │ ✓ Detection latency < 500ms │ │
│ │ ✓ Correlation confidence > 0.85 │ │
│ │ ✓ Position error < 10km │ │
│ │ ✓ J12/J70/J73 valid format │ │
│ │ ✓ Track number assigned │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 4. CLEANUP PHASE (60s) │
│ ┌─────────────────┐ │
│ │ Helm uninstall │──▶ Remove vimi-test namespace │
│ │ vimi-test │──▶ Clean up Kafka topics │
│ └─────────────────┘──▶ Purge test database │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
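The assertion values in the scenario file (for example `">0"`, `"<500"`, and `">0.85"`) are small comparison expressions. How the vimi-test-suite assertion engine evaluates them is not shown here; the Go sketch below is one possible evaluator, with `CheckThreshold` as a hypothetical name and support for `>=` and `<=` added as an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// CheckThreshold evaluates value against an expression such as ">0.85" or
// "<500". Two-character operators are tried first so that ">=" is not
// mistaken for ">".
func CheckThreshold(expr string, value float64) (bool, error) {
	expr = strings.TrimSpace(expr)
	for _, op := range []string{">=", "<=", ">", "<"} {
		if !strings.HasPrefix(expr, op) {
			continue
		}
		limit, err := strconv.ParseFloat(strings.TrimSpace(expr[len(op):]), 64)
		if err != nil {
			return false, fmt.Errorf("invalid limit in %q: %w", expr, err)
		}
		switch op {
		case ">=":
			return value >= limit, nil
		case "<=":
			return value <= limit, nil
		case ">":
			return value > limit, nil
		default: // "<"
			return value < limit, nil
		}
	}
	return false, fmt.Errorf("unsupported threshold expression %q", expr)
}

func main() {
	// Measured values are made up for illustration.
	checks := []struct {
		name  string
		expr  string
		value float64
	}{
		{"detection_latency_ms", "<500", 320},
		{"correlation_confidence", ">0.85", 0.80},
		{"position_error_km", "<10", 4.2},
	}
	for _, c := range checks {
		ok, err := CheckThreshold(c.expr, c.value)
		if err != nil {
			fmt.Println(c.name, "error:", err)
			continue
		}
		fmt.Printf("%s %s (measured %v): pass=%v\n", c.name, c.expr, c.value, ok)
	}
}
```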
FORGE uses k6 for load testing, integrated with the test-runner infrastructure on trooper2:
// scripts/load/track_throughput.js
import { check } from 'k6';
import { Kafka } from 'k6/x/kafka';
const kafka = new Kafka({
brokers: ['kafka.forge.svc.cluster.local:9092'],
});
export const options = {
scenarios: {
// Ramp-up scenario for baseline
ramp_up: {
executor: 'ramping-vus',
startVUs: 0,
stages: [
{ duration: '30s', target: 10 },
{ duration: '1m', target: 50 },
{ duration: '30s', target: 100 },
{ duration: '1m', target: 100 },
{ duration: '30s', target: 0 },
],
gracefulRampDown: '30s',
},
// Stress test
stress: {
executor: 'constant-vus',
vus: 200,
duration: '5m',
startTime: '5m',
},
// Spike test
spike: {
executor: 'ramping-vus',
startVUs: 0,
stages: [
{ duration: '10s', target: 500 },
{ duration: '30s', target: 500 },
{ duration: '10s', target: 0 },
],
startTime: '10m',
},
},
thresholds: {
'http_req_duration': ['p(95)<500'], // 95% under 500ms
'http_req_failed': ['rate<0.01'], // Less than 1% failures
'kafka_write_duration': ['p(99)<200'], // 99% under 200ms
'iterations': ['count>10000'], // Minimum throughput
},
};
export default function () {
// Generate synthetic track
const track = {
id: `TRACK-${__VU}-${__ITER}`,
timestamp: Date.now(),
position: {
lat: 39.0 + Math.random() * 10 - 5,
lon: -121.4 + Math.random() * 10 - 5,
alt: 15000 + Math.random() * 5000,
},
velocity: {
vx: Math.random() * 1000,
vy: Math.random() * 1000,
vz: Math.random() * 100,
},
confidence: 0.85 + Math.random() * 0.15,
sensor_id: `SENSOR-${Math.floor(Math.random() * 5)}`,
track_type: 'ballistic',
};
// Produce to Kafka
const result = kafka.produce({
topic: 'forge.sensors.raw',
key: track.id,
value: JSON.stringify(track),
});
check(result, {
'track published': (r) => r.error_code === 0,
'latency acceptable': (r) => r.latency < 200,
});
}
export function teardown() {
kafka.close();
}
Running k6 Tests:
# Local development
k6 run scripts/load/track_throughput.js
# Against Kubernetes deployment
k6 run --out influxdb=http://influxdb.forge.svc:8086/k6 \
scripts/load/track_throughput.js
# Cloud execution via test-runner
kubectl apply -f k6-runner-job.yaml -n forge
kubectl logs -f job/k6-track-throughput -n forge
Stress tests validate system behavior under extreme load:
# test-scenarios/stress_high_volume.yaml
name: stress_high_volume
description: Validate system behavior at 10x normal load
parameters:
normal_rate: 1000 # tracks/second
stress_rate: 10000 # tracks/second (10x)
duration: 300s # 5 minutes at peak
infrastructure:
kafka:
partitions: 12
replication_factor: 3
consumers:
replicas: 10
consumer_threads: 20
metrics:
- name: track_ingestion_rate
type: counter
threshold: ">9500/s"
- name: kafka_lag
type: gauge
threshold: "<100000"
- name: consumer_latency_p99
type: histogram
threshold: "<500ms"
- name: timescaledb_write_latency
type: histogram
threshold: "<100ms"
- name: memory_usage
type: gauge
threshold: "<85%"
- name: cpu_usage
type: gauge
threshold: "<90%"
validation:
- no_message_loss: true
- no_consumer_restart: true
- kafka_lag_drained_within: "60s"
- all_partitions_rebalanced: true
FORGE uses chaos engineering principles to validate resilience. The chaos testing framework integrates with Kubernetes fault injection.
# test-scenarios/chaos_kafka_failure.yaml
name: chaos_kafka_broker_failure
description: Validate system resilience to Kafka broker failure
infrastructure:
namespace: forge
chaos_mesh: true
steps:
- name: establish_baseline
action: metrics_capture
duration: 60s
metrics:
- track_ingestion_rate
- consumer_latency_p99
- error_rate
- name: inject_chaos
action: chaos_mesh
kind: PodChaos
spec:
action: pod-kill
mode: one
selector:
namespaces: [forge]
labelSelectors:
app.kubernetes.io/name: kafka
gracePeriod: 0
- name: observe_recovery
action: metrics_capture
duration: 120s
expectations:
- consumer_reconnect_time: "<30s"
- message_loss: "0"
- track_ingestion_rate_recovery: ">90% of baseline"
- name: validate_no_data_loss
action: database_query
query: |
SELECT COUNT(*) FROM tracks
WHERE time BETWEEN NOW() - INTERVAL '3 minutes' AND NOW()
expected: ">0"
- name: cleanup_chaos
action: chaos_mesh_cleanup
Chaos test scenarios for FORGE components:
| Scenario | Target | Expected Behavior |
|---|---|---|
| Kafka broker kill | Kafka cluster | Consumers reconnect, no data loss |
| Network partition (partial) | Network | Remaining brokers handle traffic |
| Consumer OOM | forge-consumer | Pod restart, resume from last offset |
| TimescaleDB connection loss | Database | Buffer messages, retry with backoff |
| DNS resolution failure | CoreDNS | Cached connections continue |
| Node failure | Kubernetes node | Pods rescheduled, state restored |
| High CPU throttle | All pods | Graceful degradation, no crashes |
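The "buffer messages, retry with backoff" behavior expected on TimescaleDB connection loss can be sketched as follows. This is a minimal illustration under stated assumptions, not FORGE's implementation: `backoffDelays`, `retryWrite`, `noSleep`, and `errUnavailable` are hypothetical names, and the delay values are examples only.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable stands in for a database connection error (hypothetical).
var errUnavailable = errors.New("database unavailable")

// backoffDelays returns the wait before each retry: base, 2*base, 4*base, ...
// with every delay capped at maxDelay.
func backoffDelays(base, maxDelay time.Duration, retries int) []time.Duration {
	delays := make([]time.Duration, 0, retries)
	d := base
	for i := 0; i < retries; i++ {
		if d > maxDelay {
			d = maxDelay
		}
		delays = append(delays, d)
		d *= 2
	}
	return delays
}

// retryWrite calls write until it succeeds or every delay has been used.
// It returns the number of attempts made. sleep is injected so that tests
// can run without waiting.
func retryWrite(write func() error, delays []time.Duration, sleep func(time.Duration)) (int, error) {
	for attempt := 0; ; attempt++ {
		err := write()
		if err == nil {
			return attempt + 1, nil
		}
		if attempt >= len(delays) {
			return attempt + 1, fmt.Errorf("giving up after %d attempts: %w", attempt+1, err)
		}
		sleep(delays[attempt])
	}
}

// noSleep skips waiting; useful in tests.
func noSleep(time.Duration) {}

func main() {
	calls := 0
	flakyWrite := func() error {
		calls++
		if calls < 3 {
			return errUnavailable // the first two writes fail
		}
		return nil
	}
	delays := backoffDelays(100*time.Millisecond, time.Second, 5)
	attempts, err := retryWrite(flakyWrite, delays, noSleep)
	fmt.Println(delays, attempts, err)
}
```

In a real consumer the messages waiting for a retry would stay in a bounded buffer (or simply remain uncommitted in Kafka), so a long outage does not exhaust memory.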
Chaos Mesh Configuration:
# chaos/network-partition.yaml
apiVersion: chaos-mesh.org/v1alpha1
kind: NetworkChaos
metadata:
  name: forge-kafka-partition
  namespace: forge
spec:
  action: partition
  mode: all
  selector:
    namespaces: [forge]
    labelSelectors:
      app.kubernetes.io/name: forge-consumer
  direction: to
  target:
    selector:
      namespaces: [forge]
      labelSelectors:
        app.kubernetes.io/name: kafka
    mode: one
  duration: "60s"
FORGE maintains regression test suites that run in the GitLab CI pipeline. Lint and integration jobs run on merge requests, while the end-to-end, performance, and full regression stages run only on main and on release tags:
# .gitlab-ci.yml - FORGE regression pipeline
stages:
  - lint
  - unit
  - integration
  - e2e
  - performance
  - regression
# Lint stage
lint:
  stage: lint
  tags: [trooper2]
  script:
    - golangci-lint run --config .golangci.yml ./...
    # "go fmt" rewrites files and exits 0, so it never fails a job;
    # "gofmt -l" lists unformatted files and the test fails if any are reported.
    - test -z "$(gofmt -l .)"
    - go vet ./...
  rules:
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"
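# Unit tests
unit-test:
  stage: unit
  tags: [trooper2]
  script:
    - go test -v -race -coverprofile=coverage.out ./...
    - go tool cover -func=coverage.out | grep total
    # GitLab expects Cobertura XML here, not a Go cover profile. This assumes
    # gocover-cobertura (github.com/boumenot/gocover-cobertura) is installed on the runner.
    - gocover-cobertura < coverage.out > coverage.xml
  coverage: '/total:\s+\(statements\)\s+(\d+\.?\d*)%/'
  artifacts:
    paths:
      - coverage.out
    reports:
      coverage_report:
        coverage_format: cobertura
        path: coverage.xml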
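# Integration tests
integration-test:
  stage: integration
  tags: [trooper2]
  services:
    # cp-kafka also needs ZooKeeper or KRaft settings to start (not shown here).
    - name: confluentinc/cp-kafka:7.5.0
      alias: kafka
    - name: timescale/timescaledb:latest-pg15
      alias: timescaledb
  variables:
    KAFKA_BROKER: "kafka:9092"
    DATABASE_URL: "postgres://test@timescaledb:5432/forge_test"
    # Assumed settings so the database service matches DATABASE_URL;
    # the Postgres image will not start without a password or trust auth.
    POSTGRES_USER: test
    POSTGRES_DB: forge_test
    POSTGRES_HOST_AUTH_METHOD: trust
  script:
    - go test -tags=integration -v ./internal/integration/...
  rules:
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"
    - if: $CI_COMMIT_BRANCH == "main"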
# End-to-end tests
e2e-test:
  stage: e2e
  tags: [test-runner]
  script:
    - kubectl create namespace forge-e2e-${CI_COMMIT_SHA}
    - helm upgrade --install forge-e2e ./charts/forge
      --namespace forge-e2e-${CI_COMMIT_SHA}
      --set replicaCount=1
      --wait --timeout 300s
    - ./scripts/run-e2e-tests.sh
  # after_script runs even when the tests fail, so the namespace is not leaked.
  after_script:
    - helm uninstall forge-e2e --namespace forge-e2e-${CI_COMMIT_SHA} || true
    - kubectl delete namespace forge-e2e-${CI_COMMIT_SHA} --ignore-not-found
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
    - if: $CI_COMMIT_TAG
# Performance regression
performance-regression:
  stage: performance
  tags: [test-runner]
  script:
    - k6 run scripts/load/track_throughput.js --out json=performance-results.json
    - ./scripts/compare-performance.sh performance-results.json baseline.json
  artifacts:
    paths:
      - performance-results.json
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
# Full regression suite
regression-suite:
  stage: regression
  tags: [test-runner]
  script:
    - ./scripts/regression/run-all-scenarios.sh
  artifacts:
    paths:
      - regression-results/
      - junit/
    reports:
      junit: junit/*.xml
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
    - if: $CI_COMMIT_TAG =~ /^v\d+\.\d+\.\d+$/
| Test Level | Coverage Target | Critical Path Coverage | Execution Time |
|---|---|---|---|
| Unit | 80% | 95% | <2 minutes |
| Integration | 70% | 85% | <5 minutes |
| End-to-End | 60% | 80% | <15 minutes |
| Performance | N/A | Key scenarios | <10 minutes |
| Chaos | N/A | Failure modes | <30 minutes |
| Regression | N/A | All scenarios | <60 minutes |
Each FORGE component has defined acceptance criteria:
Track Correlator:
| Criterion | Threshold | Measurement Method |
|---|---|---|
| Correlation latency | <100ms P99 | Prometheus histogram |
| Position error | <10km RMSE | Ground truth comparison |
| Velocity error | <0.5km/s | Kalman filter validation |
| Correlation confidence | >0.85 | Algorithm output |
| Multi-sensor merge | >95% success | Scenario test coverage |
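The position-error criterion (RMSE against ground truth, threshold 10 km) could be evaluated along these lines. This is a sketch under assumptions: it measures horizontal great-circle error only and ignores altitude, and `LatLon`, `haversineKm`, and `positionRMSEKm` are illustrative names rather than FORGE APIs.

```go
package main

import (
	"fmt"
	"math"
)

const earthRadiusKm = 6371.0 // mean Earth radius

// LatLon is a horizontal position in degrees.
type LatLon struct{ Lat, Lon float64 }

// haversineKm returns the great-circle distance between two points.
func haversineKm(a, b LatLon) float64 {
	toRad := func(deg float64) float64 { return deg * math.Pi / 180 }
	dLat := toRad(b.Lat - a.Lat)
	dLon := toRad(b.Lon - a.Lon)
	h := math.Sin(dLat/2)*math.Sin(dLat/2) +
		math.Cos(toRad(a.Lat))*math.Cos(toRad(b.Lat))*math.Sin(dLon/2)*math.Sin(dLon/2)
	return 2 * earthRadiusKm * math.Asin(math.Sqrt(math.Min(1, h)))
}

// positionRMSEKm compares estimated track points with ground truth points
// taken at the same timestamps and returns the root-mean-square error.
func positionRMSEKm(estimated, truth []LatLon) (float64, error) {
	if len(estimated) == 0 || len(estimated) != len(truth) {
		return 0, fmt.Errorf("need equal, non-empty samples: got %d and %d", len(estimated), len(truth))
	}
	var sumSq float64
	for i := range estimated {
		d := haversineKm(estimated[i], truth[i])
		sumSq += d * d
	}
	return math.Sqrt(sumSq / float64(len(estimated))), nil
}

func main() {
	truth := []LatLon{{39.00, 127.00}, {39.50, 127.20}}
	estimated := []LatLon{{39.05, 127.00}, {39.50, 127.25}}
	rmse, err := positionRMSEKm(estimated, truth)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("RMSE %.2f km, within 10 km threshold: %t\n", rmse, rmse < 10)
}
```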
JREAP Formatter:
| Criterion | Threshold | Measurement Method |
|---|---|---|
| Message format | 100% valid | MIL-STD-3011 schema validation |
| Encoding time | <10ms | Benchmark test |
| Field population | 100% required | Schema compliance check |
| Track number assignment | Unique | Database constraint |
Link 16 Formatter:
| Criterion | Threshold | Measurement Method |
|---|---|---|
| J12 format | 100% valid | MIL-STD-6016 validation |
| J70 format | 100% valid | MIL-STD-6016 validation |
| J73 format | 100% valid | MIL-STD-6016 validation |
| Message rate | >100 msg/s | Load test |
System-level pass/fail criteria:
# validation/system-thresholds.yaml
system_validation:
  throughput:
    track_ingestion_rate:
      minimum: 1000
      unit: tracks/second
      severity: critical
    kafka_publish_latency:
      maximum: 100
      unit: milliseconds
      percentile: 99
      severity: warning
  latency:
    end_to_end_processing:
      maximum: 500
      unit: milliseconds
      percentile: 95
      severity: critical
    sensor_to_track:
      maximum: 200
      unit: milliseconds
      percentile: 99
      severity: critical
  reliability:
    message_loss:
      maximum: 0
      unit: count
      severity: critical
    consumer_restart_count:
      maximum: 2
      unit: count
      time_window: 1h
      severity: warning
  data_quality:
    track_position_accuracy:
      maximum: 10
      unit: kilometers
      severity: critical
    track_velocity_accuracy:
      maximum: 0.5
      unit: km/s
      severity: warning
    correlation_confidence:
      minimum: 0.85
      severity: warning
  compliance:
    jreap_message_validity:
      minimum: 100
      unit: percent
      severity: critical
    link16_message_validity:
      minimum: 100
      unit: percent
      severity: critical
FORGE uses synthetic data generators for realistic test scenarios:
// internal/testutil/generator.go
package testutil
import (
	"math/rand"
	"time"
)
type TrackGenerator struct {
	rng        *rand.Rand
	baseTime   time.Time
	launchSite GeoPoint
	targetSite GeoPoint
}
type GeoPoint struct {
	Lat float64
	Lon float64
	Alt float64
}
func NewTrackGenerator() *TrackGenerator {
	return &TrackGenerator{
		rng:        rand.New(rand.NewSource(time.Now().UnixNano())),
		baseTime:   time.Now(),
		launchSite: GeoPoint{Lat: 39.0, Lon: 127.0, Alt: 0},
		targetSite: GeoPoint{Lat: 40.0, Lon: -121.4, Alt: 0},
	}
}
// GenerateBallisticTrack creates a simplified ballistic missile trajectory,
// sampled from startTime through the boost, midcourse, and terminal phases.
// Track, Position, and Velocity are assumed to be defined elsewhere in this package.
func (g *TrackGenerator) GenerateBallisticTrack(id string, startTime time.Time) []*Track {
	var trajectory []*Track
	// ICBM flight profile: boost (180s), midcourse (1200s), terminal (120s)
	phases := []struct {
		name     string
		duration time.Duration
		dt       time.Duration
	}{
		{"boost", 180 * time.Second, 100 * time.Millisecond},
		{"midcourse", 1200 * time.Second, 200 * time.Millisecond},
		{"terminal", 120 * time.Second, 50 * time.Millisecond},
	}
	t := startTime
	for _, phase := range phases {
		end := t.Add(phase.duration)
		for t.Before(end) {
			// Elapsed time is measured from launch (startTime), not from the
			// moment the generator was created, so phase offsets stay valid.
			elapsed := t.Sub(startTime).Seconds()
			track := g.computeTrajectoryPoint(id, t, elapsed, phase.name)
			trajectory = append(trajectory, track)
			t = t.Add(phase.dt)
		}
	}
	return trajectory
}
// computeTrajectoryPoint returns one sample of a piecewise, simplified model.
// Position and velocity are not continuous across phase boundaries.
func (g *TrackGenerator) computeTrajectoryPoint(id string, t time.Time, elapsed float64, phase string) *Track {
	// Boost phase: accelerating upward
	// Midcourse: ballistic arc
	// Terminal: decelerating downward
	var lat, lon, alt, vx, vy, vz float64
	switch phase {
	case "boost":
		lat = g.launchSite.Lat + elapsed*0.01
		lon = g.launchSite.Lon + elapsed*0.005
		alt = elapsed * elapsed * 10 // Quadratic rise
		vx = 0.01
		vy = 0.005
		vz = elapsed * 10
	case "midcourse":
		// Ballistic trajectory; tNorm runs from 0 to 1 over the 1200s midcourse phase
		tNorm := (elapsed - 180) / 1200
		lat = g.launchSite.Lat + tNorm*(g.targetSite.Lat-g.launchSite.Lat)
		lon = g.launchSite.Lon + tNorm*(g.targetSite.Lon-g.launchSite.Lon)
		alt = 1500000 - (tNorm-0.5)*(tNorm-0.5)*4000000 // Parabolic arc
		vx = (g.targetSite.Lat - g.launchSite.Lat) / 1200
		vy = (g.targetSite.Lon - g.launchSite.Lon) / 1200
		vz = 0
	case "terminal":
		// tNorm runs from 0 to 1 over the 120s terminal phase (starts at 180s + 1200s)
		tNorm := (elapsed - 1380) / 120
		lat = g.targetSite.Lat - (1-tNorm)*(g.targetSite.Lat-g.launchSite.Lat)*0.1
		lon = g.targetSite.Lon - (1-tNorm)*(g.targetSite.Lon-g.launchSite.Lon)*0.1
		alt = 1500000 * (1 - tNorm)
		vx = (g.targetSite.Lat - g.launchSite.Lat) / 1200 * 1.5
		vy = (g.targetSite.Lon - g.launchSite.Lon) / 1200 * 1.5
		vz = -elapsed * 100
	}
	return &Track{
		ID:         id,
		Timestamp:  t,
		Position:   Position{Lat: lat, Lon: lon, Alt: alt},
		Velocity:   Velocity{Vx: vx, Vy: vy, Vz: vz},
		Confidence: 0.85 + g.rng.Float64()*0.15,
		SensorID:   "SYNTHETIC",
		TrackType:  "ballistic",
	}
}
Test fixtures provide pre-configured scenarios:
# test-fixtures/scenarios/multi_sensor_correlation.yaml
name: multi_sensor_correlation
description: OPIR + Radar correlation test case
version: 1.0.0
sensors:
  - id: SBIRS-GEO-1
    type: opir
    position: [0.0, -95.0, 35786000]  # GEO position
    bands: [SWIR, MWIR]
    detection_prob: 0.85
    false_alarm_rate: 1e-6
  - id: UEWR-BEALE
    type: radar
    position: [39.1376, -121.3547, 0]
    frequency: 0.44   # GHz
    max_range: 5000   # km
    scan_rate: 0.2    # Hz
targets:
  - id: ICBM-001
    type: icbm
    launch_point: [39.0, 127.0, 0]
    launch_time: "2026-04-01T00:00:00Z"
    trajectory: ballistic_standard
    rv_count: 1
    decoy_count: 2
timeline:
  - time: "T+0s"
    event: launch_detected
    sensors: [SBIRS-GEO-1]
    expected_confidence: 0.80
  - time: "T+60s"
    event: radar_acquisition
    sensors: [UEWR-BEALE]
    expected_confidence: 0.90
  - time: "T+120s"
    event: track_correlation
    sensors: [SBIRS-GEO-1, UEWR-BEALE]
    expected_confidence: 0.92
    expected_position_error_km: 8.5
  - time: "T+180s"
    event: track_confirmed
    sensors: [SBIRS-GEO-1, UEWR-BEALE]
    expected_confidence: 0.95
    expected_position_error_km: 5.2
expected_outputs:
  - topic: forge.sensors.raw
    message_count_min: 1000
    message_count_max: 1500
  - topic: forge.tracks
    message_count_min: 100
    message_count_max: 200
    track_id_unique: 1
    correlation_confidence_min: 0.85
  - topic: forge.link16
    message_count_min: 50
    message_count_max: 100
    j12_valid: true
    j70_valid: true
validation:
  detection_latency_max_ms: 500
  correlation_latency_max_ms: 100
  position_error_max_km: 10
  confidence_min: 0.85
The test-runner component on trooper2 is deployed with the following manifest:
# test-runner deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-runner
  namespace: forge
spec:
  replicas: 1
  selector:
    matchLabels:
      app: test-runner
  template:
    metadata:
      labels:
        app: test-runner
    spec:
      containers:
        - name: test-runner
          image: forge/test-runner:latest
          env:
            - name: KAFKA_BROKER
              value: "kafka.forge.svc.cluster.local:9092"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: forge-db-credentials
                  key: url
          volumeMounts:
            - name: test-scenarios
              mountPath: /scenarios
            - name: test-results
              mountPath: /results
      volumes:
        - name: test-scenarios
          configMap:
            name: test-scenarios
        - name: test-results
          persistentVolumeClaim:
            claimName: test-results-pvc
The vimi-test-suite provides DoD-compliant test scenarios:
# Deploy test suite
helm install vimi-test ./charts/vimi-test-suite \
  --namespace vimi-test \
  --create-namespace \
  --set opir.enabled=true \
  --set radar.enabled=true \
  --set correlator.enabled=true \
  --set testRunner.enabled=true
# Execute specific test scenario
kubectl apply -f scenarios/boost_phase_detection.yaml -n vimi-test
# Monitor test execution
kubectl logs -f deployment/test-runner -n vimi-test
# Collect results
kubectl cp vimi-test/test-runner-xxx:/results ./test-results
FORGE’s testing and validation framework supports system reliability, performance, and DoD compliance through stress, chaos, regression, and scenario-based testing, each checked against defined acceptance criteria.
The test infrastructure leverages existing components (test-runner on trooper2, vimi-test-suite Helm charts) to provide consistent, repeatable testing across all environments.
Section 8: Testing & Validation - FORGE System Design Version 1.0.0
FORGE’s modular architecture provides a foundation for continuous enhancement. This section outlines planned capabilities, architectural evolution, integration pathways, and research directions that will extend the platform’s utility for missile defense simulation and operational planning.
The current track correlation system fuses data from OPIR satellites, radar systems, and external feeds (AIS/ADS-B) through the VIMI correlator. Future development will implement advanced fusion techniques:
Probabilistic Data Association (PDA): Replace the current nearest-neighbor correlation with a probabilistic approach that weights multiple track hypotheses, reducing miscorrelation in dense target environments. PDA maintains multiple association hypotheses and computes weighted state estimates, improving tracking accuracy when targets maneuver or cross paths.
Track Quality Scoring: Each sensor contribution will receive a dynamic quality score based on sensor type, geometric dilution of precision (GDOP), environmental conditions, and historical accuracy. The fusion algorithm will weight track updates proportionally to quality scores, automatically favoring more reliable sources.
Multi-Hypothesis Tracking (MHT): For complex scenarios with overlapping tracks, MHT will maintain multiple track hypotheses over time, pruning low-probability branches as new data arrives. This addresses the “ghost track” problem where a single target generates multiple correlated tracks from different sensor perspectives.
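The quality-weighted fusion described under Track Quality Scoring can be illustrated with a weighted mean of per-sensor position estimates. This is a minimal sketch, not the planned algorithm: how quality scores are derived from GDOP or sensor history is not specified in the text, so they are passed in as plain numbers, and `SensorEstimate` and `fuseByQuality` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// SensorEstimate is one sensor's view of a track position.
type SensorEstimate struct {
	SensorID string
	Position [3]float64 // same coordinate frame and units for every sensor
	Quality  float64    // 0 (ignore) to 1 (fully trusted)
}

// fuseByQuality returns the quality-weighted mean position. Estimates with
// non-positive quality are skipped.
func fuseByQuality(estimates []SensorEstimate) ([3]float64, error) {
	var fused [3]float64
	var total float64
	for _, e := range estimates {
		if e.Quality <= 0 {
			continue
		}
		for i := range fused {
			fused[i] += e.Quality * e.Position[i]
		}
		total += e.Quality
	}
	if total == 0 {
		return fused, errors.New("no estimate with positive quality")
	}
	for i := range fused {
		fused[i] /= total
	}
	return fused, nil
}

func main() {
	fused, err := fuseByQuality([]SensorEstimate{
		{SensorID: "opir", Position: [3]float64{0, 0, 0}, Quality: 0.25},
		{SensorID: "radar", Position: [3]float64{10, 20, 40}, Quality: 0.75},
	})
	fmt.Println(fused, err)
}
```

A production fusion stage would more likely weight by measurement covariance than by a scalar score; the scalar form is used here only because it matches the wording of the paragraph.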
Machine learning will augment rule-based correlation algorithms for improved accuracy and reduced false positives:
Anomaly Detection: Neural networks trained on historical track data will identify anomalous patterns that may indicate sensor malfunction, deliberate deception, or previously uncharacterized threat behaviors. The system will flag unusual correlation confidence scores for operator review.
Adaptive Learning: Online learning algorithms will continuously refine correlation parameters based on operator feedback. When operators accept or reject correlation decisions, the system learns to adjust future behavior, reducing the cognitive burden of manual correlation overrides.
Predictive Association: Sequence-to-sequence models will predict likely track associations before all sensor data arrives, enabling faster correlation in time-critical scenarios. The system will pre-position hypotheses and validate them as additional sensor data becomes available.
Current track state estimation uses Kalman filters with constant-velocity or constant-acceleration models. Future enhancements will incorporate:
Maneuver Detection: Interactive Multiple Model (IMM) filters will detect and adapt to target maneuvers in real-time. The system will transition between motion models (constant velocity, turning, accelerating) based on observed behavior, maintaining track continuity through aggressive maneuvers.
Impact Point Prediction: Ballistic trajectory prediction will estimate impact points and times for threat assessment. The system will propagate track states forward using orbital mechanics for exo-atmospheric phases and atmospheric drag models for terminal phases.
Launch Point Estimation: Back-propagation algorithms will estimate launch locations from track geometry, enabling attribution and threat characterization. This supports counter-fire planning and intelligence analysis.
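The core of the IMM approach mentioned under Maneuver Detection is the model-probability update: each motion model's prior probability is propagated through a switching matrix and then reweighted by how well that model explained the latest measurement. The sketch below shows only that step, with the per-model Kalman filters left out; the function name and the example numbers are illustrative assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// updateModelProbabilities performs the model-probability step of an IMM filter.
//
//	mu[i]        prior probability of motion model i
//	trans[i][j]  probability of switching from model i to model j
//	like[j]      likelihood of the latest measurement under model j
//
// It returns the posterior probabilities, which sum to 1.
func updateModelProbabilities(mu []float64, trans [][]float64, like []float64) ([]float64, error) {
	n := len(mu)
	if n == 0 || len(trans) != n || len(like) != n {
		return nil, errors.New("dimension mismatch")
	}
	for i := range trans {
		if len(trans[i]) != n {
			return nil, errors.New("transition matrix must be square")
		}
	}
	post := make([]float64, n)
	var norm float64
	for j := 0; j < n; j++ {
		// Predicted probability of model j before seeing the measurement.
		var predicted float64
		for i := 0; i < n; i++ {
			predicted += trans[i][j] * mu[i]
		}
		post[j] = like[j] * predicted
		norm += post[j]
	}
	if norm <= 0 {
		return nil, errors.New("all models have zero likelihood")
	}
	for j := range post {
		post[j] /= norm
	}
	return post, nil
}

func main() {
	mu := []float64{0.5, 0.5} // constant velocity, turning
	trans := [][]float64{{0.75, 0.25}, {0.25, 0.75}}
	like := []float64{1, 3} // the turning model fits the measurement better
	post, err := updateModelProbabilities(mu, trans, like)
	fmt.Println(post, err)
}
```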
FORGE’s current microservices architecture will evolve toward finer-grained components:
┌─────────────────────────────────────────────────────────────────────────────────┐
│ FUTURE ARCHITECTURE (2026-2028) │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ CURRENT STATE (2026) FUTURE STATE (2028) │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │ FORGE-C2 │ │ FORGE-C2 SERVICE MESH │ │
│ │ (Monolithic) │ │ ┌───────────┐ ┌───────────┐ │ │
│ │ │ ───────────────▶ │ │ Track │ │ Threat │ │ │
│ │ - JREAP │ │ │ Manager │ │ Assessor │ │ │
│ │ - Link 16 │ │ └───────────┘ └───────────┘ │ │
│ │ - WebSocket │ │ ┌───────────┐ ┌───────────┐ │ │
│ │ - Alert Gen │ │ │ Engage │ │ Message │ │ │
│ │ - Track Mgmt │ │ │ Coord │ │ Gateway │ │ │
│ └─────────────────┘ │ └───────────┘ └───────────┘ │ │
│ │ ┌───────────┐ ┌───────────┐ │ │
│ ┌─────────────────┐ │ │ Alert │ │ WebSocket │ │ │
│ │ FORGE-Consumer │ │ │ Generator │ │ Broadcaster│ │ │
│ │ (Pipeline) │ │ └───────────┘ └───────────┘ │ │
│ └─────────────────┘ └─────────────────────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │ VIMI Correlator │ │ VIMI FABRIC (Event-Sourced) │ │
│ │ (Single Service)│ ───────────────▶ │ ┌─────────────────────────────┐ │ │
│ │ │ │ │ Command Handler │ │ │
│ │ │ │ ├─────────────────────────────┤ │ │
│ │ │ │ │ Event Store (Kafka Log) │ │ │
│ │ │ │ ├─────────────────────────────┤ │ │
│ │ │ │ │ Projection Layer │ │ │
│ │ │ │ │ (Read Models) │ │ │
│ │ │ │ └─────────────────────────────┘ │ │
│ └─────────────────┘ └─────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
Service Decomposition: FORGE-C2 will decompose into specialized services (Track Manager, Threat Assessor, Engagement Coordinator, Message Gateway, Alert Generator). Each service will own its data and communicate through well-defined gRPC interfaces.
Domain-Driven Design: Services will align with bounded contexts from the missile defense domain: Track Management, Threat Assessment, Engagement Planning, and External Communications. This enables independent deployment cycles and team ownership.
The current architecture uses Kafka as a message broker with stateful services maintaining local caches. Future development will implement full event sourcing:
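Event Store: All state changes will be captured as immutable events stored in Kafka topics with unlimited retention. Log compaction is not suitable for the event log itself, because it discards superseded records for a key; it can still serve snapshot topics. The event log becomes the system of record, enabling event replay, temporal queries, and audit trails.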
Command Query Responsibility Segregation (CQRS): Write paths (commands) will be separated from read paths (queries). Command handlers validate and emit events. Query handlers subscribe to events and maintain optimized read models for specific use cases (track display, alert generation, historical analysis).
Benefits: Event sourcing provides complete audit history for after-action review, enables point-in-time system state reconstruction, supports event replay for debugging and testing, and decouples services through event-driven communication.
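The relationship between the event log and a read model can be sketched with an in-memory stand-in for the Kafka log. Replaying events up to a chosen sequence number gives the point-in-time reconstruction mentioned above. All type and function names here are hypothetical, and the three event kinds are an assumption made for illustration.

```go
package main

import "fmt"

type EventKind string

const (
	TrackCreated EventKind = "track_created"
	TrackUpdated EventKind = "track_updated"
	TrackDropped EventKind = "track_dropped"
)

// Event is an immutable fact appended to the log.
type Event struct {
	Seq      uint64 // position in the log, assigned on append
	Kind     EventKind
	TrackID  string
	Lat, Lon float64
}

// TrackView is a read model entry maintained by a projection.
type TrackView struct {
	Lat, Lon float64
	Updates  int
}

// EventStore is an in-memory, append-only stand-in for a Kafka topic.
type EventStore struct{ log []Event }

// Append adds an event to the end of the log and returns its sequence number.
func (s *EventStore) Append(e Event) uint64 {
	e.Seq = uint64(len(s.log)) + 1
	s.log = append(s.log, e)
	return e.Seq
}

// Project replays events with Seq <= upTo (0 means the whole log) and
// returns the resulting read model.
func (s *EventStore) Project(upTo uint64) map[string]TrackView {
	view := make(map[string]TrackView)
	for _, e := range s.log {
		if upTo != 0 && e.Seq > upTo {
			break
		}
		switch e.Kind {
		case TrackCreated:
			view[e.TrackID] = TrackView{Lat: e.Lat, Lon: e.Lon}
		case TrackUpdated:
			v, ok := view[e.TrackID]
			if !ok {
				continue // ignore updates for unknown tracks
			}
			v.Lat, v.Lon = e.Lat, e.Lon
			v.Updates++
			view[e.TrackID] = v
		case TrackDropped:
			delete(view, e.TrackID)
		}
	}
	return view
}

func main() {
	var store EventStore
	store.Append(Event{Kind: TrackCreated, TrackID: "T1", Lat: 39.0, Lon: 127.0})
	store.Append(Event{Kind: TrackUpdated, TrackID: "T1", Lat: 39.4, Lon: 127.3})
	store.Append(Event{Kind: TrackDropped, TrackID: "T1"})
	fmt.Println("state after event 2:", store.Project(2))
	fmt.Println("current state:", store.Project(0))
}
```

The command side would validate a request and call `Append`; query handlers only ever read projections, which is the separation CQRS describes.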
Integration with external command and control systems will extend FORGE’s utility in joint and coalition environments:
| System | Integration Type | Protocol | Timeline | Priority |
|---|---|---|---|---|
| JADC2 (Joint All-Domain C2) | Bi-directional | JREAP-C (STANAG 5518) | Q2 2027 | P0 |
| AFATDS (Army Field Artillery) | Outbound | VMF (Variable Message Format) | Q4 2027 | P1 |
| AEGIS (Navy Combat System) | Bi-directional | Link 16 / CEC (Cooperative Engagement Capability) | Q3 2027 | P0 |
| Patriot ICC (Air Defense) | Bi-directional | JREAP-B / Link 16 | Q1 2027 | P1 |
| THAAD C2BMC | Bi-directional | JREAP-C / Custom XML | Q2 2028 | P2 |
JADC2 Integration: The Joint All-Domain Command and Control initiative represents the future of joint force coordination. FORGE will implement JADC2 data exchange through JREAP-C (JREAP over IP networks, standardized for NATO as STANAG 5518), enabling track sharing across service boundaries and support for multi-domain operations.
Coalition operations require interoperability with NATO partner networks:
MIDS Terminal Emulation: Software emulation of Multifunctional Information Distribution System (MIDS) terminals will enable participation in NATO Link 16 networks without dedicated hardware. The emulator will implement MIL-STD-6016 message formatting and network participation groups.
Network Participation Groups (NPGs): FORGE will support multiple NPGs simultaneously: PPLI (Precise Participant Location and Identification), Surveillance, Electronic Warfare, and Weapon Coordination. Each NPG operates as a separate logical network within the Link 16 architecture.
Security Classification Handling: Multi-level security (MLS) integration will enable track sharing at appropriate classification levels. FORGE will implement security labeling, downgrading workflows, and cross-domain solutions for coalition data exchange.
Beyond NATO, FORGE will support broader coalition partnerships:
Five Eyes Integration: Data sharing protocols with Australia, Canada, New Zealand, and the United Kingdom through secure gateway interfaces. This includes track sharing, sensor data exchange, and coordinated engagement planning.
Bilateral Agreements: Configurable data sharing policies for specific partner nations, with fine-grained control over what data elements are shared (track position yes, launch point estimate no, for example).
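A per-partner sharing policy of the kind described under Bilateral Agreements can be modeled as a field allowlist with default deny. The sketch below is illustrative only: the field names and the `SharingPolicy` type are assumptions, and a real cross-domain solution would involve far more than filtering a map.

```go
package main

import "fmt"

// SharingPolicy lists, per field name, whether a partner may receive it.
// Fields that are not listed are withheld (default deny).
type SharingPolicy map[string]bool

// Filter returns a copy of the track that contains only the permitted fields.
func (p SharingPolicy) Filter(track map[string]any) map[string]any {
	out := make(map[string]any)
	for field, value := range track {
		if p[field] {
			out[field] = value
		}
	}
	return out
}

func main() {
	partner := SharingPolicy{
		"track_id":              true,
		"position":              true,
		"launch_point_estimate": false, // track position yes, launch point estimate no
	}
	track := map[string]any{
		"track_id":              "ICBM-001",
		"position":              [3]float64{39.0, 127.0, 0},
		"launch_point_estimate": [2]float64{39.0, 127.0},
		"sensor_id":             "SBIRS-GEO-1",
	}
	fmt.Println(partner.Filter(track))
}
```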
Track correlation and state estimation algorithms will leverage GPU parallelism:
CUDA-Accelerated Correlation: The GNN (Global Nearest Neighbor) correlation algorithm scales poorly with track count, at O(n³) for n tracks. A GPU implementation will evaluate candidate associations in parallel, with the goal of reducing correlation time from seconds to milliseconds for large track counts.
Kalman Filter Batches: Processing thousands of simultaneous Kalman filters on GPU enables real-time state estimation for dense threat environments. Each track update becomes a GPU kernel launch operating on batched track state vectors.
Machine Learning Inference: ONNX Runtime with CUDA execution providers will accelerate neural network inference for anomaly detection and predictive association. GPU acceleration reduces inference latency from ~100ms to <5ms per batch.
Hardware acceleration for latency-critical paths:
Protocol Encoding: JREAP and Link 16 message encoding will move to FPGA for sub-microsecond message generation. This is particularly important for time-critical engagement messages (J9.0 Command, J10.2 Engagement Status).
Sensor Data Preprocessing: Raw sensor data normalization and validation will execute on FPGA before reaching the software pipeline. This includes coordinate transformations, duplicate filtering, and format validation.
Timing Requirements: FPGA processing targets <100µs end-to-end latency for critical message paths, compared to ~10ms for software-only processing.
Distributed processing at sensor sites reduces data transfer latency:
Edge Track Correlators: Lightweight correlators deployed at sensor locations will perform initial track processing before forwarding to the central FORGE cluster. This reduces bandwidth requirements and enables continued operation during network degradation.
Tactical Edge Nodes: Kubernetes deployments on tactical hardware (ruggedized servers, airborne platforms) will extend FORGE capabilities to forward-deployed units with limited connectivity.
Store-and-Forward: Edge nodes will buffer track data during network outages, synchronizing with the central cluster when connectivity resumes. Conflict resolution algorithms will handle divergent track states.
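The store-and-forward behavior can be sketched as a bounded buffer that drops the oldest update when full and drains in order once the uplink returns. This is a simplified illustration with hypothetical names; it does not cover the conflict resolution step, and dropping the oldest entry is one possible overflow policy, not necessarily the one FORGE will adopt.

```go
package main

import (
	"errors"
	"fmt"
)

// errLinkDown stands in for a failed send to the central cluster.
var errLinkDown = errors.New("uplink down")

// TrackUpdate is the unit of data buffered at the edge.
type TrackUpdate struct {
	TrackID string
	Seq     int
}

// ForwardBuffer holds updates while the central cluster is unreachable.
type ForwardBuffer struct {
	capacity int
	pending  []TrackUpdate
}

// NewForwardBuffer creates a buffer that keeps at most capacity updates.
func NewForwardBuffer(capacity int) *ForwardBuffer {
	if capacity < 1 {
		capacity = 1
	}
	return &ForwardBuffer{capacity: capacity}
}

// Enqueue stores an update. When the buffer is full the oldest update is
// discarded, and Enqueue reports that a drop happened.
func (b *ForwardBuffer) Enqueue(u TrackUpdate) (dropped bool) {
	if len(b.pending) > 0 && len(b.pending) >= b.capacity {
		b.pending = b.pending[1:]
		dropped = true
	}
	b.pending = append(b.pending, u)
	return dropped
}

// Flush sends buffered updates oldest first. It stops at the first error and
// keeps the unsent updates for the next attempt.
func (b *ForwardBuffer) Flush(send func(TrackUpdate) error) (int, error) {
	sent := 0
	for len(b.pending) > 0 {
		if err := send(b.pending[0]); err != nil {
			return sent, err
		}
		b.pending = b.pending[1:]
		sent++
	}
	return sent, nil
}

// Len reports how many updates are waiting.
func (b *ForwardBuffer) Len() int { return len(b.pending) }

func main() {
	buf := NewForwardBuffer(3)
	for seq := 1; seq <= 4; seq++ {
		buf.Enqueue(TrackUpdate{TrackID: "T1", Seq: seq})
	}
	sent, err := buf.Flush(func(u TrackUpdate) error {
		fmt.Println("forwarding", u.TrackID, u.Seq)
		return nil
	})
	fmt.Println(sent, err, buf.Len())
}
```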
FORGE will implement zero-trust security principles throughout the architecture:
Service Mesh Authentication: All service-to-service communication will require mutual TLS (mTLS) with certificate rotation. Istio or Linkerd service mesh will provide automatic certificate management and encrypted channels.
Just-in-Time Access: Operators will receive time-limited, scoped access tokens through a centralized identity provider. Access will be granted based on mission role, clearance level, and operational need.
Continuous Verification: Every request will be authenticated and authorized, regardless of network location. Internal services will validate tokens on every call, eliminating implicit trust within the cluster boundary.
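The per-request check implied by Just-in-Time Access and Continuous Verification can be sketched as below. This is only an outline under stated assumptions: token signature verification and the identity provider are omitted, the numeric clearance ordering is hypothetical, and none of these names come from FORGE.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// AccessToken is a decoded, already signature-verified token (assumption).
type AccessToken struct {
	Subject   string
	Scopes    []string
	Clearance int   // higher means more privileged (hypothetical ordering)
	ExpiresAt int64 // Unix seconds
}

var (
	ErrExpired   = errors.New("token expired")
	ErrClearance = errors.New("insufficient clearance")
	ErrScope     = errors.New("missing scope")
)

// authorize is meant to run on every call, regardless of where the request
// originates. It checks expiry, clearance, and scope, in that order.
func authorize(tok AccessToken, scope string, minClearance int, nowUnix int64) error {
	if nowUnix >= tok.ExpiresAt {
		return ErrExpired
	}
	if tok.Clearance < minClearance {
		return ErrClearance
	}
	for _, s := range tok.Scopes {
		if s == scope {
			return nil
		}
	}
	return ErrScope
}

func main() {
	now := time.Now().Unix()
	tok := AccessToken{
		Subject:   "operator-1",
		Scopes:    []string{"tracks:read"},
		Clearance: 2,
		ExpiresAt: now + 15*60, // time-limited: valid for 15 minutes
	}
	fmt.Println(authorize(tok, "tracks:read", 2, now))
	fmt.Println(authorize(tok, "engage:write", 2, now))
}
```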
Protecting sensitive data in use:
SGX Enclaves: Intel Software Guard Extensions (SGX) will protect cryptographic keys and sensitive track data during processing. Enclaves isolate computation from the host operating system, protecting against privileged attackers.
Secure Multi-Party Computation: For coalition sharing scenarios, secure MPC protocols will enable joint track correlation without revealing raw sensor data. Partners can compute on combined data without exposing their individual contributions.
Homomorphic Encryption: Research into homomorphic encryption will enable track correlation on encrypted data, eliminating the need to decrypt sensor inputs during processing.
Particle Filters for Non-Gaussian Tracking: Traditional Kalman filters assume Gaussian noise distributions. Particle filters will handle non-Gaussian measurement noise from complex sensor models and maneuvering targets with unknown intent.
Reinforcement Learning for Engagement Optimization: Agents trained through simulation will learn optimal weapon-target pairing strategies, adapting to evolving threat profiles and resource constraints. The system will continuously improve engagement decisions based on simulation outcomes.
Graph Neural Networks for Correlation: Graph neural networks (distinct from the Global Nearest Neighbor algorithm, which this document also abbreviates as GNN) will model track correlation as a graph problem, learning structural patterns in track associations that rule-based algorithms cannot capture. This enables correlation of tracks from fundamentally different sensor modalities.
Constraint-Based Scheduling: Engagement coordination will use constraint programming to optimize weapon assignments across multiple threats, considering interceptor availability, engagement zones, probability of kill, and resource constraints.
Latency-Aware Processing: Critical paths will be identified and optimized for minimum latency. Event-driven processing will skip non-essential steps for time-critical tracks, falling back to full processing when time permits.
Adaptive Quality of Service: The system will dynamically adjust processing fidelity based on system load, maintaining real-time guarantees for high-priority tracks while reducing processing for lower-priority observations.
Raft for Track Ownership: In multi-node deployments, Raft consensus will determine track ownership when multiple correlators process the same sensor data. This prevents split-brain scenarios where different nodes assign different track IDs to the same physical target.
CRDTs for Edge Synchronization: Conflict-Free Replicated Data Types will enable edge nodes to operate independently and merge track states when connectivity is restored, without requiring coordination during normal operation.
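One of the simplest CRDTs that fits the edge synchronization case is a last-writer-wins map keyed by track ID. The sketch below assumes timestamps from reasonably synchronized clocks and uses the node ID as a tie-breaker; both are assumptions, and the type and function names are hypothetical.

```go
package main

import "fmt"

// TrackState is the replicated value for one track.
type TrackState struct {
	TrackID   string
	Lat, Lon  float64
	Timestamp int64  // time of the update, e.g. Unix nanoseconds
	NodeID    string // breaks ties between equal timestamps
}

// newer reports whether a should win over b.
func newer(a, b TrackState) bool {
	if a.Timestamp != b.Timestamp {
		return a.Timestamp > b.Timestamp
	}
	return a.NodeID > b.NodeID
}

// Merge combines two replicas. For each track the newest state wins, so the
// result does not depend on argument order and merging twice changes nothing.
func Merge(a, b map[string]TrackState) map[string]TrackState {
	out := make(map[string]TrackState, len(a)+len(b))
	for id, s := range a {
		out[id] = s
	}
	for id, s := range b {
		if cur, ok := out[id]; !ok || newer(s, cur) {
			out[id] = s
		}
	}
	return out
}

func main() {
	edge := map[string]TrackState{
		"T1": {TrackID: "T1", Lat: 39.1, Lon: 127.1, Timestamp: 10, NodeID: "edge-1"},
		"T2": {TrackID: "T2", Lat: 40.0, Lon: 128.0, Timestamp: 5, NodeID: "edge-1"},
	}
	central := map[string]TrackState{
		"T1": {TrackID: "T1", Lat: 39.2, Lon: 127.2, Timestamp: 20, NodeID: "central"},
	}
	fmt.Println(Merge(edge, central))
}
```

Last-writer-wins discards the older update entirely, which may be too coarse for track states that should be fused rather than replaced; it is shown here because it makes the merge properties easy to see.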
FORGE will maintain strict compliance with military standards:
| Standard | Version | Status | Planned Enhancement |
|---|---|---|---|
| MIL-STD-3011 (JREAP) | Revision C | Implemented | JREAP-D (satellite Link 16) |
| MIL-STD-6016 (Link 16) | Revision D | Implemented | MATT (Multi-Link 16) |
| MIL-STD-2525 (Symbols) | Revision D | Partial | Full 2525D compliance |
| STANAG 5516 (Link 16) | Edition 3 | Implemented | Edition 4 updates |
| STANAG 5518 (JREAP) | Edition A | Implemented | Edition B integration |
Certification Path: FORGE will pursue formal certification through Joint Interoperability Test Command (JITC) for JREAP and Link 16 implementations. Certification enables deployment in operational environments requiring validated interoperability.
Selective open-sourcing will benefit the broader defense simulation community:
Open Core Model: Core track correlation algorithms, message formatters, and data models will be released under a permissive license (Apache 2.0 or MIT). This enables academic research, algorithm development, and community contributions.
Sensitive Components: C2-specific logic, threat profiles, and engagement coordination algorithms will remain proprietary or government-owned. Export-controlled components will have restricted distribution.
Community Engagement: Regular releases, documentation, and developer engagement will build a community around the platform. Hackathons and challenge problems will encourage algorithm development and innovation.
| Feature | Description | Priority | Target |
|---|---|---|---|
| Multi-Hypothesis Tracking | MHT correlation algorithm | P0 | Q3 2026 |
| Interactive Multiple Model | Maneuver detection filters | P0 | Q4 2026 |
| GPU Correlation Acceleration | CUDA-accelerated GNN | P1 | Q1 2027 |
| Service Mesh (mTLS) | Zero-trust service authentication | P1 | Q2 2027 |
| Event Sourcing | Full event store implementation | P1 | Q2 2027 |
| JADC2 Gateway | Joint C2 integration | P0 | Q2 2027 |
| NATO Link 16 MIDS | Network participation groups | P1 | Q3 2027 |
| FPGA Message Encoding | Hardware-accelerated protocols | P2 | Q4 2027 |
| ML Anomaly Detection | Neural network track analysis | P2 | Q1 2028 |
| Secure MPC | Coalition privacy-preserving compute | P3 | Q2 2028 |
| Edge Deployment | Tactical Kubernetes distribution | P2 | Q2 2028 |
| JITC Certification | Formal interoperability validation | P0 | Q4 2028 |
2026 2027 2028
│ │ │
│ Q3 Q4 │ Q1 Q2 Q3 Q4 │ Q1 Q2 Q3 Q4
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ └─ MHT Correlation │ │ │ │ │ │ │ │ │ │ │
│ │ ┌─ IMM Filters │ │ │ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ └──┼──────────────────────┼──┼──┼──┼──┼─────────────┼──┼──┼──┼──┤
│ │ │ │ │ │ │ │ │ │ │ │ │
│ │ │ └──┼──┼──┤ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │ GPU│ │ │ │ │ │ │ │ │
│ │ │ Acc│el│ │ │ │ │ │ │ │
│ │ │ └──┼──┤ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │
│ │ │ │ └──JADC2 │ │ │ │ │
│ │ │ │ Integration │ │ │ │ │
│ │ │ │ └───────────┼──┤ │ │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ NATO │ │ │
│ │ │ │ Link 16│ │ │
│ │ │ │ Full │ │ │
│ │ │ │ └──────┼──┤ │
│ │ │ │ │ML│ │
│ │ │ │ │ │ │
│ │ │ │ │ └──JITC
│ │ │ │ │ Cert
| Partner Type | Opportunities | Engagement Model |
|---|---|---|
| DoD Labs (DARPA, ARL) | Advanced algorithms, ML correlation | CRADAs, SBIR/STTR |
| Federally Funded R&D Centers (FFRDCs) | Architecture validation, security | Contracted research |
| University Research | Academic publications, algorithm development | Sponsored research, intern programs |
| Defense Contractors | System integration, operational deployment | Subcontracting, teaming agreements |
| NATO Science & Technology | Coalition interoperability | Working groups, technical panels |
| Open Source Community | Algorithm contributions, bug reports | GitHub, community forums |
MIT Lincoln Laboratory: Joint research on multi-sensor fusion algorithms and track correlation in contested environments. FORGE provides realistic simulation environment for algorithm validation.
Johns Hopkins APL: Collaboration on engagement coordination algorithms and weapon-target pairing optimization. Integration with APL’s threat modeling capabilities.
NATO STO: Participation in Technical Panels (IST, SET) for sensor fusion standards development. FORGE as reference implementation for proposed standards.
FORGE’s future development balances immediate operational needs with long-term architectural evolution. The roadmap prioritizes capabilities that enhance current simulation and training missions while building toward a platform that can support operational deployment and coalition integration.
Key themes drive development:
The planned architectural changes (event sourcing, CQRS, service mesh) build on today's Kafka-based microservices and provide the foundation for these future capabilities. Each enhancement builds on the core platform rather than replacing it, ensuring continuity for existing users and use cases.
Document Version: 1.0
Classification: UNCLASSIFIED//FOR OFFICIAL USE ONLY
Last Updated: April 2026
This appendix provides reference materials, command references, and recovery procedures for the FORGE system design paper. It also includes instructions for resuming work if the system crashes or the session is lost.
| Section | File | Status | Last Updated |
|---|---|---|---|
| 1. Executive Summary | 01-executive-summary.md | ✅ Complete | 2026-04-07 |
| 2. System Architecture | 02-architecture.md | ✅ Complete | 2026-04-07 |
| 3. Data Flow & Messaging | 03-data-flow-messaging.md | ⬜ Pending | - |
| 4. Deployment & Infrastructure | 04-deployment-infrastructure.md | ✅ Complete | 2026-04-08 |
| 5. Integration Patterns & APIs | 05-integration-apis.md | ⬜ Pending | - |
| 6. Performance & Scalability | 06-performance-scalability.md | ⬜ Pending | - |
| 7. Security & Compliance | 07-security-compliance.md | ⬜ Pending | - |
| 8. Testing & Validation | 08-testing-validation.md | ⬜ Pending | - |
| 9. Future Development | 09-future-development.md | ⬜ Pending | - |
| 10. Appendix & Recovery | 10-appendix-recovery.md | ✅ This file | 2026-04-07 |
| Final Assembly | forge-system-design-complete.md | ⬜ Pending | - |
Find the workspace:
/home/wez/stsgym-work/papers/forge-system-design/
Check progress: Read this file (10-appendix-recovery.md) and SECTION-PLAN.md
Identify next section: Look for the first ⬜ Pending in the status table above
Read source material:
/home/wez/stsgym-work/docs/FORGE-SIMULATORS.md
/home/wez/stsgym-work/forge-c2/
/home/wez/stsgym-work/deploy/
/home/wez/.openclaw/workspace/MEMORY.md
Continue writing: Create the next pending section file
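The "identify next section" step can be automated with a small shell helper. The following is a minimal sketch, not part of the FORGE tooling: the function name `next_pending_section` is hypothetical, and it assumes the status table keeps the column layout shown above (Section, File, Status, Last Updated).

```shell
# Hypothetical helper (not part of FORGE): print the file name of the first
# section still marked "Pending" in a Markdown status table.
next_pending_section() {
  # Assumes the column layout | Section | File | Status | Last Updated |,
  # so after splitting on "|" the File is field 3 and the Status is field 4.
  awk -F'|' '$4 ~ /Pending/ {
    gsub(/^[ \t]+|[ \t]+$/, "", $3)   # trim whitespace around the file name
    print $3
    exit                              # stop at the first pending row
  }' "${1:-10-appendix-recovery.md}"
}
```

Run from the workspace directory as `next_pending_section 10-appendix-recovery.md`. Once every numbered section is complete, the first pending row would be the Final Assembly entry, which signals that the merge step is next.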
Each section must include:
After all sections are complete:
Merge sections:
cat 01-executive-summary.md 02-architecture.md 03-data-flow-messaging.md \
04-deployment-infrastructure.md 05-integration-apis.md \
06-performance-scalability.md 07-security-compliance.md \
08-testing-validation.md 09-future-development.md 10-appendix-recovery.md \
> forge-system-design-complete.md
Generate PDF (requires pandoc):
pandoc forge-system-design-complete.md -o forge-system-design.pdf \
--pdf-engine=xelatex -V geometry:margin=1in
Publish: Copy to web server or paper repository
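Because `cat` stops with an error on the first missing file but still writes a partial output file, it may be worth checking all ten section files before merging. The sketch below is one way to do that under stated assumptions: `check_sections` and `merge_sections` are hypothetical helper names, and the file list is copied from the merge command in this appendix.

```shell
# Section files in merge order, copied from the merge command in this appendix.
SECTIONS="01-executive-summary.md 02-architecture.md 03-data-flow-messaging.md
04-deployment-infrastructure.md 05-integration-apis.md
06-performance-scalability.md 07-security-compliance.md
08-testing-validation.md 09-future-development.md 10-appendix-recovery.md"

# Hypothetical helper: list section files that are missing or empty.
# Returns 0 only when every file exists and has content.
check_sections() {
  cs_dir="${1:-.}"
  cs_status=0
  for cs_file in $SECTIONS; do
    if [ ! -s "$cs_dir/$cs_file" ]; then
      echo "missing or empty: $cs_file" >&2
      cs_status=1
    fi
  done
  return "$cs_status"
}

# Hypothetical helper: concatenate the sections only after the check passes.
merge_sections() {
  ms_dir="${1:-.}"
  check_sections "$ms_dir" || return 1
  for ms_file in $SECTIONS; do
    cat "$ms_dir/$ms_file"
  done > "$ms_dir/forge-system-design-complete.md"
}
```

Calling `merge_sections /home/wez/stsgym-work/papers/forge-system-design` would then either produce the merged file or print the sections that still need to be written, leaving any existing merged file untouched.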
| Purpose | Location |
|---|---|
| FORGE Architecture | /home/wez/stsgym-work/docs/FORGE-SIMULATORS.md |
| FORGE-C2 Module | /home/wez/stsgym-work/forge-c2/ |
| Deployment Manifests | /home/wez/stsgym-work/deploy/ |
| Memory/Context | /home/wez/.openclaw/workspace/MEMORY.md |
| TODO Tracking | /home/wez/stsgym-work/TODO.md |
| Purpose | Location |
|---|---|
| Section Files | /home/wez/stsgym-work/papers/forge-system-design/XX-name.md |
| Diagrams | /home/wez/stsgym-work/papers/forge-system-design/diagrams/ |
| Final Paper | /home/wez/stsgym-work/papers/forge-system-design/forge-system-design-complete.md |
# Kubernetes pods
kubectl get pods -A | grep -E 'forge|kafka|patroni|monitoring'
# Kafka topics
kubectl exec -it kafka-0 -n forge -- kafka-topics.sh --list --bootstrap-server localhost:9092
# TimescaleDB connection
psql -h 207.244.226.151 -p 30432 -U postgres -d forge
# Prometheus targets
curl -s http://207.244.226.151:30090/api/v1/targets | jq '.data.activeTargets[].labels.job'
# FORGE-C2 logs
kubectl logs -f deployment/forge-c2 -n forge
# Kafka consumer logs
kubectl logs -f deployment/forge-consumer -n forge
# Patroni logs
kubectl logs -f patroni-0 -n patroni
# Create backup
velero backup create manual-backup --include-namespaces forge,patroni,monitoring
# List backups
velero backup get
# Restore from backup
velero restore create --from-backup manual-backup
| Service | URL | Credentials |
|---|---|---|
| ArgoCD | http://207.244.226.151:31539 | admin / lDZWiXA8uAy8yV81 |
| Prometheus | http://207.244.226.151:30090 | - |
| Grafana | http://207.244.226.151:30300 | admin / admin |
| PostgreSQL | NodePort 30432 | postgres / StsGym2024PostgreSQL |
| MinIO Console | http://207.244.226.151:30901 | velero / VeleroBackup2024! |
| Kafka | NodePort 30721 | - |
| Role | System | Notes |
|---|---|---|
| Primary Developer | Wesley Robbins | Infrastructure engineer |
| Repository | git@idm.wezzel.com:crab-meat-repos/stsgym-work.git | Main codebase |
| FORGE-C2 Repo | git@idm.wezzel.com:crab-meat-repos/forge-c2.git | Submodule |
| Issue | Diagnosis | Resolution |
|---|---|---|
| Kafka not receiving | Check topic config | kubectl logs kafka-0 -n forge |
| TimescaleDB slow | Check hypertable compression | SELECT compress_chunk(i) FROM show_chunks('opir_detections') i; |
| Pod CrashLoopBackOff | Check resource limits | kubectl describe pod <name> |
| Prometheus targets down | Check federation config | kubectl get prometheus -n monitoring |
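For quick reference at a terminal, the table above can be wrapped in a small lookup function. This is only an illustrative sketch: the function name `triage_hint` and the symptom keywords are hypothetical, and the commands are copied verbatim from the table. The function prints the suggested command and does not execute it.

```shell
# Hypothetical lookup (not part of FORGE): print the first command to try
# for a given symptom keyword, taken from the troubleshooting table.
triage_hint() {
  case "$1" in
    kafka)       echo "kubectl logs kafka-0 -n forge" ;;
    timescaledb) echo "SELECT compress_chunk(i) FROM show_chunks('opir_detections') i;" ;;
    crashloop)   echo "kubectl describe pod <name>" ;;   # <name> is left as a placeholder
    prometheus)  echo "kubectl get prometheus -n monitoring" ;;
    *)           echo "unknown symptom: $1" >&2; return 1 ;;
  esac
}
```

Printing instead of executing keeps the operator in control, since the pod name and the target namespace still have to be confirmed by hand.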
Kafka Recovery:
kubectl rollout restart statefulset/kafka -n forge
kubectl rollout status statefulset/kafka -n forge
Patroni Recovery:
kubectl rollout restart statefulset/patroni -n patroni
kubectl rollout status statefulset/patroni -n patroni
Full Cluster Restore:
velero restore create --from-backup weekly-full-backup --wait
| Date | Version | Changes |
|---|---|---|
| 2026-04-07 | 1.0 | Initial appendix creation with recovery procedures |
This document is part of the FORGE System Design Paper. For questions, contact the development team.
| Property | Value |
|---|---|
| Author | Wesley Robbins |
| Organization | STSGym |
| Repository | git@idm.wezzel.com:crab-meat-repos/stsgym-work.git |
| Classification | UNCLASSIFIED//FOR OFFICIAL USE ONLY |
| Version | 1.1 |
| Last Updated | May 2026 |
Generated: 2026-05-20T03:29:08+00:00