Python Flask-restx API

14 Nov 2020
by ignat

To get to know our potential clients, you need their analysis

(I am for a creative approach to the task, otherwise it is too simple. As well as the idea to take everything ready. Be sure to add your own perspective based on your own experience.)

We have a list of airports in major cities. There may be our clients from these cities who will undergo remote training courses before arriving in Spain.

We have an Artificial Intelligence network that regularly analyzes all the statements of each of our students. We are developing a REST API service in Python that will analyze the received data and identify the demand for new training courses among residents of different age groups in different cities through advertising channels.

Let’s implement the following handlers in the service:

  • a. POST /upload CSV file in this format: https://gist.githubusercontent.com/stepan-passnfly/8997fbf25ae87966e8919dc7803716bc/raw/37c07818c8498ae3d587199961233aada88ed743/airports.csv (define table and save DDL query)
  • b. PUT /create an entry in the previous table.
  • c. PATCH /update an entry in the previous table.
  • d. DELETE /delete an entry in the previous table.
  • e.OPTIONS /:entity[/:id] Gets a list of methods available at the given URI. The server should respond with 200 OK with an additional Allow header: GET, POST, ...
  • f. GET /create  – create and returns a link with confirmation
  • g. GET /update – returns a link with confirmation
  • h. GET /delete – sets a flag in the database to delete this record.
  • POST /imports
    Добавляет новую выгрузку с данными;
  • GET /imports/$import_id/citizens
    Возвращает жителей указанной выгрузки;
  • PATCH /imports/$import_id/citizens/$citizen_id
    Изменяет информацию о жителе (и его родственниках и друзьях) в указанной выгрузке;
  • GET /imports/$import_id/citizens/birthdays
    Вычисляет число курсов, которое приобретет каждый житель выгрузки своим родственникам\друзьям (первого порядка), сгруппированное по каналам рекламы;
  • GET /imports/$import_id/towns/stat/percentile/age
    Вычисляет 50-й, 75-й и 99-й перцентили возрастов (полных лет) жителей по городам в указанной выборке.

Alternative frameworks

List of the best frameworks:

https://www.techempower.com/benchmarks/#section=data-r19&hw=ph&test=fortune&l=zijzen-1r

 

Explored alternatives:

asyncpg — A fast PostgreSQL Database Client Library for Python/asyncio (1 000 000 line per second)

asyncpgsa — A python library wrapper around asyncpg for use with SQLAlchemy

aiohttp — as a library for writing an asynchronous framework service, since it generally works 2 times? faster.

aiohttp-spec — binds aiohttp handlers and schemas for data validation. As a bonus, it turned out to generate documentation in the format Swagger and display it in графическом интерфейсе.

Ancient technologies of RabbitMQ

aioamqp, a pure-Python AMQP 0-9-1 library using asyncio (source codedocs)

celery – alternative asynchronous library under Python

The tools we have chosen

flask-restx — It is a flask framework with the ability to automatically generate documentation using  Swagger.

Apache Kafka — Is an open-source distributed event streaming platform

PostgreSQL – database

SQLAlchemy allows you to decompose complex queries into parts, reuse them, generate queries with a dynamic set of fields (for example, a PATCH handler allows partial updating of an inhabitant with arbitrary fields or ElasticSearch reading) and focus directly on the business logic.

Alembic is a lightweight database migration tool for usage with the SQLAlchemy Database Toolkit for Python.

1:33:49How to write and test database migrations with Alembic – Alexander Vasin.

Docker (composer and swarm) or Kubernetes (k8s)

Ansible — For deployment, it allows declaratively describing the desired state of the server and its services, works via ssh and does not require special software.

pylint and pytest and unittest

Profilers

PyCharm

python -m cProfile -o pick.prof -s tottime app.py — Python only profiler.

snakeviz pick.prof — web visualizer.

python3 -m vmprof app.py – profiles C functions.

Threadpool – for reading files asynchronously on Linux.

Analysis of program memory consumption:

import tracemalloc

tracemalloc.start()

tracemalloc.take_snapshot()

mprof – graph of memory consumption over time.

Further optimization is to extract a fixed part from the file each time. For example 1024 lines.

uvloop — To connect data loops over the network with PostgreSQL.

Postman — generating API requests.

 

Installation node

wget http://nodejs.org/dist/latest/node-v15.2.0-linux-x64.tar.gz

tar -xzf node-v15.2.0-linux-x64.tar.gz

ls mv node-v15.2.0-linux-x64 15.2.0

ln -s 15.2.0/ current

export PATH=”/home/ubuntu/.nodes/current/bin:$PATH”

rm node-v15.2.0-linux-x64.tar.gz

node –version

npm install

How to activate Swagger

https://github.com/noirbizarre/flask-restplus/blob/master/CONTRIBUTING.rst#running-a-local-swagger-server

sudo apt install python3-pip
sudo pip3 install -e .[dev]
sudo env “PATH=/home/ubuntu/.nodes/current/bin:$PATH” inv assets

Loading data into PostgreSQL

wget https://gist.githubusercontent.com/stepan-passnfly/8997fbf25ae87966e8919dc7803716bc/raw/37c07818c8498ae3d587199961233aada88ed743/airports.csv

http://www.jennifergd.com/post/7/

sudo /etc/init.d/postgresql start

psql -U postgres -h localhost insikt

CREATE TABLE cities (id SERIAL PRIMARY KEY, name VARCHAR(256), city VARCHAR(50), country VARCHAR(50), iata VARCHAR(10), icao VARCHAR(10), latitude FLOAT, longitude FLOAT, altitude FLOAT, timezone FLOAT, dst VARCHAR(10), tz VARCHAR(50), type VARCHAR(50), source VARCHAR(50));

\copy cities(name, city, country, iata, icao, latitude, longitude, altitude, timezone, dst, tz, type, source) FROM 'airports.csv' DELIMITERS ',' CSV HEADER;

SELECT * FROM cities;

 

 

PostGIS

http://postgis.net/install/

sudo apt-get install postgis

 

psql -U postgres -h localhost insikt

CREATE EXTENSION postgis;
CREATE TABLE citiesgeo (
    point_id SERIAL PRIMARY KEY,
    location VARCHAR(30),
    latitude FLOAT,
    longitude FLOAT,
    geo geometry(POINT)
);

 

cities.csv

location, latitude, longitude
San Francisco, 37.773972, -122.43129
Seattle, 47.608013, -122.335167
Sacramento, 38.575764, -121.478851
Oakland, 37.804363, -122.271111
Los Angeles, 34.052235, -118.243683
Alameda, 37.7652, -122.2416

In PostgreSQL shell:

=# \copy citiesgeo(location, latitude, longitude) FROM ‘cities.csv’ DELIMITERS ‘,’ CSV HEADER;
COPY 6

=# SELECT * FROM citiesgeo;
point_id | location | latitude | longitude | geo
———-+—————+———–+————-+—–
1 | San Francisco | 37.773972 | -122.43129 |
2 | Seattle | 47.608013 | -122.335167 |
3 | Sacramento | 38.575764 | -121.478851 |
4 | Oakland | 37.804363 | -122.271111 |
5 | Los Angeles | 34.052235 | -118.243683 |
6 | Alameda | 37.7652 | -122.2416 |
(6 rows)

https://postgis.net/docs/ST_Point.html

=# UPDATE citiesgeo
SET geo = ST_Point(longitude, latitude);
UPDATE 6

=# SELECT * FROM citiesgeo;
point_id | location | latitude | longitude | geo
———-+—————+———–+————-+——————————————–
1 | San Francisco | 37.773972 | -122.43129 | 0101000000E1455F419A9B5EC08602B68311E34240
2 | Seattle | 47.608013 | -122.335167 | 0101000000B3EC496073955EC07C45B75ED3CD4740
3 | Sacramento | 38.575764 | -121.478851 | 01010000000B2AAA7EA55E5EC0691B7FA2B2494340
4 | Oakland | 37.804363 | -122.271111 | 01010000007FA5F3E159915EC0658EE55DF5E64240
5 | Los Angeles | 34.052235 | -118.243683 | 0101000000D6E59480988F5DC0715AF0A2AF064140
6 | Alameda | 37.7652 | -122.2416 | 0101000000ACADD85F768F5EC01973D712F2E14240
(6 rows)

We make position in WGS84 format.

 

=# INSERT INTO citiesgeo (location, latitude, longitude, geo)
VALUES (‘San Rafael’, 37.9735, -122.5311, ST_Point(-122.5311, 37.9735));
INSERT 0 1

=# INSERT INTO citiesgeo (location, latitude, longitude, geo)
VALUES (‘San Rafael’, 37.9735, -122.5311, ST_Point(-122.5311, 37.9735));
INSERT 0 1

https://postgis.net/docs/ST_DistanceSphere.html

=# SELECT * FROM citiesgeo WHERE ST_DistanceSphere(geo,(SELECT geo FROM citiesgeo WHERE location = ‘San Francisco’
)) < 83000;
point_id | location | latitude | longitude | geo
———-+—————+———–+————-+——————————————–
1 | San Francisco | 37.773972 | -122.43129 | 0101000000E1455F419A9B5EC08602B68311E34240
4 | Oakland | 37.804363 | -122.271111 | 01010000007FA5F3E159915EC0658EE55DF5E64240
6 | Alameda | 37.7652 | -122.2416 | 0101000000ACADD85F768F5EC01973D712F2E14240
7 | San Bruno | 37.6305 | -122.4111 | 0101000000AED85F764F9A5EC062105839B4D04240
8 | San Rafael | 37.9735 | -122.5311 | 0101000000F5B9DA8AFDA15EC0F853E3A59BFC4240
(5 rows)

 

ALTER TABLE cities ADD COLUMN geo geometry(POINT);

-# SET geo = ST_Point(longitude, latitude);
UPDATE 7698

 

Launching the application in test mode

sudo python3 setup.py install

http://75.126.254.59:8032/

screen -dmS abc

screen -r abc

./app.py

Ctr-a d

 

docker-compose.yml – this is an outdated part that we are changing to a more modern one

Some companies work using outdated technologies due to the fact that they hired specialists with limited experience or, more precisely, ordered their systems from outsourcers (for example, from Ukraine). Below is an example of such a deprecated docker-composer.yml MySQL + RabbitMQ configuration.

version: ‘3.4’
services:
database:
image: mysql:5.7.21
ports:
– 3306:3306
environment:
– MYSQL_ROOT_PASSWORD=test
– MYSQL_USER=test
– MYSQL_PASSWORD=test
– MYSQL_DATABASE=mysqltest
volumes:
# – data02:/var/lib/mysql
– /home/ubuntu/opt/docker-volumes/mysql:/var/lib/mysql
networks:
default:
ipv4_address: 172.18.0.4

# rabbitmq:
# image: rabbitmq:3.6.12-management
# hostname: rabbitmq
# volumes:
# – /home/ubuntu/opt/docker-volumes/rabbitmq:/var/lib/rabbitmq
# environment:
# – RABBITMQ_DEFAULT_PASS=admin
# – RABBITMQ_DEFAULT_USER=admin
# – RABBITMQ_DEFAULT_VHOST=/
# ports:
# – 5672:5672
# – 15672:15672
# networks:
# default:
# ipv4_address: 172.18.0.3

 

Apache kafka and PostgreSQL

This is how kafka containers look like in Docker. Below are the installation instructions:

https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html

Our server:

http://75.126.254.59:9021

Then we test and create an asynchronous client.

Doing a little refactoring

https://github.com/Ignat99/restx2020

Since we have accumulated many layers and functions. For example API, ORM, business logic, Model, database schema, then for ease of maintenance we will distribute these functions in different modules. Everything could be left in one file. But since we have attached asynchronous processing and work with the Kafka Broker in dedicated threads that are synchronized through Kafka, it is better to separate the modules.

SonarQube with local PostgreSQL

https://gist.github.com/RawToast/feab6b36d2592554d071

sudo psql -U postgres  postgres

CREATE DATABASE sonar;

CREATE USER sonar WITH PASSWORD ‘verysecret’;

GRANT ALL PRIVILEGES ON DATABASE sonar TO sonar;

 

Install SonarQube

cd /opt

sudo wget https://binaries.sonarsource.com/CommercialDistribution/sonarqube-developer/sonarqube-developer-8.5.1.38104.zip

sudo unzip sonarqube-developer-8.5.1.38104.zip

sudo mv sonarqube-8.5.1.38104/ sonarqube

 

or

 

git clone https://github.com/SonarSource/sonarqube

cd sonarqube/

./gradlew build

 

or

sudo docker run -d –name sonarqube -p 9000:9000 -e SONARQUBE_JDBC_USERNAME=sonar
-e SONARQUBE_JDBC_PASSWORD=verysecret -e SONARQUBE_JDBC_URL=jdbc:postgresql://localhost:5432/sonar –net mynet sonarqube:7.5-community

sudo docker exec -it -u root sonarqube /bin/bash

sudo sysctl -w vm.max_map_count=262144

cd /opt/sonarqube

sh bin/linux-x86-64/sonar.sh start

netstat -tap

tcp6 0 0 [::]:9000 [::]:* LISTEN 5819/java

 

Tests

https://github.com/Ignat99/a_proxy

https://github.com/Ignat99/JSON_Test/blob/master/test_json.py

sudo pip3 install nose coverage nosexcover pylint

sudo pip3 install pylint_flask_sqlalchemy

sudo pip3 install pylint-flask

pylint –generate-rcfile > pylintrc

pylint passnfly/

To see duplicate lines and test coverage in SonarQube:

./coverage.sh

pytest -s –junitxml=pytests.xml –cov-report xml –cov-report term –cov-branch –cov=passnfly tests/

 

Sonar-scaner

wget https://binaries.sonarsource.com/Distribution/sonar-scanner-cli/sonar-scanner-cli-4.5.0.2216-linux.zip

 

../sonar-scanner/bin/sonar-scanner -Dsonar.projectKey=passnfly1 -Dsonar.sources=. -Dsonar.host.url=http://75.126.254.59:9000 -Dsonar.login=cf56501e7eca2051eb7510911ceec0b4e8b1cf66

 

Web interface of SonarQube

http://75.126.254.59:9000/dashboard?id=passnfly1

 

Python thread support for accessing asynchronous processes

https://github.com/Ignat99/restx2020/blob/main/passnfly/api/aiokafka/todo.py

In this part of our backend, a special class of Threads in debug mode is implemented, which allows stopping the child from the parent thread based on its name.

As the identifier of the asynchronous process, we use the hash that we generated for the partition of this topic in Kafka.

https://github.com/Ignat99/restx2020/blob/main/passnfly/api/aiokafka/todo.py#L111

All this gives great flexibility up to managing the entire banned by using the REST API from one of the threads.

Here we make sure that one thread is launched per task.

https://github.com/Ignat99/restx2020/blob/main/passnfly/api/aiokafka/todo.py#L122

The duration of our stream can be up to 2 weeks and is limited by the Kafka settings. Every 2 weeks, the oldest data in our partition is released. If our service did not manage to process the data in 2 weeks (it can be reconfigured in Kafka), then this data should be stored in longer storages, such as S3 / Hadoop. There is an old driver for RabbitMQ, but it doesn’t work well.

https://docs.confluent.io/clients-confluent-kafka-python/current/index.html

https://kafka-python.readthedocs.io/en/master/usage.html

Flask RESTX API testing

https://github.com/Ignat99/restx2020/tree/main/tests/api

https://riptutorial.com/flask/example/5622/testing-a-json-api-implemented-in-flask

https://flask.palletsprojects.com/en/1.1.x/testing/

 

Kafka – Idempotent Producer and Consumer

Как Kafka стала былью / Блог компании Tinkoff / Хабр (habr.com)

Will help client.id for Producer and Consumer. It is best to use a combination of the application name and, for example, the topic name as the client.id value.

Kafka Producer has the acks parameter that allows you to configure after how many acknowledge the cluster leader should consider the message as successfully written. This parameter can take the following values:

 

0 – acknowledge will not be counted.
1 – default parameter, acknowledge only from 1 replica is required.
−1 – acknowledge from all synchronized replicas are required (cluster setting min.insync.replicas).

From the listed values, it can be seen that acks equal to -1 gives the strongest guarantees that the message will not be lost.

 

As we all know, distributed systems are unreliable. To protect against transient faults, Kafka Producer provides a retries parameter that allows you to set the number of retry attempts during delivery.timeout.ms. Since the retries parameter has a default value of Integer.MAX_VALUE (2147483647), the number of retransmissions of a message can be adjusted by changing only delivery.timeout.ms. In some cases, this will be more convenient due to the fact that Producer will make the maximum number of re-submissions within the specified time interval. We set the interval to 30 seconds.

Kafka — Idempotent Producer And Consumer | by Sheshnath Kumar | Medium

In the simplest case, for this on the Producer, you need to set the enable.idempotence parameter to true. Idempotency guarantees that only one message is written to a specific partition of one topic. The precondition for enabling idempotency is the values acks = all, retry> 0, max.in.flight.requests.per.connection ≤ 5. If these parameters are not specified by the developer, the above values will be automatically set.

Producer:

 

  1. acks = all
  2. retries > 0
  3. enable.idempotence = true
  4. max.in.flight.requests.per.connection ≤ 5 (1 — для упорядоченной отправки)
  5. transactional.id = ${application-name}-${hostname}

Consumer:

  1. isolation.level = read_committed

 

KIP-98 perf testing – Google Sheets

For 1K messages and transactions lasting 100ms, the producer bandwidth is reduced by:

3% compared to its performance when it is configured for at least once in-order delivery (acks = all, max.in.flight.requests.per.connection = 1),
by 20% compared to its operation when configured to at most once without respecting message order (acks = 1, max.in.flight.requests.per.connection = 5) is used by default.

Exactly Once Delivery and Transactional Messaging in Kafka – Google Docs

KIP-98 – Exactly Once Delivery and Transactional Messaging – Apache Kafka – Apache Software Foundation

KIP-129: Streams Exactly-Once Semantics – Apache Kafka – Apache Software Foundation

Access control is configured on your cluster.

transactional.id.expiration.ms, which is configured on the Kafka cluster and has a default value of 7 days.

 

Confluent’s Python Client for Apache Kafka

This library has advanced a little further than pyKafka, it looks like it somehow implements the mechanism we need. At least there are parameters for the transaction.
This is a C wrapper on top of librdkafka. You can search it for entry points to the new version of librdkafka.
Before the next line, you can put a description of the function to set the parameter from the new version of the library.
Around this place, you need to write a number of functions that support parameter  (enable_idempotence=True)
And here for the consumer ( isolation.level = read_committed)

Install

git clone https://github.com/edenhill/librdkafka

cd librdkafka/

git checkout v1.6.0-RC1

./configure

make

sudo make install

cd ../confluent-kafka-python/

git checkout v1.6.0-PRE4

python3 setup.py build

sudo python3 setup.py install

….

Finished processing dependencies for confluent-kafka==1.6.0

Pull request

Clone repo to my github.com

git clone Ignat99/confluent-kafka-python: Confluent’s Kafka Python Client (github.com)

cd confluent-kafka-python/

git remote add upstream git://github.com/confluentinc/confluent-kafka-python

git fetch upstream

git checkout v1.6.0-PRE4

python3 setup.py build

sudo python3 setup.py install

Finished processing dependencies for confluent-kafka==1.6.0

git checkout -b enable_idempotence

This is where we make a change to the library and later return the pull request.

git push origin enable_idempotence

 

 

Minecraft Edu © 2025