linuxforhealth / connect
LinuxForHealth Data Flows
License: Apache License 2.0
We just need to review this issue to see if it affects us. At PokitDok we had issues with resuming a workflow on another machine - rbarrois/xworkflows#10.
This may not be relevant to our use case since our workflow is not distributed.
When you send a "ctrl+c"/SIGINT signal to pyConnect, the uvicorn process hangs and does not return control to the terminal.
We may need to look into using gunicorn to run and manage our local uvicorn process.
We need to remove the asynctest dependency and migrate our usage to features available in pytest and Python 3.8.
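The migration could lean on `unittest.mock.AsyncMock`, which landed in Python 3.8 and covers the typical asynctest use cases. A minimal sketch, where the `fetch_status` coroutine and its client are hypothetical stand-ins for our code:

```python
import asyncio
from unittest.mock import AsyncMock

# Hypothetical coroutine under test: awaits an async client and returns a field.
async def fetch_status(client):
    result = await client.get_status()
    return result["status"]

# AsyncMock replaces asynctest.CoroutineMock: awaiting it returns the
# configured return_value, and await counts can be asserted afterwards.
mock_client = AsyncMock()
mock_client.get_status.return_value = {"status": "AVAILABLE"}

status = asyncio.run(fetch_status(mock_client))
print(status)  # AVAILABLE
```

`assert_awaited_once()` and friends on the mock give the same verification asynctest provided.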
pyconnect to be deployed on a Kubernetes environment
dir(s) to be available to the pyconnect service - this would be for chains with an intermediate CA or client-side/2-way SSL auth.
pyConnect's "main" module has the potential to be a bit busy due to the server event handlers and API routing configuration. This PR aligns pyConnect's API configuration with the sample FastAPI big project, which cleans up the main module a bit.
For test cases that exercise external integration via the "transmit" step in our workflow, it may be beneficial to create a pytest fixture used to mock the external server.
The httpx library has a nice implementation
We could make it a little simpler and opt for a "session" scope. For more info refer to the pytest docs
We could provide an implementation that returns a response, headers, status code based on the request URL and method.
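As a sketch, the request dispatch could live in a small class that maps (method, URL) to a canned response; wrapping one shared instance in a session-scoped pytest fixture is then a one-liner, shown in a comment. All names here are hypothetical:

```python
# Hypothetical dispatcher for a mocked external server. A session-scoped
# pytest fixture could return one shared instance:
#
#     @pytest.fixture(scope="session")
#     def mock_external_server():
#         return MockExternalServer()
class MockExternalServer:
    def __init__(self):
        # (method, url) -> (status_code, headers, body)
        self.routes = {}

    def add_route(self, method, url, status_code, headers=None, body=None):
        self.routes[(method.upper(), url)] = (status_code, headers or {}, body)

    def respond(self, method, url):
        """Return (status_code, headers, body) for a request, or a 404 default."""
        return self.routes.get((method.upper(), url), (404, {}, None))

server = MockExternalServer()
server.add_route("POST", "https://external.example.com/fhir/Patient",
                 201, {"Content-Type": "application/fhir+json"}, {"id": "001"})
status, headers, body = server.respond("post", "https://external.example.com/fhir/Patient")
print(status)  # 201
```

The "transmit" step's HTTP client could then be pointed at this dispatcher (for example via httpx's transport mocking) instead of a live endpoint.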
Create a Pydantic model to support the LinuxForHealth metadata message. Initially we can support the "java" LFH metadata message format and then enhance it as necessary.
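The field names below mirror the LFH metadata message as it appears in the /fhir route output elsewhere in this document. The sketch uses stdlib dataclasses to stay dependency-free; in pyconnect the model would subclass pydantic.BaseModel with the same fields:

```python
from dataclasses import dataclass
from typing import Optional

# Sketch of the LinuxForHealth metadata message ("java" LFH format).
# In pyconnect this would be a pydantic.BaseModel for validation/serialization.
@dataclass
class LinuxForHealthDataRecordResponse:
    uuid: str
    creation_date: str
    store_date: str
    consuming_endpoint_url: str
    data: str                   # base64-encoded payload
    data_format: str            # e.g. "PATIENT"
    status: str
    data_record_location: str   # topic:partition:offset
    transmit_date: Optional[str] = None
    target_endpoint_url: Optional[str] = None
    elapsed_storage_time: Optional[float] = None
    elapsed_transmit_time: Optional[float] = None
    elapsed_total_time: Optional[float] = None

record = LinuxForHealthDataRecordResponse(
    uuid="9aee4659-28b7-44c7-bae7-15f2d8aa102f",
    creation_date="2021-03-17T12:41:04+00:00",
    store_date="2021-03-17T12:41:04+00:00",
    consuming_endpoint_url="/fhir",
    data="eyJpZCI6ICIwMDEifQ==",
    data_format="PATIENT",
    status="success",
    data_record_location="PATIENT:0:9",
)
print(record.data_record_location)  # PATIENT:0:9
```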
This epic serves as an MVP definition for the initial pyconnect platform. This initial implementation supports FHIR-based resources, focusing on LFH "single node operations". Message synchronization between LFH nodes is deferred to a future epic.
High Level Features include:
Implement the /data [GET] endpoint in data.py to support fetching a LFH message by topic, partition, and offset.
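A minimal sketch of the lookup behind that endpoint, using an in-memory dict to stand in for the Kafka consumer; the FastAPI wiring is indicated in a comment and all names are assumptions:

```python
from typing import Optional

# In-memory stand-in for the Kafka consumer:
# (topic, partition, offset) -> stored LFH message.
RECORDS = {
    ("FHIR-R4_PATIENT", 0, 9): {"uuid": "9aee4659", "status": "success"},
}

def get_data_record(topic: str, partition: int, offset: int) -> Optional[dict]:
    """Fetch a stored LFH message by topic, partition, and offset.

    The FastAPI route would look roughly like:
        @router.get("/data")
        async def get_data(topic: str, partition: int, offset: int): ...
    """
    return RECORDS.get((topic, partition, offset))

print(get_data_record("FHIR-R4_PATIENT", 0, 9))
```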
Create a default logging configuration for pyconnect (console logger) that is capable of logging HTTP requests and status codes.
Support use cases that require remote async processing:
The flow looks like:
client -> LFH -> JetStream stream-> remote NATS subscriber -> JetStream stream -> LFH NATS subscriber -> JetStream stream -> client NATS subscriber
Create the initial pyconnect project structure and contents which include:
Implement the /data route (routes/data.py) to call the Kafka consumer and return the stored LinuxForHealthDataRecordResponse instance at topic:partition:offset.
Python does not provide a standard configuration process or store, such as Java's keystore/truststore, to support x509 certificates, keys, etc.
The shutdown hook we use to gracefully close the NATS and Kafka clients has been removed from our FastAPI startup. This likely occurred during a period of time when we were refactoring function names and package structure.
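A sketch of restoring that hook. The client names and close methods are assumptions (dummy objects stand in for the real NATS and Kafka clients so the snippet runs); in FastAPI the coroutine would be registered as a shutdown event handler:

```python
import asyncio

# Dummy async clients standing in for the NATS and Kafka clients.
class DummyClient:
    def __init__(self, name):
        self.name = name
        self.closed = False

    async def close(self):
        self.closed = True

nats_client = DummyClient("nats")
kafka_producer = DummyClient("kafka-producer")

# In pyconnect this would be registered on the FastAPI instance:
#     @app.on_event("shutdown")
async def close_clients():
    """Gracefully close the NATS and Kafka clients on application shutdown."""
    await nats_client.close()
    await kafka_producer.close()

asyncio.run(close_clients())
print(nats_client.closed, kafka_producer.closed)  # True True
```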
LinuxForHealth's Kafka deployment serves as a Longitudinal Patient Record (LPR) data store. As transactions flow through the system, each data message is persisted to a format-specific Kafka topic such as FHIR-R4_PATIENT, HL7_ADT, etc., where the topic name is comprised of the data format (FHIR-R4, HL7) and the format's message type.
We need to create a high level diagram that shows how data is transformed/normalized to FHIR R4 (US Core Profile)
The current pyConnect logging configuration creates duplicate log entries.
2021-02-27 06:38:51,356 - uvicorn.access - INFO - 127.0.0.1:65252 - "GET / HTTP/1.1" 404
2021-02-27 06:38:51,358 - uvicorn.access - INFO - 127.0.0.1:65251 - "GET /favicon.ico HTTP/1.1" 404
2021-02-27 06:38:56,290 - uvicorn.access - INFO - 127.0.0.1:65252 - "GET / HTTP/1.1" 404
2021-02-27 06:38:56,291 - uvicorn.access - INFO - 127.0.0.1:65251 - "GET /favicon.ico HTTP/1.1" 404
2021-02-27 06:38:56,554 - uvicorn.access - INFO - 127.0.0.1:65252 - "GET /docs HTTP/1.1" 200
2021-02-27 06:38:56,709 - uvicorn.access - INFO - 127.0.0.1:65252 - "GET /openapi.json HTTP/1.1" 200
2021-02-27 06:38:56,849 - uvicorn.access - INFO - 127.0.0.1:65252 - "GET /docs HTTP/1.1" 200
2021-02-27 06:38:56,892 - uvicorn.access - INFO - 127.0.0.1:65252 - "GET /openapi.json HTTP/1.1" 200
2021-02-27 06:39:02,029 - pyconnect.routes.status - INFO - Application: pyconnect.main:app
2021-02-27 06:39:02,029 - pyconnect.routes.status - INFO - Application: pyconnect.main:app
2021-02-27 06:39:02,029 - pyconnect.routes.status - INFO - Version: 0.25.0
2021-02-27 06:39:02,029 - pyconnect.routes.status - INFO - Version: 0.25.0
2021-02-27 06:39:02,031 - uvicorn.access - INFO - 127.0.0.1:65256 - "GET /status HTTP/1.1" 200
This appears to be due to Uvicorn log configuration.
It looks like we need to make some updates to the uvicorn logging configuration, specified via log_config passed to uvicorn.run(), and update our logging.yaml.
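The duplication pattern above (each record emitted once by the uvicorn handlers and again via the root logger) is typically fixed by disabling propagation for the uvicorn loggers in the dictConfig passed as log_config. A stdlib sketch of the relevant fragment; the handler and formatter names are assumptions, not our actual logging.yaml:

```python
import logging
import logging.config

# Minimal dictConfig fragment: give uvicorn.access its own handler and set
# propagate: False so each record is emitted exactly once.
LOG_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "default"},
    },
    "loggers": {
        "uvicorn.access": {"handlers": ["console"], "level": "INFO", "propagate": False},
    },
    "root": {"handlers": ["console"], "level": "INFO"},
}

logging.config.dictConfig(LOG_CONFIG)
# uvicorn.run(app, log_config=LOG_CONFIG) would apply the same configuration.
print(logging.getLogger("uvicorn.access").propagate)  # False
```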
pyConnect uses mkcert to generate a root CA for local development, in addition to service-specific certificates for pyconnect, nats, etc., which are signed by the root CA. Some developers on the team are not able to use the process outlined in our README, as an error occurs:
PYCONNECT_CERT=./local-certs/lfh.pem PYCONNECT_CERT_KEY=./local-certs/lfh.key UVICORN_RELOAD=True python pyconnect/main.py
INFO: Uvicorn running on https://0.0.0.0:5000 (Press CTRL+C to quit)
INFO: Started reloader process [61580] using statreload
INFO: Started server process [61582]
INFO: Waiting for application startup.
2021-02-26 11:01:13,518 - nats.aio.client - ERROR - nats: encountered error
Traceback (most recent call last):
File "/Users/anup.puriibm.com/Documents/Projects/lfh/pyconnect/venv/lib/python3.8/site-packages/nats/aio/client.py", line 318, in connect
await self._process_connect_init()
File "/Users/anup.puriibm.com/Documents/Projects/lfh/pyconnect/venv/lib/python3.8/site-packages/nats/aio/client.py", line 1607, in _process_connect_init
transport = await asyncio.wait_for(
File "/usr/local/Cellar/[email protected]/3.8.7_2/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 494, in wait_for
return fut.result()
File "/usr/local/Cellar/[email protected]/3.8.7_2/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 1200, in start_tls
await waiter
File "/usr/local/Cellar/[email protected]/3.8.7_2/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/sslproto.py", line 529, in data_received
ssldata, appdata = self._sslpipe.feed_ssldata(data)
File "/usr/local/Cellar/[email protected]/3.8.7_2/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/sslproto.py", line 189, in feed_ssldata
self._sslobj.do_handshake()
File "/usr/local/Cellar/[email protected]/3.8.7_2/Frameworks/Python.framework/Versions/3.8/lib/python3.8/ssl.py", line 944, in do_handshake
self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1125)
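The error above means the client's TLS handshake cannot find the mkcert root CA. A sketch of building an SSL context that trusts it explicitly; the certificate path is hypothetical, and the resulting context would be handed to the NATS client's connect options:

```python
import ssl

def make_tls_context(ca_file=None):
    """Build a client-side TLS context. When ca_file is provided (e.g. the
    mkcert root CA), it is added to the trusted CAs so service certificates
    signed by it verify cleanly."""
    ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
    if ca_file is not None:
        # e.g. make_tls_context("local-certs/rootCA.pem")  # hypothetical path
        ctx.load_verify_locations(cafile=ca_file)
    return ctx

ctx = make_tls_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED)  # True
```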
Implement the LFH data format shown in the /data route for Kafka persistence used by the /fhir route.
If a client IS NOT integrating with an external system, we will return the LinuxForHealth metadata message.
If a client IS integrating with an external system, we will return the LinuxForHealth metadata message URI within an HTTP header such as "LFHMetaDataLocation" or something similar.
Let's see if we can update existing "app" imports in main and test paths to use the app factory function.
This will keep us from triggering side effects on import in these modules, which is a good practice to follow.
Avoid
from pyconnect.main import app
Favor
from pyconnect.main import get_app
We recently updated the core workflow's run method to accept/hint at a dict so that we aren't tightly coupled to the Response object.
async def run(self, response: dict):
    """
    Run the workflow according to the defined states. Override to extend or exclude states
    for a particular implementation.

    :param response: the workflow payload as a dict
    :return: the response instance, with updated body and status_code
    """
Let's review each usage of run and see if we can support a dictionary input.
Update: this may not be feasible since the "Response" is passed into FastAPI endpoints to use as a model.
Message routing includes:
Add the NATS client and integrate a health check for the NATS server(s) and client in the /status endpoint
Please update the docker-compose configuration to support a single profile, ipfs, which is associated with each ipfs related service.
Let's update the FHIR Route to support standard FHIR endpoints such as
fhir/Patient
fhir/Observation
fhir/Coverage
etc etc
We will probably need to look into FastAPI "path parameters" for this.
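A sketch of the path-parameter validation; the resource list here is a small illustrative subset of FHIR R4 resource types, and the route decorator is shown in a comment:

```python
# Small illustrative subset of FHIR R4 resource types; the real route would
# validate against the full R4 resource list.
SUPPORTED_FHIR_RESOURCES = {"Patient", "Observation", "Coverage", "Encounter"}

def validate_fhir_resource(resource_type: str) -> bool:
    """Return True when the path parameter names a supported FHIR resource.

    With FastAPI path parameters the route would look roughly like:
        @router.post("/fhir/{resource_type}")
        async def post_fhir(resource_type: str, resource: dict): ...
    """
    return resource_type in SUPPORTED_FHIR_RESOURCES

print(validate_fhir_resource("Patient"))    # True
print(validate_fhir_resource("NotAThing"))  # False
```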
Create a "ping" route within pyConnect to provide a means of verifying internal component integration.
At a high level the overall flow/processing is read-only. Processing steps include:
For the latter point we will have a bootstrapping process which loads a predefined message into a Kafka "STATUS" topic.
We have noticed this message in the logs. It appears that xworkflows needs to refactor some of its imports in order to remain compatible with Python 3.9 (we are on 3.8 right now).
We will need to reach out to the current xWorkflows maintainer to determine if the repo is still active. If it's not active, we will need to fork it and address the issue.
=============================== warnings summary ===============================
tests/routes/test_data.py::test_get_data_record
/Users/tdw/projects/lfh/pyconnect/venv/lib/python3.8/site-packages/xworkflows/compat.py:26: DeprecationWarning:
Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,
and in 3.9 it will stop working
return isinstance(var, collections.Callable)
-- Docs: https://docs.pytest.org/en/stable/warnings.html
Minor issue - address linting issues in the core workflow module as needed.
Add a unit test for core workflow functionality. Use mocks where necessary to simulate interaction with Kafka, NATS, and external systems.
implements design from #3
Implement the transmit workflow step in the /fhir route to send the message to an external system via http.
If a client IS NOT integrating with an external system, we will return the LinuxForHealth metadata message.
If a client IS integrating with an external system, we will return the LinuxForHealth metadata message URI within an HTTP header such as "LFHMetaDataLocation" or something similar.
Update the status check ping to use a non-blocking socket/async IO. This will allow us to have an awaitable function, which will simplify integration with the pytest and FastAPI frameworks.
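A sketch of such an awaitable ping using asyncio's non-blocking connection APIs; host, port, and timeout are parameters, and the demo spins up a throwaway local server so the snippet is self-contained:

```python
import asyncio

async def ping(host: str, port: int, timeout: float = 2.0) -> bool:
    """Awaitable status-check ping: True if a TCP connection succeeds within
    the timeout, False otherwise. Being non-blocking, it composes cleanly
    with async test frameworks and FastAPI endpoints."""
    try:
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
        writer.close()
        await writer.wait_closed()
        return True
    except (OSError, asyncio.TimeoutError):
        return False

async def demo():
    # Throwaway local server on an ephemeral port.
    server = await asyncio.start_server(lambda r, w: w.close(), "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    ok = await ping("127.0.0.1", port)
    server.close()
    await server.wait_closed()
    return ok

print(asyncio.run(demo()))  # True
```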
Implement the synchronization in the core workflow 'synchronize' step. Emit an EVENTS.responses message to the NATS EVENTS stream so listeners will be notified of LFH events for synchronization.
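A sketch of that emit; the EVENTS.responses subject comes from the issue, while the publisher is a stand-in so the snippet runs (in pyconnect the call would be an awaited JetStream publish on the NATS client):

```python
import json

# Stand-in for the NATS JetStream client; in pyconnect this would be roughly:
#     await nats_client.publish("EVENTS.responses", payload)
published = []

def publish(subject: str, payload: bytes):
    published.append((subject, payload))

def emit_sync_event(lfh_message: dict):
    """Serialize the LFH message and emit it on EVENTS.responses so stream
    listeners are notified of LFH events for synchronization."""
    payload = json.dumps(lfh_message).encode("utf-8")
    publish("EVENTS.responses", payload)

emit_sync_event({"uuid": "9aee4659", "status": "success"})
print(published[0][0])  # EVENTS.responses
```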
Integrate the Kafka client to support health checks for the Kafka broker, producer, and consumer components.
The HL7 organization supports an "official" client - http://docs.smarthealthit.org/client-py/. We can take a look to see if we can use it outside of the context of a SMART on FHIR application.
If SMART on FHIR is required we can provide a setup process to support ourselves as a SMART on FHIR application.
It's hard to believe, but the README is already out of date :)
Please review the current README to ensure that setup steps are accurate, and that we have coverage for setting up the project on Windows 10 using the WSL.
When storing data in Kafka, the default partition is 0. The following is the output from posting patient data and storing it in Kafka using the /fhir route. Note the data_record_location field:
{"uuid":"9aee4659-28b7-44c7-bae7-15f2d8aa102f","creation_date":"2021-03-17T12:41:04+00:00","store_date":"2021-03-17T12:41:04+00:00","transmit_date":null,"consuming_endpoint_url":"/fhir","data":"eyJpZCI6ICIwMDEiLCAiYWN0aXZlIjogdHJ1ZSwgImdlbmRlciI6ICJtYWxlIiwgInJlc291cmNlVHlwZSI6ICJQYXRpZW50In0=","data_format":"PATIENT","status":"success","data_record_location":"PATIENT:0:9","target_endpoint_url":null,"elapsed_storage_time":0.009799,"elapsed_transmit_time":null,"elapsed_total_time":0.010265}
However, when testing with:
https://127.0.0.1:5000/data?dataformat=PATIENT&partition=0&offset=9
I get:
{"detail":"Init Error: No topic_name or partition information provided."}
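The symptom is consistent with a truthiness check treating partition 0 (and offset 0) as "not provided". A sketch of the likely fix, comparing against None explicitly; the validation function shown is an assumption about the consumer's init, not the actual code:

```python
def validate_consumer_args(topic_name, partition):
    """Raise only on genuinely missing values; partition 0 is valid.

    Buggy version:   if not topic_name or not partition:   # rejects partition=0
    Fixed version:   compare against None explicitly, as below.
    """
    if topic_name is None or partition is None:
        raise ValueError("Init Error: No topic_name or partition information provided.")
    return topic_name, partition

print(validate_consumer_args("PATIENT", 0))  # ('PATIENT', 0)
```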
Implement error handling in the core workflow 'handle_error' step. Log the error and emit a NATS EVENTS.errors message on the EVENTS stream.
Let's update the OpenAPI FHIR route "documentation" to include sample FHIR request and response documents. For simplicity's sake we can include the minimal patient document and its response. We may also need to include a note/comment in the docs that indicate that the endpoint accepts any valid FHIR R4 resource.