pdxjohnny commented on August 31, 2024

Data Flow Programming

Explainer on what data flow programming is and how it works.
Alice thinks in parallel.

We need to come up with several metrics to track and plot throughout.
We also need to plot metrics against one another for tradeoff analysis.

We could also make this a choose-your-own-adventure style tutorial: if you
want to do it with threads, here are your output metrics. We can later show
that we're getting these metrics by putting all the steps into a dataflow and
getting the metrics out by running them. We could then show how we can ask
the orchestrator to optimize for speed, memory, etc. Then add in how you can
have the orchestrator take those optimization constraints from dynamic
conditions, such as how much memory is on the machine you are running on, or
whether you have access to a k8s cluster. We also talked about the power
consumption vs. speed trade-off for server vs. desktop. We could add in edge
constraints like network latency.
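
As a rough sketch of that last idea (the OptimizationConstraints class and the
detection heuristics below are hypothetical, not an existing DFFML API),
picking constraints up from dynamic conditions might look like:

import os
import shutil
import pathlib
import dataclasses


@dataclasses.dataclass
class OptimizationConstraints:
    # Hypothetical knobs an orchestrator could be asked to optimize within
    max_memory_bytes: int
    prefer_low_power: bool = False
    k8s_available: bool = False


def detect_constraints() -> OptimizationConstraints:
    # Total system memory (Linux-only via sysconf; other platforms would need
    # something like psutil)
    total_memory = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
    # Guess that a k8s cluster is reachable if kubectl and a kubeconfig exist
    k8s_available = bool(
        shutil.which("kubectl")
        and (
            "KUBECONFIG" in os.environ
            or pathlib.Path("~/.kube/config").expanduser().exists()
        )
    )
    return OptimizationConstraints(
        max_memory_bytes=total_memory // 2,
        k8s_available=k8s_available,
    )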

We will need to add a metrics API, use it in various places in the
orchestrators, and expose it to operations so they can report out. These will
be the same APIs we'll use for stub operations to estimate time to
completion, etc.
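
A minimal sketch of what such a metrics API could look like (the names here
are hypothetical, not an existing DFFML interface): orchestrators and
operations record into it, and a stub operation could read averages back out
as its time-to-completion estimate.

import abc
import time
import contextlib
from typing import Dict, List


class MetricsCollector(abc.ABC):
    """Hypothetical interface orchestrators and operations report into."""

    @abc.abstractmethod
    def record(self, name: str, value: float) -> None:
        ...


class InMemoryMetricsCollector(MetricsCollector):
    def __init__(self) -> None:
        self.samples: Dict[str, List[float]] = {}

    def record(self, name: str, value: float) -> None:
        self.samples.setdefault(name, []).append(value)

    @contextlib.contextmanager
    def timer(self, name: str):
        # Time a block of work and record the wall clock duration
        start = time.perf_counter()
        try:
            yield
        finally:
            self.record(name, time.perf_counter() - start)

    def estimate_duration(self, name: str) -> float:
        # Average of past runs; what a stub operation might report as its
        # estimated time to completion
        samples = self.samples.get(name, [])
        return sum(samples) / len(samples) if samples else 0.0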

  • Make sure to measure speed and memory usage with ProcessPoolExecutor vs.
    ThreadPoolExecutor. Make sure we take into account memory from all
    processes (see the sketch below).

  • Start to finish speed

    • Plot with number of requests made
  • Memory consumed

    • Plot with number of requests made

This could be done as an IPython notebook.
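
A rough sketch of one way to measure both, assuming a stand-in CPU-bound
work() function; worker process memory is approximated with
resource.getrusage, which is Unix-only:

import time
import resource
import concurrent.futures


def work(n: int) -> int:
    # Stand-in CPU bound task (module level so ProcessPoolExecutor can pickle it)
    return sum(i * i for i in range(n))


def measure(executor_cls, requests: int = 100) -> None:
    start = time.perf_counter()
    with executor_cls() as executor:
        list(executor.map(work, [200_000] * requests))
    duration = time.perf_counter() - start
    # ru_maxrss is in KiB on Linux and cumulative over the process lifetime,
    # so run each executor in a separate invocation for clean numbers.
    # RUSAGE_CHILDREN reports the largest waited-for worker process; summing
    # per-process RSS (e.g. via psutil) would be more complete.
    self_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    children_rss = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss
    print(
        f"{executor_cls.__name__}: {duration:.2f}s, "
        f"max RSS self={self_rss} KiB children={children_rss} KiB"
    )


if __name__ == "__main__":
    measure(concurrent.futures.ThreadPoolExecutor)
    measure(concurrent.futures.ProcessPoolExecutor)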

  • Show basic downloader code (see the sketch after this list)

    • Observe speed bottleneck due to download in series
  • Parallelize download code

    • Observe increase in speed

    • Observe error handling issues

  • Add in need to call out via subprocess

    • Observe subprocess issues
  • Move to event loop

    • Observe increase in speed (? Not sure on this yet)

    • Observe successful error handling

    • Observe need to track fine grained details

  • Move to event-based implementation with a director (orchestrator; this file
    minus prev pointers in Base Event)

    • Observe visibility into the event state of each request

    • Observe lack of visibility into the chain of events

  • Add prev pointers

    • OpenLineage
  • Move to data flow based implementation

  • Demo full DFFML data flow using execution on k8s

    • Use k8s playground as target environment
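
As a sketch of the first two steps (the URLs are placeholders), the series
baseline and the asyncio.gather version could look like the following;
return_exceptions=True is one way to surface the error handling questions:

import asyncio

import aiohttp

URLS = [
    # Placeholder URLs; swap in whatever the tutorial actually downloads
    "https://example.com/a",
    "https://example.com/b",
    "https://example.com/c",
]


async def download(session: aiohttp.ClientSession, url: str) -> bytes:
    async with session.get(url) as response:
        return await response.read()


async def download_in_series() -> list:
    # Baseline: each request waits for the previous one to finish
    async with aiohttp.ClientSession() as session:
        return [await download(session, url) for url in URLS]


async def download_in_parallel() -> list:
    # All requests in flight at once; exceptions come back as values, so one
    # failed download does not cancel the rest (the error handling question)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            *(download(session, url) for url in URLS),
            return_exceptions=True,
        )


if __name__ == "__main__":
    asyncio.run(download_in_parallel())

The gist below picks up from the event loop step: aiohttp for the requests, a
thread pool for the CPU-bound bs4 parsing, and exit stacks for cleanup.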
"""
License: Public Domain
Source: https://gist.github.com/f6fe1a39bd4e66e7d0c6e7802872d3b5
"""
import os
import asyncio
import pathlib
import contextlib
import dataclasses
import urllib.parse
import concurrent.futures

import bs4
import dffml
import aiohttp


DOWNLOAD_PATH = pathlib.Path(__file__).parent.joinpath("downloads")
if not DOWNLOAD_PATH.is_dir():
    DOWNLOAD_PATH.mkdir()


def parse_bs4(html_doc):
    return bs4.BeautifulSoup(html_doc, "html.parser")


def mkurl(endpoint, **kwargs):
    url = urllib.parse.urlparse(endpoint)
    url = url._replace(**kwargs)
    return urllib.parse.urlunparse(url)


@dataclasses.dataclass
class SiteContext:
    username: str
    password: str
    endpoint: str = "https://site.com/"
    headers: dict = dataclasses.field(
        default_factory=lambda: {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0",
            "Connection": "keep-alive",
        }
    )

    def mkurl(self, **kwargs):
        return mkurl(self.endpoint, **kwargs)


async def main_with_stacks(ctx: SiteContext, loop, stack, astack):
    # Create thread pool for CPU bound tasks
    pool = stack.enter_context(concurrent.futures.ThreadPoolExecutor())
    # Create aiohttp client session which pools TCP connections for reuse
    session = await astack.enter_async_context(
        aiohttp.ClientSession(headers=ctx.headers)
    )
    # Get initial web page
    async with session.get(ctx.endpoint) as response:
        # Read home page
        initial_page_body_text = await response.read()
        # Parse home page
        initial_page_body_soup = await loop.run_in_executor(
            pool, parse_bs4, initial_page_body_text
        )
        # Find authenticity_token (This is the CSRF token)
        authenticity_token = initial_page_body_soup.find(
            "input",
            {"name": "authenticity_token"},
        )
        authenticity_token = authenticity_token.get("value")
    # Login. Server sends cookie which is used to authenticate us in subsequent
    # requests. The cookie is stored in the ClientSession.
    # The cookie might only be sent if something like "remember" is sent.
    # Use chrome devtools Network tab to see login request data, make sure to
    # check "Preserve Log" before you trigger the login page. Incognito window
    # can be helpful for getting a logged-out session.
    await session.post(
        ctx.mkurl(path="/login"),
        data={
            "authenticity_token": authenticity_token,
            "username": ctx.username,
            "password": ctx.password,
            "remember": "on",
        },
    )


async def main():
    # Grab the running event loop
    loop = asyncio.get_running_loop()
    # Create a context using our credentials
    ctx = SiteContext(
        username=os.environ["USERNAME"],
        password=os.environ["PASSWORD"],
    )
    with contextlib.ExitStack() as stack:
        async with contextlib.AsyncExitStack() as astack:
            await main_with_stacks(ctx, loop, stack, astack)


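# Usage: provide credentials via environment variables before running, e.g.
#   USERNAME=... PASSWORD=... python <this file>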
if __name__ == "__main__":
    asyncio.run(main())

DFFML's Current Working Data Flow Execution Model

graph TD
  subgraph dataflow_execution[Data Flow Execution]

    inputs[New Inputs]
    operations[Operations]
    opimps[Operation Implementations]

    ictx[Input Network]
    opctx[Operation Network]
    opimpctx[Operation Implementation Network]
    rctx[Redundancy Checker]
    lctx[Lock Network]


    opctx_operations[Determine which Operations may have new parameter sets]
    ictx_gather_inputs[Generate Operation parameter set pairs]
    opimpctx_dispatch[Dispatch operation for running]
    opimpctx_run_operation[Run an operation using given parameter set as inputs]

    inputs --> ictx

    operations -->|Register With| opctx
    opimps -->|Register With| opimpctx

    ictx --> opctx_operations
    opctx --> opctx_operations

    opctx_operations --> ictx_gather_inputs
    ictx_gather_inputs --> rctx
    rctx --> |If operation has not been run with given parameter set before| opimpctx_dispatch

    opimpctx_dispatch --> opimpctx

    opimpctx --> lctx

    lctx --> |Lock any inputs that can't be used at the same time| opimpctx_run_operation

    opimpctx_run_operation --> |Outputs of Operation become inputs to other operations| inputs
  end

