Giter Club home page Giter Club logo

networkcodingrev's Introduction

NetworkCodingRev

Implementation of intra-session network coding. (This project deprecates the NetworkCoding repo.)

  1. It supports both reliable transmission and best-effort transmission.
  2. Packet order is preserved.
  3. Unlike TCP, data is not a byte stream: when the sender sends an X-byte packet, the receiver receives an X-byte packet.
  4. It is easy to use. Please refer to main.cpp.
  5. It supports IPv4 and IPv6.
  6. Java native interface is supported.

Example code

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include "api.h"

using namespace std;

#define USE_RX_CALLBACK (0)

/**
 * Receive one file using the blocking Receive() call.
 *
 * Protocol, as implemented by the sender in main():
 *   1. the first packet carries the name of the file to create,
 *   2. every following packet is appended to that file,
 *   3. a one-byte packet containing 0xff ends the transfer.
 *
 * While the transfer runs, a helper thread prints the average throughput
 * once per second.
 *
 * @param port Local port, passed unchanged to InitSocket().
 */
void SyncReceive(const char *port)
{
    std::cout << __FUNCTION__ << std::endl;

    // The array is one byte larger than the size offered to Receive() so the
    // file-name packet can always be NUL-terminated without writing out of
    // bounds, even when a full-size packet arrives.
    constexpr uint16_t kMaxPacketSize = 1500;
    uint8_t buffer[kMaxPacketSize + 1];
    uint16_t buffer_length = kMaxPacketSize;
    union {
        sockaddr_in ip4;
        sockaddr_in6 ip6;
    } addr;
    uint32_t addr_length = sizeof(addr);

    void *handle = InitSocket(port, 500, 500, nullptr);
    // NOTE(review): assumes InitSocket() reports failure with nullptr - confirm against api.h.
    if (handle == nullptr)
    {
        std::cout << "Cannot open socket on port " << port << std::endl;
        return;
    }

    // Without a file name there is no file to write to, so give up cleanly
    // instead of handing a null FILE* to fwrite()/fclose() below.
    if (!Receive(handle, buffer, &buffer_length, &addr, &addr_length, 0))
    {
        std::cout << "Cannot receive file name" << std::endl;
        FreeSocket(handle);
        return;
    }

    buffer[buffer_length] = 0;
    // NOTE(review): the file name comes from the network and is passed to
    // fopen() unchanged, so a peer can pick any writable path. Acceptable for
    // a demo; validate the name before using this pattern elsewhere.
    FILE *p_File = fopen((char *)buffer, "w");
    if (p_File)
    {
        std::cout << "Create File " << (char *)buffer << std::endl;
    }
    else
    {
        std::cout << "Cannot create file " << (char *)buffer << std::endl;
        FreeSocket(handle);
        exit(-1);
    }

    // State shared with the bandwidth thread is atomic, and the thread is
    // joined before this function returns, so it never touches dead locals.
    std::atomic<bool> running(true);
    std::atomic<uint64_t> bytes_this_second(0);
    std::thread bwchk([&running, &bytes_this_second]() {
        double total_mbyte_received = 0;
        uint32_t samples = 0;
        while (running)
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            total_mbyte_received += bytes_this_second.exchange(0) / 1000000.;
            samples++;
            printf("[AVG %5.5f MB/s][%u seconds]\n", total_mbyte_received / samples, samples);
            fflush(stdout);
        }
    });

    bool finished = false;
    while (!finished)
    {
        // Both lengths are reset before each call, as the original code did.
        buffer_length = kMaxPacketSize;
        addr_length = sizeof(addr);
        if (!Receive(handle, buffer, &buffer_length, &addr, &addr_length, 0))
        {
            continue;
        }
        if (buffer_length == 1 && buffer[0] == 0xff)
        {
            // End-of-transfer marker sent by the sender.
            finished = true;
        }
        else
        {
            fwrite(buffer, 1, buffer_length, p_File);
            bytes_this_second += buffer_length;
        }
    }

    // Stop the reporter first; join() may wait up to one second for its sleep.
    running = false;
    bwchk.join();

    fclose(p_File);
    p_File = nullptr;
    FreeSocket(handle);
    handle = nullptr;
}

/**
 * Receive one file using the receive callback registered with InitSocket().
 *
 * The callback follows the same protocol as the sender in main(): the first
 * packet is the file name, later packets are file content, and a one-byte
 * packet containing 0xff ends the transfer. The calling thread polls a flag
 * until the transfer has ended and then releases the file and the socket.
 *
 * @param port Local port, passed unchanged to InitSocket().
 */
void AsyncReceive(const char *port)
{
    std::cout << __FUNCTION__ << std::endl;
    FILE *p_File = nullptr;

    // Shared between the callback, the bandwidth thread and this thread, so
    // these are atomics rather than plain variables.
    std::atomic<uint64_t> bytes_this_second(0);
    std::atomic<bool> done(false);
    // Kept outside the callback so it can be joined before the locals above
    // go out of scope (the original detached it and left it dangling).
    std::thread bwchk;

    void *handle = InitSocket(
        port,
        500,
        500,
        [&p_File, &bytes_this_second, &done, &bwchk](uint8_t *const buffer, const uint16_t length, const void *const address, const uint32_t sender_addr_len) {
            (void)address;
            (void)sender_addr_len;
            if (done)
            {
                // Transfer already finished or failed; ignore late packets so
                // a closed file is never written to.
                return;
            }
            if (p_File == nullptr)
            {
                // Copy the name instead of writing a terminator at
                // buffer[length]: the capacity of the library's buffer is not
                // visible here, so that write could be out of bounds.
                // NOTE(review): the name comes from the network and is passed
                // to fopen() unchanged - validate it outside of demo code.
                const std::string file_name(reinterpret_cast<const char *>(buffer), length);
                p_File = fopen(file_name.c_str(), "w");
                if (p_File == nullptr)
                {
                    std::cout << "Cannot create file " << file_name << std::endl;
                    done = true;
                    return;
                }
                bwchk = std::thread([&bytes_this_second, &done]() {
                    double total_mbyte_received = 0;
                    uint32_t samples = 0;
                    while (!done)
                    {
                        std::this_thread::sleep_for(std::chrono::seconds(1));
                        total_mbyte_received += bytes_this_second.exchange(0) / 1000000.;
                        samples++;
                        printf("[AVG %5.5f MB/s][%u seconds]\n", total_mbyte_received / samples, samples);
                        fflush(stdout);
                    }
                });
            }
            else if (length == 1 && buffer[0] == 0xff)
            {
                // End-of-transfer marker sent by the sender.
                done = true;
            }
            else
            {
                fwrite(buffer, 1, length, p_File);
                bytes_this_second += length;
            }
        });
    // NOTE(review): assumes InitSocket() reports failure with nullptr - confirm against api.h.
    if (handle == nullptr)
    {
        // No callback will ever set `done`, so waiting would never end.
        std::cout << "Cannot open socket on port " << port << std::endl;
        return;
    }

    while (!done)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }

    if (bwchk.joinable())
    {
        // May wait up to one second for the reporter's sleep to finish.
        bwchk.join();
    }
    if (p_File != nullptr)
    {
        fclose(p_File);
        p_File = nullptr;
    }
    FreeSocket(handle);
    handle = nullptr;
}

/**
 * Entry point of the example.
 *
 * Receive mode: run {Sync, Async} localport
 * Send mode:    run localport remoteIP remoteport filename
 *
 * In send mode the program sends the file name, then the file content in
 * chunks of at most 1424 bytes, then a one-byte 0xff packet that the
 * receivers treat as the end of the transfer.
 *
 * @return 0 normally; -1 when the input file or the socket cannot be opened.
 */
int main(int argc, char *argv[])
{
    if (argc == 3)
    {
        std::cout << "Receive Mode" << std::endl;
        const std::string mode(argv[1]);
        if (mode == "Sync")
        {
            SyncReceive(argv[2]);
        }
        else if (mode == "Async")
        {
            AsyncReceive(argv[2]);
        }
        else
        {
            std::cout << "Invalid receive mode " << argv[1] << std::endl;
        }
    }
    else if (argc == 5)
    {
        std::cout << "Send Mode" << std::endl;

        // Open the input first: a missing file is reported immediately
        // instead of after blocking on Connect(), and no socket is leaked.
        FILE *p_File = fopen(argv[4], "r");
        if (p_File == nullptr)
        {
            std::cout << "Cannot open file " << argv[4] << std::endl;
            return -1;
        }

        void *handle = InitSocket(argv[1], 500, 500, nullptr);
        // NOTE(review): assumes InitSocket() reports failure with nullptr - confirm against api.h.
        if (handle == nullptr)
        {
            std::cout << "Cannot open socket on port " << argv[1] << std::endl;
            fclose(p_File);
            return -1;
        }

        // Retry until the peer accepts the connection.
        do
        {
            std::cout << "Connect to " << argv[2] << ":" << argv[3] << "." << std::endl;
        } while (false == Connect(handle, argv[2], argv[3], 1000, 0, 32, 0));

        unsigned char buffer[1424];
        size_t readbytes;
        // First packet: the file name the receiver should create.
        Send(handle, argv[2], argv[3], reinterpret_cast<unsigned char *>(argv[4]), strlen(argv[4]));
        while ((readbytes = fread(buffer, 1, sizeof(buffer), p_File)) > 0)
        {
            Send(handle, argv[2], argv[3], buffer, readbytes);
        }
        if (ferror(p_File))
        {
            // fread() returns 0 for both end-of-file and errors; tell them apart.
            std::cout << "Error while reading file " << argv[4] << std::endl;
        }
        // Last packet: one byte 0xff marks the end of the transfer.
        // NOTE(review): a final data chunk that is exactly the single byte
        // 0xff is indistinguishable from this marker on the receiving side.
        buffer[0] = 0xff;
        Send(handle, argv[2], argv[3], buffer, 1);
        WaitUntilTxIsCompleted(handle, argv[2], argv[3]);
        fclose(p_File);
        FreeSocket(handle);
    }
    else
    {
        std::cout << "Send Mode: run localport remoteIP remoteport filename" << std::endl;
        std::cout << "Receive Mode: run {Sync, Async} localport" << std::endl;
    }
    return 0;
}

networkcodingrev's People

Contributors

dujeonglee avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar

networkcodingrev's Issues

Support of multicast session is required

Intra-session network coding has more gains in multicast transmission cases.
Multicast for reliable and best-effort transmission modes will be the most useful feature of this library.

Sometimes segmentation fault in rx.cpp

                if((*pp_block)->m_DecodingReady)
                {
                    p_block = (*pp_block);
                    if(p_block->Decoding())
                    {
                        c_Session->m_SequenceNumberForService++;
                    }
                }
                else
                {
                    for(u08 i = 0 ; i < (*pp_block)->m_DecodedPacketBuffer.size() ; i++)
                    {
                        if(i != reinterpret_cast<Header::Data*>((*pp_block)->m_DecodedPacketBuffer[i].get())->m_ExpectedRank-1)
                        {
                            break;
                        }
                        do
                        {
                            u08* pkt;
                            try
                            {
                                TEST_EXCEPTION(std::bad_alloc());
                                pkt = new u08[reinterpret_cast<Header::Data*>((*pp_block)->m_DecodedPacketBuffer[i].get())->m_TotalSize];
                     This Line->memcpy(pkt, (*pp_block)->m_DecodedPacketBuffer[i].get(), reinterpret_cast<Header::Data*>((*pp_block)->m_DecodedPacketBuffer[i].get())->m_TotalSize); <- This line.
                                while(c_Session->m_RxTaskQueue.Enqueue([this, pkt](){
                                    c_Reception->m_RxCallback(pkt+sizeof(Header::Data)+reinterpret_cast<Header::Data*>(pkt)->m_MaximumRank-1, ntohs(reinterpret_cast<Header::Data*>(pkt)->m_PayloadSize), &c_Session->m_SenderAddress, sizeof(c_Session->m_SenderAddress));
                                    delete [] pkt;
                                })==false);
                                reinterpret_cast<Header::Data*>((*pp_block)->m_DecodedPacketBuffer[i].get())->m_Flags |= Header::Data::DataHeaderFlag::FLAGS_CONSUMED;
                                break;
                            }
                            catch (const std::bad_alloc& ex)
                            {
                                EXCEPTION_PRINT;
                            }
                        }
                        while(1);
                    }
                    break;
                }

Stack Trace
__memcpy_avx_unaligned () at ../sysdeps/x86_64/multiarch/memcpy-avx-unaligned.S:238
238 ../sysdeps/x86_64/multiarch/memcpy-avx-unaligned.S: No such file or directory.
(gdb) bt
#0 __memcpy_avx_unaligned () at ../sysdeps/x86_64/multiarch/memcpy-avx-unaligned.S:238
#1 0x000000000041f490 in NetworkCoding::ReceptionBlock::Receive (this=0x7fffe0089ea0,
buffer=0x7ffff6f4e880 "\001\004\024\t%\t(\t5\004\004\002", <incomplete sequence \302>, length=1044, sender_addr=0x7ffff6f4e7f0,
sender_addr_len=16) at rx.cpp:492
#2 0x0000000000420101 in NetworkCoding::ReceptionSession::Receive (this=0x7fffe00008c0,
buffer=0x7ffff6f4e880 "\001\004\024\t%\t(\t5\004\004\002", <incomplete sequence \302>, length=1044, sender_addr=0x7ffff6f4e7f0,
sender_addr_len=16) at rx.cpp:615
#3 0x000000000042042b in NetworkCoding::Reception::RxHandler (this=0x65a5e0,
buffer=0x7ffff6f4e880 "\001\004\024\t%\t(\t5\004\004\002", <incomplete sequence \302>, size=1044, sender_addr=0x7ffff6f4e7f0,
sender_addr_len=16) at rx.cpp:640
#4 0x000000000042668e in NetworkCoding::NCSocket::NCSocket(unsigned short, long, long, std::function<void (unsigned char*, unsigned short, sockaddr_in const*, unsigned int)>)::{lambda()#1}::operator()() const () at ncsocket.h:124
#5 0x00000000004283be in std::_Bind_simple<NetworkCoding::NCSocket::NCSocket(unsigned short, long, long, std::function<void (unsigned char*, unsigned short, sockaddr_in const*, unsigned int)>)::{lambda()#1} ()>::_M_invoke<>(std::_Index_tuple<>) (this=0x65a6b8)
at /usr/include/c++/5/functional:1531
#6 0x0000000000428314 in std::_Bind_simple<NetworkCoding::NCSocket::NCSocket(unsigned short, long, long, std::function<void (unsigned char*, unsigned short, sockaddr_in const*, unsigned int)>)::{lambda()#1} ()>::operator()() (this=0x65a6b8) at /usr/include/c++/5/functional:1520
#7 0x00000000004282a4 in std::thread::_Impl<std::_Bind_simple<NetworkCoding::NCSocket::NCSocket(unsigned short, long, long, std::function<void (unsigned char*, unsigned short, sockaddr_in const*, unsigned int)>)::{lambda()#1} ()> >::_M_run() (this=0x65a6a0)
at /usr/include/c++/5/thread:115
#8 0x00007ffff78f0c80 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#9 0x00007ffff7bc16ba in start_thread (arg=0x7ffff6f4f700) at pthread_create.c:333
#10 0x00007ffff735f82d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
(gdb)

Seg fault

#0 0x0807d930 in AVLTreeElement<NetworkCoding::DataTypes::SessionKey, NetworkCoding::ReceptionSession*>::~AVLTreeElement (this=0x80bfba0, __in_chrg=)
at ../basiclibrary/avltree/AVLTree.h:207
#1 0x0807b685 in AVLTree<NetworkCoding::DataTypes::SessionKey, NetworkCoding::ReceptionSession*>::Clear (this=0x80c0764) at ../basiclibrary/avltree/AVLTree.h:918
#2 0x0807838d in NetworkCoding::Reception::~Reception (this=0x80c0760,
__in_chrg=) at rx.cpp:761
#3 0x0804f5ac in NetworkCoding::NCSocket::~NCSocket (this=0x80bfba0,
__in_chrg=) at ncsocket.h:257
#4 0x0804df78 in FreeSocket (handle=0x80bfba0) at api.cpp:61
#5 0x0804a77e in AsyncReceiveFunction (port=0xbffff8c4 "10000") at main.cpp:157
#6 0x0804a908 in main (argc=3, argv=0xbffff764) at main.cpp:172

#0 0x0807b66f in AVLTree<NetworkCoding::DataTypes::SessionKey, NetworkCoding::ReceptionSession*>::Clear (this=0x80c0764) at ../basiclibrary/avltree/AVLTree.h:917
#1 0x0807838d in NetworkCoding::Reception::~Reception (this=0x80c0760,
__in_chrg=) at rx.cpp:761
#2 0x0804f5ac in NetworkCoding::NCSocket::~NCSocket (this=0x80bfba0,
__in_chrg=) at ncsocket.h:257
#3 0x0804df78 in FreeSocket (handle=0x80bfba0) at api.cpp:61
#4 0x0804a77e in AsyncReceiveFunction (port=0xbffff8c4 "10000") at main.cpp:157
#5 0x0804a908 in main (argc=3, argv=0xbffff764) at main.cpp:172

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Something interesting about the web. A new door to the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Something interesting about games that makes everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.