142 lines
4.9 KiB
Python
142 lines
4.9 KiB
Python
from pyjeeves.models.raw import ArticleShelf, ItemReplenishmentLevels
|
|
from pyjeeves.models import db
|
|
from sqlalchemy.sql.expression import and_
|
|
from sqlalchemy.orm.exc import NoResultFound
|
|
|
|
from pyjeeves.repositories import Article
|
|
|
|
from pyjeeves import logging
|
|
logger = logging.getLogger("PyJeeves." + __name__)
|
|
|
|
|
|
# Relocate Jeeves modules to separate folder and let a "master" module handle imports, and setup.
|
|
class Warehouse():
|
|
"""Handles articles in Jeeves, currently filters out all articles with class = 2"""
|
|
|
|
@staticmethod
|
|
def get_shelf(shelf_id):
|
|
""" Query a shelf by id """
|
|
try:
|
|
return db.raw_session.query(ArticleShelf).filter_by(
|
|
LagPlats=str(shelf_id)
|
|
).one()
|
|
except NoResultFound:
|
|
raise KeyError
|
|
|
|
@staticmethod
|
|
def get_all_shelfs(filter_=and_()):
|
|
# .filter_by(ItemStatusCode=0, ArtKod=2)
|
|
return db.raw_session.query(ArticleShelf).filter(filter_).all()
|
|
|
|
@staticmethod
|
|
def delete_empty_shelfs():
|
|
shelfs = db.raw_session.query(ArticleShelf).filter(and_(
|
|
ArticleShelf.LagStalle == 0,
|
|
ArticleShelf.JAPP_EWMS_zoneid.in_(['U', 'K', 'S']),
|
|
ArticleShelf.LagSaldo == 0)).all()
|
|
|
|
for shelf in shelfs:
|
|
db.raw_session.delete(shelf)
|
|
|
|
db.raw_session.commit()
|
|
logger.info('Deleted %s shelfs' % (len(shelfs)))
|
|
|
|
@staticmethod
|
|
def delete_replenish():
|
|
replenish = db.raw_session.query(ItemReplenishmentLevels).all()
|
|
|
|
for repl in replenish:
|
|
db.raw_session.delete(repl)
|
|
|
|
db.raw_session.commit()
|
|
logger.info('Deleted old replenishment levels')
|
|
|
|
@staticmethod
|
|
def add_replenish(shelfs=[]):
|
|
Warehouse.delete_replenish()
|
|
|
|
min_level = 54 # Should be how low we'd like the level to get. One layer on pallet?
|
|
max_level = 270 # How high is the roof?
|
|
|
|
# with db.raw_session.no_autoflush:
|
|
for shelf in shelfs:
|
|
art = Article.get(shelf['article_no'])
|
|
|
|
default_alt_unit = [unit for unit in art.ArticleUnit if unit.AltEnhetOrderStd == '1']
|
|
if default_alt_unit and len(default_alt_unit) == 1:
|
|
if default_alt_unit[0].AltEnhetOmrFaktor is not None:
|
|
unit_multiple = default_alt_unit[0].AltEnhetOmrFaktor
|
|
else:
|
|
unit_multiple = default_alt_unit[0].ArticleAlternativeUnit.AltEnhetOmrFaktor
|
|
else:
|
|
unit_multiple = 1
|
|
|
|
new_level = ItemReplenishmentLevels(
|
|
LagStalle='0', LagPlats=shelf['shelf'], ArtNr=shelf['article_no'],
|
|
JAPP_EWMS_minnoofitemsonbin=min_level,
|
|
MaxNoOfItemsOnBin=max_level,
|
|
JAPP_EWMS_multipel=unit_multiple,
|
|
ForetagKod=1)
|
|
|
|
db.raw_session.add(new_level)
|
|
db.raw_session.commit()
|
|
|
|
|
|
# TODO: Should be moved to separate project with Lindvalls specific code
|
|
def update_shelfs_from_csv(filename='shelf_numbers_20211217.csv'):
|
|
SHELF_TYPES = {
|
|
'Hyllplats': 'HP',
|
|
'Pallplats': 'PP',
|
|
'Pallrad': 'PR'
|
|
}
|
|
SKIP_ZONES = ('U', 'K', 'KB', 'S', 'G')
|
|
|
|
import csv
|
|
with open(filename, newline='') as csvfile:
|
|
shelfreader = csv.reader(csvfile, delimiter=',')
|
|
headers = shelfreader.__next__()
|
|
logger.info('Found these columns: %s' % (', '.join(headers)))
|
|
for row in shelfreader:
|
|
if row[3] in SKIP_ZONES:
|
|
continue
|
|
maxkg = int(row[4]) if row[4] else 0
|
|
multiitems = '1' if row[2] == 'Pallrad' else '0'
|
|
n1 = ArticleShelf(
|
|
LagPlats=row[1], JAPP_EWMS_zoneid=row[3], LagStalle='0',
|
|
LagPlatsTyp=SHELF_TYPES.get(row[2]), MaxNoOfItemsOnBin=maxkg,
|
|
MultiItemsOnBin=multiitems, JAPP_EWMS_AllowMultipleBatchesOnBin=multiitems,
|
|
ForetagKod=1)
|
|
|
|
db.raw_session.merge(n1)
|
|
db.raw_session.commit()
|
|
logger.info('Succesfully commited shelfs to database')
|
|
|
|
|
|
# TODO: Should be moved to separate project with Lindvalls specific code
|
|
def update_replenishment_levels(filename='repl_shelfs.csv'):
|
|
import csv
|
|
|
|
repl_shelfs = []
|
|
with open(filename, newline='') as csvfile:
|
|
shelfreader = csv.reader(csvfile, delimiter=',')
|
|
headers = shelfreader.__next__()
|
|
logger.info('Found these columns: %s' % (', '.join(headers)))
|
|
for row in shelfreader:
|
|
repl_shelfs.append({
|
|
'shelf': row[0],
|
|
'article_no': str(row[1])
|
|
})
|
|
logger.info('Updating %d replenishment levels' % (len(repl_shelfs)))
|
|
Warehouse.add_replenish(repl_shelfs)
|
|
logger.info('Succesfully commited levels to database')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# from pprint import pprint
|
|
# all_shelfs = Warehouse.get_all_shelfs()
|
|
# pprint([shelf.to_dict() for shelf in all_shelfs])
|
|
|
|
# Warehouse.delete_empty_shelfs()
|
|
update_shelfs_from_csv()
|
|
|
|
# update_replenishment_levels()
|