Collect IPFIX / Netflow v9 Records and Ship them to RITA for Analysis
Home Page: https://www.activecountermeasures.com/
mgo is no longer supported.
However, MongoDB has written their own, new Go driver here.
The new Go driver supports the atomic findOneAndUpdate() which is needed to support parallel consumers for the Logstash -> MongoDB queue.
RITA is moving to version 2.0. Thankfully, the input schemas appear to be backwards compatible. We will need to bump the ImportVersion string in the MetaDatabase records we create to 2.0.
We might consider versioning the input schemas separately from the rest of the project to reduce this type of problem in the future.
This line simply needs to be changed to:
ImportVersion: "v2.0.0+ActiveCM-IPFIX",
We need a configuration manager to hold the MongoDB connection/security settings (similar to RITA).
In addition, we need to be able to configure a session aggregate expiration threshold and a DBRoot value used to name the RITA output databases (DBRoot-%Y-%m-%d).
Might as well use YAML to stay consistent with everything else.
Currently the ipfix-rita script simply relays commands to docker-compose.
This can lead to user errors and confusion.
For example, the down command removes the ipfix-rita docker network. If the user has configured their mongodb server to listen on the docker host adapter for the ipfix-rita network, the user will need to update their mongodb config after bringing the system back up. We should just prevent users from issuing this command.
Additionally, the version command reports the docker-compose version rather than ipfix-rita's.
Overall, we should provide a more user friendly wrapper script for controlling ipfix-rita.
In order to prevent the corruption of RITA data as discussed in activecm/rita#237, we must implement the import_finished MetaDatabase flag. This flag can be set to true in accordance with #28.
I propose we keep the filterless output module that is currently written in addition to the new filtered module. The filterless module can set the import_finished flag when the program exits. The existing module also helps with testing.
The session aggregate should have the same fields as ipfix.Flow.
However, a problem occurs when querying for sessions with the same flow key as a given flow: the source and destination might be swapped on the session aggregate being queried for. To solve this problem, session aggregates should be described in terms of Host A and Host B rather than source and destination hosts. Additionally, Host A and Host B must be ordered so there is only one way to query each session aggregate.
For example, requiring that Host A comes before Host B alphabetically ensures at most one session aggregate exists for each flow. Given a flow with SourceIPAddress: 222.222.222.222 and DestinationIPAddress: 111.111.111.111, a session aggregate could be found by querying for IPAddressA: 111.111.111.111 and IPAddressB: 222.222.222.222.
Additionally, we may as well aggregate the flow information separately (A->B and B->A) so we don't lose visibility in the debugging process. The separate counts for A->B and B->A can be merged when the record is written out to RITA.
If a Flow looks something like this:
type Flow struct {
	Exporter              string
	SourceIPAddress       string
	SourcePort            uint16
	DestinationIPAddress  string
	DestinationPort       uint16
	FlowStartMilliseconds uint64
	FlowEndMilliseconds   uint64
	OctetTotalCount       uint64
	PacketTotalCount      uint64
	ProtocolIdentifier    protocols.Identifier
	IPClassOfService      uint8
	VlanID                uint16
	FlowEndReason         FlowEndReason
	Version               uint8
}
A session should look something like this:
type Session struct {
	IPAddressA              string
	PortA                   uint16
	IPAddressB              string
	PortB                   uint16
	FlowStartMillisecondsAB uint64
	FlowEndMillisecondsAB   uint64
	FlowStartMillisecondsBA uint64
	FlowEndMillisecondsBA   uint64
	OctetTotalCountAB       uint64
	PacketTotalCountAB      uint64
	OctetTotalCountBA       uint64
	PacketTotalCountBA      uint64
	LastFlowEndReasonAB     FlowEndReason
	LastFlowEndReasonBA     FlowEndReason
	//These fields should remain constant
	ProtocolIdentifier protocols.Identifier
	Exporter           string
	Version            uint8
	//We don't need these fields
	//IPClassOfService uint8
	//VlanID uint16
}
Currently, IPFIX-RITA writes all of the resulting sessions to the screen using the output.SpewRITAConnWriter. This type implements the output.SessionWriter interface.
//SessionWriter writes session aggregates to their final destination
type SessionWriter interface {
	//Write writes the session aggregate to its final destination
	Write(sess *session.Aggregate) error
}

//SpewRITAConnWriter writes session aggregates out to the terminal
//as RITA conn objects
type SpewRITAConnWriter struct{}

//Write spews the session to the terminal as a RITA Conn object
func (s SpewRITAConnWriter) Write(sess *session.Aggregate) error {
	var conn parsetypes.Conn
	sess.ToRITAConn(&conn, func(ipAddress string) bool { return false })
	spew.Println(conn)
	return nil
}
Write output.RITAConnWriter which implements output.SessionWriter.
The RITAConnWriter must write session.Aggregate objects into MongoDB databases such that the resulting connection records can be processed by RITA.
Additionally, the RITAConnWriter must separate the connection records into databases based on when the connections started (parsetypes.Conn.Timestamp). A connection record with a Timestamp falling on June 18, 2018 should be written to [DBRoot]-2018-06-18. The DBRoot field should be provided by the user.
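The date-bucketing rule can be sketched as follows. dbNameForTimestamp is a hypothetical helper, and it assumes UTC and Unix-millisecond timestamps; the real writer may use a different clock or time zone (see the time zone issue elsewhere in this document):

```go
package main

import (
	"fmt"
	"time"
)

// dbNameForTimestamp buckets a connection's Unix-millisecond start
// timestamp into a database named [DBRoot]-YYYY-MM-DD, matching the
// DBRoot-%Y-%m-%d naming scheme.
func dbNameForTimestamp(dbRoot string, unixMillis int64) string {
	t := time.Unix(unixMillis/1000, 0).UTC()
	return fmt.Sprintf("%s-%s", dbRoot, t.Format("2006-01-02"))
}

func main() {
	// June 18, 2018 12:00:00 UTC in Unix milliseconds
	fmt.Println(dbNameForTimestamp("IPFIX", 1529323200000)) // IPFIX-2018-06-18
}
```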
The connection records must be inserted into a collection named conn in each RITA compatible database. Each conn collection must possess the indexes specified in RITA here. These indexes must be easy to change, as the existing indexes are likely not optimal.
When IPFIX-RITA is certain it will not insert any more records into a given RITA database, it must insert a MetaDatabase record. The MetaDatabase is specified in the RITA configuration. Within this database, a collection named databases holds the index of RITA compatible databases. The records within this collection follow the schema here. The import_version field should be set to the minimum version of RITA that IPFIX-RITA is compatible with.
If the MetaDatabase does not exist, IPFIX-RITA must create it. The databases collection requires the index listed here.
The DBRoot option can be obtained from the RITA configuration object provided by the environment.Environment type.
The MetaDatabase name will need to be added to the RITA configuration section, or the field must be read from the RITA configuration file if RITA is installed.
The session.Aggregate type has a method ToRITAConn which performs the format conversion.
Figuring out when IPFIX-RITA will no longer insert data into a database is likely the most difficult part of implementing the RITA Writer Subsystem. Write calls will not necessarily occur in order, and the last place the order of the data is maintained is in stitching.Manager. We may be able to rely on the exporter/flusher clocks (stitching.flusher). (I don't have a solution for this yet.)
See commands/convert.go for how to integrate a different output.SessionWriter into the program.
A prototype should be created which doesn't split the RITA connection records out by day. Rather, it should simply dump all of the records into a single RITA compatible database. The MetaDatabase record should be added at the end of execution.
In order to test the accuracy of IPFIX-RITA + YAF vs RITA + Bro IDS, we need a way to difference RITA conn collections. Currently, MongoDiff supports differencing entire databases. Unfortunately, there are fields that are left blank by IPFIX-RITA + YAF that are filled in by RITA + Bro IDS. The MongoDiff script will flag these as differences. The MongoDiff script should be adapted to our use case in order to ignore these differences.
As described in issue #14, we need to assure RITA that when we "finish" writing to a database, we won't write any more records to that database. The issue is, IPFIX-RITA doesn't "finish" writing to a database. If a pair of flow records come in with closing timestamps aligned with a given day, the program will insert the matched pair of flows into that day's database no matter what.
In order to add the idea of "finishing" a database import, we will implement a Same Day Filter With a Grace Period.
This filter is applied to the output module in the IPFIX-RITA converter pipeline.
Filter algorithm:
The filter can be written in terms of arbitrary periods instead of being date aligned. For the more general algorithm, please see issue #14.
Currently, the grace period should be set to 4 minutes (5 minutes, shortened by 1 minute to account for minor clock drift). Ideally, this grace period will be pushed back to 1 hour.
Related issues:
Errors and notices need to be funneled to stdout such that they may be read with docker logs.
All other activecm projects use https://github.com/sirupsen/logrus.
When the user runs ipfix-rita stop, it is fair to say that the user expects to be able to analyze any data collected during the current day. We currently do not support this.
If we naively mark the database's import_finished field on exit, then the system will become unstable if the user starts the program within the same day. This is due to the fact that IPFIX-RITA does not check if the import_finished flag is set to true before it starts writing out results.
In order to support clean shutdowns, we need to do one of two things:
1. Check the import_finished flag before writing. If it is set, create a new database; otherwise, carry on as usual.
2. Check the import_finished flag before writing. If it is set, check whether RITA has started analysis (currently impossible). If it has, create a new database; otherwise, unset the import_finished flag and carry on as usual.
Since the second option requires a change in RITA and could build on the first, I suggest we implement the first option to start.
IPFIX-RITA only supports reading IPFIX records via MongoDB/ Logstash.
The schema of Netflow v9 records is unknown.
Several different architecture designs allow us to support other netflow formats with their own schemas.
Detect v9 records in Logstash and use Logstash filters to map the v9 fields into IPFIX records.
If we send both types of data, mixing schemas within the same MongoDB collection, we need to implement more complex parsing logic in the go code. However, no changes need to be made to the way data flows through the converter program.
In this case, the parsing logic is made much simpler, but IPFIX-RITA must support reading data from several data sources.
We need to know what netflow v9 records look like when they come through Logstash to MongoDB.
As a subdirectory, create a Docker image containing Logstash configured for our purposes.
This image can be used via docker compose along with IPFIX-RITA's main executable and MongoDB.
While working on IPFIX-RITA, it is often convenient to run data through the system that was not captured within the development day. In order to prevent out-of-time-segment errors, we should provide a flag which uses a simple MongoDB output module instead of the rotating date-based MongoDB output module. A simple output module that carries this out can be found in the git history. Check out fe541cf
Come up with a plan for proper error management.
Of particular importance is recording a stack trace with each error. errors.WithStack() can be used for this.
Currently it appears that there is an issue reading the IPFIX Logs incoming from the MikroTik router. The error follows ERRO[1082] input map must contain key 'netflow.flowStartMilliseconds'
The input map follows:
map[
_id:ObjectIdHex("5c004115354bf5fc1800008c")
netflow:map[
tcpControlBits:16
destinationTransportPort:80
destinationMacAddress:REDACTED
postSourceMacAddress:REDACTED
tcpWindowSize:4096
udpMessageLength:0
ingressInterface:2
flowEndSysUpTime:85237060
isMulticast:0
ipTotalLength:40
sourceIPv4Address:REDACTED
ipTTL:64
ipHeaderLength:5
octetDeltaCount:559
postNATSourceIPv4Address:REDACTED
postNATDestinationIPv4Address:REDACTED
ipNextHopIPv4Address:REDACTED
version:10
destinationIPv4Address:REDACTED
tcpAcknowledgementNumber:256845980
sourceIPv4PrefixLength:0
icmpCodeIPv4:0
flowStartSysUpTime:85222260
icmpTypeIPv4:0
packetDeltaCount:3
ipVersion:4
ipClassOfService:0
postNAPTSourceTransportPort:54507
sourceTransportPort:54507
destinationIPv4PrefixLength:0
egressInterface:0
protocolIdentifier:6
postNAPTDestinationTransportPort:80
igmpType:0
tcpSequenceNumber:661187750
]
@timestamp:"2018-11-29T19:42:11.000Z"
@Version:1
host:REDACTED
]
What appears to be happening is that we expect a flowStartMilliseconds field, but in the above we have flowStartSysUpTime. In input/mgologstash/flow.go on line 133 we search the map for flowStartMilliseconds and report an error if we don't find it. We also check for flowEndMilliseconds on line 142. It would be ideal to add checks for flowStartSysUpTime and flowEndSysUpTime to resolve this issue.
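The fallback lookup could be structured like this. getInt is a hypothetical helper (the real parsing lives in input/mgologstash/flow.go), and note that SysUpTime values are relative to exporter boot, so they would still need conversion to absolute Unix milliseconds before use:

```go
package main

import "fmt"

// getInt tries the preferred key first and falls back to an alternative,
// reporting which key was found so the caller can convert relative
// SysUpTime values if necessary.
func getInt(netflow map[string]interface{}, preferred, fallback string) (int64, string, bool) {
	if v, ok := netflow[preferred].(int64); ok {
		return v, preferred, true
	}
	if v, ok := netflow[fallback].(int64); ok {
		return v, fallback, true
	}
	return 0, "", false
}

func main() {
	// A MikroTik-style record with SysUpTime fields instead of Milliseconds.
	rec := map[string]interface{}{
		"flowStartSysUpTime": int64(85222260),
		"flowEndSysUpTime":   int64(85237060),
	}
	v, key, ok := getInt(rec, "flowStartMilliseconds", "flowStartSysUpTime")
	fmt.Println(v, key, ok)
}
```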
We shouldn't have to type /opt/ipfix-rita/bin/ipfix-rita every time we want to use the program.
We could add our own file in /etc/profile.d and require users to run source /etc/profile in order to solve this problem.
Or if RITA is already installed, we could drop a symlink to ipfix-rita in RITA's bin folder.
Currently, we do not know how the project scales with more CPU/ RAM. However, we do have rough numbers for logstash. We need to set up similar benchmarks so we can estimate what resources will be needed for various loads.
In order to define test cases, we can create packet captures with various types of netflow traffic from different devices.
However, we need to be able to replay the netflow traffic in the pcap file in order to test IPFIX-RITA.
The easiest way to accomplish this will likely be to use a python script with scapy to read the data from the pcap file and send the data to a designated host.
Some firewalls include NAT in IPFIX. In this case, simply take the postNATDestination* fields as the destination* fields when available.
{
"_id" : ObjectId("5b58eed810a0cffdc9016777"),
"@timestamp" : "\"2018-07-25T21:42:47.000Z\"",
"host" : "123.45.67.89",
"netflow" : {
"packetDeltaCount" : 1,
"egressInterface" : 2,
"protocolIdentifier" : 184,
"ipNextHopIPv4Address" : "0.0.0.0",
"version" : 10,
"ingressInterface" : 1,
"postNAPTSourceTransportPort" : 18856,
"postNATSourceIPv4Address" : "10.0.0.237",
"sourceIPv4Address" : "192.168.168.65",
"postNAPTDestinationTransportPort" : 53,
"flowStartSysUpTime" : 24000,
"sourceTransportPort" : 33016,
"octetDeltaCount" : 61,
"destinationTransportPort" : 53,
"destinationIPv4Address" : "8.8.4.4",
"flowEndSysUpTime" : 24000,
"postNATDestinationIPv4Address" : "8.8.4.4"
},
"@version" : "1"
}
/* 2 */
{
"_id" : ObjectId("5b58eed810a0cffdc9016778"),
"@timestamp" : "\"2018-07-25T21:42:47.000Z\"",
"host" : "123.45.67.89",
"netflow" : {
"packetDeltaCount" : 1,
"egressInterface" : 1,
"protocolIdentifier" : 17,
"ipNextHopIPv4Address" : "10.0.0.1",
"version" : 10,
"ingressInterface" : 2,
"postNAPTSourceTransportPort" : 53,
"postNATSourceIPv4Address" : "8.8.4.4",
"sourceIPv4Address" : "8.8.4.4",
"postNAPTDestinationTransportPort" : 33016,
"flowStartSysUpTime" : 24000,
"sourceTransportPort" : 53,
"octetDeltaCount" : 77,
"destinationTransportPort" : 18856,
"destinationIPv4Address" : "10.0.0.237",
"flowEndSysUpTime" : 24000,
"postNATDestinationIPv4Address" : "192.168.168.65"
},
"@version" : "1"
}
Some IPFIX exporters choose to use packetDeltaCount and octetDeltaCount over packetTotalCount and octetTotalCount even when such exporters are configured to only report on connection close. This is allowed within the IPFIX specification, and we should handle it.
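Handling this could be as simple as preferring the total counter and falling back to the delta counter. octetCount is a hypothetical helper sketching the idea (the real parsing lives in the mgologstash input code):

```go
package main

import "fmt"

// octetCount prefers octetTotalCount but falls back to octetDeltaCount,
// since some exporters only report delta counters even when configured
// to emit one record per closed connection. The same pattern applies to
// packetTotalCount / packetDeltaCount.
func octetCount(netflow map[string]interface{}) (int64, bool) {
	if v, ok := netflow["octetTotalCount"].(int64); ok {
		return v, true
	}
	if v, ok := netflow["octetDeltaCount"].(int64); ok {
		return v, true
	}
	return 0, false
}

func main() {
	rec := map[string]interface{}{"octetDeltaCount": int64(559)}
	fmt.Println(octetCount(rec)) // 559 true
}
```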
As it stands, we have implemented a subset of netflow v9 and IPFIX.
We would like to go forward with implementing netflow v5.
In collector/logstash/pipeline/ipfix.conf, we do not specify the versions to be processed by Logstash, so netflow v5 data should make it into the MongoDB buffer collection. As it stands, we should be seeing the converter write an error out to the log whenever it tries to parse a netflow v5 record. This error should contain the text "unspported netflow version: 5". This error is thrown from FillFromBSONMap in ipfix-rita/converter/input/mgologstash/flow.go.
An investigation of the Logstash source code reveals that each of the records produced should have the following fields:
ip4_addr :ipv4_src_addr
ip4_addr :ipv4_dst_addr
ip4_addr :ipv4_next_hop
uint16 :input_snmp
uint16 :output_snmp
uint32 :in_pkts
uint32 :in_bytes
uint32 :first_switched
uint32 :last_switched
uint16 :l4_src_port
uint16 :l4_dst_port
uint8 :tcp_flags
uint8 :protocol
uint8 :src_tos
uint16 :src_as
uint16 :dst_as
uint8 :src_mask
uint8 :dst_mask
uint32 :uptime
uint32 :unix_sec
uint32 :unix_nsec
uint32 :flow_seq_num
uint8 :engine_type
uint8 :engine_id
bit2 :sampling_algorithm
bit14 :sampling_interval
uint16 :version
type Flow interface {
	//SourceIPAddress returns the source IPv4 or IPv6 address
	SourceIPAddress() string
	//SourcePort returns the source transport port
	SourcePort() uint16
	//DestinationIPAddress returns the destination IPv4 or IPv6 address
	DestinationIPAddress() string
	//DestinationPort returns the destination transport port
	DestinationPort() uint16
	//ProtocolIdentifier returns which transport protocol was used
	ProtocolIdentifier() protocols.Identifier
	//FlowStartMilliseconds is the time the flow started as a Unix timestamp
	FlowStartMilliseconds() (int64, error)
	//FlowEndMilliseconds is the time the flow ended as a Unix timestamp
	FlowEndMilliseconds() (int64, error)
	//OctetTotalCount returns the total amount of bytes sent (including IP headers and payload)
	OctetTotalCount() int64
	//PacketTotalCount returns the number of packets sent from the source to the destination
	PacketTotalCount() int64
	//FlowEndReason returns why the metering process stopped recording the flow
	FlowEndReason() FlowEndReason
	//Version returns the IPFIX/Netflow version
	Version() uint8
	//Exporter returns the address of the exporting process for this flow
	Exporter() string
}
The mapping between these two looks something like this:
ipv4_src_addr -> SourceIPAddress
l4_src_port -> SourcePort
ipv4_dst_addr -> DestinationIPAddress
l4_dst_port -> DestinationPort
protocol -> ProtocolIdentifier # We need to check if netflow v5 uses the same protocol numbers
first_switched -> FlowStartMilliseconds
last_switched -> FlowEndMilliseconds
in_bytes -> OctetTotalCount
in_pkts -> PacketTotalCount
NilEndReason -> FlowEndReason # Netflow v5 does not support this information
version -> Version
host -> Exporter # Logstash tells us what machine sent it data in the host field of each record
We will need to add a method fillFromNetflowv5BSONMap to ipfix-rita/converter/input/mgologstash/flow.go and dispatch to it based on the version value in the method FillFromBSONMap. It should be rather simple to base this new method off of fillFromNetflowv9BSONMap.
At the moment, I do not see any reason other changes need to be made. Once we map the incoming BSON data to flow objects, the data should pass easily through the rest of the program.
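A minimal sketch of what the v5 mapping could look like, using the Logstash field names listed above. The flow type, method name, and error handling here are simplified stand-ins, not the project's actual code:

```go
package main

import (
	"errors"
	"fmt"
)

// flow is a simplified stand-in for the mgologstash flow type.
type flow struct {
	SourceIPAddress      string
	SourcePort           uint16
	DestinationIPAddress string
	DestinationPort      uint16
	ProtocolIdentifier   uint8
	OctetTotalCount      int64
	PacketTotalCount     int64
}

// fillFromNetflowv5Map sketches the netflow v5 -> Flow field mapping.
func (f *flow) fillFromNetflowv5Map(m map[string]interface{}) error {
	src, ok := m["ipv4_src_addr"].(string)
	if !ok {
		return errors.New("input map must contain key 'ipv4_src_addr'")
	}
	f.SourceIPAddress = src
	f.DestinationIPAddress, _ = m["ipv4_dst_addr"].(string)
	if p, ok := m["l4_src_port"].(int); ok {
		f.SourcePort = uint16(p)
	}
	if p, ok := m["l4_dst_port"].(int); ok {
		f.DestinationPort = uint16(p)
	}
	if p, ok := m["protocol"].(int); ok {
		f.ProtocolIdentifier = uint8(p)
	}
	if b, ok := m["in_bytes"].(int); ok {
		f.OctetTotalCount = int64(b)
	}
	if p, ok := m["in_pkts"].(int); ok {
		f.PacketTotalCount = int64(p)
	}
	return nil
}

func main() {
	var f flow
	err := f.fillFromNetflowv5Map(map[string]interface{}{
		"ipv4_src_addr": "192.168.1.2",
		"ipv4_dst_addr": "8.8.8.8",
		"l4_src_port":   54507,
		"l4_dst_port":   53,
		"protocol":      17,
		"in_bytes":      77,
		"in_pkts":       1,
	})
	fmt.Println(f, err)
}
```

The timestamp fields (first_switched/last_switched) are omitted here because their conversion to Unix milliseconds depends on how Logstash encodes them, which still needs to be verified.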
Add information on how to generate an ipfix-rita release to Developer Notes.md
Unable to start solution as the README file indicates because there is no docker-compose.yaml in the docker/ folder. Was it moved?
This project relies heavily on MongoDB and will need an integration testing harness that provides access to a test MongoDB server (preferably managed by Docker).
Need to add some debugging code to the landing README page; this will help customers determine issues with their install of IPFIX-RITA.
We need to present a user friendly interface on top of docker-compose.
(More information will be added later)
When running IPFIX-RITA, capturing logs starting at 4 PM and going until about 6:15 PM MST on December 6th, I discovered that the logs rolled over and started saving to December 7th.
I believe this is caused by MST being UTC-07:00: 5 PM MST is midnight UTC on December 7th, so the logs rolled over starting around that time.
If possible, we should have IPFIX-RITA auto-detect and adjust its time zone based on system time.
I discovered that we can take a UTC timestamp to local time using the following code:
rn := time.Now()        // get the system time right now
_, secDiff := rn.Zone() // get the time zone offset (in seconds)
// When going from UTC to local, add the offset as a duration
localTime := logTime.Add(time.Duration(secDiff) * time.Second)
This performs the majority of the flow -> session conversion.
In order to test the collector/converter connection, a screen writer which pulls its data from the MongoDB queue should be written.
Every once in a while TestGoRoutineLeaks fails.
In the converter project, we stitch IPFIX flow data into RITA/ Bro sessions. Flow data is unidirectional (there are two records describing the communications from host A to host B and from host B to host A) while Bro sessions are bidirectional (there is one record describing the communications from host A to host B and from host B to host A). The main stitching logic is located in converter/stitching/stitching.go
https://github.com/activecm/ipfix-rita/blob/master/converter/stitching/stitcher.go#L121
The current algorithm in pseudocode:
Input: A session.Aggregate f, a Matcher m
Output: A merged session aggregate or nothing

matches := m.Find(f.AggregateQuery) //finds all session aggregates which may match f
matchFound := false
loop over matches as nextMatch:
    if shouldMerge(f, nextMatch):
        matchFound = true
        f.merge(nextMatch) //changes f
        if f.FilledFromSourceA and f.FilledFromSourceB: //the session has both sides of the connection detailed
            m.Remove(f)
            return f
        else: //the merge happened on the same side of the connection
            m.Update(nextMatch, f) //overwrite the matching aggregate in the matcher with the merged aggregate
            return
if not matchFound: //either len(matches) == 0 or shouldMerge returned false for all results
    m.Insert(f)
    return
This algorithm is greedy in that it takes the first session.Aggregate which matches the input and passes the shouldMerge() check.
Ideally, we should merge the input with the best fitting session.
A merge cost can be computed as abs(f_FlowEndMilliseconds - nextMatch_FlowEndMilliseconds) + abs(f_FlowStartMilliseconds - nextMatch_FlowStartMilliseconds)
Unfortunately, the FlowEndMilliseconds and FlowStartMilliseconds are split between the AB and BA sides of the session.Aggregate.
During stitching only one side of the session should be filled and the following code snippet can be used to get the correct field. The following snippets should also work for FlowStart (in the second snippet, change the > sign to a < sign)
var f_FlowEndMilliseconds int64
if f.FilledFromSourceA:
    f_FlowEndMilliseconds = f.FlowEndMillisecondsAB
else if f.FilledFromSourceB:
    f_FlowEndMilliseconds = f.FlowEndMillisecondsBA
else:
    //shouldn't happen during the stitching process
However, the following snippet handles sessions which may have both sides of the connection filled
var f_FlowEndMilliseconds int64
if f.FilledFromSourceA && f.FilledFromSourceB:
    if f.FlowEndMillisecondsAB > f.FlowEndMillisecondsBA:
        f_FlowEndMilliseconds = f.FlowEndMillisecondsAB
    else:
        f_FlowEndMilliseconds = f.FlowEndMillisecondsBA
else if f.FilledFromSourceA:
    f_FlowEndMilliseconds = f.FlowEndMillisecondsAB
else if f.FilledFromSourceB:
    f_FlowEndMilliseconds = f.FlowEndMillisecondsBA
else:
    //no data available. leave zero
It may be worthwhile to add this to the session struct as a method.
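A sketch of what that method could look like. The field and flag names follow the Session struct shown earlier, but the type here is a simplified stand-in for session.Aggregate:

```go
package main

import "fmt"

// sessionAggregate is a simplified stand-in for session.Aggregate.
type sessionAggregate struct {
	FilledFromSourceA     bool
	FilledFromSourceB     bool
	FlowEndMillisecondsAB int64
	FlowEndMillisecondsBA int64
}

// FlowEndMilliseconds returns the latest flow end time recorded on either
// side of the session, handling aggregates with one or both sides filled.
// A FlowStartMilliseconds counterpart would be identical except it would
// take the earlier of the two values.
func (s *sessionAggregate) FlowEndMilliseconds() int64 {
	switch {
	case s.FilledFromSourceA && s.FilledFromSourceB:
		if s.FlowEndMillisecondsAB > s.FlowEndMillisecondsBA {
			return s.FlowEndMillisecondsAB
		}
		return s.FlowEndMillisecondsBA
	case s.FilledFromSourceA:
		return s.FlowEndMillisecondsAB
	case s.FilledFromSourceB:
		return s.FlowEndMillisecondsBA
	default:
		return 0 // no data available
	}
}

func main() {
	s := sessionAggregate{
		FilledFromSourceA: true, FilledFromSourceB: true,
		FlowEndMillisecondsAB: 1000, FlowEndMillisecondsBA: 2000,
	}
	fmt.Println(s.FlowEndMilliseconds()) // 2000: the later of the two sides
}
```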
In the new algorithm, we compute the cost for each possible match (after passing shouldMerge). If the new cost is lower than the existing cost (initialized to MAX_INT), then we store the nextMatch and update the existing cost. After iterating, if the stored nextMatch is not nil, then we perform the update logic with the stored nextMatch.
Overall the new algorithm will look like:
Input: A session.Aggregate f, a Matcher m
Output: A merged session aggregate or nothing

matches := m.Find(f.AggregateQuery) //finds all session aggregates which may match f
match := session.Aggregate{}
matchFound := false
matchCost := MAX_INT
loop over matches as nextMatch:
    if shouldMerge(f, nextMatch):
        matchFound = true
        compute f_FlowStartMilliseconds, nextMatch_FlowStartMilliseconds, f_FlowEndMilliseconds, nextMatch_FlowEndMilliseconds
        newMatchCost = abs(f_FlowStartMilliseconds - nextMatch_FlowStartMilliseconds) + abs(f_FlowEndMilliseconds - nextMatch_FlowEndMilliseconds)
        if newMatchCost < matchCost:
            match = nextMatch
            matchCost = newMatchCost
if matchFound:
    f.merge(match) //changes f
    if f.FilledFromSourceA and f.FilledFromSourceB: //the session has both sides of the connection detailed
        m.Remove(match)
        return f
    else: //the merge happened on the same side of the connection
        m.Update(match, f) //overwrite the matching aggregate in the matcher with the merged aggregate
        return
else:
    m.Insert(f)
    return
Currently the install script stops and has the end user modify mongod.conf to use the docker IP address, then restart the MongoDB service.
This might be the most confusing step of the install. It might be better to prompt the user for the IP address of the RITA system (or hit enter if they are installing it locally), then have the install script automatically add the IP to the mongod.conf file and restart MongoDB.
In an upcoming release of IPFIX-RITA we want to limit the number of connections we store. This reduces the size of the conn table in RITA and makes analyzing data faster
This isn't an urgent concern yet, need to wait for it to be implemented in RITA first
Add a manual to IPFIX-RITA that writes out the stages for installing IPFIX-RITA and includes a disclaimer about this being a beta and how to contact us if there are any issues.
Initialization script (ipfix-rita) looks for environment variables that don't exist, causing the script to fail to launch.
ERROR: Invalid interpolation format for "image" option in service "logstash": "quay.io/activecm/ipfix-rida-logstash:${IPFIX-RITA-VERSION:-latest}"
As we have moved more information out of the README and into the docs folder, we should distribute the docs/ folder with the installer tarball.
When a new release is produced using the make-release.sh script, it tags both the tar file and base directory with the version number. We need to remove the version number from both the tar file and base directory names.
The docker-compose package in the Ubuntu 16.04 repo cannot interpret version 3.3 in main.yaml. We need to document the minimum required docker-compose version.
It is currently unknown what happens if RITA runs with only connection records (without dns/http records).
RITA should be able to function without http and dns information with minimal changes.
Import a dataset with RITA, connect to MongoDB, delete the http and dns collections, and run the analyze step. Repeat, noting where the program breaks.
No integration tests have been written to ensure IPFIX flows are stitched together correctly.
Ensure stitching.Manager correctly stitches flows.
A new integration test can be created by creating the file manager_test.go. Label the package as stitching_test. Next, create a standard Go test function TestXXX(t *testing.T) {}. Then, start a new integration test using:
env, cleanup := environmenttest.SetupIntegrationTest(t)
defer cleanup()
The env variable contains configuration information and, most importantly, a connection to a dockerized MongoDB.
From here, instantiate a new stitching.Manager, create a goroutine to feed the input channel, and write a simple writer implementation to retrieve the outputs from the manager.
The require package is immensely helpful in Go tests. See the other test files for usage.
A fake flow with random data can be generated with ipfix.NewFlowMock(); the needed fields can then be overwritten as needed.
When a flow is added to a session aggregate, the flow's one-way data is added to a two-way data structure. While a flow has a field OctetTotalCount representing the amount of bytes sent from SourceIPAddress to DestinationIPAddress, a session aggregate has two fields, OctetTotalCountAB and OctetTotalCountBA, representing the amount of bytes sent between IPAddressA and IPAddressB in each direction. The true source and destination of the session aggregate are determined when the aggregate is ready to be written out. If FlowStartMillisecondsAB < FlowStartMillisecondsBA, then IPAddressA is considered the source and IPAddressB is considered the destination; otherwise IPAddressB is the source and IPAddressA is the destination.
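The source/destination decision can be sketched as a small function (sourceAndDestination is a hypothetical name; the field names follow the Session struct shown earlier):

```go
package main

import "fmt"

// sourceAndDestination applies the write-out rule: whichever side of the
// aggregate started talking first is treated as the true source of the
// connection.
func sourceAndDestination(ipA, ipB string, startAB, startBA int64) (src, dst string) {
	if startAB < startBA {
		return ipA, ipB
	}
	return ipB, ipA
}

func main() {
	// Host B initiated first, so B is reported as the source.
	src, dst := sourceAndDestination("1.1.1.1", "2.2.2.2", 2000, 1000)
	fmt.Println(src, dst) // 2.2.2.2 1.1.1.1
}
```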
It is critical that flows are assigned to the AB and BA sections of session aggregates in a non-ambiguous, reproducible manner. To ensure this happens, the host (SourceIPAddress or DestinationIPAddress) which comes first alphabetically within a flow will be assigned to a session aggregate's IPAddressA field, and the other host will be assigned to the IPAddressB field.
When a session aggregate is first created from a single flow, the flow is said to be stitched with zeroes. Since a flow only contains one-way data, the other side of the connection cannot be known. This lack of data is represented with zero values.
For example, given the flow:
flow1 := ipfix.NewFlowMock()
//Ensure the source comes before the destination alphabetically
//to ensure the source is mapped to host "A", and the destination is
//mapped to host "B"
flow1.MockSourceIPAddress = "1.1.1.1"
flow1.MockDestinationIPAddress = "2.2.2.2"
flow1.MockProtocolIdentifier = protocols.ICMP
flow1.MockFlowEndReason = ipfix.IdleTimeout
we can create a session aggregate:
var sessAgg session.Aggregate
srcMapping, err := session.FromFlow(flow1, &sessAgg)
if err != nil {
	return err
}
srcMapping lets us know whether the flow was mapped into the AB portion of the session aggregate or the BA portion. We can check this like so:
if srcMapping == session.ASource {
	//flow mapped into the AB portion
} else {
	//flow mapped into the BA portion
}
Our sessAgg contains the following data (pseudocode):
sessAgg = struct {
	AggregateQuery: struct {
		IPAddressA: "1.1.1.1"
		PortA: <random port from mock flow>
		IPAddressB: "2.2.2.2"
		PortB: <random port from mock flow>
		ProtocolIdentifier: protocols.ICMP
		Exporter: <random ip address from mock flow>
	}
	FlowStartMillisecondsAB: <random time from mock flow>
	FlowEndMillisecondsAB: <random time from mock flow>
	FlowStartMillisecondsBA: 0
	FlowEndMillisecondsBA: 0
	OctetTotalCountAB: <random amount of data from mock flow>
	OctetTotalCountBA: 0
	PacketTotalCountAB: <random amount of data from mock flow>
	PacketTotalCountBA: 0
	FlowEndReasonAB: <random reason from mock flow>
	FlowEndReasonBA: ipfix.Nil
}
Since all of the BA fields (FlowStartMillisecondsBA, FlowEndMillisecondsBA, OctetTotalCountBA, and FlowEndReasonBA) are zero (or ipfix.Nil in the case of FlowEndReason), the flow is said to have been stitched with zeroes.
Additional flows can be aggregated into an existing session aggregate provided the flow has a matching Flow Key / Aggregate Query. First, the new flow is mapped into either the AB or BA section of the session aggregate. The OctetTotalCount and PacketTotalCount of the flow are added to the corresponding fields in the matching AB/BA section of the aggregate. If the FlowStartMilliseconds of the new flow is earlier than the corresponding field in the session aggregate, or if the field is unset in the aggregate, the value is overwritten with the flow's. If the FlowEndMilliseconds of the new flow is later than the corresponding field in the session aggregate, or if the field is unset in the aggregate, the value is overwritten with the flow's, and the FlowEndReason is updated as well.
Flows should only be stitched when it logically makes sense. Unfortunately, this depends on the protocol.
ICMP is a connectionless protocol in which sessions are not maintained (@william-stearns can you confirm?). In this case, flows are never stitched together. They are all stitched with zeroes.
We use this model to deal with unknown protocols as well.
UDP is a connectionless protocol in which sessions are not maintained at the protocol level. However, unlike ICMP, UDP sessions are often maintained at the application layer. Since UDP has no support for sessions at the protocol layer, the most common FlowEndReason for UDP flows is simply IdleTimeout. In order to recover the application layer sessions, we have to make intelligent guesses.
Our guessing scheme relies on the sameSessionTimeout. If a flow from host A to host B ended at time tFlowEnd, and another flow from host A to host B or host B to host A starts at time tFlowStart, then the two flows will be stitched if tFlowStart <= tFlowEnd + sameSessionTimeout. Otherwise the two flows will each be stitched with zeroes.
TCP is a connection oriented protocol, so we don't have to make as many guesses. However, TCP monitoring isn't perfect. During normal operation, TCP flows will have their FlowEndReason fields marked with EndOfFlow. EndOfFlow signals that one end of the TCP session was torn down cleanly. When the FlowEndReason for a given side of a session aggregate (AB/BA) is EndOfFlow, we make sure no more flows are stitched into that side of the aggregate. Otherwise, TCP is treated the same way as UDP.
The above information is summarized in the following table of test cases.
A Google Sheet with the same information can be found here
Test Name | Protocol | Flow 1 FlowEndReason | Flow 1 Direction | Flow 2 Direction | Flow2.FlowStartMilliseconds <= Flow1.FlowEndMilliseconds + sameSessionTimeout | # Of Output Session Aggregates | # Of Errors | Session Aggregate 1 Results AB | Session Aggregate 1 Results BA | Session Aggregate 2 Results AB | Session Aggregate 2 Results BA |
---|---|---|---|---|---|---|---|---|---|---|---|
SingleICMPFlow | ICMP | IdleTimeout | A -> B | N/A | N/A | 1 | 0 | Flow 1 Values | Flow 1 Values | N/A | N/A |
TwoICMPFlowsSameSourceInTimeout | ICMP | IdleTimeout | A -> B | A -> B | True | 2 | 0 | Flow 1 Values | Zeroes | Flow 2 Values | Zeroes |
TwoICMPFlowsSameSourceOutOfTimeout | ICMP | IdleTimeout | A -> B | A -> B | False | 2 | 0 | Flow 1 Values | Zeroes | Flow 2 Values | Zeroes |
TwoICMPFlowsFlippedSourceInTimeout | ICMP | IdleTimeout | A -> B | B -> A | True | 2 | 0 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
TwoICMPFlowsFlippedSourceOutOfTimeout | ICMP | IdleTimeout | A -> B | B -> A | False | 2 | 0 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
SingleUDPFlow | UDP | IdleTimeout | A -> B | N/A | N/A | 1 | 0 | Flow 1 Values | Zeroes | N/A | N/A |
TwoUDPFlowsSameSourceInTimeout | UDP | IdleTimeout | A -> B | A -> B | True | 1 | 0 | Flow 1 Values + Flow 2 Values | Zeroes | N/A | N/A |
TwoUDPFlowsSameSourceOutOfTimeout | UDP | IdleTimeout | A -> B | A -> B | False | 2 | 1 | Flow 1 Values | Zeroes | Flow 2 Values | Zeroes |
TwoUDPFlowsFlippedSourceInTimeout | UDP | IdleTimeout | A -> B | B -> A | True | 1 | 0 | Flow 1 Values | Flow 2 Values | N/A | N/A |
TwoUDPFlowsFlippedSourceOutOfTimeout | UDP | IdleTimeout | A -> B | B -> A | False | 2 | 1 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
SingleTCPIdleOutFlow | TCP | IdleTimeout | A -> B | N/A | N/A | 1 | 0 | Flow 1 Values | Zeroes | N/A | N/A |
TwoTCPIdleOutFlowsSameSourceInTimeout | TCP | IdleTimeout | A -> B | A -> B | True | 1 | 0 | Flow 1 Values + Flow 2 Values | Zeroes | N/A | N/A |
TwoTCPIdleOutFlowsSameSourceOutOfTimeout | TCP | IdleTimeout | A -> B | A -> B | False | 2 | 1 | Flow 1 Values | Zeroes | Flow 2 Values | Zeroes |
TwoTCPIdleOutFlowsFlippedSourceInTimeout | TCP | IdleTimeout | A -> B | B -> A | True | 1 | 0 | Flow 1 Values | Flow 2 Values | N/A | N/A |
TwoTCPIdleOutFlowsFlippedSourceOutOfTimeout | TCP | IdleTimeout | A -> B | B -> A | False | 2 | 1 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
SingleTCPEOFFlow | TCP | EndOfFlow | A -> B | N/A | N/A | 1 | 0 | Flow 1 Values | Zeroes | N/A | N/A |
TwoTCPEOFFlowsSameSourceInTimeout | TCP | EndOfFlow | A -> B | A -> B | True | 2 | 1 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
TwoTCPEOFFlowsSameSourceOutOfTimeout | TCP | EndOfFlow | A -> B | A -> B | False | 2 | 1 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
TwoTCPEOFFlowsFlippedSourceInTimeout | TCP | EndOfFlow | A -> B | B -> A | True | 1 | 0 | Flow 1 Values | Flow 2 Values | N/A | N/A |
TwoTCPEOFFlowsFlippedSourceOutOfTimeout | TCP | EndOfFlow | A -> B | B -> A | False | 2 | 1 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
In order to better support the Docker environment, the collector should be able to expand environment variables in its configuration file (or implement an environment variable configuration provider).
This way the MongoDB URI can be configured without rebuilding the Docker image.
RITA does this here
The config file allows the user to specify a TLS certificate for connecting to MongoDB. However, the path the config file points to is not loaded into the docker image, so the path resolution fails. We need to figure out a good way to handle this. When we package RITA for docker, we will need to confront this same issue.
Currently we split the output databases by day in the UTC timezone. We need to offset the output record timestamps with the current system timezone before we determine the corresponding date. That way records are sorted by day in the user's timezone.
Softflowd is an open source, software-based netflow exporter for Linux and BSD.
ElementID | Name | Abstract Data Type | Data Type Semantics | Status | Description |
---|---|---|---|---|---|
21 | flowEndSysUpTime | unsigned32 | | current | The relative timestamp of the last packet of this Flow. It indicates the number of milliseconds since the last (re-)initialization of the IPFIX Device (sysUpTime). sysUpTime can be calculated from systemInitTimeMilliseconds. |
22 | flowStartSysUpTime | unsigned32 | | current | The relative timestamp of the first packet of this Flow. It indicates the number of milliseconds since the last (re-)initialization of the IPFIX Device (sysUpTime). sysUpTime can be calculated from systemInitTimeMilliseconds. |
160 | systemInitTimeMilliseconds | dateTimeMilliseconds | default | current | The absolute timestamp of the last (re-)initialization of the IPFIX Device. |
systemInitTimeMilliseconds is sent in a data flow (matching an options template) along with the IPFIX templates every 16 packets. See https://github.com/irino/softflowd/blob/master/ipfix.c#L421
Set 3 [id=3] (Options Template): 256
FlowSet Id: Options Template (V10 [IPFIX]) (3)
FlowSet Length: 30
Options Template (Id = 256) (Scope Count = 1; Data Count = 4)
Template Id: 256
Total Field Count: 5
Scope Field Count: 1
Field (1/1) [Scope]: meteringProcessId
0... .... .... .... = Pen provided: No
.000 0000 1000 1111 = Type: meteringProcessId (143)
Length: 4
Field (1/4): systemInitTimeMilliseconds
0... .... .... .... = Pen provided: No
.000 0000 1010 0000 = Type: systemInitTimeMilliseconds (160)
Length: 8
Field (2/4): selectorAlgorithm
0... .... .... .... = Pen provided: No
.000 0001 0011 0000 = Type: selectorAlgorithm (304)
Length: 2
Field (3/4): samplingPacketInterval
0... .... .... .... = Pen provided: No
.000 0001 0011 0001 = Type: samplingPacketInterval (305)
Length: 2
Field (4/4): samplingPacketSpace
0... .... .... .... = Pen provided: No
.000 0001 0011 0010 = Type: samplingPacketSpace (306)
Length: 4
Set 4 [id=256] (1 flows)
FlowSet Id: (Data) (256)
FlowSet Length: 24
[Template Frame: 5]
Flow 1
Metering Process Id: 92091
System Init Time: Oct 18, 2018 17:57:57.250000000 MDT
Selector Algorithm: Systematic count-based Sampling (1)
Sampling Packet Interval: 1
Sampling Packet Space: 0
The Logstash netflow codec throws the following Netflow v9 errors when processing records from a Cisco Stealthwatch 2000.
Unsupported field in template 317 {:type=>70, :length=>3}
Unsupported field in template 318 {:type=>70, :length=>3}
Unsupported field in template 319 {:type=>29800, :length=>2}
Unsupported field in template 320 {:type=>29821, :length=>2}
Unsupported field in template 321 {:type=>70, :length=>3}
Unsupported field in template 322 {:type=>70, :length=>3}
Unsupported field in template 323 {:type=>29800, :length=>2}
Unsupported field in template 324 {:type=>29821, :length=>2}
Unsupported field in template 325 {:type=>29802, :length=>4}
Unsupported field in template 326 {:type=>70, :length=>3}
Unsupported field in template 327 {:type=>70, :length=>3}
Unsupported field in template 328 {:type=>29800, :length=>2}
Unsupported field in template 329 {:type=>70, :length=>3}
Unsupported field in template 330 {:type=>70, :length=>3}
Unsupported field in template 331 {:type=>29800, :length=>2}
Unsupported field in template 332 {:type=>29808, :length=>4}
Unsupported field in template 333 {:type=>29802, :length=>4}
Unsupported field in template 334 {:type=>29808, :length=>4}
These errors prevent the logstash codec from ever parsing a single flow from the device.
From https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html
Field Type | Value | Length (bytes) | Description |
---|---|---|---|
MPLS_LABEL_1 | 70 | 3 | MPLS label at position 1 in the stack. This comprises 20 bits of MPLS label, 3 EXP (experimental) bits and 1 S (end-of-stack) bit. |
This is a known bug in the logstash codec: https://github.com/logstash-plugins/logstash-codec-netflow/blob/master/RFC_COMPLIANCE_NETFLOW_v9.md
An implementation for the field would require editing the following files:
We might be able to simply skip the fields by editing the netflow.yaml file in our Logstash docker image to include :skip lines for the offending fields.
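If skipping is viable, the edit might look like the following. The field numbers come from the errors above; the exact netflow.yaml entry syntax is an assumption and should be checked against the codec's own definitions file.

```yaml
# Hypothetical additions to netflow.yaml: tell the codec to skip
# fields it cannot decode instead of rejecting the whole template.
70:
- :skip
29800:
- :skip
29802:
- :skip
29808:
- :skip
29821:
- :skip
```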
The stealthwatch also defines custom fields (29800 etc) in https://www.cisco.com/c/dam/en/us/td/docs/security/stealthwatch/Information_Elements/SW_7_0_Information_Elements_DV_1_0.pdf. We likely do not have to support these fields to get it to work.
Some IPFIX exporters give timestamps in relation to the initialization time of the exporter.
We use absolute timestamps to divide out the data into separate databases and provide useful information in RITA.
More discussion is needed.