使用Python实现的差分进化算法,引用到Numpy和Scipy,可以支持多核与集群并行计算。使用时只需继承DESolver并定义函数def error_func(self, indiv, *args)作为目标函数即可。具体代码如下:

import numpy

import scipy.optimize

import sys

try:

import pp

HAS_PP = True

except ImportError:

HAS_PP = False

# set up the enumerated DE method types

DE_RAND_1 = 0

DE_BEST_1 = 1

DE_BEST_2 = 2

DE_BEST_1_JITTER = 3

DE_LOCAL_TO_BEST_1 = 4

# Functions to set up and control remote worker

def _update_solver(in_solver):

global solver

solver = in_solver

def _call_error_func(ind):

global solver

error = solver.error_func(solver.population[ind,:],*(solver.args))

return error

class DESolver(object):

"""

Genetic minimization based on Differential Evolution.

"""

def __init__(self, param_ranges, population_size, max_generations,

method = DE_RAND_1, args=None,

scale=0.8, crossover_prob=0.9,

goal_error=1e-3, polish=True, verbose=True,

use_pp=True, pp_depfuncs=None, pp_modules=None):

"""

"""

# set the internal vars

self.param_ranges = param_ranges

self.num_params = len(self.param_ranges)

self.population_size = population_size

self.max_generations = max_generations

self.method = method

#self.func = func

# prepend self to the args

if args is None:

args = ()

self.args = args

self.scale = scale

self.crossover_prob = crossover_prob

self.goal_error = goal_error

self.polish = polish

self.verbose = verbose

# set helper vars

self.rot_ind = numpy.arange(self.population_size)

# set status vars

self.generation = 0

# set up the population

# eventually we can allow for unbounded min/max values with None

self.population = numpy.hstack([numpy.random.uniform(p[0],p[1],

size=[self.population_size,1])

for p in param_ranges])

self.population_errors = numpy.empty(self.population_size)

# check for pp

if use_pp and not HAS_PP:

print "WARNING: PP was not found on your system, so no "\

"parallelization will be performed."

use_pp = False

if use_pp:

# auto-detects number of SMP CPU cores (will detect 1 core on

# single-CPU systems)

job_server = pp.Server()

if self.verbose:

print "Setting up %d pp_cpus" % (job_server.get_ncpus())

# set up lists of depfuncs and modules

depfuncs = []

if not pp_depfuncs is None:

depfuncs.extend(pp_depfuncs)

depfuncs = tuple(depfuncs)

modules = ['desolver']

if not pp_modules is None:

modules.extend(pp_modules)

modules = tuple(modules)

# give each worker a copy of this object and the required

# depfuncs and modules

for i in range(job_server.get_ncpus()):

job_server.submit(_update_solver,

args=(self,), depfuncs=depfuncs,

modules=modules)

job_server.wait()

else:

job_server = None

# the rest is now in a try block

# try/finally block is to ensure remote worker processes are

# killed if they were started

try:

# eval the initial population

self._eval_population(job_server)

# set the index of the best individual

best_ind = self.population_errors.argmin()

self.best_error = self.population_errors[best_ind]

self.best_individual = numpy.copy(self.population[best_ind,:])

self.best_generation = self.generation

if self.verbose:

print "Best generation: %g" % (self.best_generation)

print "Best Error: %g" % (self.best_error)

print "Best Indiv: " + str(self.best_individual)

print

# now solve

self._solve(job_server)

finally:

# destroy the server if it was started

if use_pp:

job_server.destroy()

def _eval_population(self, job_server=None):

"""

Evals the provided population, returning the errors from the

function.

"""

# see if use job_server

if self.verbose:

print "Generation: %d (%d)" % (self.generation,self.max_generations)

sys.stdout.write('Evaluating population (%d): ' % (self.population_size))

if not job_server:

# eval the function for the initial population

for i in xrange(self.population_size):

if self.verbose:

sys.stdout.write('%d ' % (i))

sys.stdout.flush()

#self.population_errors[i] = self.func(self.population[i,:],*(self.args))

self.population_errors[i] = self.error_func(self.population[i,:],*(self.args))

else:

# update the workers

for i in range(job_server.get_ncpus()):

job_server.submit(_update_solver, (self,), (), ())

job_server.wait()

# submit the functions to the job server

jobs = []

for i in xrange(self.population_size):

jobs.append(job_server.submit(_call_error_func, (i,), (), ()))

for i,job in enumerate(jobs):

if self.verbose:

sys.stdout.write('%d ' % (i))

sys.stdout.flush()

error = job()

self.population_errors[i] = error

#self.population_errors[i] = job()

if self.verbose:

sys.stdout.write('\n')

sys.stdout.flush()

def error_func(self, indiv, *args):

raise NotImplementedError

def _evolve_population(self):

"""

Evolove to new generation of population.

"""

# save the old population

self.old_population = self.population.copy()

self.old_population_errors = self.population_errors.copy()

# index pointers

rind = numpy.random.permutation(4)+1

# shuffle the locations of the individuals

ind1 = numpy.random.permutation(self.population_size)

pop1 = self.old_population[ind1,:]

# rotate for remaining indices

rot = numpy.remainder(self.rot_ind + rind[0], self.population_size)

ind2 = ind1[rot,:]

pop2 = self.old_population[ind2,:]

rot = numpy.remainder(self.rot_ind + rind[1], self.population_size)

ind3 = ind2[rot,:]

pop3 = self.old_population[ind3,:]

rot = numpy.remainder(self.rot_ind + rind[2], self.population_size)

ind4 = ind3[rot,:]

pop4 = self.old_population[ind4,:]

rot = numpy.remainder(self.rot_ind + rind[3], self.population_size)

ind5 = ind4[rot,:]

pop5 = self.old_population[ind5,:]

# population filled with best individual

best_population = self.best_individual[numpy.newaxis,:].repeat(self.population_size,axis=0)

# figure out the crossover ind

xold_ind = numpy.random.rand(self.population_size,self.num_params) >= \

self.crossover_prob

# get new population based on desired strategy

# DE/rand/1

if self.method == DE_RAND_1:

population = pop3 + self.scale*(pop1 - pop2)

population_orig = pop3

# DE/BEST/1

if self.method == DE_BEST_1:

population = best_population + self.scale*(pop1 - pop2)

population_orig = best_population

# DE/best/2

elif self.method == DE_BEST_2:

population = best_population + self.scale * \

(pop1 + pop2 - pop3 - pop4)

population_orig = best_population

# DE/BEST/1/JITTER

elif self.method == DE_BEST_1_JITTER:

population = best_population + (pop1 - pop2) * \

((1.0-0.9999) * \

numpy.random.rand(self.population_size,self.num_params) + \

self.scale)

population_orig = best_population

# DE/LOCAL_TO_BEST/1

elif self.method == DE_LOCAL_TO_BEST_1:

population = self.old_population + \

self.scale*(best_population - self.old_population) + \

self.scale*(pop1 - pop2)

population_orig = self.old_population

# crossover

population[xold_ind] = self.old_population[xold_ind]

# apply the boundary constraints

for p in xrange(self.num_params):

# get min and max

min_val = self.param_ranges[p][0]

max_val = self.param_ranges[p][1]

# find where exceeded max

ind = population[:,p] > max_val

if ind.sum() > 0:

# bounce back

population[ind,p] = max_val + \

numpy.random.rand(ind.sum())*\

(population_orig[ind,p]-max_val)

# find where below min

ind = population[:,p] < min_val

if ind.sum() > 0:

# bounce back

population[ind,p] = min_val + \

numpy.random.rand(ind.sum())*\

(population_orig[ind,p]-min_val)

# set the class members

self.population = population

self.population_orig = population

def _solve(self, job_server=None):

"""

Optimize the parameters of the function.

"""

# loop over generations

for g in xrange(1,self.max_generations):

# set the generation

self.generation = g

# update the population

self._evolve_population()

# evaluate the population

self._eval_population(job_server)

# decide what stays

ind = self.population_errors > self.old_population_errors

self.population[ind,:] = self.old_population[ind,:]

self.population_errors[ind] = self.old_population_errors[ind]

# set the index of the best individual

best_ind = self.population_errors.argmin()

# update what is best

if self.population_errors[best_ind] < self.best_error:

self.best_error = self.population_errors[best_ind]

self.best_individual = numpy.copy(self.population[best_ind,:])

self.best_generation = self.generation

if self.verbose:

print "Best generation: %g" % (self.best_generation)

print "Best Error: %g" % (self.best_error)

print "Best Indiv: " + str(self.best_individual)

print

# see if done

if self.best_error < self.goal_error:

break

# see if polish with fmin search after the first generation

if self.polish:

if self.verbose:

print "Polishing best result: %g" % (self.population_errors[best_ind])

iprint = 1

else:

iprint = -1

# polish with bounded min search

polished_individual, polished_error, details = \

scipy.optimize.fmin_l_bfgs_b(self.error_func,

self.population[best_ind,:],

args=self.args,

bounds=self.param_ranges,

approx_grad=True,

iprint=iprint)

if self.verbose:

print "Polished Result: %g" % (polished_error)

print "Polished Indiv: " + str(polished_individual)

if polished_error < self.population_errors[best_ind]:

# it's better, so keep it

self.population[best_ind,:] = polished_individual

self.population_errors[best_ind] = polished_error

# update what is best

self.best_error = self.population_errors[best_ind]

self.best_individual = numpy.copy(self.population[best_ind,:])

if job_server:

self.pp_stats = job_server.get_stats()

发表评论

You must be logged in to post a comment.

python差分进化算法_Python实现的差分进化算法源代码相关推荐

  1. python中什么是算法_python自动开发之(算法)第二十七天

    1.什么是算法? 算法(Algorithm):一个计算过程,解决问题的方法 2.复习:递归 递归的两个特点:(1) 调用自身 (2)结束条件 deffunc1(x):print(x) func1(x- ...

  2. python实现排序算法_python实现·十大排序算法之插入排序(Insertion Sort)

    简介 插入排序(Insertion Sort)是一种简单直观的排序算法.它的工作原理是:通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫描,找到相应位置并插入. 算法实现步骤 从第一个元素开 ...

  3. python代码颜色不同_Python填充任意颜色,不同算法时间差异分析说明

    我就废话不多说了,大家还是直接看代码吧! import time import numpy as np import cv2 #方法一 start = time.time() for i in ran ...

  4. python 下标 遍历列表_python中的数据结构与算法(1):列表、元组与字符串

    列表是 python 中最常用.最重要的数据结构之一. 一般来说,我们用列表来管理一组数据对象,这些对象可能是同一类型,也可能是不同类型.列表不仅记录了这些数据对象,而且也记录了它们之间的一种顺序关系 ...

  5. python分类算法_python数据挖掘中的分类算法有哪些?

    一直以来,对于机器学习领域,Python都是人们津津乐道的话题,大家知道我们在用python学习机器技术时候,用到的方法内容和一般情况下的是一样的吗?想必,了解过的小伙伴一定是知道有哪些的,不知道的小 ...

  6. python sort 逆序_python实现·十大排序算法之插入排序(Insertion Sort)

    简介 插入排序(Insertion Sort)是一种简单直观的排序算法.它的工作原理是:通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫描,找到相应位置并插入. 算法实现步骤 从第一个元素开 ...

  7. python数据分类聚类案例_Python实现的KMeans聚类算法实例分析

    本文实例讲述了Python实现的KMeans聚类算法.分享给大家供大家参考,具体如下: 菜鸟一枚,编程初学者,最近想使用Python3实现几个简单的机器学习分析方法,记录一下自己的学习过程. 关于KM ...

  8. python几种排序_Python实现几种排序算法

    Python实现几种排序算法 2018年11月12日 09:39 1.二分查找 import random def binary_search(li,target): start = 0 end = ...

  9. python分配红包程序_Python版微信红包分配算法

    Python版微信红包分配算法 发布于 2015-05-08 10:54:23 | 151 次阅读 | 评论: 0 | 来源: 网友投递 Python编程语言Python 是一种面向对象.解释型计算机 ...

最新文章

  1. Firefox下http协议监测工具LiveHttpHeaders
  2. c++ qt 编译问题求解答
  3. 配置Trunk实现相同Vlan通信
  4. wifi网络结构(上)
  5. CentOS6.5 gcc升级到4.8.2
  6. autoencoder自编码器原理以及在mnist数据集上的实现
  7. ios 构建版本一直在处理中_iOS构建版本不显示
  8. java 嵌套调用_Java嵌套类的使用
  9. 架构选型,究竟啥时候选redis?
  10. python基础 网络编程
  11. [CI、CD入门]maven打包可执行程序之微服务-服务提供者篇
  12. html+默认ie11,IE11浏览器怎么设置默认浏览器
  13. python is not defined
  14. python下载vip素材_Python素材下载爬虫,多线程rar下载爬虫
  15. 站在产业互联网爆发的风口,定推一折购跃上云端服务中小企业发展
  16. h5中返回上一页常见的问题
  17. 新手玩转Linux Kernel漏洞之Null Pointer Dereference
  18. 基于Qt的房贷计算器
  19. [Practical.Vim(2012.9)].Drew.Neil.Tip12学习摘要
  20. vue-router 路由传参

热门文章

  1. 2021年安全员-A证考试题库及安全员-A证试题及解析
  2. QT在使用protocal的问题解决记录之std :: string和QByteArray之间的无损转换
  3. Pixhawk ardupilot 手机地面站 DroidPlanner Tower
  4. 物价上涨对普通百姓的影响
  5. 由双引号与单引号引起基础知识总结
  6. IE6 IE7 IE8 IE9 IE10 Css hack及IE条件注释法
  7. 中国银行网银登陆无法输入密码的问题
  8. ByteBuffer中的flip()、clear()、compact()
  9. 协方差分析 | ANCOVA (Analysis of Covariance) | R代码
  10. 特斯拉(纯电动)在全球销售了131.4万辆占据世界第一