from __future__ import print_function
import torch
import pickle
import numpy as np
import math
import cv2
from PIL import Image, JpegImagePlugin
from scipy import ndimage
import hashlib
import sys, os
from zipfile import ZipFile
from .imgproc import loadImage
if sys.version_info[0] == 2:
    from six.moves.urllib.request import urlretrieve
else:
    from urllib.request import urlretrieve


def consecutive(data, mode='first', stepsize=1):
    group = np.split(data, np.where(np.diff(data) != stepsize)[0]+1)
    group = [item for item in group if len(item) > 0]
    if mode == 'first':
        result = [l[0] for l in group]
    elif mode == 'last':
        result = [l[-1] for l in group]
    return result
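
# Example (illustrative, with a toy index array): values are grouped into
# consecutive runs and one representative per run is returned.
#   consecutive(np.array([1, 2, 3, 7, 8]), mode='first')  -> [1, 7]
#   consecutive(np.array([1, 2, 3, 7, 8]), mode='last')   -> [3, 8]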


def word_segmentation(mat, separator_idx={'th': [1, 2], 'en': [3, 4]}, separator_idx_list=[1, 2, 3, 4]):
    result = []
    sep_list = []
    start_idx = 0
    sep_lang = ''
    for sep_idx in separator_idx_list:
        if sep_idx % 2 == 0:
            mode = 'first'
        else:
            mode = 'last'
        a = consecutive(np.argwhere(mat == sep_idx).flatten(), mode)
        new_sep = [[item, sep_idx] for item in a]
        sep_list += new_sep
    sep_list = sorted(sep_list, key=lambda x: x[0])

    for sep in sep_list:
        for lang in separator_idx.keys():
            if sep[1] == separator_idx[lang][0]:  # start of a language section
                sep_lang = lang
                sep_start_idx = sep[0]
            elif sep[1] == separator_idx[lang][1]:  # end of a language section
                if sep_lang == lang:  # only close the section if it was opened by the same language
                    new_sep_pair = [lang, [sep_start_idx+1, sep[0]-1]]
                    if sep_start_idx > start_idx:
                        result.append(['', [start_idx, sep_start_idx-1]])
                    start_idx = sep[0]+1
                    result.append(new_sep_pair)
                sep_lang = ''  # reset
    if start_idx <= len(mat)-1:
        result.append(['', [start_idx, len(mat)-1]])
    return result
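
# Sketch (toy input): with 'th' delimited by separator indices 1 (start) and
# 2 (end), the run between the separators is tagged 'th' and everything else
# is tagged '' (no language):
#   word_segmentation(np.array([0, 1, 5, 6, 2, 0]))
#   -> [['', [0, 0]], ['th', [2, 3]], ['', [5, 5]]]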


# code is based on https://github.com/githubharald/CTCDecoder/blob/master/src/BeamSearch.py
class BeamEntry:
    "information about one single beam at a specific time-step"
    def __init__(self):
        self.prTotal = 0        # blank and non-blank
        self.prNonBlank = 0     # non-blank
        self.prBlank = 0        # blank
        self.prText = 1         # LM score
        self.lmApplied = False  # flag if LM was already applied to this beam
        self.labeling = ()      # beam-labeling
        self.simplified = True  # whether the labeling is already simplified


class BeamState:
    "information about the beams at a specific time-step"
    def __init__(self):
        self.entries = {}

    def norm(self):
        "length-normalise LM score"
        for (k, _) in self.entries.items():
            labelingLen = len(self.entries[k].labeling)
            self.entries[k].prText = self.entries[k].prText ** (1.0 / (labelingLen if labelingLen else 1.0))

    def sort(self):
        "return beam-labelings, sorted by probability"
        beams = [v for (_, v) in self.entries.items()]
        sortedBeams = sorted(beams, reverse=True, key=lambda x: x.prTotal*x.prText)
        return [x.labeling for x in sortedBeams]

    def wordsearch(self, classes, ignore_idx, maxCandidate, dict_list):
        beams = [v for (_, v) in self.entries.items()]
        sortedBeams = sorted(beams, reverse=True, key=lambda x: x.prTotal*x.prText)
        if len(sortedBeams) > maxCandidate:
            sortedBeams = sortedBeams[:maxCandidate]

        for j, candidate in enumerate(sortedBeams):
            idx_list = candidate.labeling
            text = ''
            for i, l in enumerate(idx_list):
                if l not in ignore_idx and (not (i > 0 and idx_list[i - 1] == idx_list[i])):
                    text += classes[l]
            if j == 0:
                best_text = text
            if text in dict_list:
                best_text = text
                break
        return best_text


def applyLM(parentBeam, childBeam, classes, lm):
    "calculate LM score of child beam by taking score from parent beam and bigram probability of last two chars"
    if lm and not childBeam.lmApplied:
        c1 = classes[parentBeam.labeling[-1] if parentBeam.labeling else classes.index(' ')]  # first char
        c2 = classes[childBeam.labeling[-1]]  # second char
        lmFactor = 0.01  # influence of language model
        bigramProb = lm.getCharBigram(c1, c2) ** lmFactor  # probability of seeing first and second char next to each other
        childBeam.prText = parentBeam.prText * bigramProb  # probability of char sequence
        childBeam.lmApplied = True  # only apply LM once per beam entry


def simplify_label(labeling, blankIdx=0):
    labeling = np.array(labeling)

    # collapse blank
    idx = np.where(~((np.roll(labeling, 1) == labeling) & (labeling == blankIdx)))[0]
    labeling = labeling[idx]

    # get rid of blank between different characters
    idx = np.where(~((np.roll(labeling, 1) != np.roll(labeling, -1)) & (labeling == blankIdx)))[0]

    if len(labeling) > 0:
        last_idx = len(labeling)-1
        if last_idx not in idx:
            idx = np.append(idx, [last_idx])
        labeling = labeling[idx]

    return tuple(labeling)
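
# Sketch (blankIdx = 0): repeated blanks collapse, a blank between two
# different characters is dropped, and a blank between a repeated character
# is kept (it is meaningful to CTC):
#   simplify_label((1, 0, 0, 1, 0, 2))  -> (1, 0, 1, 2)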


def fast_simplify_label(labeling, c, blankIdx=0):
    # adding a blank after a non-blank
    if labeling and c == blankIdx and labeling[-1] != blankIdx:
        newLabeling = labeling + (c,)

    # adding a non-blank after a blank
    elif labeling and c != blankIdx and labeling[-1] == blankIdx:
        # blank between the same character: keep it (as done by simplify_label)
        if labeling[-2] == c:
            newLabeling = labeling + (c,)
        # blank between different characters: remove it (as done by simplify_label)
        else:
            newLabeling = labeling[:-1] + (c,)

    # consecutive blanks: keep the original labeling
    elif labeling and c == blankIdx and labeling[-1] == blankIdx:
        newLabeling = labeling

    # empty beam and first index is blank
    elif not labeling and c == blankIdx:
        newLabeling = labeling

    # empty beam and first index is non-blank
    elif not labeling and c != blankIdx:
        newLabeling = labeling + (c,)

    # non-blank after non-blank
    elif labeling and c != blankIdx:
        newLabeling = labeling + (c,)

    # remaining cases that might still require simplifying
    else:
        newLabeling = labeling + (c,)
        newLabeling = simplify_label(newLabeling, blankIdx)

    return newLabeling
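
# Illustrative check (toy labelings, blankIdx = 0), mirroring simplify_label:
#   fast_simplify_label((2, 0), 2)  -> (2, 0, 2)   blank kept between repeats
#   fast_simplify_label((2, 0), 3)  -> (2, 3)      blank dropped between different chars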


def addBeam(beamState, labeling):
    "add beam if it does not yet exist"
    if labeling not in beamState.entries:
        beamState.entries[labeling] = BeamEntry()


def ctcBeamSearch(mat, classes, ignore_idx, lm, beamWidth=25, dict_list=[]):
    blankIdx = 0
    maxT, maxC = mat.shape

    # initialise beam state
    last = BeamState()
    labeling = ()
    last.entries[labeling] = BeamEntry()
    last.entries[labeling].prBlank = 1
    last.entries[labeling].prTotal = 1

    # go over all time-steps
    for t in range(maxT):
        curr = BeamState()
        # get beam-labelings of best beams
        bestLabelings = last.sort()[0:beamWidth]
        # go over best beams
        for labeling in bestLabelings:
            # probability of paths ending with a non-blank
            prNonBlank = 0
            # in case of non-empty beam
            if labeling:
                # probability of paths with repeated last char at the end
                prNonBlank = last.entries[labeling].prNonBlank * mat[t, labeling[-1]]

            # probability of paths ending with a blank
            prBlank = (last.entries[labeling].prTotal) * mat[t, blankIdx]

            # add beam at current time-step if needed
            prev_labeling = labeling
            if not last.entries[labeling].simplified:
                labeling = simplify_label(labeling, blankIdx)
            addBeam(curr, labeling)

            # fill in data
            curr.entries[labeling].labeling = labeling
            curr.entries[labeling].prNonBlank += prNonBlank
            curr.entries[labeling].prBlank += prBlank
            curr.entries[labeling].prTotal += prBlank + prNonBlank
            # beam-labeling not changed, therefore the LM score is unchanged as well
            curr.entries[labeling].prText = last.entries[prev_labeling].prText

            # extend current beam-labeling
            # char_highscore = np.argpartition(mat[t, :], -5)[-5:] # run through 5 highest probability
            char_highscore = np.where(mat[t, :] >= 0.5/maxC)[0]  # run through all probable characters
            for c in char_highscore:
                # add new char to current beam-labeling
                newLabeling = fast_simplify_label(labeling, c, blankIdx)

                # if new labeling contains duplicate char at the end, only consider paths ending with a blank
                if labeling and labeling[-1] == c:
                    prNonBlank = mat[t, c] * last.entries[prev_labeling].prBlank
                else:
                    prNonBlank = mat[t, c] * last.entries[prev_labeling].prTotal

                # add beam at current time-step if needed
                addBeam(curr, newLabeling)

                # fill in data
                curr.entries[newLabeling].labeling = newLabeling
                curr.entries[newLabeling].prNonBlank += prNonBlank
                curr.entries[newLabeling].prTotal += prNonBlank

                # apply LM
                # applyLM(curr.entries[labeling], curr.entries[newLabeling], classes, lm)

        # set new beam state
        last = curr

    # normalise LM scores according to beam-labeling-length
    last.norm()

    if dict_list == []:
        bestLabeling = last.sort()[0]  # get most probable labeling
        res = ''
        for i, l in enumerate(bestLabeling):
            # removing repeated characters and blank
            if l not in ignore_idx and (not (i > 0 and bestLabeling[i - 1] == bestLabeling[i])):
                res += classes[l]
    else:
        res = last.wordsearch(classes, ignore_idx, 20, dict_list)
    return res
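
# Minimal usage sketch (toy 3-step, 3-class probability matrix; index 0 is the
# CTC blank). With these peaked distributions the best beam decodes to 'ab':
#   mat = np.array([[0.1, 0.8, 0.1],
#                   [0.8, 0.1, 0.1],
#                   [0.1, 0.1, 0.8]])
#   ctcBeamSearch(mat, ['[blank]', 'a', 'b'], ignore_idx=[0], lm=None)  # -> 'ab'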


class CTCLabelConverter(object):
    """ Convert between text-label and text-index """

    def __init__(self, character, separator_list={}, dict_pathlist={}):
        # character (str): set of the possible characters.
        dict_character = list(character)

        self.dict = {}
        for i, char in enumerate(dict_character):
            self.dict[char] = i + 1

        self.character = ['[blank]'] + dict_character  # dummy '[blank]' token for CTCLoss (index 0)

        self.separator_list = separator_list
        separator_char = []
        for lang, sep in separator_list.items():
            separator_char += sep
        self.ignore_idx = [0] + [i+1 for i, item in enumerate(separator_char)]

        # latin dictionaries (no separators): merge all languages into one word list
        if len(separator_list) == 0:
            dict_list = []
            for lang, dict_path in dict_pathlist.items():
                try:
                    with open(dict_path, "r", encoding="utf-8-sig") as input_file:
                        word_count = input_file.read().splitlines()
                    dict_list += word_count
                except Exception:
                    # silently skip dictionaries that cannot be read
                    pass
        else:
            dict_list = {}
            for lang, dict_path in dict_pathlist.items():
                with open(dict_path, "r", encoding="utf-8-sig") as input_file:
                    word_count = input_file.read().splitlines()
                dict_list[lang] = word_count

        self.dict_list = dict_list

    def encode(self, text, batch_max_length=25):
        """convert text-label into text-index.
        input:
            text: text labels of each image. [batch_size]
        output:
            text: concatenated text index for CTCLoss.
                [sum(text_lengths)] = [text_index_0 + text_index_1 + ... + text_index_(n - 1)]
            length: length of each text. [batch_size]
        """
        length = [len(s) for s in text]
        text = ''.join(text)
        text = [self.dict[char] for char in text]

        return (torch.IntTensor(text), torch.IntTensor(length))

    def decode_greedy(self, text_index, length):
        """ convert text-index into text-label. """
        texts = []
        index = 0
        for l in length:
            t = text_index[index:index + l]
            # Returns a boolean array where true is when the value is not repeated
            a = np.insert(~((t[1:] == t[:-1])), 0, True)
            # Returns a boolean array where true is when the value is not in the ignore_idx list
            b = ~np.isin(t, np.array(self.ignore_idx))
            # Combine the two boolean arrays
            c = a & b
            # Gets the corresponding character according to the saved indexes
            text = ''.join(np.array(self.character)[t[c.nonzero()]])
            texts.append(text)
            index += l
        return texts
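
    # Round-trip sketch (hypothetical two-character set 'ab'):
    #   conv = CTCLabelConverter('ab')
    #   text, length = conv.encode(['ab'])               # tensor([1, 2]), tensor([2])
    #   conv.decode_greedy(np.array([1, 1, 0, 2]), [4])  # -> ['ab']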

    def decode_beamsearch(self, mat, beamWidth=5):
        texts = []
        for i in range(mat.shape[0]):
            t = ctcBeamSearch(mat[i], self.character, self.ignore_idx, None, beamWidth=beamWidth)
            texts.append(t)
        return texts

    def decode_wordbeamsearch(self, mat, beamWidth=5):
        texts = []
        argmax = np.argmax(mat, axis=2)

        for i in range(mat.shape[0]):
            string = ''
            # without separators - use space as separator
            if len(self.separator_list) == 0:
                space_idx = self.dict[' ']

                data = np.argwhere(argmax[i] != space_idx).flatten()
                group = np.split(data, np.where(np.diff(data) != 1)[0]+1)
                group = [list(item) for item in group if len(item) > 0]

                for j, list_idx in enumerate(group):
                    matrix = mat[i, list_idx, :]
                    t = ctcBeamSearch(matrix, self.character, self.ignore_idx, None,
                                      beamWidth=beamWidth, dict_list=self.dict_list)
                    if j == 0:
                        string += t
                    else:
                        string += ' ' + t
            # with separators
            else:
                words = word_segmentation(argmax[i])

                for word in words:
                    matrix = mat[i, word[1][0]:word[1][1]+1, :]
                    if word[0] == '':
                        dict_list = []
                    else:
                        dict_list = self.dict_list[word[0]]
                    t = ctcBeamSearch(matrix, self.character, self.ignore_idx, None, beamWidth=beamWidth, dict_list=dict_list)
                    string += t
            texts.append(string)
        return texts


def merge_to_free(merge_result, free_list):
    # re-insert results that came from free-form (rotated) boxes into the
    # reading order of the horizontal results
    merge_result_buf, mr_buf = [], []

    if not free_list:
        return merge_result

    free_list_buf = merge_result[-len(free_list):]
    merge_result = merge_result[:-len(free_list)]

    # split the horizontal results into rows: a new row starts when the next
    # box's top-left corner is not to the right of the previous one
    for idx, r in enumerate(merge_result):
        if idx == len(merge_result)-1:
            mr_buf.append(r)
            merge_result_buf.append(mr_buf)
            mr_buf = []
            continue

        if (mr_buf == []) or (mr_buf[-1][0] < r[0]):
            mr_buf.append(r)
        else:
            merge_result_buf.append(mr_buf)
            mr_buf = []
            mr_buf.append(r)

    # place each free-box result at the row/column position matching its
    # top-left corner
    for free_pos in free_list_buf:
        y_pos = len(merge_result_buf)
        x_pos = len(merge_result_buf[y_pos-1])
        for i, result_pos in enumerate(merge_result_buf[1:]):
            if free_pos[0][0][1] < result_pos[0][0][0][1]:
                y_pos = i
                break
        for i, result_pos in enumerate(merge_result_buf[y_pos]):
            if free_pos[0][0][0] < result_pos[0][0][0]:
                x_pos = i
                break
        merge_result_buf[y_pos].insert(x_pos, free_pos)

    # flatten the rows back into a single list
    merge_result = []
    [merge_result.extend(r) for r in merge_result_buf]

    return merge_result


def four_point_transform(image, rect):
    (tl, tr, br, bl) = rect

    # compute the width of the new image, which will be the maximum distance
    # between the bottom-left and bottom-right points or the top-left and
    # top-right points
    widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2))
    widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2))
    maxWidth = max(int(widthA), int(widthB))

    # compute the height of the new image, which will be the maximum distance
    # between the top-right and bottom-right y-coordinates or the top-left and
    # bottom-left y-coordinates
    heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2))
    heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2))
    maxHeight = max(int(heightA), int(heightB))

    dst = np.array([[0, 0], [maxWidth - 1, 0], [maxWidth - 1, maxHeight - 1], [0, maxHeight - 1]], dtype="float32")

    # compute the perspective transform matrix and then apply it
    M = cv2.getPerspectiveTransform(rect, dst)
    warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))

    return warped
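
# Usage sketch (hypothetical quadrilateral, ordered tl, tr, br, bl): warps a
# skewed text region into an axis-aligned crop.
#   rect = np.array([[10, 10], [110, 20], [112, 60], [12, 50]], dtype="float32")
#   warped = four_point_transform(img, rect)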


def group_text_box(polys, slope_ths=0.1, ycenter_ths=0.5, height_ths=0.5, width_ths=1.0, add_margin=0.05, sort_output=True):
    # poly order: top-left, top-right, bottom-right, bottom-left
    horizontal_list, free_list, combined_list, merged_list = [], [], [], []

    for poly in polys:
        slope_up = (poly[3]-poly[1])/np.maximum(10, (poly[2]-poly[0]))
        slope_down = (poly[5]-poly[7])/np.maximum(10, (poly[4]-poly[6]))
        if max(abs(slope_up), abs(slope_down)) < slope_ths:
            x_max = max([poly[0], poly[2], poly[4], poly[6]])
            x_min = min([poly[0], poly[2], poly[4], poly[6]])
            y_max = max([poly[1], poly[3], poly[5], poly[7]])
            y_min = min([poly[1], poly[3], poly[5], poly[7]])
            horizontal_list.append([x_min, x_max, y_min, y_max, 0.5*(y_min+y_max), y_max-y_min])
        else:
            height = np.linalg.norm([poly[6]-poly[0], poly[7]-poly[1]])
            width = np.linalg.norm([poly[2]-poly[0], poly[3]-poly[1]])

            margin = int(1.44*add_margin*min(width, height))

            theta13 = abs(np.arctan((poly[1]-poly[5])/np.maximum(10, (poly[0]-poly[4]))))
            theta24 = abs(np.arctan((poly[3]-poly[7])/np.maximum(10, (poly[2]-poly[6]))))
            # do I need to clip minimum, maximum value here?
            x1 = poly[0] - np.cos(theta13)*margin
            y1 = poly[1] - np.sin(theta13)*margin
            x2 = poly[2] + np.cos(theta24)*margin
            y2 = poly[3] - np.sin(theta24)*margin
            x3 = poly[4] + np.cos(theta13)*margin
            y3 = poly[5] + np.sin(theta13)*margin
            x4 = poly[6] - np.cos(theta24)*margin
            y4 = poly[7] + np.sin(theta24)*margin

            free_list.append([[x1, y1], [x2, y2], [x3, y3], [x4, y4]])

    if sort_output:
        horizontal_list = sorted(horizontal_list, key=lambda item: item[4])

    # combine boxes that sit on the same line
    new_box = []
    for poly in horizontal_list:
        if len(new_box) == 0:
            b_height = [poly[5]]
            b_ycenter = [poly[4]]
            new_box.append(poly)
        else:
            # comparable height and comparable y_center level, up to ths*height
            if abs(np.mean(b_ycenter) - poly[4]) < ycenter_ths*np.mean(b_height):
                b_height.append(poly[5])
                b_ycenter.append(poly[4])
                new_box.append(poly)
            else:
                b_height = [poly[5]]
                b_ycenter = [poly[4]]
                combined_list.append(new_box)
                new_box = [poly]
    combined_list.append(new_box)

    # merge boxes within each line, sorting by x this time
    for boxes in combined_list:
        if len(boxes) == 1:  # one box per line
            box = boxes[0]
            margin = int(add_margin*min(box[1]-box[0], box[5]))
            merged_list.append([box[0]-margin, box[1]+margin, box[2]-margin, box[3]+margin])
        else:  # multiple boxes per line
            boxes = sorted(boxes, key=lambda item: item[0])

            merged_box, new_box = [], []
            for box in boxes:
                if len(new_box) == 0:
                    b_height = [box[5]]
                    x_max = box[1]
                    new_box.append(box)
                else:
                    if (abs(np.mean(b_height) - box[5]) < height_ths*np.mean(b_height)) and ((box[0]-x_max) < width_ths*(box[3]-box[2])):  # merge boxes
                        b_height.append(box[5])
                        x_max = box[1]
                        new_box.append(box)
                    else:
                        b_height = [box[5]]
                        x_max = box[1]
                        merged_box.append(new_box)
                        new_box = [box]
            if len(new_box) > 0:
                merged_box.append(new_box)

            for mbox in merged_box:
                if len(mbox) != 1:  # adjacent boxes in the same line
                    # do I need to add margin here?
                    x_min = min(mbox, key=lambda x: x[0])[0]
                    x_max = max(mbox, key=lambda x: x[1])[1]
                    y_min = min(mbox, key=lambda x: x[2])[2]
                    y_max = max(mbox, key=lambda x: x[3])[3]

                    box_width = x_max - x_min
                    box_height = y_max - y_min
                    margin = int(add_margin * (min(box_width, box_height)))

                    merged_list.append([x_min-margin, x_max+margin, y_min-margin, y_max+margin])
                else:  # non-adjacent box in the same line
                    box = mbox[0]

                    box_width = box[1] - box[0]
                    box_height = box[3] - box[2]
                    margin = int(add_margin * (min(box_width, box_height)))

                    merged_list.append([box[0]-margin, box[1]+margin, box[2]-margin, box[3]+margin])
    # may need to check if box is really in image
    return merged_list, free_list


def calculate_ratio(width, height):
    '''
    Calculate aspect ratio for the normal use case (w > h) and for
    vertical text (h > w)
    '''
    ratio = width/height
    if ratio < 1.0:
        ratio = 1./ratio
    return ratio
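
# Sketch: the ratio is reported as >= 1 regardless of orientation:
#   calculate_ratio(100, 50)  -> 2.0
#   calculate_ratio(50, 100)  -> 2.0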


def compute_ratio_and_resize(img, width, height, model_height):
    '''
    Calculate the aspect ratio and resize correctly for both the horizontal
    text case and the vertical text case
    '''
    ratio = width/height
    if ratio < 1.0:
        ratio = calculate_ratio(width, height)
        # vertical text: swap the target dimensions
        # (note: the original passed PIL's Image.Resampling.LANCZOS here, but
        # cv2.resize expects OpenCV's own flag; cv2.INTER_LANCZOS4 is the
        # matching Lanczos interpolation)
        img = cv2.resize(img, (model_height, int(model_height*ratio)), interpolation=cv2.INTER_LANCZOS4)
    else:
        img = cv2.resize(img, (int(model_height*ratio), model_height), interpolation=cv2.INTER_LANCZOS4)
    return img, ratio


def get_image_list(horizontal_list, free_list, img, model_height=64, sort_output=True):
    image_list = []
    maximum_y, maximum_x = img.shape

    max_ratio_hori, max_ratio_free = 1, 1
    for box in free_list:
        rect = np.array(box, dtype="float32")
        transformed_img = four_point_transform(img, rect)
        ratio = calculate_ratio(transformed_img.shape[1], transformed_img.shape[0])
        new_width = int(model_height*ratio)
        if new_width != 0:
            crop_img, ratio = compute_ratio_and_resize(transformed_img, transformed_img.shape[1], transformed_img.shape[0], model_height)
            image_list.append((box, crop_img))  # box = [[x1,y1],[x2,y2],[x3,y3],[x4,y4]]
            max_ratio_free = max(ratio, max_ratio_free)

    max_ratio_free = math.ceil(max_ratio_free)

    for box in horizontal_list:
        x_min = max(0, box[0])
        x_max = min(box[1], maximum_x)
        y_min = max(0, box[2])
        y_max = min(box[3], maximum_y)
        crop_img = img[y_min:y_max, x_min:x_max]
        width = x_max - x_min
        height = y_max - y_min
        ratio = calculate_ratio(width, height)
        new_width = int(model_height*ratio)
        if new_width != 0:
            crop_img, ratio = compute_ratio_and_resize(crop_img, width, height, model_height)
            image_list.append(([[x_min, y_min], [x_max, y_min], [x_max, y_max], [x_min, y_max]], crop_img))
            max_ratio_hori = max(ratio, max_ratio_hori)

    max_ratio_hori = math.ceil(max_ratio_hori)
    max_ratio = max(max_ratio_hori, max_ratio_free)
    max_width = math.ceil(max_ratio)*model_height

    if sort_output:
        image_list = sorted(image_list, key=lambda item: item[0][0][1])  # sort by vertical position
    return image_list, max_width


def download_and_unzip(url, filename, model_storage_directory, verbose=True):
    zip_path = os.path.join(model_storage_directory, 'temp.zip')
    reporthook = printProgressBar(prefix='Progress:', suffix='Complete', length=50) if verbose else None
    urlretrieve(url, zip_path, reporthook=reporthook)
    with ZipFile(zip_path, 'r') as zipObj:
        zipObj.extract(filename, model_storage_directory)
    os.remove(zip_path)


def calculate_md5(fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
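
# Usage sketch (hypothetical path and checksum): verify a downloaded model
#   if calculate_md5('/path/to/model.pth') != expected_md5:
#       print('MD5 mismatch: the download may be corrupted')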


def diff(input_list):
    return max(input_list) - min(input_list)


def get_paragraph(raw_result, x_ths=1, y_ths=0.5, mode='ltr'):
    # create basic attributes
    box_group = []
    for box in raw_result:
        all_x = [int(coord[0]) for coord in box[0]]
        all_y = [int(coord[1]) for coord in box[0]]
        min_x = min(all_x)
        max_x = max(all_x)
        min_y = min(all_y)
        max_y = max(all_y)
        height = max_y - min_y
        box_group.append([box[1], min_x, max_x, min_y, max_y, height, 0.5*(min_y+max_y), 0, box[-1]])  # last element indicates group

    # cluster boxes into paragraphs
    current_group = 1
    while len([box for box in box_group if box[7] == 0]) > 0:
        box_group0 = [box for box in box_group if box[7] == 0]  # group0 = not yet assigned
        # new group
        if len([box for box in box_group if box[7] == current_group]) == 0:
            box_group0[0][7] = current_group  # assign first box to form new group
        # try to add to the current group
        else:
            current_box_group = [box for box in box_group if box[7] == current_group]
            mean_height = np.mean([box[5] for box in current_box_group])
            min_gx = min([box[1] for box in current_box_group]) - x_ths*mean_height
            max_gx = max([box[2] for box in current_box_group]) + x_ths*mean_height
            min_gy = min([box[3] for box in current_box_group]) - y_ths*mean_height
            max_gy = max([box[4] for box in current_box_group]) + y_ths*mean_height
            add_box = False
            for box in box_group0:
                same_horizontal_level = (min_gx <= box[1] <= max_gx) or (min_gx <= box[2] <= max_gx)
                same_vertical_level = (min_gy <= box[3] <= max_gy) or (min_gy <= box[4] <= max_gy)
                if same_horizontal_level and same_vertical_level:
                    box[7] = current_group
                    add_box = True
                    break
            # cannot add more boxes, go to next group
            if add_box == False:
                current_group += 1

    # arrange order within each paragraph
    result = []
    for i in set(box[7] for box in box_group):
        current_box_group = [box for box in box_group if box[7] == i]
        group_confidence = sum([box[8] for box in current_box_group])/len(current_box_group)
        mean_height = np.mean([box[5] for box in current_box_group])
        min_gx = min([box[1] for box in current_box_group])
        max_gx = max([box[2] for box in current_box_group])
        min_gy = min([box[3] for box in current_box_group])
        max_gy = max([box[4] for box in current_box_group])

        text = ''
        while len(current_box_group) > 0:
            highest = min([box[6] for box in current_box_group])
            candidates = [box for box in current_box_group if box[6] < highest + 0.4*mean_height]
            # get the left-most (or right-most) candidate on the current line
            if mode == 'ltr':
                most_left = min([box[1] for box in candidates])
                for box in candidates:
                    if box[1] == most_left:
                        best_box = box
            elif mode == 'rtl':
                most_right = max([box[2] for box in candidates])
                for box in candidates:
                    if box[2] == most_right:
                        best_box = box
            text += ' ' + best_box[0]
            current_box_group.remove(best_box)

        result.append([[[min_gx, min_gy], [max_gx, min_gy], [max_gx, max_gy], [min_gx, max_gy]], text[1:], group_confidence])

    return result
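
# Input sketch (hypothetical coordinates): raw_result is a list of
# (box, text, confidence) tuples; nearby boxes merge into one paragraph.
#   raw_result = [([[0, 0], [50, 0], [50, 10], [0, 10]], 'Hello', 0.99),
#                 ([[55, 0], [90, 0], [90, 10], [55, 10]], 'world', 0.98)]
#   get_paragraph(raw_result)  # -> one box whose text is 'Hello world'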


def printProgressBar(prefix='', suffix='', decimals=1, length=100, fill='█'):
    """
    Returns a reporthook that draws a terminal progress bar
    @params:
        prefix   - Optional : prefix string (Str)
        suffix   - Optional : suffix string (Str)
        decimals - Optional : positive number of decimals in percent complete (Int)
        length   - Optional : character length of bar (Int)
        fill     - Optional : bar fill character (Str)
    """
    def progress_hook(count, blockSize, totalSize):
        progress = min(1.0, count * blockSize / totalSize)  # clamp: the last block can overshoot totalSize
        percent = ("{0:." + str(decimals) + "f}").format(progress * 100)
        filledLength = int(length * progress)
        bar = fill * filledLength + '-' * (length - filledLength)
        print(f'\r{prefix} |{bar}| {percent}% {suffix}', end='')
    return progress_hook
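
# Usage sketch: pass the returned hook to urlretrieve as its reporthook (this
# is how download_and_unzip above uses it):
#   urlretrieve(url, zip_path,
#               reporthook=printProgressBar(prefix='Progress:', suffix='Complete', length=50))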


def reformat_input(image):
    if type(image) == str:
        if image.startswith('http://') or image.startswith('https://'):
            tmp, _ = urlretrieve(image, reporthook=printProgressBar(prefix='Progress:', suffix='Complete', length=50))
            img_cv_grey = cv2.imread(tmp, cv2.IMREAD_GRAYSCALE)
            os.remove(tmp)
        else:
            img_cv_grey = cv2.imread(image, cv2.IMREAD_GRAYSCALE)
            image = os.path.expanduser(image)
        img = loadImage(image)  # can accept URL
    elif type(image) == bytes:
        nparr = np.frombuffer(image, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img_cv_grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    elif type(image) == np.ndarray:
        if len(image.shape) == 2:  # grayscale
            img_cv_grey = image
            img = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        elif len(image.shape) == 3 and image.shape[2] == 1:  # single-channel grayscale
            img_cv_grey = np.squeeze(image)
            img = cv2.cvtColor(img_cv_grey, cv2.COLOR_GRAY2BGR)
        elif len(image.shape) == 3 and image.shape[2] == 3:  # BGR
            img = image
            img_cv_grey = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        elif len(image.shape) == 3 and image.shape[2] == 4:  # RGBA
            img = image[:, :, :3]
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            img_cv_grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    elif type(image) == JpegImagePlugin.JpegImageFile:
        image_array = np.array(image)
        img = cv2.cvtColor(image_array, cv2.COLOR_RGB2BGR)
        img_cv_grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    else:
        raise ValueError('Invalid input type. Supported formats: file path or URL string, bytes, numpy array')
    return img, img_cv_grey


def reformat_input_batched(image, n_width=None, n_height=None):
    """
    Reformats an image, a list of images, or a 4D numpy image array and
    returns a list of corresponding (img, img_cv_grey) nd.arrays.
    image:
        [file path, numpy-array, byte stream object,
        list of file paths, list of numpy-array, 4D numpy array,
        list of byte stream objects]
    """
    if (isinstance(image, np.ndarray) and len(image.shape) == 4) or isinstance(image, list):
        # process image batches if image is a list of image np arrays, paths, or bytes
        img, img_cv_grey = [], []
        for single_img in image:
            clr, gry = reformat_input(single_img)
            if n_width is not None and n_height is not None:
                clr = cv2.resize(clr, (n_width, n_height))
                gry = cv2.resize(gry, (n_width, n_height))
            img.append(clr)
            img_cv_grey.append(gry)
        img, img_cv_grey = np.array(img), np.array(img_cv_grey)

        # ragged arrays are created when the input images are not all the same size
        if len(img.shape) == 1 and len(img_cv_grey.shape) == 1:
            raise ValueError("The input image array contains images of different sizes. " +
                             "Please resize all images to the same shape or pass n_width, n_height to auto-resize")
    else:
        img, img_cv_grey = reformat_input(image)
    return img, img_cv_grey


def make_rotated_img_list(rotationInfo, img_list):
    result_img_list = img_list[:]

    # add rotated images to the original image_list
    max_ratio = 1
    for angle in rotationInfo:
        for img_info in img_list:
            rotated = ndimage.rotate(img_info[1], angle, reshape=True)
            height, width = rotated.shape
            ratio = calculate_ratio(width, height)
            max_ratio = max(max_ratio, ratio)
            result_img_list.append((img_info[0], rotated))
    return result_img_list


def set_result_with_confidence(results):
    """ Select the highest-confidence augmentation for TTA
    Given a list of lists of results (the outer list has one list per
    augmentation, the inner lists index the images being recognized), choose
    the best result according to confidence level.
    Each "result" is of the form (box coords, text, confidence).
    A final_result is returned which contains one result for each image.
    """
    final_result = []
    for col_ix in range(len(results[0])):
        # take the row_ix associated with the max confidence
        best_row = max(
            [(row_ix, results[row_ix][col_ix][2]) for row_ix in range(len(results))],
            key=lambda x: x[1])[0]
        final_result.append(results[best_row][col_ix])
    return final_result
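
# Sketch (hypothetical two augmentations of one image; 'box' stands for any
# coordinate list): the higher-confidence reading wins.
#   results = [[(box, 'hallo', 0.70)],
#              [(box, 'hello', 0.95)]]
#   set_result_with_confidence(results)  # -> [(box, 'hello', 0.95)]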