
ipfix-rita's People

Contributors

enbake-developer, ethack, samuelcarroll, william-stearns, zalgo2462


ipfix-rita's Issues

Manual Changes

  • The manual should mention where to send IPFIX/Netflow records (the IPFIX-RITA host, UDP port 2055)
  • How to test whether IPFIX/Netflow records are arriving
  • How to test whether records are arriving at the RITA system
  • Add stop/restart/log commands for IPFIX-RITA
  • Add the new min/max supported Docker & Docker Compose versions to the main README

Bump RITA Output Version

RITA is moving to version 2.0. Thankfully, it appears the input schemas are backwards compatible. We will need to bump the ImportVersion string in the MetaDatabase records we create to 2.0.

We might think about versioning the input schemas separately from the rest of the project to reduce this type of problem in the future.

This line simply needs to be changed to:
ImportVersion: "v2.0.0+ActiveCM-IPFIX",

Configuration Manager

We need a configuration manager to hold the MongoDB connection/security settings (similar to RITA).

In addition, we need to be able to configure a session aggregate expiration threshold and a DBRoot value used to name the RITA output databases (DBRoot-%Y-%m-%d).

Might as well use YAML to stay consistent with everything else.
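
A rough sketch of what the corresponding Go config struct might look like; every field name here is illustrative rather than a finalized setting:

package config

//Config holds the converter's settings loaded from YAML.
//All names below are placeholders for discussion, not final.
type Config struct {
	MongoDB struct {
		//ConnectionString holds the MongoDB URI
		ConnectionString string `yaml:"ConnectionString"`
	} `yaml:"MongoDB"`

	//SessionTimeoutSeconds is the session aggregate expiration threshold
	SessionTimeoutSeconds int `yaml:"SessionTimeout"`

	//DBRoot names the RITA output databases (DBRoot-%Y-%m-%d)
	DBRoot string `yaml:"DBRoot"`
}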

Write a Better Wrapper Script

Currently the ipfix-rita script simply relays commands to docker-compose.
This can lead to user errors and confusion.

For example, the down command removes the ipfix-rita docker network. If the user has configured their MongoDB server to listen on the docker host adapter for the ipfix-rita network, the user will need to update their MongoDB config after bringing the system back up. We should simply prevent users from issuing this command.

Additionally, the version command reports the docker-compose version rather than ipfix-rita's.

Overall, we should provide a more user friendly wrapper script for controlling ipfix-rita.

Implement import_finished MetaDatabase Field

In order to prevent the corruption of RITA data as discussed in activecm/rita#237, we must implement the import_finished MetaDatabase flag. This flag can be set to true in accordance with #28.

I propose we keep the filterless output module that is currently written in addition to the new filtered module. The filterless module can set the import_finished flag when the program exits. The existing module helps with testing.

Create Session Aggregate Struct

The session aggregate should have the same fields as ipfix.Flow.

However, a problem occurs when querying for sessions with the same flow key as a given flow. The source and destination might be swapped on the session aggregate being queried for. To solve this problem, session aggregates should be keyed on Host A and Host B rather than on source and destination hosts. Additionally, Host A and Host B must be ordered so there is only one way to query each session aggregate.

For example, requiring Host A comes before Host B in the alphabet ensures at most one session aggregate exists for each flow. Given a flow with SourceIPAddress: 222.222.222.222 and DestinationIPAddress: 111.111.111.111, a session aggregate could be found by querying for IPAddressA:111.111.111.111 and IPAddressB: 222.222.222.222.

Additionally, we may as well aggregate the flow information separately (A->B and B->A) so we don't lose visibility in the debugging process. The separate counts for A->B and B->A can be merged when the record is written out to RITA.

If a Flow looks something like this:

type Flow struct {
	Exporter        string
	SourceIPAddress string
	SourcePort      uint16

	DestinationIPAddress string
	DestinationPort      uint16

	FlowStartMilliseconds uint64
	FlowEndMilliseconds   uint64

	OctetTotalCount  uint64
	PacketTotalCount uint64

	ProtocolIdentifier protocols.Identifier
	IPClassOfService   uint8
	VlanID             uint16
	FlowEndReason      FlowEndReason
	Version            uint8
}

A session should look something like this:

type Session struct {
	IPAddressA string
	PortA      uint16

	IPAddressB string
	PortB      uint16

	FlowStartMillisecondsAB uint64
	FlowEndMillisecondsAB   uint64

	FlowStartMillisecondsBA uint64
	FlowEndMillisecondsBA   uint64

	OctetTotalCountAB  uint64
	PacketTotalCountAB uint64

	OctetTotalCountBA  uint64
	PacketTotalCountBA uint64

	LastFlowEndReasonAB FlowEndReason
	LastFlowEndReasonBA FlowEndReason

	//These fields should remain constant
	ProtocolIdentifier protocols.Identifier
	Exporter           string
	Version            uint8

	//We don't need these fields
	//IPClassOfService    uint8
	//VlanID              uint16
}
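
As a sketch of the ordering requirement, a small helper could canonicalize the flow key before querying; the names are illustrative and assume the structs above:

//orderedFlowKey returns a flow's endpoints in canonical (Host A, Host B)
//order. Ordering the hosts alphabetically guarantees there is only one
//way to query each session aggregate. Sketch only.
func orderedFlowKey(f *Flow) (ipA string, portA uint16, ipB string, portB uint16) {
	if f.SourceIPAddress < f.DestinationIPAddress {
		return f.SourceIPAddress, f.SourcePort, f.DestinationIPAddress, f.DestinationPort
	}
	return f.DestinationIPAddress, f.DestinationPort, f.SourceIPAddress, f.SourcePort
}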

Create RITA Writer Subsystem

RITA Writer Subsystem

Current Status

Currently, IPFIX-RITA writes all of the resulting sessions to the screen using the output.SpewRITAConnWriter. This type implements the output.SessionWriter interface.

//SessionWriter writes session aggregates to their final destination
type SessionWriter interface {
	//Write writes the session aggregate to its final destination
	Write(sess *session.Aggregate) error
}

//SpewRITAConnWriter writes session aggregates out to the terminal
//as RITA conn objects
type SpewRITAConnWriter struct{}

//Write spews the session to the terminal as a RITA Conn object
func (s SpewRITAConnWriter) Write(sess *session.Aggregate) error {
	var conn parsetypes.Conn
	sess.ToRITAConn(&conn, func(ipAddress string) bool { return false })
	spew.Println(conn)
	return nil
}

Goal

Write output.RITAConnWriter which implements output.SessionWriter.

The RITAConnWriter must write session.Aggregate objects into MongoDB databases such that the resulting connection records can be processed by RITA.

Additionally, the RITAConnWriter must separate the connection records into databases based on when the connections started (parsetypes.Conn.Timestamp). A connection record with a Timestamp falling on June 18, 2018 should be written to [DBRoot]-2018-06-18. The DBRoot field should be provided by the user.
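
For instance, the dated database name could be derived from a connection's start time roughly as follows. This is a sketch; it assumes the timestamp is in Unix seconds and that days are split in UTC for now:

package output

import "time"

//dbNameForTimestamp derives the dated RITA database name from a
//connection's start time (Unix seconds assumed, UTC day boundaries)
func dbNameForTimestamp(dbRoot string, timestamp int64) string {
	return dbRoot + "-" + time.Unix(timestamp, 0).UTC().Format("2006-01-02")
}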

RITA Schema

The connection records must be inserted into a collection named conn in each RITA compatible database. Each conn collection must possess the indexes specified in RITA here. It must be easy to change these indexes, as the existing ones are likely not optimal.

When IPFIX-RITA is certain it will not insert any more records into a given RITA database, it must insert a MetaDatabase record. The MetaDatabase is specified in the RITA configuration. Within this database, a collection named databases holds the index of RITA compatible databases. The records within this collection follow the schema here. The import_version field should be set to the minimum version of RITA that IPFIX-RITA is compatible with.

If the MetaDatabase does not exist, IPFIX-RITA must create it. The databases collection requires the index listed here.

Implementation Notes

The DBRoot option can be obtained from the RITA configuration object provided by the environment.Environment type.

The MetaDatabase name will need to be added to the RITA configuration section, or the field must be read from the RITA configuration file if RITA is installed.

The session.Aggregate type has a method ToRITAConn which performs the format conversion.

Figuring out when IPFIX-RITA will no longer insert data into a database is likely the most difficult part of implementing the RITA Writer Subsystem. Write calls will not necessarily occur in order, and the last place the order of the data is maintained is in stitching.Manager. We may be able to rely on the exporter/flusher clocks (stitching.flusher). (I don't have a solution for this yet)

See commands/convert.go for how to integrate a different output.SessionWriter into the program.

Prototype

A prototype should be created which doesn't split the RITA connection records out by day. Rather, it should simply dump all of the records into a single RITA compatible database. The MetaDatabase record should be added at the end of execution.

Adapt activecm/mongodiff for ipfix-rita in order to test accuracy

In order to test the accuracy of IPFIX-RITA + YAF vs RITA + Bro IDS, we need a way to difference RITA conn collections. Currently, MongoDiff supports differencing entire databases. Unfortunately, there are fields that are left blank by IPFIX-RITA + YAF that are filled in by RITA + Bro IDS. The MongoDiff script will flag these as differences. The MongoDiff script should be adapted to our use case in order to ignore these differences.

Same Day Filter With Grace Period

As described in issue #14, we need to assure RITA that when we "finish" writing to a database, we won't write any more records to that database. The issue is, IPFIX-RITA doesn't "finish" writing to a database. If a pair of flow records comes in with closing timestamps aligned with a given day, the program will insert the matched pair of flows into that day's database no matter what.

In order to add the idea of "finishing" a database import, we will implement a Same Day Filter With a Grace Period.

This filter is applied to the output module in the IPFIX-RITA converter pipeline.

Filter algorithm:

  • If the date of the closing timestamp of the session record is the current day, we insert the record into today's database.
  • If the date of the closing timestamp of the session record is the previous day, and today's grace period has not elapsed, we insert the record into yesterday's database.
  • Otherwise, we drop the session.

The filter can be written in terms of arbitrary periods instead of being date aligned. For the more general algorithm, please see issue #14.

Currently, the grace period should be set to 4 minutes (5 minutes, less 1 minute to account for minor clock drift). Ideally, this grace period will be pushed back to 1 hour.
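
A sketch of the filter decision in Go; day boundaries are computed in UTC here, and the signature and names are illustrative:

package filter

import "time"

//keepSession decides whether a session may still be written out under
//the Same Day Filter With a Grace Period, and into which day's database
func keepSession(sessionEnd, now time.Time, grace time.Duration) (targetDay time.Time, ok bool) {
	today := now.UTC().Truncate(24 * time.Hour)
	sessionDay := sessionEnd.UTC().Truncate(24 * time.Hour)

	switch {
	case sessionDay.Equal(today):
		//insert into today's database
		return today, true
	case sessionDay.Equal(today.AddDate(0, 0, -1)) && now.UTC().Sub(today) <= grace:
		//still within today's grace period; insert into yesterday's database
		return sessionDay, true
	default:
		//drop the session
		return time.Time{}, false
	}
}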


Ensure DB's import_finished field is marked on shutdown

When the user runs ipfix-rita stop, it is fair to say that the user expects to be able to analyze any data collected during the current day. We currently do not support this.

If we naively mark the database's import_finished field on exit, then the system will become unstable if the user starts the program within the same day. This is due to the fact that IPFIX-RITA does not check if the import_finished flag is set to true before it starts writing out results.

In order to support clean shutdowns, we need to do one of two things:

  • Check the import_finished flag before writing; if it is set, create a new database; otherwise carry on as usual.
  • Check the import_finished flag before writing; if it is set, check whether RITA has started analysis (currently impossible); if it has, create a new database; otherwise unset the import_finished flag and carry on as usual.

Since the latter requires a change in RITA and could build on the former, I suggest we implement the first option to start.

Implement Netflow v9

Netflow v9 / (Other Netflow formats)

Current Status

IPFIX-RITA only supports reading IPFIX records via MongoDB/ Logstash.
The schema of Netflow v9 records is unknown.

Implementation Choices

Several different architecture designs allow us to support other netflow formats with their own schemas.

Option 1: Normalize in Logstash

Detect v9 records in Logstash and use Logstash filters to map the v9 fields into IPFIX records.

Option 2: Send Both IPFIX and Netflow v9 Records to the same MongoDB collection

If we send both types of data, mixing schemas within the same MongoDB collection, we need to implement more complex parsing logic in the go code. However, no changes need to be made to the way data flows through the converter program.

Option 3: Send IPFIX and Netflow v9 Records to their own MongoDB collection

In this case, the parsing logic is made much simpler, but IPFIX-RITA must support reading data from several data sources.

Prototyping

We need to know what netflow v9 records look like when they come through Logstash to MongoDB.

Create Logstash Docker Image

As a subdirectory, create a Docker image containing Logstash configured for our purposes.
This image can be used via docker-compose along with IPFIX-RITA's main executable and MongoDB.

Provide an easy way to disable database rotation for testing

While working on IPFIX-RITA, it is often convenient to run data through the system that was not captured within the development day. In order to prevent out-of-time-segment errors, we should provide a flag which uses a simple MongoDB output module instead of the rotating, date-based MongoDB output module. A simple output module that carries this out can be found in the git history. Check out fe541cf.

Error Management

Come up with a plan for proper error management.

Of particular importance is recording a stack trace with each error. errors.WithStack() can be used for this.
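
For example, using github.com/pkg/errors to attach a stack trace at the point where an error originates; the mgo call here is only for illustration:

package input

import (
	"github.com/pkg/errors"
	mgo "gopkg.in/mgo.v2"
)

//dial wraps mgo.Dial, attaching a stack trace to any error
func dial(url string) (*mgo.Session, error) {
	sess, err := mgo.Dial(url)
	if err != nil {
		//errors.WithStack records the call stack here, so the log
		//can show where the error originated rather than where it
		//was eventually printed
		return nil, errors.WithStack(err)
	}
	return sess, nil
}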

Solve IPFIX issue with MikroTik Router Logs

Currently it appears that there is an issue reading the IPFIX logs coming in from the MikroTik router. The error follows: ERRO[1082] input map must contain key 'netflow.flowStartMilliseconds'

The input map follows:
map[
_id:ObjectIdHex("5c004115354bf5fc1800008c")
netflow:map[
tcpControlBits:16
destinationTransportPort:80
destinationMacAddress:REDACTED
postSourceMacAddress:REDACTED
tcpWindowSize:4096
udpMessageLength:0
ingressInterface:2
flowEndSysUpTime:85237060
isMulticast:0
ipTotalLength:40
sourceIPv4Address:REDACTED
ipTTL:64
ipHeaderLength:5
octetDeltaCount:559
postNATSourceIPv4Address:REDACTED
postNATDestinationIPv4Address:REDACTED
ipNextHopIPv4Address:REDACTED
version:10
destinationIPv4Address:REDACTED
tcpAcknowledgementNumber:256845980
sourceIPv4PrefixLength:0
icmpCodeIPv4:0
flowStartSysUpTime:85222260
icmpTypeIPv4:0
packetDeltaCount:3
ipVersion:4
ipClassOfService:0
postNAPTSourceTransportPort:54507
sourceTransportPort:54507
destinationIPv4PrefixLength:0
egressInterface:0
protocolIdentifier:6
postNAPTDestinationTransportPort:80
igmpType:0
tcpSequenceNumber:661187750
]
@timestamp:"2018-11-29T19:42:11.000Z"
@Version:1
host:REDACTED
]

What appears to be happening is that we expect a flowStartMilliseconds field, but in the above we have flowStartSysUpTime. In input/mgologstash/flow.go on line 133 we search the map for flowStartMilliseconds and report an error if we don't find it. We also check for flowEndMilliseconds on line 142. It would be ideal to add checks for flowStartSysUpTime and flowEndSysUpTime to resolve this issue.
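
A sketch of the timestamp recovery, assuming we either have systemInitTimeMilliseconds (IPFIX element 160) or fall back to anchoring the flow end to Logstash's @timestamp; this is not the final flow.go change:

//absoluteFlowTimes recovers absolute millisecond timestamps from the
//SysUpTime fields. When the exporter supplies systemInitTimeMilliseconds
//the conversion is exact; otherwise we approximate by assuming the
//record was exported at flow end (recordTimeMs from @timestamp).
func absoluteFlowTimes(initTimeMs, startSysUp, endSysUp, recordTimeMs int64, haveInitTime bool) (startMs, endMs int64) {
	if haveInitTime {
		return initTimeMs + startSysUp, initTimeMs + endSysUp
	}
	endMs = recordTimeMs
	startMs = endMs - (endSysUp - startSysUp)
	return startMs, endMs
}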

Add IPFIX-RITA to the PATH

We shouldn't have to type /opt/ipfix-rita/bin/ipfix-rita every time we want to use the program.

We could add our own file in /etc/profile.d and require users to run source /etc/profile in order to solve this problem.

Or if RITA is already installed, we could drop a symlink to ipfix-rita in RITA's bin folder.

Performance Testing

Currently, we do not know how the project scales with more CPU/RAM. However, we do have rough numbers for Logstash. We need to set up similar benchmarks so we can estimate what resources will be needed for various loads.

Add script which replays netflow/ ipfix data from a packet capture

In order to define test cases, we can create packet captures with various types of netflow traffic from different devices.

However, we need to be able to replay the netflow traffic in the pcap file in order to test IPFIX-RITA.

The easiest way to accomplish this will likely be to use a python script with scapy to read the data from the pcap file and send the data to a designated host.

Handle NAT Stitching

Some firewalls include NAT information in IPFIX. In this case, simply take the postNATDestination* fields as the destination* fields when available, as in the sketch following the example records below.

{
    "_id" : ObjectId("5b58eed810a0cffdc9016777"),
    "@timestamp" : "\"2018-07-25T21:42:47.000Z\"",
    "host" : "123.45.67.89",
    "netflow" : {
        "packetDeltaCount" : 1,
        "egressInterface" : 2,
        "protocolIdentifier" : 184,
        "ipNextHopIPv4Address" : "0.0.0.0",
        "version" : 10,
        "ingressInterface" : 1,
        "postNAPTSourceTransportPort" : 18856,
        "postNATSourceIPv4Address" : "10.0.0.237",
        "sourceIPv4Address" : "192.168.168.65",
        "postNAPTDestinationTransportPort" : 53,
        "flowStartSysUpTime" : 24000,
        "sourceTransportPort" : 33016,
        "octetDeltaCount" : 61,
        "destinationTransportPort" : 53,
        "destinationIPv4Address" : "8.8.4.4",
        "flowEndSysUpTime" : 24000,
        "postNATDestinationIPv4Address" : "8.8.4.4"
    },
    "@version" : "1"
}

/* 2 */
{
    "_id" : ObjectId("5b58eed810a0cffdc9016778"),
    "@timestamp" : "\"2018-07-25T21:42:47.000Z\"",
    "host" : "123.45.67.89",
    "netflow" : {
        "packetDeltaCount" : 1,
        "egressInterface" : 1,
        "protocolIdentifier" : 17,
        "ipNextHopIPv4Address" : "10.0.0.1",
        "version" : 10,
        "ingressInterface" : 2,
        "postNAPTSourceTransportPort" : 53,
        "postNATSourceIPv4Address" : "8.8.4.4",
        "sourceIPv4Address" : "8.8.4.4",
        "postNAPTDestinationTransportPort" : 33016,
        "flowStartSysUpTime" : 24000,
        "sourceTransportPort" : 53,
        "octetDeltaCount" : 77,
        "destinationTransportPort" : 18856,
        "destinationIPv4Address" : "10.0.0.237",
        "flowEndSysUpTime" : 24000,
        "postNATDestinationIPv4Address" : "192.168.168.65"
    },
    "@version" : "1"
}
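
A minimal sketch of that substitution over the parsed netflow sub-document, assuming bson.M access; mgologstash may structure the record differently:

package input

import "gopkg.in/mgo.v2/bson"

//applyPostNAT overwrites the destination fields with their post-NAT
//equivalents when the exporter provides them. Illustrative only.
func applyPostNAT(netflow bson.M) {
	if ip, ok := netflow["postNATDestinationIPv4Address"]; ok {
		netflow["destinationIPv4Address"] = ip
	}
	if port, ok := netflow["postNAPTDestinationTransportPort"]; ok {
		netflow["destinationTransportPort"] = port
	}
}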

Allow DeltaCounts as TotalCounts

Some IPFIX exporters choose to use packetDeltaCount and octetDeltaCount over packetTotalCount and octetTotalCount, even when such exporters are configured to only report on connection close. This is allowed within the IPFIX specification, and we should handle it.

Support Netflow v5

As it stands, we have implemented a subset of netflow v9 and IPFIX.

We would like to go forward with implementing netflow v5.

In collector/logstash/pipeline/ipfix.conf, we do not specify the versions to be processed by Logstash, so netflow v5 data should make it into the MongoDB buffer collection. As it stands, we should see the converter write an error out to the log whenever it tries to parse a netflow v5 record. This error should contain the text "unspported netflow version: 5". This error is thrown from FillFromBSONMap in ipfix-rita/converter/input/mgologstash/flow.go.

An investigation of the logstash source code reveals that each of the records produced should have the following fields

    ip4_addr :ipv4_src_addr
    ip4_addr :ipv4_dst_addr
    ip4_addr :ipv4_next_hop
    uint16   :input_snmp
    uint16   :output_snmp
    uint32   :in_pkts
    uint32   :in_bytes
    uint32   :first_switched
    uint32   :last_switched
    uint16   :l4_src_port
    uint16   :l4_dst_port
    uint8    :tcp_flags
    uint8    :protocol
    uint8    :src_tos
    uint16   :src_as
    uint16   :dst_as
    uint8    :src_mask
    uint8    :dst_mask
    uint32   :uptime
    uint32   :unix_sec
    uint32   :unix_nsec
    uint32   :flow_seq_num
    uint8    :engine_type
    uint8    :engine_id
    bit2     :sampling_algorithm
    bit14    :sampling_interval
    uint16   :version

These fields must be mapped onto our converter's Flow interface:

type Flow interface {
	//SourceIPAddress returns the source IPv4 or IPv6 address
	SourceIPAddress() string
	//SourcePort returns the source transport port
	SourcePort() uint16
	//DestinationIPAddress returns the destination IPv4 or IPv6 address
	DestinationIPAddress() string
	//DestinationPort returns the destination transport port
	DestinationPort() uint16
	//ProtocolIdentifier returns which transport protocol was used
	ProtocolIdentifier() protocols.Identifier
	//FlowStartMilliseconds is the time the flow started as a Unix timestamp
	FlowStartMilliseconds() (int64, error)
	//FlowEndMilliseconds is the time the flow ended as a Unix timestamp
	FlowEndMilliseconds() (int64, error)
	//OctetTotalCount returns the total amount of bytes sent (including IP headers and payload)
	OctetTotalCount() int64
	//PacketTotalCount returns the number of packets sent from the source to the destination
	PacketTotalCount() int64
	//FlowEndReason returns why the metering process stopped recording the flow
	FlowEndReason() FlowEndReason
	//Version returns the IPFIX/Netflow version
	Version() uint8
	//Exporter returns the address of the exporting process for this flow
	Exporter() string
}

The mapping between these two looks something like this:

ipv4_src_addr   -> SourceIPAddress
l4_src_port     -> SourcePort
ipv4_dst_addr   -> DestinationIPAddress
l4_dst_port     -> DestinationPort
protocol        -> ProtocolIdentifier # We need to check if netflow v5 uses the same protocol numbers
first_switched  -> FlowStartMilliseconds
last_switched   -> FlowEndMilliseconds
in_bytes        -> OctetTotalCount
in_pkts         -> PacketTotalCount
NilEndReason    -> FlowEndReason # Netflow v5 does not support this information
version         -> Version
host            -> Exporter # Logstash tells us what machine sent it data in the host field of each record

We will need to add a method fillFromNetflowv5BSONMap to ipfix-rita/converter/input/mgologstash/flow.go and dispatch to it based on the version value in the method FillFromBSONMap. It should be straightforward to base this new method on fillFromNetflowv9BSONMap.

At the moment, I do not see any reason other changes need to be made. Once we map the incoming BSON data to flow objects, the data should pass easily through the rest of the program.
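
A rough sketch of the dispatch, using pkg/errors and mgo's bson.M; fillFromIPFIXBSONMap is a placeholder for whatever the existing IPFIX path in flow.go is actually called:

//FillFromBSONMap dispatches to a version specific fill method (sketch)
func (f *Flow) FillFromBSONMap(m bson.M) error {
	netflow, ok := m["netflow"].(bson.M)
	if !ok {
		return errors.New("input map must contain key 'netflow'")
	}
	version, ok := netflow["version"].(int)
	if !ok {
		return errors.New("input map must contain key 'netflow.version'")
	}
	switch version {
	case 5:
		return f.fillFromNetflowv5BSONMap(m)
	case 9:
		return f.fillFromNetflowv9BSONMap(m)
	case 10:
		return f.fillFromIPFIXBSONMap(m)
	default:
		return errors.Errorf("unsupported netflow version: %d", version)
	}
}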

docker-compose.yaml missing

Unable to start the solution as the README file indicates because there is no docker-compose.yaml in the docker/ folder. Was it moved?

Integration Testing Harness

This project relies heavily on MongoDB and will need an integration testing harness that provides access to a test MongoDB server (preferably managed by Docker).

Add Debugging to README

We need to add some debugging steps to the landing README page; this will help customers diagnose issues with their install of IPFIX-RITA.

Wrapper Script

We need to present a user friendly interface on top of docker-compose.

(More information will be added later)

Log Rollover Issue

When running IPFIX-RITA, capturing logs starting at 4PM and going until about 6:15PM MST on December 6th, I discovered that the logs rolled over and started saving to December 7th.

I believe this is caused by MST being UTC-07:00: 5PM MST is midnight UTC on December 7th, so the logs automatically rolled over starting around that time.

If possible, we should have IPFIX-RITA auto-detect and adjust its time zone based on the system time.

I discovered that we can convert a UTC timestamp to local time using the following code:

    rn := time.Now()        //get the system time right now
    _, secDiff := rn.Zone() //get the time zone offset from UTC (in seconds)

    //When going from UTC to local time, add the offset to the UTC time.
    //time.Duration is an int64 measured in nanoseconds, so convert the
    //offset with time.Duration(secDiff) * time.Second rather than
    //multiplying by 1000000000 by hand. (The original snippet also
    //shadowed the time package by naming the result "time".)
    localTime := logTime.Add(time.Duration(secDiff) * time.Second)

Create Screen Writer

In order to test the collector-to-converter connection, a screen writer which pulls its data from the MongoDB queue should be written.

Match With Best Fitting Flow Rather Than First Flow

In the converter project, we stitch IPFIX flow data into RITA/ Bro sessions. Flow data is unidirectional (there are two records describing the communications from host A to host B and from host B to host A) while Bro sessions are bidirectional (there is one record describing the communications from host A to host B and from host B to host A). The main stitching logic is located in converter/stitching/stitching.go

https://github.com/activecm/ipfix-rita/blob/master/converter/stitching/stitcher.go#L121
The current algorithm in pseudocode:

Input: A session.Aggregate f , A Matcher m
Output: A Merged Session Aggregate or Nothing

matches := m.Find(f.AggregateQuery) //Finds all session aggregates which may match f
matchFound := false
Loop over matches as nextMatch: 
   if shouldMerge(f, nextMatch):
        matchFound = true
        f.merge(nextMatch) //changes f
        if f.FilledFromSourceA and f.FilledFromSourceB: //The session has both sides of the connection detailed
            m.Remove(f)
            return f
        else: //The merge happened on the same side of the connection
            m.Update(nextMatch, f) //overwrite the matching aggregate in the matcher with the merged
            return

if not matchFound: //either len(matches) = 0 or shouldMerge returned false for all results
    m.Insert(f)
    return

This algorithm is greedy in that it takes the first session.Aggregate which matches the input and passes the shouldMerge() check.

Ideally, we should merge the input with the best fitting session.

A merge cost can be computed as abs(f_FlowEndMilliseconds - nextMatch_FlowEndMilliseconds) + abs(f_FlowStartMilliseconds - nextMatch_FlowStartMilliseconds).

Unfortunately, the FlowEndMilliseconds and FlowStartMilliseconds are split between the AB and BA sides of the session.Aggregate.

During stitching, only one side of the session should be filled, and the following code snippet can be used to get the correct field. The snippets should also work for FlowStart (in the second snippet, change the > sign to a < sign).

var f_FlowEndMilliseconds int64
if f.FilledFromSourceA:
    f_FlowEndMilliseconds = f.FlowEndMillisecondsAB
else if f.FilledFromSourceB:
    f_FlowEndMilliseconds = f.FlowEndMillisecondsBA
else:
    //shouldn't happen during the stitching process

However, the following snippet handles sessions which may have both sides of the connection filled

var f_FlowEndMilliseconds int64
if f.FilledFromSourceA && f.FilledFromSourceB:
    if f.FlowEndMillisecondsAB > f.FlowEndMillisecondsBA:
        f_FlowEndMilliseconds = f.FlowEndMillisecondsAB
    else:
        f_FlowEndMilliseconds = f.FlowEndMillisecondsBA
else if f.FilledFromSourceA:
    f_FlowEndMilliseconds = f.FlowEndMillisecondsAB
else if f.FilledFromSourceB:
    f_FlowEndMilliseconds = f.FlowEndMillisecondsBA
else:
    //no data available. leave zero

It may be worthwhile to add this to the session struct as a method.
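
Such a method might look like the following sketch; the field and receiver names follow the snippets above:

//FlowEndMilliseconds returns the latest flow end time recorded on
//either side of the aggregate (sketch only)
func (a *Aggregate) FlowEndMilliseconds() int64 {
	switch {
	case a.FilledFromSourceA && a.FilledFromSourceB:
		if a.FlowEndMillisecondsAB > a.FlowEndMillisecondsBA {
			return a.FlowEndMillisecondsAB
		}
		return a.FlowEndMillisecondsBA
	case a.FilledFromSourceA:
		return a.FlowEndMillisecondsAB
	case a.FilledFromSourceB:
		return a.FlowEndMillisecondsBA
	default:
		//no data available; leave zero
		return 0
	}
}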

In the new algorithm, we compute the cost for each possible match (after passing shouldMerge). If the new cost is lower than the existing cost (initialized to MAX_INT), then we store the nextMatch and update the existing cost. After iterating, if the stored nextMatch is not nil, then we perform the update logic with the stored nextMatch.

Overall the new algorithm will look like:

Input: A session.Aggregate f , A Matcher m
Output: A Merged Session Aggregate or Nothing

matches := m.Find(f.AggregateQuery) //Finds all session aggregates which may match f
match := session.Aggregate{}
matchFound := false
matchCost := MAX_INT
Loop over matches as nextMatch: 
   if shouldMerge(f, nextMatch):
        matchFound = true
        compute  f_FlowStartMilliseconds, nextMatch_FlowStartMilliseconds, f_FlowEndMilliseconds, nextMatch_FlowEndMilliseconds 
        newMatchCost = abs(f_FlowStartMilliseconds - nextMatch_FlowStartMilliseconds) + abs(f_FlowEndMilliseconds - nextMatch_FlowEndMilliseconds)
        if newMatchCost < matchCost:
            match = nextMatch
            matchCost = newMatchCost

if matchFound:
    f.merge(match) //changes f
    if f.FilledFromSourceA and f.FilledFromSourceB: //The session has both sides of the connection detailed
        m.Remove(match)
        return f
    else: //The merge happened on the same side of the connection
        m.Update(match, f) //overwrite the matching aggregate in the matcher with the merged
        return
else:
    m.Insert(f)
    return

Modify install script

Currently the install script stops and has the end user modify mongod.conf to include the docker IP address, then restart mongo.service.

This might be the most confusing step of the install. It would be better to prompt the user to enter the IP address of the RITA system (or hit enter if they are installing it locally), then have the install script automatically add the IP to the mongod.conf file and restart mongo.

Filter IPs

In an upcoming release of IPFIX-RITA, we want to limit the number of connections we store. This reduces the size of the conn table in RITA and makes analyzing data faster.

This isn't an urgent concern yet; we need to wait for it to be implemented in RITA first.

  • Filter external IP to external IP
  • Filter internal IP to internal IP
  • Ensure connections on "alwaysinclude" list are present (nameservers, possible internal http proxies)

Update/Add Manual

Add a manual to IPFIX-RITA that writes out the stages for installing IPFIX-RITA, includes a disclaimer about this being a beta, and explains how to contact us if there are any issues.

ipfix-rita script unable to launch

The initialization script (ipfix-rita) looks for environment variables that don't exist, causing the script to fail to launch.

ERROR: Invalid interpolation format for "image" option in service "logstash": "quay.io/activecm/ipfix-rida-logstash:${IPFIX-RITA-VERSION:-latest}"

(Hyphens are not valid in environment variable names, which is likely why the interpolation fails.)

Remove Version from tar file

When a new release is produced using the make-release.sh script, it tags both the tar file and the base directory with the version number. We need to remove the version number from both the tar file and base directory names.

Document Needed Changes to RITA

Needed RITA Changes

Current Status

It is currently unknown what happens if RITA runs with only connection records (without dns/http records).

RITA should be able to function without http and dns information with minimal changes.

How to help

Import a dataset with RITA, connect to MongoDB, delete the http and dns collections, and run the analyze step. Note where the program breaks, and repeat.

Write Stitching.Manager Integration Tests

Stitching.Manager Integration Tests

Current Status

No integration tests have been written to ensure IPFIX flows are stitched together correctly.

Goal

Ensure stitching.Manager correctly stitches flows.

Implementation Details

Terminology

  • Flow: A one-way connection from one host to another
  • Flow Key/ Aggregate Query: The information which uniquely identifies a session at a given time.
    • Two IP addresses (Source/ Destination, IPAddressA/ IPAddressB)
    • Two ports (Source/ Destination, PortA/ PortB)
    • The protocol
    • The IP address of the IPFIX exporter which sent us the flows which make up the session
  • Session Aggregate: A two-way connection from one host to another and back. Created by aggregating Flows
  • sameSessionTimeout: If a second flow starts after another flow ends, the two may be aggregated into a session. The flows are only aggregated if the second flow began after the first flow ended and the amount of time between the flows is less than or equal to the sameSessionTimeout.

Getting Started

A new integration test can be created by creating the file manager_test.go. Label the package as stitching_test. Next, create a standard Go test function TestXXX(t *testing.T) {}. Then, start a new integration test using

	env, cleanup := environmenttest.SetupIntegrationTest(t)
	defer cleanup()

The env variable contains configuration information and, most importantly, a connection to a dockerized MongoDB.

From here, instantiate a new stitching.Manager, create a goroutine to feed the input channel, and write a simple writer implementation to retrieve the outputs from the manager.

The require package is immensely helpful in go tests. See the other test files for usage.

A fake flow with random data can be generated with ipfix.NewFlowMock(); individual fields can then be overwritten as needed.
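
Putting those pieces together, a skeleton test might look like this; the import paths are guesses based on the package names above:

package stitching_test

import (
	"testing"

	"github.com/activecm/ipfix-rita/converter/environmenttest"
	"github.com/activecm/ipfix-rita/converter/ipfix"
	"github.com/stretchr/testify/require"
)

//TestSingleICMPFlow sketches the shape of a manager integration test
func TestSingleICMPFlow(t *testing.T) {
	env, cleanup := environmenttest.SetupIntegrationTest(t)
	defer cleanup()

	flow := ipfix.NewFlowMock()
	flow.MockSourceIPAddress = "1.1.1.1"
	flow.MockDestinationIPAddress = "2.2.2.2"

	//instantiate a stitching.Manager with env, feed flow into its
	//input channel from a goroutine, collect the output aggregates,
	//and assert on them with require
	require.NotNil(t, env)
}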

The Stitching Process

When a flow is added to a session aggregate, the flow's one-way data is added to a two-way data structure. While a flow has a field OctetTotalCount representing the amount of bytes sent from SourceIPAddress to DestinationIPAddress, a session aggregate has two fields OctetTotalCountAB and OctetTotalCountBA representing the amount of bytes sent between IPAddressA and IPAddressB in each direction. The true source and destination of the session aggregate are determined when the aggregate is ready to be written out. If FlowStartMillisecondsAB < FlowStartMillisecondsBA, then IPAddressA is considered the source and IPAddressB is considered the destination, otherwise IPAddressB is the source and IPAddressA is the destination.

It is critical that flows are assigned to the AB and BA sections of session aggregates in a non-ambiguous, reproducible manner. To ensure this happens, the host (SourceIPAddress or DestinationIPAddress) which comes first alphabetically within a flow will be assigned to a session aggregate's IPAddressA field, and the other host will be assigned to the IPAddressB field.

Zero-Stitched Flows

When a session aggregate is first created from a single flow, the flow is said to be stitched with zeroes. Since a flow only contains one-way data, the other side of the connection cannot be known. This lack of data is represented with zero values.

For example, given the flow:

flow1 := ipfix.NewFlowMock()
//Ensure the source comes before the destination alphabetically
//to ensure the source is mapped to host "A", and the destination is
//mapped to host "B"
flow1.MockSourceIPAddress = "1.1.1.1"
flow1.MockDestinationIPAddress = "2.2.2.2"
flow1.MockProtocolIdentifier = protocols.ICMP
flow1.MockFlowEndReason = ipfix.IdleTimeout

we can create a session aggregate:

var sessAgg session.Aggregate
srcMapping, err := session.FromFlow(flow1, &sessAgg)
if err != nil {
	return err
}

srcMapping lets us know whether the flow was mapped into the AB portion of the session aggregate or the BA portion. We can check this like so:

if srcMapping == session.ASource {
	//flow mapped into AB portion
} else {
	//flow mapped into BA portion
}

Our sessAgg contains the following data (pseudocode):

sessAgg = struct {
	AggregateQuery: struct { 
		IPAddressA: "1.1.1.1"
		PortA: <random port from mock flow>

		IPAddressB:  "2.2.2.2"
		PortB: <random port from mock flow>

		ProtocolIdentifier: protocols.ICMP

		Exporter: <random ip address from mock flow>
	}

	FlowStartMillisecondsAB:  <random time from mock flow>
	FlowEndMillisecondsAB:  <random time from mock flow>
	FlowStartMillisecondsBA: 0
	FlowEndMillisecondsBA:  0

	OctetTotalCountAB: <random amount of data from mock flow>
	OctetTotalCountBA: 0
	PacketTotalCountAB: <random amount of data from mock flow>
	PacketTotalCountBA: 0

	FlowEndReasonAB: <random reason from mock flow>
	FlowEndReasonBA: ipfix.Nil
}

Since all of the BA fields (FlowStartMillisecondsBA, FlowEndMillisecondsBA, OctetTotalCountBA, PacketTotalCountBA, and FlowEndReasonBA) are zero (or ipfix.Nil in the case of FlowEndReason), the flow is said to have been stitched with zeroes.

Sessions From Multiple Flows

Additional flows can be aggregated into an existing session aggregate provided the flow has a matching Flow Key / Aggregate Query. First, the new flow is mapped into either the AB or BA section of the session aggregate. The OctetTotalCount and PacketTotalCount of the flow are added to the corresponding fields in the matching AB/BA section of the aggregate. If the FlowStartMilliseconds of the new flow is earlier than the corresponding field in the session aggregate, or if the field is unset in the aggregate, the value is overwritten with the flow's. If the FlowEndMilliseconds of the new flow is later than the corresponding field in the session aggregate, or if the field is unset in the aggregate, the value is overwritten with the flow's and the FlowEndReason is updated as well.

When to Stitch Flows

Flows should only be stitched when it logically makes sense. Unfortunately, this depends on the protocol.

When to Stitch ICMP(v6) (and Unknown Protocol) Flows

ICMP is a connectionless protocol in which sessions are not maintained (@william-stearns can you confirm?). In this case, flows are never stitched together. They are all stitched with zeroes.

We use this model to deal with unknown protocols as well.

When to Stitch UDP

UDP is a connectionless protocol in which sessions are not maintained at the protocol level. However, unlike ICMP, UDP sessions are often maintained at the application layer. Since UDP has no support for sessions at the protocol layer, the most common FlowEndReason for UDP flows is simply IdleTimeout. In order to recover the application layer sessions, we have to make intelligent guesses.

Our guessing scheme relies on the sameSessionTimeout. If a flow from host A to host B ended at time tFlowEnd, and another flow from host A to host B or host B to host A starts at time tFlowStart, then the two flows will be stitched if tFlowStart <= tFlowEnd + sameSessionTimeout. Otherwise the two flows will each be stitched with zeroes.
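
In code, the guess reduces to a single comparison (sketch, all times in Unix milliseconds):

//shouldStitchUDP reports whether two flows plausibly belong to the same
//application layer session
func shouldStitchUDP(flow1EndMs, flow2StartMs, sameSessionTimeoutMs int64) bool {
	return flow2StartMs <= flow1EndMs+sameSessionTimeoutMs
}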

When to Stitch TCP

TCP is a connection oriented protocol, so we don't have to make as many guesses. However, TCP monitoring isn't perfect. During normal operation, TCP flows will have FlowEndReason fields marked with EndOfFlow. EndOfFlow signals that one end of the TCP session was torn down cleanly.

When the FlowEndReason for a given side of a session aggregate (AB/BA) is EndOfFlow, we make sure no more flows are stitched into that side of the aggregate. Otherwise, TCP is treated the same way as UDP.

Test Cases

The above information is summarized in the following table of test cases.
A Google Sheet with the same information can be found here

| Test Name | Protocol | Flow 1 FlowEndReason | Flow 1 Direction | Flow 2 Direction | Flow 2 Start <= Flow 1 End + sameSessionTimeout | # of Output Session Aggregates | # of Errors | Session Agg 1 AB | Session Agg 1 BA | Session Agg 2 AB | Session Agg 2 BA |
|---|---|---|---|---|---|---|---|---|---|---|---|
| SingleICMPFlow | ICMP | IdleTimeout | A -> B | N/A | N/A | 1 | 0 | Flow 1 Values | Zeroes | N/A | N/A |
| TwoICMPFlowsSameSourceInTimeout | ICMP | IdleTimeout | A -> B | A -> B | True | 2 | 0 | Flow 1 Values | Zeroes | Flow 2 Values | Zeroes |
| TwoICMPFlowsSameSourceOutOfTimeout | ICMP | IdleTimeout | A -> B | A -> B | False | 2 | 0 | Flow 1 Values | Zeroes | Flow 2 Values | Zeroes |
| TwoICMPFlowsFlippedSourceInTimeout | ICMP | IdleTimeout | A -> B | B -> A | True | 2 | 0 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
| TwoICMPFlowsFlippedSourceOutOfTimeout | ICMP | IdleTimeout | A -> B | B -> A | False | 2 | 0 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
| SingleUDPFlow | UDP | IdleTimeout | A -> B | N/A | N/A | 1 | 0 | Flow 1 Values | Zeroes | N/A | N/A |
| TwoUDPFlowsSameSourceInTimeout | UDP | IdleTimeout | A -> B | A -> B | True | 1 | 0 | Flow 1 Values + Flow 2 Values | Zeroes | N/A | N/A |
| TwoUDPFlowsSameSourceOutOfTimeout | UDP | IdleTimeout | A -> B | A -> B | False | 2 | 1 | Flow 1 Values | Zeroes | Flow 2 Values | Zeroes |
| TwoUDPFlowsFlippedSourceInTimeout | UDP | IdleTimeout | A -> B | B -> A | True | 1 | 0 | Flow 1 Values | Flow 2 Values | N/A | N/A |
| TwoUDPFlowsFlippedSourceOutOfTimeout | UDP | IdleTimeout | A -> B | B -> A | False | 2 | 1 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
| SingleTCPIdleOutFlow | TCP | IdleTimeout | A -> B | N/A | N/A | 1 | 0 | Flow 1 Values | Zeroes | N/A | N/A |
| TwoTCPIdleOutFlowsSameSourceInTimeout | TCP | IdleTimeout | A -> B | A -> B | True | 1 | 0 | Flow 1 Values + Flow 2 Values | Zeroes | N/A | N/A |
| TwoTCPIdleOutFlowsSameSourceOutOfTimeout | TCP | IdleTimeout | A -> B | A -> B | False | 2 | 1 | Flow 1 Values | Zeroes | Flow 2 Values | Zeroes |
| TwoTCPIdleOutFlowsFlippedSourceInTimeout | TCP | IdleTimeout | A -> B | B -> A | True | 1 | 0 | Flow 1 Values | Flow 2 Values | N/A | N/A |
| TwoTCPIdleOutFlowsFlippedSourceOutOfTimeout | TCP | IdleTimeout | A -> B | B -> A | False | 2 | 1 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
| SingleTCPEOFFlow | TCP | EndOfFlow | A -> B | N/A | N/A | 1 | 0 | Flow 1 Values | Zeroes | N/A | N/A |
| TwoTCPEOFFlowsSameSourceInTimeout | TCP | EndOfFlow | A -> B | A -> B | True | 2 | 1 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
| TwoTCPEOFFlowsSameSourceOutOfTimeout | TCP | EndOfFlow | A -> B | A -> B | False | 2 | 1 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |
| TwoTCPEOFFlowsFlippedSourceInTimeout | TCP | EndOfFlow | A -> B | B -> A | True | 1 | 0 | Flow 1 Values | Flow 2 Values | N/A | N/A |
| TwoTCPEOFFlowsFlippedSourceOutOfTimeout | TCP | EndOfFlow | A -> B | B -> A | False | 2 | 1 | Flow 1 Values | Zeroes | Zeroes | Flow 2 Values |

Support Environment Variables in Configuration

In order to better support the docker environment, the collector should be able to expand environment variables in its configuration file (or implement an environment variable configuration provider).

This way the MongoDB uri can be configured without rebuilding the docker image.

RITA does this here
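
One lightweight approach is to expand variables in the raw YAML before parsing it, e.g. with os.ExpandEnv (a sketch, not necessarily RITA's exact mechanism):

package config

import (
	"io/ioutil"
	"os"
)

//readConfig loads the config file and expands $VAR / ${VAR} references
//so values such as the MongoDB URI can come from the docker environment
func readConfig(path string) ([]byte, error) {
	raw, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}
	return []byte(os.ExpandEnv(string(raw))), nil
}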

Provide an Easy Way to Mount in TLS Certificates

The config file allows the user to specify a TLS certificate for connecting to MongoDB. However, the path the config file points to is not loaded into the docker image, so the path resolution fails. We need to figure out a good way to handle this. When we package RITA for docker, we will need to confront this same issue.

Align output databases to system timezone

Currently we split the output databases by day in the UTC timezone. We need to offset the output record timestamps with the current system timezone before we determine the corresponding date. That way records are sorted by day in the user's timezone.

Support PFSense/ Softflowd

PFSense is a common BSD-based routing OS. In order to support IPFIX, PFSense employs the softflowd module.

Softflowd is an open source software based netflow exporter for Linux and BSD.

Currently Unsupported IPFIX Fields

| ElementID | Name | Abstract Data Type | Data Type Semantics | Status | Description |
|---|---|---|---|---|---|
| 21 | flowEndSysUpTime | unsigned32 | | current | The relative timestamp of the last packet of this Flow. It indicates the number of milliseconds since the last (re-)initialization of the IPFIX Device (sysUpTime). sysUpTime can be calculated from systemInitTimeMilliseconds. |
| 22 | flowStartSysUpTime | unsigned32 | | current | The relative timestamp of the first packet of this Flow. It indicates the number of milliseconds since the last (re-)initialization of the IPFIX Device (sysUpTime). sysUpTime can be calculated from systemInitTimeMilliseconds. |
| 160 | systemInitTimeMilliseconds | dateTimeMilliseconds | default | current | The absolute timestamp of the last (re-)initialization of the IPFIX Device. |

systemInitTimeMilliseconds is sent in a data flow (matching an option template) along with the IPFIX templates every 16 packets. See https://github.com/irino/softflowd/blob/master/ipfix.c#L421

Raw wireshark example dump:

Set 3 [id=3] (Options Template): 256
    FlowSet Id: Options Template (V10 [IPFIX]) (3)
    FlowSet Length: 30
    Options Template (Id = 256) (Scope Count = 1; Data Count = 4)
        Template Id: 256
        Total Field Count: 5
        Scope Field Count: 1
        Field (1/1) [Scope]: meteringProcessId
            0... .... .... .... = Pen provided: No
            .000 0000 1000 1111 = Type: meteringProcessId (143)
            Length: 4
        Field (1/4): systemInitTimeMilliseconds
            0... .... .... .... = Pen provided: No
            .000 0000 1010 0000 = Type: systemInitTimeMilliseconds (160)
            Length: 8
        Field (2/4): selectorAlgorithm
            0... .... .... .... = Pen provided: No
            .000 0001 0011 0000 = Type: selectorAlgorithm (304)
            Length: 2
        Field (3/4): samplingPacketInterval
            0... .... .... .... = Pen provided: No
            .000 0001 0011 0001 = Type: samplingPacketInterval (305)
            Length: 2
        Field (4/4): samplingPacketSpace
            0... .... .... .... = Pen provided: No
            .000 0001 0011 0010 = Type: samplingPacketSpace (306)
            Length: 4

Set 4 [id=256] (1 flows)
    FlowSet Id: (Data) (256)
    FlowSet Length: 24
    [Template Frame: 5]
    Flow 1
        Metering Process Id: 92091
        System Init Time: Oct 18, 2018 17:57:57.250000000 MDT
        Selector Algorithm: Systematic count-based Sampling (1)
        Sampling Packet Interval: 1
        Sampling Packet Space: 0

Support Cisco Stealthwatch

The Cisco Stealthwatch 2000 throws the following netflow v9 errors.

 Unsupported field in template 317 {:type=>70, :length=>3}
 Unsupported field in template 318 {:type=>70, :length=>3}
 Unsupported field in template 319 {:type=>29800, :length=>2}
 Unsupported field in template 320 {:type=>29821, :length=>2}
 Unsupported field in template 321 {:type=>70, :length=>3}
 Unsupported field in template 322 {:type=>70, :length=>3}
 Unsupported field in template 323 {:type=>29800, :length=>2}
 Unsupported field in template 324 {:type=>29821, :length=>2}
 Unsupported field in template 325 {:type=>29802, :length=>4}
 Unsupported field in template 326 {:type=>70, :length=>3}
 Unsupported field in template 327 {:type=>70, :length=>3}
 Unsupported field in template 328 {:type=>29800, :length=>2}
 Unsupported field in template 329 {:type=>70, :length=>3}
 Unsupported field in template 330 {:type=>70, :length=>3}
 Unsupported field in template 331 {:type=>29800, :length=>2}
 Unsupported field in template 332 {:type=>29808, :length=>4}
 Unsupported field in template 333 {:type=>29802, :length=>4}
 Unsupported field in template 334 {:type=>29808, :length=>4}

These errors prevent the logstash codec from ever parsing a single flow from the device.

From https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html

MPLS_LABEL_1 70 3 MPLS label at position 1 in the stack. This comprises 20 bits of MPLS label, 3 EXP (experimental) bits and 1 S (end-of-stack) bit.

This is a known bug in the logstash codec: https://github.com/logstash-plugins/logstash-codec-netflow/blob/master/RFC_COMPLIANCE_NETFLOW_v9.md

A full implementation of the field would require editing several files in the Logstash netflow codec.

We might be able to simply skip the fields by editing the netflow.yaml file in our Logstash docker image to include :skip lines for the offending fields.

The Stealthwatch also defines custom fields (29800, etc.) in https://www.cisco.com/c/dam/en/us/td/docs/security/stealthwatch/Information_Elements/SW_7_0_Information_Elements_DV_1_0.pdf. We likely do not have to support these fields to get it working.

Handle More IPFIX Timestamp Schemes

Some IPFIX exporters give timestamps in relation to the initialization time of the exporter.

We use absolute timestamps to divide out the data into separate databases and provide useful information in RITA.

More discussion is needed.
