Top Swimmers SQL Table (SQLAlchemy Library)
"""
This uses Python SQLAlchemy database objects to create swimmer DB
"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
# Setup of key Flask object (app)
app = Flask(__name__)
# Setup SQLAlchemy object and properties for the database (db)
database = 'sqlite:///instance/sqlite.db' # path and filename of database
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = database
app.config['SECRET_KEY'] = 'SECRET_KEY'
db = SQLAlchemy()
# This run once per project
db.init_app(app)
import datetime
from datetime import datetime
import json
from sqlalchemy.exc import IntegrityError
from werkzeug.security import generate_password_hash, check_password_hash
# Define the Swimmer class to manage actions in the 'swimmer' table
class Swimmer(db.Model):
__tablename__ = 'swimmer'
# Define the Swimmer schema with "vars" from object
id = db.Column(db.Integer, primary_key=True)
_name = db.Column(db.String(255), unique=False, nullable=False)
_speed = db.Column(db.Float, unique=False, nullable=False)
_height = db.Column(db.Float, unique=False, nullable=False)
_weight = db.Column(db.Integer, unique=False, nullable=False)
# constructor of a Swimmer object, initializes the instance variables within object (self)
def __init__(self, name, speed, height, weight):
self._name = name
self._speed = speed
self._height = height
self._weight = weight
# set of getter and setter methods below
@property
def name(self):
return self._name
@name.setter
def name(self, name):
self._name = name
@property
def speed(self):
return self._speed
@speed.setter
def speed(self, speed):
self._speed = speed
@property
def height(self):
return self._height
@height.setter
def height(self, height):
self._height = height
@property
def weight(self):
return self._weight
@weight.setter
def weight(self, weight):
self._weight = weight
def __str__(self):
return json.dumps(self.read())
# CRUD create/add a new record to the table
# returns self or None on error
def create(self):
try:
db.session.add(self)
db.session.commit()
return self
except IntegrityError:
db.session.remove()
return None
# CRUD read converts self to dictionary
# returns dictionary
def read(self):
return {
"id": self.id,
"name": self.name,
"speed": self.speed,
"height": self.height,
"weight": self.weight,
}
# CRUD update: updates swimmer
# returns none
def update(self, dictionary):
for x in dictionary:
if x == "speed":
self.speed = dictionary[x]
if x == "height":
self.height = dictionary[x]
if x == "weight":
self.weight = dictionary[x]
db.session.merge(self)
db.session.commit()
return None
# CRUD deletes: swimmer
# returns none
def delete(self):
db.session.delete(self)
db.session.commit()
return None
def initUsers():
with app.app_context():
db.create_all()
p1 = Swimmer(name='Caeleb Dressel', speed='47.02', height='6.3', weight='194')
p2 = Swimmer(name='Alain Bernard', speed='47.21', height='6.5', weight='200')
p3 = Swimmer(name='Nathan Adrian', speed='47.52', height='6.6', weight='227')
p4 = Swimmer(name='Kyle Chalmers', speed='47.58', height='6.4', weight='198')
p5 = Swimmer(name='Pieter Hoogenband', speed='48.17', height='6.4', weight='180')
p6 = Swimmer(name='Alexander Popov', speed='48.74', height='6.5', weight='192')
swimmers = [p1, p2, p3, p4, p5, p6]
for x in swimmers:
try:
object = x.create()
print(f"Created new uid {object.name}")
except:
print(f"Records exist uid {x.name}, or error.")
initUsers()
def find_by_name(name):
with app.app_context():
swimmer = Swimmer.query.filter_by(_name=name).first()
return swimmer
def create():
name = input("Enter Swimmer name:")
swimmer = find_by_name(name)
try:
print("Swimmer Found\n", name.read())
return
except:
pass
# accepts new swimmer data
speed = input("Enter Swimmer speed:")
height = input("Enter Swimmer height:")
weight = input("Enter Swimmer weight:")
swimmer = Swimmer(name=name,
speed=speed,
height=height,
weight=weight
)
# add new swimmer to table
with app.app_context():
try:
object = swimmer.create()
print("Created\n", object.read())
except:
print("Unknown error name {name}")
create()
def read():
with app.app_context():
table = Swimmer.query.all()
json_ready = [swimmer.read() for swimmer in table]
return json_ready
read()
def update():
# ask for user input
name = str(input("Who do you want to edit?"))
speed = float(input("Enter the swimmer's new speed"))
height = float(input("Enter the swimmer's new height"))
weight = int(input("Enter the swimmer's new weight"))
body = {
"name": name,
"data": {"speed": speed, "height": height, "weight": weight}
}
data = body.get('data')
swimmer = find_by_name(name)
with app.app_context():
swimmer.update(data)
db.session.commit()
return f"{swimmer.name} at id {swimmer.id} has been updated"
update()
read()
def delete():
name = str(input("Who do you want to delete?"))
swimmer = find_by_name(name)
with app.app_context():
swimmer.delete()
return f"{swimmer.name} at id {swimmer.id} has been deleted"
delete()
read()