PyJeeves/pyjeeves/repositories/company.py

129 lines
4.8 KiB
Python

# -*- coding: utf-8 -*-
from pyjeeves.models.raw import Company as CompanyModel, Customer as CustomerModel
from pyjeeves.models import db
from sqlalchemy.sql.expression import and_
from sqlalchemy.orm.strategy_options import Load
from sqlalchemy.orm.exc import NoResultFound
from pyjeeves import logging
logger = logging.getLogger("PyJeeves." + __name__)
# TODO: Relocate Jeeves modules to a separate folder and let a "master" module handle imports and setup.
class Company():
    """Read-only queries for companies and customers in Jeeves.

    Every method is static and runs its query through ``db.raw_session``.
    """

    @staticmethod
    def get(ftg_nr):
        """Return the company whose ``FtgNr`` equals *ftg_nr*.

        Raises:
            KeyError: when no company has that number; the number that was
                looked up is carried as the exception argument. Only
                ``NoResultFound`` is translated, any other error raised by
                the query propagates unchanged.
        """
        try:
            return db.raw_session.query(CompanyModel).filter_by(
                FtgNr=ftg_nr
            ).one()
        except NoResultFound as err:
            raise KeyError(ftg_nr) from err

    @staticmethod
    def get_all_active_customers():
        """Return the company of every customer with ``Makulerad == 0``."""
        customers = db.raw_session.query(CustomerModel).filter(
            CustomerModel.Makulerad == 0).all()
        # ``CompanyModel`` is only this module's import alias; the relationship
        # on a customer row is ``Company``, as read elsewhere in this module.
        return [customer.Company for customer in customers]

    @staticmethod
    def get_customers(category_list=(10,), class_list=(), filter_inactive=True, filters=and_()):
        """Return customer rows matching the given criteria.

        Args:
            category_list: values ``kundkategorikod`` must be in; an empty
                sequence disables this criterion.
            class_list: values ``kundklass`` must be in; an empty sequence
                disables this criterion.
            filter_inactive: when true, keep only rows with ``Makulerad == 0``.
            filters: extra SQLAlchemy criterion ANDed with the others.

        Returns:
            A list of ``CustomerModel`` rows. The query requests
            ``noload('*')`` for ``CompanyModel``.
        """
        # An empty and_() stands for "no restriction" for each optional criterion.
        category_in = CustomerModel.kundkategorikod.in_(category_list) if category_list else and_()
        class_in = CustomerModel.kundklass.in_(class_list) if class_list else and_()
        inactive = (CustomerModel.Makulerad == 0) if filter_inactive else and_()

        return db.raw_session.query(CustomerModel).options(
            Load(CompanyModel).noload('*')).filter(
                and_(category_in, class_in, inactive, filters)).all()

    @staticmethod
    def get_customer_numbers(category_list=(10,), class_list=(), filter_inactive=True):
        """Return the ``FtgNr`` of every customer selected by ``get_customers``.

        The arguments are passed through to ``get_customers`` unchanged.
        """
        customers = Company.get_customers(category_list, class_list, filter_inactive)
        return [customer.FtgNr for customer in customers]

    @staticmethod
    def get_list(ftg_nr=(), filter_=and_(CustomerModel.Makulerad == 0), offset=0, limit=100):
        """Return one page of companies joined to their customer row.

        Args:
            ftg_nr: company numbers to restrict the result to; an empty
                sequence means no restriction on ``FtgNr``.
            filter_: SQLAlchemy criterion ANDed with the number filter;
                defaults to ``Makulerad == 0`` on the customer.
            offset: number of rows to skip.
            limit: maximum number of rows to return.

        Returns:
            A list of ``CompanyModel`` rows ordered by ``FtgNr`` descending.
        """
        ftg_filter = CompanyModel.FtgNr.in_(ftg_nr) if ftg_nr else and_()

        return db.raw_session.query(CompanyModel).join(CustomerModel).filter(
            and_(ftg_filter, filter_)).order_by(
                CompanyModel.FtgNr.desc()).offset(offset).limit(limit).all()
# TODO: Should be moved to separate project with Lindvalls specific code
def update_customer_delivery_from_csv(filename='zip_codes_svhl.csv'):
    """Set ``LevSattKod`` to 11 for customers whose postal code is in a CSV file.

    The CSV is read with a comma delimiter. Its first row is treated as a
    header and only logged. In every following row column 0 is the postal
    code, and columns 2 and 1 are combined into a zone label that is used
    for logging only. Rows with fewer than three columns are skipped.

    Candidates are the customers returned by ``Company.get_customers`` (with
    its default category/inactive criteria) that have ``LevSattKod == 3`` and
    a NULL ``kundklass``. For each candidate the delivery postal code
    (``FtgLevPostNr``) is tried first and the company postal code
    (``FtgPostnr``) second, with surrounding whitespace and inner spaces
    removed before the lookup. The first code found in the CSV marks the
    customer for update. All changes are committed once at the end.

    Args:
        filename: path of the CSV file to read.
    """
    import csv

    logger.info("Get customers")
    customers = Company.get_customers(filters=(
        and_(CustomerModel.LevSattKod == 3, CustomerModel.kundklass == None)))  # noqa
    logger.info("Amount of customers is %d" % len(customers))

    zones = {}
    with open(filename, newline='') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        # next() with a default: an empty file gives an empty header row
        # instead of letting StopIteration escape.
        headers = next(reader, [])
        logger.info('Found these columns: %s' % (', '.join(headers)))
        for row in reader:
            if len(row) < 3:
                # Blank or truncated line: it has no zone columns to index.
                continue
            zones[row[0]] = '%s - %s' % (row[2], row[1])
    logger.info('Length of zones dict is %d' % (len(zones)))

    customers_to_update = 0
    for customer in customers:
        company = customer.Company
        # Order matters: the delivery postal code wins over the company one.
        for field in ('FtgLevPostNr', 'FtgPostnr'):
            raw_code = getattr(company, field)
            if not raw_code:
                continue
            code = raw_code.strip().replace(" ", "")
            # The emptiness check keeps a whitespace-only code from matching
            # a CSV row whose first column is empty.
            if code and code in zones:
                logger.info('%s: %s - %s is within SVHL zone %s' % (
                    field, customer.FtgNr, company.FtgNamn, zones[code]))
                customer.LevSattKod = 11
                customers_to_update += 1
                break

    logger.info('Amount updated %d' % customers_to_update)
    db.raw_session.commit()
    logger.info('Succesfully commited updated customers to database')
if __name__ == '__main__':
    # Script entry point: running this module directly logs "Starting" and
    # then runs update_customer_delivery_from_csv() with its default filename.
    #
    # The commented-out lines below are ad-hoc manual checks kept for reference.
    # NOTE(review): they call get_list/get/get_all_active_customers on
    # CompanyModel and use RawSession, which is not among the imports visible
    # in this module -- presumably stale; confirm before re-enabling any of them.
    # print([column.key for column in CompanyModel.__table__.columns])
    # logger.info("Starting TEST")
    # session = RawSession()
    # logger.info("Testing gettings a company")
    # c1 = session.query(CompanyModel).filter_by(FtgNr="179580").first()
    # print(CompanyModel)
    # print(CompanyModel.get_list(['406569', '179580', '2440070', '179584']))
    # from pprint import pprint
    # pprint(CompanyModel.get('179584').to_dict())
    # c1 = CompanyModel.query.filter_by(FtgNr="406569").first()
    # print(c1)
    # logger.info(c1.json)
    # print(
    # len(CompanyModel.get_all_active_customers())
    # )
    # print(CompanyModel.get_all_active_customers()[0].CompanyModel)
    logger.info("Starting")
    update_customer_delivery_from_csv()